Files
desungongpai/app/dependencies.py
default 9cd9dd9d76 feat: 信标设备绑定 + 蓝牙模式管理 + 系统管理增强 + 数据导出
- 新增 DeviceBeaconBinding 模型,信标-设备多对多绑定 CRUD
- 蓝牙打卡模式批量配置/恢复正常模式 API
- 反向同步: 查询设备 BTMACSET 配置更新数据库绑定 (独立 session 解决事务隔离)
- 设备列表快捷操作弹窗修复 (fire-and-forget IIFE 替代阻塞轮询)
- 保存按钮防抖: 围栏/信标绑定保存点击后 disabled + 转圈防重复提交
- 审计日志中间件 + 系统配置/备份/固件 API
- 设备分组管理 + 告警规则配置
- 5个数据导出 API (CSV UTF-8 BOM)
- 位置热力图 + 告警条件删除 + 位置清理

via [HAPI](https://hapi.run)

Co-Authored-By: HAPI <noreply@hapi.run>
2026-04-01 07:06:37 +00:00

107 lines
3.4 KiB
Python

"""
Shared FastAPI dependencies.
Supports master API key (env) and database-managed API keys with permission levels.
Includes in-memory cache to avoid DB lookup on every request.
"""
import hashlib
import secrets
import time
from fastapi import Depends, HTTPException, Request, Security
from fastapi.security import APIKeyHeader
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
from app.config import settings
from app.database import get_db
_api_key_header = APIKeyHeader(name="X-API-Key", auto_error=False)
# Permission hierarchy: admin > write > read
_PERMISSION_LEVELS = {"read": 1, "write": 2, "admin": 3}
# In-memory auth cache: {key_hash: (result_dict, expire_timestamp)}
_AUTH_CACHE: dict[str, tuple[dict, float]] = {}
_AUTH_CACHE_TTL = 60 # seconds
def _hash_key(key: str) -> str:
"""SHA-256 hash of an API key."""
return hashlib.sha256(key.encode()).hexdigest()
async def verify_api_key(
request: Request,
api_key: str | None = Security(_api_key_header),
db: AsyncSession = Depends(get_db),
) -> dict | None:
"""Verify API key. Returns key info dict or None (auth disabled).
Checks master key first, then in-memory cache, then database keys.
Returns {"permissions": "admin"|"write"|"read", "key_id": int|None, "name": str}.
"""
if settings.API_KEY is None:
return None # Auth disabled
if api_key is None:
raise HTTPException(status_code=401, detail="Missing API key / 缺少 API Key")
# Check master key
if secrets.compare_digest(api_key, settings.API_KEY):
info = {"permissions": "admin", "key_id": None, "name": "master"}
request.state.key_info = info
return info
# Check in-memory cache first
key_hash = _hash_key(api_key)
now = time.monotonic()
cached = _AUTH_CACHE.get(key_hash)
if cached is not None:
result, expires = cached
if now < expires:
return result
# Check database keys
from app.models import ApiKey
result = await db.execute(
select(ApiKey).where(ApiKey.key_hash == key_hash, ApiKey.is_active == True) # noqa: E712
)
db_key = result.scalar_one_or_none()
if db_key is None:
_AUTH_CACHE.pop(key_hash, None)
raise HTTPException(status_code=401, detail="Invalid API key / 无效的 API Key")
# Update last_used_at (deferred — only on cache miss, not every request)
from app.config import now_cst
db_key.last_used_at = now_cst()
await db.flush()
key_info = {"permissions": db_key.permissions, "key_id": db_key.id, "name": db_key.name}
_AUTH_CACHE[key_hash] = (key_info, now + _AUTH_CACHE_TTL)
request.state.key_info = key_info
return key_info
def require_permission(min_level: str):
"""Factory for permission-checking dependencies."""
async def _check(key_info: dict | None = Depends(verify_api_key)):
if key_info is None:
return # Auth disabled
current = _PERMISSION_LEVELS.get(key_info["permissions"], 0)
required = _PERMISSION_LEVELS.get(min_level, 0)
if current < required:
raise HTTPException(
status_code=403,
detail=f"Insufficient permissions. Requires '{min_level}' / 权限不足,需要 '{min_level}' 权限",
)
return key_info
return _check
require_write = require_permission("write")
require_admin = require_permission("admin")