diff --git a/app/config.py b/app/config.py index e596cf3..bd6006e 100644 --- a/app/config.py +++ b/app/config.py @@ -50,6 +50,9 @@ class Settings(BaseSettings): # Track query limit TRACK_MAX_POINTS: int = Field(default=10000, description="Maximum points returned by track endpoint") + # TCP connection + TCP_IDLE_TIMEOUT: int = Field(default=600, description="Idle timeout in seconds for TCP connections (0=disabled)") + # Fence auto-attendance FENCE_CHECK_ENABLED: bool = Field(default=True, description="Enable automatic fence attendance check on location report") FENCE_LBS_TOLERANCE_METERS: int = Field(default=200, description="Extra tolerance (meters) for LBS locations in fence check") diff --git a/app/database.py b/app/database.py index 0441772..8923aaa 100644 --- a/app/database.py +++ b/app/database.py @@ -1,3 +1,4 @@ +from sqlalchemy import event from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker, create_async_engine from sqlalchemy.orm import DeclarativeBase @@ -9,6 +10,17 @@ engine = create_async_engine( connect_args={"check_same_thread": False}, ) +# Enable WAL mode for concurrent read/write performance +@event.listens_for(engine.sync_engine, "connect") +def _set_sqlite_pragma(dbapi_conn, connection_record): + cursor = dbapi_conn.cursor() + cursor.execute("PRAGMA journal_mode=WAL") + cursor.execute("PRAGMA synchronous=NORMAL") + cursor.execute("PRAGMA cache_size=-64000") # 64MB cache + cursor.execute("PRAGMA busy_timeout=5000") # 5s wait on lock + cursor.execute("PRAGMA foreign_keys=ON") + cursor.close() + async_session = async_sessionmaker( bind=engine, class_=AsyncSession, diff --git a/app/dependencies.py b/app/dependencies.py index dc2a998..858a348 100644 --- a/app/dependencies.py +++ b/app/dependencies.py @@ -1,11 +1,12 @@ """ Shared FastAPI dependencies. Supports master API key (env) and database-managed API keys with permission levels. +Includes in-memory cache to avoid DB lookup on every request. """ import hashlib import secrets -from datetime import datetime, timezone +import time from fastapi import Depends, HTTPException, Security from fastapi.security import APIKeyHeader @@ -20,6 +21,10 @@ _api_key_header = APIKeyHeader(name="X-API-Key", auto_error=False) # Permission hierarchy: admin > write > read _PERMISSION_LEVELS = {"read": 1, "write": 2, "admin": 3} +# In-memory auth cache: {key_hash: (result_dict, expire_timestamp)} +_AUTH_CACHE: dict[str, tuple[dict, float]] = {} +_AUTH_CACHE_TTL = 60 # seconds + def _hash_key(key: str) -> str: """SHA-256 hash of an API key.""" @@ -32,7 +37,7 @@ async def verify_api_key( ) -> dict | None: """Verify API key. Returns key info dict or None (auth disabled). - Checks master key first, then database keys. + Checks master key first, then in-memory cache, then database keys. Returns {"permissions": "admin"|"write"|"read", "key_id": int|None, "name": str}. """ if settings.API_KEY is None: @@ -45,23 +50,34 @@ async def verify_api_key( if secrets.compare_digest(api_key, settings.API_KEY): return {"permissions": "admin", "key_id": None, "name": "master"} + # Check in-memory cache first + key_hash = _hash_key(api_key) + now = time.monotonic() + cached = _AUTH_CACHE.get(key_hash) + if cached is not None: + result, expires = cached + if now < expires: + return result + # Check database keys from app.models import ApiKey - key_hash = _hash_key(api_key) result = await db.execute( select(ApiKey).where(ApiKey.key_hash == key_hash, ApiKey.is_active == True) # noqa: E712 ) db_key = result.scalar_one_or_none() if db_key is None: + _AUTH_CACHE.pop(key_hash, None) raise HTTPException(status_code=401, detail="Invalid API key / 无效的 API Key") - # Update last_used_at + # Update last_used_at (deferred — only on cache miss, not every request) from app.config import now_cst db_key.last_used_at = now_cst() await db.flush() - return {"permissions": db_key.permissions, "key_id": db_key.id, "name": db_key.name} + key_info = {"permissions": db_key.permissions, "key_id": db_key.id, "name": db_key.name} + _AUTH_CACHE[key_hash] = (key_info, now + _AUTH_CACHE_TTL) + return key_info def require_permission(min_level: str): diff --git a/app/geocoding.py b/app/geocoding.py index bb5b23d..e37848e 100644 --- a/app/geocoding.py +++ b/app/geocoding.py @@ -27,6 +27,31 @@ AMAP_HARDWARE_SECRET: Optional[str] = _settings.AMAP_HARDWARE_SECRET _CACHE_MAX_SIZE = _settings.GEOCODING_CACHE_SIZE +# --------------------------------------------------------------------------- +# Shared aiohttp session (reused across all geocoding calls) +# --------------------------------------------------------------------------- + +_http_session: Optional[aiohttp.ClientSession] = None + + +async def _get_http_session() -> aiohttp.ClientSession: + """Get or create the shared aiohttp session (lazy init).""" + global _http_session + if _http_session is None or _http_session.closed: + _http_session = aiohttp.ClientSession( + timeout=aiohttp.ClientTimeout(total=5), + ) + return _http_session + + +async def close_http_session() -> None: + """Close the shared session (call on app shutdown).""" + global _http_session + if _http_session and not _http_session.closed: + await _http_session.close() + _http_session = None + + # --------------------------------------------------------------------------- # WGS-84 → GCJ-02 coordinate conversion (server-side) # --------------------------------------------------------------------------- @@ -316,38 +341,36 @@ async def _geocode_amap_v5( url = f"https://restapi.amap.com/v5/position/IoT?key={api_key}" - logger.info("Amap v5 request body: %s", body) + logger.debug("Amap v5 request body: %s", body) try: - async with aiohttp.ClientSession() as session: - async with session.post( - url, data=body, timeout=aiohttp.ClientTimeout(total=5) - ) as resp: - if resp.status == 200: - data = await resp.json(content_type=None) - logger.info("Amap v5 response: %s", data) - if data.get("status") == "1": - position = data.get("position", {}) - location = position.get("location", "") if isinstance(position, dict) else "" - if location and "," in location: - lon_str, lat_str = location.split(",") - gcj_lat = float(lat_str) - gcj_lon = float(lon_str) - lat, lon = gcj02_to_wgs84(gcj_lat, gcj_lon) - radius = position.get("radius", "?") if isinstance(position, dict) else "?" - logger.info( - "Amap v5 geocode: GCJ-02(%.6f,%.6f) -> WGS-84(%.6f,%.6f) radius=%s", - gcj_lat, gcj_lon, lat, lon, radius, - ) - return (lat, lon) - else: - infocode = data.get("infocode", "") - logger.warning( - "Amap v5 geocode error: %s (code=%s) body=%s", - data.get("info", ""), infocode, body, + session = await _get_http_session() + async with session.post(url, data=body) as resp: + if resp.status == 200: + data = await resp.json(content_type=None) + logger.debug("Amap v5 response: %s", data) + if data.get("status") == "1": + position = data.get("position", {}) + location = position.get("location", "") if isinstance(position, dict) else "" + if location and "," in location: + lon_str, lat_str = location.split(",") + gcj_lat = float(lat_str) + gcj_lon = float(lon_str) + lat, lon = gcj02_to_wgs84(gcj_lat, gcj_lon) + radius = position.get("radius", "?") if isinstance(position, dict) else "?" + logger.info( + "Amap v5 geocode: GCJ-02(%.6f,%.6f) -> WGS-84(%.6f,%.6f) radius=%s", + gcj_lat, gcj_lon, lat, lon, radius, ) + return (lat, lon) else: - logger.warning("Amap v5 geocode HTTP %d", resp.status) + infocode = data.get("infocode", "") + logger.warning( + "Amap v5 geocode error: %s (code=%s) body=%s", + data.get("info", ""), infocode, body, + ) + else: + logger.warning("Amap v5 geocode HTTP %d", resp.status) except Exception as e: logger.warning("Amap v5 geocode error: %s", e) @@ -400,37 +423,35 @@ async def _geocode_amap_legacy( url = "https://apilocate.amap.com/position" - logger.info("Amap legacy request params: %s", {k: v for k, v in params.items() if k != 'key'}) + logger.debug("Amap legacy request params: %s", {k: v for k, v in params.items() if k != 'key'}) try: - async with aiohttp.ClientSession() as session: - async with session.get( - url, params=params, timeout=aiohttp.ClientTimeout(total=5) - ) as resp: - if resp.status == 200: - data = await resp.json(content_type=None) - logger.info("Amap legacy response: %s", data) - if data.get("status") == "1" and data.get("result"): - result = data["result"] - location = result.get("location", "") - if location and "," in location: - lon_str, lat_str = location.split(",") - gcj_lat = float(lat_str) - gcj_lon = float(lon_str) - lat, lon = gcj02_to_wgs84(gcj_lat, gcj_lon) - logger.info( - "Amap legacy geocode: GCJ-02(%.6f,%.6f) -> WGS-84(%.6f,%.6f)", - gcj_lat, gcj_lon, lat, lon, - ) - return (lat, lon) - else: - infocode = data.get("infocode", "") - if infocode == "10012": - logger.debug("Amap legacy geocode: insufficient permissions (enterprise cert needed)") - else: - logger.warning("Amap legacy geocode error: %s (code=%s)", data.get("info", ""), infocode) + session = await _get_http_session() + async with session.get(url, params=params) as resp: + if resp.status == 200: + data = await resp.json(content_type=None) + logger.debug("Amap legacy response: %s", data) + if data.get("status") == "1" and data.get("result"): + result = data["result"] + location = result.get("location", "") + if location and "," in location: + lon_str, lat_str = location.split(",") + gcj_lat = float(lat_str) + gcj_lon = float(lon_str) + lat, lon = gcj02_to_wgs84(gcj_lat, gcj_lon) + logger.info( + "Amap legacy geocode: GCJ-02(%.6f,%.6f) -> WGS-84(%.6f,%.6f)", + gcj_lat, gcj_lon, lat, lon, + ) + return (lat, lon) else: - logger.warning("Amap legacy geocode HTTP %d", resp.status) + infocode = data.get("infocode", "") + if infocode == "10012": + logger.debug("Amap legacy geocode: insufficient permissions (enterprise cert needed)") + else: + logger.warning("Amap legacy geocode error: %s (code=%s)", data.get("info", ""), infocode) + else: + logger.warning("Amap legacy geocode HTTP %d", resp.status) except Exception as e: logger.warning("Amap legacy geocode error: %s", e) @@ -490,28 +511,26 @@ async def _reverse_geocode_amap( url = "https://restapi.amap.com/v3/geocode/regeo" try: - async with aiohttp.ClientSession() as session: - async with session.get( - url, params=params, timeout=aiohttp.ClientTimeout(total=5) - ) as resp: - if resp.status == 200: - data = await resp.json(content_type=None) - if data.get("status") == "1": - regeocode = data.get("regeocode", {}) - formatted = regeocode.get("formatted_address", "") - if formatted and formatted != "[]": - logger.info( - "Amap reverse geocode: %.6f,%.6f -> %s", - lat, lon, formatted, - ) - return formatted - else: - logger.warning( - "Amap reverse geocode error: info=%s, infocode=%s", - data.get("info", ""), data.get("infocode", ""), + session = await _get_http_session() + async with session.get(url, params=params) as resp: + if resp.status == 200: + data = await resp.json(content_type=None) + if data.get("status") == "1": + regeocode = data.get("regeocode", {}) + formatted = regeocode.get("formatted_address", "") + if formatted and formatted != "[]": + logger.info( + "Amap reverse geocode: %.6f,%.6f -> %s", + lat, lon, formatted, ) + return formatted else: - logger.warning("Amap reverse geocode HTTP %d", resp.status) + logger.warning( + "Amap reverse geocode error: info=%s, infocode=%s", + data.get("info", ""), data.get("infocode", ""), + ) + else: + logger.warning("Amap reverse geocode HTTP %d", resp.status) except Exception as e: logger.warning("Amap reverse geocode error: %s", e) diff --git a/app/main.py b/app/main.py index ce5747b..0a8b34b 100644 --- a/app/main.py +++ b/app/main.py @@ -89,9 +89,14 @@ async def lifespan(app: FastAPI): for stmt in [ "CREATE INDEX IF NOT EXISTS ix_alarm_type ON alarm_records(alarm_type)", "CREATE INDEX IF NOT EXISTS ix_alarm_ack ON alarm_records(acknowledged)", + "CREATE INDEX IF NOT EXISTS ix_alarm_source ON alarm_records(alarm_source)", "CREATE INDEX IF NOT EXISTS ix_bt_beacon_mac ON bluetooth_records(beacon_mac)", "CREATE INDEX IF NOT EXISTS ix_loc_type ON location_records(location_type)", "CREATE INDEX IF NOT EXISTS ix_att_type ON attendance_records(attendance_type)", + "CREATE INDEX IF NOT EXISTS ix_att_source ON attendance_records(attendance_source)", + "CREATE INDEX IF NOT EXISTS ix_att_device_type_time ON attendance_records(device_id, attendance_type, recorded_at)", + "CREATE INDEX IF NOT EXISTS ix_device_status ON devices(status)", + "CREATE INDEX IF NOT EXISTS ix_fence_active ON fence_configs(is_active)", ]: await conn.execute(sa_text(stmt)) logger.info("Database indexes verified/created") @@ -107,6 +112,9 @@ async def lifespan(app: FastAPI): logger.info("Shutting down TCP server...") await tcp_manager.stop() tcp_task.cancel() + # Close shared HTTP session + from app.geocoding import close_http_session + await close_http_session() app = FastAPI( title="KKS Badge Management System / KKS工牌管理系统", @@ -186,12 +194,14 @@ app.include_router(geocoding.router, dependencies=[*_api_deps]) _STATIC_DIR = Path(__file__).parent / "static" app.mount("/static", StaticFiles(directory=str(_STATIC_DIR)), name="static") +# Cache admin.html in memory at startup (avoid disk read per request) +_admin_html_cache: str = (_STATIC_DIR / "admin.html").read_text(encoding="utf-8") + @app.get("/admin", response_class=HTMLResponse, tags=["Admin"]) async def admin_page(): """管理后台页面 / Admin Dashboard""" - html_path = _STATIC_DIR / "admin.html" - return HTMLResponse(content=html_path.read_text(encoding="utf-8")) + return HTMLResponse(content=_admin_html_cache) @app.get("/", tags=["Root"]) diff --git a/app/routers/commands.py b/app/routers/commands.py index 73bde34..3567d53 100644 --- a/app/routers/commands.py +++ b/app/routers/commands.py @@ -317,6 +317,81 @@ async def batch_send_command(request: Request, body: BatchCommandRequest, db: As ) +class BroadcastCommandRequest(BaseModel): + """Request body for broadcasting a command to all devices.""" + command_type: str = Field(default="online_cmd", max_length=30, description="指令类型") + command_content: str = Field(..., max_length=500, description="指令内容") + + +@router.post( + "/broadcast", + response_model=APIResponse[BatchCommandResponse], + status_code=201, + summary="广播指令给所有设备 / Broadcast command to all devices", + dependencies=[Depends(require_write)], +) +@limiter.limit(settings.RATE_LIMIT_WRITE) +async def broadcast_command(request: Request, body: BroadcastCommandRequest, db: AsyncSession = Depends(get_db)): + """ + 向所有设备广播同一指令。尝试通过 TCP 发送给每台设备,TCP 未连接的自动跳过。 + Broadcast the same command to all devices. Attempts TCP send for each, skips those without active TCP connection. + """ + from sqlalchemy import select + from app.models import Device + + result = await db.execute(select(Device)) + devices = list(result.scalars().all()) + + if not devices: + return APIResponse( + message="No devices / 没有设备", + data=BatchCommandResponse(total=0, sent=0, failed=0, results=[]), + ) + + results = [] + for device in devices: + if not tcp_command_service.is_device_online(device.imei): + results.append(BatchCommandResult( + device_id=device.id, imei=device.imei, + success=False, error="TCP not connected", + )) + continue + + try: + cmd_log = await command_service.create_command( + db, + device_id=device.id, + command_type=body.command_type, + command_content=body.command_content, + ) + await tcp_command_service.send_command( + device.imei, body.command_type, body.command_content + ) + cmd_log.status = "sent" + cmd_log.sent_at = now_cst() + await db.flush() + await db.refresh(cmd_log) + results.append(BatchCommandResult( + device_id=device.id, imei=device.imei, + success=True, command_id=cmd_log.id, + )) + except Exception as e: + logging.getLogger(__name__).error("Broadcast cmd failed for %s: %s", device.imei, e) + results.append(BatchCommandResult( + device_id=device.id, imei=device.imei, + success=False, error="Send failed", + )) + + sent = sum(1 for r in results if r.success) + failed = len(results) - sent + return APIResponse( + message=f"Broadcast: {sent} sent, {failed} skipped (total: {len(devices)})", + data=BatchCommandResponse( + total=len(results), sent=sent, failed=failed, results=results, + ), + ) + + @router.get( "/{command_id}", response_model=APIResponse[CommandResponse], diff --git a/app/routers/locations.py b/app/routers/locations.py index bed6e26..99c9c4b 100644 --- a/app/routers/locations.py +++ b/app/routers/locations.py @@ -31,6 +31,7 @@ router = APIRouter(prefix="/api/locations", tags=["Locations / 位置数据"]) async def list_locations( device_id: int | None = Query(default=None, description="设备ID / Device ID"), location_type: str | None = Query(default=None, description="定位类型 / Location type (gps/lbs/wifi)"), + exclude_type: str | None = Query(default=None, description="排除定位类型前缀 / Exclude location type prefix (e.g. lbs)"), start_time: datetime | None = Query(default=None, description="开始时间 / Start time (ISO 8601)"), end_time: datetime | None = Query(default=None, description="结束时间 / End time (ISO 8601)"), page: int = Query(default=1, ge=1, description="页码 / Page number"), @@ -45,6 +46,7 @@ async def list_locations( db, device_id=device_id, location_type=location_type, + exclude_type=exclude_type, start_time=start_time, end_time=end_time, page=page, diff --git a/app/services/fence_checker.py b/app/services/fence_checker.py index b41ce01..d9f8b58 100644 --- a/app/services/fence_checker.py +++ b/app/services/fence_checker.py @@ -234,6 +234,19 @@ async def check_device_fences( "cell_id": cell_id, } + # 2. Batch-load all fence states in one query (avoid N+1) + fence_ids = [f.id for f in fences] + states_result = await session.execute( + select(DeviceFenceState).where( + DeviceFenceState.device_id == device_id, + DeviceFenceState.fence_id.in_(fence_ids), + ) + ) + states_map: dict[int, DeviceFenceState] = {s.fence_id: s for s in states_result.scalars().all()} + + # Pre-check today's attendance dedup once (not per-fence) + _today_clock_in = await _has_attendance_today(session, device_id, "clock_in") + tolerance = _get_tolerance_for_location_type(location_type) events: list[dict] = [] now = now_cst() @@ -242,14 +255,7 @@ async def check_device_fences( for fence in fences: currently_inside = is_inside_fence(latitude, longitude, fence, tolerance) - # 2. Get or create state record - state_result = await session.execute( - select(DeviceFenceState).where( - DeviceFenceState.device_id == device_id, - DeviceFenceState.fence_id == fence.id, - ) - ) - state = state_result.scalar_one_or_none() + state = states_map.get(fence.id) was_inside = bool(state and state.is_inside) @@ -266,8 +272,8 @@ async def check_device_fences( _update_state(state, currently_inside, now, latitude, longitude) continue - # Daily dedup: only one clock_in per device per day - if await _has_attendance_today(session, device_id, "clock_in"): + # Daily dedup: only one clock_in per device per day (pre-fetched) + if _today_clock_in: logger.info( "Fence skip clock_in: device=%d fence=%d(%s) already clocked in today", device_id, fence.id, fence.name, diff --git a/app/services/location_service.py b/app/services/location_service.py index 92f5eeb..b0a8049 100644 --- a/app/services/location_service.py +++ b/app/services/location_service.py @@ -15,6 +15,7 @@ async def get_locations( db: AsyncSession, device_id: int | None = None, location_type: str | None = None, + exclude_type: str | None = None, start_time: datetime | None = None, end_time: datetime | None = None, page: int = 1, @@ -56,6 +57,14 @@ async def get_locations( query = query.where(LocationRecord.location_type == location_type) count_query = count_query.where(LocationRecord.location_type == location_type) + if exclude_type: + # Map prefix to actual values for index-friendly IN query + _EXCLUDE_MAP = {"lbs": ["lbs", "lbs_4g"], "wifi": ["wifi", "wifi_4g"], "gps": ["gps", "gps_4g"]} + exclude_values = _EXCLUDE_MAP.get(exclude_type, [exclude_type]) + clause = LocationRecord.location_type.notin_(exclude_values) + query = query.where(clause) + count_query = count_query.where(clause) + if start_time: query = query.where(LocationRecord.recorded_at >= start_time) count_query = count_query.where(LocationRecord.recorded_at >= start_time) diff --git a/app/static/admin.html b/app/static/admin.html index 2820194..2055626 100644 --- a/app/static/admin.html +++ b/app/static/admin.html @@ -139,6 +139,17 @@ .panel-expand-btn { position: absolute; left: 0; top: 50%; transform: translateY(-50%); background: #1f2937; border: 1px solid #374151; border-left: none; border-radius: 0 6px 6px 0; padding: 8px 4px; color: #9ca3af; cursor: pointer; z-index: 5; display: none; } .side-panel.collapsed ~ .page-main-content .panel-expand-btn { display: block; } @media (max-width: 768px) { .page-with-panel { flex-direction: column; height: auto; } .side-panel { width: 100%; min-width: 100%; max-height: 300px; } .side-panel.collapsed { max-height: 0; } } + .dev-sort:hover { background: #374151; } + .sort-arrow { font-size: 10px; color: #6b7280; margin-left: 2px; } + .sort-arrow.asc::after { content: '▲'; color: #60a5fa; } + .sort-arrow.desc::after { content: '▼'; color: #60a5fa; } + .dev-qcmd { height:24px;border:1px solid #374151;border-radius:5px;background:#1f2937;color:#9ca3af;font-size:11px;cursor:pointer;padding:0 7px;margin:0 1px;transition:all 0.15s;white-space:nowrap; } + .dev-qcmd i { margin-right:2px; } + .dev-qcmd:hover:not(:disabled) { background:#2563eb;color:#fff;border-color:#2563eb; } + .dev-qcmd.sent { background:#065f46;color:#34d399;border-color:#065f46; } + .dev-qcmd-danger { color:#f87171; } + .ov-dev-item:hover { background: #374151; } + .dev-qcmd-danger:hover:not(:disabled) { background:#991b1b;color:#fff;border-color:#991b1b; }
@@ -309,6 +320,12 @@ + + + + + + @@ -318,18 +335,20 @@| IMEI | -名称 | -类型 | -状态 | -电量 | -信号 | -最后登录 | -最后心跳 | +IMEI | +名称 | +类型 | +状态 | +定位模式 | +电量 | +信号 | +最后登录 | +最后心跳 | +快捷操作 |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 加载中... | |||||||||||||||||
| 加载中... | |||||||||||||||||
将发送给所有在线设备,离线设备自动跳过
+