""" KKS Bluetooth Badge Protocol Packet Parser Parses raw TCP byte streams into structured Python dictionaries. Handles both short (0x7878, 1-byte length) and long (0x7979, 2-byte length) start markers. """ from __future__ import annotations import struct from datetime import datetime, timezone from typing import Any, Dict, List, Tuple from .constants import ( ALARM_TYPES, ATTENDANCE_STATUS_MASK, ATTENDANCE_STATUS_SHIFT, ATTENDANCE_TYPES, DATA_REPORT_MODES, GSM_SIGNAL_LEVELS, PROTOCOL_NAMES, VOLTAGE_LEVELS, PROTO_ADDRESS_QUERY, PROTO_ALARM_LBS_4G, PROTO_ALARM_MULTI_FENCE, PROTO_ALARM_SINGLE_FENCE, PROTO_ALARM_WIFI, PROTO_ATTENDANCE, PROTO_ATTENDANCE_4G, PROTO_BT_LOCATION, PROTO_BT_PUNCH, PROTO_GENERAL_INFO, PROTO_GPS, PROTO_GPS_4G, PROTO_HEARTBEAT, PROTO_HEARTBEAT_EXT, PROTO_LBS_4G, PROTO_LBS_ADDRESS_REQ, PROTO_LBS_MULTI, PROTO_LOGIN, PROTO_MESSAGE, PROTO_ONLINE_CMD, PROTO_ONLINE_CMD_REPLY, PROTO_TIME_SYNC, PROTO_TIME_SYNC_2, PROTO_WIFI, PROTO_WIFI_4G, START_MARKER_LONG, START_MARKER_SHORT, STOP_MARKER, ) from .crc import crc_itu class PacketParser: """Parses KKS protocol packets from raw TCP byte data.""" # ------------------------------------------------------------------ # Public API # ------------------------------------------------------------------ def parse(self, data: bytes) -> Dict[str, Any]: """ Parse a single raw TCP packet into a structured dictionary. Parameters ---------- data : bytes A complete packet including start marker and stop marker. Returns ------- dict Parsed packet fields. Always contains at least: ``start_marker``, ``packet_length``, ``protocol_number``, ``protocol_name``, ``serial_number``, ``crc``, ``crc_valid``, and ``info_content`` (raw bytes of the information field). Raises ------ ValueError If *data* is too short or has invalid markers. """ if len(data) < 10: raise ValueError(f"Packet too short: {len(data)} bytes") start = data[:2] if start == START_MARKER_SHORT: is_long = False elif start == START_MARKER_LONG: is_long = True else: raise ValueError(f"Invalid start marker: {start.hex()}") # Parse length if is_long: packet_length = struct.unpack("!H", data[2:4])[0] header_size = 4 # 2 marker + 2 length else: packet_length = data[2] header_size = 3 # 2 marker + 1 length # Validate stop marker total_len = header_size + packet_length + 2 # +2 for stop marker if len(data) < total_len: raise ValueError( f"Packet data too short: expected {total_len}, got {len(data)}" ) stop = data[total_len - 2 : total_len] if stop != STOP_MARKER: raise ValueError(f"Invalid stop marker: {stop.hex()}") # Extract payload (everything between length field and stop marker) payload = data[header_size : total_len - 2] # payload = protocol(1) + info_content(N) + serial(2) + crc(2) protocol_number = payload[0] serial_number = struct.unpack("!H", payload[-4:-2])[0] crc_received = struct.unpack("!H", payload[-2:])[0] # CRC is computed over the length field(s) + payload minus the CRC itself if is_long: crc_data = data[2:4] + payload[:-2] else: crc_data = data[2:3] + payload[:-2] crc_computed = crc_itu(crc_data) crc_valid = crc_computed == crc_received info_content = payload[1:-4] result: Dict[str, Any] = { "start_marker": start, "is_long_packet": is_long, "packet_length": packet_length, "protocol_number": protocol_number, "protocol_name": PROTOCOL_NAMES.get(protocol_number, f"Unknown(0x{protocol_number:02X})"), "serial_number": serial_number, "crc": crc_received, "crc_valid": crc_valid, "info_content": info_content, "raw": data[:total_len], } # Dispatch to protocol-specific parser handler = self._PROTOCOL_HANDLERS.get(protocol_number) if handler is not None: try: parsed = handler(self, info_content) result.update(parsed) except Exception as exc: result["parse_error"] = str(exc) return result def find_packets(self, buffer: bytes) -> List[Tuple[Dict[str, Any], int]]: """ Scan *buffer* for all valid packets. Returns a list of ``(parsed_dict, consumed_bytes)`` tuples. Skips over invalid data to find the next valid start marker. """ results: List[Tuple[Dict[str, Any], int]] = [] offset = 0 while offset < len(buffer) - 9: # Scan for a start marker idx_short = buffer.find(START_MARKER_SHORT, offset) idx_long = buffer.find(START_MARKER_LONG, offset) # Pick the earliest marker found candidates = [i for i in (idx_short, idx_long) if i >= 0] if not candidates: break idx = min(candidates) is_long = buffer[idx : idx + 2] == START_MARKER_LONG header_size = 4 if is_long else 3 if idx + header_size > len(buffer): break # Read length if is_long: if idx + 4 > len(buffer): break pkt_len = struct.unpack("!H", buffer[idx + 2 : idx + 4])[0] else: pkt_len = buffer[idx + 2] total_len = header_size + pkt_len + 2 # +2 stop marker if idx + total_len > len(buffer): # Not enough data yet offset = idx + 1 continue # Verify stop marker if buffer[idx + total_len - 2 : idx + total_len] != STOP_MARKER: offset = idx + 1 continue pkt_data = buffer[idx : idx + total_len] try: parsed = self.parse(pkt_data) results.append((parsed, idx + total_len)) offset = idx + total_len except ValueError: offset = idx + 1 return results # ------------------------------------------------------------------ # Common field helpers # ------------------------------------------------------------------ @staticmethod def parse_datetime(data: bytes, offset: int = 0) -> Dict[str, Any]: """ Parse a 6-byte datetime field (YY MM DD HH MM SS) starting at *offset*. Returns a dict with ``year``, ``month``, ``day``, ``hour``, ``minute``, ``second``, and ``datetime`` (a :class:`datetime.datetime` object). """ if len(data) < offset + 6: raise ValueError("Not enough data for datetime") yy, mo, dd, hh, mi, ss = struct.unpack_from("BBBBBB", data, offset) year = 2000 + yy try: dt = datetime(year, mo, dd, hh, mi, ss, tzinfo=timezone.utc) except ValueError: dt = None return { "year": year, "month": mo, "day": dd, "hour": hh, "minute": mi, "second": ss, "datetime": dt, } @staticmethod def parse_gps(data: bytes, offset: int = 0) -> Dict[str, Any]: """ Parse GPS information content. Layout (12 bytes): - 1 byte: upper 4 bits = GPS info length, lower 4 bits = satellite count - 4 bytes: latitude (uint32) -> divide by 1_800_000.0 - 4 bytes: longitude (uint32) -> divide by 1_800_000.0 - 1 byte: speed (km/h) - 2 bytes: course/status flags Course/Status 16-bit field: - bit 15..13: reserved - bit 12: GPS is real-time (1) or differential (0) - bit 11: GPS positioned (1) or not (0) - bit 10: longitude hemisphere 1=East, 0=West - bit 9: latitude hemisphere 1=North (south=0) - bit 8..0: course (0-360 degrees, lower 10 bits across bits 9..0 -- actually bits 9-0 = 10 bits for course) Note: the hemisphere bits need careful handling. The 16-bit value: bit15 bit14 bit13 bit12 bit11 bit10 bit9 bit8 bit7..0 rsv rsv rsv real fix E/W N/S course[9:8] course[7:0] """ if len(data) < offset + 12: raise ValueError("Not enough data for GPS info") gps_byte = data[offset] gps_info_length = (gps_byte >> 4) & 0x0F satellite_count = gps_byte & 0x0F lat_raw = struct.unpack_from("!I", data, offset + 1)[0] lon_raw = struct.unpack_from("!I", data, offset + 5)[0] speed = data[offset + 9] course_status = struct.unpack_from("!H", data, offset + 10)[0] # Decode course/status (per protocol doc): # bit 13 (0x2000): GPS real-time differential positioning # bit 12 (0x1000): GPS positioned # bit 11 (0x0800): 0=East, 1=West (东经/西经) # bit 10 (0x0400): 0=South, 1=North (南纬/北纬) # bits 9-0: course (0-360) is_realtime = bool(course_status & 0x2000) is_gps_positioned = bool(course_status & 0x1000) is_west = bool(course_status & 0x0800) is_north = bool(course_status & 0x0400) course = course_status & 0x03FF latitude = lat_raw / 1_800_000.0 longitude = lon_raw / 1_800_000.0 if not is_north: latitude = -latitude if is_west: longitude = -longitude return { "gps_info_length": gps_info_length, "satellite_count": satellite_count, "latitude_raw": lat_raw, "longitude_raw": lon_raw, "latitude": latitude, "longitude": longitude, "speed": speed, "course": course, "is_realtime": is_realtime, "is_gps_positioned": is_gps_positioned, "is_west": is_west, "is_north": is_north, "course_status_raw": course_status, } @staticmethod def parse_mcc_mnc(data: bytes, offset: int = 0) -> Tuple[Dict[str, Any], int]: """ Parse MCC and MNC from data. MCC is 2 bytes. The high bit of MCC indicates whether MNC is 1 byte (high bit clear) or 2 bytes (high bit set). Returns ``(result_dict, bytes_consumed)``. """ if len(data) < offset + 3: raise ValueError("Not enough data for MCC/MNC") mcc_raw = struct.unpack_from("!H", data, offset)[0] mnc_2byte = bool(mcc_raw & 0x8000) mcc = mcc_raw & 0x7FFF if mnc_2byte: if len(data) < offset + 4: raise ValueError("Not enough data for 2-byte MNC") mnc = struct.unpack_from("!H", data, offset + 2)[0] consumed = 4 else: mnc = data[offset + 2] consumed = 3 return {"mcc": mcc, "mnc": mnc, "mnc_2byte": mnc_2byte}, consumed @staticmethod def parse_lbs_station( data: bytes, offset: int = 0, *, lac_size: int = 2, cell_id_size: int = 3, ) -> Tuple[Dict[str, Any], int]: """ Parse a single LBS station (LAC + Cell ID + RSSI). Parameters ---------- lac_size : int 2 for standard, 4 for 4G protocols. cell_id_size : int 2 for standard, 8 for 4G protocols. Returns ``(station_dict, bytes_consumed)``. """ consumed = 0 if lac_size == 2: lac = struct.unpack_from("!H", data, offset)[0] else: lac = struct.unpack_from("!I", data, offset)[0] consumed += lac_size if cell_id_size == 2: cell_id = struct.unpack_from("!H", data, offset + consumed)[0] elif cell_id_size == 3: cell_id = int.from_bytes(data[offset + consumed : offset + consumed + 3], "big") elif cell_id_size == 4: cell_id = struct.unpack_from("!I", data, offset + consumed)[0] else: # 8 cell_id = struct.unpack_from("!Q", data, offset + consumed)[0] consumed += cell_id_size rssi = data[offset + consumed] consumed += 1 return {"lac": lac, "cell_id": cell_id, "rssi": rssi}, consumed @staticmethod def parse_wifi_list(data: bytes, offset: int = 0, count: int = 0) -> Tuple[List[Dict[str, Any]], int]: """ Parse a list of WIFI access points. Each entry: 6-byte MAC address + 1-byte signal strength. Returns ``(wifi_list, bytes_consumed)``. """ wifi_entries: List[Dict[str, Any]] = [] consumed = 0 for _ in range(count): if offset + consumed + 7 > len(data): break mac_bytes = data[offset + consumed : offset + consumed + 6] mac = ":".join(f"{b:02X}" for b in mac_bytes) strength = data[offset + consumed + 6] wifi_entries.append({"mac": mac, "strength": strength}) consumed += 7 return wifi_entries, consumed # ------------------------------------------------------------------ # Protocol-specific parsers # ------------------------------------------------------------------ def _parse_login(self, info: bytes) -> Dict[str, Any]: """0x01 Login: 8-byte IMEI (BCD) + type_code(2) + timezone_language(2).""" result: Dict[str, Any] = {} if len(info) < 8: raise ValueError("Login packet info too short") # IMEI: 8 bytes BCD-encoded -> 16 hex digits, first digit is padding 0 imei_hex = info[:8].hex() # e.g. "0123456789123456" # IMEI is 15 digits; BCD encoding pads with a leading 0 to fill 8 bytes if len(imei_hex) == 16 and imei_hex[0] == "0": terminal_id = imei_hex[1:] # strip the single padding nibble else: terminal_id = imei_hex[-15:] # fallback: take last 15 digits result["terminal_id"] = terminal_id pos = 8 if len(info) >= pos + 2: result["type_code"] = struct.unpack_from("!H", info, pos)[0] pos += 2 if len(info) >= pos + 2: tz_lang = struct.unpack_from("!H", info, pos)[0] result["timezone_language"] = tz_lang # Decode timezone: bits 15-4 = timezone * 100, bit 3 = sign (1=west) tz_value = (tz_lang >> 4) & 0x0FFF tz_sign = -1 if (tz_lang & 0x08) else 1 result["timezone_offset"] = tz_sign * tz_value / 100.0 result["language"] = tz_lang & 0x07 pos += 2 return result def _parse_heartbeat(self, info: bytes) -> Dict[str, Any]: """0x13 Heartbeat: terminal_info(1) + battery(1) + gsm_signal(1) + reserved(2).""" result: Dict[str, Any] = {} if len(info) < 1: raise ValueError("Heartbeat packet info too short") ti = info[0] result["terminal_info"] = ti result["terminal_info_bits"] = { "oil_electricity_connected": bool(ti & 0x80), "gps_tracking_on": bool(ti & 0x40), "alarm": ALARM_TYPES.get((ti >> 3) & 0x07, "Unknown"), "charging": bool(ti & 0x04), "acc_on": bool(ti & 0x02), "armed": bool(ti & 0x01), } if len(info) >= 2: result["battery_level"] = info[1] if len(info) >= 3: result["gsm_signal"] = info[2] result["gsm_signal_name"] = GSM_SIGNAL_LEVELS.get(info[2], "Unknown") if len(info) >= 5: result["reserved"] = struct.unpack_from("!H", info, 3)[0] return result def _parse_lbs_address_req(self, info: bytes) -> Dict[str, Any]: """0x17 LBS Address Request.""" result: Dict[str, Any] = {} pos = 0 if len(info) < pos + 9: raise ValueError("LBS Address Request info too short") mcc_mnc, consumed = self.parse_mcc_mnc(info, pos) result.update(mcc_mnc) pos += consumed result["lac"] = struct.unpack_from("!H", info, pos)[0] pos += 2 result["cell_id"] = int.from_bytes(info[pos : pos + 3], "big") pos += 3 # Remaining bytes: phone number (BCD) + alarm_language if pos < len(info): # Phone number length is variable; alarm_language is last 2 bytes if len(info) - pos >= 2: phone_end = len(info) - 2 if phone_end > pos: result["phone_number"] = info[pos:phone_end].hex().rstrip("f") pos = phone_end result["alarm_language"] = struct.unpack_from("!H", info, pos)[0] return result def _parse_address_query(self, info: bytes) -> Dict[str, Any]: """0x1A Address Query: datetime(6) + gps(12) + phone_number.""" result: Dict[str, Any] = {} pos = 0 dt = self.parse_datetime(info, pos) result["datetime"] = dt pos += 6 if len(info) >= pos + 12: gps = self.parse_gps(info, pos) result["gps_info"] = gps pos += 12 # Remaining is phone number in BCD if pos < len(info): result["phone_number"] = info[pos:].hex().rstrip("f") return result def _parse_time_sync(self, info: bytes) -> Dict[str, Any]: """0x1F Time Sync: datetime(6) + language(2).""" result: Dict[str, Any] = {} if len(info) >= 6: result["datetime"] = self.parse_datetime(info, 0) if len(info) >= 8: result["language"] = struct.unpack_from("!H", info, 6)[0] return result def _parse_gps_packet(self, info: bytes) -> Dict[str, Any]: """0x22 GPS: datetime(6) + gps(12) + mcc(2) + mnc(1-2) + lac(2) + cell_id(3) + acc(1) + report_mode(1) + realtime_upload(1) + mileage(4).""" result: Dict[str, Any] = {} pos = 0 result["datetime"] = self.parse_datetime(info, pos) pos += 6 result["gps_info"] = self.parse_gps(info, pos) pos += 12 mcc_mnc, consumed = self.parse_mcc_mnc(info, pos) result.update(mcc_mnc) pos += consumed if len(info) >= pos + 2: result["lac"] = struct.unpack_from("!H", info, pos)[0] pos += 2 # 2G Cell ID is 3 bytes (not 2) if len(info) >= pos + 3: result["cell_id"] = int.from_bytes(info[pos:pos + 3], "big") pos += 3 if len(info) >= pos + 1: result["acc"] = info[pos] pos += 1 if len(info) >= pos + 1: result["report_mode"] = info[pos] result["report_mode_name"] = DATA_REPORT_MODES.get(info[pos], "Unknown") pos += 1 if len(info) >= pos + 1: result["realtime_upload"] = info[pos] pos += 1 if len(info) >= pos + 4: result["mileage"] = struct.unpack_from("!I", info, pos)[0] pos += 4 return result def _parse_lbs_multi(self, info: bytes) -> Dict[str, Any]: """0x28 LBS Multi: datetime(6) + mcc(2) + mnc(1-2) + main_station + 6 neighbors + timing_advance(1) + language(2).""" result: Dict[str, Any] = {} pos = 0 result["datetime"] = self.parse_datetime(info, pos) pos += 6 mcc_mnc, consumed = self.parse_mcc_mnc(info, pos) result.update(mcc_mnc) pos += consumed stations: List[Dict[str, Any]] = [] for i in range(7): # main + 6 neighbors if len(info) < pos + 6: # LAC(2) + CellID(3) + RSSI(1) = 6 break station, consumed = self.parse_lbs_station(info, pos) station["is_main"] = (i == 0) stations.append(station) pos += consumed result["stations"] = stations if stations: result["main_station"] = stations[0] if len(info) >= pos + 1: result["timing_advance"] = info[pos] pos += 1 if len(info) >= pos + 2: result["language"] = struct.unpack_from("!H", info, pos)[0] pos += 2 return result def _parse_wifi(self, info: bytes) -> Dict[str, Any]: """0x2C WIFI: datetime(6) + mcc(2) + mnc(1-2) + main+6 stations + timing_advance(1) + wifi_count(1) + wifi_list.""" result: Dict[str, Any] = {} pos = 0 result["datetime"] = self.parse_datetime(info, pos) pos += 6 mcc_mnc, consumed = self.parse_mcc_mnc(info, pos) result.update(mcc_mnc) pos += consumed stations: List[Dict[str, Any]] = [] for i in range(7): if len(info) < pos + 6: # LAC(2) + CellID(3) + RSSI(1) = 6 break station, consumed = self.parse_lbs_station(info, pos) station["is_main"] = (i == 0) stations.append(station) pos += consumed result["stations"] = stations if len(info) >= pos + 1: result["timing_advance"] = info[pos] pos += 1 if len(info) >= pos + 1: wifi_count = info[pos] result["wifi_count"] = wifi_count pos += 1 wifi_list, consumed = self.parse_wifi_list(info, pos, wifi_count) result["wifi_list"] = wifi_list pos += consumed return result def _parse_heartbeat_ext(self, info: bytes) -> Dict[str, Any]: """0x36 Heartbeat Extended: terminal_info(1) + battery(1) + gsm_signal(1) + language(2) + extension(N).""" result: Dict[str, Any] = {} pos = 0 if len(info) < 1: raise ValueError("Heartbeat Ext info too short") ti = info[pos] result["terminal_info"] = ti result["terminal_info_bits"] = { "oil_electricity_connected": bool(ti & 0x80), "gps_tracking_on": bool(ti & 0x40), "alarm": ALARM_TYPES.get((ti >> 3) & 0x07, "Unknown"), "charging": bool(ti & 0x04), "acc_on": bool(ti & 0x02), "armed": bool(ti & 0x01), } pos += 1 if len(info) >= pos + 1: result["battery_level"] = info[pos] pos += 1 if len(info) >= pos + 1: result["gsm_signal"] = info[pos] result["gsm_signal_name"] = GSM_SIGNAL_LEVELS.get(info[pos], "Unknown") pos += 1 if len(info) >= pos + 2: result["language"] = struct.unpack_from("!H", info, pos)[0] pos += 2 if pos < len(info): result["extension_modules"] = info[pos:] return result def _parse_time_sync_2(self, info: bytes) -> Dict[str, Any]: """0x8A Time Sync 2: empty info body (serial + CRC only).""" return {} def _parse_general_info(self, info: bytes) -> Dict[str, Any]: """0x94 General Info: sub_protocol(1) + data_content(N).""" result: Dict[str, Any] = {} if len(info) >= 1: result["sub_protocol"] = info[0] result["data_content"] = info[1:] if len(info) > 1 else b"" return result def _parse_gps_4g(self, info: bytes) -> Dict[str, Any]: """0xA0 GPS 4G: like 0x22 but LAC=4 bytes, CellID=8 bytes.""" result: Dict[str, Any] = {} pos = 0 result["datetime"] = self.parse_datetime(info, pos) pos += 6 result["gps_info"] = self.parse_gps(info, pos) pos += 12 mcc_mnc, consumed = self.parse_mcc_mnc(info, pos) result.update(mcc_mnc) pos += consumed if len(info) >= pos + 4: result["lac"] = struct.unpack_from("!I", info, pos)[0] pos += 4 if len(info) >= pos + 8: result["cell_id"] = struct.unpack_from("!Q", info, pos)[0] pos += 8 if len(info) >= pos + 1: result["acc"] = info[pos] pos += 1 if len(info) >= pos + 1: result["report_mode"] = info[pos] result["report_mode_name"] = DATA_REPORT_MODES.get(info[pos], "Unknown") pos += 1 if len(info) >= pos + 1: result["realtime_upload"] = info[pos] pos += 1 if len(info) >= pos + 4: result["mileage"] = struct.unpack_from("!I", info, pos)[0] pos += 4 return result def _parse_lbs_4g(self, info: bytes) -> Dict[str, Any]: """0xA1 LBS 4G: like 0x28 but LAC=4 bytes, CellID=8 bytes per station.""" result: Dict[str, Any] = {} pos = 0 result["datetime"] = self.parse_datetime(info, pos) pos += 6 mcc_mnc, consumed = self.parse_mcc_mnc(info, pos) result.update(mcc_mnc) pos += consumed stations: List[Dict[str, Any]] = [] for i in range(7): if len(info) < pos + 13: # 4 + 8 + 1 break station, consumed = self.parse_lbs_station( info, pos, lac_size=4, cell_id_size=8 ) station["is_main"] = (i == 0) stations.append(station) pos += consumed result["stations"] = stations if stations: result["main_station"] = stations[0] if len(info) >= pos + 1: result["timing_advance"] = info[pos] pos += 1 if len(info) >= pos + 2: result["language"] = struct.unpack_from("!H", info, pos)[0] pos += 2 return result def _parse_wifi_4g(self, info: bytes) -> Dict[str, Any]: """0xA2 WIFI 4G: like 0x2C but LAC=4 bytes, CellID=8 bytes.""" result: Dict[str, Any] = {} pos = 0 result["datetime"] = self.parse_datetime(info, pos) pos += 6 mcc_mnc, consumed = self.parse_mcc_mnc(info, pos) result.update(mcc_mnc) pos += consumed stations: List[Dict[str, Any]] = [] for i in range(7): if len(info) < pos + 13: break station, consumed = self.parse_lbs_station( info, pos, lac_size=4, cell_id_size=8 ) station["is_main"] = (i == 0) stations.append(station) pos += consumed result["stations"] = stations if len(info) >= pos + 1: result["timing_advance"] = info[pos] pos += 1 if len(info) >= pos + 1: wifi_count = info[pos] result["wifi_count"] = wifi_count pos += 1 wifi_list, consumed = self.parse_wifi_list(info, pos, wifi_count) result["wifi_list"] = wifi_list pos += consumed return result @staticmethod def _parse_alarm_tail(info: bytes, pos: int) -> Tuple[Dict[str, Any], int]: """Parse common alarm tail: terminal_info(1) + voltage_level(1) + gsm_signal(1) + alarm_code(1) + language(1).""" result: Dict[str, Any] = {} if len(info) >= pos + 1: ti = info[pos] result["terminal_info"] = ti result["terminal_info_bits"] = { "oil_electricity_connected": bool(ti & 0x80), "gps_tracking_on": bool(ti & 0x40), "alarm": ALARM_TYPES.get((ti >> 3) & 0x07, "Unknown"), "charging": bool(ti & 0x04), "acc_on": bool(ti & 0x02), "armed": bool(ti & 0x01), } pos += 1 if len(info) >= pos + 1: voltage_level = info[pos] result["voltage_level"] = voltage_level result["voltage_name"] = VOLTAGE_LEVELS.get(voltage_level, "Unknown") result["battery_level"] = min(voltage_level * 17, 100) if voltage_level <= 6 else None pos += 1 if len(info) >= pos + 1: result["gsm_signal"] = info[pos] result["gsm_signal_name"] = GSM_SIGNAL_LEVELS.get(info[pos], "Unknown") pos += 1 if len(info) >= pos + 1: alarm_code = info[pos] result["alarm_code"] = alarm_code result["alarm_type"] = ALARM_TYPES.get(alarm_code, f"unknown_0x{alarm_code:02X}") pos += 1 if len(info) >= pos + 1: result["language"] = info[pos] pos += 1 return result, pos def _parse_alarm_single_fence(self, info: bytes) -> Dict[str, Any]: """0xA3 Single Fence Alarm: datetime(6) + gps(12) + lbs_length(1) + mcc(2) + mnc(1-2) + lac(4) + cell_id(8) + terminal_info(1) + voltage_level(1) + gsm_signal(1) + alarm_code(1) + language(1).""" result: Dict[str, Any] = {} pos = 0 result["datetime"] = self.parse_datetime(info, pos) pos += 6 if len(info) >= pos + 12: result["gps_info"] = self.parse_gps(info, pos) pos += 12 if len(info) >= pos + 1: result["lbs_length"] = info[pos] pos += 1 if len(info) >= pos + 3: mcc_mnc, consumed = self.parse_mcc_mnc(info, pos) result.update(mcc_mnc) pos += consumed if len(info) >= pos + 4: result["lac"] = struct.unpack_from("!I", info, pos)[0] pos += 4 if len(info) >= pos + 8: result["cell_id"] = struct.unpack_from("!Q", info, pos)[0] pos += 8 tail, pos = self._parse_alarm_tail(info, pos) result.update(tail) return result def _parse_alarm_lbs_4g(self, info: bytes) -> Dict[str, Any]: """0xA5 LBS 4G Alarm: NO datetime, NO GPS, NO lbs_length. Content starts directly with MCC(2) + MNC(1-2) + LAC(4) + CellID(8) + terminal_info(1) + voltage_level(1) + gsm_signal(1) + alarm_code(1) + language(1). """ result: Dict[str, Any] = {} pos = 0 # content starts directly with MCC if len(info) >= pos + 3: mcc_mnc, consumed = self.parse_mcc_mnc(info, pos) result.update(mcc_mnc) pos += consumed if len(info) >= pos + 4: result["lac"] = struct.unpack_from("!I", info, pos)[0] pos += 4 if len(info) >= pos + 8: result["cell_id"] = struct.unpack_from("!Q", info, pos)[0] pos += 8 tail, pos = self._parse_alarm_tail(info, pos) result.update(tail) return result def _parse_alarm_wifi(self, info: bytes) -> Dict[str, Any]: """0xA9 WIFI Alarm: datetime(6) + MCC(2) + MNC(1-2) + cell_type(1) + cell_count(1) + [cell_stations] + timing_advance(1) + wifi_count(1) + [wifi_list] + alarm_code(1) + language(1). No GPS block, no lbs_length. Independent format. """ result: Dict[str, Any] = {} pos = 0 result["datetime"] = self.parse_datetime(info, pos) pos += 6 if len(info) >= pos + 3: mcc_mnc, consumed = self.parse_mcc_mnc(info, pos) result.update(mcc_mnc) pos += consumed # cell_type(1) + cell_count(1) cell_type = 0 # 0=2G, 1=4G cell_count = 0 if len(info) >= pos + 2: cell_type = info[pos] cell_count = info[pos + 1] result["cell_type"] = cell_type result["cell_count"] = cell_count pos += 2 # Parse cell stations stations: List[Dict[str, Any]] = [] for i in range(cell_count): if cell_type == 1: # 4G: LAC(4) + CI(8) + RSSI(1) = 13 bytes if len(info) < pos + 13: break station, consumed = self.parse_lbs_station(info, pos, lac_size=4, cell_id_size=8) stations.append(station) pos += consumed else: # 2G: LAC(2) + CI(3) + RSSI(1) = 6 bytes if len(info) < pos + 6: break lac_val = struct.unpack_from("!H", info, pos)[0] ci_val = int.from_bytes(info[pos + 2:pos + 5], "big") rssi_val = info[pos + 5] stations.append({"lac": lac_val, "cell_id": ci_val, "rssi": rssi_val}) pos += 6 result["stations"] = stations # timing_advance(1) if len(info) >= pos + 1: result["timing_advance"] = info[pos] pos += 1 # WiFi APs: wifi_count(1) + [mac(6) + signal(1)]*N if len(info) >= pos + 1: wifi_count = info[pos] result["wifi_count"] = wifi_count pos += 1 wifi_list, consumed = self.parse_wifi_list(info, pos, wifi_count) result["wifi_list"] = wifi_list pos += consumed # alarm_code(1) + language(1) if len(info) >= pos + 1: alarm_code = info[pos] result["alarm_code"] = alarm_code result["alarm_type"] = ALARM_TYPES.get(alarm_code, f"unknown_0x{alarm_code:02X}") pos += 1 if len(info) >= pos + 1: result["language"] = info[pos] pos += 1 return result def _parse_attendance(self, info: bytes) -> Dict[str, Any]: """0xB0 Attendance: datetime(6) + gps_positioned(1) + reserved(2) + GPS(12) + terminal_info(1) + voltage_level(1) + gsm_signal(1) + reserved_ext(2) + MCC/MNC + 7 stations(LAC2+CI3+RSSI) + TA(1) + wifi_count(1) + wifi_list. """ result: Dict[str, Any] = {} pos = 0 result["datetime"] = self.parse_datetime(info, pos) pos += 6 # GPS positioned flag (1 byte) if len(info) > pos: result["gps_positioned"] = info[pos] == 1 pos += 1 # Terminal reserved (2 bytes) if len(info) >= pos + 2: result["terminal_reserved"] = info[pos:pos + 2] pos += 2 # GPS data (12 bytes) if len(info) >= pos + 12: result["gps_info"] = self.parse_gps(info, pos) pos += 12 # Terminal info (1 byte) - clock_in/clock_out if len(info) > pos: ti = info[pos] result["terminal_info"] = ti status_code = (ti >> ATTENDANCE_STATUS_SHIFT) & ATTENDANCE_STATUS_MASK result["attendance_type"] = ATTENDANCE_TYPES.get(status_code, "unknown") pos += 1 # Voltage level (1 byte) if len(info) > pos: vl = info[pos] result["voltage_level"] = vl result["battery_level"] = min(vl * 17, 100) if vl <= 6 else None pos += 1 # GSM signal (1 byte) if len(info) > pos: result["gsm_signal"] = info[pos] pos += 1 # Reserved extension (2 bytes) if len(info) >= pos + 2: pos += 2 # LBS: MCC/MNC if len(info) >= pos + 3: mcc_mnc, consumed = self.parse_mcc_mnc(info, pos) result.update(mcc_mnc) pos += consumed # 7 stations: LAC(2) + CI(3) + RSSI(1) = 6 bytes each for 2G stations: List[Dict[str, Any]] = [] for i in range(7): if len(info) < pos + 6: break lac_val = struct.unpack_from("!H", info, pos)[0] ci_val = int.from_bytes(info[pos + 2:pos + 5], "big") rssi_val = info[pos + 5] station = {"lac": lac_val, "cell_id": ci_val, "rssi": rssi_val, "is_main": (i == 0)} stations.append(station) pos += 6 result["stations"] = stations if len(info) >= pos + 1: result["timing_advance"] = info[pos] pos += 1 # WIFI data if len(info) >= pos + 1: wifi_count = info[pos] result["wifi_count"] = wifi_count pos += 1 wifi_list, consumed = self.parse_wifi_list(info, pos, wifi_count) result["wifi_list"] = wifi_list pos += consumed return result def _parse_attendance_4g(self, info: bytes) -> Dict[str, Any]: """0xB1 Attendance 4G: same layout as 0xB0 but MNC=2B fixed, LAC=4B, CI=8B.""" result: Dict[str, Any] = {} pos = 0 result["datetime"] = self.parse_datetime(info, pos) pos += 6 # GPS positioned flag (1 byte) if len(info) > pos: result["gps_positioned"] = info[pos] == 1 pos += 1 # Terminal reserved (2 bytes) if len(info) >= pos + 2: result["terminal_reserved"] = info[pos:pos + 2] pos += 2 # GPS data (12 bytes) if len(info) >= pos + 12: result["gps_info"] = self.parse_gps(info, pos) pos += 12 # Terminal info (1 byte) - clock_in/clock_out if len(info) > pos: ti = info[pos] result["terminal_info"] = ti status_code = (ti >> ATTENDANCE_STATUS_SHIFT) & ATTENDANCE_STATUS_MASK result["attendance_type"] = ATTENDANCE_TYPES.get(status_code, "unknown") pos += 1 # Voltage level (1 byte) if len(info) > pos: vl = info[pos] result["voltage_level"] = vl result["battery_level"] = min(vl * 17, 100) if vl <= 6 else None pos += 1 # GSM signal (1 byte) if len(info) > pos: result["gsm_signal"] = info[pos] pos += 1 # Reserved extension (2 bytes) if len(info) >= pos + 2: pos += 2 # 4G LBS: MCC(2, clear high bit) + MNC(2, fixed) + LAC(4) + CI(8) if len(info) >= pos + 2: mcc_raw = struct.unpack_from("!H", info, pos)[0] result["mcc"] = mcc_raw & 0x7FFF pos += 2 if len(info) >= pos + 2: result["mnc"] = struct.unpack_from("!H", info, pos)[0] result["mnc_2byte"] = True pos += 2 # 7 stations: LAC(4) + CI(8) + RSSI(1) = 13 bytes each stations: List[Dict[str, Any]] = [] for i in range(7): if len(info) < pos + 13: break station, consumed = self.parse_lbs_station( info, pos, lac_size=4, cell_id_size=8 ) station["is_main"] = (i == 0) stations.append(station) pos += consumed result["stations"] = stations if len(info) >= pos + 1: result["timing_advance"] = info[pos] pos += 1 # WIFI data if len(info) >= pos + 1: wifi_count = info[pos] result["wifi_count"] = wifi_count pos += 1 wifi_list, consumed = self.parse_wifi_list(info, pos, wifi_count) result["wifi_list"] = wifi_list pos += consumed return result def _parse_bt_punch(self, info: bytes) -> Dict[str, Any]: """0xB2 BT Punch: datetime(6) + RSSI(1,signed) + MAC(6) + UUID(16) + Major(2) + Minor(2) + Battery(2) + TerminalInfo(1) + Reserved(2).""" result: Dict[str, Any] = {} pos = 0 if len(info) >= 6: result["datetime"] = self.parse_datetime(info, pos) pos += 6 # RSSI (1 byte, signed) if len(info) > pos: result["rssi"] = struct.unpack_from("b", info, pos)[0] pos += 1 # MAC address (6 bytes) if len(info) >= pos + 6: result["beacon_mac"] = ":".join(f"{b:02X}" for b in info[pos:pos + 6]) pos += 6 # UUID (16 bytes) if len(info) >= pos + 16: uuid_bytes = info[pos:pos + 16] result["beacon_uuid"] = ( f"{uuid_bytes[0:4].hex()}-{uuid_bytes[4:6].hex()}-" f"{uuid_bytes[6:8].hex()}-{uuid_bytes[8:10].hex()}-" f"{uuid_bytes[10:16].hex()}" ).upper() pos += 16 # Major (2 bytes) if len(info) >= pos + 2: result["beacon_major"] = struct.unpack_from("!H", info, pos)[0] pos += 2 # Minor (2 bytes) if len(info) >= pos + 2: result["beacon_minor"] = struct.unpack_from("!H", info, pos)[0] pos += 2 # Beacon battery (2 bytes, unit 0.01V) if len(info) >= pos + 2: raw_batt = struct.unpack_from("!H", info, pos)[0] result["beacon_battery"] = raw_batt * 0.01 result["beacon_battery_unit"] = "V" pos += 2 # Terminal info (1 byte) - clock_in/clock_out if len(info) > pos: ti = info[pos] result["terminal_info"] = ti status_code = (ti >> ATTENDANCE_STATUS_SHIFT) & ATTENDANCE_STATUS_MASK result["attendance_type"] = ATTENDANCE_TYPES.get(status_code, "clock_in") pos += 1 # Terminal reserved (2 bytes) if len(info) >= pos + 2: result["terminal_reserved"] = info[pos:pos + 2] pos += 2 return result def _parse_bt_location(self, info: bytes) -> Dict[str, Any]: """0xB3 BT Location: datetime(6) + beacon_count(1) + per-beacon(30 bytes each). Per beacon: RSSI(1,signed) + MAC(6) + UUID(16) + Major(2) + Minor(2) + Battery(2) + BattUnit(1) = 30 bytes. """ result: Dict[str, Any] = {} pos = 0 if len(info) >= 6: result["datetime"] = self.parse_datetime(info, pos) pos += 6 beacon_count = 0 if len(info) > pos: beacon_count = info[pos] result["beacon_count"] = beacon_count pos += 1 beacons: List[Dict[str, Any]] = [] for _ in range(beacon_count): if len(info) < pos + 30: break rssi = struct.unpack_from("b", info, pos)[0] pos += 1 mac = ":".join(f"{b:02X}" for b in info[pos:pos + 6]) pos += 6 uuid_bytes = info[pos:pos + 16] uuid_str = ( f"{uuid_bytes[0:4].hex()}-{uuid_bytes[4:6].hex()}-" f"{uuid_bytes[6:8].hex()}-{uuid_bytes[8:10].hex()}-" f"{uuid_bytes[10:16].hex()}" ).upper() pos += 16 major = struct.unpack_from("!H", info, pos)[0] pos += 2 minor = struct.unpack_from("!H", info, pos)[0] pos += 2 raw_batt = struct.unpack_from("!H", info, pos)[0] pos += 2 batt_unit_byte = info[pos] pos += 1 if batt_unit_byte == 0: battery_val = raw_batt * 0.01 battery_unit = "V" else: battery_val = float(raw_batt) battery_unit = "%" beacons.append({ "rssi": rssi, "mac": mac, "uuid": uuid_str, "major": major, "minor": minor, "battery": battery_val, "battery_unit": battery_unit, }) result["beacons"] = beacons return result def _parse_alarm_multi_fence(self, info: bytes) -> Dict[str, Any]: """0xA4 Multi Fence Alarm: same as 0xA3 + fence_id(1) at the end.""" result = self._parse_alarm_single_fence(info) # After the standard alarm fields, 0xA4 has an extra fence_id byte # We need to re-parse to find the fence_id position # The simplest approach: fence_id is the last unparsed byte # Since _parse_alarm_single_fence consumed up to language(1), # the fence_id follows it. Calculate position from the end. # Format ends with: ...alarm_code(1) + language(1) + fence_id(1) if len(info) > 0: result["fence_id"] = info[-1] return result def _parse_online_cmd_reply(self, info: bytes) -> Dict[str, Any]: """0x81 Online Command Reply: length(1) + server_flag(4) + content(N) + language(2).""" result: Dict[str, Any] = {} if len(info) < 1: return result result["cmd_length"] = info[0] pos = 1 if len(info) >= pos + 4: result["server_flag"] = struct.unpack_from("!I", info, pos)[0] pos += 4 # Content is between server_flag and language(2 bytes at end) if len(info) > pos + 2: try: result["response_content"] = info[pos:-2].decode("utf-8", errors="replace") except Exception: result["response_content"] = info[pos:-2].hex() result["language"] = struct.unpack_from("!H", info, len(info) - 2)[0] elif len(info) > pos: try: result["response_content"] = info[pos:].decode("utf-8", errors="replace") except Exception: result["response_content"] = info[pos:].hex() return result # ------------------------------------------------------------------ # Handler dispatch table (defined after methods so names resolve) # ------------------------------------------------------------------ _PROTOCOL_HANDLERS = { PROTO_LOGIN: _parse_login, PROTO_HEARTBEAT: _parse_heartbeat, PROTO_LBS_ADDRESS_REQ: _parse_lbs_address_req, PROTO_ADDRESS_QUERY: _parse_address_query, PROTO_TIME_SYNC: _parse_time_sync, PROTO_GPS: _parse_gps_packet, PROTO_LBS_MULTI: _parse_lbs_multi, PROTO_WIFI: _parse_wifi, PROTO_HEARTBEAT_EXT: _parse_heartbeat_ext, PROTO_TIME_SYNC_2: _parse_time_sync_2, PROTO_GENERAL_INFO: _parse_general_info, PROTO_GPS_4G: _parse_gps_4g, PROTO_LBS_4G: _parse_lbs_4g, PROTO_WIFI_4G: _parse_wifi_4g, PROTO_ALARM_SINGLE_FENCE: _parse_alarm_single_fence, PROTO_ALARM_MULTI_FENCE: _parse_alarm_multi_fence, PROTO_ALARM_LBS_4G: _parse_alarm_lbs_4g, PROTO_ALARM_WIFI: _parse_alarm_wifi, PROTO_ONLINE_CMD_REPLY: _parse_online_cmd_reply, PROTO_ATTENDANCE: _parse_attendance, PROTO_ATTENDANCE_4G: _parse_attendance_4g, PROTO_BT_PUNCH: _parse_bt_punch, PROTO_BT_LOCATION: _parse_bt_location, }