"""
KKS Bluetooth Badge Protocol Packet Builder

Constructs server response packets for the KKS badge protocol.
"""

from __future__ import annotations

import struct
import time
from datetime import datetime, timezone
from typing import Optional

from .constants import (
    PROTO_HEARTBEAT,
    PROTO_HEARTBEAT_EXT,
    PROTO_LBS_ADDRESS_REQ,
    PROTO_LBS_MULTI_REPLY,
    PROTO_LOGIN,
    PROTO_MESSAGE,
    PROTO_ONLINE_CMD,
    PROTO_TIME_SYNC,
    PROTO_TIME_SYNC_2,
    PROTO_ADDRESS_REPLY_EN,
    START_MARKER_LONG,
    START_MARKER_SHORT,
    STOP_MARKER,
)
from .crc import crc_itu


class PacketBuilder:
    """Builds server response packets for the KKS badge protocol."""

    # Width in bytes of the fixed-size phone field used by address replies.
    _PHONE_FIELD_LEN = 21
    # Largest value representable by a 1-byte command-length field.
    _MAX_CMD_LEN_1BYTE = 0xFF

    # ------------------------------------------------------------------
    # Core builder
    # ------------------------------------------------------------------

    @staticmethod
    def build_response(
        protocol_number: int,
        serial_number: int,
        info_content: bytes = b"",
    ) -> bytes:
        """
        Build a complete response packet.

        Packet layout (short form, 0x7878):
            START(2) + LENGTH(1) + PROTO(1) + INFO(N) + SERIAL(2) + CRC(2) + STOP(2)

        LENGTH = 1(proto) + N(info) + 2(serial) + 2(crc)

        If LENGTH exceeds 255 the long form (0x7979, 2-byte length) is
        used automatically.

        Parameters
        ----------
        protocol_number : int
            Protocol number byte.
        serial_number : int
            Packet serial number (16-bit).
        info_content : bytes
            Information content (may be empty).

        Returns
        -------
        bytes
            The fully assembled packet.

        Raises
        ------
        struct.error
            If ``protocol_number`` does not fit in one unsigned byte, or
            ``serial_number`` / the computed length does not fit in an
            unsigned 16-bit field.
        """
        proto_byte = struct.pack("B", protocol_number)
        serial_bytes = struct.pack("!H", serial_number)

        # LENGTH counts: proto + info + serial + crc
        payload_len = 1 + len(info_content) + 2 + 2

        if payload_len > 0xFF:
            # Long packet: 2-byte big-endian length
            start_marker = START_MARKER_LONG
            length_bytes = struct.pack("!H", payload_len)
        else:
            start_marker = START_MARKER_SHORT
            length_bytes = struct.pack("B", payload_len)

        # The CRC is computed over: length + proto + info + serial
        checked = length_bytes + proto_byte + info_content + serial_bytes
        crc_bytes = struct.pack("!H", crc_itu(checked))

        return start_marker + checked + crc_bytes + STOP_MARKER

    # ------------------------------------------------------------------
    # Private helpers
    # ------------------------------------------------------------------

    @staticmethod
    def _pack_flagged_content(
        server_flag: int,
        content: bytes,
        language: int,
    ) -> bytes:
        """
        Pack ``inner_len(1) + server_flag(4) + content(N) + language(2)``.

        ``inner_len`` is ``4 + N + 2`` and is stored in a single byte, so
        ``struct.error`` is raised when ``content`` is longer than 249 bytes.
        """
        inner_len = 4 + len(content) + 2
        return (
            struct.pack("B", inner_len)       # 1 byte inner length
            + struct.pack("!I", server_flag)  # 4 bytes server flag
            + content                         # N bytes content
            + struct.pack("!H", language)     # 2 bytes language
        )

    @classmethod
    def _encode_phone(cls, phone: str) -> bytes:
        """
        Encode *phone* as the fixed 21-byte phone field.

        Non-ASCII characters are dropped, the result is cut to 21 bytes and
        right-padded with the ASCII character ``"0"`` (0x30), not NUL bytes.
        """
        width = cls._PHONE_FIELD_LEN
        return phone.encode("ascii", errors="ignore")[:width].ljust(width, b"0")

    @staticmethod
    def _truncate_utf16_be(data: bytes, max_len: int) -> bytes:
        """
        Cut UTF-16-BE *data* to at most *max_len* bytes.

        The cut always falls on a 16-bit code-unit boundary and never leaves
        a high surrogate without its low surrogate at the end.
        """
        if len(data) <= max_len:
            return data
        cut = max(max_len, 0) & ~1  # keep whole 16-bit code units only
        # In big-endian order a high surrogate (0xD800-0xDBFF) starts with a
        # byte in 0xD8..0xDB; drop it if its partner was cut off.
        if cut >= 2 and 0xD8 <= data[cut - 2] <= 0xDB:
            cut -= 2
        return data[:cut]

    # ------------------------------------------------------------------
    # Specific response builders
    # ------------------------------------------------------------------

    def build_login_response(self, serial_number: int) -> bytes:
        """
        Build a login response (0x01).

        The server responds with an empty info content to acknowledge login.
        """
        return self.build_response(PROTO_LOGIN, serial_number)

    def build_heartbeat_response(
        self,
        serial_number: int,
        protocol: int = PROTO_HEARTBEAT,
    ) -> bytes:
        """
        Build a heartbeat response.

        Works for both standard heartbeat (0x13) and extended heartbeat (0x36);
        pass the protocol number to echo via ``protocol``. The info content
        is empty.
        """
        return self.build_response(protocol, serial_number)

    def build_time_sync_response(
        self,
        serial_number: int,
        protocol: int = PROTO_TIME_SYNC,
        language: int = 0x0001,
    ) -> bytes:
        """
        Build a time sync response (0x1F).

        The info content is the current Unix timestamp (4 bytes, big-endian)
        followed by the 2-byte language code. When ``language`` is 0x0001
        (Chinese) the timestamp is shifted by +8 hours (GMT+8).
        """
        timestamp = int(time.time())
        if language == 0x0001:
            timestamp += 8 * 3600  # GMT+8 for Chinese
        info = struct.pack("!IH", timestamp, language)
        return self.build_response(protocol, serial_number, info)

    def build_time_sync_8a_response(self, serial_number: int) -> bytes:
        """
        Build a Time Sync 2 response (0x8A).

        The info content is the current UTC time as six single bytes:
        YY MM DD HH MM SS, where YY is the year minus 2000.
        """
        now = datetime.now(timezone.utc)
        info = struct.pack(
            "BBBBBB",
            now.year - 2000,
            now.month,
            now.day,
            now.hour,
            now.minute,
            now.second,
        )
        return self.build_response(PROTO_TIME_SYNC_2, serial_number, info)

    def build_lbs_multi_response(self, serial_number: int) -> bytes:
        """
        Build an LBS Multi Reply response (0x2E).

        The server acknowledges with an empty info content.
        """
        return self.build_response(PROTO_LBS_MULTI_REPLY, serial_number)

    def build_online_command(
        self,
        serial_number: int,
        server_flag: int,
        command: str,
        language: int = 0x0001,
    ) -> bytes:
        """
        Build an online command packet (0x80).

        Parameters
        ----------
        serial_number : int
            Packet serial number.
        server_flag : int
            Server flag bits (32-bit).
        command : str
            The command string to send (ASCII).
        language : int
            Language code (default 0x0001 = Chinese).

        Returns
        -------
        bytes
            Complete packet.

        Raises
        ------
        UnicodeEncodeError
            If ``command`` contains non-ASCII characters.
        struct.error
            If ``command`` is longer than 249 characters (the inner length
            is a single byte) or a numeric field is out of range.
        """
        cmd_bytes = command.encode("ascii")
        info = self._pack_flagged_content(server_flag, cmd_bytes, language)
        return self.build_response(PROTO_ONLINE_CMD, serial_number, info)

    def build_message(
        self,
        serial_number: int,
        server_flag: int,
        message_text: str,
        language: int = 0x0001,
    ) -> bytes:
        """
        Build a message packet (0x82).

        The message is encoded in UTF-16 Big-Endian.

        Parameters
        ----------
        serial_number : int
            Packet serial number.
        server_flag : int
            Server flag bits (32-bit).
        message_text : str
            The message string to send.
        language : int
            Language code (default 0x0001 = Chinese).

        Returns
        -------
        bytes
            Complete packet.

        Raises
        ------
        struct.error
            If the encoded message is longer than 249 bytes (the inner
            length is a single byte) or a numeric field is out of range.
        """
        msg_bytes = message_text.encode("utf-16-be")
        info = self._pack_flagged_content(server_flag, msg_bytes, language)
        return self.build_response(PROTO_MESSAGE, serial_number, info)

    def build_address_reply_cn(
        self,
        serial_number: int,
        server_flag: int = 0,
        address: str = "",
        phone: str = "",
        protocol: int = PROTO_LBS_ADDRESS_REQ,
        is_alarm: bool = False,
    ) -> bytes:
        """
        Build a Chinese address reply packet (0x17).

        Format:
            cmd_length(1) + server_flag(4) + ADDRESS/ALARMSMS + && +
            addr(UTF16BE) + && + phone(21) + ##

        ``cmd_length`` is a single byte, so the content after it may be at
        most 255 bytes. An address that would exceed that limit is truncated
        (on a UTF-16 code-point boundary) so that the length byte always
        matches the bytes actually sent.
        """
        marker = b"ALARMSMS" if is_alarm else b"ADDRESS"
        head = struct.pack("!I", server_flag) + marker + b"&&"
        # Phone field: 21 bytes ASCII, right-padded with the character "0"
        tail = b"&&" + self._encode_phone(phone) + b"##"

        # Whatever the fixed parts leave of the 1-byte length budget is
        # available for the address text.
        addr_budget = self._MAX_CMD_LEN_1BYTE - len(head) - len(tail)
        addr_bytes = self._truncate_utf16_be(
            address.encode("utf-16-be"), addr_budget
        )

        inner = head + addr_bytes + tail
        # 0x17 uses 1-byte cmd_length
        info = bytes([len(inner)]) + inner
        return self.build_response(protocol, serial_number, info)

    def build_address_reply_en(
        self,
        serial_number: int,
        server_flag: int = 0,
        address: str = "",
        phone: str = "",
        protocol: int = PROTO_ADDRESS_REPLY_EN,
        is_alarm: bool = False,
    ) -> bytes:
        """
        Build an English address reply packet (0x97).

        Format:
            cmd_length(2) + server_flag(4) + ADDRESS/ALARMSMS + && +
            addr(UTF-8) + && + phone(21) + ##

        Raises
        ------
        struct.error
            If the content after ``cmd_length`` exceeds 65535 bytes or
            ``server_flag`` does not fit in 32 bits.
        """
        marker = b"ALARMSMS" if is_alarm else b"ADDRESS"
        inner = (
            struct.pack("!I", server_flag)
            + marker
            + b"&&"
            + address.encode("utf-8")
            + b"&&"
            + self._encode_phone(phone)
            + b"##"
        )
        # 0x97 uses 2-byte cmd_length
        info = struct.pack("!H", len(inner)) + inner
        return self.build_response(protocol, serial_number, info)