Add WebSocket, multi API key, geocoding proxy, beacon map picker, and comprehensive bug fixes

- Multi API Key + permission system (read/write/admin) with SHA-256 hash
- WebSocket real-time push (location, alarm, device_status, attendance, bluetooth)
- Geocoding proxy endpoints for Amap POI search and reverse geocode
- Beacon modal map-based location picker with search and click-to-select
- GCJ-02 ↔ WGS-84 bidirectional coordinate conversion
- Data cleanup scheduler (configurable retention days)
- Fix GPS longitude sign inversion (course_status bit 11: 0=East, 1=West)
- Fix 2G CellID 2→3 bytes across all protocols (0x28, 0x2C, parser.py)
- Fix parser loop guards, alarm_source field length, CommandLog.sent_at
- Fix geocoding IMEI parameterization, require_admin import
- Improve API error messages for 422 validation errors
- Remove beacon floor/area fields (consolidated into name)

via [HAPI](https://hapi.run)

Co-Authored-By: HAPI <noreply@hapi.run>
This commit is contained in:
2026-03-24 05:10:05 +00:00
parent 7d6040af41
commit 11281e5be2
24 changed files with 1636 additions and 730 deletions

View File

@@ -14,9 +14,13 @@ from typing import Any, Dict, List, Tuple
from .constants import (
ALARM_TYPES,
ATTENDANCE_STATUS_MASK,
ATTENDANCE_STATUS_SHIFT,
ATTENDANCE_TYPES,
DATA_REPORT_MODES,
GSM_SIGNAL_LEVELS,
PROTOCOL_NAMES,
VOLTAGE_LEVELS,
PROTO_ADDRESS_QUERY,
PROTO_ALARM_LBS_4G,
PROTO_ALARM_MULTI_FENCE,
@@ -272,29 +276,23 @@ class PacketParser:
speed = data[offset + 9]
course_status = struct.unpack_from("!H", data, offset + 10)[0]
# Decode course/status
is_realtime = bool(course_status & 0x2000) # bit 13 (from MSB: bit 12 if 0-indexed from MSB)
is_gps_positioned = bool(course_status & 0x1000) # bit 12 -> actually bit 11
is_east = bool(course_status & 0x0800) # bit 11 -> bit 10
is_north = bool(course_status & 0x0400) # bit 10 -> bit 9
course = course_status & 0x03FF # lower 10 bits
# Wait -- the standard mapping for this protocol:
# bit 13 (0x2000): real-time GPS
# bit 12 (0x1000): GPS is positioned
# bit 11 (0x0800): East longitude (0=West)
# bit 10 (0x0400): North latitude (0=South, but spec says 1=South sometimes)
# We'll use the most common convention: bit10=1 means South latitude is *negated*.
# Actually, common convention: bit10 = 0 -> South, bit10 = 1 -> North? No --
# In most implementations of this protocol family:
# bit 10 (0x0400): 1 = North latitude, 0 = South
# We'll go with that.
# Decode course/status (per protocol doc):
# bit 13 (0x2000): GPS real-time differential positioning
# bit 12 (0x1000): GPS positioned
# bit 11 (0x0800): 0=East, 1=West (东经/西经)
# bit 10 (0x0400): 0=South, 1=North (南纬/北纬)
# bits 9-0: course (0-360)
is_realtime = bool(course_status & 0x2000)
is_gps_positioned = bool(course_status & 0x1000)
is_west = bool(course_status & 0x0800)
is_north = bool(course_status & 0x0400)
course = course_status & 0x03FF
latitude = lat_raw / 1_800_000.0
longitude = lon_raw / 1_800_000.0
if not is_north:
latitude = -latitude
if not is_east:
if is_west:
longitude = -longitude
return {
@@ -308,7 +306,7 @@ class PacketParser:
"course": course,
"is_realtime": is_realtime,
"is_gps_positioned": is_gps_positioned,
"is_east": is_east,
"is_west": is_west,
"is_north": is_north,
"course_status_raw": course_status,
}
@@ -347,7 +345,7 @@ class PacketParser:
offset: int = 0,
*,
lac_size: int = 2,
cell_id_size: int = 2,
cell_id_size: int = 3,
) -> Tuple[Dict[str, Any], int]:
"""
Parse a single LBS station (LAC + Cell ID + RSSI).
@@ -370,6 +368,8 @@ class PacketParser:
if cell_id_size == 2:
cell_id = struct.unpack_from("!H", data, offset + consumed)[0]
elif cell_id_size == 3:
cell_id = int.from_bytes(data[offset + consumed : offset + consumed + 3], "big")
elif cell_id_size == 4:
cell_id = struct.unpack_from("!I", data, offset + consumed)[0]
else: # 8
@@ -479,8 +479,8 @@ class PacketParser:
result["lac"] = struct.unpack_from("!H", info, pos)[0]
pos += 2
result["cell_id"] = struct.unpack_from("!H", info, pos)[0]
pos += 2
result["cell_id"] = int.from_bytes(info[pos : pos + 3], "big")
pos += 3
# Remaining bytes: phone number (BCD) + alarm_language
if pos < len(info):
@@ -524,7 +524,7 @@ class PacketParser:
return result
def _parse_gps_packet(self, info: bytes) -> Dict[str, Any]:
"""0x22 GPS: datetime(6) + gps(12) + mcc(2) + mnc(1-2) + lac(2) + cell_id(2) + acc(1) + report_mode(1) + realtime_upload(1) + mileage(4)."""
"""0x22 GPS: datetime(6) + gps(12) + mcc(2) + mnc(1-2) + lac(2) + cell_id(3) + acc(1) + report_mode(1) + realtime_upload(1) + mileage(4)."""
result: Dict[str, Any] = {}
pos = 0
@@ -542,9 +542,10 @@ class PacketParser:
result["lac"] = struct.unpack_from("!H", info, pos)[0]
pos += 2
if len(info) >= pos + 2:
result["cell_id"] = struct.unpack_from("!H", info, pos)[0]
pos += 2
# 2G Cell ID is 3 bytes (not 2)
if len(info) >= pos + 3:
result["cell_id"] = int.from_bytes(info[pos:pos + 3], "big")
pos += 3
if len(info) >= pos + 1:
result["acc"] = info[pos]
@@ -579,7 +580,7 @@ class PacketParser:
stations: List[Dict[str, Any]] = []
for i in range(7): # main + 6 neighbors
if len(info) < pos + 5:
if len(info) < pos + 6: # LAC(2) + CellID(3) + RSSI(1) = 6
break
station, consumed = self.parse_lbs_station(info, pos)
station["is_main"] = (i == 0)
@@ -614,7 +615,7 @@ class PacketParser:
stations: List[Dict[str, Any]] = []
for i in range(7):
if len(info) < pos + 5:
if len(info) < pos + 6: # LAC(2) + CellID(3) + RSSI(1) = 6
break
station, consumed = self.parse_lbs_station(info, pos)
station["is_main"] = (i == 0)
@@ -807,8 +808,44 @@ class PacketParser:
return result
@staticmethod
def _parse_alarm_tail(info: bytes, pos: int) -> Tuple[Dict[str, Any], int]:
"""Parse common alarm tail: terminal_info(1) + voltage_level(1) + gsm_signal(1) + alarm_code(1) + language(1)."""
result: Dict[str, Any] = {}
if len(info) >= pos + 1:
ti = info[pos]
result["terminal_info"] = ti
result["terminal_info_bits"] = {
"oil_electricity_connected": bool(ti & 0x80),
"gps_tracking_on": bool(ti & 0x40),
"alarm": ALARM_TYPES.get((ti >> 3) & 0x07, "Unknown"),
"charging": bool(ti & 0x04),
"acc_on": bool(ti & 0x02),
"armed": bool(ti & 0x01),
}
pos += 1
if len(info) >= pos + 1:
voltage_level = info[pos]
result["voltage_level"] = voltage_level
result["voltage_name"] = VOLTAGE_LEVELS.get(voltage_level, "Unknown")
result["battery_level"] = min(voltage_level * 17, 100) if voltage_level <= 6 else None
pos += 1
if len(info) >= pos + 1:
result["gsm_signal"] = info[pos]
result["gsm_signal_name"] = GSM_SIGNAL_LEVELS.get(info[pos], "Unknown")
pos += 1
if len(info) >= pos + 1:
alarm_code = info[pos]
result["alarm_code"] = alarm_code
result["alarm_type"] = ALARM_TYPES.get(alarm_code, f"unknown_0x{alarm_code:02X}")
pos += 1
if len(info) >= pos + 1:
result["language"] = info[pos]
pos += 1
return result, pos
def _parse_alarm_single_fence(self, info: bytes) -> Dict[str, Any]:
"""0xA3 Single Fence Alarm: datetime(6) + gps(12) + lbs_length(1) + mcc(2) + mnc(1-2) + lac(4) + cell_id(8) + terminal_info(1) + voltage(2) + gsm_signal(1) + alarm_language(2)."""
"""0xA3 Single Fence Alarm: datetime(6) + gps(12) + lbs_length(1) + mcc(2) + mnc(1-2) + lac(4) + cell_id(8) + terminal_info(1) + voltage_level(1) + gsm_signal(1) + alarm_code(1) + language(1)."""
result: Dict[str, Any] = {}
pos = 0
@@ -836,45 +873,19 @@ class PacketParser:
result["cell_id"] = struct.unpack_from("!Q", info, pos)[0]
pos += 8
if len(info) >= pos + 1:
ti = info[pos]
result["terminal_info"] = ti
result["terminal_info_bits"] = {
"oil_electricity_connected": bool(ti & 0x80),
"gps_tracking_on": bool(ti & 0x40),
"alarm": ALARM_TYPES.get((ti >> 3) & 0x07, "Unknown"),
"charging": bool(ti & 0x04),
"acc_on": bool(ti & 0x02),
"armed": bool(ti & 0x01),
}
pos += 1
if len(info) >= pos + 2:
result["voltage"] = struct.unpack_from("!H", info, pos)[0]
pos += 2
if len(info) >= pos + 1:
result["gsm_signal"] = info[pos]
result["gsm_signal_name"] = GSM_SIGNAL_LEVELS.get(info[pos], "Unknown")
pos += 1
if len(info) >= pos + 2:
result["alarm_language"] = struct.unpack_from("!H", info, pos)[0]
pos += 2
tail, pos = self._parse_alarm_tail(info, pos)
result.update(tail)
return result
def _parse_alarm_lbs_4g(self, info: bytes) -> Dict[str, Any]:
"""0xA5 LBS 4G Alarm: similar to 0xA3 but LBS-based."""
"""0xA5 LBS 4G Alarm: NO datetime, NO GPS, NO lbs_length.
Content starts directly with MCC(2) + MNC(1-2) + LAC(4) + CellID(8)
+ terminal_info(1) + voltage_level(1) + gsm_signal(1) + alarm_code(1) + language(1).
"""
result: Dict[str, Any] = {}
pos = 0
result["datetime"] = self.parse_datetime(info, pos)
pos += 6
if len(info) >= pos + 1:
result["lbs_length"] = info[pos]
pos += 1
pos = 0 # content starts directly with MCC
if len(info) >= pos + 3:
mcc_mnc, consumed = self.parse_mcc_mnc(info, pos)
@@ -889,85 +900,63 @@ class PacketParser:
result["cell_id"] = struct.unpack_from("!Q", info, pos)[0]
pos += 8
if len(info) >= pos + 1:
ti = info[pos]
result["terminal_info"] = ti
result["terminal_info_bits"] = {
"oil_electricity_connected": bool(ti & 0x80),
"gps_tracking_on": bool(ti & 0x40),
"alarm": ALARM_TYPES.get((ti >> 3) & 0x07, "Unknown"),
"charging": bool(ti & 0x04),
"acc_on": bool(ti & 0x02),
"armed": bool(ti & 0x01),
}
pos += 1
if len(info) >= pos + 2:
result["voltage"] = struct.unpack_from("!H", info, pos)[0]
pos += 2
if len(info) >= pos + 1:
result["gsm_signal"] = info[pos]
result["gsm_signal_name"] = GSM_SIGNAL_LEVELS.get(info[pos], "Unknown")
pos += 1
if len(info) >= pos + 2:
result["alarm_language"] = struct.unpack_from("!H", info, pos)[0]
pos += 2
tail, pos = self._parse_alarm_tail(info, pos)
result.update(tail)
return result
def _parse_alarm_wifi(self, info: bytes) -> Dict[str, Any]:
"""0xA9 WIFI Alarm: datetime + gps + lbs + terminal_info + voltage + gsm + wifi_count + wifi_list + alarm_language."""
"""0xA9 WIFI Alarm: datetime(6) + MCC(2) + MNC(1-2) + cell_type(1) + cell_count(1)
+ [cell_stations] + timing_advance(1) + wifi_count(1) + [wifi_list] + alarm_code(1) + language(1).
No GPS block, no lbs_length. Independent format.
"""
result: Dict[str, Any] = {}
pos = 0
result["datetime"] = self.parse_datetime(info, pos)
pos += 6
if len(info) >= pos + 12:
result["gps_info"] = self.parse_gps(info, pos)
pos += 12
if len(info) >= pos + 1:
result["lbs_length"] = info[pos]
pos += 1
if len(info) >= pos + 3:
mcc_mnc, consumed = self.parse_mcc_mnc(info, pos)
result.update(mcc_mnc)
pos += consumed
if len(info) >= pos + 4:
result["lac"] = struct.unpack_from("!I", info, pos)[0]
pos += 4
if len(info) >= pos + 8:
result["cell_id"] = struct.unpack_from("!Q", info, pos)[0]
pos += 8
if len(info) >= pos + 1:
ti = info[pos]
result["terminal_info"] = ti
result["terminal_info_bits"] = {
"oil_electricity_connected": bool(ti & 0x80),
"gps_tracking_on": bool(ti & 0x40),
"alarm": ALARM_TYPES.get((ti >> 3) & 0x07, "Unknown"),
"charging": bool(ti & 0x04),
"acc_on": bool(ti & 0x02),
"armed": bool(ti & 0x01),
}
pos += 1
# cell_type(1) + cell_count(1)
cell_type = 0 # 0=2G, 1=4G
cell_count = 0
if len(info) >= pos + 2:
result["voltage"] = struct.unpack_from("!H", info, pos)[0]
cell_type = info[pos]
cell_count = info[pos + 1]
result["cell_type"] = cell_type
result["cell_count"] = cell_count
pos += 2
# Parse cell stations
stations: List[Dict[str, Any]] = []
for i in range(cell_count):
if cell_type == 1: # 4G: LAC(4) + CI(8) + RSSI(1) = 13 bytes
if len(info) < pos + 13:
break
station, consumed = self.parse_lbs_station(info, pos, lac_size=4, cell_id_size=8)
stations.append(station)
pos += consumed
else: # 2G: LAC(2) + CI(3) + RSSI(1) = 6 bytes
if len(info) < pos + 6:
break
lac_val = struct.unpack_from("!H", info, pos)[0]
ci_val = int.from_bytes(info[pos + 2:pos + 5], "big")
rssi_val = info[pos + 5]
stations.append({"lac": lac_val, "cell_id": ci_val, "rssi": rssi_val})
pos += 6
result["stations"] = stations
# timing_advance(1)
if len(info) >= pos + 1:
result["gsm_signal"] = info[pos]
result["gsm_signal_name"] = GSM_SIGNAL_LEVELS.get(info[pos], "Unknown")
result["timing_advance"] = info[pos]
pos += 1
# WiFi APs: wifi_count(1) + [mac(6) + signal(1)]*N
if len(info) >= pos + 1:
wifi_count = info[pos]
result["wifi_count"] = wifi_count
@@ -977,39 +966,85 @@ class PacketParser:
result["wifi_list"] = wifi_list
pos += consumed
if len(info) >= pos + 2:
result["alarm_language"] = struct.unpack_from("!H", info, pos)[0]
pos += 2
# alarm_code(1) + language(1)
if len(info) >= pos + 1:
alarm_code = info[pos]
result["alarm_code"] = alarm_code
result["alarm_type"] = ALARM_TYPES.get(alarm_code, f"unknown_0x{alarm_code:02X}")
pos += 1
if len(info) >= pos + 1:
result["language"] = info[pos]
pos += 1
return result
def _parse_attendance(self, info: bytes) -> Dict[str, Any]:
"""0xB0 Attendance: GPS + WIFI + LBS combined attendance data."""
"""0xB0 Attendance: datetime(6) + gps_positioned(1) + reserved(2) + GPS(12)
+ terminal_info(1) + voltage_level(1) + gsm_signal(1) + reserved_ext(2)
+ MCC/MNC + 7 stations(LAC2+CI3+RSSI) + TA(1) + wifi_count(1) + wifi_list.
"""
result: Dict[str, Any] = {}
pos = 0
result["datetime"] = self.parse_datetime(info, pos)
pos += 6
# GPS data
# GPS positioned flag (1 byte)
if len(info) > pos:
result["gps_positioned"] = info[pos] == 1
pos += 1
# Terminal reserved (2 bytes)
if len(info) >= pos + 2:
result["terminal_reserved"] = info[pos:pos + 2]
pos += 2
# GPS data (12 bytes)
if len(info) >= pos + 12:
result["gps_info"] = self.parse_gps(info, pos)
pos += 12
# LBS data
# Terminal info (1 byte) - clock_in/clock_out
if len(info) > pos:
ti = info[pos]
result["terminal_info"] = ti
status_code = (ti >> ATTENDANCE_STATUS_SHIFT) & ATTENDANCE_STATUS_MASK
result["attendance_type"] = ATTENDANCE_TYPES.get(status_code, "unknown")
pos += 1
# Voltage level (1 byte)
if len(info) > pos:
vl = info[pos]
result["voltage_level"] = vl
result["battery_level"] = min(vl * 17, 100) if vl <= 6 else None
pos += 1
# GSM signal (1 byte)
if len(info) > pos:
result["gsm_signal"] = info[pos]
pos += 1
# Reserved extension (2 bytes)
if len(info) >= pos + 2:
pos += 2
# LBS: MCC/MNC
if len(info) >= pos + 3:
mcc_mnc, consumed = self.parse_mcc_mnc(info, pos)
result.update(mcc_mnc)
pos += consumed
# 7 stations: LAC(2) + CI(3) + RSSI(1) = 6 bytes each for 2G
stations: List[Dict[str, Any]] = []
for i in range(7):
if len(info) < pos + 5:
if len(info) < pos + 6:
break
station, consumed = self.parse_lbs_station(info, pos)
station["is_main"] = (i == 0)
lac_val = struct.unpack_from("!H", info, pos)[0]
ci_val = int.from_bytes(info[pos + 2:pos + 5], "big")
rssi_val = info[pos + 5]
station = {"lac": lac_val, "cell_id": ci_val, "rssi": rssi_val, "is_main": (i == 0)}
stations.append(station)
pos += consumed
pos += 6
result["stations"] = stations
@@ -1027,31 +1062,66 @@ class PacketParser:
result["wifi_list"] = wifi_list
pos += consumed
# Attendance-specific trailing data
if pos < len(info):
result["attendance_data"] = info[pos:]
return result
def _parse_attendance_4g(self, info: bytes) -> Dict[str, Any]:
"""0xB1 Attendance 4G: 4G version of attendance."""
"""0xB1 Attendance 4G: same layout as 0xB0 but MNC=2B fixed, LAC=4B, CI=8B."""
result: Dict[str, Any] = {}
pos = 0
result["datetime"] = self.parse_datetime(info, pos)
pos += 6
# GPS data
# GPS positioned flag (1 byte)
if len(info) > pos:
result["gps_positioned"] = info[pos] == 1
pos += 1
# Terminal reserved (2 bytes)
if len(info) >= pos + 2:
result["terminal_reserved"] = info[pos:pos + 2]
pos += 2
# GPS data (12 bytes)
if len(info) >= pos + 12:
result["gps_info"] = self.parse_gps(info, pos)
pos += 12
# LBS data (4G variant)
if len(info) >= pos + 3:
mcc_mnc, consumed = self.parse_mcc_mnc(info, pos)
result.update(mcc_mnc)
pos += consumed
# Terminal info (1 byte) - clock_in/clock_out
if len(info) > pos:
ti = info[pos]
result["terminal_info"] = ti
status_code = (ti >> ATTENDANCE_STATUS_SHIFT) & ATTENDANCE_STATUS_MASK
result["attendance_type"] = ATTENDANCE_TYPES.get(status_code, "unknown")
pos += 1
# Voltage level (1 byte)
if len(info) > pos:
vl = info[pos]
result["voltage_level"] = vl
result["battery_level"] = min(vl * 17, 100) if vl <= 6 else None
pos += 1
# GSM signal (1 byte)
if len(info) > pos:
result["gsm_signal"] = info[pos]
pos += 1
# Reserved extension (2 bytes)
if len(info) >= pos + 2:
pos += 2
# 4G LBS: MCC(2, clear high bit) + MNC(2, fixed) + LAC(4) + CI(8)
if len(info) >= pos + 2:
mcc_raw = struct.unpack_from("!H", info, pos)[0]
result["mcc"] = mcc_raw & 0x7FFF
pos += 2
if len(info) >= pos + 2:
result["mnc"] = struct.unpack_from("!H", info, pos)[0]
result["mnc_2byte"] = True
pos += 2
# 7 stations: LAC(4) + CI(8) + RSSI(1) = 13 bytes each
stations: List[Dict[str, Any]] = []
for i in range(7):
if len(info) < pos + 13:
@@ -1079,13 +1149,10 @@ class PacketParser:
result["wifi_list"] = wifi_list
pos += consumed
if pos < len(info):
result["attendance_data"] = info[pos:]
return result
def _parse_bt_punch(self, info: bytes) -> Dict[str, Any]:
"""0xB2 BT Punch: bluetooth punch card data."""
"""0xB2 BT Punch: datetime(6) + RSSI(1,signed) + MAC(6) + UUID(16) + Major(2) + Minor(2) + Battery(2) + TerminalInfo(1) + Reserved(2)."""
result: Dict[str, Any] = {}
pos = 0
@@ -1093,14 +1160,63 @@ class PacketParser:
result["datetime"] = self.parse_datetime(info, pos)
pos += 6
# Remaining is BT punch-specific payload
if pos < len(info):
result["bt_data"] = info[pos:]
# RSSI (1 byte, signed)
if len(info) > pos:
result["rssi"] = struct.unpack_from("b", info, pos)[0]
pos += 1
# MAC address (6 bytes)
if len(info) >= pos + 6:
result["beacon_mac"] = ":".join(f"{b:02X}" for b in info[pos:pos + 6])
pos += 6
# UUID (16 bytes)
if len(info) >= pos + 16:
uuid_bytes = info[pos:pos + 16]
result["beacon_uuid"] = (
f"{uuid_bytes[0:4].hex()}-{uuid_bytes[4:6].hex()}-"
f"{uuid_bytes[6:8].hex()}-{uuid_bytes[8:10].hex()}-"
f"{uuid_bytes[10:16].hex()}"
).upper()
pos += 16
# Major (2 bytes)
if len(info) >= pos + 2:
result["beacon_major"] = struct.unpack_from("!H", info, pos)[0]
pos += 2
# Minor (2 bytes)
if len(info) >= pos + 2:
result["beacon_minor"] = struct.unpack_from("!H", info, pos)[0]
pos += 2
# Beacon battery (2 bytes, unit 0.01V)
if len(info) >= pos + 2:
raw_batt = struct.unpack_from("!H", info, pos)[0]
result["beacon_battery"] = raw_batt * 0.01
result["beacon_battery_unit"] = "V"
pos += 2
# Terminal info (1 byte) - clock_in/clock_out
if len(info) > pos:
ti = info[pos]
result["terminal_info"] = ti
status_code = (ti >> ATTENDANCE_STATUS_SHIFT) & ATTENDANCE_STATUS_MASK
result["attendance_type"] = ATTENDANCE_TYPES.get(status_code, "clock_in")
pos += 1
# Terminal reserved (2 bytes)
if len(info) >= pos + 2:
result["terminal_reserved"] = info[pos:pos + 2]
pos += 2
return result
def _parse_bt_location(self, info: bytes) -> Dict[str, Any]:
"""0xB3 BT Location: bluetooth location data."""
"""0xB3 BT Location: datetime(6) + beacon_count(1) + per-beacon(30 bytes each).
Per beacon: RSSI(1,signed) + MAC(6) + UUID(16) + Major(2) + Minor(2) + Battery(2) + BattUnit(1) = 30 bytes.
"""
result: Dict[str, Any] = {}
pos = 0
@@ -1108,8 +1224,101 @@ class PacketParser:
result["datetime"] = self.parse_datetime(info, pos)
pos += 6
if pos < len(info):
result["bt_data"] = info[pos:]
beacon_count = 0
if len(info) > pos:
beacon_count = info[pos]
result["beacon_count"] = beacon_count
pos += 1
beacons: List[Dict[str, Any]] = []
for _ in range(beacon_count):
if len(info) < pos + 30:
break
rssi = struct.unpack_from("b", info, pos)[0]
pos += 1
mac = ":".join(f"{b:02X}" for b in info[pos:pos + 6])
pos += 6
uuid_bytes = info[pos:pos + 16]
uuid_str = (
f"{uuid_bytes[0:4].hex()}-{uuid_bytes[4:6].hex()}-"
f"{uuid_bytes[6:8].hex()}-{uuid_bytes[8:10].hex()}-"
f"{uuid_bytes[10:16].hex()}"
).upper()
pos += 16
major = struct.unpack_from("!H", info, pos)[0]
pos += 2
minor = struct.unpack_from("!H", info, pos)[0]
pos += 2
raw_batt = struct.unpack_from("!H", info, pos)[0]
pos += 2
batt_unit_byte = info[pos]
pos += 1
if batt_unit_byte == 0:
battery_val = raw_batt * 0.01
battery_unit = "V"
else:
battery_val = float(raw_batt)
battery_unit = "%"
beacons.append({
"rssi": rssi,
"mac": mac,
"uuid": uuid_str,
"major": major,
"minor": minor,
"battery": battery_val,
"battery_unit": battery_unit,
})
result["beacons"] = beacons
return result
def _parse_alarm_multi_fence(self, info: bytes) -> Dict[str, Any]:
"""0xA4 Multi Fence Alarm: same as 0xA3 + fence_id(1) at the end."""
result = self._parse_alarm_single_fence(info)
# After the standard alarm fields, 0xA4 has an extra fence_id byte
# We need to re-parse to find the fence_id position
# The simplest approach: fence_id is the last unparsed byte
# Since _parse_alarm_single_fence consumed up to language(1),
# the fence_id follows it. Calculate position from the end.
# Format ends with: ...alarm_code(1) + language(1) + fence_id(1)
if len(info) > 0:
result["fence_id"] = info[-1]
return result
def _parse_online_cmd_reply(self, info: bytes) -> Dict[str, Any]:
"""0x81 Online Command Reply: length(1) + server_flag(4) + content(N) + language(2)."""
result: Dict[str, Any] = {}
if len(info) < 1:
return result
result["cmd_length"] = info[0]
pos = 1
if len(info) >= pos + 4:
result["server_flag"] = struct.unpack_from("!I", info, pos)[0]
pos += 4
# Content is between server_flag and language(2 bytes at end)
if len(info) > pos + 2:
try:
result["response_content"] = info[pos:-2].decode("utf-8", errors="replace")
except Exception:
result["response_content"] = info[pos:-2].hex()
result["language"] = struct.unpack_from("!H", info, len(info) - 2)[0]
elif len(info) > pos:
try:
result["response_content"] = info[pos:].decode("utf-8", errors="replace")
except Exception:
result["response_content"] = info[pos:].hex()
return result
@@ -1133,8 +1342,10 @@ class PacketParser:
PROTO_LBS_4G: _parse_lbs_4g,
PROTO_WIFI_4G: _parse_wifi_4g,
PROTO_ALARM_SINGLE_FENCE: _parse_alarm_single_fence,
PROTO_ALARM_MULTI_FENCE: _parse_alarm_multi_fence,
PROTO_ALARM_LBS_4G: _parse_alarm_lbs_4g,
PROTO_ALARM_WIFI: _parse_alarm_wifi,
PROTO_ONLINE_CMD_REPLY: _parse_online_cmd_reply,
PROTO_ATTENDANCE: _parse_attendance,
PROTO_ATTENDANCE_4G: _parse_attendance_4g,
PROTO_BT_PUNCH: _parse_bt_punch,