""" Location Service - 位置数据服务 Provides query operations for GPS / LBS / WIFI location records. """ from datetime import datetime from sqlalchemy import func, select from sqlalchemy.ext.asyncio import AsyncSession from app.models import LocationRecord async def get_locations( db: AsyncSession, device_id: int | None = None, location_type: str | None = None, start_time: datetime | None = None, end_time: datetime | None = None, page: int = 1, page_size: int = 20, ) -> tuple[list[LocationRecord], int]: """ 获取位置记录列表(分页)/ Get paginated location records. Parameters ---------- db : AsyncSession Database session. device_id : int, optional Filter by device ID. location_type : str, optional Filter by location type (gps, lbs, wifi, gps_4g, lbs_4g, wifi_4g). start_time : datetime, optional Filter records after this time. end_time : datetime, optional Filter records before this time. page : int Page number (1-indexed). page_size : int Number of items per page. Returns ------- tuple[list[LocationRecord], int] (list of location records, total count) """ query = select(LocationRecord) count_query = select(func.count(LocationRecord.id)) if device_id is not None: query = query.where(LocationRecord.device_id == device_id) count_query = count_query.where(LocationRecord.device_id == device_id) if location_type: query = query.where(LocationRecord.location_type == location_type) count_query = count_query.where(LocationRecord.location_type == location_type) if start_time: query = query.where(LocationRecord.recorded_at >= start_time) count_query = count_query.where(LocationRecord.recorded_at >= start_time) if end_time: query = query.where(LocationRecord.recorded_at <= end_time) count_query = count_query.where(LocationRecord.recorded_at <= end_time) total_result = await db.execute(count_query) total = total_result.scalar() or 0 offset = (page - 1) * page_size query = query.order_by(LocationRecord.recorded_at.desc()).offset(offset).limit(page_size) result = await db.execute(query) records = list(result.scalars().all()) return records, total async def get_latest_location( db: AsyncSession, device_id: int ) -> LocationRecord | None: """ 获取设备最新位置 / Get the most recent location for a device. Parameters ---------- db : AsyncSession Database session. device_id : int Device ID. Returns ------- LocationRecord | None """ result = await db.execute( select(LocationRecord) .where(LocationRecord.device_id == device_id) .order_by(LocationRecord.recorded_at.desc()) .limit(1) ) return result.scalar_one_or_none() async def get_batch_latest_locations( db: AsyncSession, device_ids: list[int] ) -> list[LocationRecord]: """ 批量获取多设备最新位置 / Get the most recent location for each device in the list. Uses a subquery with MAX(id) GROUP BY device_id for efficiency. """ if not device_ids: return [] # Subquery: max id per device_id subq = ( select(func.max(LocationRecord.id).label("max_id")) .where(LocationRecord.device_id.in_(device_ids)) .group_by(LocationRecord.device_id) .subquery() ) result = await db.execute( select(LocationRecord).where(LocationRecord.id.in_(select(subq.c.max_id))) ) return list(result.scalars().all()) async def get_all_online_latest_locations( db: AsyncSession, ) -> list[LocationRecord]: """ 获取所有在线设备的最新位置 / Get latest location for all online devices. """ from app.models import Device # Get online device IDs online_result = await db.execute( select(Device.id).where(Device.status == "online") ) online_ids = [row[0] for row in online_result.all()] if not online_ids: return [] return await get_batch_latest_locations(db, online_ids) async def get_device_track( db: AsyncSession, device_id: int, start_time: datetime, end_time: datetime, max_points: int = 10000, ) -> list[LocationRecord]: """ 获取设备轨迹 / Get device movement track within a time range. Parameters ---------- db : AsyncSession Database session. device_id : int Device ID. start_time : datetime Start of time range. end_time : datetime End of time range. Returns ------- list[LocationRecord] Location records ordered by recorded_at ascending (chronological). """ result = await db.execute( select(LocationRecord) .where( LocationRecord.device_id == device_id, LocationRecord.recorded_at >= start_time, LocationRecord.recorded_at <= end_time, ) .order_by(LocationRecord.recorded_at.asc()) .limit(max_points) ) return list(result.scalars().all())