""" KKS Bluetooth Badge TCP Server Asyncio-based TCP server that manages persistent connections with KKS badge devices. Each device authenticates via a login packet containing its IMEI, after which the server routes incoming packets to protocol-specific handlers, persists telemetry/alarm/attendance data, and exposes helper methods that the REST API can call to push commands or messages to connected devices. """ from __future__ import annotations import asyncio import logging import struct from datetime import datetime, timedelta, timezone from typing import Any, Dict, Optional, Tuple from sqlalchemy import select, update from app.config import settings from app.database import async_session from app.geocoding import geocode_location, reverse_geocode from app.websocket_manager import ws_manager from app.models import ( AlarmRecord, AttendanceRecord, BeaconConfig, BluetoothRecord, CommandLog, Device, HeartbeatRecord, LocationRecord, ) from app.protocol.constants import ( ALARM_TYPES, PROTOCOL_NAMES, PROTO_ADDRESS_QUERY, PROTO_ALARM_ACK, PROTO_ALARM_LBS_4G, PROTO_ALARM_MULTI_FENCE, PROTO_ALARM_SINGLE_FENCE, PROTO_ALARM_WIFI, PROTO_ATTENDANCE, PROTO_ATTENDANCE_4G, PROTO_BT_LOCATION, PROTO_BT_PUNCH, PROTO_GENERAL_INFO, PROTO_GPS, PROTO_GPS_4G, PROTO_HEARTBEAT, PROTO_HEARTBEAT_EXT, PROTO_LBS_4G, PROTO_LBS_4G_ADDRESS_REQ, PROTO_LBS_ADDRESS_REQ, PROTO_LBS_MULTI, PROTO_LBS_MULTI_REPLY, PROTO_LOGIN, PROTO_MESSAGE, PROTO_ONLINE_CMD, PROTO_ONLINE_CMD_REPLY, PROTO_TIME_SYNC, PROTO_TIME_SYNC_2, PROTO_ADDRESS_REPLY_EN, PROTO_WIFI, PROTO_WIFI_4G, START_MARKER_LONG, START_MARKER_SHORT, STOP_MARKER, ) from app.protocol.crc import crc_itu logger = logging.getLogger(__name__) # --------------------------------------------------------------------------- # Lightweight packet parser / builder embedded here so the module is # self-contained. These have a TCP-buffering-friendly interface (returning # remaining bytes) while staying consistent with the protocol module's # length/CRC semantics. # # Length field semantics (matching app.protocol.builder): # length = proto(1) + info_content(N) + serial(2) + crc(2) # # Total packet: # short: start(2) + length_field(1) + + stop(2) # long: start(2) + length_field(2) + + stop(2) # --------------------------------------------------------------------------- class PacketParser: """Extract and parse complete KKS protocol packets from a byte stream.""" @staticmethod def find_packets(buffer: bytes) -> Tuple[list, bytes]: """Scan *buffer* for complete packets. Returns ------- (packets, remaining) *packets* is a list of dicts, each with keys: start_marker, length, protocol, serial, content, raw *remaining* is the unconsumed tail of *buffer*. """ packets: list = [] pos = 0 while pos < len(buffer): # Look for a start marker idx_short = buffer.find(START_MARKER_SHORT, pos) idx_long = buffer.find(START_MARKER_LONG, pos) # Pick whichever marker comes first; -1 means not found candidates = [] if idx_short != -1: candidates.append((idx_short, "short")) if idx_long != -1: candidates.append((idx_long, "long")) if not candidates: break # no more markers candidates.sort(key=lambda c: c[0]) idx, marker_type = candidates[0] if idx > pos: # Skip garbage bytes before the marker logger.debug("Skipping %d garbage bytes before marker", idx - pos) pos = idx if marker_type == "short": # 78 78 LEN PROTO INFO... SERIAL(2) CRC(2) 0D 0A # length field = proto(1) + info(N) + serial(2) + crc(2) header_size = 3 # marker(2) + length(1) if pos + header_size > len(buffer): break # need more data for header pkt_len = buffer[pos + 2] # total = start(2) + len_field(1) + pkt_len + stop(2) total = 2 + 1 + pkt_len + 2 if pos + total > len(buffer): break # incomplete packet raw = buffer[pos : pos + total] # Validate stop marker if raw[-2:] != STOP_MARKER: logger.warning( "Invalid stop marker at pos %d, skipping byte", pos ) pos += 1 continue # payload region (between length byte and stop marker) # = proto(1) + info(N) + serial(2) + crc(2) payload = raw[3 : -2] # everything between len_field and stop protocol = payload[0] serial = struct.unpack("!H", payload[-4:-2])[0] content = payload[1:-4] # info_content between protocol and serial packets.append( { "start_marker": "short", "length": pkt_len, "protocol": protocol, "serial": serial, "content": content, "raw": raw, } ) pos += total else: # 79 79 LEN_H LEN_L PROTO INFO... SERIAL(2) CRC(2) 0D 0A header_size = 4 # marker(2) + length(2) if pos + header_size > len(buffer): break pkt_len = struct.unpack("!H", buffer[pos + 2 : pos + 4])[0] # total = start(2) + len_field(2) + pkt_len + stop(2) total = 2 + 2 + pkt_len + 2 if pos + total > len(buffer): break raw = buffer[pos : pos + total] if raw[-2:] != STOP_MARKER: logger.warning( "Invalid stop marker (long) at pos %d, skipping byte", pos ) pos += 1 continue payload = raw[4 : -2] protocol = payload[0] serial = struct.unpack("!H", payload[-4:-2])[0] content = payload[1:-4] packets.append( { "start_marker": "long", "length": pkt_len, "protocol": protocol, "serial": serial, "content": content, "raw": raw, } ) pos += total remaining = buffer[pos:] return packets, remaining class PacketBuilder: """Thin wrapper delegating to app.protocol.builder.PacketBuilder. Preserves the (protocol, payload, serial) call signature used throughout tcp_server.py. """ from app.protocol.builder import PacketBuilder as _ProtoBuilder @staticmethod def build_response( protocol: int, payload: bytes, serial: int, *, long: bool = False ) -> bytes: return PacketBuilder._ProtoBuilder.build_response(protocol, serial, payload) # --------------------------------------------------------------------------- # Connection information container # --------------------------------------------------------------------------- class ConnectionInfo: """Metadata about a single device TCP connection.""" __slots__ = ("imei", "addr", "connected_at", "last_activity", "serial_counter") def __init__(self, addr: Tuple[str, int]) -> None: self.imei: Optional[str] = None self.addr = addr self.connected_at = datetime.now(timezone(timedelta(hours=8))).replace(tzinfo=None) self.last_activity = self.connected_at self.serial_counter: int = 1 def next_serial(self) -> int: val = self.serial_counter self.serial_counter = (self.serial_counter + 1) & 0xFFFF return val # --------------------------------------------------------------------------- # Helper: look up device_id from IMEI # --------------------------------------------------------------------------- async def _get_device_id(session, imei: str) -> Optional[int]: """Query the Device table and return the integer id for the given IMEI.""" result = await session.execute( select(Device.id).where(Device.imei == imei) ) row = result.scalar_one_or_none() return row # --------------------------------------------------------------------------- # TCPManager -- the core singleton # --------------------------------------------------------------------------- class TCPManager: """Manages all badge device TCP connections and message routing.""" def __init__(self) -> None: # {imei: (reader, writer, connection_info)} self.connections: Dict[str, Tuple[asyncio.StreamReader, asyncio.StreamWriter, ConnectionInfo]] = {} self._server: Optional[asyncio.AbstractServer] = None # Protocol number -> handler coroutine mapping self._handlers: Dict[int, Any] = { PROTO_LOGIN: self.handle_login, PROTO_HEARTBEAT: self.handle_heartbeat, PROTO_HEARTBEAT_EXT: self.handle_heartbeat, PROTO_GPS: self.handle_gps, PROTO_GPS_4G: self.handle_gps_4g, PROTO_LBS_MULTI: self.handle_lbs_multi, PROTO_LBS_MULTI_REPLY: self.handle_lbs_multi, PROTO_LBS_4G: self.handle_lbs_4g, PROTO_WIFI: self.handle_wifi, PROTO_WIFI_4G: self.handle_wifi_4g, PROTO_TIME_SYNC: self.handle_time_sync, PROTO_TIME_SYNC_2: self.handle_time_sync_2, PROTO_ALARM_SINGLE_FENCE: self.handle_alarm_single_fence, PROTO_ALARM_MULTI_FENCE: self.handle_alarm_multi_fence, PROTO_ALARM_LBS_4G: self.handle_alarm_lbs, PROTO_ALARM_WIFI: self.handle_alarm_wifi, PROTO_LBS_ADDRESS_REQ: self.handle_lbs_address_req, PROTO_ADDRESS_QUERY: self.handle_address_query, PROTO_LBS_4G_ADDRESS_REQ: self.handle_lbs_4g_address_req, PROTO_ATTENDANCE: self.handle_attendance, PROTO_ATTENDANCE_4G: self.handle_attendance_4g, PROTO_BT_PUNCH: self.handle_bt_punch, PROTO_BT_LOCATION: self.handle_bt_location, PROTO_GENERAL_INFO: self.handle_general_info, PROTO_ONLINE_CMD_REPLY: self.handle_online_cmd_reply, } # ------------------------------------------------------------------ # Connection lifecycle # ------------------------------------------------------------------ async def _handle_connection( self, reader: asyncio.StreamReader, writer: asyncio.StreamWriter ) -> None: """Callback for each new TCP connection.""" addr = writer.get_extra_info("peername") conn_info = ConnectionInfo(addr) logger.info("New TCP connection from %s:%d", addr[0], addr[1]) recv_buffer = b"" try: while True: data = await reader.read(4096) if not data: logger.info( "Connection closed by remote %s:%d (IMEI=%s)", addr[0], addr[1], conn_info.imei, ) break recv_buffer += data conn_info.last_activity = datetime.now(timezone(timedelta(hours=8))).replace(tzinfo=None) logger.info("Received %d bytes from %s:%d (IMEI=%s): %s", len(data), addr[0], addr[1], conn_info.imei, data[:50].hex()) packets, recv_buffer = PacketParser.find_packets(recv_buffer) for pkt in packets: proto = pkt["protocol"] proto_name = PROTOCOL_NAMES.get(proto, f"0x{proto:02X}") logger.debug( "Packet from IMEI=%s proto=%s serial=%d len=%d", conn_info.imei, proto_name, pkt["serial"], pkt["length"], ) handler = self._handlers.get(proto) if handler is not None: try: await handler(pkt, reader, writer, conn_info) except Exception: logger.exception( "Error handling proto %s for IMEI=%s", proto_name, conn_info.imei, ) else: logger.warning( "No handler for protocol 0x%02X from IMEI=%s", proto, conn_info.imei, ) # Safety: prevent unbounded buffer growth from garbage data if len(recv_buffer) > 65536: logger.warning( "Receive buffer overflow for IMEI=%s, discarding", conn_info.imei, ) recv_buffer = b"" except asyncio.CancelledError: logger.info( "Connection task cancelled for IMEI=%s (%s:%d)", conn_info.imei, addr[0], addr[1], ) except ConnectionResetError: logger.warning( "Connection reset by IMEI=%s (%s:%d)", conn_info.imei, addr[0], addr[1], ) except Exception: logger.exception( "Unexpected error on connection IMEI=%s (%s:%d)", conn_info.imei, addr[0], addr[1], ) finally: await self._cleanup_connection(conn_info, writer) async def _cleanup_connection( self, conn_info: ConnectionInfo, writer: asyncio.StreamWriter ) -> None: """Remove connection from tracking and update the device status.""" imei = conn_info.imei if imei and imei in self.connections: # Only remove if this is still the active connection (not replaced by reconnect) _, stored_writer, _ = self.connections[imei] if stored_writer is writer: del self.connections[imei] logger.info("Device IMEI=%s removed from active connections", imei) else: logger.info("Device IMEI=%s has reconnected, keeping new connection", imei) # Don't mark offline since device reconnected try: writer.close() await writer.wait_closed() except Exception: pass return # Mark device offline in DB try: async with async_session() as session: async with session.begin(): await session.execute( update(Device) .where(Device.imei == imei) .values(status="offline") ) # Broadcast device offline ws_manager.broadcast_nonblocking("device_status", { "imei": imei, "status": "offline", }) except Exception: logger.exception("Failed to set IMEI=%s offline in DB", imei) try: writer.close() await writer.wait_closed() except Exception: pass # ------------------------------------------------------------------ # Helper: send bytes to a writer # ------------------------------------------------------------------ @staticmethod async def _send(writer: asyncio.StreamWriter, data: bytes) -> None: writer.write(data) await writer.drain() # ------------------------------------------------------------------ # Helper: build and send an address reply placeholder # ------------------------------------------------------------------ async def _send_address_reply( self, protocol: int, serial: int, writer: asyncio.StreamWriter, address: Optional[str] = None, phone: str = "", is_alarm: bool = False, ) -> None: """Send address reply (0x17 Chinese / 0x97 English). Format: cmd_length + server_flag(4) + ADDRESS/ALARMSMS(7-8) + && + addr(UTF16BE) + && + phone(21) + ## """ server_flag = b"\x00\x00\x00\x00" marker = b"ALARMSMS" if is_alarm else b"ADDRESS" separator = b"&&" terminator = b"##" addr_text = address or "" addr_bytes = addr_text.encode("utf-16-be") # Phone field: 21 bytes ASCII, zero-padded phone_bytes = phone.encode("ascii", errors="ignore")[:21].ljust(21, b"0") # Content after cmd_length: server_flag + marker + && + addr + && + phone + ## inner = server_flag + marker + separator + addr_bytes + separator + phone_bytes + terminator # Protocol 0x97 (English) uses 2-byte cmd_length; 0x17 (Chinese) uses 1-byte if protocol == PROTO_ADDRESS_REPLY_EN: payload = struct.pack("!H", len(inner)) + inner else: # Chinese (0x17): 1-byte length cmd_len = min(len(inner), 0xFF) payload = bytes([cmd_len]) + inner use_long = len(payload) > 200 pkt = PacketBuilder.build_response(protocol, payload, serial, long=use_long) await self._send(writer, pkt) # ------------------------------------------------------------------ # Helper: parse GPS data from content # ------------------------------------------------------------------ @staticmethod def _parse_gps_from_content(content: bytes, offset: int = 6) -> Dict[str, Any]: """Parse GPS fields from packet content starting at *offset*. Expected layout at offset (12 bytes): gps_info_byte(1) + latitude(4) + longitude(4) + speed(1) + course_status(2) Returns a dict with latitude, longitude, speed, course, gps_satellites, gps_positioned, is_realtime. """ result: Dict[str, Any] = {} if len(content) < offset + 12: return result gps_byte = content[offset] satellites = gps_byte & 0x0F lat_raw = struct.unpack("!I", content[offset + 1 : offset + 5])[0] lon_raw = struct.unpack("!I", content[offset + 5 : offset + 9])[0] speed = content[offset + 9] course_status = struct.unpack("!H", content[offset + 10 : offset + 12])[0] # Decode course/status bits: # bit 13 (0x2000): real-time GPS # bit 12 (0x1000): GPS is positioned # bit 11 (0x0800): East longitude (0=West) # bit 10 (0x0400): North latitude (0=South) # bits 9-0: course (0-360) is_realtime = bool(course_status & 0x2000) gps_positioned = bool(course_status & 0x1000) is_west = bool(course_status & 0x0800) # bit 11: 0=East, 1=West is_north = bool(course_status & 0x0400) # bit 10: 0=South, 1=North course = course_status & 0x03FF latitude = lat_raw / 1_800_000.0 longitude = lon_raw / 1_800_000.0 # Apply hemisphere if not is_north: latitude = -latitude if is_west: longitude = -longitude result["latitude"] = latitude result["longitude"] = longitude result["speed"] = float(speed) result["course"] = float(course) result["gps_satellites"] = satellites result["gps_positioned"] = gps_positioned result["is_realtime"] = is_realtime return result # ------------------------------------------------------------------ # Helper: parse datetime from content (6 bytes YY MM DD HH MM SS) # ------------------------------------------------------------------ @staticmethod def _parse_datetime(content: bytes, offset: int = 0) -> Optional[datetime]: """Parse a 6-byte datetime field at *offset* (UTC) and return CST (UTC+8) naive datetime.""" if len(content) < offset + 6: return None yy, mo, dd, hh, mi, ss = struct.unpack_from("BBBBBB", content, offset) try: utc_dt = datetime(2000 + yy, mo, dd, hh, mi, ss, tzinfo=timezone.utc) # Convert to CST (UTC+8) and strip tzinfo for SQLite cst_dt = utc_dt + timedelta(hours=8) return cst_dt.replace(tzinfo=None) except ValueError: return None # ------------------------------------------------------------------ # Protocol Handlers # ------------------------------------------------------------------ async def handle_login( self, pkt: dict, reader: asyncio.StreamReader, writer: asyncio.StreamWriter, conn_info: ConnectionInfo, ) -> None: """Handle login packet (0x01). Content layout: IMEI (8 bytes BCD-encoded) + type_code (2 bytes) + timezone_language (2 bytes). """ content = pkt["content"] # IMEI is BCD-encoded in the first 8 bytes if len(content) < 8: logger.warning("Login packet too short (%d bytes)", len(content)) return # BCD-encoded IMEI: 8 bytes = 16 hex digits, first digit is padding 0 raw_hex = content[:8].hex() # Strip exactly one leading '0' (BCD padding) to get 15-digit IMEI # Use slicing instead of lstrip to avoid removing multiple leading zeros imei = raw_hex[1:] if raw_hex.startswith("0") else raw_hex conn_info.imei = imei # Close existing connection if device reconnects old_conn = self.connections.get(imei) if old_conn is not None: _, old_writer, old_info = old_conn logger.info("Closing stale connection for IMEI=%s (old %s:%d)", imei, old_info.addr[0], old_info.addr[1]) try: old_writer.close() except Exception: pass self.connections[imei] = (reader, writer, conn_info) logger.info( "Device login: IMEI=%s from %s:%d", imei, conn_info.addr[0], conn_info.addr[1] ) # Extract device type code (2 bytes after IMEI) type_code: Optional[int] = None device_type = "P240" # default if len(content) >= 10: type_code = struct.unpack("!H", content[8:10])[0] device_type = f"0x{type_code:04X}" # Extract timezone/language (2 bytes after device type) tz_str: Optional[str] = None lang_str: Optional[str] = None if len(content) >= 12: tz_lang = struct.unpack("!H", content[10:12])[0] # bit15-04: timezone value * 100, bit03: 0=East/1=West, bit00: language tz_raw = (tz_lang >> 4) & 0xFFF tz_val = tz_raw / 100.0 tz_west = bool(tz_lang & 0x08) if tz_west: tz_str = f"W{tz_val}" else: tz_str = f"E{tz_val}" lang_code = tz_lang & 0x03 lang_str = "zh" if lang_code == 1 else "en" if lang_code == 2 else str(lang_code) # Persist device record now = datetime.now(timezone(timedelta(hours=8))).replace(tzinfo=None) try: async with async_session() as session: async with session.begin(): result = await session.execute( select(Device).where(Device.imei == imei) ) device = result.scalar_one_or_none() if device is None: device = Device( imei=imei, device_type=device_type, status="online", last_login=now, timezone=tz_str, language=lang_str, ) session.add(device) else: device.status = "online" device.last_login = now if tz_str: device.timezone = tz_str if lang_str: device.language = lang_str # Don't overwrite user-set device_type with raw hex code # Broadcast device online ws_manager.broadcast_nonblocking("device_status", { "imei": imei, "status": "online", }) except Exception: logger.exception("DB error during login for IMEI=%s", imei) # Send login response (empty payload) response = PacketBuilder.build_response(PROTO_LOGIN, b"", pkt["serial"]) await self._send(writer, response) async def handle_heartbeat( self, pkt: dict, reader: asyncio.StreamReader, writer: asyncio.StreamWriter, conn_info: ConnectionInfo, ) -> None: """Handle heartbeat (0x13) and extended heartbeat (0x36). Heartbeat content layout: terminal_info(1) + battery_level(1) + gsm_signal(1) + reserved(2) Extended heartbeat (0x36) layout: terminal_info(1) + battery_level(1) + gsm_signal(1) + language(2) + extension(N) Terminal info byte bits: bit 7: oil/electricity (0=connected, 1=disconnected) bit 6: GPS positioned (0=no, 1=yes) bits 5-4: work status bit 3: charging bit 2: ACC bit 1: armed bit 0: reserved """ content = pkt["content"] proto = pkt["protocol"] imei = conn_info.imei if not imei: logger.warning("Heartbeat received before login") return # Ensure device is tracked in active connections (e.g. after server restart) if imei not in self.connections: self.connections[imei] = (reader, writer, conn_info) logger.info("Device IMEI=%s re-registered via heartbeat", imei) terminal_info: int = 0 battery_level: Optional[int] = None gsm_signal: Optional[int] = None extension_data: Optional[dict] = None if len(content) >= 1: terminal_info = content[0] if len(content) >= 2: battery_level = content[1] # 0-100 percentage if len(content) >= 3: gsm_signal = content[2] # 0x00-0x04 # For extended heartbeat (0x36), parse language and extension modules if proto == PROTO_HEARTBEAT_EXT: ext_info: Dict[str, Any] = {} if len(content) >= 5: ext_info["language"] = struct.unpack("!H", content[3:5])[0] if len(content) > 5: ext_info["extension_raw"] = content[5:].hex() if ext_info: extension_data = ext_info now = datetime.now(timezone(timedelta(hours=8))).replace(tzinfo=None) try: async with async_session() as session: async with session.begin(): device_id = await _get_device_id(session, imei) if device_id is None: logger.warning("Heartbeat for unknown IMEI=%s", imei) return # Update device record (also ensure status=online if heartbeat is coming in) await session.execute( update(Device) .where(Device.id == device_id) .values( status="online", battery_level=battery_level, gsm_signal=gsm_signal, last_heartbeat=now, ) ) # Store heartbeat record record = HeartbeatRecord( device_id=device_id, imei=conn_info.imei, protocol_number=proto, terminal_info=terminal_info, battery_level=battery_level if battery_level is not None else 0, gsm_signal=gsm_signal if gsm_signal is not None else 0, extension_data=extension_data, ) session.add(record) except Exception: logger.exception("DB error during heartbeat for IMEI=%s", imei) # Send heartbeat response response = PacketBuilder.build_response(pkt["protocol"], b"", pkt["serial"]) await self._send(writer, response) async def handle_gps( self, pkt: dict, reader: asyncio.StreamReader, writer: asyncio.StreamWriter, conn_info: ConnectionInfo, ) -> None: """Handle GPS data packet (0x22). No response required.""" await self._store_location(pkt, conn_info, location_type="gps") async def handle_gps_4g( self, pkt: dict, reader: asyncio.StreamReader, writer: asyncio.StreamWriter, conn_info: ConnectionInfo, ) -> None: """Handle GPS 4G data packet (0xA0). No response required.""" await self._store_location(pkt, conn_info, location_type="gps_4g") async def handle_lbs_multi( self, pkt: dict, reader: asyncio.StreamReader, writer: asyncio.StreamWriter, conn_info: ConnectionInfo, ) -> None: """Handle LBS multi-cell data (0x28 / 0x2E). No response for 0x28; send response for 0x2E. """ await self._store_location(pkt, conn_info, location_type="lbs") if pkt["protocol"] == PROTO_LBS_MULTI_REPLY: response = PacketBuilder.build_response( pkt["protocol"], b"", pkt["serial"] ) await self._send(writer, response) async def handle_lbs_4g( self, pkt: dict, reader: asyncio.StreamReader, writer: asyncio.StreamWriter, conn_info: ConnectionInfo, ) -> None: """Handle LBS 4G data (0xA1). No response.""" await self._store_location(pkt, conn_info, location_type="lbs_4g") async def handle_wifi( self, pkt: dict, reader: asyncio.StreamReader, writer: asyncio.StreamWriter, conn_info: ConnectionInfo, ) -> None: """Handle WIFI data (0x2C). No response.""" await self._store_location(pkt, conn_info, location_type="wifi") async def handle_wifi_4g( self, pkt: dict, reader: asyncio.StreamReader, writer: asyncio.StreamWriter, conn_info: ConnectionInfo, ) -> None: """Handle WIFI 4G data (0xA2). No response.""" await self._store_location(pkt, conn_info, location_type="wifi_4g") # -- Location storage helper ------------------------------------------------ async def _store_location( self, pkt: dict, conn_info: ConnectionInfo, location_type: str, ) -> Optional[str]: imei = conn_info.imei if not imei: logger.warning("Location data received before login (type=%s)", location_type) return None content = pkt["content"] proto = pkt["protocol"] now = datetime.now(timezone(timedelta(hours=8))).replace(tzinfo=None) # Parse recorded_at from the 6-byte datetime at offset 0 recorded_at = self._parse_datetime(content, 0) or now # Attempt to extract lat/lon from GPS-type payloads latitude: Optional[float] = None longitude: Optional[float] = None speed: Optional[float] = None course: Optional[float] = None gps_satellites: Optional[int] = None gps_positioned: bool = False is_realtime: bool = True mcc: Optional[int] = None mnc: Optional[int] = None lac: Optional[int] = None cell_id: Optional[int] = None report_mode: Optional[int] = None mileage: Optional[float] = None if location_type in ("gps", "gps_4g") and len(content) >= 18: # datetime(6) + gps(12) + lbs fields + ... gps_data = self._parse_gps_from_content(content, offset=6) latitude = gps_data.get("latitude") longitude = gps_data.get("longitude") speed = gps_data.get("speed") course = gps_data.get("course") gps_satellites = gps_data.get("gps_satellites") gps_positioned = gps_data.get("gps_positioned", False) is_realtime = gps_data.get("is_realtime", True) # Parse LBS fields after GPS (offset 18) pos = 18 if location_type == "gps" and len(content) >= pos + 3: # MCC(2) + MNC(1 or 2) mcc_raw = struct.unpack("!H", content[pos : pos + 2])[0] mnc_2byte = bool(mcc_raw & 0x8000) mcc = mcc_raw & 0x7FFF pos += 2 if mnc_2byte and len(content) >= pos + 2: mnc = struct.unpack("!H", content[pos : pos + 2])[0] pos += 2 elif len(content) >= pos + 1: mnc = content[pos] pos += 1 if len(content) >= pos + 2: lac = struct.unpack("!H", content[pos : pos + 2])[0] pos += 2 if len(content) >= pos + 3: cell_id = int.from_bytes(content[pos : pos + 3], "big") pos += 3 # skip acc(1) if len(content) >= pos + 1: pos += 1 if len(content) >= pos + 1: report_mode = content[pos] pos += 1 # skip realtime_upload(1) if len(content) >= pos + 1: pos += 1 if len(content) >= pos + 4: mileage = float(struct.unpack("!I", content[pos : pos + 4])[0]) pos += 4 elif location_type == "gps_4g" and len(content) >= pos + 3: mcc_raw = struct.unpack("!H", content[pos : pos + 2])[0] mnc_2byte = bool(mcc_raw & 0x8000) mcc = mcc_raw & 0x7FFF pos += 2 if mnc_2byte and len(content) >= pos + 2: mnc = struct.unpack("!H", content[pos : pos + 2])[0] pos += 2 elif len(content) >= pos + 1: mnc = content[pos] pos += 1 if len(content) >= pos + 4: lac = struct.unpack("!I", content[pos : pos + 4])[0] pos += 4 if len(content) >= pos + 8: cell_id = struct.unpack("!Q", content[pos : pos + 8])[0] pos += 8 # skip acc(1) if len(content) >= pos + 1: pos += 1 if len(content) >= pos + 1: report_mode = content[pos] pos += 1 # skip realtime_upload(1) if len(content) >= pos + 1: pos += 1 if len(content) >= pos + 4: mileage = float(struct.unpack("!I", content[pos : pos + 4])[0]) pos += 4 elif location_type in ("lbs", "lbs_4g") and len(content) >= 9: # datetime(6) + mcc(2) + mnc(1-2) + stations... pos = 6 if len(content) >= pos + 2: mcc_raw = struct.unpack("!H", content[pos : pos + 2])[0] mnc_2byte = bool(mcc_raw & 0x8000) mcc = mcc_raw & 0x7FFF pos += 2 if mnc_2byte and len(content) >= pos + 2: mnc = struct.unpack("!H", content[pos : pos + 2])[0] pos += 2 elif len(content) >= pos + 1: mnc = content[pos] pos += 1 # Main station LAC + cell_id if location_type == "lbs" and len(content) >= pos + 5: lac = struct.unpack("!H", content[pos : pos + 2])[0] pos += 2 cell_id = int.from_bytes(content[pos : pos + 3], "big") pos += 3 elif location_type == "lbs_4g" and len(content) >= pos + 12: lac = struct.unpack("!I", content[pos : pos + 4])[0] pos += 4 cell_id = struct.unpack("!Q", content[pos : pos + 8])[0] pos += 8 elif location_type in ("wifi", "wifi_4g") and len(content) >= 9: # Same header as LBS: datetime(6) + mcc(2) + mnc(1-2) + stations + wifi_aps pos = 6 if len(content) >= pos + 2: mcc_raw = struct.unpack("!H", content[pos : pos + 2])[0] mnc_2byte = bool(mcc_raw & 0x8000) mcc = mcc_raw & 0x7FFF pos += 2 if mnc_2byte and len(content) >= pos + 2: mnc = struct.unpack("!H", content[pos : pos + 2])[0] pos += 2 elif len(content) >= pos + 1: mnc = content[pos] pos += 1 # Main station LAC + cell_id (4G format: LAC=4, CellID=8) if location_type == "wifi_4g" and len(content) >= pos + 12: lac = struct.unpack("!I", content[pos : pos + 4])[0] pos += 4 cell_id = struct.unpack("!Q", content[pos : pos + 8])[0] pos += 8 elif location_type == "wifi" and len(content) >= pos + 5: lac = struct.unpack("!H", content[pos : pos + 2])[0] pos += 2 cell_id = int.from_bytes(content[pos : pos + 3], "big") pos += 3 # --- Skip LBS/WiFi records with empty cell data (device hasn't acquired cells yet) --- if location_type in ("lbs", "lbs_4g", "wifi", "wifi_4g") and latitude is None: mcc_val = mcc & 0x7FFF if mcc else 0 if mcc_val == 0 and (lac is None or lac == 0) and (cell_id is None or cell_id == 0): logger.debug("Skipping empty LBS/WiFi packet for IMEI=%s (no cell data)", imei) return # --- Geocoding for LBS/WiFi locations (no GPS coordinates) --- neighbor_cells_data: Optional[list] = None wifi_data_list: Optional[list] = None if location_type in ("lbs", "lbs_4g", "wifi", "wifi_4g") and latitude is None: # Parse neighbor cells and WiFi APs from raw content for geocoding neighbor_cells_data, wifi_data_list = self._parse_extra_location_data( content, location_type ) try: lat, lon = await geocode_location( mcc=mcc, mnc=mnc, lac=lac, cell_id=cell_id, wifi_list=wifi_data_list, neighbor_cells=neighbor_cells_data, imei=imei, location_type=location_type, ) if lat is not None and lon is not None: latitude = lat longitude = lon logger.info( "Geocoded %s for IMEI=%s: lat=%.6f, lon=%.6f", location_type, imei, lat, lon, ) except Exception: logger.exception("Geocoding failed for %s IMEI=%s", location_type, imei) # --- Reverse geocoding: coordinates -> address --- address: Optional[str] = None if latitude is not None and longitude is not None: try: address = await reverse_geocode(latitude, longitude) except Exception: logger.exception("Reverse geocoding failed for IMEI=%s", imei) try: async with async_session() as session: async with session.begin(): device_id = await _get_device_id(session, imei) if device_id is None: logger.warning("Location for unknown IMEI=%s", imei) return record = LocationRecord( device_id=device_id, imei=conn_info.imei, location_type=location_type, latitude=latitude, longitude=longitude, speed=speed, course=course, gps_satellites=gps_satellites, gps_positioned=gps_positioned, mcc=mcc, mnc=mnc, lac=lac, cell_id=cell_id, neighbor_cells=neighbor_cells_data, wifi_data=wifi_data_list, report_mode=report_mode, is_realtime=is_realtime, mileage=mileage, address=address, raw_data=content.hex(), recorded_at=recorded_at, ) session.add(record) # Broadcast to WebSocket subscribers ws_manager.broadcast_nonblocking("location", { "imei": imei, "device_id": device_id, "location_type": location_type, "latitude": latitude, "longitude": longitude, "speed": speed, "address": address, "recorded_at": str(recorded_at), }) except Exception: logger.exception( "DB error storing %s location for IMEI=%s", location_type, imei ) # --- Fence auto-attendance check --- if settings.FENCE_CHECK_ENABLED and latitude is not None and longitude is not None: try: from app.services.fence_checker import check_device_fences async with async_session() as fence_session: async with fence_session.begin(): device_id_for_fence = device_id if device_id_for_fence is None: # Resolve device_id if not available from above device_id_for_fence = await _get_device_id(fence_session, imei) if device_id_for_fence is not None: fence_events = await check_device_fences( fence_session, device_id_for_fence, imei, latitude, longitude, location_type, address, recorded_at, mcc=mcc, mnc=mnc, lac=lac, cell_id=cell_id, ) for evt in fence_events: ws_manager.broadcast_nonblocking("fence_attendance", evt) ws_manager.broadcast_nonblocking("attendance", evt) except Exception: logger.exception("Fence check failed for IMEI=%s", imei) return address @staticmethod def _parse_extra_location_data( content: bytes, location_type: str ) -> tuple[Optional[list], Optional[list]]: """Parse neighbor cells and WiFi APs from LBS/WiFi raw content for geocoding.""" neighbor_cells: list[dict] = [] wifi_list: list[dict] = [] try: pos = 6 # skip datetime if len(content) < pos + 2: return (None, None) # Skip MCC/MNC mcc_raw = struct.unpack("!H", content[pos : pos + 2])[0] mnc_2byte = bool(mcc_raw & 0x8000) pos += 2 if mnc_2byte: pos += 2 else: pos += 1 # Parse stations (main + up to 6 neighbors) is_4g = location_type in ("lbs_4g", "wifi_4g") lac_size = 4 if is_4g else 2 cid_size = 8 if is_4g else 3 station_size = lac_size + cid_size + 1 # +1 for RSSI for i in range(7): if len(content) < pos + station_size: break if is_4g: s_lac = struct.unpack("!I", content[pos : pos + 4])[0] pos += 4 s_cid = struct.unpack("!Q", content[pos : pos + 8])[0] pos += 8 else: s_lac = struct.unpack("!H", content[pos : pos + 2])[0] pos += 2 s_cid = int.from_bytes(content[pos : pos + 3], "big") pos += 3 s_rssi = content[pos] pos += 1 if i > 0 and (s_lac != 0 or s_cid != 0): neighbor_cells.append({ "lac": s_lac, "cell_id": s_cid, "rssi": s_rssi, }) # Skip timing_advance (1 byte) if len(content) > pos: pos += 1 # Parse WiFi APs (only for wifi types) if location_type in ("wifi", "wifi_4g") and len(content) > pos: wifi_count = content[pos] pos += 1 for _ in range(wifi_count): if len(content) < pos + 7: # MAC(6) + signal(1) break mac_bytes = content[pos : pos + 6] mac = ":".join(f"{b:02X}" for b in mac_bytes) pos += 6 signal = content[pos] pos += 1 wifi_list.append({"mac": mac, "signal": signal}) except Exception: pass return ( neighbor_cells if neighbor_cells else None, wifi_list if wifi_list else None, ) # -- Alarm parsing helpers ------------------------------------------------- @staticmethod def _parse_mcc_mnc_from_content( content: bytes, pos: int ) -> tuple[Optional[int], Optional[int], int]: """Parse MCC (2 bytes) + MNC (1 or 2 bytes) from content at pos. MCC high bit (0x8000) indicates MNC is 2 bytes instead of 1. Returns (mcc, mnc, new_pos). """ mcc: Optional[int] = None mnc: Optional[int] = None if len(content) >= pos + 2: mcc_raw = struct.unpack("!H", content[pos : pos + 2])[0] mnc_2byte = bool(mcc_raw & 0x8000) mcc = mcc_raw & 0x7FFF pos += 2 if mnc_2byte and len(content) >= pos + 2: mnc = struct.unpack("!H", content[pos : pos + 2])[0] pos += 2 elif len(content) >= pos + 1: mnc = content[pos] pos += 1 return mcc, mnc, pos @staticmethod def _parse_alarm_tail( content: bytes, pos: int ) -> tuple[Optional[int], Optional[int], Optional[int], str, int]: """Parse the common alarm tail fields. Layout: terminal_info(1) + voltage_level(1) + gsm_signal(1) + alarm_code(1) + language(1) Returns (terminal_info, battery_level, gsm_signal, alarm_type_name, new_pos). """ terminal_info: Optional[int] = None battery_level: Optional[int] = None gsm_signal: Optional[int] = None alarm_type_name = "unknown" if len(content) >= pos + 1: terminal_info = content[pos] pos += 1 if len(content) >= pos + 1: # voltage_level is 1 byte: 0x00-0x06 (level, not millivolts) voltage_level = content[pos] # Map voltage level to approximate battery percentage battery_level = min(voltage_level * 17, 100) if voltage_level <= 6 else None pos += 1 if len(content) >= pos + 1: gsm_signal = content[pos] pos += 1 if len(content) >= pos + 1: alarm_code = content[pos] alarm_type_name = ALARM_TYPES.get(alarm_code, f"unknown_0x{alarm_code:02X}") pos += 1 if len(content) >= pos + 1: # language byte - skip pos += 1 return terminal_info, battery_level, gsm_signal, alarm_type_name, pos # -- Time Sync handlers --------------------------------------------------- async def handle_time_sync( self, pkt: dict, reader: asyncio.StreamReader, writer: asyncio.StreamWriter, conn_info: ConnectionInfo, ) -> None: """Handle time sync request (0x1F). Protocol requires: 4-byte Unix timestamp + 2-byte language. Chinese (0x0001) uses GMT+8 timestamp, English (0x0002) uses GMT+0. """ content = pkt["content"] # Parse language from request (last 2 bytes of content) language = 0x0001 # default Chinese if len(content) >= 8: language = struct.unpack("!H", content[6:8])[0] now = datetime.now(timezone(timedelta(hours=8))).replace(tzinfo=None) if language == 0x0001: # Chinese: use GMT+8 timestamp ts = int(now.timestamp()) + 8 * 3600 else: # English / other: use GMT+0 timestamp ts = int(now.timestamp()) # Payload: 4-byte Unix timestamp + 2-byte language payload = struct.pack("!IH", ts, language) response = PacketBuilder.build_response(PROTO_TIME_SYNC, payload, pkt["serial"]) await self._send(writer, response) logger.debug("Time sync response sent to IMEI=%s (ts=%d, lang=0x%04X)", conn_info.imei, ts, language) async def handle_time_sync_2( self, pkt: dict, reader: asyncio.StreamReader, writer: asyncio.StreamWriter, conn_info: ConnectionInfo, ) -> None: """Handle time sync 2 request (0x8A). Respond with YY MM DD HH MM SS.""" now = datetime.now(timezone(timedelta(hours=8))).replace(tzinfo=None) payload = bytes( [ now.year % 100, now.month, now.day, now.hour, now.minute, now.second, ] ) response = PacketBuilder.build_response(PROTO_TIME_SYNC_2, payload, pkt["serial"]) await self._send(writer, response) logger.debug("Time sync 2 response sent to IMEI=%s", conn_info.imei) # -- Alarm handlers ------------------------------------------------------- async def _send_alarm_ack( self, serial: int, writer: asyncio.StreamWriter ) -> None: """Send alarm acknowledgement using protocol 0x26.""" pkt = PacketBuilder.build_response(PROTO_ALARM_ACK, b"", serial) await self._send(writer, pkt) async def _send_alarm_with_address( self, pkt: dict, writer: asyncio.StreamWriter, address: Optional[str] ) -> None: """Send 0x26 ACK + 0x17 Chinese address reply for alarm packets.""" await self._send_alarm_ack(pkt["serial"], writer) if address: await self._send_address_reply( PROTO_LBS_ADDRESS_REQ, pkt["serial"], writer, address=address, is_alarm=True, ) async def handle_alarm_single_fence( self, pkt: dict, reader: asyncio.StreamReader, writer: asyncio.StreamWriter, conn_info: ConnectionInfo, ) -> None: """Handle single-fence alarm (0xA3). Store, send 0x26 ACK + address.""" address = await self._store_alarm(pkt, conn_info, alarm_source="single_fence") await self._send_alarm_with_address(pkt, writer, address) async def handle_alarm_multi_fence( self, pkt: dict, reader: asyncio.StreamReader, writer: asyncio.StreamWriter, conn_info: ConnectionInfo, ) -> None: """Handle multi-fence alarm (0xA4). Store, send 0x26 ACK + address.""" address = await self._store_alarm(pkt, conn_info, alarm_source="multi_fence") await self._send_alarm_with_address(pkt, writer, address) async def handle_alarm_lbs( self, pkt: dict, reader: asyncio.StreamReader, writer: asyncio.StreamWriter, conn_info: ConnectionInfo, ) -> None: """Handle LBS alarm (0xA5). Store, send 0x26 ACK + address.""" address = await self._store_alarm(pkt, conn_info, alarm_source="lbs") await self._send_alarm_with_address(pkt, writer, address) async def handle_alarm_wifi( self, pkt: dict, reader: asyncio.StreamReader, writer: asyncio.StreamWriter, conn_info: ConnectionInfo, ) -> None: """Handle WIFI alarm (0xA9). Store, send 0x26 ACK + address.""" address = await self._store_alarm(pkt, conn_info, alarm_source="wifi") await self._send_alarm_with_address(pkt, writer, address) async def _store_alarm( self, pkt: dict, conn_info: ConnectionInfo, alarm_source: str, ) -> Optional[str]: imei = conn_info.imei if not imei: logger.warning("Alarm received before login (source=%s)", alarm_source) return None content = pkt["content"] proto = pkt["protocol"] now = datetime.now(timezone(timedelta(hours=8))).replace(tzinfo=None) recorded_at = self._parse_datetime(content, 0) or now # Extract GPS data if present latitude: Optional[float] = None longitude: Optional[float] = None speed: Optional[float] = None course: Optional[float] = None mcc: Optional[int] = None mnc: Optional[int] = None lac: Optional[int] = None cell_id: Optional[int] = None battery_level: Optional[int] = None gsm_signal: Optional[int] = None wifi_data: Optional[list] = None fence_data: Optional[dict] = None # For alarm packets (0xA3, 0xA4, 0xA9), the terminal_info byte is # located after GPS + LBS data. Extract alarm type from terminal_info bits. alarm_type_name = "unknown" terminal_info: Optional[int] = None if proto in (PROTO_ALARM_SINGLE_FENCE, PROTO_ALARM_MULTI_FENCE): # 0xA3/0xA4: datetime(6) + gps(12) + lbs_length(1) + mcc(2) + mnc(1-2) # + lac(4) + cell_id(8) + terminal_info(1) + voltage_level(1) + gsm_signal(1) # + alarm_code(1) + language(1) [+ fence_id(1) for 0xA4] gps_data = self._parse_gps_from_content(content, offset=6) if gps_data.get("gps_positioned", False): latitude = gps_data.get("latitude") longitude = gps_data.get("longitude") speed = gps_data.get("speed") course = gps_data.get("course") pos = 18 # after datetime(6) + gps(12) if len(content) > pos: lbs_length = content[pos] pos += 1 mcc, mnc, pos = self._parse_mcc_mnc_from_content(content, pos) if len(content) >= pos + 4: lac = struct.unpack("!I", content[pos : pos + 4])[0] pos += 4 if len(content) >= pos + 8: cell_id = struct.unpack("!Q", content[pos : pos + 8])[0] pos += 8 terminal_info, battery_level, gsm_signal, alarm_type_name, pos = \ self._parse_alarm_tail(content, pos) # Extract fence_id for 0xA4 multi-fence alarm if proto == PROTO_ALARM_MULTI_FENCE and len(content) >= pos + 1: fence_id = content[pos] fence_data = {"fence_id": fence_id} pos += 1 elif proto == PROTO_ALARM_LBS_4G: # 0xA5: NO datetime, NO GPS, NO lbs_length # mcc(2) + mnc(1-2) + lac(4) + cell_id(8) + terminal_info(1) # + voltage_level(1) + gsm_signal(1) + alarm_code(1) + language(1) recorded_at = now # 0xA5 has no datetime field pos = 0 # content starts directly with MCC mcc, mnc, pos = self._parse_mcc_mnc_from_content(content, pos) if len(content) >= pos + 4: lac = struct.unpack("!I", content[pos : pos + 4])[0] pos += 4 if len(content) >= pos + 8: cell_id = struct.unpack("!Q", content[pos : pos + 8])[0] pos += 8 terminal_info, battery_level, gsm_signal, alarm_type_name, pos = \ self._parse_alarm_tail(content, pos) elif proto == PROTO_ALARM_WIFI: # 0xA9: datetime(6) + mcc(2) + mnc(1-2) + cell_type(1) + cell_count(1) # + [lac + ci + rssi]*N + timing_advance(1) # + wifi_count(1) + [mac(6) + signal(1)]*N + alarm_code(1) + language(1) pos = 6 # after datetime mcc, mnc, pos = self._parse_mcc_mnc_from_content(content, pos) # cell_type and cell_count cell_type = 0 # 0=2G, 1=4G cell_count = 0 if len(content) >= pos + 2: cell_type = content[pos] cell_count = content[pos + 1] pos += 2 # Parse cell stations for i in range(cell_count): if cell_type == 1: # 4G: LAC(4) + CI(8) + RSSI(1) if len(content) < pos + 13: break if i == 0: lac = struct.unpack("!I", content[pos : pos + 4])[0] cell_id = struct.unpack("!Q", content[pos + 4 : pos + 12])[0] gsm_signal = content[pos + 12] pos += 13 else: # 2G: LAC(2) + CI(3) + RSSI(1) if len(content) < pos + 6: break if i == 0: lac = struct.unpack("!H", content[pos : pos + 2])[0] cell_id = int.from_bytes(content[pos + 2 : pos + 5], "big") gsm_signal = content[pos + 5] pos += 6 # timing_advance(1) if len(content) >= pos + 1: pos += 1 # WiFi APs wifi_data_list = [] if len(content) >= pos + 1: wifi_count = content[pos] pos += 1 for _ in range(wifi_count): if len(content) < pos + 7: break mac = ":".join(f"{b:02X}" for b in content[pos : pos + 6]) signal = content[pos + 6] wifi_data_list.append({"mac": mac, "signal": signal}) pos += 7 if wifi_data_list: wifi_data = wifi_data_list # alarm_code(1) + language(1) if len(content) >= pos + 1: alarm_code = content[pos] alarm_type_name = ALARM_TYPES.get(alarm_code, f"unknown_0x{alarm_code:02X}") pos += 1 # --- Forward geocoding for LBS/WiFi alarms (no GPS coordinates) --- if latitude is None and mcc is not None and lac is not None and cell_id is not None: try: wifi_list_for_geocode = wifi_data_list if proto == PROTO_ALARM_WIFI else None alarm_is_4g = proto in (PROTO_ALARM_SINGLE_FENCE, PROTO_ALARM_MULTI_FENCE, PROTO_ALARM_LBS_4G) lat, lon = await geocode_location( mcc=mcc, mnc=mnc, lac=lac, cell_id=cell_id, wifi_list=wifi_list_for_geocode, imei=imei, location_type="lbs_4g" if alarm_is_4g else "lbs", ) if lat is not None and lon is not None: latitude = lat longitude = lon logger.info("Geocoded alarm for IMEI=%s: lat=%.6f, lon=%.6f", imei, lat, lon) except Exception: logger.exception("Geocoding failed for alarm IMEI=%s", imei) # --- Reverse geocoding for alarm location --- address: Optional[str] = None if latitude is not None and longitude is not None: try: address = await reverse_geocode(latitude, longitude) except Exception: logger.exception("Reverse geocoding failed for alarm IMEI=%s", imei) try: async with async_session() as session: async with session.begin(): device_id = await _get_device_id(session, imei) if device_id is None: logger.warning("Alarm for unknown IMEI=%s", imei) return record = AlarmRecord( device_id=device_id, imei=conn_info.imei, alarm_type=alarm_type_name, alarm_source=alarm_source, protocol_number=proto, latitude=latitude, longitude=longitude, speed=speed, course=course, mcc=mcc, mnc=mnc, lac=lac, cell_id=cell_id, battery_level=battery_level, gsm_signal=gsm_signal, address=address, wifi_data=wifi_data, fence_data=fence_data, recorded_at=recorded_at, ) session.add(record) # Broadcast alarm to WebSocket subscribers ws_manager.broadcast_nonblocking("alarm", { "imei": imei, "device_id": device_id, "alarm_type": alarm_type_name, "alarm_source": alarm_source, "latitude": latitude, "longitude": longitude, "address": address, "recorded_at": str(recorded_at), }) except Exception: logger.exception( "DB error storing alarm for IMEI=%s (source=%s)", imei, alarm_source ) return address # -- Address / LBS request handlers ---------------------------------------- @staticmethod def _extract_language_from_content(content: bytes) -> int: """Extract language field (last 2 bytes) from address request content.""" if len(content) >= 2: return struct.unpack("!H", content[-2:])[0] return 0x0001 # default Chinese async def handle_lbs_address_req( self, pkt: dict, reader: asyncio.StreamReader, writer: asyncio.StreamWriter, conn_info: ConnectionInfo, ) -> None: """Handle LBS address request (0x17). Store data, send address reply.""" address = await self._store_location(pkt, conn_info, location_type="lbs_address_req") lang = self._extract_language_from_content(pkt["content"]) reply_proto = PROTO_ADDRESS_REPLY_EN if lang == 0x0002 else PROTO_LBS_ADDRESS_REQ await self._send_address_reply(reply_proto, pkt["serial"], writer, address=address) async def handle_address_query( self, pkt: dict, reader: asyncio.StreamReader, writer: asyncio.StreamWriter, conn_info: ConnectionInfo, ) -> None: """Handle address query (0x1A). Store data, send address reply.""" address = await self._store_location(pkt, conn_info, location_type="address_query") lang = self._extract_language_from_content(pkt["content"]) reply_proto = PROTO_ADDRESS_REPLY_EN if lang == 0x0002 else PROTO_LBS_ADDRESS_REQ await self._send_address_reply(reply_proto, pkt["serial"], writer, address=address) async def handle_lbs_4g_address_req( self, pkt: dict, reader: asyncio.StreamReader, writer: asyncio.StreamWriter, conn_info: ConnectionInfo, ) -> None: """Handle LBS 4G address request (0xA7). Store data, send address reply.""" address = await self._store_location(pkt, conn_info, location_type="lbs_4g_address_req") lang = self._extract_language_from_content(pkt["content"]) reply_proto = PROTO_ADDRESS_REPLY_EN if lang == 0x0002 else PROTO_LBS_ADDRESS_REQ await self._send_address_reply(reply_proto, pkt["serial"], writer, address=address) # -- Attendance handlers --------------------------------------------------- async def handle_attendance( self, pkt: dict, reader: asyncio.StreamReader, writer: asyncio.StreamWriter, conn_info: ConnectionInfo, ) -> None: """Handle attendance packet (0xB0). Parse GPS+LBS+WiFi, store, respond.""" att_type, reserved_bytes, datetime_bytes = await self._store_attendance( pkt, conn_info, is_4g=False ) # Response: datetime(6) + status(1) + type(1) + reserved(2) punch_type = 1 if att_type == "clock_in" else 2 payload = datetime_bytes + struct.pack("BB", 1, punch_type) + reserved_bytes response = PacketBuilder.build_response(pkt["protocol"], payload, pkt["serial"]) await self._send(writer, response) async def handle_attendance_4g( self, pkt: dict, reader: asyncio.StreamReader, writer: asyncio.StreamWriter, conn_info: ConnectionInfo, ) -> None: """Handle attendance 4G packet (0xB1). Like 0xB0 but MNC=2B, LAC=4B, CI=8B.""" att_type, reserved_bytes, datetime_bytes = await self._store_attendance( pkt, conn_info, is_4g=True ) punch_type = 1 if att_type == "clock_in" else 2 payload = datetime_bytes + struct.pack("BB", 1, punch_type) + reserved_bytes response = PacketBuilder.build_response(pkt["protocol"], payload, pkt["serial"]) await self._send(writer, response) async def _store_attendance( self, pkt: dict, conn_info: ConnectionInfo, is_4g: bool, ) -> tuple[str, bytes, bytes]: """Parse and store attendance record (0xB0 / 0xB1). Returns (attendance_type, reserved_bytes, datetime_bytes) for building response. """ imei = conn_info.imei content = pkt["content"] proto = pkt["protocol"] now = datetime.now(timezone(timedelta(hours=8))).replace(tzinfo=None) # -- Parse fields -- pos = 0 # datetime (6 bytes) recorded_at = self._parse_datetime(content, pos) or now datetime_bytes = content[pos:pos + 6] if len(content) >= 6 else b"\x00" * 6 pos += 6 # GPS positioned flag (1 byte) gps_positioned = False if len(content) > pos: gps_positioned = content[pos] == 1 pos += 1 # Terminal reserved info (2 bytes) - echo back in response reserved_bytes = content[pos:pos + 2] if len(content) >= pos + 2 else b"\x00\x00" pos += 2 # GPS block (12 bytes): gps_info(1) + lat(4) + lon(4) + speed(1) + course_status(2) gps = self._parse_gps_from_content(content, offset=pos) latitude = gps.get("latitude") longitude = gps.get("longitude") speed = gps.get("speed") course = gps.get("course") gps_satellites = gps.get("gps_satellites") if gps.get("gps_positioned") is not None: gps_positioned = gps["gps_positioned"] pos += 12 # Terminal info (1 byte) - parse status bits (clock_out disabled due to GPS drift) attendance_type = "clock_in" terminal_info = 0 if len(content) > pos: terminal_info = content[pos] # status_code = (terminal_info >> 2) & 0x0F — always use clock_in pos += 1 # voltage_level (1 byte) battery_level: Optional[int] = None if len(content) > pos: vl = content[pos] battery_level = min(vl * 17, 100) if vl <= 6 else None pos += 1 # GSM signal (1 byte) gsm_signal: Optional[int] = None if len(content) > pos: gsm_signal = content[pos] pos += 1 # Reserved extension (2 bytes) pos += 2 # LBS: MCC/MNC/LAC/CI + neighbors mcc: Optional[int] = None mnc: Optional[int] = None lac: Optional[int] = None cell_id: Optional[int] = None neighbor_cells = [] if is_4g: # 0xB1: MNC=2B fixed, LAC=4B, CI=8B if len(content) >= pos + 2: mcc_raw = struct.unpack("!H", content[pos:pos + 2])[0] mcc = mcc_raw & 0x7FFF pos += 2 if len(content) >= pos + 2: mnc = struct.unpack("!H", content[pos:pos + 2])[0] pos += 2 if len(content) >= pos + 4: lac = struct.unpack("!I", content[pos:pos + 4])[0] pos += 4 if len(content) >= pos + 8: cell_id = struct.unpack("!Q", content[pos:pos + 8])[0] pos += 8 # RSSI if len(content) > pos: pos += 1 # main RSSI # Neighbor cells (6): LAC(4) + CI(8) + RSSI(1) each for _ in range(6): if len(content) >= pos + 13: n_lac = struct.unpack("!I", content[pos:pos + 4])[0] n_ci = struct.unpack("!Q", content[pos + 4:pos + 12])[0] n_rssi = content[pos + 12] if n_lac or n_ci: neighbor_cells.append({"lac": n_lac, "cell_id": n_ci, "rssi": n_rssi}) pos += 13 else: # 0xB0: standard MCC/MNC/LAC(2B)/CI(3B) mcc, mnc, pos = self._parse_mcc_mnc_from_content(content, pos) if len(content) >= pos + 2: lac = struct.unpack("!H", content[pos:pos + 2])[0] pos += 2 if len(content) >= pos + 3: cell_id = (content[pos] << 16) | (content[pos + 1] << 8) | content[pos + 2] pos += 3 # RSSI if len(content) > pos: pos += 1 # Neighbor cells (6): LAC(2) + CI(3) + RSSI(1) each for _ in range(6): if len(content) >= pos + 6: n_lac = struct.unpack("!H", content[pos:pos + 2])[0] n_ci = (content[pos + 2] << 16) | (content[pos + 3] << 8) | content[pos + 4] n_rssi = content[pos + 5] if n_lac or n_ci: neighbor_cells.append({"lac": n_lac, "cell_id": n_ci, "rssi": n_rssi}) pos += 6 # TA (1 byte) if len(content) > pos: pos += 1 # WiFi data wifi_data_list: Optional[list] = None if len(content) > pos: wifi_count = content[pos] pos += 1 if wifi_count > 0: wifi_data_list = [] for _ in range(wifi_count): if len(content) >= pos + 7: mac = ":".join(f"{b:02X}" for b in content[pos:pos + 6]) signal = content[pos + 6] wifi_data_list.append({"mac": mac, "signal": signal}) pos += 7 # Forward geocoding if no GPS address: Optional[str] = None if not gps_positioned or latitude is None: if mcc is not None and lac is not None and cell_id is not None: try: att_loc_type = "wifi_4g" if is_4g and wifi_data_list else ("lbs_4g" if is_4g else "lbs") lat, lon = await geocode_location( mcc=mcc, mnc=mnc, lac=lac, cell_id=cell_id, wifi_list=wifi_data_list, location_type=att_loc_type, ) if lat is not None and lon is not None: latitude, longitude = lat, lon except Exception: logger.exception("Geocoding failed for attendance IMEI=%s", imei) # Reverse geocoding if latitude is not None and longitude is not None: try: address = await reverse_geocode(latitude, longitude) except Exception: logger.exception("Reverse geocoding failed for attendance IMEI=%s", imei) if not imei: logger.warning("Attendance received before login") return attendance_type, reserved_bytes, datetime_bytes lbs_data = None if mcc is not None: lbs_data = { "mcc": mcc, "mnc": mnc, "lac": lac, "cell_id": cell_id, "neighbors": neighbor_cells or None, } try: async with async_session() as session: async with session.begin(): device_id = await _get_device_id(session, imei) if device_id is None: logger.warning("Attendance for unknown IMEI=%s", imei) return attendance_type, reserved_bytes, datetime_bytes # Determine attendance source from protocol _att_source = "bluetooth" if proto == 0xB2 else "device" # Daily dedup: one clock_in / clock_out per device per day from app.services.fence_checker import _has_attendance_today if await _has_attendance_today(session, device_id, attendance_type): logger.info( "Attendance dedup: IMEI=%s already has %s today, skip", imei, attendance_type, ) return attendance_type, reserved_bytes, datetime_bytes record = AttendanceRecord( device_id=device_id, imei=conn_info.imei, attendance_type=attendance_type, attendance_source=_att_source, protocol_number=proto, gps_positioned=gps_positioned, latitude=latitude, longitude=longitude, speed=speed, course=course, gps_satellites=gps_satellites, battery_level=battery_level, gsm_signal=gsm_signal, mcc=mcc, mnc=mnc, lac=lac, cell_id=cell_id, wifi_data=wifi_data_list, lbs_data=lbs_data, address=address, recorded_at=recorded_at, ) session.add(record) logger.info( "Attendance %s stored: IMEI=%s type=%s GPS=%s pos=(%s,%s) addr=%s", "0xB1" if is_4g else "0xB0", imei, attendance_type, gps_positioned, latitude, longitude, address, ) # Broadcast attendance to WebSocket subscribers ws_manager.broadcast_nonblocking("attendance", { "imei": imei, "attendance_type": attendance_type, "latitude": latitude, "longitude": longitude, "address": address, "recorded_at": str(recorded_at), }) except Exception: logger.exception("DB error storing attendance for IMEI=%s", imei) return attendance_type, reserved_bytes, datetime_bytes # -- Bluetooth handlers ---------------------------------------------------- async def handle_bt_punch( self, pkt: dict, reader: asyncio.StreamReader, writer: asyncio.StreamWriter, conn_info: ConnectionInfo, ) -> None: """Handle Bluetooth punch (0xB2). Parse iBeacon data (MAC/UUID/Major/Minor/RSSI/battery), determine clock_in/clock_out, store and respond. """ content = pkt["content"] imei = conn_info.imei now = datetime.now(timezone(timedelta(hours=8))).replace(tzinfo=None) # -- Parse 0xB2 fields -- pos = 0 recorded_at = self._parse_datetime(content, pos) or now datetime_bytes = content[pos:pos + 6] if len(content) >= 6 else b"\x00" * 6 pos += 6 # RSSI (1 byte, signed) rssi: Optional[int] = None if len(content) > pos: rssi = struct.unpack_from("b", content, pos)[0] pos += 1 # MAC address (6 bytes) beacon_mac: Optional[str] = None if len(content) >= pos + 6: beacon_mac = ":".join(f"{b:02X}" for b in content[pos:pos + 6]) pos += 6 # UUID (16 bytes) beacon_uuid: Optional[str] = None if len(content) >= pos + 16: uuid_bytes = content[pos:pos + 16] beacon_uuid = ( f"{uuid_bytes[0:4].hex()}-{uuid_bytes[4:6].hex()}-" f"{uuid_bytes[6:8].hex()}-{uuid_bytes[8:10].hex()}-" f"{uuid_bytes[10:16].hex()}" ).upper() pos += 16 # Major (2 bytes) beacon_major: Optional[int] = None if len(content) >= pos + 2: beacon_major = struct.unpack("!H", content[pos:pos + 2])[0] pos += 2 # Minor (2 bytes) beacon_minor: Optional[int] = None if len(content) >= pos + 2: beacon_minor = struct.unpack("!H", content[pos:pos + 2])[0] pos += 2 # Beacon battery (2 bytes, unsigned, unit 0.01V) beacon_battery: Optional[float] = None if len(content) >= pos + 2: raw_batt = struct.unpack("!H", content[pos:pos + 2])[0] beacon_battery = raw_batt * 0.01 pos += 2 # Terminal info (1 byte) - parse but always use clock_in (clock_out disabled) attendance_type = "clock_in" if len(content) > pos: pos += 1 # Terminal reserved (2 bytes) - echo back reserved_bytes = content[pos:pos + 2] if len(content) >= pos + 2 else b"\x00\x00" # -- Store record -- if imei: try: async with async_session() as session: async with session.begin(): device_id = await _get_device_id(session, imei) if device_id is not None: # Look up beacon location from beacon_configs beacon_lat = None beacon_lon = None beacon_addr = None if beacon_mac: bcfg = await session.execute( select(BeaconConfig).where( BeaconConfig.beacon_mac == beacon_mac, BeaconConfig.status == "active", ) ) beacon_cfg = bcfg.scalar_one_or_none() if beacon_cfg: beacon_lat = beacon_cfg.latitude beacon_lon = beacon_cfg.longitude beacon_addr = beacon_cfg.address logger.info( "BT punch: matched beacon '%s' at (%s, %s)", beacon_cfg.name, beacon_lat, beacon_lon, ) record = BluetoothRecord( device_id=device_id, imei=conn_info.imei, record_type="punch", protocol_number=pkt["protocol"], beacon_mac=beacon_mac, beacon_uuid=beacon_uuid, beacon_major=beacon_major, beacon_minor=beacon_minor, rssi=rssi, beacon_battery=beacon_battery, beacon_battery_unit="V", attendance_type=attendance_type, latitude=beacon_lat, longitude=beacon_lon, bluetooth_data={ "mac": beacon_mac, "uuid": beacon_uuid, "major": beacon_major, "minor": beacon_minor, "rssi": rssi, "battery_v": beacon_battery, "beacon_name": beacon_cfg.name if beacon_mac and beacon_cfg else None, "beacon_address": beacon_addr, }, recorded_at=recorded_at, ) session.add(record) logger.info( "BT punch stored: IMEI=%s type=%s MAC=%s UUID=%s " "Major=%s Minor=%s RSSI=%s Battery=%.2fV", imei, attendance_type, beacon_mac, beacon_uuid, beacon_major, beacon_minor, rssi, beacon_battery or 0, ) # Create AttendanceRecord for bluetooth punch from app.services.fence_checker import _has_attendance_today if not await _has_attendance_today(session, device_id, attendance_type): device = await session.get(Device, device_id) att_record = AttendanceRecord( device_id=device_id, imei=imei, attendance_type=attendance_type, attendance_source="bluetooth", protocol_number=pkt["protocol"], gps_positioned=False, latitude=beacon_lat, longitude=beacon_lon, address=beacon_addr, battery_level=device.battery_level if device else None, gsm_signal=device.gsm_signal if device else None, lbs_data={ "source": "bluetooth", "beacon_mac": beacon_mac, "beacon_name": beacon_cfg.name if beacon_cfg else None, }, recorded_at=recorded_at, ) session.add(att_record) logger.info( "BT attendance created: IMEI=%s type=%s beacon=%s", imei, attendance_type, beacon_mac, ) else: logger.info( "BT attendance dedup: IMEI=%s already has %s today", imei, attendance_type, ) # Broadcast bluetooth punch ws_manager.broadcast_nonblocking("bluetooth", { "imei": imei, "record_type": "punch", "beacon_mac": beacon_mac, "attendance_type": attendance_type, "recorded_at": str(recorded_at), }) ws_manager.broadcast_nonblocking("attendance", { "imei": imei, "attendance_type": attendance_type, "latitude": beacon_lat, "longitude": beacon_lon, "address": beacon_addr, "recorded_at": str(recorded_at), "source": "bluetooth", }) except Exception: logger.exception("DB error storing BT punch for IMEI=%s", imei) # -- Response: datetime(6) + status(1) + type(1) + reserved(2) -- punch_type = 1 if attendance_type == "clock_in" else 2 payload = datetime_bytes + struct.pack("BB", 1, punch_type) + reserved_bytes response = PacketBuilder.build_response(pkt["protocol"], payload, pkt["serial"]) await self._send(writer, response) async def handle_bt_location( self, pkt: dict, reader: asyncio.StreamReader, writer: asyncio.StreamWriter, conn_info: ConnectionInfo, ) -> None: """Handle Bluetooth location (0xB3). Parse multiple beacon data. Uses 0x7979 long packet format. No response needed per protocol spec. """ content = pkt["content"] imei = conn_info.imei now = datetime.now(timezone(timedelta(hours=8))).replace(tzinfo=None) pos = 0 recorded_at = self._parse_datetime(content, pos) or now pos += 6 # Beacon count (1 byte) beacon_count = 0 if len(content) > pos: beacon_count = content[pos] pos += 1 # Parse each beacon: RSSI(1) + MAC(6) + UUID(16) + Major(2) + Minor(2) + Battery(2) + BattUnit(1) = 30 bytes beacons = [] for i in range(beacon_count): if len(content) < pos + 30: logger.warning( "BT location: truncated beacon %d/%d at pos=%d (have %d bytes)", i + 1, beacon_count, pos, len(content), ) break rssi = struct.unpack_from("b", content, pos)[0] pos += 1 mac = ":".join(f"{b:02X}" for b in content[pos:pos + 6]) pos += 6 uuid_bytes = content[pos:pos + 16] uuid_str = ( f"{uuid_bytes[0:4].hex()}-{uuid_bytes[4:6].hex()}-" f"{uuid_bytes[6:8].hex()}-{uuid_bytes[8:10].hex()}-" f"{uuid_bytes[10:16].hex()}" ).upper() pos += 16 major = struct.unpack("!H", content[pos:pos + 2])[0] pos += 2 minor = struct.unpack("!H", content[pos:pos + 2])[0] pos += 2 raw_batt = struct.unpack("!H", content[pos:pos + 2])[0] pos += 2 batt_unit_byte = content[pos] pos += 1 if batt_unit_byte == 0: battery_val = raw_batt * 0.01 battery_unit = "V" else: battery_val = float(raw_batt) battery_unit = "%" beacons.append({ "rssi": rssi, "mac": mac, "uuid": uuid_str, "major": major, "minor": minor, "battery": battery_val, "battery_unit": battery_unit, }) if not imei: logger.warning("BT location received before login") return logger.info( "BT location: IMEI=%s count=%d beacons=%s", imei, beacon_count, [(b["mac"], b["rssi"]) for b in beacons], ) # Store one record per beacon for easier querying, # plus keep full list in bluetooth_data for the first record. # Look up beacon locations from beacon_configs table. try: async with async_session() as session: async with session.begin(): device_id = await _get_device_id(session, imei) if device_id is None: logger.warning("BT location for unknown IMEI=%s", imei) return # Look up all beacon locations by MAC beacon_locations: dict[str, BeaconConfig] = {} if beacons: macs = [b["mac"] for b in beacons] bcfg_result = await session.execute( select(BeaconConfig).where( BeaconConfig.beacon_mac.in_(macs), BeaconConfig.status == "active", ) ) for cfg in bcfg_result.scalars().all(): beacon_locations[cfg.beacon_mac] = cfg # Find the strongest RSSI beacon with known location best_beacon = None best_rssi = -999 for b in beacons: cfg = beacon_locations.get(b["mac"]) if cfg and cfg.latitude and cfg.longitude and b["rssi"] > best_rssi: best_rssi = b["rssi"] best_beacon = cfg if best_beacon: logger.info( "BT location: best beacon '%s' MAC=%s RSSI=%d at (%s, %s)", best_beacon.name, best_beacon.beacon_mac, best_rssi, best_beacon.latitude, best_beacon.longitude, ) for idx, b in enumerate(beacons): cfg = beacon_locations.get(b["mac"]) record = BluetoothRecord( device_id=device_id, imei=conn_info.imei, record_type="location", protocol_number=pkt["protocol"], beacon_mac=b["mac"], beacon_uuid=b["uuid"], beacon_major=b["major"], beacon_minor=b["minor"], rssi=b["rssi"], beacon_battery=b["battery"], beacon_battery_unit=b["battery_unit"], latitude=cfg.latitude if cfg else None, longitude=cfg.longitude if cfg else None, bluetooth_data={ "beacons": beacons, "best_beacon_mac": best_beacon.beacon_mac if best_beacon else None, "best_beacon_name": best_beacon.name if best_beacon else None, } if idx == 0 else ( {"beacon_name": cfg.name} if cfg else None ), recorded_at=recorded_at, ) session.add(record) else: # No beacons parsed, store raw record = BluetoothRecord( device_id=device_id, imei=conn_info.imei, record_type="location", protocol_number=pkt["protocol"], bluetooth_data={"raw": content.hex(), "beacon_count": beacon_count}, recorded_at=recorded_at, ) session.add(record) # Broadcast bluetooth location ws_manager.broadcast_nonblocking("bluetooth", { "imei": imei, "record_type": "location", "beacon_count": beacon_count, "recorded_at": str(recorded_at), }) except Exception: logger.exception("DB error storing BT location for IMEI=%s", imei) # 0xB3 does NOT require a response per protocol spec # -- General info handler -------------------------------------------------- async def handle_general_info( self, pkt: dict, reader: asyncio.StreamReader, writer: asyncio.StreamWriter, conn_info: ConnectionInfo, ) -> None: """Handle general information (0x94). The content begins with a sub-protocol byte. Common sub-protocols include ICCID retrieval. We store the raw data and attempt to extract known sub-protocol fields. """ content = pkt["content"] imei = conn_info.imei if not imei: logger.warning("General info received before login") return sub_protocol: Optional[int] = None iccid: Optional[str] = None if len(content) >= 1: sub_protocol = content[0] imsi: Optional[str] = None # Sub-protocol 0x0A: IMEI(8) + IMSI(8) + ICCID(10) = 26 bytes after sub-proto if sub_protocol == 0x0A and len(content) >= 27: # Skip IMEI (8 bytes, already known), extract IMSI and ICCID imsi = content[9:17].hex().lstrip("0") or "0" iccid = content[17:27].hex() logger.info("IMEI=%s IMSI=%s ICCID=%s", imei, imsi, iccid) # Sub-protocol 0x00: legacy ICCID query -- ICCID is 10 bytes BCD elif sub_protocol == 0x00 and len(content) >= 11: iccid = content[1:11].hex() logger.info("IMEI=%s ICCID=%s (legacy)", imei, iccid) # Sub-protocol 0x09: GPS satellite info elif sub_protocol == 0x09 and len(content) >= 2: gps_status = content[1] status_names = {0: "OFF", 1: "Searching", 2: "2D", 3: "3D", 4: "Sleep"} logger.info("IMEI=%s GPS status=%s (%d)", imei, status_names.get(gps_status, "Unknown"), gps_status) # Store ICCID/IMSI on the device record if available if iccid or imsi: update_vals: dict = {} if iccid: update_vals["iccid"] = iccid if imsi: update_vals["imsi"] = imsi try: async with async_session() as session: async with session.begin(): await session.execute( update(Device) .where(Device.imei == imei) .values(**update_vals) ) except Exception: logger.exception("DB error storing device info for IMEI=%s", imei) # Protocol doc states 0x94 does NOT require a server response # -- Online command reply handler ------------------------------------------ async def handle_online_cmd_reply( self, pkt: dict, reader: asyncio.StreamReader, writer: asyncio.StreamWriter, conn_info: ConnectionInfo, ) -> None: """Handle device reply to a server-issued command (0x81). Update the corresponding CommandLog entry with the response. """ content = pkt["content"] imei = conn_info.imei if not imei: logger.warning("Command reply received before login") return # Content: length(1) + server_flag(4) + command_content(variable) + language(2) response_text = "" if len(content) > 7: try: # Strip last 2 bytes (language/reserved) response_text = content[5:-2].decode("utf-8", errors="replace") except Exception: response_text = content[5:-2].hex() elif len(content) > 5: try: response_text = content[5:].decode("utf-8", errors="replace") except Exception: response_text = content[5:].hex() now = datetime.now(timezone(timedelta(hours=8))).replace(tzinfo=None) try: async with async_session() as session: async with session.begin(): device_id = await _get_device_id(session, imei) if device_id is None: logger.warning("Command reply for unknown IMEI=%s", imei) return # Find the most recent sent command for this device result = await session.execute( select(CommandLog) .where(CommandLog.device_id == device_id, CommandLog.status == "sent") .order_by(CommandLog.created_at.desc()) .limit(1) ) cmd_log = result.scalar_one_or_none() if cmd_log is not None: cmd_log.response_content = response_text cmd_log.status = "success" cmd_log.response_at = now else: logger.warning( "No pending CommandLog for reply from IMEI=%s", imei ) except Exception: logger.exception( "DB error updating command log for IMEI=%s", imei ) # ------------------------------------------------------------------ # Command sending (called from API layer) # ------------------------------------------------------------------ async def send_command( self, imei: str, command_type: str, command_content: str ) -> bool: """Send a command to a connected device. Parameters ---------- imei : str The IMEI of the target device. command_type : str An application-level label for the command (e.g. "restart", "config"). command_content : str The ASCII command string to send to the device. Returns ------- bool ``True`` if the command was successfully written to the socket. """ conn = self.connections.get(imei) if conn is None: logger.warning("Cannot send command to IMEI=%s: not connected", imei) return False _reader, writer, conn_info = conn serial = conn_info.next_serial() # Build 0x80 online-command packet # Payload: length(1) + server_flag(4) + content_bytes + language(2) content_bytes = command_content.encode("utf-8") server_flag_bytes = b"\x00\x00\x00\x00" server_flag_str = server_flag_bytes.hex() language_bytes = b"\x00\x01" # 0x01=Chinese, 0x02=English inner_len = len(server_flag_bytes) + len(content_bytes) + len(language_bytes) payload = bytes([inner_len]) + server_flag_bytes + content_bytes + language_bytes packet = PacketBuilder.build_response( PROTO_ONLINE_CMD, payload, serial, long=len(payload) > 200 ) try: await self._send(writer, packet) logger.info( "Command sent to IMEI=%s type=%s content=%r", imei, command_type, command_content, ) except Exception: logger.exception("Failed to send command to IMEI=%s", imei) return False return True async def send_message(self, imei: str, message: str) -> bool: """Send a text message (0x82) to a connected device. Parameters ---------- imei : str Target device IMEI. message : str The message text to send. Returns ------- bool ``True`` if the message was successfully written to the socket. """ conn = self.connections.get(imei) if conn is None: logger.warning("Cannot send message to IMEI=%s: not connected", imei) return False _reader, writer, conn_info = conn serial = conn_info.next_serial() msg_bytes = message.encode("utf-16-be") server_flag = b"\x00\x00\x00\x00" language = b"\x00\x01" # 0x0001 = Chinese (UTF16BE) # Message payload: cmd_length(1) + server_flag(4) + content(UTF16BE) + language(2) inner = server_flag + msg_bytes + language cmd_len = len(inner) if cmd_len > 0xFF: payload = struct.pack("!H", cmd_len) + inner else: payload = bytes([cmd_len]) + inner packet = PacketBuilder.build_response( PROTO_MESSAGE, payload, serial, long=len(payload) > 200 ) try: await self._send(writer, packet) logger.info("Message sent to IMEI=%s (%d bytes)", imei, len(msg_bytes)) return True except Exception: logger.exception("Failed to send message to IMEI=%s", imei) return False # ------------------------------------------------------------------ # Server lifecycle # ------------------------------------------------------------------ async def start(self, host: str, port: int) -> None: """Start the TCP server. Parameters ---------- host : str Bind address (e.g. ``"0.0.0.0"``). port : int Bind port. """ self._server = await asyncio.start_server( self._handle_connection, host, port ) addrs = ", ".join(str(s.getsockname()) for s in self._server.sockets) logger.info("KKS TCP server listening on %s", addrs) await self._server.serve_forever() async def stop(self) -> None: """Gracefully shut down the TCP server and close all connections.""" if self._server is not None: self._server.close() await self._server.wait_closed() logger.info("KKS TCP server stopped") # Close all active device connections for imei, (_reader, writer, _info) in list(self.connections.items()): try: writer.close() await writer.wait_closed() except Exception: pass logger.info("Closed connection for IMEI=%s", imei) self.connections.clear() @property def is_running(self) -> bool: return self._server is not None and self._server.is_serving() # --------------------------------------------------------------------------- # Module-level singleton # --------------------------------------------------------------------------- tcp_manager = TCPManager() async def start_tcp_server( host: Optional[str] = None, port: Optional[int] = None ) -> None: """Convenience function to start the global TCP server. Parameters ---------- host : str, optional Bind address. Defaults to ``settings.TCP_HOST``. port : int, optional Bind port. Defaults to ``settings.TCP_PORT``. """ await tcp_manager.start( host=host or settings.TCP_HOST, port=port or settings.TCP_PORT, )