- 前向地理编码升级为高德IoT v5 API (POST restapi.amap.com/v5/position/IoT) - 修复LBS定位偏差: 添加network=LTE参数区分4G/2G, bts格式补充cage字段 - 新增电子围栏管理模块 (circle/polygon/rectangle), 支持地图绘制和POI搜索 - 新增设备-围栏多对多绑定 (DeviceFenceBinding/DeviceFenceState) - 围栏自动考勤引擎 (fence_checker.py): haversine距离、ray-casting多边形判定、容差机制、防抖 - TCP位置上报自动检测围栏进出, 生成考勤记录并WebSocket广播 - 前端围栏页面: 绑定设备弹窗、POI搜索定位、左侧围栏面板 - 新增fence_attendance WebSocket topic via [HAPI](https://hapi.run) Co-Authored-By: HAPI <noreply@hapi.run>
87 lines
2.7 KiB
Python
87 lines
2.7 KiB
Python
"""
Shared FastAPI dependencies.

Supports a master API key (from the environment) and database-managed API keys
with permission levels.
"""
|
|
|
|
import hashlib
|
|
import secrets
|
|
from datetime import datetime, timezone
|
|
|
|
from fastapi import Depends, HTTPException, Security
|
|
from fastapi.security import APIKeyHeader
|
|
from sqlalchemy import select
|
|
from sqlalchemy.ext.asyncio import AsyncSession
|
|
|
|
from app.config import settings
|
|
from app.database import get_db
|
|
|
|
# Security scheme that reads the key from the "X-API-Key" request header.
# auto_error is disabled, so verify_api_key handles a missing key itself.
_api_key_header = APIKeyHeader(
    name="X-API-Key",
    auto_error=False,
)

# Permission hierarchy: admin > write > read (a larger number ranks higher).
_PERMISSION_LEVELS = {
    "read": 1,
    "write": 2,
    "admin": 3,
}
|
|
|
|
|
|
def _hash_key(key: str) -> str:
    """Return the hex-encoded SHA-256 digest of an API key string."""
    hasher = hashlib.sha256()
    hasher.update(key.encode())
    return hasher.hexdigest()
|
|
|
|
|
|
async def verify_api_key(
    api_key: str | None = Security(_api_key_header),
    db: AsyncSession = Depends(get_db),
) -> dict | None:
    """Authenticate the request's API key.

    The master key (``settings.API_KEY``) is checked first; any other key is
    looked up by its SHA-256 hash among the active database keys.

    Args:
        api_key: Key value supplied by the ``_api_key_header`` security
            scheme, or ``None`` when no key was sent.
        db: Async database session used to look up database-managed keys.

    Returns:
        ``None`` when ``settings.API_KEY`` is ``None`` (auth disabled);
        otherwise a dict of the form
        ``{"permissions": str, "key_id": int | None, "name": str}``.
        The master key yields ``"admin"`` permissions with ``key_id`` ``None``.

    Raises:
        HTTPException: 401 when the key is missing or matches no active key.
    """
    if settings.API_KEY is None:
        return None  # Auth disabled

    if api_key is None:
        raise HTTPException(status_code=401, detail="Missing API key / 缺少 API Key")

    # Check master key. Compare as bytes: secrets.compare_digest raises
    # TypeError for str arguments containing non-ASCII characters, which would
    # turn a malformed header into a 500 response instead of a 401.
    if secrets.compare_digest(api_key.encode(), settings.API_KEY.encode()):
        return {"permissions": "admin", "key_id": None, "name": "master"}

    # Check database keys. Function-scope import kept from the original
    # (presumably to avoid a circular import -- confirm before hoisting).
    from app.models import ApiKey

    key_hash = _hash_key(api_key)
    result = await db.execute(
        select(ApiKey).where(ApiKey.key_hash == key_hash, ApiKey.is_active == True)  # noqa: E712
    )
    db_key = result.scalar_one_or_none()
    if db_key is None:
        raise HTTPException(status_code=401, detail="Invalid API key / 无效的 API Key")

    # Record when the key was last used. Only a flush is issued here; the
    # commit is presumably performed by the get_db dependency -- confirm.
    from app.config import now_cst

    db_key.last_used_at = now_cst()
    await db.flush()

    return {"permissions": db_key.permissions, "key_id": db_key.id, "name": db_key.name}
|
|
|
|
|
|
def require_permission(min_level: str):
    """Build a dependency that enforces a minimum permission level.

    Args:
        min_level: Required level; must be a key of ``_PERMISSION_LEVELS``
            ("read", "write" or "admin").

    Returns:
        An async dependency callable. It returns the key info dict produced
        by ``verify_api_key`` when the caller's level is sufficient, or
        ``None`` when authentication is disabled.

    Raises:
        ValueError: If ``min_level`` is not a known permission level.
    """
    # Fail fast on an unknown level. Previously it mapped to 0, so a typo in
    # min_level silently produced a dependency that accepted every valid key.
    if min_level not in _PERMISSION_LEVELS:
        raise ValueError(
            f"Unknown permission level {min_level!r}; "
            f"expected one of {sorted(_PERMISSION_LEVELS)}"
        )
    required = _PERMISSION_LEVELS[min_level]

    async def _check(key_info: dict | None = Depends(verify_api_key)) -> dict | None:
        if key_info is None:
            return None  # Auth disabled
        # An unrecognised permission string ranks as 0 and is always rejected.
        current = _PERMISSION_LEVELS.get(key_info["permissions"], 0)
        if current < required:
            raise HTTPException(
                status_code=403,
                detail=f"Insufficient permissions. Requires '{min_level}' / 权限不足,需要 '{min_level}' 权限",
            )
        return key_info

    return _check
|
|
|
|
|
|
# Ready-made dependencies for routes that need "write" or "admin" access.
require_write, require_admin = (
    require_permission("write"),
    require_permission("admin"),
)
|