""" Shared FastAPI dependencies. Supports master API key (env) and database-managed API keys with permission levels. Includes in-memory cache to avoid DB lookup on every request. """ import hashlib import secrets import time from fastapi import Depends, HTTPException, Request, Security from fastapi.security import APIKeyHeader from sqlalchemy import select from sqlalchemy.ext.asyncio import AsyncSession from app.config import settings from app.database import get_db _api_key_header = APIKeyHeader(name="X-API-Key", auto_error=False) # Permission hierarchy: admin > write > read _PERMISSION_LEVELS = {"read": 1, "write": 2, "admin": 3} # In-memory auth cache: {key_hash: (result_dict, expire_timestamp)} _AUTH_CACHE: dict[str, tuple[dict, float]] = {} _AUTH_CACHE_TTL = 60 # seconds def _hash_key(key: str) -> str: """SHA-256 hash of an API key.""" return hashlib.sha256(key.encode()).hexdigest() async def verify_api_key( request: Request, api_key: str | None = Security(_api_key_header), db: AsyncSession = Depends(get_db), ) -> dict | None: """Verify API key. Returns key info dict or None (auth disabled). Checks master key first, then in-memory cache, then database keys. Returns {"permissions": "admin"|"write"|"read", "key_id": int|None, "name": str}. """ if settings.API_KEY is None: return None # Auth disabled if api_key is None: raise HTTPException(status_code=401, detail="Missing API key / 缺少 API Key") # Check master key if secrets.compare_digest(api_key, settings.API_KEY): info = {"permissions": "admin", "key_id": None, "name": "master"} request.state.key_info = info return info # Check in-memory cache first key_hash = _hash_key(api_key) now = time.monotonic() cached = _AUTH_CACHE.get(key_hash) if cached is not None: result, expires = cached if now < expires: return result # Check database keys from app.models import ApiKey result = await db.execute( select(ApiKey).where(ApiKey.key_hash == key_hash, ApiKey.is_active == True) # noqa: E712 ) db_key = result.scalar_one_or_none() if db_key is None: _AUTH_CACHE.pop(key_hash, None) raise HTTPException(status_code=401, detail="Invalid API key / 无效的 API Key") # Update last_used_at (deferred — only on cache miss, not every request) from app.config import now_cst db_key.last_used_at = now_cst() await db.flush() key_info = {"permissions": db_key.permissions, "key_id": db_key.id, "name": db_key.name} _AUTH_CACHE[key_hash] = (key_info, now + _AUTH_CACHE_TTL) request.state.key_info = key_info return key_info def require_permission(min_level: str): """Factory for permission-checking dependencies.""" async def _check(key_info: dict | None = Depends(verify_api_key)): if key_info is None: return # Auth disabled current = _PERMISSION_LEVELS.get(key_info["permissions"], 0) required = _PERMISSION_LEVELS.get(min_level, 0) if current < required: raise HTTPException( status_code=403, detail=f"Insufficient permissions. Requires '{min_level}' / 权限不足,需要 '{min_level}' 权限", ) return key_info return _check require_write = require_permission("write") require_admin = require_permission("admin")