Files
desungongpai/app/dependencies.py

88 lines
2.7 KiB
Python
Raw Normal View History

"""
Shared FastAPI dependencies.
Supports master API key (env) and database-managed API keys with permission levels.
"""
import hashlib
import secrets
from datetime import datetime
from app.config import BEIJING_TZ
from fastapi import Depends, HTTPException, Security
from fastapi.security import APIKeyHeader
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
from app.config import settings
from app.database import get_db
# Security scheme that reads the caller's key from the "X-API-Key" header.
# auto_error=False: verify_api_key handles the missing-key case itself
# (it checks for None and raises its own 401).
_api_key_header = APIKeyHeader(name="X-API-Key", auto_error=False)
# Permission hierarchy: admin > write > read
# Higher number means more privilege; levels are compared numerically.
_PERMISSION_LEVELS = {"read": 1, "write": 2, "admin": 3}
def _hash_key(key: str) -> str:
"""SHA-256 hash of an API key."""
return hashlib.sha256(key.encode()).hexdigest()
async def verify_api_key(
    api_key: str | None = Security(_api_key_header),
    db: AsyncSession = Depends(get_db),
) -> dict | None:
    """Verify the request's API key and describe the caller.

    The master key from settings is checked first; if it does not match,
    the key's SHA-256 hash is looked up among active database keys.

    Args:
        api_key: Value of the ``X-API-Key`` header, or None when absent.
        db: Async database session used to look up managed keys.

    Returns:
        None when authentication is disabled (``settings.API_KEY`` is None);
        otherwise ``{"permissions": "admin"|"write"|"read",
        "key_id": int|None, "name": str}``.

    Raises:
        HTTPException: 401 when the key is missing or matches no active key.
    """
    if settings.API_KEY is None:
        return None  # Auth disabled

    if api_key is None:
        raise HTTPException(status_code=401, detail="Missing API key / 缺少 API Key")

    # Check master key.
    # Compare as bytes: compare_digest raises TypeError for str arguments that
    # contain non-ASCII characters, which would turn a malformed header value
    # into a 500 instead of a clean 401.
    if secrets.compare_digest(
        api_key.encode("utf-8"), settings.API_KEY.encode("utf-8")
    ):
        return {"permissions": "admin", "key_id": None, "name": "master"}

    # Check database keys.
    # NOTE(review): imported locally, presumably to avoid a circular import
    # between app.models and this module -- confirm before moving it.
    from app.models import ApiKey

    key_hash = _hash_key(api_key)
    result = await db.execute(
        select(ApiKey).where(ApiKey.key_hash == key_hash, ApiKey.is_active == True)  # noqa: E712
    )
    db_key = result.scalar_one_or_none()
    if db_key is None:
        raise HTTPException(status_code=401, detail="Invalid API key / 无效的 API Key")

    # Update last_used_at. Only flushed here; the commit is left to whoever
    # owns the session.
    db_key.last_used_at = datetime.now(BEIJING_TZ)
    await db.flush()

    return {"permissions": db_key.permissions, "key_id": db_key.id, "name": db_key.name}
def require_permission(min_level: str):
    """Build a dependency that enforces a minimum permission level.

    Args:
        min_level: Required level; must be a key of ``_PERMISSION_LEVELS``
            ("read", "write" or "admin").

    Returns:
        An async dependency that resolves to the key-info dict from
        ``verify_api_key``, or None when authentication is disabled. It
        raises ``HTTPException`` 403 when the caller's level is too low.

    Raises:
        ValueError: If ``min_level`` is not a known permission level.
    """
    # Fail fast on an unknown level. Mapping it to 0 would make the check
    # below pass for every authenticated key (fail-open on a typo).
    if min_level not in _PERMISSION_LEVELS:
        raise ValueError(
            f"Unknown permission level {min_level!r}; "
            f"expected one of {sorted(_PERMISSION_LEVELS)}"
        )
    required = _PERMISSION_LEVELS[min_level]

    async def _check(key_info: dict | None = Depends(verify_api_key)):
        if key_info is None:
            return None  # Auth disabled

        # An unrecognised permission string on the key ranks below every
        # level, so such keys are always rejected.
        current = _PERMISSION_LEVELS.get(key_info["permissions"], 0)
        if current < required:
            raise HTTPException(
                status_code=403,
                detail=f"Insufficient permissions. Requires '{min_level}' / 权限不足,需要 '{min_level}' 权限",
            )
        return key_info

    return _check
# Pre-built dependencies for the "write" and "admin" levels.
require_write = require_permission("write")
require_admin = require_permission("admin")