"""Heartbeats router.

API endpoints for listing, summarizing, fetching, and batch-deleting device
heartbeat records.
"""

import math
from datetime import datetime, timedelta

from fastapi import APIRouter, Depends, HTTPException, Query
from sqlalchemy import func, select
from sqlalchemy.ext.asyncio import AsyncSession

from app.database import get_db
from app.models import Device, HeartbeatRecord
from app.schemas import (
    APIResponse,
    HeartbeatRecordResponse,
    PaginatedList,
)
from app.services import device_service

router = APIRouter(prefix="/api/heartbeats", tags=["Heartbeats / 心跳数据"])

# Upper bound on the number of ids accepted by one batch-delete request.
_MAX_BATCH_DELETE = 500
# Devices averaging fewer heartbeats per hour than this over the stats window
# are reported as anomalous.
_MIN_HEARTBEATS_PER_HOUR = 2
# Caps on the per-device lists returned by the stats endpoint.
_MAX_DEVICE_INTERVALS = 20
_MAX_ANOMALOUS_DEVICES = 10


def _round_or_none(value):
    """Round an SQL aggregate to one decimal place, keeping NULL as ``None``.

    The check is ``is not None`` on purpose: an average of exactly 0 is a
    real value and must not be reported as missing.
    """
    return None if value is None else round(float(value), 1)


def _average_interval_minutes(hb_count, first_hb, last_hb):
    """Return the mean gap in minutes between ``hb_count`` heartbeats.

    The gap is the time span between the first and last heartbeat divided by
    the number of gaps (``hb_count - 1``). Returns 0 when fewer than two
    heartbeats exist or either timestamp is missing.
    """
    if hb_count <= 1 or first_hb is None or last_hb is None:
        return 0
    span_minutes = (last_hb - first_hb).total_seconds() / 60
    return round(span_minutes / (hb_count - 1), 1)


def _parse_record_ids(raw_ids):
    """Validate the ``record_ids`` payload and return it as a list of ints.

    Integers and integer-like strings are accepted; anything else (including
    booleans and floats) is rejected.

    Raises:
        HTTPException: 400 if ``raw_ids`` is not a list of integers.
    """
    invalid = HTTPException(
        status_code=400, detail="record_ids must be a list of integers"
    )
    if not isinstance(raw_ids, list):
        raise invalid

    parsed = []
    for value in raw_ids:
        # bool is a subclass of int, so it has to be excluded explicitly.
        if isinstance(value, bool) or not isinstance(value, (int, str)):
            raise invalid
        try:
            parsed.append(int(value))
        except ValueError:
            raise invalid from None
    return parsed


@router.get(
    "",
    response_model=APIResponse[PaginatedList[HeartbeatRecordResponse]],
    summary="获取心跳记录列表 / List heartbeat records",
)
async def list_heartbeats(
    device_id: int | None = Query(default=None, description="设备ID / Device ID"),
    start_time: datetime | None = Query(default=None, description="开始时间 / Start time (ISO 8601)"),
    end_time: datetime | None = Query(default=None, description="结束时间 / End time (ISO 8601)"),
    page: int = Query(default=1, ge=1, description="页码 / Page number"),
    page_size: int = Query(default=20, ge=1, le=100, description="每页数量 / Items per page"),
    db: AsyncSession = Depends(get_db),
):
    """List heartbeat records with optional device and time-range filters.
    \f
    Args:
        device_id: Only return records for this device when given.
        start_time: Inclusive lower bound on ``created_at`` when given.
        end_time: Inclusive upper bound on ``created_at`` when given.
        page: 1-based page number.
        page_size: Number of records per page (1-100).
        db: Async database session.

    Returns:
        An ``APIResponse`` wrapping a ``PaginatedList`` of records, newest
        first.
    """
    # Build the filters once so the page query and the count query can never
    # drift apart.
    filters = []
    if device_id is not None:
        filters.append(HeartbeatRecord.device_id == device_id)
    if start_time is not None:
        filters.append(HeartbeatRecord.created_at >= start_time)
    if end_time is not None:
        filters.append(HeartbeatRecord.created_at <= end_time)

    query = select(HeartbeatRecord)
    count_query = select(func.count(HeartbeatRecord.id))
    for clause in filters:
        query = query.where(clause)
        count_query = count_query.where(clause)

    total = (await db.execute(count_query)).scalar() or 0

    # The id tie-breaker keeps the ordering deterministic when several records
    # share a timestamp; without it rows may repeat or vanish between pages.
    query = (
        query.order_by(HeartbeatRecord.created_at.desc(), HeartbeatRecord.id.desc())
        .offset((page - 1) * page_size)
        .limit(page_size)
    )
    records = (await db.execute(query)).scalars().all()

    return APIResponse(
        data=PaginatedList(
            items=[HeartbeatRecordResponse.model_validate(r) for r in records],
            total=total,
            page=page,
            page_size=page_size,
            total_pages=math.ceil(total / page_size),
        )
    )


@router.get(
    "/stats",
    response_model=APIResponse[dict],
    summary="心跳统计 / Heartbeat statistics",
)
async def heartbeat_stats(
    hours: int = Query(default=24, ge=1, le=168, description="统计时间范围(小时)"),
    db: AsyncSession = Depends(get_db),
):
    """Summarize heartbeat activity over the last ``hours`` hours.
    \f
    Args:
        hours: Size of the look-back window in hours (1-168).
        db: Async database session.

    Returns:
        An ``APIResponse`` whose data holds the total heartbeat count, the
        active / total / inactive device counts, average battery level and
        GSM signal, per-device average heartbeat intervals (first 20, most
        heartbeats first) and devices flagged as anomalous (first 10).
    """
    # NOTE(review): kept as a function-local import as in the original;
    # presumably this avoids an import cycle with app.config -- confirm.
    from app.config import now_cst

    cutoff = now_cst() - timedelta(hours=hours)
    in_window = HeartbeatRecord.created_at >= cutoff

    # All window-wide aggregates share the same WHERE clause, so fetch them in
    # a single round trip.
    total, active_devices, avg_battery_raw, avg_signal_raw = (
        await db.execute(
            select(
                func.count(HeartbeatRecord.id),
                func.count(func.distinct(HeartbeatRecord.device_id)),
                func.avg(HeartbeatRecord.battery_level),
                func.avg(HeartbeatRecord.gsm_signal),
            ).where(in_window)
        )
    ).one()
    total = total or 0
    active_devices = active_devices or 0

    total_devices = (await db.execute(select(func.count(Device.id)))).scalar() or 0

    # Per-device heartbeat count plus first/last timestamps in the window,
    # used to estimate each device's average reporting interval.
    per_device_result = await db.execute(
        select(
            HeartbeatRecord.device_id,
            HeartbeatRecord.imei,
            func.count(HeartbeatRecord.id).label("hb_count"),
            func.min(HeartbeatRecord.created_at).label("first_hb"),
            func.max(HeartbeatRecord.created_at).label("last_hb"),
        )
        .where(in_window)
        .group_by(HeartbeatRecord.device_id, HeartbeatRecord.imei)
        .order_by(func.count(HeartbeatRecord.id).desc())
    )

    min_expected = max(1, hours * _MIN_HEARTBEATS_PER_HOUR)
    device_intervals = []
    anomalous_devices = []
    for device_id, imei, hb_count, first_hb, last_hb in per_device_result.all():
        entry = {
            "device_id": device_id,
            "imei": imei,
            "heartbeat_count": hb_count,
            "avg_interval_minutes": _average_interval_minutes(
                hb_count, first_hb, last_hb
            ),
        }
        device_intervals.append(entry)
        # Flag devices that reported fewer heartbeats than the minimum
        # expected for the window.
        if hb_count < min_expected:
            anomalous_devices.append(entry)

    return APIResponse(data={
        "period_hours": hours,
        "total_heartbeats": total,
        "active_devices": active_devices,
        "total_devices": total_devices,
        # Heartbeats may reference devices that are no longer registered, so
        # clamp instead of reporting a negative count.
        "inactive_devices": max(0, total_devices - active_devices),
        "avg_battery_level": _round_or_none(avg_battery_raw),
        "avg_gsm_signal": _round_or_none(avg_signal_raw),
        "device_intervals": device_intervals[:_MAX_DEVICE_INTERVALS],
        "anomalous_devices": anomalous_devices[:_MAX_ANOMALOUS_DEVICES],
    })


@router.post(
    "/batch-delete",
    response_model=APIResponse[dict],
    summary="批量删除心跳记录 / Batch delete heartbeats",
)
async def batch_delete_heartbeats(
    body: dict,
    db: AsyncSession = Depends(get_db),
):
    """Batch delete heartbeat records (max 500 per request).
    \f
    Args:
        body: JSON object with a ``record_ids`` list of heartbeat record ids.
        db: Async database session.

    Returns:
        An ``APIResponse`` whose data is ``{"deleted": <count>}``, the number
        of matching records that were deleted. Ids that match no record are
        ignored.

    Raises:
        HTTPException: 400 if ``record_ids`` is missing or empty, is not a
            list of integers, or holds more than 500 entries.
    """
    record_ids = body.get("record_ids", [])
    if not record_ids:
        raise HTTPException(status_code=400, detail="record_ids is required")
    record_ids = _parse_record_ids(record_ids)
    if len(record_ids) > _MAX_BATCH_DELETE:
        raise HTTPException(status_code=400, detail="Max 500 records per request")

    result = await db.execute(
        select(HeartbeatRecord).where(HeartbeatRecord.id.in_(record_ids))
    )
    records = result.scalars().all()
    # Delete through the ORM, one object at a time, so any ORM-level cascades
    # or events configured on the model still run.
    for record in records:
        await db.delete(record)
    # NOTE(review): only a flush is issued here; presumably the get_db
    # dependency commits the transaction -- confirm.
    await db.flush()

    return APIResponse(data={"deleted": len(records)})


@router.get(
    "/{heartbeat_id}",
    response_model=APIResponse[HeartbeatRecordResponse],
    summary="获取心跳详情 / Get heartbeat details",
)
async def get_heartbeat(heartbeat_id: int, db: AsyncSession = Depends(get_db)):
    """Get heartbeat record details by ID.
    \f
    Args:
        heartbeat_id: Primary key of the heartbeat record.
        db: Async database session.

    Returns:
        An ``APIResponse`` wrapping the matching record.

    Raises:
        HTTPException: 404 if no record has the given id.
    """
    result = await db.execute(
        select(HeartbeatRecord).where(HeartbeatRecord.id == heartbeat_id)
    )
    record = result.scalar_one_or_none()
    if record is None:
        raise HTTPException(status_code=404, detail=f"Heartbeat {heartbeat_id} not found")
    return APIResponse(data=HeartbeatRecordResponse.model_validate(record))