""" KKS Bluetooth Badge Protocol Packet Parser Parses raw TCP byte streams into structured Python dictionaries. Handles both short (0x7878, 1-byte length) and long (0x7979, 2-byte length) start markers. """ from __future__ import annotations import struct from datetime import datetime, timezone from typing import Any, Dict, List, Tuple from .constants import ( ALARM_TYPES, DATA_REPORT_MODES, GSM_SIGNAL_LEVELS, PROTOCOL_NAMES, PROTO_ADDRESS_QUERY, PROTO_ALARM_LBS_4G, PROTO_ALARM_MULTI_FENCE, PROTO_ALARM_SINGLE_FENCE, PROTO_ALARM_WIFI, PROTO_ATTENDANCE, PROTO_ATTENDANCE_4G, PROTO_BT_LOCATION, PROTO_BT_PUNCH, PROTO_GENERAL_INFO, PROTO_GPS, PROTO_GPS_4G, PROTO_HEARTBEAT, PROTO_HEARTBEAT_EXT, PROTO_LBS_4G, PROTO_LBS_ADDRESS_REQ, PROTO_LBS_MULTI, PROTO_LOGIN, PROTO_MESSAGE, PROTO_ONLINE_CMD, PROTO_ONLINE_CMD_REPLY, PROTO_TIME_SYNC, PROTO_TIME_SYNC_2, PROTO_WIFI, PROTO_WIFI_4G, START_MARKER_LONG, START_MARKER_SHORT, STOP_MARKER, ) from .crc import crc_itu class PacketParser: """Parses KKS protocol packets from raw TCP byte data.""" # ------------------------------------------------------------------ # Public API # ------------------------------------------------------------------ def parse(self, data: bytes) -> Dict[str, Any]: """ Parse a single raw TCP packet into a structured dictionary. Parameters ---------- data : bytes A complete packet including start marker and stop marker. Returns ------- dict Parsed packet fields. Always contains at least: ``start_marker``, ``packet_length``, ``protocol_number``, ``protocol_name``, ``serial_number``, ``crc``, ``crc_valid``, and ``info_content`` (raw bytes of the information field). Raises ------ ValueError If *data* is too short or has invalid markers. """ if len(data) < 10: raise ValueError(f"Packet too short: {len(data)} bytes") start = data[:2] if start == START_MARKER_SHORT: is_long = False elif start == START_MARKER_LONG: is_long = True else: raise ValueError(f"Invalid start marker: {start.hex()}") # Parse length if is_long: packet_length = struct.unpack("!H", data[2:4])[0] header_size = 4 # 2 marker + 2 length else: packet_length = data[2] header_size = 3 # 2 marker + 1 length # Validate stop marker total_len = header_size + packet_length + 2 # +2 for stop marker if len(data) < total_len: raise ValueError( f"Packet data too short: expected {total_len}, got {len(data)}" ) stop = data[total_len - 2 : total_len] if stop != STOP_MARKER: raise ValueError(f"Invalid stop marker: {stop.hex()}") # Extract payload (everything between length field and stop marker) payload = data[header_size : total_len - 2] # payload = protocol(1) + info_content(N) + serial(2) + crc(2) protocol_number = payload[0] serial_number = struct.unpack("!H", payload[-4:-2])[0] crc_received = struct.unpack("!H", payload[-2:])[0] # CRC is computed over the length field(s) + payload minus the CRC itself if is_long: crc_data = data[2:4] + payload[:-2] else: crc_data = data[2:3] + payload[:-2] crc_computed = crc_itu(crc_data) crc_valid = crc_computed == crc_received info_content = payload[1:-4] result: Dict[str, Any] = { "start_marker": start, "is_long_packet": is_long, "packet_length": packet_length, "protocol_number": protocol_number, "protocol_name": PROTOCOL_NAMES.get(protocol_number, f"Unknown(0x{protocol_number:02X})"), "serial_number": serial_number, "crc": crc_received, "crc_valid": crc_valid, "info_content": info_content, "raw": data[:total_len], } # Dispatch to protocol-specific parser handler = self._PROTOCOL_HANDLERS.get(protocol_number) if handler is not None: try: parsed = handler(self, info_content) result.update(parsed) except Exception as exc: result["parse_error"] = str(exc) return result def find_packets(self, buffer: bytes) -> List[Tuple[Dict[str, Any], int]]: """ Scan *buffer* for all valid packets. Returns a list of ``(parsed_dict, consumed_bytes)`` tuples. Skips over invalid data to find the next valid start marker. """ results: List[Tuple[Dict[str, Any], int]] = [] offset = 0 while offset < len(buffer) - 9: # Scan for a start marker idx_short = buffer.find(START_MARKER_SHORT, offset) idx_long = buffer.find(START_MARKER_LONG, offset) # Pick the earliest marker found candidates = [i for i in (idx_short, idx_long) if i >= 0] if not candidates: break idx = min(candidates) is_long = buffer[idx : idx + 2] == START_MARKER_LONG header_size = 4 if is_long else 3 if idx + header_size > len(buffer): break # Read length if is_long: if idx + 4 > len(buffer): break pkt_len = struct.unpack("!H", buffer[idx + 2 : idx + 4])[0] else: pkt_len = buffer[idx + 2] total_len = header_size + pkt_len + 2 # +2 stop marker if idx + total_len > len(buffer): # Not enough data yet offset = idx + 1 continue # Verify stop marker if buffer[idx + total_len - 2 : idx + total_len] != STOP_MARKER: offset = idx + 1 continue pkt_data = buffer[idx : idx + total_len] try: parsed = self.parse(pkt_data) results.append((parsed, idx + total_len)) offset = idx + total_len except ValueError: offset = idx + 1 return results # ------------------------------------------------------------------ # Common field helpers # ------------------------------------------------------------------ @staticmethod def parse_datetime(data: bytes, offset: int = 0) -> Dict[str, Any]: """ Parse a 6-byte datetime field (YY MM DD HH MM SS) starting at *offset*. Returns a dict with ``year``, ``month``, ``day``, ``hour``, ``minute``, ``second``, and ``datetime`` (a :class:`datetime.datetime` object). """ if len(data) < offset + 6: raise ValueError("Not enough data for datetime") yy, mo, dd, hh, mi, ss = struct.unpack_from("BBBBBB", data, offset) year = 2000 + yy try: dt = datetime(year, mo, dd, hh, mi, ss, tzinfo=timezone.utc) except ValueError: dt = None return { "year": year, "month": mo, "day": dd, "hour": hh, "minute": mi, "second": ss, "datetime": dt, } @staticmethod def parse_gps(data: bytes, offset: int = 0) -> Dict[str, Any]: """ Parse GPS information content. Layout (12 bytes): - 1 byte: upper 4 bits = GPS info length, lower 4 bits = satellite count - 4 bytes: latitude (uint32) -> divide by 1_800_000.0 - 4 bytes: longitude (uint32) -> divide by 1_800_000.0 - 1 byte: speed (km/h) - 2 bytes: course/status flags Course/Status 16-bit field: - bit 15..13: reserved - bit 12: GPS is real-time (1) or differential (0) - bit 11: GPS positioned (1) or not (0) - bit 10: longitude hemisphere 1=East, 0=West - bit 9: latitude hemisphere 1=North (south=0) - bit 8..0: course (0-360 degrees, lower 10 bits across bits 9..0 -- actually bits 9-0 = 10 bits for course) Note: the hemisphere bits need careful handling. The 16-bit value: bit15 bit14 bit13 bit12 bit11 bit10 bit9 bit8 bit7..0 rsv rsv rsv real fix E/W N/S course[9:8] course[7:0] """ if len(data) < offset + 12: raise ValueError("Not enough data for GPS info") gps_byte = data[offset] gps_info_length = (gps_byte >> 4) & 0x0F satellite_count = gps_byte & 0x0F lat_raw = struct.unpack_from("!I", data, offset + 1)[0] lon_raw = struct.unpack_from("!I", data, offset + 5)[0] speed = data[offset + 9] course_status = struct.unpack_from("!H", data, offset + 10)[0] # Decode course/status is_realtime = bool(course_status & 0x2000) # bit 13 (from MSB: bit 12 if 0-indexed from MSB) is_gps_positioned = bool(course_status & 0x1000) # bit 12 -> actually bit 11 is_east = bool(course_status & 0x0800) # bit 11 -> bit 10 is_north = bool(course_status & 0x0400) # bit 10 -> bit 9 course = course_status & 0x03FF # lower 10 bits # Wait -- the standard mapping for this protocol: # bit 13 (0x2000): real-time GPS # bit 12 (0x1000): GPS is positioned # bit 11 (0x0800): East longitude (0=West) # bit 10 (0x0400): North latitude (0=South, but spec says 1=South sometimes) # We'll use the most common convention: bit10=1 means South latitude is *negated*. # Actually, common convention: bit10 = 0 -> South, bit10 = 1 -> North? No -- # In most implementations of this protocol family: # bit 10 (0x0400): 1 = North latitude, 0 = South # We'll go with that. latitude = lat_raw / 1_800_000.0 longitude = lon_raw / 1_800_000.0 if not is_north: latitude = -latitude if not is_east: longitude = -longitude return { "gps_info_length": gps_info_length, "satellite_count": satellite_count, "latitude_raw": lat_raw, "longitude_raw": lon_raw, "latitude": latitude, "longitude": longitude, "speed": speed, "course": course, "is_realtime": is_realtime, "is_gps_positioned": is_gps_positioned, "is_east": is_east, "is_north": is_north, "course_status_raw": course_status, } @staticmethod def parse_mcc_mnc(data: bytes, offset: int = 0) -> Tuple[Dict[str, Any], int]: """ Parse MCC and MNC from data. MCC is 2 bytes. The high bit of MCC indicates whether MNC is 1 byte (high bit clear) or 2 bytes (high bit set). Returns ``(result_dict, bytes_consumed)``. """ if len(data) < offset + 3: raise ValueError("Not enough data for MCC/MNC") mcc_raw = struct.unpack_from("!H", data, offset)[0] mnc_2byte = bool(mcc_raw & 0x8000) mcc = mcc_raw & 0x7FFF if mnc_2byte: if len(data) < offset + 4: raise ValueError("Not enough data for 2-byte MNC") mnc = struct.unpack_from("!H", data, offset + 2)[0] consumed = 4 else: mnc = data[offset + 2] consumed = 3 return {"mcc": mcc, "mnc": mnc, "mnc_2byte": mnc_2byte}, consumed @staticmethod def parse_lbs_station( data: bytes, offset: int = 0, *, lac_size: int = 2, cell_id_size: int = 2, ) -> Tuple[Dict[str, Any], int]: """ Parse a single LBS station (LAC + Cell ID + RSSI). Parameters ---------- lac_size : int 2 for standard, 4 for 4G protocols. cell_id_size : int 2 for standard, 8 for 4G protocols. Returns ``(station_dict, bytes_consumed)``. """ consumed = 0 if lac_size == 2: lac = struct.unpack_from("!H", data, offset)[0] else: lac = struct.unpack_from("!I", data, offset)[0] consumed += lac_size if cell_id_size == 2: cell_id = struct.unpack_from("!H", data, offset + consumed)[0] elif cell_id_size == 4: cell_id = struct.unpack_from("!I", data, offset + consumed)[0] else: # 8 cell_id = struct.unpack_from("!Q", data, offset + consumed)[0] consumed += cell_id_size rssi = data[offset + consumed] consumed += 1 return {"lac": lac, "cell_id": cell_id, "rssi": rssi}, consumed @staticmethod def parse_wifi_list(data: bytes, offset: int = 0, count: int = 0) -> Tuple[List[Dict[str, Any]], int]: """ Parse a list of WIFI access points. Each entry: 6-byte MAC address + 1-byte signal strength. Returns ``(wifi_list, bytes_consumed)``. """ wifi_entries: List[Dict[str, Any]] = [] consumed = 0 for _ in range(count): if offset + consumed + 7 > len(data): break mac_bytes = data[offset + consumed : offset + consumed + 6] mac = ":".join(f"{b:02X}" for b in mac_bytes) strength = data[offset + consumed + 6] wifi_entries.append({"mac": mac, "strength": strength}) consumed += 7 return wifi_entries, consumed # ------------------------------------------------------------------ # Protocol-specific parsers # ------------------------------------------------------------------ def _parse_login(self, info: bytes) -> Dict[str, Any]: """0x01 Login: 8-byte IMEI (BCD) + type_code(2) + timezone_language(2).""" result: Dict[str, Any] = {} if len(info) < 8: raise ValueError("Login packet info too short") # IMEI: 8 bytes BCD-encoded -> 16 hex digits, first digit is padding 0 imei_hex = info[:8].hex() # e.g. "0123456789123456" # IMEI is 15 digits; BCD encoding pads with a leading 0 to fill 8 bytes if len(imei_hex) == 16 and imei_hex[0] == "0": terminal_id = imei_hex[1:] # strip the single padding nibble else: terminal_id = imei_hex[-15:] # fallback: take last 15 digits result["terminal_id"] = terminal_id pos = 8 if len(info) >= pos + 2: result["type_code"] = struct.unpack_from("!H", info, pos)[0] pos += 2 if len(info) >= pos + 2: tz_lang = struct.unpack_from("!H", info, pos)[0] result["timezone_language"] = tz_lang # Decode timezone: bits 15-4 = timezone * 100, bit 3 = sign (1=west) tz_value = (tz_lang >> 4) & 0x0FFF tz_sign = -1 if (tz_lang & 0x08) else 1 result["timezone_offset"] = tz_sign * tz_value / 100.0 result["language"] = tz_lang & 0x07 pos += 2 return result def _parse_heartbeat(self, info: bytes) -> Dict[str, Any]: """0x13 Heartbeat: terminal_info(1) + battery(1) + gsm_signal(1) + reserved(2).""" result: Dict[str, Any] = {} if len(info) < 1: raise ValueError("Heartbeat packet info too short") ti = info[0] result["terminal_info"] = ti result["terminal_info_bits"] = { "oil_electricity_connected": bool(ti & 0x80), "gps_tracking_on": bool(ti & 0x40), "alarm": ALARM_TYPES.get((ti >> 3) & 0x07, "Unknown"), "charging": bool(ti & 0x04), "acc_on": bool(ti & 0x02), "armed": bool(ti & 0x01), } if len(info) >= 2: result["battery_level"] = info[1] if len(info) >= 3: result["gsm_signal"] = info[2] result["gsm_signal_name"] = GSM_SIGNAL_LEVELS.get(info[2], "Unknown") if len(info) >= 5: result["reserved"] = struct.unpack_from("!H", info, 3)[0] return result def _parse_lbs_address_req(self, info: bytes) -> Dict[str, Any]: """0x17 LBS Address Request.""" result: Dict[str, Any] = {} pos = 0 if len(info) < pos + 9: raise ValueError("LBS Address Request info too short") mcc_mnc, consumed = self.parse_mcc_mnc(info, pos) result.update(mcc_mnc) pos += consumed result["lac"] = struct.unpack_from("!H", info, pos)[0] pos += 2 result["cell_id"] = struct.unpack_from("!H", info, pos)[0] pos += 2 # Remaining bytes: phone number (BCD) + alarm_language if pos < len(info): # Phone number length is variable; alarm_language is last 2 bytes if len(info) - pos >= 2: phone_end = len(info) - 2 if phone_end > pos: result["phone_number"] = info[pos:phone_end].hex().rstrip("f") pos = phone_end result["alarm_language"] = struct.unpack_from("!H", info, pos)[0] return result def _parse_address_query(self, info: bytes) -> Dict[str, Any]: """0x1A Address Query: datetime(6) + gps(12) + phone_number.""" result: Dict[str, Any] = {} pos = 0 dt = self.parse_datetime(info, pos) result["datetime"] = dt pos += 6 if len(info) >= pos + 12: gps = self.parse_gps(info, pos) result["gps_info"] = gps pos += 12 # Remaining is phone number in BCD if pos < len(info): result["phone_number"] = info[pos:].hex().rstrip("f") return result def _parse_time_sync(self, info: bytes) -> Dict[str, Any]: """0x1F Time Sync: datetime(6) + language(2).""" result: Dict[str, Any] = {} if len(info) >= 6: result["datetime"] = self.parse_datetime(info, 0) if len(info) >= 8: result["language"] = struct.unpack_from("!H", info, 6)[0] return result def _parse_gps_packet(self, info: bytes) -> Dict[str, Any]: """0x22 GPS: datetime(6) + gps(12) + mcc(2) + mnc(1-2) + lac(2) + cell_id(2) + acc(1) + report_mode(1) + realtime_upload(1) + mileage(4).""" result: Dict[str, Any] = {} pos = 0 result["datetime"] = self.parse_datetime(info, pos) pos += 6 result["gps_info"] = self.parse_gps(info, pos) pos += 12 mcc_mnc, consumed = self.parse_mcc_mnc(info, pos) result.update(mcc_mnc) pos += consumed if len(info) >= pos + 2: result["lac"] = struct.unpack_from("!H", info, pos)[0] pos += 2 if len(info) >= pos + 2: result["cell_id"] = struct.unpack_from("!H", info, pos)[0] pos += 2 if len(info) >= pos + 1: result["acc"] = info[pos] pos += 1 if len(info) >= pos + 1: result["report_mode"] = info[pos] result["report_mode_name"] = DATA_REPORT_MODES.get(info[pos], "Unknown") pos += 1 if len(info) >= pos + 1: result["realtime_upload"] = info[pos] pos += 1 if len(info) >= pos + 4: result["mileage"] = struct.unpack_from("!I", info, pos)[0] pos += 4 return result def _parse_lbs_multi(self, info: bytes) -> Dict[str, Any]: """0x28 LBS Multi: datetime(6) + mcc(2) + mnc(1-2) + main_station + 6 neighbors + timing_advance(1) + language(2).""" result: Dict[str, Any] = {} pos = 0 result["datetime"] = self.parse_datetime(info, pos) pos += 6 mcc_mnc, consumed = self.parse_mcc_mnc(info, pos) result.update(mcc_mnc) pos += consumed stations: List[Dict[str, Any]] = [] for i in range(7): # main + 6 neighbors if len(info) < pos + 5: break station, consumed = self.parse_lbs_station(info, pos) station["is_main"] = (i == 0) stations.append(station) pos += consumed result["stations"] = stations if stations: result["main_station"] = stations[0] if len(info) >= pos + 1: result["timing_advance"] = info[pos] pos += 1 if len(info) >= pos + 2: result["language"] = struct.unpack_from("!H", info, pos)[0] pos += 2 return result def _parse_wifi(self, info: bytes) -> Dict[str, Any]: """0x2C WIFI: datetime(6) + mcc(2) + mnc(1-2) + main+6 stations + timing_advance(1) + wifi_count(1) + wifi_list.""" result: Dict[str, Any] = {} pos = 0 result["datetime"] = self.parse_datetime(info, pos) pos += 6 mcc_mnc, consumed = self.parse_mcc_mnc(info, pos) result.update(mcc_mnc) pos += consumed stations: List[Dict[str, Any]] = [] for i in range(7): if len(info) < pos + 5: break station, consumed = self.parse_lbs_station(info, pos) station["is_main"] = (i == 0) stations.append(station) pos += consumed result["stations"] = stations if len(info) >= pos + 1: result["timing_advance"] = info[pos] pos += 1 if len(info) >= pos + 1: wifi_count = info[pos] result["wifi_count"] = wifi_count pos += 1 wifi_list, consumed = self.parse_wifi_list(info, pos, wifi_count) result["wifi_list"] = wifi_list pos += consumed return result def _parse_heartbeat_ext(self, info: bytes) -> Dict[str, Any]: """0x36 Heartbeat Extended: terminal_info(1) + battery(1) + gsm_signal(1) + language(2) + extension(N).""" result: Dict[str, Any] = {} pos = 0 if len(info) < 1: raise ValueError("Heartbeat Ext info too short") ti = info[pos] result["terminal_info"] = ti result["terminal_info_bits"] = { "oil_electricity_connected": bool(ti & 0x80), "gps_tracking_on": bool(ti & 0x40), "alarm": ALARM_TYPES.get((ti >> 3) & 0x07, "Unknown"), "charging": bool(ti & 0x04), "acc_on": bool(ti & 0x02), "armed": bool(ti & 0x01), } pos += 1 if len(info) >= pos + 1: result["battery_level"] = info[pos] pos += 1 if len(info) >= pos + 1: result["gsm_signal"] = info[pos] result["gsm_signal_name"] = GSM_SIGNAL_LEVELS.get(info[pos], "Unknown") pos += 1 if len(info) >= pos + 2: result["language"] = struct.unpack_from("!H", info, pos)[0] pos += 2 if pos < len(info): result["extension_modules"] = info[pos:] return result def _parse_time_sync_2(self, info: bytes) -> Dict[str, Any]: """0x8A Time Sync 2: empty info body (serial + CRC only).""" return {} def _parse_general_info(self, info: bytes) -> Dict[str, Any]: """0x94 General Info: sub_protocol(1) + data_content(N).""" result: Dict[str, Any] = {} if len(info) >= 1: result["sub_protocol"] = info[0] result["data_content"] = info[1:] if len(info) > 1 else b"" return result def _parse_gps_4g(self, info: bytes) -> Dict[str, Any]: """0xA0 GPS 4G: like 0x22 but LAC=4 bytes, CellID=8 bytes.""" result: Dict[str, Any] = {} pos = 0 result["datetime"] = self.parse_datetime(info, pos) pos += 6 result["gps_info"] = self.parse_gps(info, pos) pos += 12 mcc_mnc, consumed = self.parse_mcc_mnc(info, pos) result.update(mcc_mnc) pos += consumed if len(info) >= pos + 4: result["lac"] = struct.unpack_from("!I", info, pos)[0] pos += 4 if len(info) >= pos + 8: result["cell_id"] = struct.unpack_from("!Q", info, pos)[0] pos += 8 if len(info) >= pos + 1: result["acc"] = info[pos] pos += 1 if len(info) >= pos + 1: result["report_mode"] = info[pos] result["report_mode_name"] = DATA_REPORT_MODES.get(info[pos], "Unknown") pos += 1 if len(info) >= pos + 1: result["realtime_upload"] = info[pos] pos += 1 if len(info) >= pos + 4: result["mileage"] = struct.unpack_from("!I", info, pos)[0] pos += 4 return result def _parse_lbs_4g(self, info: bytes) -> Dict[str, Any]: """0xA1 LBS 4G: like 0x28 but LAC=4 bytes, CellID=8 bytes per station.""" result: Dict[str, Any] = {} pos = 0 result["datetime"] = self.parse_datetime(info, pos) pos += 6 mcc_mnc, consumed = self.parse_mcc_mnc(info, pos) result.update(mcc_mnc) pos += consumed stations: List[Dict[str, Any]] = [] for i in range(7): if len(info) < pos + 13: # 4 + 8 + 1 break station, consumed = self.parse_lbs_station( info, pos, lac_size=4, cell_id_size=8 ) station["is_main"] = (i == 0) stations.append(station) pos += consumed result["stations"] = stations if stations: result["main_station"] = stations[0] if len(info) >= pos + 1: result["timing_advance"] = info[pos] pos += 1 if len(info) >= pos + 2: result["language"] = struct.unpack_from("!H", info, pos)[0] pos += 2 return result def _parse_wifi_4g(self, info: bytes) -> Dict[str, Any]: """0xA2 WIFI 4G: like 0x2C but LAC=4 bytes, CellID=8 bytes.""" result: Dict[str, Any] = {} pos = 0 result["datetime"] = self.parse_datetime(info, pos) pos += 6 mcc_mnc, consumed = self.parse_mcc_mnc(info, pos) result.update(mcc_mnc) pos += consumed stations: List[Dict[str, Any]] = [] for i in range(7): if len(info) < pos + 13: break station, consumed = self.parse_lbs_station( info, pos, lac_size=4, cell_id_size=8 ) station["is_main"] = (i == 0) stations.append(station) pos += consumed result["stations"] = stations if len(info) >= pos + 1: result["timing_advance"] = info[pos] pos += 1 if len(info) >= pos + 1: wifi_count = info[pos] result["wifi_count"] = wifi_count pos += 1 wifi_list, consumed = self.parse_wifi_list(info, pos, wifi_count) result["wifi_list"] = wifi_list pos += consumed return result def _parse_alarm_single_fence(self, info: bytes) -> Dict[str, Any]: """0xA3 Single Fence Alarm: datetime(6) + gps(12) + lbs_length(1) + mcc(2) + mnc(1-2) + lac(4) + cell_id(8) + terminal_info(1) + voltage(2) + gsm_signal(1) + alarm_language(2).""" result: Dict[str, Any] = {} pos = 0 result["datetime"] = self.parse_datetime(info, pos) pos += 6 if len(info) >= pos + 12: result["gps_info"] = self.parse_gps(info, pos) pos += 12 if len(info) >= pos + 1: result["lbs_length"] = info[pos] pos += 1 if len(info) >= pos + 3: mcc_mnc, consumed = self.parse_mcc_mnc(info, pos) result.update(mcc_mnc) pos += consumed if len(info) >= pos + 4: result["lac"] = struct.unpack_from("!I", info, pos)[0] pos += 4 if len(info) >= pos + 8: result["cell_id"] = struct.unpack_from("!Q", info, pos)[0] pos += 8 if len(info) >= pos + 1: ti = info[pos] result["terminal_info"] = ti result["terminal_info_bits"] = { "oil_electricity_connected": bool(ti & 0x80), "gps_tracking_on": bool(ti & 0x40), "alarm": ALARM_TYPES.get((ti >> 3) & 0x07, "Unknown"), "charging": bool(ti & 0x04), "acc_on": bool(ti & 0x02), "armed": bool(ti & 0x01), } pos += 1 if len(info) >= pos + 2: result["voltage"] = struct.unpack_from("!H", info, pos)[0] pos += 2 if len(info) >= pos + 1: result["gsm_signal"] = info[pos] result["gsm_signal_name"] = GSM_SIGNAL_LEVELS.get(info[pos], "Unknown") pos += 1 if len(info) >= pos + 2: result["alarm_language"] = struct.unpack_from("!H", info, pos)[0] pos += 2 return result def _parse_alarm_lbs_4g(self, info: bytes) -> Dict[str, Any]: """0xA5 LBS 4G Alarm: similar to 0xA3 but LBS-based.""" result: Dict[str, Any] = {} pos = 0 result["datetime"] = self.parse_datetime(info, pos) pos += 6 if len(info) >= pos + 1: result["lbs_length"] = info[pos] pos += 1 if len(info) >= pos + 3: mcc_mnc, consumed = self.parse_mcc_mnc(info, pos) result.update(mcc_mnc) pos += consumed if len(info) >= pos + 4: result["lac"] = struct.unpack_from("!I", info, pos)[0] pos += 4 if len(info) >= pos + 8: result["cell_id"] = struct.unpack_from("!Q", info, pos)[0] pos += 8 if len(info) >= pos + 1: ti = info[pos] result["terminal_info"] = ti result["terminal_info_bits"] = { "oil_electricity_connected": bool(ti & 0x80), "gps_tracking_on": bool(ti & 0x40), "alarm": ALARM_TYPES.get((ti >> 3) & 0x07, "Unknown"), "charging": bool(ti & 0x04), "acc_on": bool(ti & 0x02), "armed": bool(ti & 0x01), } pos += 1 if len(info) >= pos + 2: result["voltage"] = struct.unpack_from("!H", info, pos)[0] pos += 2 if len(info) >= pos + 1: result["gsm_signal"] = info[pos] result["gsm_signal_name"] = GSM_SIGNAL_LEVELS.get(info[pos], "Unknown") pos += 1 if len(info) >= pos + 2: result["alarm_language"] = struct.unpack_from("!H", info, pos)[0] pos += 2 return result def _parse_alarm_wifi(self, info: bytes) -> Dict[str, Any]: """0xA9 WIFI Alarm: datetime + gps + lbs + terminal_info + voltage + gsm + wifi_count + wifi_list + alarm_language.""" result: Dict[str, Any] = {} pos = 0 result["datetime"] = self.parse_datetime(info, pos) pos += 6 if len(info) >= pos + 12: result["gps_info"] = self.parse_gps(info, pos) pos += 12 if len(info) >= pos + 1: result["lbs_length"] = info[pos] pos += 1 if len(info) >= pos + 3: mcc_mnc, consumed = self.parse_mcc_mnc(info, pos) result.update(mcc_mnc) pos += consumed if len(info) >= pos + 4: result["lac"] = struct.unpack_from("!I", info, pos)[0] pos += 4 if len(info) >= pos + 8: result["cell_id"] = struct.unpack_from("!Q", info, pos)[0] pos += 8 if len(info) >= pos + 1: ti = info[pos] result["terminal_info"] = ti result["terminal_info_bits"] = { "oil_electricity_connected": bool(ti & 0x80), "gps_tracking_on": bool(ti & 0x40), "alarm": ALARM_TYPES.get((ti >> 3) & 0x07, "Unknown"), "charging": bool(ti & 0x04), "acc_on": bool(ti & 0x02), "armed": bool(ti & 0x01), } pos += 1 if len(info) >= pos + 2: result["voltage"] = struct.unpack_from("!H", info, pos)[0] pos += 2 if len(info) >= pos + 1: result["gsm_signal"] = info[pos] result["gsm_signal_name"] = GSM_SIGNAL_LEVELS.get(info[pos], "Unknown") pos += 1 if len(info) >= pos + 1: wifi_count = info[pos] result["wifi_count"] = wifi_count pos += 1 wifi_list, consumed = self.parse_wifi_list(info, pos, wifi_count) result["wifi_list"] = wifi_list pos += consumed if len(info) >= pos + 2: result["alarm_language"] = struct.unpack_from("!H", info, pos)[0] pos += 2 return result def _parse_attendance(self, info: bytes) -> Dict[str, Any]: """0xB0 Attendance: GPS + WIFI + LBS combined attendance data.""" result: Dict[str, Any] = {} pos = 0 result["datetime"] = self.parse_datetime(info, pos) pos += 6 # GPS data if len(info) >= pos + 12: result["gps_info"] = self.parse_gps(info, pos) pos += 12 # LBS data if len(info) >= pos + 3: mcc_mnc, consumed = self.parse_mcc_mnc(info, pos) result.update(mcc_mnc) pos += consumed stations: List[Dict[str, Any]] = [] for i in range(7): if len(info) < pos + 5: break station, consumed = self.parse_lbs_station(info, pos) station["is_main"] = (i == 0) stations.append(station) pos += consumed result["stations"] = stations if len(info) >= pos + 1: result["timing_advance"] = info[pos] pos += 1 # WIFI data if len(info) >= pos + 1: wifi_count = info[pos] result["wifi_count"] = wifi_count pos += 1 wifi_list, consumed = self.parse_wifi_list(info, pos, wifi_count) result["wifi_list"] = wifi_list pos += consumed # Attendance-specific trailing data if pos < len(info): result["attendance_data"] = info[pos:] return result def _parse_attendance_4g(self, info: bytes) -> Dict[str, Any]: """0xB1 Attendance 4G: 4G version of attendance.""" result: Dict[str, Any] = {} pos = 0 result["datetime"] = self.parse_datetime(info, pos) pos += 6 # GPS data if len(info) >= pos + 12: result["gps_info"] = self.parse_gps(info, pos) pos += 12 # LBS data (4G variant) if len(info) >= pos + 3: mcc_mnc, consumed = self.parse_mcc_mnc(info, pos) result.update(mcc_mnc) pos += consumed stations: List[Dict[str, Any]] = [] for i in range(7): if len(info) < pos + 13: break station, consumed = self.parse_lbs_station( info, pos, lac_size=4, cell_id_size=8 ) station["is_main"] = (i == 0) stations.append(station) pos += consumed result["stations"] = stations if len(info) >= pos + 1: result["timing_advance"] = info[pos] pos += 1 # WIFI data if len(info) >= pos + 1: wifi_count = info[pos] result["wifi_count"] = wifi_count pos += 1 wifi_list, consumed = self.parse_wifi_list(info, pos, wifi_count) result["wifi_list"] = wifi_list pos += consumed if pos < len(info): result["attendance_data"] = info[pos:] return result def _parse_bt_punch(self, info: bytes) -> Dict[str, Any]: """0xB2 BT Punch: bluetooth punch card data.""" result: Dict[str, Any] = {} pos = 0 if len(info) >= 6: result["datetime"] = self.parse_datetime(info, pos) pos += 6 # Remaining is BT punch-specific payload if pos < len(info): result["bt_data"] = info[pos:] return result def _parse_bt_location(self, info: bytes) -> Dict[str, Any]: """0xB3 BT Location: bluetooth location data.""" result: Dict[str, Any] = {} pos = 0 if len(info) >= 6: result["datetime"] = self.parse_datetime(info, pos) pos += 6 if pos < len(info): result["bt_data"] = info[pos:] return result # ------------------------------------------------------------------ # Handler dispatch table (defined after methods so names resolve) # ------------------------------------------------------------------ _PROTOCOL_HANDLERS = { PROTO_LOGIN: _parse_login, PROTO_HEARTBEAT: _parse_heartbeat, PROTO_LBS_ADDRESS_REQ: _parse_lbs_address_req, PROTO_ADDRESS_QUERY: _parse_address_query, PROTO_TIME_SYNC: _parse_time_sync, PROTO_GPS: _parse_gps_packet, PROTO_LBS_MULTI: _parse_lbs_multi, PROTO_WIFI: _parse_wifi, PROTO_HEARTBEAT_EXT: _parse_heartbeat_ext, PROTO_TIME_SYNC_2: _parse_time_sync_2, PROTO_GENERAL_INFO: _parse_general_info, PROTO_GPS_4G: _parse_gps_4g, PROTO_LBS_4G: _parse_lbs_4g, PROTO_WIFI_4G: _parse_wifi_4g, PROTO_ALARM_SINGLE_FENCE: _parse_alarm_single_fence, PROTO_ALARM_LBS_4G: _parse_alarm_lbs_4g, PROTO_ALARM_WIFI: _parse_alarm_wifi, PROTO_ATTENDANCE: _parse_attendance, PROTO_ATTENDANCE_4G: _parse_attendance_4g, PROTO_BT_PUNCH: _parse_bt_punch, PROTO_BT_LOCATION: _parse_bt_location, }