Files
desungongpai/app/tcp_server.py

2619 lines
104 KiB
Python
Raw Permalink Normal View History

"""
KKS Bluetooth Badge TCP Server
Asyncio-based TCP server that manages persistent connections with KKS badge
devices. Each device authenticates via a login packet containing its IMEI,
after which the server routes incoming packets to protocol-specific handlers,
persists telemetry/alarm/attendance data, and exposes helper methods that the
REST API can call to push commands or messages to connected devices.
"""
from __future__ import annotations
import asyncio
import logging
import struct
from datetime import datetime, timedelta, timezone
from typing import Any, Dict, Optional, Tuple
from sqlalchemy import select, update
from app.config import settings
from app.database import async_session
from app.geocoding import geocode_location, reverse_geocode
from app.websocket_manager import ws_manager
from app.models import (
AlarmRecord,
AttendanceRecord,
BeaconConfig,
BluetoothRecord,
CommandLog,
Device,
HeartbeatRecord,
LocationRecord,
)
from app.protocol.constants import (
ALARM_TYPES,
PROTOCOL_NAMES,
PROTO_ADDRESS_QUERY,
PROTO_ALARM_ACK,
PROTO_ALARM_LBS_4G,
PROTO_ALARM_MULTI_FENCE,
PROTO_ALARM_SINGLE_FENCE,
PROTO_ALARM_WIFI,
PROTO_ATTENDANCE,
PROTO_ATTENDANCE_4G,
PROTO_BT_LOCATION,
PROTO_BT_PUNCH,
PROTO_GENERAL_INFO,
PROTO_GPS,
PROTO_GPS_4G,
PROTO_HEARTBEAT,
PROTO_HEARTBEAT_EXT,
PROTO_LBS_4G,
PROTO_LBS_4G_ADDRESS_REQ,
PROTO_LBS_ADDRESS_REQ,
PROTO_LBS_MULTI,
PROTO_LBS_MULTI_REPLY,
PROTO_LOGIN,
PROTO_MESSAGE,
PROTO_ONLINE_CMD,
PROTO_ONLINE_CMD_REPLY,
PROTO_TIME_SYNC,
PROTO_TIME_SYNC_2,
PROTO_ADDRESS_REPLY_EN,
PROTO_WIFI,
PROTO_WIFI_4G,
START_MARKER_LONG,
START_MARKER_SHORT,
STOP_MARKER,
)
from app.protocol.crc import crc_itu
logger = logging.getLogger(__name__)
# ---------------------------------------------------------------------------
# Lightweight packet parser / builder embedded here so the module is
# self-contained. These have a TCP-buffering-friendly interface (returning
# remaining bytes) while staying consistent with the protocol module's
# length/CRC semantics.
#
# Length field semantics (matching app.protocol.builder):
# length = proto(1) + info_content(N) + serial(2) + crc(2)
#
# Total packet:
# short: start(2) + length_field(1) + <length bytes> + stop(2)
# long: start(2) + length_field(2) + <length bytes> + stop(2)
# ---------------------------------------------------------------------------
class PacketParser:
"""Extract and parse complete KKS protocol packets from a byte stream."""
@staticmethod
def find_packets(buffer: bytes) -> Tuple[list, bytes]:
"""Scan *buffer* for complete packets.
Returns
-------
(packets, remaining)
*packets* is a list of dicts, each with keys:
start_marker, length, protocol, serial, content, raw
*remaining* is the unconsumed tail of *buffer*.
"""
packets: list = []
pos = 0
while pos < len(buffer):
# Look for a start marker
idx_short = buffer.find(START_MARKER_SHORT, pos)
idx_long = buffer.find(START_MARKER_LONG, pos)
# Pick whichever marker comes first; -1 means not found
candidates = []
if idx_short != -1:
candidates.append((idx_short, "short"))
if idx_long != -1:
candidates.append((idx_long, "long"))
if not candidates:
break # no more markers
candidates.sort(key=lambda c: c[0])
idx, marker_type = candidates[0]
if idx > pos:
# Skip garbage bytes before the marker
logger.debug("Skipping %d garbage bytes before marker", idx - pos)
pos = idx
if marker_type == "short":
# 78 78 LEN PROTO INFO... SERIAL(2) CRC(2) 0D 0A
# length field = proto(1) + info(N) + serial(2) + crc(2)
header_size = 3 # marker(2) + length(1)
if pos + header_size > len(buffer):
break # need more data for header
pkt_len = buffer[pos + 2]
# total = start(2) + len_field(1) + pkt_len + stop(2)
total = 2 + 1 + pkt_len + 2
if pos + total > len(buffer):
break # incomplete packet
raw = buffer[pos : pos + total]
# Validate stop marker
if raw[-2:] != STOP_MARKER:
logger.warning(
"Invalid stop marker at pos %d, skipping byte", pos
)
pos += 1
continue
# payload region (between length byte and stop marker)
# = proto(1) + info(N) + serial(2) + crc(2)
payload = raw[3 : -2] # everything between len_field and stop
protocol = payload[0]
serial = struct.unpack("!H", payload[-4:-2])[0]
content = payload[1:-4] # info_content between protocol and serial
packets.append(
{
"start_marker": "short",
"length": pkt_len,
"protocol": protocol,
"serial": serial,
"content": content,
"raw": raw,
}
)
pos += total
else:
# 79 79 LEN_H LEN_L PROTO INFO... SERIAL(2) CRC(2) 0D 0A
header_size = 4 # marker(2) + length(2)
if pos + header_size > len(buffer):
break
pkt_len = struct.unpack("!H", buffer[pos + 2 : pos + 4])[0]
# total = start(2) + len_field(2) + pkt_len + stop(2)
total = 2 + 2 + pkt_len + 2
if pos + total > len(buffer):
break
raw = buffer[pos : pos + total]
if raw[-2:] != STOP_MARKER:
logger.warning(
"Invalid stop marker (long) at pos %d, skipping byte", pos
)
pos += 1
continue
payload = raw[4 : -2]
protocol = payload[0]
serial = struct.unpack("!H", payload[-4:-2])[0]
content = payload[1:-4]
packets.append(
{
"start_marker": "long",
"length": pkt_len,
"protocol": protocol,
"serial": serial,
"content": content,
"raw": raw,
}
)
pos += total
remaining = buffer[pos:]
return packets, remaining
class PacketBuilder:
"""Thin wrapper delegating to app.protocol.builder.PacketBuilder.
Preserves the (protocol, payload, serial) call signature used throughout tcp_server.py.
"""
from app.protocol.builder import PacketBuilder as _ProtoBuilder
@staticmethod
def build_response(
protocol: int, payload: bytes, serial: int, *, long: bool = False
) -> bytes:
return PacketBuilder._ProtoBuilder.build_response(protocol, serial, payload)
# ---------------------------------------------------------------------------
# Connection information container
# ---------------------------------------------------------------------------
class ConnectionInfo:
"""Metadata about a single device TCP connection."""
__slots__ = ("imei", "device_id", "addr", "connected_at", "last_activity", "serial_counter")
def __init__(self, addr: Tuple[str, int]) -> None:
self.imei: Optional[str] = None
self.device_id: Optional[int] = None
self.addr = addr
self.connected_at = datetime.now(timezone(timedelta(hours=8))).replace(tzinfo=None)
self.last_activity = self.connected_at
self.serial_counter: int = 1
def next_serial(self) -> int:
val = self.serial_counter
self.serial_counter = (self.serial_counter + 1) & 0xFFFF
return val
# ---------------------------------------------------------------------------
# Helper: look up device_id from IMEI
# ---------------------------------------------------------------------------
async def _get_device_id(session, imei: str, conn_info: Optional["ConnectionInfo"] = None) -> Optional[int]:
"""Return the device id for the given IMEI, using ConnectionInfo cache if available."""
if conn_info is not None and conn_info.device_id is not None:
return conn_info.device_id
result = await session.execute(
select(Device.id).where(Device.imei == imei)
)
row = result.scalar_one_or_none()
if row is not None and conn_info is not None:
conn_info.device_id = row
return row
# ---------------------------------------------------------------------------
# TCPManager -- the core singleton
# ---------------------------------------------------------------------------
class TCPManager:
"""Manages all badge device TCP connections and message routing."""
def __init__(self) -> None:
# {imei: (reader, writer, connection_info)}
self.connections: Dict[str, Tuple[asyncio.StreamReader, asyncio.StreamWriter, ConnectionInfo]] = {}
self._conn_lock = asyncio.Lock()
self._server: Optional[asyncio.AbstractServer] = None
# Protocol number -> handler coroutine mapping
self._handlers: Dict[int, Any] = {
PROTO_LOGIN: self.handle_login,
PROTO_HEARTBEAT: self.handle_heartbeat,
PROTO_HEARTBEAT_EXT: self.handle_heartbeat,
PROTO_GPS: self.handle_gps,
PROTO_GPS_4G: self.handle_gps_4g,
PROTO_LBS_MULTI: self.handle_lbs_multi,
PROTO_LBS_MULTI_REPLY: self.handle_lbs_multi,
PROTO_LBS_4G: self.handle_lbs_4g,
PROTO_WIFI: self.handle_wifi,
PROTO_WIFI_4G: self.handle_wifi_4g,
PROTO_TIME_SYNC: self.handle_time_sync,
PROTO_TIME_SYNC_2: self.handle_time_sync_2,
PROTO_ALARM_SINGLE_FENCE: self.handle_alarm_single_fence,
PROTO_ALARM_MULTI_FENCE: self.handle_alarm_multi_fence,
PROTO_ALARM_LBS_4G: self.handle_alarm_lbs,
PROTO_ALARM_WIFI: self.handle_alarm_wifi,
PROTO_LBS_ADDRESS_REQ: self.handle_lbs_address_req,
PROTO_ADDRESS_QUERY: self.handle_address_query,
PROTO_LBS_4G_ADDRESS_REQ: self.handle_lbs_4g_address_req,
PROTO_ATTENDANCE: self.handle_attendance,
PROTO_ATTENDANCE_4G: self.handle_attendance_4g,
PROTO_BT_PUNCH: self.handle_bt_punch,
PROTO_BT_LOCATION: self.handle_bt_location,
PROTO_GENERAL_INFO: self.handle_general_info,
PROTO_ONLINE_CMD_REPLY: self.handle_online_cmd_reply,
}
# ------------------------------------------------------------------
# Connection lifecycle
# ------------------------------------------------------------------
async def _handle_connection(
self, reader: asyncio.StreamReader, writer: asyncio.StreamWriter
) -> None:
"""Callback for each new TCP connection."""
addr = writer.get_extra_info("peername")
conn_info = ConnectionInfo(addr)
logger.info("New TCP connection from %s:%d", addr[0], addr[1])
recv_buffer = bytearray()
try:
idle_timeout = settings.TCP_IDLE_TIMEOUT or None
while True:
try:
if idle_timeout:
data = await asyncio.wait_for(reader.read(4096), timeout=idle_timeout)
else:
data = await reader.read(4096)
except asyncio.TimeoutError:
logger.info(
"Idle timeout (%ds) for %s:%d (IMEI=%s), closing",
idle_timeout, addr[0], addr[1], conn_info.imei,
)
break
if not data:
logger.info(
"Connection closed by remote %s:%d (IMEI=%s)",
addr[0],
addr[1],
conn_info.imei,
)
break
recv_buffer += data
conn_info.last_activity = datetime.now(timezone(timedelta(hours=8))).replace(tzinfo=None)
logger.info("Received %d bytes from %s:%d (IMEI=%s): %s",
len(data), addr[0], addr[1], conn_info.imei, data[:50].hex())
packets, recv_buffer = PacketParser.find_packets(recv_buffer)
for pkt in packets:
proto = pkt["protocol"]
proto_name = PROTOCOL_NAMES.get(proto, f"0x{proto:02X}")
logger.debug(
"Packet from IMEI=%s proto=%s serial=%d len=%d",
conn_info.imei,
proto_name,
pkt["serial"],
pkt["length"],
)
handler = self._handlers.get(proto)
if handler is not None:
try:
await handler(pkt, reader, writer, conn_info)
except Exception:
logger.exception(
"Error handling proto %s for IMEI=%s",
proto_name,
conn_info.imei,
)
else:
logger.warning(
"No handler for protocol 0x%02X from IMEI=%s",
proto,
conn_info.imei,
)
# Safety: prevent unbounded buffer growth from garbage data
if len(recv_buffer) > 65536:
logger.warning(
"Receive buffer overflow for IMEI=%s, discarding",
conn_info.imei,
)
recv_buffer = bytearray()
except asyncio.CancelledError:
logger.info(
"Connection task cancelled for IMEI=%s (%s:%d)",
conn_info.imei,
addr[0],
addr[1],
)
except ConnectionResetError:
logger.warning(
"Connection reset by IMEI=%s (%s:%d)",
conn_info.imei,
addr[0],
addr[1],
)
except Exception:
logger.exception(
"Unexpected error on connection IMEI=%s (%s:%d)",
conn_info.imei,
addr[0],
addr[1],
)
finally:
await self._cleanup_connection(conn_info, writer)
async def _cleanup_connection(
self, conn_info: ConnectionInfo, writer: asyncio.StreamWriter
) -> None:
"""Remove connection from tracking and update the device status."""
imei = conn_info.imei
async with self._conn_lock:
if imei and imei in self.connections:
# Only remove if this is still the active connection (not replaced by reconnect)
_, stored_writer, _ = self.connections[imei]
if stored_writer is writer:
del self.connections[imei]
logger.info("Device IMEI=%s removed from active connections", imei)
else:
logger.info("Device IMEI=%s has reconnected, keeping new connection", imei)
# Don't mark offline since device reconnected
try:
writer.close()
await writer.wait_closed()
except Exception:
pass
return
# Mark device offline in DB
try:
async with async_session() as session:
async with session.begin():
await session.execute(
update(Device)
.where(Device.imei == imei)
.values(status="offline")
)
# Broadcast device offline
ws_manager.broadcast_nonblocking("device_status", {
"imei": imei, "status": "offline",
})
except Exception:
logger.exception("Failed to set IMEI=%s offline in DB", imei)
try:
writer.close()
await writer.wait_closed()
except Exception:
pass
# ------------------------------------------------------------------
# Helper: send bytes to a writer
# ------------------------------------------------------------------
@staticmethod
async def _send(writer: asyncio.StreamWriter, data: bytes) -> None:
writer.write(data)
await writer.drain()
# ------------------------------------------------------------------
# Helper: build and send an address reply placeholder
# ------------------------------------------------------------------
async def _send_address_reply(
self,
protocol: int,
serial: int,
writer: asyncio.StreamWriter,
address: Optional[str] = None,
phone: str = "",
is_alarm: bool = False,
) -> None:
"""Send address reply (0x17 Chinese / 0x97 English).
Format: cmd_length + server_flag(4) + ADDRESS/ALARMSMS(7-8) + && + addr(UTF16BE) + && + phone(21) + ##
"""
server_flag = b"\x00\x00\x00\x00"
marker = b"ALARMSMS" if is_alarm else b"ADDRESS"
separator = b"&&"
terminator = b"##"
addr_text = address or ""
addr_bytes = addr_text.encode("utf-16-be")
# Phone field: 21 bytes ASCII, zero-padded
phone_bytes = phone.encode("ascii", errors="ignore")[:21].ljust(21, b"0")
# Content after cmd_length: server_flag + marker + && + addr + && + phone + ##
inner = server_flag + marker + separator + addr_bytes + separator + phone_bytes + terminator
# Protocol 0x97 (English) uses 2-byte cmd_length; 0x17 (Chinese) uses 1-byte
if protocol == PROTO_ADDRESS_REPLY_EN:
payload = struct.pack("!H", len(inner)) + inner
else:
# Chinese (0x17): 1-byte length
cmd_len = min(len(inner), 0xFF)
payload = bytes([cmd_len]) + inner
use_long = len(payload) > 200
pkt = PacketBuilder.build_response(protocol, payload, serial, long=use_long)
await self._send(writer, pkt)
# ------------------------------------------------------------------
# Helper: parse GPS data from content
# ------------------------------------------------------------------
@staticmethod
def _parse_gps_from_content(content: bytes, offset: int = 6) -> Dict[str, Any]:
"""Parse GPS fields from packet content starting at *offset*.
Expected layout at offset (12 bytes):
gps_info_byte(1) + latitude(4) + longitude(4) + speed(1) + course_status(2)
Returns a dict with latitude, longitude, speed, course,
gps_satellites, gps_positioned, is_realtime.
"""
result: Dict[str, Any] = {}
if len(content) < offset + 12:
return result
gps_byte = content[offset]
satellites = gps_byte & 0x0F
lat_raw = struct.unpack("!I", content[offset + 1 : offset + 5])[0]
lon_raw = struct.unpack("!I", content[offset + 5 : offset + 9])[0]
speed = content[offset + 9]
course_status = struct.unpack("!H", content[offset + 10 : offset + 12])[0]
# Decode course/status bits:
# bit 13 (0x2000): real-time GPS
# bit 12 (0x1000): GPS is positioned
# bit 11 (0x0800): East longitude (0=West)
# bit 10 (0x0400): North latitude (0=South)
# bits 9-0: course (0-360)
is_realtime = bool(course_status & 0x2000)
gps_positioned = bool(course_status & 0x1000)
is_west = bool(course_status & 0x0800) # bit 11: 0=East, 1=West
is_north = bool(course_status & 0x0400) # bit 10: 0=South, 1=North
course = course_status & 0x03FF
latitude = lat_raw / 1_800_000.0
longitude = lon_raw / 1_800_000.0
# Apply hemisphere
if not is_north:
latitude = -latitude
if is_west:
longitude = -longitude
result["latitude"] = latitude
result["longitude"] = longitude
result["speed"] = float(speed)
result["course"] = float(course)
result["gps_satellites"] = satellites
result["gps_positioned"] = gps_positioned
result["is_realtime"] = is_realtime
return result
# ------------------------------------------------------------------
# Helper: parse datetime from content (6 bytes YY MM DD HH MM SS)
# ------------------------------------------------------------------
@staticmethod
def _parse_datetime(content: bytes, offset: int = 0) -> Optional[datetime]:
"""Parse a 6-byte datetime field at *offset* (UTC) and return CST (UTC+8) naive datetime."""
if len(content) < offset + 6:
return None
yy, mo, dd, hh, mi, ss = struct.unpack_from("BBBBBB", content, offset)
try:
utc_dt = datetime(2000 + yy, mo, dd, hh, mi, ss, tzinfo=timezone.utc)
# Convert to CST (UTC+8) and strip tzinfo for SQLite
cst_dt = utc_dt + timedelta(hours=8)
return cst_dt.replace(tzinfo=None)
except ValueError:
return None
# ------------------------------------------------------------------
# Protocol Handlers
# ------------------------------------------------------------------
async def handle_login(
self,
pkt: dict,
reader: asyncio.StreamReader,
writer: asyncio.StreamWriter,
conn_info: ConnectionInfo,
) -> None:
"""Handle login packet (0x01).
Content layout: IMEI (8 bytes BCD-encoded) + type_code (2 bytes)
+ timezone_language (2 bytes).
"""
content = pkt["content"]
# IMEI is BCD-encoded in the first 8 bytes
if len(content) < 8:
logger.warning("Login packet too short (%d bytes)", len(content))
return
# BCD-encoded IMEI: 8 bytes = 16 hex digits, first digit is padding 0
raw_hex = content[:8].hex()
# Strip exactly one leading '0' (BCD padding) to get 15-digit IMEI
# Use slicing instead of lstrip to avoid removing multiple leading zeros
imei = raw_hex[1:] if raw_hex.startswith("0") else raw_hex
conn_info.imei = imei
# Close existing connection if device reconnects
async with self._conn_lock:
old_conn = self.connections.get(imei)
if old_conn is not None:
_, old_writer, old_info = old_conn
logger.info("Closing stale connection for IMEI=%s (old %s:%d)", imei, old_info.addr[0], old_info.addr[1])
try:
old_writer.close()
except Exception:
pass
self.connections[imei] = (reader, writer, conn_info)
logger.info(
"Device login: IMEI=%s from %s:%d", imei, conn_info.addr[0], conn_info.addr[1]
)
# Extract device type code (2 bytes after IMEI)
type_code: Optional[int] = None
device_type = "P240" # default
if len(content) >= 10:
type_code = struct.unpack("!H", content[8:10])[0]
device_type = f"0x{type_code:04X}"
# Extract timezone/language (2 bytes after device type)
tz_str: Optional[str] = None
lang_str: Optional[str] = None
if len(content) >= 12:
tz_lang = struct.unpack("!H", content[10:12])[0]
# bit15-04: timezone value * 100, bit03: 0=East/1=West, bit00: language
tz_raw = (tz_lang >> 4) & 0xFFF
tz_val = tz_raw / 100.0
tz_west = bool(tz_lang & 0x08)
if tz_west:
tz_str = f"W{tz_val}"
else:
tz_str = f"E{tz_val}"
lang_code = tz_lang & 0x03
lang_str = "zh" if lang_code == 1 else "en" if lang_code == 2 else str(lang_code)
# Persist device record
now = datetime.now(timezone(timedelta(hours=8))).replace(tzinfo=None)
try:
async with async_session() as session:
async with session.begin():
result = await session.execute(
select(Device).where(Device.imei == imei)
)
device = result.scalar_one_or_none()
if device is None:
device = Device(
imei=imei,
device_type=device_type,
status="online",
last_login=now,
timezone=tz_str,
language=lang_str,
)
session.add(device)
else:
device.status = "online"
device.last_login = now
if tz_str:
device.timezone = tz_str
if lang_str:
device.language = lang_str
# Don't overwrite user-set device_type with raw hex code
# Broadcast device online
ws_manager.broadcast_nonblocking("device_status", {
"imei": imei, "status": "online",
})
except Exception:
logger.exception("DB error during login for IMEI=%s", imei)
# Send login response (empty payload)
response = PacketBuilder.build_response(PROTO_LOGIN, b"", pkt["serial"])
await self._send(writer, response)
async def handle_heartbeat(
self,
pkt: dict,
reader: asyncio.StreamReader,
writer: asyncio.StreamWriter,
conn_info: ConnectionInfo,
) -> None:
"""Handle heartbeat (0x13) and extended heartbeat (0x36).
Heartbeat content layout:
terminal_info(1) + battery_level(1) + gsm_signal(1) + reserved(2)
Extended heartbeat (0x36) layout:
terminal_info(1) + battery_level(1) + gsm_signal(1) + language(2) + extension(N)
Terminal info byte bits:
bit 7: oil/electricity (0=connected, 1=disconnected)
bit 6: GPS positioned (0=no, 1=yes)
bits 5-4: work status
bit 3: charging
bit 2: ACC
bit 1: armed
bit 0: reserved
"""
content = pkt["content"]
proto = pkt["protocol"]
imei = conn_info.imei
if not imei:
logger.warning("Heartbeat received before login")
return
# Ensure device is tracked in active connections (e.g. after server restart)
if imei not in self.connections:
self.connections[imei] = (reader, writer, conn_info)
logger.info("Device IMEI=%s re-registered via heartbeat", imei)
terminal_info: int = 0
battery_level: Optional[int] = None
gsm_signal: Optional[int] = None
extension_data: Optional[dict] = None
if len(content) >= 1:
terminal_info = content[0]
if len(content) >= 2:
battery_level = content[1] # 0-100 percentage
if len(content) >= 3:
gsm_signal = content[2] # 0x00-0x04
# For extended heartbeat (0x36), parse language and extension modules
if proto == PROTO_HEARTBEAT_EXT:
ext_info: Dict[str, Any] = {}
if len(content) >= 5:
ext_info["language"] = struct.unpack("!H", content[3:5])[0]
if len(content) > 5:
ext_info["extension_raw"] = content[5:].hex()
if ext_info:
extension_data = ext_info
now = datetime.now(timezone(timedelta(hours=8))).replace(tzinfo=None)
try:
async with async_session() as session:
async with session.begin():
device_id = await _get_device_id(session, imei, conn_info)
if device_id is None:
logger.warning("Heartbeat for unknown IMEI=%s", imei)
return
# Update device record (also ensure status=online if heartbeat is coming in)
await session.execute(
update(Device)
.where(Device.id == device_id)
.values(
status="online",
battery_level=battery_level,
gsm_signal=gsm_signal,
last_heartbeat=now,
)
)
# Store heartbeat record
record = HeartbeatRecord(
device_id=device_id,
imei=conn_info.imei,
protocol_number=proto,
terminal_info=terminal_info,
battery_level=battery_level if battery_level is not None else 0,
gsm_signal=gsm_signal if gsm_signal is not None else 0,
extension_data=extension_data,
)
session.add(record)
except Exception:
logger.exception("DB error during heartbeat for IMEI=%s", imei)
# Send heartbeat response
response = PacketBuilder.build_response(pkt["protocol"], b"", pkt["serial"])
await self._send(writer, response)
async def handle_gps(
self,
pkt: dict,
reader: asyncio.StreamReader,
writer: asyncio.StreamWriter,
conn_info: ConnectionInfo,
) -> None:
"""Handle GPS data packet (0x22). No response required."""
await self._store_location(pkt, conn_info, location_type="gps")
async def handle_gps_4g(
self,
pkt: dict,
reader: asyncio.StreamReader,
writer: asyncio.StreamWriter,
conn_info: ConnectionInfo,
) -> None:
"""Handle GPS 4G data packet (0xA0). No response required."""
await self._store_location(pkt, conn_info, location_type="gps_4g")
async def handle_lbs_multi(
self,
pkt: dict,
reader: asyncio.StreamReader,
writer: asyncio.StreamWriter,
conn_info: ConnectionInfo,
) -> None:
"""Handle LBS multi-cell data (0x28 / 0x2E).
No response for 0x28; send response for 0x2E.
"""
await self._store_location(pkt, conn_info, location_type="lbs")
if pkt["protocol"] == PROTO_LBS_MULTI_REPLY:
response = PacketBuilder.build_response(
pkt["protocol"], b"", pkt["serial"]
)
await self._send(writer, response)
async def handle_lbs_4g(
self,
pkt: dict,
reader: asyncio.StreamReader,
writer: asyncio.StreamWriter,
conn_info: ConnectionInfo,
) -> None:
"""Handle LBS 4G data (0xA1). No response."""
await self._store_location(pkt, conn_info, location_type="lbs_4g")
async def handle_wifi(
self,
pkt: dict,
reader: asyncio.StreamReader,
writer: asyncio.StreamWriter,
conn_info: ConnectionInfo,
) -> None:
"""Handle WIFI data (0x2C). No response."""
await self._store_location(pkt, conn_info, location_type="wifi")
async def handle_wifi_4g(
self,
pkt: dict,
reader: asyncio.StreamReader,
writer: asyncio.StreamWriter,
conn_info: ConnectionInfo,
) -> None:
"""Handle WIFI 4G data (0xA2). No response."""
await self._store_location(pkt, conn_info, location_type="wifi_4g")
# -- Location storage helper ------------------------------------------------
async def _store_location(
self,
pkt: dict,
conn_info: ConnectionInfo,
location_type: str,
) -> Optional[str]:
imei = conn_info.imei
if not imei:
logger.warning("Location data received before login (type=%s)", location_type)
return None
content = pkt["content"]
proto = pkt["protocol"]
now = datetime.now(timezone(timedelta(hours=8))).replace(tzinfo=None)
# Parse recorded_at from the 6-byte datetime at offset 0
recorded_at = self._parse_datetime(content, 0) or now
# Attempt to extract lat/lon from GPS-type payloads
latitude: Optional[float] = None
longitude: Optional[float] = None
speed: Optional[float] = None
course: Optional[float] = None
gps_satellites: Optional[int] = None
gps_positioned: bool = False
is_realtime: bool = True
mcc: Optional[int] = None
mnc: Optional[int] = None
lac: Optional[int] = None
cell_id: Optional[int] = None
report_mode: Optional[int] = None
mileage: Optional[float] = None
if location_type in ("gps", "gps_4g") and len(content) >= 18:
# datetime(6) + gps(12) + lbs fields + ...
gps_data = self._parse_gps_from_content(content, offset=6)
latitude = gps_data.get("latitude")
longitude = gps_data.get("longitude")
speed = gps_data.get("speed")
course = gps_data.get("course")
gps_satellites = gps_data.get("gps_satellites")
gps_positioned = gps_data.get("gps_positioned", False)
is_realtime = gps_data.get("is_realtime", True)
# Parse LBS fields after GPS (offset 18)
pos = 18
if location_type == "gps" and len(content) >= pos + 3:
# MCC(2) + MNC(1 or 2)
mcc_raw = struct.unpack("!H", content[pos : pos + 2])[0]
mnc_2byte = bool(mcc_raw & 0x8000)
mcc = mcc_raw & 0x7FFF
pos += 2
if mnc_2byte and len(content) >= pos + 2:
mnc = struct.unpack("!H", content[pos : pos + 2])[0]
pos += 2
elif len(content) >= pos + 1:
mnc = content[pos]
pos += 1
if len(content) >= pos + 2:
lac = struct.unpack("!H", content[pos : pos + 2])[0]
pos += 2
if len(content) >= pos + 3:
cell_id = int.from_bytes(content[pos : pos + 3], "big")
pos += 3
# skip acc(1)
if len(content) >= pos + 1:
pos += 1
if len(content) >= pos + 1:
report_mode = content[pos]
pos += 1
# skip realtime_upload(1)
if len(content) >= pos + 1:
pos += 1
if len(content) >= pos + 4:
mileage = float(struct.unpack("!I", content[pos : pos + 4])[0])
pos += 4
elif location_type == "gps_4g" and len(content) >= pos + 3:
mcc_raw = struct.unpack("!H", content[pos : pos + 2])[0]
mnc_2byte = bool(mcc_raw & 0x8000)
mcc = mcc_raw & 0x7FFF
pos += 2
if mnc_2byte and len(content) >= pos + 2:
mnc = struct.unpack("!H", content[pos : pos + 2])[0]
pos += 2
elif len(content) >= pos + 1:
mnc = content[pos]
pos += 1
if len(content) >= pos + 4:
lac = struct.unpack("!I", content[pos : pos + 4])[0]
pos += 4
if len(content) >= pos + 8:
cell_id = struct.unpack("!Q", content[pos : pos + 8])[0]
pos += 8
# skip acc(1)
if len(content) >= pos + 1:
pos += 1
if len(content) >= pos + 1:
report_mode = content[pos]
pos += 1
# skip realtime_upload(1)
if len(content) >= pos + 1:
pos += 1
if len(content) >= pos + 4:
mileage = float(struct.unpack("!I", content[pos : pos + 4])[0])
pos += 4
elif location_type in ("lbs", "lbs_4g") and len(content) >= 9:
# datetime(6) + mcc(2) + mnc(1-2) + stations...
pos = 6
if len(content) >= pos + 2:
mcc_raw = struct.unpack("!H", content[pos : pos + 2])[0]
mnc_2byte = bool(mcc_raw & 0x8000)
mcc = mcc_raw & 0x7FFF
pos += 2
if mnc_2byte and len(content) >= pos + 2:
mnc = struct.unpack("!H", content[pos : pos + 2])[0]
pos += 2
elif len(content) >= pos + 1:
mnc = content[pos]
pos += 1
# Main station LAC + cell_id
if location_type == "lbs" and len(content) >= pos + 5:
lac = struct.unpack("!H", content[pos : pos + 2])[0]
pos += 2
cell_id = int.from_bytes(content[pos : pos + 3], "big")
pos += 3
elif location_type == "lbs_4g" and len(content) >= pos + 12:
lac = struct.unpack("!I", content[pos : pos + 4])[0]
pos += 4
cell_id = struct.unpack("!Q", content[pos : pos + 8])[0]
pos += 8
elif location_type in ("wifi", "wifi_4g") and len(content) >= 9:
# Same header as LBS: datetime(6) + mcc(2) + mnc(1-2) + stations + wifi_aps
pos = 6
if len(content) >= pos + 2:
mcc_raw = struct.unpack("!H", content[pos : pos + 2])[0]
mnc_2byte = bool(mcc_raw & 0x8000)
mcc = mcc_raw & 0x7FFF
pos += 2
if mnc_2byte and len(content) >= pos + 2:
mnc = struct.unpack("!H", content[pos : pos + 2])[0]
pos += 2
elif len(content) >= pos + 1:
mnc = content[pos]
pos += 1
# Main station LAC + cell_id (4G format: LAC=4, CellID=8)
if location_type == "wifi_4g" and len(content) >= pos + 12:
lac = struct.unpack("!I", content[pos : pos + 4])[0]
pos += 4
cell_id = struct.unpack("!Q", content[pos : pos + 8])[0]
pos += 8
elif location_type == "wifi" and len(content) >= pos + 5:
lac = struct.unpack("!H", content[pos : pos + 2])[0]
pos += 2
cell_id = int.from_bytes(content[pos : pos + 3], "big")
pos += 3
# --- Skip LBS/WiFi records with empty cell data (device hasn't acquired cells yet) ---
if location_type in ("lbs", "lbs_4g", "wifi", "wifi_4g") and latitude is None:
mcc_val = mcc & 0x7FFF if mcc else 0
if mcc_val == 0 and (lac is None or lac == 0) and (cell_id is None or cell_id == 0):
logger.debug("Skipping empty LBS/WiFi packet for IMEI=%s (no cell data)", imei)
return
# --- Geocoding for LBS/WiFi locations (no GPS coordinates) ---
neighbor_cells_data: Optional[list] = None
wifi_data_list: Optional[list] = None
if location_type in ("lbs", "lbs_4g", "wifi", "wifi_4g") and latitude is None:
# Parse neighbor cells and WiFi APs from raw content for geocoding
neighbor_cells_data, wifi_data_list = self._parse_extra_location_data(
content, location_type
)
try:
lat, lon = await geocode_location(
mcc=mcc,
mnc=mnc,
lac=lac,
cell_id=cell_id,
wifi_list=wifi_data_list,
neighbor_cells=neighbor_cells_data,
imei=imei,
location_type=location_type,
)
if lat is not None and lon is not None:
latitude = lat
longitude = lon
logger.info(
"Geocoded %s for IMEI=%s: lat=%.6f, lon=%.6f",
location_type, imei, lat, lon,
)
except Exception:
logger.exception("Geocoding failed for %s IMEI=%s", location_type, imei)
# --- Reverse geocoding (run concurrently with DB store below) ---
address_task = None
if latitude is not None and longitude is not None:
address_task = asyncio.ensure_future(reverse_geocode(latitude, longitude))
try:
async with async_session() as session:
async with session.begin():
device_id = await _get_device_id(session, imei, conn_info)
if device_id is None:
logger.warning("Location for unknown IMEI=%s", imei)
if address_task:
address_task.cancel()
return
# Await reverse geocode result if running
address: Optional[str] = None
if address_task:
try:
address = await address_task
except Exception:
logger.exception("Reverse geocoding failed for IMEI=%s", imei)
record = LocationRecord(
device_id=device_id,
imei=conn_info.imei,
location_type=location_type,
latitude=latitude,
longitude=longitude,
speed=speed,
course=course,
gps_satellites=gps_satellites,
gps_positioned=gps_positioned,
mcc=mcc,
mnc=mnc,
lac=lac,
cell_id=cell_id,
neighbor_cells=neighbor_cells_data,
wifi_data=wifi_data_list,
report_mode=report_mode,
is_realtime=is_realtime,
mileage=mileage,
address=address,
raw_data=content.hex(),
recorded_at=recorded_at,
)
session.add(record)
# --- Fence auto-attendance check (same session) ---
fence_events = []
if settings.FENCE_CHECK_ENABLED and latitude is not None and longitude is not None:
try:
from app.services.fence_checker import check_device_fences
fence_events = await check_device_fences(
session, device_id, imei,
latitude, longitude, location_type,
address, recorded_at,
mcc=mcc, mnc=mnc, lac=lac, cell_id=cell_id,
)
except Exception:
logger.exception("Fence check failed for IMEI=%s", imei)
# Broadcast to WebSocket subscribers (after commit)
ws_manager.broadcast_nonblocking("location", {
"imei": imei, "device_id": device_id, "location_type": location_type,
"latitude": latitude, "longitude": longitude, "speed": speed,
"address": address, "recorded_at": str(recorded_at),
})
for evt in fence_events:
ws_manager.broadcast_nonblocking("fence_attendance", evt)
ws_manager.broadcast_nonblocking("attendance", evt)
except Exception:
logger.exception(
"DB error storing %s location for IMEI=%s", location_type, imei
)
return address
@staticmethod
def _parse_extra_location_data(
content: bytes, location_type: str
) -> tuple[Optional[list], Optional[list]]:
"""Parse neighbor cells and WiFi APs from LBS/WiFi raw content for geocoding."""
neighbor_cells: list[dict] = []
wifi_list: list[dict] = []
try:
pos = 6 # skip datetime
if len(content) < pos + 2:
return (None, None)
# Skip MCC/MNC
mcc_raw = struct.unpack("!H", content[pos : pos + 2])[0]
mnc_2byte = bool(mcc_raw & 0x8000)
pos += 2
if mnc_2byte:
pos += 2
else:
pos += 1
# Parse stations (main + up to 6 neighbors)
is_4g = location_type in ("lbs_4g", "wifi_4g")
lac_size = 4 if is_4g else 2
cid_size = 8 if is_4g else 3
station_size = lac_size + cid_size + 1 # +1 for RSSI
for i in range(7):
if len(content) < pos + station_size:
break
if is_4g:
s_lac = struct.unpack("!I", content[pos : pos + 4])[0]
pos += 4
s_cid = struct.unpack("!Q", content[pos : pos + 8])[0]
pos += 8
else:
s_lac = struct.unpack("!H", content[pos : pos + 2])[0]
pos += 2
s_cid = int.from_bytes(content[pos : pos + 3], "big")
pos += 3
s_rssi = content[pos]
pos += 1
if i > 0 and (s_lac != 0 or s_cid != 0):
neighbor_cells.append({
"lac": s_lac,
"cell_id": s_cid,
"rssi": s_rssi,
})
# Skip timing_advance (1 byte)
if len(content) > pos:
pos += 1
# Parse WiFi APs (only for wifi types)
if location_type in ("wifi", "wifi_4g") and len(content) > pos:
wifi_count = content[pos]
pos += 1
for _ in range(wifi_count):
if len(content) < pos + 7: # MAC(6) + signal(1)
break
mac_bytes = content[pos : pos + 6]
mac = ":".join(f"{b:02X}" for b in mac_bytes)
pos += 6
signal = content[pos]
pos += 1
wifi_list.append({"mac": mac, "signal": signal})
except Exception:
pass
return (
neighbor_cells if neighbor_cells else None,
wifi_list if wifi_list else None,
)
# -- Alarm parsing helpers -------------------------------------------------
@staticmethod
def _parse_mcc_mnc_from_content(
content: bytes, pos: int
) -> tuple[Optional[int], Optional[int], int]:
"""Parse MCC (2 bytes) + MNC (1 or 2 bytes) from content at pos.
MCC high bit (0x8000) indicates MNC is 2 bytes instead of 1.
Returns (mcc, mnc, new_pos).
"""
mcc: Optional[int] = None
mnc: Optional[int] = None
if len(content) >= pos + 2:
mcc_raw = struct.unpack("!H", content[pos : pos + 2])[0]
mnc_2byte = bool(mcc_raw & 0x8000)
mcc = mcc_raw & 0x7FFF
pos += 2
if mnc_2byte and len(content) >= pos + 2:
mnc = struct.unpack("!H", content[pos : pos + 2])[0]
pos += 2
elif len(content) >= pos + 1:
mnc = content[pos]
pos += 1
return mcc, mnc, pos
@staticmethod
def _parse_alarm_tail(
content: bytes, pos: int
) -> tuple[Optional[int], Optional[int], Optional[int], str, int]:
"""Parse the common alarm tail fields.
Layout: terminal_info(1) + voltage_level(1) + gsm_signal(1)
+ alarm_code(1) + language(1)
Returns (terminal_info, battery_level, gsm_signal, alarm_type_name, new_pos).
"""
terminal_info: Optional[int] = None
battery_level: Optional[int] = None
gsm_signal: Optional[int] = None
alarm_type_name = "unknown"
if len(content) >= pos + 1:
terminal_info = content[pos]
pos += 1
if len(content) >= pos + 1:
# voltage_level is 1 byte: 0x00-0x06 (level, not millivolts)
voltage_level = content[pos]
# Map voltage level to approximate battery percentage
battery_level = min(voltage_level * 17, 100) if voltage_level <= 6 else None
pos += 1
if len(content) >= pos + 1:
gsm_signal = content[pos]
pos += 1
if len(content) >= pos + 1:
alarm_code = content[pos]
alarm_type_name = ALARM_TYPES.get(alarm_code, f"unknown_0x{alarm_code:02X}")
pos += 1
if len(content) >= pos + 1:
# language byte - skip
pos += 1
return terminal_info, battery_level, gsm_signal, alarm_type_name, pos
# -- Time Sync handlers ---------------------------------------------------
async def handle_time_sync(
self,
pkt: dict,
reader: asyncio.StreamReader,
writer: asyncio.StreamWriter,
conn_info: ConnectionInfo,
) -> None:
"""Handle time sync request (0x1F).
Protocol requires: 4-byte Unix timestamp + 2-byte language.
Chinese (0x0001) uses GMT+8 timestamp, English (0x0002) uses GMT+0.
"""
content = pkt["content"]
# Parse language from request (last 2 bytes of content)
language = 0x0001 # default Chinese
if len(content) >= 8:
language = struct.unpack("!H", content[6:8])[0]
now = datetime.now(timezone(timedelta(hours=8))).replace(tzinfo=None)
if language == 0x0001:
# Chinese: use GMT+8 timestamp
ts = int(now.timestamp()) + 8 * 3600
else:
# English / other: use GMT+0 timestamp
ts = int(now.timestamp())
# Payload: 4-byte Unix timestamp + 2-byte language
payload = struct.pack("!IH", ts, language)
response = PacketBuilder.build_response(PROTO_TIME_SYNC, payload, pkt["serial"])
await self._send(writer, response)
logger.debug("Time sync response sent to IMEI=%s (ts=%d, lang=0x%04X)", conn_info.imei, ts, language)
async def handle_time_sync_2(
self,
pkt: dict,
reader: asyncio.StreamReader,
writer: asyncio.StreamWriter,
conn_info: ConnectionInfo,
) -> None:
"""Handle time sync 2 request (0x8A). Respond with YY MM DD HH MM SS."""
now = datetime.now(timezone(timedelta(hours=8))).replace(tzinfo=None)
payload = bytes(
[
now.year % 100,
now.month,
now.day,
now.hour,
now.minute,
now.second,
]
)
response = PacketBuilder.build_response(PROTO_TIME_SYNC_2, payload, pkt["serial"])
await self._send(writer, response)
logger.debug("Time sync 2 response sent to IMEI=%s", conn_info.imei)
# -- Alarm handlers -------------------------------------------------------
async def _send_alarm_ack(
self, serial: int, writer: asyncio.StreamWriter
) -> None:
"""Send alarm acknowledgement using protocol 0x26."""
pkt = PacketBuilder.build_response(PROTO_ALARM_ACK, b"", serial)
await self._send(writer, pkt)
async def _send_alarm_with_address(
self, pkt: dict, writer: asyncio.StreamWriter, address: Optional[str]
) -> None:
"""Send 0x26 ACK + 0x17 Chinese address reply for alarm packets."""
await self._send_alarm_ack(pkt["serial"], writer)
if address:
await self._send_address_reply(
PROTO_LBS_ADDRESS_REQ, pkt["serial"], writer,
address=address, is_alarm=True,
)
async def handle_alarm_single_fence(
self,
pkt: dict,
reader: asyncio.StreamReader,
writer: asyncio.StreamWriter,
conn_info: ConnectionInfo,
) -> None:
"""Handle single-fence alarm (0xA3). Store, send 0x26 ACK + address."""
address = await self._store_alarm(pkt, conn_info, alarm_source="single_fence")
await self._send_alarm_with_address(pkt, writer, address)
async def handle_alarm_multi_fence(
self,
pkt: dict,
reader: asyncio.StreamReader,
writer: asyncio.StreamWriter,
conn_info: ConnectionInfo,
) -> None:
"""Handle multi-fence alarm (0xA4). Store, send 0x26 ACK + address."""
address = await self._store_alarm(pkt, conn_info, alarm_source="multi_fence")
await self._send_alarm_with_address(pkt, writer, address)
async def handle_alarm_lbs(
self,
pkt: dict,
reader: asyncio.StreamReader,
writer: asyncio.StreamWriter,
conn_info: ConnectionInfo,
) -> None:
"""Handle LBS alarm (0xA5). Store, send 0x26 ACK + address."""
address = await self._store_alarm(pkt, conn_info, alarm_source="lbs")
await self._send_alarm_with_address(pkt, writer, address)
async def handle_alarm_wifi(
self,
pkt: dict,
reader: asyncio.StreamReader,
writer: asyncio.StreamWriter,
conn_info: ConnectionInfo,
) -> None:
"""Handle WIFI alarm (0xA9). Store, send 0x26 ACK + address."""
address = await self._store_alarm(pkt, conn_info, alarm_source="wifi")
await self._send_alarm_with_address(pkt, writer, address)
async def _store_alarm(
self,
pkt: dict,
conn_info: ConnectionInfo,
alarm_source: str,
) -> Optional[str]:
imei = conn_info.imei
if not imei:
logger.warning("Alarm received before login (source=%s)", alarm_source)
return None
content = pkt["content"]
proto = pkt["protocol"]
now = datetime.now(timezone(timedelta(hours=8))).replace(tzinfo=None)
recorded_at = self._parse_datetime(content, 0) or now
# Extract GPS data if present
latitude: Optional[float] = None
longitude: Optional[float] = None
speed: Optional[float] = None
course: Optional[float] = None
mcc: Optional[int] = None
mnc: Optional[int] = None
lac: Optional[int] = None
cell_id: Optional[int] = None
battery_level: Optional[int] = None
gsm_signal: Optional[int] = None
wifi_data: Optional[list] = None
fence_data: Optional[dict] = None
# For alarm packets (0xA3, 0xA4, 0xA9), the terminal_info byte is
# located after GPS + LBS data. Extract alarm type from terminal_info bits.
alarm_type_name = "unknown"
terminal_info: Optional[int] = None
if proto in (PROTO_ALARM_SINGLE_FENCE, PROTO_ALARM_MULTI_FENCE):
# 0xA3/0xA4: datetime(6) + gps(12) + lbs_length(1) + mcc(2) + mnc(1-2)
# + lac(4) + cell_id(8) + terminal_info(1) + voltage_level(1) + gsm_signal(1)
# + alarm_code(1) + language(1) [+ fence_id(1) for 0xA4]
gps_data = self._parse_gps_from_content(content, offset=6)
if gps_data.get("gps_positioned", False):
latitude = gps_data.get("latitude")
longitude = gps_data.get("longitude")
speed = gps_data.get("speed")
course = gps_data.get("course")
pos = 18 # after datetime(6) + gps(12)
if len(content) > pos:
lbs_length = content[pos]
pos += 1
mcc, mnc, pos = self._parse_mcc_mnc_from_content(content, pos)
if len(content) >= pos + 4:
lac = struct.unpack("!I", content[pos : pos + 4])[0]
pos += 4
if len(content) >= pos + 8:
cell_id = struct.unpack("!Q", content[pos : pos + 8])[0]
pos += 8
terminal_info, battery_level, gsm_signal, alarm_type_name, pos = \
self._parse_alarm_tail(content, pos)
# Extract fence_id for 0xA4 multi-fence alarm
if proto == PROTO_ALARM_MULTI_FENCE and len(content) >= pos + 1:
fence_id = content[pos]
fence_data = {"fence_id": fence_id}
pos += 1
elif proto == PROTO_ALARM_LBS_4G:
# 0xA5: NO datetime, NO GPS, NO lbs_length
# mcc(2) + mnc(1-2) + lac(4) + cell_id(8) + terminal_info(1)
# + voltage_level(1) + gsm_signal(1) + alarm_code(1) + language(1)
recorded_at = now # 0xA5 has no datetime field
pos = 0 # content starts directly with MCC
mcc, mnc, pos = self._parse_mcc_mnc_from_content(content, pos)
if len(content) >= pos + 4:
lac = struct.unpack("!I", content[pos : pos + 4])[0]
pos += 4
if len(content) >= pos + 8:
cell_id = struct.unpack("!Q", content[pos : pos + 8])[0]
pos += 8
terminal_info, battery_level, gsm_signal, alarm_type_name, pos = \
self._parse_alarm_tail(content, pos)
elif proto == PROTO_ALARM_WIFI:
# 0xA9: datetime(6) + mcc(2) + mnc(1-2) + cell_type(1) + cell_count(1)
# + [lac + ci + rssi]*N + timing_advance(1)
# + wifi_count(1) + [mac(6) + signal(1)]*N + alarm_code(1) + language(1)
pos = 6 # after datetime
mcc, mnc, pos = self._parse_mcc_mnc_from_content(content, pos)
# cell_type and cell_count
cell_type = 0 # 0=2G, 1=4G
cell_count = 0
if len(content) >= pos + 2:
cell_type = content[pos]
cell_count = content[pos + 1]
pos += 2
# Parse cell stations
for i in range(cell_count):
if cell_type == 1: # 4G: LAC(4) + CI(8) + RSSI(1)
if len(content) < pos + 13:
break
if i == 0:
lac = struct.unpack("!I", content[pos : pos + 4])[0]
cell_id = struct.unpack("!Q", content[pos + 4 : pos + 12])[0]
gsm_signal = content[pos + 12]
pos += 13
else: # 2G: LAC(2) + CI(3) + RSSI(1)
if len(content) < pos + 6:
break
if i == 0:
lac = struct.unpack("!H", content[pos : pos + 2])[0]
cell_id = int.from_bytes(content[pos + 2 : pos + 5], "big")
gsm_signal = content[pos + 5]
pos += 6
# timing_advance(1)
if len(content) >= pos + 1:
pos += 1
# WiFi APs
wifi_data_list = []
if len(content) >= pos + 1:
wifi_count = content[pos]
pos += 1
for _ in range(wifi_count):
if len(content) < pos + 7:
break
mac = ":".join(f"{b:02X}" for b in content[pos : pos + 6])
signal = content[pos + 6]
wifi_data_list.append({"mac": mac, "signal": signal})
pos += 7
if wifi_data_list:
wifi_data = wifi_data_list
# alarm_code(1) + language(1)
if len(content) >= pos + 1:
alarm_code = content[pos]
alarm_type_name = ALARM_TYPES.get(alarm_code, f"unknown_0x{alarm_code:02X}")
pos += 1
# --- Forward geocoding for LBS/WiFi alarms (no GPS coordinates) ---
if latitude is None and mcc is not None and lac is not None and cell_id is not None:
try:
wifi_list_for_geocode = wifi_data_list if proto == PROTO_ALARM_WIFI else None
alarm_is_4g = proto in (PROTO_ALARM_SINGLE_FENCE, PROTO_ALARM_MULTI_FENCE, PROTO_ALARM_LBS_4G)
lat, lon = await geocode_location(
mcc=mcc, mnc=mnc, lac=lac, cell_id=cell_id,
wifi_list=wifi_list_for_geocode,
imei=imei,
location_type="lbs_4g" if alarm_is_4g else "lbs",
)
if lat is not None and lon is not None:
latitude = lat
longitude = lon
logger.info("Geocoded alarm for IMEI=%s: lat=%.6f, lon=%.6f", imei, lat, lon)
except Exception:
logger.exception("Geocoding failed for alarm IMEI=%s", imei)
# --- Reverse geocoding for alarm location ---
address: Optional[str] = None
if latitude is not None and longitude is not None:
try:
address = await reverse_geocode(latitude, longitude)
except Exception:
logger.exception("Reverse geocoding failed for alarm IMEI=%s", imei)
try:
async with async_session() as session:
async with session.begin():
device_id = await _get_device_id(session, imei, conn_info)
if device_id is None:
logger.warning("Alarm for unknown IMEI=%s", imei)
return
record = AlarmRecord(
device_id=device_id,
imei=conn_info.imei,
alarm_type=alarm_type_name,
alarm_source=alarm_source,
protocol_number=proto,
latitude=latitude,
longitude=longitude,
speed=speed,
course=course,
mcc=mcc,
mnc=mnc,
lac=lac,
cell_id=cell_id,
battery_level=battery_level,
gsm_signal=gsm_signal,
address=address,
wifi_data=wifi_data,
fence_data=fence_data,
recorded_at=recorded_at,
)
session.add(record)
# Broadcast alarm to WebSocket subscribers
ws_manager.broadcast_nonblocking("alarm", {
"imei": imei, "device_id": device_id, "alarm_type": alarm_type_name,
"alarm_source": alarm_source, "latitude": latitude, "longitude": longitude,
"address": address, "recorded_at": str(recorded_at),
})
except Exception:
logger.exception(
"DB error storing alarm for IMEI=%s (source=%s)", imei, alarm_source
)
return address
# -- Address / LBS request handlers ----------------------------------------
@staticmethod
def _extract_language_from_content(content: bytes) -> int:
"""Extract language field (last 2 bytes) from address request content."""
if len(content) >= 2:
return struct.unpack("!H", content[-2:])[0]
return 0x0001 # default Chinese
async def handle_lbs_address_req(
self,
pkt: dict,
reader: asyncio.StreamReader,
writer: asyncio.StreamWriter,
conn_info: ConnectionInfo,
) -> None:
"""Handle LBS address request (0x17). Store data, send address reply."""
address = await self._store_location(pkt, conn_info, location_type="lbs_address_req")
lang = self._extract_language_from_content(pkt["content"])
reply_proto = PROTO_ADDRESS_REPLY_EN if lang == 0x0002 else PROTO_LBS_ADDRESS_REQ
await self._send_address_reply(reply_proto, pkt["serial"], writer, address=address)
async def handle_address_query(
self,
pkt: dict,
reader: asyncio.StreamReader,
writer: asyncio.StreamWriter,
conn_info: ConnectionInfo,
) -> None:
"""Handle address query (0x1A). Store data, send address reply."""
address = await self._store_location(pkt, conn_info, location_type="address_query")
lang = self._extract_language_from_content(pkt["content"])
reply_proto = PROTO_ADDRESS_REPLY_EN if lang == 0x0002 else PROTO_LBS_ADDRESS_REQ
await self._send_address_reply(reply_proto, pkt["serial"], writer, address=address)
async def handle_lbs_4g_address_req(
self,
pkt: dict,
reader: asyncio.StreamReader,
writer: asyncio.StreamWriter,
conn_info: ConnectionInfo,
) -> None:
"""Handle LBS 4G address request (0xA7). Store data, send address reply."""
address = await self._store_location(pkt, conn_info, location_type="lbs_4g_address_req")
lang = self._extract_language_from_content(pkt["content"])
reply_proto = PROTO_ADDRESS_REPLY_EN if lang == 0x0002 else PROTO_LBS_ADDRESS_REQ
await self._send_address_reply(reply_proto, pkt["serial"], writer, address=address)
# -- Attendance handlers ---------------------------------------------------
async def handle_attendance(
self,
pkt: dict,
reader: asyncio.StreamReader,
writer: asyncio.StreamWriter,
conn_info: ConnectionInfo,
) -> None:
"""Handle attendance packet (0xB0). Parse GPS+LBS+WiFi, store, respond."""
att_type, reserved_bytes, datetime_bytes = await self._store_attendance(
pkt, conn_info, is_4g=False
)
# Response: datetime(6) + status(1) + type(1) + reserved(2)
punch_type = 1 if att_type == "clock_in" else 2
payload = datetime_bytes + struct.pack("BB", 1, punch_type) + reserved_bytes
response = PacketBuilder.build_response(pkt["protocol"], payload, pkt["serial"])
await self._send(writer, response)
async def handle_attendance_4g(
self,
pkt: dict,
reader: asyncio.StreamReader,
writer: asyncio.StreamWriter,
conn_info: ConnectionInfo,
) -> None:
"""Handle attendance 4G packet (0xB1). Like 0xB0 but MNC=2B, LAC=4B, CI=8B."""
att_type, reserved_bytes, datetime_bytes = await self._store_attendance(
pkt, conn_info, is_4g=True
)
punch_type = 1 if att_type == "clock_in" else 2
payload = datetime_bytes + struct.pack("BB", 1, punch_type) + reserved_bytes
response = PacketBuilder.build_response(pkt["protocol"], payload, pkt["serial"])
await self._send(writer, response)
async def _store_attendance(
self,
pkt: dict,
conn_info: ConnectionInfo,
is_4g: bool,
) -> tuple[str, bytes, bytes]:
"""Parse and store attendance record (0xB0 / 0xB1).
Returns (attendance_type, reserved_bytes, datetime_bytes) for building response.
"""
imei = conn_info.imei
content = pkt["content"]
proto = pkt["protocol"]
now = datetime.now(timezone(timedelta(hours=8))).replace(tzinfo=None)
# -- Parse fields --
pos = 0
# datetime (6 bytes)
recorded_at = self._parse_datetime(content, pos) or now
datetime_bytes = content[pos:pos + 6] if len(content) >= 6 else b"\x00" * 6
pos += 6
# GPS positioned flag (1 byte)
gps_positioned = False
if len(content) > pos:
gps_positioned = content[pos] == 1
pos += 1
# Terminal reserved info (2 bytes) - echo back in response
reserved_bytes = content[pos:pos + 2] if len(content) >= pos + 2 else b"\x00\x00"
pos += 2
# GPS block (12 bytes): gps_info(1) + lat(4) + lon(4) + speed(1) + course_status(2)
gps = self._parse_gps_from_content(content, offset=pos)
latitude = gps.get("latitude")
longitude = gps.get("longitude")
speed = gps.get("speed")
course = gps.get("course")
gps_satellites = gps.get("gps_satellites")
if gps.get("gps_positioned") is not None:
gps_positioned = gps["gps_positioned"]
pos += 12
# Terminal info (1 byte) - parse status bits (clock_out disabled due to GPS drift)
attendance_type = "clock_in"
terminal_info = 0
if len(content) > pos:
terminal_info = content[pos]
# status_code = (terminal_info >> 2) & 0x0F — always use clock_in
pos += 1
# voltage_level (1 byte)
battery_level: Optional[int] = None
if len(content) > pos:
vl = content[pos]
battery_level = min(vl * 17, 100) if vl <= 6 else None
pos += 1
# GSM signal (1 byte)
gsm_signal: Optional[int] = None
if len(content) > pos:
gsm_signal = content[pos]
pos += 1
# Reserved extension (2 bytes)
pos += 2
# LBS: MCC/MNC/LAC/CI + neighbors
mcc: Optional[int] = None
mnc: Optional[int] = None
lac: Optional[int] = None
cell_id: Optional[int] = None
neighbor_cells = []
if is_4g:
# 0xB1: MNC=2B fixed, LAC=4B, CI=8B
if len(content) >= pos + 2:
mcc_raw = struct.unpack("!H", content[pos:pos + 2])[0]
mcc = mcc_raw & 0x7FFF
pos += 2
if len(content) >= pos + 2:
mnc = struct.unpack("!H", content[pos:pos + 2])[0]
pos += 2
if len(content) >= pos + 4:
lac = struct.unpack("!I", content[pos:pos + 4])[0]
pos += 4
if len(content) >= pos + 8:
cell_id = struct.unpack("!Q", content[pos:pos + 8])[0]
pos += 8
# RSSI
if len(content) > pos:
pos += 1 # main RSSI
# Neighbor cells (6): LAC(4) + CI(8) + RSSI(1) each
for _ in range(6):
if len(content) >= pos + 13:
n_lac = struct.unpack("!I", content[pos:pos + 4])[0]
n_ci = struct.unpack("!Q", content[pos + 4:pos + 12])[0]
n_rssi = content[pos + 12]
if n_lac or n_ci:
neighbor_cells.append({"lac": n_lac, "cell_id": n_ci, "rssi": n_rssi})
pos += 13
else:
# 0xB0: standard MCC/MNC/LAC(2B)/CI(3B)
mcc, mnc, pos = self._parse_mcc_mnc_from_content(content, pos)
if len(content) >= pos + 2:
lac = struct.unpack("!H", content[pos:pos + 2])[0]
pos += 2
if len(content) >= pos + 3:
cell_id = (content[pos] << 16) | (content[pos + 1] << 8) | content[pos + 2]
pos += 3
# RSSI
if len(content) > pos:
pos += 1
# Neighbor cells (6): LAC(2) + CI(3) + RSSI(1) each
for _ in range(6):
if len(content) >= pos + 6:
n_lac = struct.unpack("!H", content[pos:pos + 2])[0]
n_ci = (content[pos + 2] << 16) | (content[pos + 3] << 8) | content[pos + 4]
n_rssi = content[pos + 5]
if n_lac or n_ci:
neighbor_cells.append({"lac": n_lac, "cell_id": n_ci, "rssi": n_rssi})
pos += 6
# TA (1 byte)
if len(content) > pos:
pos += 1
# WiFi data
wifi_data_list: Optional[list] = None
if len(content) > pos:
wifi_count = content[pos]
pos += 1
if wifi_count > 0:
wifi_data_list = []
for _ in range(wifi_count):
if len(content) >= pos + 7:
mac = ":".join(f"{b:02X}" for b in content[pos:pos + 6])
signal = content[pos + 6]
wifi_data_list.append({"mac": mac, "signal": signal})
pos += 7
# Forward geocoding if no GPS
address: Optional[str] = None
if not gps_positioned or latitude is None:
if mcc is not None and lac is not None and cell_id is not None:
try:
att_loc_type = "wifi_4g" if is_4g and wifi_data_list else ("lbs_4g" if is_4g else "lbs")
lat, lon = await geocode_location(
mcc=mcc, mnc=mnc, lac=lac, cell_id=cell_id,
wifi_list=wifi_data_list,
location_type=att_loc_type,
)
if lat is not None and lon is not None:
latitude, longitude = lat, lon
except Exception:
logger.exception("Geocoding failed for attendance IMEI=%s", imei)
# Reverse geocoding
if latitude is not None and longitude is not None:
try:
address = await reverse_geocode(latitude, longitude)
except Exception:
logger.exception("Reverse geocoding failed for attendance IMEI=%s", imei)
if not imei:
logger.warning("Attendance received before login")
return attendance_type, reserved_bytes, datetime_bytes
lbs_data = None
if mcc is not None:
lbs_data = {
"mcc": mcc, "mnc": mnc, "lac": lac, "cell_id": cell_id,
"neighbors": neighbor_cells or None,
}
try:
async with async_session() as session:
async with session.begin():
device_id = await _get_device_id(session, imei, conn_info)
if device_id is None:
logger.warning("Attendance for unknown IMEI=%s", imei)
return attendance_type, reserved_bytes, datetime_bytes
# Determine attendance source from protocol
_att_source = "bluetooth" if proto == 0xB2 else "device"
# Daily dedup: one clock_in / clock_out per device per day
from app.services.fence_checker import _has_attendance_today
if await _has_attendance_today(session, device_id, attendance_type):
logger.info(
"Attendance dedup: IMEI=%s already has %s today, skip",
imei, attendance_type,
)
return attendance_type, reserved_bytes, datetime_bytes
record = AttendanceRecord(
device_id=device_id,
imei=conn_info.imei,
attendance_type=attendance_type,
attendance_source=_att_source,
protocol_number=proto,
gps_positioned=gps_positioned,
latitude=latitude,
longitude=longitude,
speed=speed,
course=course,
gps_satellites=gps_satellites,
battery_level=battery_level,
gsm_signal=gsm_signal,
mcc=mcc,
mnc=mnc,
lac=lac,
cell_id=cell_id,
wifi_data=wifi_data_list,
lbs_data=lbs_data,
address=address,
recorded_at=recorded_at,
)
session.add(record)
logger.info(
"Attendance %s stored: IMEI=%s type=%s GPS=%s pos=(%s,%s) addr=%s",
"0xB1" if is_4g else "0xB0", imei, attendance_type,
gps_positioned, latitude, longitude, address,
)
# Broadcast attendance to WebSocket subscribers
ws_manager.broadcast_nonblocking("attendance", {
"imei": imei, "attendance_type": attendance_type,
"latitude": latitude, "longitude": longitude,
"address": address, "recorded_at": str(recorded_at),
})
except Exception:
logger.exception("DB error storing attendance for IMEI=%s", imei)
return attendance_type, reserved_bytes, datetime_bytes
# -- Bluetooth handlers ----------------------------------------------------
async def handle_bt_punch(
self,
pkt: dict,
reader: asyncio.StreamReader,
writer: asyncio.StreamWriter,
conn_info: ConnectionInfo,
) -> None:
"""Handle Bluetooth punch (0xB2).
Parse iBeacon data (MAC/UUID/Major/Minor/RSSI/battery),
determine clock_in/clock_out, store and respond.
"""
content = pkt["content"]
imei = conn_info.imei
now = datetime.now(timezone(timedelta(hours=8))).replace(tzinfo=None)
# -- Parse 0xB2 fields --
pos = 0
recorded_at = self._parse_datetime(content, pos) or now
datetime_bytes = content[pos:pos + 6] if len(content) >= 6 else b"\x00" * 6
pos += 6
# RSSI (1 byte, signed)
rssi: Optional[int] = None
if len(content) > pos:
rssi = struct.unpack_from("b", content, pos)[0]
pos += 1
# MAC address (6 bytes)
beacon_mac: Optional[str] = None
if len(content) >= pos + 6:
beacon_mac = ":".join(f"{b:02X}" for b in content[pos:pos + 6])
pos += 6
# UUID (16 bytes)
beacon_uuid: Optional[str] = None
if len(content) >= pos + 16:
uuid_bytes = content[pos:pos + 16]
beacon_uuid = (
f"{uuid_bytes[0:4].hex()}-{uuid_bytes[4:6].hex()}-"
f"{uuid_bytes[6:8].hex()}-{uuid_bytes[8:10].hex()}-"
f"{uuid_bytes[10:16].hex()}"
).upper()
pos += 16
# Major (2 bytes)
beacon_major: Optional[int] = None
if len(content) >= pos + 2:
beacon_major = struct.unpack("!H", content[pos:pos + 2])[0]
pos += 2
# Minor (2 bytes)
beacon_minor: Optional[int] = None
if len(content) >= pos + 2:
beacon_minor = struct.unpack("!H", content[pos:pos + 2])[0]
pos += 2
# Beacon battery (2 bytes, unsigned, unit 0.01V)
beacon_battery: Optional[float] = None
if len(content) >= pos + 2:
raw_batt = struct.unpack("!H", content[pos:pos + 2])[0]
beacon_battery = raw_batt * 0.01
pos += 2
# Terminal info (1 byte) - parse but always use clock_in (clock_out disabled)
attendance_type = "clock_in"
if len(content) > pos:
pos += 1
# Terminal reserved (2 bytes) - echo back
reserved_bytes = content[pos:pos + 2] if len(content) >= pos + 2 else b"\x00\x00"
# -- Store record --
if imei:
try:
async with async_session() as session:
async with session.begin():
device_id = await _get_device_id(session, imei, conn_info)
if device_id is not None:
# Look up beacon location from beacon_configs
beacon_lat = None
beacon_lon = None
beacon_addr = None
if beacon_mac:
bcfg = await session.execute(
select(BeaconConfig).where(
BeaconConfig.beacon_mac == beacon_mac,
BeaconConfig.status == "active",
)
)
beacon_cfg = bcfg.scalar_one_or_none()
if beacon_cfg:
beacon_lat = beacon_cfg.latitude
beacon_lon = beacon_cfg.longitude
beacon_addr = beacon_cfg.address
logger.info(
"BT punch: matched beacon '%s' at (%s, %s)",
beacon_cfg.name, beacon_lat, beacon_lon,
)
record = BluetoothRecord(
device_id=device_id,
imei=conn_info.imei,
record_type="punch",
protocol_number=pkt["protocol"],
beacon_mac=beacon_mac,
beacon_uuid=beacon_uuid,
beacon_major=beacon_major,
beacon_minor=beacon_minor,
rssi=rssi,
beacon_battery=beacon_battery,
beacon_battery_unit="V",
attendance_type=attendance_type,
latitude=beacon_lat,
longitude=beacon_lon,
bluetooth_data={
"mac": beacon_mac, "uuid": beacon_uuid,
"major": beacon_major, "minor": beacon_minor,
"rssi": rssi, "battery_v": beacon_battery,
"beacon_name": beacon_cfg.name if beacon_mac and beacon_cfg else None,
"beacon_address": beacon_addr,
},
recorded_at=recorded_at,
)
session.add(record)
logger.info(
"BT punch stored: IMEI=%s type=%s MAC=%s UUID=%s "
"Major=%s Minor=%s RSSI=%s Battery=%.2fV",
imei, attendance_type, beacon_mac, beacon_uuid,
beacon_major, beacon_minor, rssi,
beacon_battery or 0,
)
# Create AttendanceRecord for bluetooth punch
from app.services.fence_checker import _has_attendance_today
if not await _has_attendance_today(session, device_id, attendance_type):
device = await session.get(Device, device_id)
att_record = AttendanceRecord(
device_id=device_id,
imei=imei,
attendance_type=attendance_type,
attendance_source="bluetooth",
protocol_number=pkt["protocol"],
gps_positioned=False,
latitude=beacon_lat,
longitude=beacon_lon,
address=beacon_addr,
battery_level=device.battery_level if device else None,
gsm_signal=device.gsm_signal if device else None,
lbs_data={
"source": "bluetooth",
"beacon_mac": beacon_mac,
"beacon_name": beacon_cfg.name if beacon_cfg else None,
},
recorded_at=recorded_at,
)
session.add(att_record)
logger.info(
"BT attendance created: IMEI=%s type=%s beacon=%s",
imei, attendance_type, beacon_mac,
)
else:
logger.info(
"BT attendance dedup: IMEI=%s already has %s today",
imei, attendance_type,
)
# Broadcast bluetooth punch
ws_manager.broadcast_nonblocking("bluetooth", {
"imei": imei, "record_type": "punch",
"beacon_mac": beacon_mac, "attendance_type": attendance_type,
"recorded_at": str(recorded_at),
})
ws_manager.broadcast_nonblocking("attendance", {
"imei": imei, "attendance_type": attendance_type,
"latitude": beacon_lat, "longitude": beacon_lon,
"address": beacon_addr, "recorded_at": str(recorded_at),
"source": "bluetooth",
})
except Exception:
logger.exception("DB error storing BT punch for IMEI=%s", imei)
# -- Response: datetime(6) + status(1) + type(1) + reserved(2) --
punch_type = 1 if attendance_type == "clock_in" else 2
payload = datetime_bytes + struct.pack("BB", 1, punch_type) + reserved_bytes
response = PacketBuilder.build_response(pkt["protocol"], payload, pkt["serial"])
await self._send(writer, response)
async def handle_bt_location(
self,
pkt: dict,
reader: asyncio.StreamReader,
writer: asyncio.StreamWriter,
conn_info: ConnectionInfo,
) -> None:
"""Handle Bluetooth location (0xB3).
Parse multiple beacon data. Uses 0x7979 long packet format.
No response needed per protocol spec.
"""
content = pkt["content"]
imei = conn_info.imei
now = datetime.now(timezone(timedelta(hours=8))).replace(tzinfo=None)
pos = 0
recorded_at = self._parse_datetime(content, pos) or now
pos += 6
# Beacon count (1 byte)
beacon_count = 0
if len(content) > pos:
beacon_count = content[pos]
pos += 1
# Parse each beacon: RSSI(1) + MAC(6) + UUID(16) + Major(2) + Minor(2) + Battery(2) + BattUnit(1) = 30 bytes
beacons = []
for i in range(beacon_count):
if len(content) < pos + 30:
logger.warning(
"BT location: truncated beacon %d/%d at pos=%d (have %d bytes)",
i + 1, beacon_count, pos, len(content),
)
break
rssi = struct.unpack_from("b", content, pos)[0]
pos += 1
mac = ":".join(f"{b:02X}" for b in content[pos:pos + 6])
pos += 6
uuid_bytes = content[pos:pos + 16]
uuid_str = (
f"{uuid_bytes[0:4].hex()}-{uuid_bytes[4:6].hex()}-"
f"{uuid_bytes[6:8].hex()}-{uuid_bytes[8:10].hex()}-"
f"{uuid_bytes[10:16].hex()}"
).upper()
pos += 16
major = struct.unpack("!H", content[pos:pos + 2])[0]
pos += 2
minor = struct.unpack("!H", content[pos:pos + 2])[0]
pos += 2
raw_batt = struct.unpack("!H", content[pos:pos + 2])[0]
pos += 2
batt_unit_byte = content[pos]
pos += 1
if batt_unit_byte == 0:
battery_val = raw_batt * 0.01
battery_unit = "V"
else:
battery_val = float(raw_batt)
battery_unit = "%"
beacons.append({
"rssi": rssi,
"mac": mac,
"uuid": uuid_str,
"major": major,
"minor": minor,
"battery": battery_val,
"battery_unit": battery_unit,
})
if not imei:
logger.warning("BT location received before login")
return
logger.info(
"BT location: IMEI=%s count=%d beacons=%s",
imei, beacon_count,
[(b["mac"], b["rssi"]) for b in beacons],
)
# Store one record per beacon for easier querying,
# plus keep full list in bluetooth_data for the first record.
# Look up beacon locations from beacon_configs table.
try:
async with async_session() as session:
async with session.begin():
device_id = await _get_device_id(session, imei, conn_info)
if device_id is None:
logger.warning("BT location for unknown IMEI=%s", imei)
return
# Look up all beacon locations by MAC
beacon_locations: dict[str, BeaconConfig] = {}
if beacons:
macs = [b["mac"] for b in beacons]
bcfg_result = await session.execute(
select(BeaconConfig).where(
BeaconConfig.beacon_mac.in_(macs),
BeaconConfig.status == "active",
)
)
for cfg in bcfg_result.scalars().all():
beacon_locations[cfg.beacon_mac] = cfg
# Find the strongest RSSI beacon with known location
best_beacon = None
best_rssi = -999
for b in beacons:
cfg = beacon_locations.get(b["mac"])
if cfg and cfg.latitude and cfg.longitude and b["rssi"] > best_rssi:
best_rssi = b["rssi"]
best_beacon = cfg
if best_beacon:
logger.info(
"BT location: best beacon '%s' MAC=%s RSSI=%d at (%s, %s)",
best_beacon.name, best_beacon.beacon_mac,
best_rssi, best_beacon.latitude, best_beacon.longitude,
)
for idx, b in enumerate(beacons):
cfg = beacon_locations.get(b["mac"])
record = BluetoothRecord(
device_id=device_id,
imei=conn_info.imei,
record_type="location",
protocol_number=pkt["protocol"],
beacon_mac=b["mac"],
beacon_uuid=b["uuid"],
beacon_major=b["major"],
beacon_minor=b["minor"],
rssi=b["rssi"],
beacon_battery=b["battery"],
beacon_battery_unit=b["battery_unit"],
latitude=cfg.latitude if cfg else None,
longitude=cfg.longitude if cfg else None,
bluetooth_data={
"beacons": beacons,
"best_beacon_mac": best_beacon.beacon_mac if best_beacon else None,
"best_beacon_name": best_beacon.name if best_beacon else None,
} if idx == 0 else (
{"beacon_name": cfg.name} if cfg else None
),
recorded_at=recorded_at,
)
session.add(record)
else:
# No beacons parsed, store raw
record = BluetoothRecord(
device_id=device_id,
imei=conn_info.imei,
record_type="location",
protocol_number=pkt["protocol"],
bluetooth_data={"raw": content.hex(), "beacon_count": beacon_count},
recorded_at=recorded_at,
)
session.add(record)
# Broadcast bluetooth location
ws_manager.broadcast_nonblocking("bluetooth", {
"imei": imei, "record_type": "location",
"beacon_count": beacon_count, "recorded_at": str(recorded_at),
})
except Exception:
logger.exception("DB error storing BT location for IMEI=%s", imei)
# 0xB3 does NOT require a response per protocol spec
# -- General info handler --------------------------------------------------
async def handle_general_info(
self,
pkt: dict,
reader: asyncio.StreamReader,
writer: asyncio.StreamWriter,
conn_info: ConnectionInfo,
) -> None:
"""Handle general information (0x94).
The content begins with a sub-protocol byte. Common sub-protocols
include ICCID retrieval. We store the raw data and attempt to
extract known sub-protocol fields.
"""
content = pkt["content"]
imei = conn_info.imei
if not imei:
logger.warning("General info received before login")
return
sub_protocol: Optional[int] = None
iccid: Optional[str] = None
if len(content) >= 1:
sub_protocol = content[0]
imsi: Optional[str] = None
# Sub-protocol 0x0A: IMEI(8) + IMSI(8) + ICCID(10) = 26 bytes after sub-proto
if sub_protocol == 0x0A and len(content) >= 27:
# Skip IMEI (8 bytes, already known), extract IMSI and ICCID
imsi = content[9:17].hex().lstrip("0") or "0"
iccid = content[17:27].hex()
logger.info("IMEI=%s IMSI=%s ICCID=%s", imei, imsi, iccid)
# Sub-protocol 0x00: legacy ICCID query -- ICCID is 10 bytes BCD
elif sub_protocol == 0x00 and len(content) >= 11:
iccid = content[1:11].hex()
logger.info("IMEI=%s ICCID=%s (legacy)", imei, iccid)
# Sub-protocol 0x09: GPS satellite info
elif sub_protocol == 0x09 and len(content) >= 2:
gps_status = content[1]
status_names = {0: "OFF", 1: "Searching", 2: "2D", 3: "3D", 4: "Sleep"}
logger.info("IMEI=%s GPS status=%s (%d)", imei, status_names.get(gps_status, "Unknown"), gps_status)
# Store ICCID/IMSI on the device record if available
if iccid or imsi:
update_vals: dict = {}
if iccid:
update_vals["iccid"] = iccid
if imsi:
update_vals["imsi"] = imsi
try:
async with async_session() as session:
async with session.begin():
await session.execute(
update(Device)
.where(Device.imei == imei)
.values(**update_vals)
)
except Exception:
logger.exception("DB error storing device info for IMEI=%s", imei)
# Protocol doc states 0x94 does NOT require a server response
# -- Online command reply handler ------------------------------------------
async def handle_online_cmd_reply(
self,
pkt: dict,
reader: asyncio.StreamReader,
writer: asyncio.StreamWriter,
conn_info: ConnectionInfo,
) -> None:
"""Handle device reply to a server-issued command (0x81).
Update the corresponding CommandLog entry with the response.
"""
content = pkt["content"]
imei = conn_info.imei
if not imei:
logger.warning("Command reply received before login")
return
# Content: length(1) + server_flag(4) + command_content(variable) + language(2)
response_text = ""
if len(content) > 7:
try:
# Strip last 2 bytes (language/reserved)
response_text = content[5:-2].decode("utf-8", errors="replace")
except Exception:
response_text = content[5:-2].hex()
elif len(content) > 5:
try:
response_text = content[5:].decode("utf-8", errors="replace")
except Exception:
response_text = content[5:].hex()
now = datetime.now(timezone(timedelta(hours=8))).replace(tzinfo=None)
try:
async with async_session() as session:
async with session.begin():
device_id = await _get_device_id(session, imei, conn_info)
if device_id is None:
logger.warning("Command reply for unknown IMEI=%s", imei)
return
# Find the most recent sent command for this device
result = await session.execute(
select(CommandLog)
.where(CommandLog.device_id == device_id, CommandLog.status == "sent")
.order_by(CommandLog.created_at.desc())
.limit(1)
)
cmd_log = result.scalar_one_or_none()
if cmd_log is not None:
cmd_log.response_content = response_text
cmd_log.status = "success"
cmd_log.response_at = now
else:
logger.warning(
"No pending CommandLog for reply from IMEI=%s", imei
)
except Exception:
logger.exception(
"DB error updating command log for IMEI=%s", imei
)
# ------------------------------------------------------------------
# Command sending (called from API layer)
# ------------------------------------------------------------------
async def send_command(
self, imei: str, command_type: str, command_content: str
) -> bool:
"""Send a command to a connected device.
Parameters
----------
imei : str
The IMEI of the target device.
command_type : str
An application-level label for the command (e.g. "restart", "config").
command_content : str
The ASCII command string to send to the device.
Returns
-------
bool
``True`` if the command was successfully written to the socket.
"""
async with self._conn_lock:
conn = self.connections.get(imei)
if conn is None:
logger.warning("Cannot send command to IMEI=%s: not connected", imei)
return False
_reader, writer, conn_info = conn
if writer.is_closing():
logger.warning("IMEI=%s writer is closing, removing stale connection", imei)
del self.connections[imei]
return False
serial = conn_info.next_serial()
# Build 0x80 online-command packet
# Payload: length(1) + server_flag(4) + content_bytes + language(2)
content_bytes = command_content.encode("utf-8")
server_flag_bytes = b"\x00\x00\x00\x00"
server_flag_str = server_flag_bytes.hex()
language_bytes = b"\x00\x01" # 0x01=Chinese, 0x02=English
inner_len = len(server_flag_bytes) + len(content_bytes) + len(language_bytes)
payload = bytes([inner_len]) + server_flag_bytes + content_bytes + language_bytes
packet = PacketBuilder.build_response(
PROTO_ONLINE_CMD, payload, serial, long=len(payload) > 200
)
try:
await self._send(writer, packet)
logger.info(
"Command sent to IMEI=%s type=%s content=%r",
imei,
command_type,
command_content,
)
except Exception:
logger.exception("Failed to send command to IMEI=%s, removing stale connection", imei)
self.connections.pop(imei, None)
return False
return True
async def send_message(self, imei: str, message: str) -> bool:
"""Send a text message (0x82) to a connected device.
Parameters
----------
imei : str
Target device IMEI.
message : str
The message text to send.
Returns
-------
bool
``True`` if the message was successfully written to the socket.
"""
async with self._conn_lock:
conn = self.connections.get(imei)
if conn is None:
logger.warning("Cannot send message to IMEI=%s: not connected", imei)
return False
_reader, writer, conn_info = conn
if writer.is_closing():
logger.warning("IMEI=%s writer is closing, removing stale connection", imei)
del self.connections[imei]
return False
serial = conn_info.next_serial()
msg_bytes = message.encode("utf-16-be")
server_flag = b"\x00\x00\x00\x00"
language = b"\x00\x01" # 0x0001 = Chinese (UTF16BE)
# Message payload: cmd_length(1) + server_flag(4) + content(UTF16BE) + language(2)
inner = server_flag + msg_bytes + language
cmd_len = len(inner)
if cmd_len > 0xFF:
payload = struct.pack("!H", cmd_len) + inner
else:
payload = bytes([cmd_len]) + inner
packet = PacketBuilder.build_response(
PROTO_MESSAGE, payload, serial, long=len(payload) > 200
)
try:
await self._send(writer, packet)
logger.info("Message sent to IMEI=%s (%d bytes)", imei, len(msg_bytes))
return True
except Exception:
logger.exception("Failed to send message to IMEI=%s, removing stale connection", imei)
self.connections.pop(imei, None)
return False
# ------------------------------------------------------------------
# Server lifecycle
# ------------------------------------------------------------------
async def start(self, host: str, port: int) -> None:
"""Start the TCP server.
Parameters
----------
host : str
Bind address (e.g. ``"0.0.0.0"``).
port : int
Bind port.
"""
self._server = await asyncio.start_server(
self._handle_connection, host, port
)
addrs = ", ".join(str(s.getsockname()) for s in self._server.sockets)
logger.info("KKS TCP server listening on %s", addrs)
await self._server.serve_forever()
async def stop(self) -> None:
"""Gracefully shut down the TCP server and close all connections."""
if self._server is not None:
self._server.close()
await self._server.wait_closed()
logger.info("KKS TCP server stopped")
# Close all active device connections
for imei, (_reader, writer, _info) in list(self.connections.items()):
try:
writer.close()
await writer.wait_closed()
except Exception:
pass
logger.info("Closed connection for IMEI=%s", imei)
self.connections.clear()
@property
def is_running(self) -> bool:
return self._server is not None and self._server.is_serving()
# ---------------------------------------------------------------------------
# Module-level singleton
# ---------------------------------------------------------------------------
tcp_manager = TCPManager()
async def start_tcp_server(
host: Optional[str] = None, port: Optional[int] = None
) -> None:
"""Convenience function to start the global TCP server.
Parameters
----------
host : str, optional
Bind address. Defaults to ``settings.TCP_HOST``.
port : int, optional
Bind port. Defaults to ``settings.TCP_PORT``.
"""
await tcp_manager.start(
host=host or settings.TCP_HOST,
port=port or settings.TCP_PORT,
)