Files
desungongpai/app/dependencies.py
default 1d06cc5415 feat: 高德IoT v5 API升级、电子围栏管理、设备绑定自动考勤
- 前向地理编码升级为高德IoT v5 API (POST restapi.amap.com/v5/position/IoT)
- 修复LBS定位偏差: 添加network=LTE参数区分4G/2G, bts格式补充cage字段
- 新增电子围栏管理模块 (circle/polygon/rectangle), 支持地图绘制和POI搜索
- 新增设备-围栏多对多绑定 (DeviceFenceBinding/DeviceFenceState)
- 围栏自动考勤引擎 (fence_checker.py): haversine距离、ray-casting多边形判定、容差机制、防抖
- TCP位置上报自动检测围栏进出, 生成考勤记录并WebSocket广播
- 前端围栏页面: 绑定设备弹窗、POI搜索定位、左侧围栏面板
- 新增fence_attendance WebSocket topic

via [HAPI](https://hapi.run)

Co-Authored-By: HAPI <noreply@hapi.run>
2026-03-27 13:04:11 +00:00

87 lines
2.7 KiB
Python

"""
Shared FastAPI dependencies.
Supports master API key (env) and database-managed API keys with permission levels.
"""
import hashlib
import secrets
from datetime import datetime, timezone
from fastapi import Depends, HTTPException, Security
from fastapi.security import APIKeyHeader
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
from app.config import settings
from app.database import get_db
_api_key_header = APIKeyHeader(name="X-API-Key", auto_error=False)
# Permission hierarchy: admin > write > read
_PERMISSION_LEVELS = {"read": 1, "write": 2, "admin": 3}
def _hash_key(key: str) -> str:
"""SHA-256 hash of an API key."""
return hashlib.sha256(key.encode()).hexdigest()
async def verify_api_key(
api_key: str | None = Security(_api_key_header),
db: AsyncSession = Depends(get_db),
) -> dict | None:
"""Verify API key. Returns key info dict or None (auth disabled).
Checks master key first, then database keys.
Returns {"permissions": "admin"|"write"|"read", "key_id": int|None, "name": str}.
"""
if settings.API_KEY is None:
return None # Auth disabled
if api_key is None:
raise HTTPException(status_code=401, detail="Missing API key / 缺少 API Key")
# Check master key
if secrets.compare_digest(api_key, settings.API_KEY):
return {"permissions": "admin", "key_id": None, "name": "master"}
# Check database keys
from app.models import ApiKey
key_hash = _hash_key(api_key)
result = await db.execute(
select(ApiKey).where(ApiKey.key_hash == key_hash, ApiKey.is_active == True) # noqa: E712
)
db_key = result.scalar_one_or_none()
if db_key is None:
raise HTTPException(status_code=401, detail="Invalid API key / 无效的 API Key")
# Update last_used_at
from app.config import now_cst
db_key.last_used_at = now_cst()
await db.flush()
return {"permissions": db_key.permissions, "key_id": db_key.id, "name": db_key.name}
def require_permission(min_level: str):
"""Factory for permission-checking dependencies."""
async def _check(key_info: dict | None = Depends(verify_api_key)):
if key_info is None:
return # Auth disabled
current = _PERMISSION_LEVELS.get(key_info["permissions"], 0)
required = _PERMISSION_LEVELS.get(min_level, 0)
if current < required:
raise HTTPException(
status_code=403,
detail=f"Insufficient permissions. Requires '{min_level}' / 权限不足,需要 '{min_level}' 权限",
)
return key_info
return _check
require_write = require_permission("write")
require_admin = require_permission("admin")