"""Fence checker service - geofence judgment engine with auto-attendance. Checks whether a device's reported coordinates fall inside its bound fences. Creates automatic attendance records (clock_in/clock_out) on state transitions. """ import json import logging import math from datetime import datetime, timedelta, timezone from typing import Optional from sqlalchemy import func, select from sqlalchemy.ext.asyncio import AsyncSession from app.config import now_cst, settings from app.models import ( AttendanceRecord, Device, DeviceFenceBinding, DeviceFenceState, FenceConfig, ) logger = logging.getLogger(__name__) _EARTH_RADIUS_M = 6_371_000.0 # --------------------------------------------------------------------------- # Geometry helpers (WGS-84) # --------------------------------------------------------------------------- def haversine_distance(lat1: float, lon1: float, lat2: float, lon2: float) -> float: """Return distance in meters between two WGS-84 points.""" rlat1, rlat2 = math.radians(lat1), math.radians(lat2) dlat = math.radians(lat2 - lat1) dlon = math.radians(lon2 - lon1) a = ( math.sin(dlat / 2) ** 2 + math.cos(rlat1) * math.cos(rlat2) * math.sin(dlon / 2) ** 2 ) return _EARTH_RADIUS_M * 2 * math.atan2(math.sqrt(a), math.sqrt(1 - a)) def is_inside_circle( lat: float, lon: float, center_lat: float, center_lng: float, radius_m: float, ) -> bool: """Check if point is inside a circle (haversine).""" return haversine_distance(lat, lon, center_lat, center_lng) <= radius_m def is_inside_polygon( lat: float, lon: float, vertices: list[list[float]], ) -> bool: """Ray-casting algorithm. vertices = [[lng, lat], ...] in WGS-84.""" n = len(vertices) if n < 3: return False inside = False j = n - 1 for i in range(n): xi, yi = vertices[i][0], vertices[i][1] # lng, lat xj, yj = vertices[j][0], vertices[j][1] if ((yi > lat) != (yj > lat)) and ( lon < (xj - xi) * (lat - yi) / (yj - yi) + xi ): inside = not inside j = i return inside def is_inside_fence( lat: float, lon: float, fence: FenceConfig, tolerance_m: float = 0, ) -> bool: """Check if a point is inside a fence, with optional tolerance buffer.""" if fence.fence_type == "circle": if fence.center_lat is None or fence.center_lng is None or fence.radius is None: return False return is_inside_circle( lat, lon, fence.center_lat, fence.center_lng, fence.radius + tolerance_m, ) # polygon / rectangle: parse points JSON if not fence.points: return False try: vertices = json.loads(fence.points) except (json.JSONDecodeError, TypeError): logger.warning("Fence %d has invalid points JSON", fence.id) return False if not isinstance(vertices, list) or len(vertices) < 3: return False # For polygon with tolerance, check point-in-polygon first if is_inside_polygon(lat, lon, vertices): return True # If not inside but tolerance > 0, check distance to nearest edge if tolerance_m > 0: return _min_distance_to_polygon(lat, lon, vertices) <= tolerance_m return False def _min_distance_to_polygon( lat: float, lon: float, vertices: list[list[float]], ) -> float: """Approximate minimum distance from point to polygon edges (meters).""" min_dist = float("inf") n = len(vertices) for i in range(n): j = (i + 1) % n # Each vertex is [lng, lat] dist = _point_to_segment_distance( lat, lon, vertices[i][1], vertices[i][0], vertices[j][1], vertices[j][0], ) if dist < min_dist: min_dist = dist return min_dist def _point_to_segment_distance( plat: float, plon: float, alat: float, alon: float, blat: float, blon: float, ) -> float: """Approximate distance from point P to line segment AB (meters).""" # Project P onto AB using flat-earth approximation (good for short segments) dx = blon - alon dy = blat - alat if dx == 0 and dy == 0: return haversine_distance(plat, plon, alat, alon) t = max(0, min(1, ((plon - alon) * dx + (plat - alat) * dy) / (dx * dx + dy * dy))) proj_lat = alat + t * dy proj_lon = alon + t * dx return haversine_distance(plat, plon, proj_lat, proj_lon) def _get_tolerance_for_location_type(location_type: str) -> float: """Return tolerance in meters based on location type accuracy.""" if location_type in ("lbs", "lbs_4g"): return float(settings.FENCE_LBS_TOLERANCE_METERS) if location_type in ("wifi", "wifi_4g"): return float(settings.FENCE_WIFI_TOLERANCE_METERS) # GPS: no extra tolerance return 0.0 # --------------------------------------------------------------------------- # Daily dedup helper # --------------------------------------------------------------------------- async def _has_attendance_today( session: AsyncSession, device_id: int, attendance_type: str, ) -> bool: """Check if device already has an attendance record of given type today (CST).""" cst_now = datetime.now(timezone(timedelta(hours=8))) day_start = cst_now.replace(hour=0, minute=0, second=0, microsecond=0).replace(tzinfo=None) day_end = day_start + timedelta(days=1) result = await session.execute( select(func.count()).select_from(AttendanceRecord).where( AttendanceRecord.device_id == device_id, AttendanceRecord.attendance_type == attendance_type, AttendanceRecord.recorded_at >= day_start, AttendanceRecord.recorded_at < day_end, ) ) return (result.scalar() or 0) > 0 # --------------------------------------------------------------------------- # Main fence check entry point # --------------------------------------------------------------------------- async def check_device_fences( session: AsyncSession, device_id: int, imei: str, latitude: float, longitude: float, location_type: str, address: Optional[str], recorded_at: datetime, *, mcc: Optional[int] = None, mnc: Optional[int] = None, lac: Optional[int] = None, cell_id: Optional[int] = None, ) -> list[dict]: """Check all bound active fences for a device. Returns attendance events. Called after each location report is stored. Creates automatic AttendanceRecords for fence entry/exit transitions. """ # 1. Query active fences bound to this device result = await session.execute( select(FenceConfig) .join(DeviceFenceBinding, DeviceFenceBinding.fence_id == FenceConfig.id) .where( DeviceFenceBinding.device_id == device_id, FenceConfig.is_active == 1, ) ) fences = list(result.scalars().all()) if not fences: return [] # Query device for latest battery/signal info (from heartbeats) device = await session.get(Device, device_id) device_info = { "battery_level": device.battery_level if device else None, "gsm_signal": device.gsm_signal if device else None, "mcc": mcc, "mnc": mnc, "lac": lac, "cell_id": cell_id, } tolerance = _get_tolerance_for_location_type(location_type) events: list[dict] = [] now = now_cst() min_interval = settings.FENCE_MIN_INSIDE_SECONDS for fence in fences: currently_inside = is_inside_fence(latitude, longitude, fence, tolerance) # 2. Get or create state record state_result = await session.execute( select(DeviceFenceState).where( DeviceFenceState.device_id == device_id, DeviceFenceState.fence_id == fence.id, ) ) state = state_result.scalar_one_or_none() was_inside = bool(state and state.is_inside) # 3. Detect transition if currently_inside and not was_inside: # ENTRY: outside -> inside = clock_in if state and state.last_transition_at: elapsed = (now - state.last_transition_at).total_seconds() if elapsed < min_interval: logger.debug( "Fence %d debounce: %ds < %ds, skip clock_in for device %d", fence.id, int(elapsed), min_interval, device_id, ) _update_state(state, currently_inside, now, latitude, longitude) continue # Daily dedup: only one clock_in per device per day if await _has_attendance_today(session, device_id, "clock_in"): logger.info( "Fence skip clock_in: device=%d fence=%d(%s) already clocked in today", device_id, fence.id, fence.name, ) else: attendance = _create_attendance( device_id, imei, "clock_in", latitude, longitude, address, recorded_at, fence, device_info, ) session.add(attendance) event = _build_event( device_id, imei, fence, "clock_in", latitude, longitude, address, recorded_at, ) events.append(event) logger.info( "Fence auto clock_in: device=%d fence=%d(%s)", device_id, fence.id, fence.name, ) elif not currently_inside and was_inside: # EXIT: inside -> outside — skip clock_out (GPS drift causes false exits) logger.debug( "Fence exit ignored: device=%d fence=%d(%s), no clock_out created", device_id, fence.id, fence.name, ) # 4. Update state if state is None: state = DeviceFenceState( device_id=device_id, fence_id=fence.id, is_inside=currently_inside, last_transition_at=now if (currently_inside != was_inside) else None, last_check_at=now, last_latitude=latitude, last_longitude=longitude, ) session.add(state) else: if currently_inside != was_inside: state.last_transition_at = now state.is_inside = currently_inside state.last_check_at = now state.last_latitude = latitude state.last_longitude = longitude await session.flush() return events def _update_state( state: DeviceFenceState, is_inside: bool, now: datetime, lat: float, lon: float, ) -> None: """Update state fields without creating a transition.""" state.last_check_at = now state.last_latitude = lat state.last_longitude = lon # Don't update is_inside or last_transition_at during debounce def _create_attendance( device_id: int, imei: str, attendance_type: str, latitude: float, longitude: float, address: Optional[str], recorded_at: datetime, fence: FenceConfig, device_info: Optional[dict] = None, ) -> AttendanceRecord: """Create an auto-generated fence attendance record.""" info = device_info or {} return AttendanceRecord( device_id=device_id, imei=imei, attendance_type=attendance_type, attendance_source="fence", protocol_number=0, # synthetic, not from device protocol gps_positioned=True, latitude=latitude, longitude=longitude, address=address, recorded_at=recorded_at, battery_level=info.get("battery_level"), gsm_signal=info.get("gsm_signal"), mcc=info.get("mcc"), mnc=info.get("mnc"), lac=info.get("lac"), cell_id=info.get("cell_id"), lbs_data={ "source": "fence_auto", "fence_id": fence.id, "fence_name": fence.name, }, ) def _build_event( device_id: int, imei: str, fence: FenceConfig, attendance_type: str, latitude: float, longitude: float, address: Optional[str], recorded_at: datetime, ) -> dict: """Build a WebSocket broadcast event dict.""" return { "device_id": device_id, "imei": imei, "fence_id": fence.id, "fence_name": fence.name, "attendance_type": attendance_type, "latitude": latitude, "longitude": longitude, "address": address, "recorded_at": recorded_at.isoformat() if recorded_at else None, "source": "fence_auto", }