Files
desungongpai/app/services/tcp_command_service.py
default d54e53e0b7 feat: KKS P240/P241 蓝牙工牌管理系统初始提交
FastAPI + SQLAlchemy + asyncio TCP 服务器,支持设备管理、实时定位、
告警、考勤打卡、蓝牙记录、指令下发、TTS语音播报等功能。
2026-03-27 10:19:34 +00:00

32 lines
995 B
Python

"""
TCP Command Service — Abstraction layer for sending commands to devices via TCP.
Breaks the circular import between routers/commands.py and tcp_server.py
by lazily importing tcp_manager only when needed.
"""
import logging
# Module-level logger named after this module; none of the functions shown
# in this file currently write to it.
logger = logging.getLogger(__name__)
def _get_tcp_manager():
    """Return the ``tcp_manager`` object exported by ``app.tcp_server``.

    The import statement lives inside the function body instead of at module
    top level, so ``app.tcp_server`` is only imported when this helper is
    first called. Per the module docstring, this avoids a circular import
    between ``routers/commands.py`` and ``tcp_server.py``.
    """
    # Deferred import: must stay inside the function to keep the cycle broken.
    from app.tcp_server import tcp_manager as manager

    return manager
def is_device_online(imei: str) -> bool:
    """Report whether *imei* is present in the TCP manager's connections.

    Args:
        imei: Identifier of the device to look up.

    Returns:
        The result of the membership test ``imei in tcp_manager.connections``.
    """
    # NOTE(review): ``connections`` is presumably keyed by IMEI string --
    # confirm against app.tcp_server.
    active_connections = _get_tcp_manager().connections
    return imei in active_connections
async def send_command(imei: str, command_type: str, command_content: str) -> bool:
    """Forward an online command (0x80) for a device to the TCP manager.

    Args:
        imei: Identifier of the target device.
        command_type: Command type, passed through unchanged.
        command_content: Command payload, passed through unchanged.

    Returns:
        Whatever ``tcp_manager.send_command`` returns (annotated as ``bool``;
        presumably the delivery outcome -- confirm in app.tcp_server).
    """
    manager = _get_tcp_manager()
    outcome = await manager.send_command(imei, command_type, command_content)
    return outcome
async def send_message(imei: str, message: str) -> bool:
    """Forward a text message (0x82) for a device to the TCP manager.

    Args:
        imei: Identifier of the target device.
        message: Message text, passed through unchanged.

    Returns:
        Whatever ``tcp_manager.send_message`` returns (annotated as ``bool``;
        presumably the delivery outcome -- confirm in app.tcp_server).
    """
    manager = _get_tcp_manager()
    outcome = await manager.send_message(imei, message)
    return outcome