Files
desungongpai/app/dependencies.py

107 lines
3.4 KiB
Python
Raw Permalink Normal View History

"""
Shared FastAPI dependencies.
Supports master API key (env) and database-managed API keys with permission levels.
Includes in-memory cache to avoid DB lookup on every request.
"""
import hashlib
import secrets
import time
from fastapi import Depends, HTTPException, Request, Security
from fastapi.security import APIKeyHeader
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
from app.config import settings
from app.database import get_db
_api_key_header = APIKeyHeader(name="X-API-Key", auto_error=False)
# Permission hierarchy: admin > write > read
_PERMISSION_LEVELS = {"read": 1, "write": 2, "admin": 3}
# In-memory auth cache: {key_hash: (result_dict, expire_timestamp)}
_AUTH_CACHE: dict[str, tuple[dict, float]] = {}
_AUTH_CACHE_TTL = 60 # seconds
def _hash_key(key: str) -> str:
"""SHA-256 hash of an API key."""
return hashlib.sha256(key.encode()).hexdigest()
async def verify_api_key(
request: Request,
api_key: str | None = Security(_api_key_header),
db: AsyncSession = Depends(get_db),
) -> dict | None:
"""Verify API key. Returns key info dict or None (auth disabled).
Checks master key first, then in-memory cache, then database keys.
Returns {"permissions": "admin"|"write"|"read", "key_id": int|None, "name": str}.
"""
if settings.API_KEY is None:
return None # Auth disabled
if api_key is None:
raise HTTPException(status_code=401, detail="Missing API key / 缺少 API Key")
# Check master key
if secrets.compare_digest(api_key, settings.API_KEY):
info = {"permissions": "admin", "key_id": None, "name": "master"}
request.state.key_info = info
return info
# Check in-memory cache first
key_hash = _hash_key(api_key)
now = time.monotonic()
cached = _AUTH_CACHE.get(key_hash)
if cached is not None:
result, expires = cached
if now < expires:
return result
# Check database keys
from app.models import ApiKey
result = await db.execute(
select(ApiKey).where(ApiKey.key_hash == key_hash, ApiKey.is_active == True) # noqa: E712
)
db_key = result.scalar_one_or_none()
if db_key is None:
_AUTH_CACHE.pop(key_hash, None)
raise HTTPException(status_code=401, detail="Invalid API key / 无效的 API Key")
# Update last_used_at (deferred — only on cache miss, not every request)
from app.config import now_cst
db_key.last_used_at = now_cst()
await db.flush()
key_info = {"permissions": db_key.permissions, "key_id": db_key.id, "name": db_key.name}
_AUTH_CACHE[key_hash] = (key_info, now + _AUTH_CACHE_TTL)
request.state.key_info = key_info
return key_info
def require_permission(min_level: str):
"""Factory for permission-checking dependencies."""
async def _check(key_info: dict | None = Depends(verify_api_key)):
if key_info is None:
return # Auth disabled
current = _PERMISSION_LEVELS.get(key_info["permissions"], 0)
required = _PERMISSION_LEVELS.get(min_level, 0)
if current < required:
raise HTTPException(
status_code=403,
detail=f"Insufficient permissions. Requires '{min_level}' / 权限不足,需要 '{min_level}' 权限",
)
return key_info
return _check
require_write = require_permission("write")
require_admin = require_permission("admin")