Files
desungongpai/app/routers/heartbeats.py

211 lines
7.5 KiB
Python
Raw Normal View History

"""
Heartbeats Router - 心跳数据接口
API endpoints for querying device heartbeat records.
"""
import math
from datetime import datetime
from fastapi import APIRouter, Depends, HTTPException, Query
from sqlalchemy import func, select
from sqlalchemy.ext.asyncio import AsyncSession
from app.database import get_db
from app.models import HeartbeatRecord
from app.schemas import (
APIResponse,
HeartbeatRecordResponse,
PaginatedList,
)
from app.services import device_service
router = APIRouter(prefix="/api/heartbeats", tags=["Heartbeats / 心跳数据"])
@router.get(
"",
response_model=APIResponse[PaginatedList[HeartbeatRecordResponse]],
summary="获取心跳记录列表 / List heartbeat records",
)
async def list_heartbeats(
device_id: int | None = Query(default=None, description="设备ID / Device ID"),
start_time: datetime | None = Query(default=None, description="开始时间 / Start time (ISO 8601)"),
end_time: datetime | None = Query(default=None, description="结束时间 / End time (ISO 8601)"),
page: int = Query(default=1, ge=1, description="页码 / Page number"),
page_size: int = Query(default=20, ge=1, le=100, description="每页数量 / Items per page"),
db: AsyncSession = Depends(get_db),
):
"""
获取心跳记录列表支持按设备和时间范围过滤
List heartbeat records with optional device and time range filters.
"""
query = select(HeartbeatRecord)
count_query = select(func.count(HeartbeatRecord.id))
if device_id is not None:
query = query.where(HeartbeatRecord.device_id == device_id)
count_query = count_query.where(HeartbeatRecord.device_id == device_id)
if start_time:
query = query.where(HeartbeatRecord.created_at >= start_time)
count_query = count_query.where(HeartbeatRecord.created_at >= start_time)
if end_time:
query = query.where(HeartbeatRecord.created_at <= end_time)
count_query = count_query.where(HeartbeatRecord.created_at <= end_time)
total_result = await db.execute(count_query)
total = total_result.scalar() or 0
offset = (page - 1) * page_size
query = query.order_by(HeartbeatRecord.created_at.desc()).offset(offset).limit(page_size)
result = await db.execute(query)
records = list(result.scalars().all())
return APIResponse(
data=PaginatedList(
items=[HeartbeatRecordResponse.model_validate(r) for r in records],
total=total,
page=page,
page_size=page_size,
total_pages=math.ceil(total / page_size) if total else 0,
)
)
@router.get(
"/stats",
response_model=APIResponse[dict],
summary="心跳统计 / Heartbeat statistics",
)
async def heartbeat_stats(
hours: int = Query(default=24, ge=1, le=168, description="统计时间范围(小时)"),
db: AsyncSession = Depends(get_db),
):
"""
心跳数据统计总记录数活跃设备数平均电量按设备心跳间隔分析
Heartbeat stats: total, active devices, avg battery, interval analysis.
"""
from app.config import now_cst
from app.models import Device
from datetime import timedelta
now = now_cst()
cutoff = now - timedelta(hours=hours)
# Total in range
total = (await db.execute(
select(func.count(HeartbeatRecord.id)).where(HeartbeatRecord.created_at >= cutoff)
)).scalar() or 0
# Unique devices with heartbeats in range
active_devices = (await db.execute(
select(func.count(func.distinct(HeartbeatRecord.device_id)))
.where(HeartbeatRecord.created_at >= cutoff)
)).scalar() or 0
# Total registered devices
total_devices = (await db.execute(select(func.count(Device.id)))).scalar() or 0
# Avg battery and signal in range
avg_result = await db.execute(
select(
func.avg(HeartbeatRecord.battery_level),
func.avg(HeartbeatRecord.gsm_signal),
).where(HeartbeatRecord.created_at >= cutoff)
)
avg_row = avg_result.one()
avg_battery = round(float(avg_row[0]), 1) if avg_row[0] else None
avg_signal = round(float(avg_row[1]), 1) if avg_row[1] else None
# Per-device heartbeat count in range (for interval estimation)
# Devices with < expected heartbeats may be anomalous
per_device_result = await db.execute(
select(
HeartbeatRecord.device_id,
HeartbeatRecord.imei,
func.count(HeartbeatRecord.id).label("hb_count"),
func.min(HeartbeatRecord.created_at).label("first_hb"),
func.max(HeartbeatRecord.created_at).label("last_hb"),
)
.where(HeartbeatRecord.created_at >= cutoff)
.group_by(HeartbeatRecord.device_id, HeartbeatRecord.imei)
.order_by(func.count(HeartbeatRecord.id).desc())
)
device_intervals = []
anomalous_devices = []
for row in per_device_result.all():
hb_count = row[2]
first_hb = row[3]
last_hb = row[4]
if hb_count > 1 and first_hb and last_hb:
span_min = (last_hb - first_hb).total_seconds() / 60
avg_interval_min = round(span_min / (hb_count - 1), 1) if hb_count > 1 else 0
else:
avg_interval_min = 0
entry = {
"device_id": row[0], "imei": row[1],
"heartbeat_count": hb_count,
"avg_interval_minutes": avg_interval_min,
}
device_intervals.append(entry)
# Flag devices with very few heartbeats (expected ~12/h * hours)
if hb_count < max(1, hours * 2):
anomalous_devices.append(entry)
return APIResponse(data={
"period_hours": hours,
"total_heartbeats": total,
"active_devices": active_devices,
"total_devices": total_devices,
"inactive_devices": total_devices - active_devices,
"avg_battery_level": avg_battery,
"avg_gsm_signal": avg_signal,
"device_intervals": device_intervals[:20],
"anomalous_devices": anomalous_devices[:10],
})
@router.post(
"/batch-delete",
response_model=APIResponse[dict],
summary="批量删除心跳记录 / Batch delete heartbeats",
)
async def batch_delete_heartbeats(
body: dict,
db: AsyncSession = Depends(get_db),
):
"""批量删除心跳记录最多500条。 / Batch delete heartbeat records (max 500)."""
record_ids = body.get("record_ids", [])
if not record_ids:
raise HTTPException(status_code=400, detail="record_ids is required")
if len(record_ids) > 500:
raise HTTPException(status_code=400, detail="Max 500 records per request")
result = await db.execute(
select(HeartbeatRecord).where(HeartbeatRecord.id.in_(record_ids))
)
records = list(result.scalars().all())
for r in records:
await db.delete(r)
await db.flush()
return APIResponse(data={"deleted": len(records)})
@router.get(
"/{heartbeat_id}",
response_model=APIResponse[HeartbeatRecordResponse],
summary="获取心跳详情 / Get heartbeat details",
)
async def get_heartbeat(heartbeat_id: int, db: AsyncSession = Depends(get_db)):
"""
按ID获取心跳记录详情
Get heartbeat record details by ID.
"""
result = await db.execute(
select(HeartbeatRecord).where(HeartbeatRecord.id == heartbeat_id)
)
record = result.scalar_one_or_none()
if record is None:
raise HTTPException(status_code=404, detail=f"Heartbeat {heartbeat_id} not found")
return APIResponse(data=HeartbeatRecordResponse.model_validate(record))