Files
desungongpai/app/services/fence_checker.py
default b25eafc483 feat: 性能优化 + 设备总览轨迹展示 + 广播指令API
性能: SQLite WAL模式、aiohttp Session复用、TCP连接锁+空闲超时、
device_id缓存、WebSocket并发广播、API Key认证缓存、围栏N+1查询
批量化、逆地理编码并行化、新增5个DB索引、日志降级DEBUG

功能: 广播指令API(broadcast)、exclude_type低精度后端过滤、
前端设备总览Tab+多设备轨迹叠加+高亮联动+搜索+专属颜色

via [HAPI](https://hapi.run)

Co-Authored-By: HAPI <noreply@hapi.run>
2026-03-31 09:41:09 +00:00

404 lines
13 KiB
Python

"""Fence checker service - geofence judgment engine with auto-attendance.
Checks whether a device's reported coordinates fall inside its bound fences.
Creates automatic attendance records (clock_in/clock_out) on state transitions.
"""
import json
import logging
import math
from datetime import datetime, timedelta, timezone
from typing import Optional
from sqlalchemy import func, select
from sqlalchemy.ext.asyncio import AsyncSession
from app.config import now_cst, settings
from app.models import (
AttendanceRecord,
Device,
DeviceFenceBinding,
DeviceFenceState,
FenceConfig,
)
logger = logging.getLogger(__name__)
_EARTH_RADIUS_M = 6_371_000.0
# ---------------------------------------------------------------------------
# Geometry helpers (WGS-84)
# ---------------------------------------------------------------------------
def haversine_distance(lat1: float, lon1: float, lat2: float, lon2: float) -> float:
"""Return distance in meters between two WGS-84 points."""
rlat1, rlat2 = math.radians(lat1), math.radians(lat2)
dlat = math.radians(lat2 - lat1)
dlon = math.radians(lon2 - lon1)
a = (
math.sin(dlat / 2) ** 2
+ math.cos(rlat1) * math.cos(rlat2) * math.sin(dlon / 2) ** 2
)
return _EARTH_RADIUS_M * 2 * math.atan2(math.sqrt(a), math.sqrt(1 - a))
def is_inside_circle(
lat: float, lon: float,
center_lat: float, center_lng: float,
radius_m: float,
) -> bool:
"""Check if point is inside a circle (haversine)."""
return haversine_distance(lat, lon, center_lat, center_lng) <= radius_m
def is_inside_polygon(
lat: float, lon: float,
vertices: list[list[float]],
) -> bool:
"""Ray-casting algorithm. vertices = [[lng, lat], ...] in WGS-84."""
n = len(vertices)
if n < 3:
return False
inside = False
j = n - 1
for i in range(n):
xi, yi = vertices[i][0], vertices[i][1] # lng, lat
xj, yj = vertices[j][0], vertices[j][1]
if ((yi > lat) != (yj > lat)) and (
lon < (xj - xi) * (lat - yi) / (yj - yi) + xi
):
inside = not inside
j = i
return inside
def is_inside_fence(
lat: float, lon: float,
fence: FenceConfig,
tolerance_m: float = 0,
) -> bool:
"""Check if a point is inside a fence, with optional tolerance buffer."""
if fence.fence_type == "circle":
if fence.center_lat is None or fence.center_lng is None or fence.radius is None:
return False
return is_inside_circle(
lat, lon,
fence.center_lat, fence.center_lng,
fence.radius + tolerance_m,
)
# polygon / rectangle: parse points JSON
if not fence.points:
return False
try:
vertices = json.loads(fence.points)
except (json.JSONDecodeError, TypeError):
logger.warning("Fence %d has invalid points JSON", fence.id)
return False
if not isinstance(vertices, list) or len(vertices) < 3:
return False
# For polygon with tolerance, check point-in-polygon first
if is_inside_polygon(lat, lon, vertices):
return True
# If not inside but tolerance > 0, check distance to nearest edge
if tolerance_m > 0:
return _min_distance_to_polygon(lat, lon, vertices) <= tolerance_m
return False
def _min_distance_to_polygon(
lat: float, lon: float,
vertices: list[list[float]],
) -> float:
"""Approximate minimum distance from point to polygon edges (meters)."""
min_dist = float("inf")
n = len(vertices)
for i in range(n):
j = (i + 1) % n
# Each vertex is [lng, lat]
dist = _point_to_segment_distance(
lat, lon,
vertices[i][1], vertices[i][0],
vertices[j][1], vertices[j][0],
)
if dist < min_dist:
min_dist = dist
return min_dist
def _point_to_segment_distance(
plat: float, plon: float,
alat: float, alon: float,
blat: float, blon: float,
) -> float:
"""Approximate distance from point P to line segment AB (meters)."""
# Project P onto AB using flat-earth approximation (good for short segments)
dx = blon - alon
dy = blat - alat
if dx == 0 and dy == 0:
return haversine_distance(plat, plon, alat, alon)
t = max(0, min(1, ((plon - alon) * dx + (plat - alat) * dy) / (dx * dx + dy * dy)))
proj_lat = alat + t * dy
proj_lon = alon + t * dx
return haversine_distance(plat, plon, proj_lat, proj_lon)
def _get_tolerance_for_location_type(location_type: str) -> float:
"""Return tolerance in meters based on location type accuracy."""
if location_type in ("lbs", "lbs_4g"):
return float(settings.FENCE_LBS_TOLERANCE_METERS)
if location_type in ("wifi", "wifi_4g"):
return float(settings.FENCE_WIFI_TOLERANCE_METERS)
# GPS: no extra tolerance
return 0.0
# ---------------------------------------------------------------------------
# Daily dedup helper
# ---------------------------------------------------------------------------
async def _has_attendance_today(
session: AsyncSession,
device_id: int,
attendance_type: str,
) -> bool:
"""Check if device already has an attendance record of given type today (CST)."""
cst_now = datetime.now(timezone(timedelta(hours=8)))
day_start = cst_now.replace(hour=0, minute=0, second=0, microsecond=0).replace(tzinfo=None)
day_end = day_start + timedelta(days=1)
result = await session.execute(
select(func.count()).select_from(AttendanceRecord).where(
AttendanceRecord.device_id == device_id,
AttendanceRecord.attendance_type == attendance_type,
AttendanceRecord.recorded_at >= day_start,
AttendanceRecord.recorded_at < day_end,
)
)
return (result.scalar() or 0) > 0
# ---------------------------------------------------------------------------
# Main fence check entry point
# ---------------------------------------------------------------------------
async def check_device_fences(
session: AsyncSession,
device_id: int,
imei: str,
latitude: float,
longitude: float,
location_type: str,
address: Optional[str],
recorded_at: datetime,
*,
mcc: Optional[int] = None,
mnc: Optional[int] = None,
lac: Optional[int] = None,
cell_id: Optional[int] = None,
) -> list[dict]:
"""Check all bound active fences for a device. Returns attendance events.
Called after each location report is stored. Creates automatic
AttendanceRecords for fence entry/exit transitions.
"""
# 1. Query active fences bound to this device
result = await session.execute(
select(FenceConfig)
.join(DeviceFenceBinding, DeviceFenceBinding.fence_id == FenceConfig.id)
.where(
DeviceFenceBinding.device_id == device_id,
FenceConfig.is_active == 1,
)
)
fences = list(result.scalars().all())
if not fences:
return []
# Query device for latest battery/signal info (from heartbeats)
device = await session.get(Device, device_id)
device_info = {
"battery_level": device.battery_level if device else None,
"gsm_signal": device.gsm_signal if device else None,
"mcc": mcc,
"mnc": mnc,
"lac": lac,
"cell_id": cell_id,
}
# 2. Batch-load all fence states in one query (avoid N+1)
fence_ids = [f.id for f in fences]
states_result = await session.execute(
select(DeviceFenceState).where(
DeviceFenceState.device_id == device_id,
DeviceFenceState.fence_id.in_(fence_ids),
)
)
states_map: dict[int, DeviceFenceState] = {s.fence_id: s for s in states_result.scalars().all()}
# Pre-check today's attendance dedup once (not per-fence)
_today_clock_in = await _has_attendance_today(session, device_id, "clock_in")
tolerance = _get_tolerance_for_location_type(location_type)
events: list[dict] = []
now = now_cst()
min_interval = settings.FENCE_MIN_INSIDE_SECONDS
for fence in fences:
currently_inside = is_inside_fence(latitude, longitude, fence, tolerance)
state = states_map.get(fence.id)
was_inside = bool(state and state.is_inside)
# 3. Detect transition
if currently_inside and not was_inside:
# ENTRY: outside -> inside = clock_in
if state and state.last_transition_at:
elapsed = (now - state.last_transition_at).total_seconds()
if elapsed < min_interval:
logger.debug(
"Fence %d debounce: %ds < %ds, skip clock_in for device %d",
fence.id, int(elapsed), min_interval, device_id,
)
_update_state(state, currently_inside, now, latitude, longitude)
continue
# Daily dedup: only one clock_in per device per day (pre-fetched)
if _today_clock_in:
logger.info(
"Fence skip clock_in: device=%d fence=%d(%s) already clocked in today",
device_id, fence.id, fence.name,
)
else:
attendance = _create_attendance(
device_id, imei, "clock_in", latitude, longitude,
address, recorded_at, fence, device_info,
)
session.add(attendance)
event = _build_event(
device_id, imei, fence, "clock_in",
latitude, longitude, address, recorded_at,
)
events.append(event)
logger.info(
"Fence auto clock_in: device=%d fence=%d(%s)",
device_id, fence.id, fence.name,
)
elif not currently_inside and was_inside:
# EXIT: inside -> outside — skip clock_out (GPS drift causes false exits)
logger.debug(
"Fence exit ignored: device=%d fence=%d(%s), no clock_out created",
device_id, fence.id, fence.name,
)
# 4. Update state
if state is None:
state = DeviceFenceState(
device_id=device_id,
fence_id=fence.id,
is_inside=currently_inside,
last_transition_at=now if (currently_inside != was_inside) else None,
last_check_at=now,
last_latitude=latitude,
last_longitude=longitude,
)
session.add(state)
else:
if currently_inside != was_inside:
state.last_transition_at = now
state.is_inside = currently_inside
state.last_check_at = now
state.last_latitude = latitude
state.last_longitude = longitude
await session.flush()
return events
def _update_state(
state: DeviceFenceState,
is_inside: bool,
now: datetime,
lat: float,
lon: float,
) -> None:
"""Update state fields without creating a transition."""
state.last_check_at = now
state.last_latitude = lat
state.last_longitude = lon
# Don't update is_inside or last_transition_at during debounce
def _create_attendance(
device_id: int,
imei: str,
attendance_type: str,
latitude: float,
longitude: float,
address: Optional[str],
recorded_at: datetime,
fence: FenceConfig,
device_info: Optional[dict] = None,
) -> AttendanceRecord:
"""Create an auto-generated fence attendance record."""
info = device_info or {}
return AttendanceRecord(
device_id=device_id,
imei=imei,
attendance_type=attendance_type,
attendance_source="fence",
protocol_number=0, # synthetic, not from device protocol
gps_positioned=True,
latitude=latitude,
longitude=longitude,
address=address,
recorded_at=recorded_at,
battery_level=info.get("battery_level"),
gsm_signal=info.get("gsm_signal"),
mcc=info.get("mcc"),
mnc=info.get("mnc"),
lac=info.get("lac"),
cell_id=info.get("cell_id"),
lbs_data={
"source": "fence_auto",
"fence_id": fence.id,
"fence_name": fence.name,
},
)
def _build_event(
device_id: int,
imei: str,
fence: FenceConfig,
attendance_type: str,
latitude: float,
longitude: float,
address: Optional[str],
recorded_at: datetime,
) -> dict:
"""Build a WebSocket broadcast event dict."""
return {
"device_id": device_id,
"imei": imei,
"fence_id": fence.id,
"fence_name": fence.name,
"attendance_type": attendance_type,
"latitude": latitude,
"longitude": longitude,
"address": address,
"recorded_at": recorded_at.isoformat() if recorded_at else None,
"source": "fence_auto",
}