Files
desungongpai/app/routers/system.py

300 lines
11 KiB
Python
Raw Permalink Normal View History

"""
System Management Router - 系统管理接口
Audit logs, runtime config, database backup, firmware info.
"""
import shutil
import time
from datetime import datetime, timedelta
from pathlib import Path
from fastapi import APIRouter, Depends, Query, Request
from fastapi.responses import FileResponse
from sqlalchemy import select, func, delete
from sqlalchemy.ext.asyncio import AsyncSession
from app.config import settings, now_cst
from app.database import get_db
from app.dependencies import require_admin
from app.models import AuditLog, Device
from app.schemas import (
APIResponse,
AuditLogResponse,
PaginatedList,
SystemConfigResponse,
SystemConfigUpdate,
)
router = APIRouter(prefix="/api/system", tags=["System / 系统管理"])
# ---------------------------------------------------------------------------
# Audit Logs
# ---------------------------------------------------------------------------
@router.get(
"/audit-logs",
response_model=APIResponse[PaginatedList[AuditLogResponse]],
summary="获取审计日志 / List audit logs",
dependencies=[Depends(require_admin)] if settings.API_KEY else [],
)
async def list_audit_logs(
page: int = Query(1, ge=1),
page_size: int = Query(20, ge=1, le=100),
method: str | None = Query(None, description="Filter by HTTP method (POST/PUT/DELETE)"),
path_contains: str | None = Query(None, description="Filter by path keyword"),
operator: str | None = Query(None, description="Filter by operator name"),
start_time: datetime | None = None,
end_time: datetime | None = None,
db: AsyncSession = Depends(get_db),
):
"""查询操作审计日志 (admin only)。"""
query = select(AuditLog)
count_query = select(func.count(AuditLog.id))
if method:
query = query.where(AuditLog.method == method.upper())
count_query = count_query.where(AuditLog.method == method.upper())
if path_contains:
query = query.where(AuditLog.path.contains(path_contains))
count_query = count_query.where(AuditLog.path.contains(path_contains))
if operator:
query = query.where(AuditLog.operator == operator)
count_query = count_query.where(AuditLog.operator == operator)
if start_time:
query = query.where(AuditLog.created_at >= start_time)
count_query = count_query.where(AuditLog.created_at >= start_time)
if end_time:
query = query.where(AuditLog.created_at <= end_time)
count_query = count_query.where(AuditLog.created_at <= end_time)
total = (await db.execute(count_query)).scalar() or 0
total_pages = (total + page_size - 1) // page_size
rows = await db.execute(
query.order_by(AuditLog.id.desc())
.offset((page - 1) * page_size)
.limit(page_size)
)
items = [AuditLogResponse.model_validate(r) for r in rows.scalars().all()]
return APIResponse(
data=PaginatedList(
items=items, total=total, page=page,
page_size=page_size, total_pages=total_pages,
)
)
@router.delete(
"/audit-logs",
response_model=APIResponse,
summary="清理审计日志 / Clean audit logs",
dependencies=[Depends(require_admin)] if settings.API_KEY else [],
)
async def clean_audit_logs(
days: int = Query(..., ge=1, le=3650, description="Delete logs older than N days"),
db: AsyncSession = Depends(get_db),
):
"""删除N天前的审计日志 (admin only)。"""
cutoff = now_cst() - timedelta(days=days)
result = await db.execute(
delete(AuditLog).where(AuditLog.created_at < cutoff)
)
return APIResponse(
message=f"已删除 {result.rowcount}{days} 天前的审计日志",
data={"deleted": result.rowcount},
)
# ---------------------------------------------------------------------------
# System Config
# ---------------------------------------------------------------------------
@router.get(
"/config",
response_model=APIResponse[SystemConfigResponse],
summary="获取运行时配置 / Get runtime config",
dependencies=[Depends(require_admin)] if settings.API_KEY else [],
)
async def get_config():
"""获取当前运行时配置参数 (admin only)。"""
return APIResponse(data=SystemConfigResponse(
data_retention_days=settings.DATA_RETENTION_DAYS,
data_cleanup_interval_hours=settings.DATA_CLEANUP_INTERVAL_HOURS,
tcp_idle_timeout=settings.TCP_IDLE_TIMEOUT,
fence_check_enabled=settings.FENCE_CHECK_ENABLED,
fence_lbs_tolerance_meters=settings.FENCE_LBS_TOLERANCE_METERS,
fence_wifi_tolerance_meters=settings.FENCE_WIFI_TOLERANCE_METERS,
fence_min_inside_seconds=settings.FENCE_MIN_INSIDE_SECONDS,
rate_limit_default=settings.RATE_LIMIT_DEFAULT,
rate_limit_write=settings.RATE_LIMIT_WRITE,
track_max_points=settings.TRACK_MAX_POINTS,
geocoding_cache_size=settings.GEOCODING_CACHE_SIZE,
))
@router.put(
"/config",
response_model=APIResponse[SystemConfigResponse],
summary="更新运行时配置 / Update runtime config",
dependencies=[Depends(require_admin)] if settings.API_KEY else [],
)
async def update_config(body: SystemConfigUpdate):
"""更新运行时配置参数 (admin only)。仅影响当前进程,重启后恢复 .env 值。"""
update_data = body.model_dump(exclude_unset=True)
for key, value in update_data.items():
attr_name = key.upper()
if hasattr(settings, attr_name):
object.__setattr__(settings, attr_name, value)
return APIResponse(
message=f"已更新 {len(update_data)} 项配置 (进程级,重启后恢复)",
data=SystemConfigResponse(
data_retention_days=settings.DATA_RETENTION_DAYS,
data_cleanup_interval_hours=settings.DATA_CLEANUP_INTERVAL_HOURS,
tcp_idle_timeout=settings.TCP_IDLE_TIMEOUT,
fence_check_enabled=settings.FENCE_CHECK_ENABLED,
fence_lbs_tolerance_meters=settings.FENCE_LBS_TOLERANCE_METERS,
fence_wifi_tolerance_meters=settings.FENCE_WIFI_TOLERANCE_METERS,
fence_min_inside_seconds=settings.FENCE_MIN_INSIDE_SECONDS,
rate_limit_default=settings.RATE_LIMIT_DEFAULT,
rate_limit_write=settings.RATE_LIMIT_WRITE,
track_max_points=settings.TRACK_MAX_POINTS,
geocoding_cache_size=settings.GEOCODING_CACHE_SIZE,
),
)
# ---------------------------------------------------------------------------
# Database Backup
# ---------------------------------------------------------------------------
@router.post(
"/backup",
summary="创建数据库备份 / Create database backup",
dependencies=[Depends(require_admin)] if settings.API_KEY else [],
)
async def create_backup():
"""
触发 SQLite 数据库备份返回备份文件下载 (admin only)
使用 SQLite 原生 backup API 确保一致性
"""
import aiosqlite
db_path = settings.DATABASE_URL.replace("sqlite+aiosqlite:///", "")
if not Path(db_path).exists():
return APIResponse(code=500, message="Database file not found")
backup_dir = Path(db_path).parent / "backups"
backup_dir.mkdir(exist_ok=True)
timestamp = now_cst().strftime("%Y%m%d_%H%M%S")
backup_name = f"badge_admin_backup_{timestamp}.db"
backup_path = backup_dir / backup_name
# Use SQLite online backup API for consistency
async with aiosqlite.connect(db_path) as source:
async with aiosqlite.connect(str(backup_path)) as dest:
await source.backup(dest)
size_mb = round(backup_path.stat().st_size / 1024 / 1024, 2)
return FileResponse(
path=str(backup_path),
filename=backup_name,
media_type="application/octet-stream",
headers={
"X-Backup-Size-MB": str(size_mb),
"X-Backup-Timestamp": timestamp,
},
)
@router.get(
"/backups",
response_model=APIResponse[list[dict]],
summary="列出数据库备份 / List backups",
dependencies=[Depends(require_admin)] if settings.API_KEY else [],
)
async def list_backups():
"""列出所有已有的数据库备份文件 (admin only)。"""
db_path = settings.DATABASE_URL.replace("sqlite+aiosqlite:///", "")
backup_dir = Path(db_path).parent / "backups"
if not backup_dir.exists():
return APIResponse(data=[])
backups = []
for f in sorted(backup_dir.glob("*.db"), reverse=True):
backups.append({
"filename": f.name,
"size_mb": round(f.stat().st_size / 1024 / 1024, 2),
"created_at": datetime.fromtimestamp(f.stat().st_mtime).isoformat(),
})
return APIResponse(data=backups)
@router.delete(
"/backups/{filename}",
response_model=APIResponse,
summary="删除数据库备份 / Delete backup",
dependencies=[Depends(require_admin)] if settings.API_KEY else [],
)
async def delete_backup(filename: str):
"""删除指定的数据库备份文件 (admin only)。"""
db_path = settings.DATABASE_URL.replace("sqlite+aiosqlite:///", "")
backup_path = Path(db_path).parent / "backups" / filename
if not backup_path.exists() or not backup_path.suffix == ".db":
return APIResponse(code=404, message="Backup not found")
# Prevent path traversal
if ".." in filename or "/" in filename:
return APIResponse(code=400, message="Invalid filename")
backup_path.unlink()
return APIResponse(message=f"已删除备份 {filename}")
# ---------------------------------------------------------------------------
# Device Firmware Info
# ---------------------------------------------------------------------------
@router.get(
"/firmware",
response_model=APIResponse[list[dict]],
summary="设备固件信息 / Device firmware info",
dependencies=[Depends(require_admin)] if settings.API_KEY else [],
)
async def list_firmware_info(
status: str | None = Query(None, description="Filter by device status"),
db: AsyncSession = Depends(get_db),
):
"""
获取所有设备的固件相关信息 (型号ICCIDIMSI等)
实际固件版本需通过 VERSION# 指令查询 (admin only)。
"""
query = select(Device).order_by(Device.id)
if status:
query = query.where(Device.status == status)
result = await db.execute(query)
devices = result.scalars().all()
firmware_list = []
for d in devices:
firmware_list.append({
"device_id": d.id,
"imei": d.imei,
"name": d.name,
"device_type": d.device_type,
"status": d.status,
"iccid": d.iccid,
"imsi": d.imsi,
"last_login": d.last_login.isoformat() if d.last_login else None,
"last_heartbeat": d.last_heartbeat.isoformat() if d.last_heartbeat else None,
})
return APIResponse(data=firmware_list)