Files
desungongpai/app/services/beacon_service.py

554 lines
19 KiB
Python
Raw Permalink Normal View History

"""
Beacon Service - 蓝牙信标管理服务
Provides CRUD operations for Bluetooth beacon configuration.
"""
import asyncio
import logging
import re
from datetime import datetime, timezone
from sqlalchemy import delete as sa_delete, func, select, or_
from sqlalchemy.ext.asyncio import AsyncSession
from app.models import BeaconConfig, CommandLog, Device, DeviceBeaconBinding
from app.schemas import BeaconConfigCreate, BeaconConfigUpdate
from app.services import tcp_command_service
logger = logging.getLogger(__name__)
async def get_beacons(
    db: AsyncSession,
    page: int = 1,
    page_size: int = 20,
    status_filter: str | None = None,
    search: str | None = None,
) -> tuple[list[BeaconConfig], int]:
    """Return one page of beacons plus the total number of matching rows.

    Args:
        db: Async session used for both the count and the page query.
        page: 1-based page number. Values below 1 are served as the first
            page instead of producing a negative OFFSET.
        page_size: Maximum number of rows in the returned page.
        status_filter: When truthy, keep only beacons whose ``status`` equals it.
        search: When truthy, case-insensitive substring match against the
            beacon MAC, name or area. ``%`` and ``_`` inside the term are not
            escaped, so they act as LIKE wildcards.

    Returns:
        ``(beacons, total)``: the page ordered by ``created_at`` descending,
        and the count of all rows matching the filters (ignoring pagination).
    """
    # Collect the filters once so the page query and the count query can
    # never drift apart.
    conditions = []
    if status_filter:
        conditions.append(BeaconConfig.status == status_filter)
    if search:
        like = f"%{search}%"
        conditions.append(
            or_(
                BeaconConfig.beacon_mac.ilike(like),
                BeaconConfig.name.ilike(like),
                BeaconConfig.area.ilike(like),
            )
        )

    query = select(BeaconConfig)
    count_query = select(func.count(BeaconConfig.id))
    for condition in conditions:
        query = query.where(condition)
        count_query = count_query.where(condition)

    total_result = await db.execute(count_query)
    total = total_result.scalar() or 0

    # A non-positive page would yield a negative OFFSET, which some databases
    # (e.g. PostgreSQL) reject outright; clamp to the first row instead.
    offset = max((page - 1) * page_size, 0)
    query = query.order_by(BeaconConfig.created_at.desc()).offset(offset).limit(page_size)
    result = await db.execute(query)
    return list(result.scalars().all()), total
async def get_beacon(db: AsyncSession, beacon_id: int) -> BeaconConfig | None:
    """Look up a single beacon by its ``id``; return ``None`` when no row matches."""
    stmt = select(BeaconConfig).where(BeaconConfig.id == beacon_id)
    rows = await db.execute(stmt)
    return rows.scalar_one_or_none()
async def get_beacon_by_mac(db: AsyncSession, mac: str) -> BeaconConfig | None:
    """Look up a single beacon by MAC address; return ``None`` when no row matches.

    The comparison is a plain equality on ``beacon_mac``; *mac* is used exactly
    as given (no case or separator normalisation happens here).
    """
    stmt = select(BeaconConfig).where(BeaconConfig.beacon_mac == mac)
    rows = await db.execute(stmt)
    return rows.scalar_one_or_none()
async def create_beacon(db: AsyncSession, data: BeaconConfigCreate) -> BeaconConfig:
    """Insert a beacon built from *data* and return the refreshed ORM object.

    The session is flushed and the object refreshed; this function does not
    commit.
    """
    fields = data.model_dump()
    new_beacon = BeaconConfig(**fields)
    db.add(new_beacon)
    await db.flush()
    await db.refresh(new_beacon)
    return new_beacon
async def update_beacon(
    db: AsyncSession, beacon_id: int, data: BeaconConfigUpdate
) -> BeaconConfig | None:
    """Apply the explicitly-set fields of *data* to an existing beacon.

    Returns the refreshed beacon, or ``None`` when *beacon_id* does not exist.
    Only fields the caller actually provided are written (``exclude_unset``);
    ``updated_at`` is always stamped with ``now_cst()``. The session is flushed
    but not committed here.
    """
    target = await get_beacon(db, beacon_id)
    if target is None:
        return None

    changes = data.model_dump(exclude_unset=True)
    for field_name in changes:
        setattr(target, field_name, changes[field_name])

    from app.config import now_cst

    target.updated_at = now_cst()
    await db.flush()
    await db.refresh(target)
    return target
async def delete_beacon(db: AsyncSession, beacon_id: int) -> bool:
    """Delete the beacon with the given id.

    Returns ``True`` when a beacon was found and deleted, ``False`` when no
    such beacon exists. The session is flushed but not committed here.
    """
    target = await get_beacon(db, beacon_id)
    if target is not None:
        await db.delete(target)
        await db.flush()
        return True
    return False
# ---------------------------------------------------------------------------
# Device-Beacon Binding
# ---------------------------------------------------------------------------
async def get_beacon_devices(db: AsyncSession, beacon_id: int) -> list[dict]:
    """List the devices bound to a beacon, ordered by device name.

    Each entry is a dict with the keys ``binding_id``, ``device_id``,
    ``device_name`` and ``imei``.
    """
    columns = (
        DeviceBeaconBinding.id.label("binding_id"),
        DeviceBeaconBinding.device_id,
        Device.name.label("device_name"),
        Device.imei,
    )
    stmt = (
        select(*columns)
        .join(Device, Device.id == DeviceBeaconBinding.device_id)
        .where(DeviceBeaconBinding.beacon_id == beacon_id)
        .order_by(Device.name)
    )
    rows = await db.execute(stmt)
    return [row._asdict() for row in rows.all()]
async def bind_devices_to_beacon(
    db: AsyncSession, beacon_id: int, device_ids: list[int],
) -> dict:
    """Bind multiple devices to a beacon. Idempotent.

    Args:
        db: Async session; flushed but not committed here.
        beacon_id: Id of the beacon to bind to.
        device_ids: Device ids to bind. Repeated ids are treated as one.

    Returns:
        A dict with ``created`` (new bindings added), ``already_bound``
        (existing devices that were bound before the call) and ``not_found``
        (distinct ids with no matching device). When the beacon itself does
        not exist, every distinct id is reported as ``not_found`` and an
        ``error`` key is added.
    """
    # De-duplicate while keeping caller order: a repeated id must not produce
    # two bindings for the same (device, beacon) pair in a single call.
    unique_ids = list(dict.fromkeys(device_ids))

    beacon = await get_beacon(db, beacon_id)
    if beacon is None:
        return {"created": 0, "already_bound": 0, "not_found": len(unique_ids), "error": "Beacon not found"}

    result = await db.execute(
        select(Device.id).where(Device.id.in_(unique_ids))
    )
    existing_device_ids = {row[0] for row in result.all()}

    result = await db.execute(
        select(DeviceBeaconBinding.device_id).where(
            DeviceBeaconBinding.beacon_id == beacon_id,
            DeviceBeaconBinding.device_id.in_(unique_ids),
        )
    )
    already_bound_ids = {row[0] for row in result.all()}

    # Only devices that exist and are not yet bound get a new binding row.
    to_bind = [
        did for did in unique_ids
        if did in existing_device_ids and did not in already_bound_ids
    ]
    for did in to_bind:
        db.add(DeviceBeaconBinding(device_id=did, beacon_id=beacon_id))
    await db.flush()

    return {
        "created": len(to_bind),
        "already_bound": len(already_bound_ids & existing_device_ids),
        "not_found": len(set(unique_ids) - existing_device_ids),
    }
async def unbind_devices_from_beacon(
    db: AsyncSession, beacon_id: int, device_ids: list[int],
) -> int:
    """Remove the bindings between a beacon and the given devices.

    Returns the ``rowcount`` reported for the DELETE statement. The session is
    flushed but not committed here.
    """
    stmt = sa_delete(DeviceBeaconBinding).where(
        DeviceBeaconBinding.beacon_id == beacon_id,
        DeviceBeaconBinding.device_id.in_(device_ids),
    )
    outcome = await db.execute(stmt)
    await db.flush()
    return outcome.rowcount
async def sync_device_beacons(db: AsyncSession, device_id: int) -> dict:
    """Query all beacons bound to a device and send BTMACSET commands via TCP.

    BTMACSET supports up to 10 MACs per slot, 5 slots total (default + 1-4).
    MACs are ordered by beacon id; anything beyond the 50 that fit is not sent
    (a warning is logged). When the device has no bound beacons, a single
    ``BTMACSET,#`` is sent to clear the default slot.

    Args:
        db: Async session used to load the device and its bound beacons.
        device_id: Id of the device to synchronise.

    Returns:
        ``{"sent": bool, "mac_count": int, "commands": [...], "error": str|None}``.
        ``mac_count`` is the number of bound MACs found in the database,
        ``commands`` lists every command that was attempted, and ``sent`` is
        ``True`` only when every attempted command was accepted by
        ``send_command``.
    """
    # Get device IMEI
    result = await db.execute(select(Device).where(Device.id == device_id))
    device = result.scalar_one_or_none()
    if device is None:
        return {"sent": False, "mac_count": 0, "commands": [], "error": "设备不存在"}

    # Get all beacons bound to this device
    result = await db.execute(
        select(BeaconConfig.beacon_mac)
        .join(DeviceBeaconBinding, DeviceBeaconBinding.beacon_id == BeaconConfig.id)
        .where(DeviceBeaconBinding.device_id == device_id)
        .order_by(BeaconConfig.id)
    )
    macs = [row[0] for row in result.all()]

    if not tcp_command_service.is_device_online(device.imei):
        return {"sent": False, "mac_count": len(macs), "commands": [], "error": "设备离线,无法发送指令"}

    # Build BTMACSET commands: up to 10 MACs per slot
    # Slot names: BTMACSET (default), BTMACSET1, BTMACSET2, BTMACSET3, BTMACSET4
    slot_names = ["BTMACSET", "BTMACSET1", "BTMACSET2", "BTMACSET3", "BTMACSET4"]
    max_macs = len(slot_names) * 10
    if len(macs) > max_macs:
        logger.warning(
            "Device %s has %d bound beacons; only the first %d fit into BTMACSET slots",
            device.imei, len(macs), max_macs,
        )

    if macs:
        commands = [
            f"{slot_names[i // 10]},{','.join(macs[i:i + 10])}#"
            for i in range(0, min(len(macs), max_macs), 10)
        ]
    else:
        # Clear default slot
        commands = ["BTMACSET,#"]

    commands_sent = []
    all_ok = True
    for cmd in commands:
        # send_command reports delivery through its return value (the other
        # callers in this module branch on it), so it must not be ignored.
        ok = await tcp_command_service.send_command(device.imei, "online_cmd", cmd)
        commands_sent.append(cmd)
        if not ok:
            all_ok = False
            logger.warning("Command %s was not delivered to device %s", cmd, device.imei)

    return {
        "sent": all_ok,
        "mac_count": len(macs),
        "commands": commands_sent,
        "error": None if all_ok else "部分指令发送失败",
    }
# ---------------------------------------------------------------------------
# Reverse sync: query devices → update DB bindings
# ---------------------------------------------------------------------------
# Six colon-separated hex octets, e.g. C3:00:00:34:43:5E (either letter case).
_MAC_PATTERN = re.compile(r"([0-9A-Fa-f]{2}(?::[0-9A-Fa-f]{2}){5})")


def _parse_btmacset_response(text: str) -> list[str]:
    """Pull every MAC address out of a BTMACSET query reply, upper-cased.

    MACs are returned in the order they appear; duplicates are kept.

    Example responses:
        'setting OK.bt mac address:1,C3:00:00:34:43:5E;'
        'bt mac address:1,C3:00:00:34:43:5E,AA:BB:CC:DD:EE:FF;'
    """
    macs: list[str] = []
    for match in _MAC_PATTERN.finditer(text):
        macs.append(match.group(1).upper())
    return macs
async def reverse_sync_from_devices(db: AsyncSession) -> dict:
    """Send BTMACSET# query to all online devices, parse responses, update bindings.

    Uses separate DB sessions for command creation and polling to avoid
    transaction isolation issues with the TCP handler's independent session.

    For every device that answered, the bindings in *db* are made to match the
    MACs the device reported: beacons the device lists are bound, bindings the
    device does not list are removed. Reported MACs that match no known beacon
    are ignored. *db* is flushed but not committed here.

    Returns:
        A dict with ``queried`` (devices the query was sent to), ``responded``
        (devices that answered within the polling window), ``updated``
        (devices whose bindings changed), ``details`` (one entry per queried
        device) and ``error`` (``None`` on the normal path, otherwise a
        message explaining why nothing was queried).
    """
    from app.database import async_session as make_session
    from app.services import command_service
    from app.config import now_cst

    # Get all online devices
    result = await db.execute(
        select(Device).where(Device.status == "online")
    )
    devices = list(result.scalars().all())
    if not devices:
        return {"queried": 0, "responded": 0, "updated": 0, "details": [], "error": "没有在线设备"}

    # Build beacon MAC → id lookup
    result = await db.execute(select(BeaconConfig.id, BeaconConfig.beacon_mac))
    mac_to_beacon_id = {row[1].upper(): row[0] for row in result.all()}

    # --- Phase 1: Create CommandLogs and send commands (committed session) ---
    sent_devices: list[tuple[int, str, str | None, int]] = []  # (dev_id, imei, name, cmd_log_id)
    async with make_session() as cmd_session:
        async with cmd_session.begin():
            for dev in devices:
                if not tcp_command_service.is_device_online(dev.imei):
                    continue
                cmd_log = await command_service.create_command(
                    cmd_session, device_id=dev.id,
                    command_type="online_cmd", command_content="BTMACSET#",
                )
                try:
                    ok = await tcp_command_service.send_command(dev.imei, "online_cmd", "BTMACSET#")
                    if ok:
                        cmd_log.status = "sent"
                        cmd_log.sent_at = now_cst()
                        sent_devices.append((dev.id, dev.imei, dev.name, cmd_log.id))
                    else:
                        cmd_log.status = "failed"
                except Exception:
                    # Best effort: one bad device must not abort the batch,
                    # but the failure should still leave a trace.
                    logger.exception("Failed to send BTMACSET# query to device %s", dev.imei)
                    cmd_log.status = "failed"
    # Transaction committed here — TCP handler can now see these CommandLogs

    if not sent_devices:
        return {"queried": 0, "responded": 0, "updated": 0, "details": [], "error": "无法发送指令到任何设备"}

    # --- Phase 2: Poll for responses (fresh session each iteration) ---
    # Up to 10 polls, one second apart; stops early once every device answered.
    responded: dict[int, str] = {}
    for _ in range(10):
        await asyncio.sleep(1)
        pending_cmd_ids = [
            cmd_log_id
            for dev_id, _imei, _name, cmd_log_id in sent_devices
            if dev_id not in responded
        ]
        if not pending_cmd_ids:
            break
        async with make_session() as poll_session:
            result = await poll_session.execute(
                select(CommandLog.device_id, CommandLog.response_content).where(
                    CommandLog.id.in_(pending_cmd_ids),
                    CommandLog.status == "success",
                )
            )
            for row in result.all():
                responded[row[0]] = row[1] or ""

    # --- Phase 3: Parse responses and update bindings ---
    details = []
    updated_count = 0
    for dev_id, imei, name, _cmd_log_id in sent_devices:
        resp_text = responded.get(dev_id)
        if resp_text is None:
            details.append({"device_id": dev_id, "imei": imei, "name": name, "status": "无响应"})
            continue

        found_macs = _parse_btmacset_response(resp_text)
        matched_beacon_ids = set()
        for mac in found_macs:
            bid = mac_to_beacon_id.get(mac)
            # Compare against None explicitly so a falsy id is not mistaken
            # for "unknown MAC".
            if bid is not None:
                matched_beacon_ids.add(bid)

        # Get current bindings for this device
        result = await db.execute(
            select(DeviceBeaconBinding.beacon_id).where(
                DeviceBeaconBinding.device_id == dev_id
            )
        )
        current_bindings = {row[0] for row in result.all()}

        to_add = matched_beacon_ids - current_bindings
        for bid in to_add:
            db.add(DeviceBeaconBinding(device_id=dev_id, beacon_id=bid))

        to_remove = current_bindings - matched_beacon_ids
        if to_remove:
            await db.execute(
                sa_delete(DeviceBeaconBinding).where(
                    DeviceBeaconBinding.device_id == dev_id,
                    DeviceBeaconBinding.beacon_id.in_(to_remove),
                )
            )

        changes = len(to_add) + len(to_remove)
        updated_count += 1 if changes else 0
        details.append({
            "device_id": dev_id, "imei": imei, "name": name,
            "status": "已同步",
            "device_macs": found_macs,
            "matched_beacons": len(matched_beacon_ids),
            "added": len(to_add), "removed": len(to_remove),
            "response": resp_text,
        })

    await db.flush()
    return {
        "queried": len(sent_devices),
        "responded": len(responded),
        "updated": updated_count,
        "details": details,
        "error": None,
    }
# ---------------------------------------------------------------------------
# Setup Bluetooth clock-in mode for devices
# ---------------------------------------------------------------------------
# Full config sequence per P241 docs:
# CLOCKWAY,3# → manual + Bluetooth clock
# MODE,2# → Bluetooth positioning mode
# BTMACSET,...# → write bound beacon MACs
# BTMP3SW,1# → enable voice broadcast
# Fixed first and last steps of the Bluetooth setup sequence, as
# (command, human-readable description) pairs.
# NOTE(review): nothing in the visible part of this file reads this constant;
# setup_bluetooth_mode() builds its own command list. Confirm whether it is
# still used elsewhere before relying on it or removing it.
_BT_SETUP_STEPS = [
("CLOCKWAY,3#", "设置打卡方式: 手动+蓝牙"),
# MODE,2# inserted dynamically
# BTMACSET,...# inserted dynamically
("BTMP3SW,1#", "开启语音播报"),
]
async def setup_bluetooth_mode(
    db: AsyncSession,
    device_ids: list[int] | None = None,
) -> dict:
    """Configure devices for Bluetooth beacon clock-in mode.

    Sends the full command sequence to each device:
        1. CLOCKWAY,3# (manual + BT clock)
        2. MODE,2# (BT positioning)
        3. BTMACSET,... (bound beacon MACs)
        4. BTMP3SW,1# (voice broadcast on)

    Step 3 is split into slots of 10 MACs (5 slots, 50 MACs at most); MACs
    beyond that are not sent and a warning is logged. A device with no bound
    beacons gets no BTMACSET command at all.

    Args:
        db: Async session used to load devices and their beacon bindings.
        device_ids: Devices to configure. If ``None`` (or an empty list),
            targets all devices whose stored status is ``"online"``.

    Returns:
        A dict with ``total`` (devices selected), ``sent`` (devices for which
        every command was accepted), ``failed`` (devices that were offline or
        had at least one failed command), ``details`` (one entry per device)
        and ``error`` (``None`` unless no device was selected).
    """
    if device_ids:
        result = await db.execute(
            select(Device).where(Device.id.in_(device_ids))
        )
    else:
        result = await db.execute(
            select(Device).where(Device.status == "online")
        )
    devices = list(result.scalars().all())
    if not devices:
        return {"total": 0, "sent": 0, "failed": 0, "details": [], "error": "没有可操作的设备"}

    # Pre-load all beacon bindings: device_id → [mac1, mac2, ...]
    all_device_ids = [d.id for d in devices]
    result = await db.execute(
        select(DeviceBeaconBinding.device_id, BeaconConfig.beacon_mac)
        .join(BeaconConfig, BeaconConfig.id == DeviceBeaconBinding.beacon_id)
        .where(DeviceBeaconBinding.device_id.in_(all_device_ids))
        .order_by(DeviceBeaconBinding.device_id, BeaconConfig.id)
    )
    device_macs: dict[int, list[str]] = {}
    for row in result.all():
        device_macs.setdefault(row[0], []).append(row[1])

    # BTMACSET: split into slots of 10. Identical for every device, so it is
    # built once rather than per loop iteration.
    slot_names = ["BTMACSET", "BTMACSET1", "BTMACSET2", "BTMACSET3", "BTMACSET4"]
    max_macs = len(slot_names) * 10

    details = []
    sent_count = 0
    fail_count = 0
    for dev in devices:
        # The stored status may be stale; check the live TCP connection too.
        if not tcp_command_service.is_device_online(dev.imei):
            details.append({
                "device_id": dev.id, "imei": dev.imei, "name": dev.name,
                "status": "离线", "commands": [],
            })
            fail_count += 1
            continue

        macs = device_macs.get(dev.id, [])
        if len(macs) > max_macs:
            logger.warning(
                "Device %s has %d bound beacons; only the first %d fit into BTMACSET slots",
                dev.imei, len(macs), max_macs,
            )

        # Build command sequence
        commands = [
            "CLOCKWAY,3#",
            "MODE,2#",
        ]
        commands.extend(
            f"{slot_names[i // 10]},{','.join(macs[i:i + 10])}#"
            for i in range(0, min(len(macs), max_macs), 10)
        )
        commands.append("BTMP3SW,1#")

        # Send commands sequentially with small delay
        sent_cmds = []
        has_error = False
        for cmd in commands:
            try:
                ok = await tcp_command_service.send_command(dev.imei, "online_cmd", cmd)
                sent_cmds.append({"cmd": cmd, "ok": ok})
                if not ok:
                    has_error = True
                # Small delay between commands to avoid overwhelming device
                await asyncio.sleep(0.3)
            except Exception as e:
                # Record the failure and keep going with the remaining commands.
                sent_cmds.append({"cmd": cmd, "ok": False, "error": str(e)})
                has_error = True

        if has_error:
            fail_count += 1
        else:
            sent_count += 1
        details.append({
            "device_id": dev.id, "imei": dev.imei, "name": dev.name,
            "status": "部分失败" if has_error else "已配置",
            "beacon_count": len(macs),
            "commands": sent_cmds,
        })

    return {
        "total": len(devices),
        "sent": sent_count,
        "failed": fail_count,
        "details": details,
        "error": None,
    }
async def restore_normal_mode(
    db: AsyncSession,
    device_ids: list[int] | None = None,
) -> dict:
    """Switch devices back from Bluetooth clock-in mode to normal (smart) mode.

    Each reachable device receives, in order:
        1. CLOCKWAY,1# (manual clock only)
        2. MODE,3# (smart positioning)
        3. BTMP3SW,0# (voice broadcast off)

    When *device_ids* is ``None`` or empty, every device whose stored status
    is ``"online"`` is targeted. The result dict carries ``total``, ``sent``,
    ``failed``, per-device ``details`` and ``error``.
    """
    stmt = select(Device)
    if device_ids:
        stmt = stmt.where(Device.id.in_(device_ids))
    else:
        stmt = stmt.where(Device.status == "online")
    rows = await db.execute(stmt)
    targets = list(rows.scalars().all())
    if not targets:
        return {"total": 0, "sent": 0, "failed": 0, "details": [], "error": "没有可操作的设备"}

    restore_cmds = ("CLOCKWAY,1#", "MODE,3#", "BTMP3SW,0#")
    report = []
    ok_total = 0
    failed_total = 0

    for device in targets:
        # Devices without a live TCP connection are reported and skipped.
        if not tcp_command_service.is_device_online(device.imei):
            failed_total += 1
            report.append({
                "device_id": device.id, "imei": device.imei, "name": device.name,
                "status": "离线", "commands": [],
            })
            continue

        attempts = []
        for cmd in restore_cmds:
            try:
                accepted = await tcp_command_service.send_command(device.imei, "online_cmd", cmd)
                attempts.append({"cmd": cmd, "ok": accepted})
                # Brief pause between commands.
                await asyncio.sleep(0.3)
            except Exception as exc:
                attempts.append({"cmd": cmd, "ok": False, "error": str(exc)})

        # A device counts as failed when any attempt was rejected or raised.
        any_failed = any(not attempt["ok"] for attempt in attempts)
        if any_failed:
            failed_total += 1
            outcome = "部分失败"
        else:
            ok_total += 1
            outcome = "已恢复"
        report.append({
            "device_id": device.id, "imei": device.imei, "name": device.name,
            "status": outcome,
            "commands": attempts,
        })

    return {
        "total": len(targets),
        "sent": ok_total,
        "failed": failed_total,
        "details": report,
        "error": None,
    }