""" Attendance Router - 考勤管理接口 API endpoints for attendance record queries and statistics. """ import math from datetime import datetime, timedelta from fastapi import APIRouter, Depends, HTTPException, Query from fastapi.responses import Response from sqlalchemy import func, select, case from sqlalchemy.ext.asyncio import AsyncSession from app.database import get_db from app.models import AttendanceRecord from app.services.export_utils import build_csv_content, csv_filename from app.schemas import ( APIResponse, AttendanceRecordResponse, PaginatedList, ) from app.services import device_service router = APIRouter(prefix="/api/attendance", tags=["Attendance / 考勤管理"]) @router.get( "", response_model=APIResponse[PaginatedList[AttendanceRecordResponse]], summary="获取考勤记录列表 / List attendance records", ) async def list_attendance( device_id: int | None = Query(default=None, description="设备ID / Device ID"), attendance_type: str | None = Query(default=None, description="考勤类型 / Attendance type"), attendance_source: str | None = Query(default=None, description="考勤来源 / Source (device/bluetooth/fence)"), start_time: datetime | None = Query(default=None, description="开始时间 / Start time (ISO 8601)"), end_time: datetime | None = Query(default=None, description="结束时间 / End time (ISO 8601)"), page: int = Query(default=1, ge=1, description="页码 / Page number"), page_size: int = Query(default=20, ge=1, le=100, description="每页数量 / Items per page"), db: AsyncSession = Depends(get_db), ): """ 获取考勤记录列表,支持按设备、考勤类型、来源、时间范围过滤。 List attendance records with filters for device, type, source, and time range. """ query = select(AttendanceRecord) count_query = select(func.count(AttendanceRecord.id)) if device_id is not None: query = query.where(AttendanceRecord.device_id == device_id) count_query = count_query.where(AttendanceRecord.device_id == device_id) if attendance_type: query = query.where(AttendanceRecord.attendance_type == attendance_type) count_query = count_query.where(AttendanceRecord.attendance_type == attendance_type) if attendance_source: query = query.where(AttendanceRecord.attendance_source == attendance_source) count_query = count_query.where(AttendanceRecord.attendance_source == attendance_source) if start_time: query = query.where(AttendanceRecord.recorded_at >= start_time) count_query = count_query.where(AttendanceRecord.recorded_at >= start_time) if end_time: query = query.where(AttendanceRecord.recorded_at <= end_time) count_query = count_query.where(AttendanceRecord.recorded_at <= end_time) total_result = await db.execute(count_query) total = total_result.scalar() or 0 offset = (page - 1) * page_size query = query.order_by(AttendanceRecord.recorded_at.desc()).offset(offset).limit(page_size) result = await db.execute(query) records = list(result.scalars().all()) return APIResponse( data=PaginatedList( items=[AttendanceRecordResponse.model_validate(r) for r in records], total=total, page=page, page_size=page_size, total_pages=math.ceil(total / page_size) if total else 0, ) ) @router.get( "/export", summary="导出考勤记录 CSV / Export attendance records CSV", ) async def export_attendance( device_id: int | None = Query(default=None, description="设备ID"), attendance_type: str | None = Query(default=None, description="考勤类型 (clock_in/clock_out)"), attendance_source: str | None = Query(default=None, description="来源 (device/bluetooth/fence)"), start_time: datetime | None = Query(default=None, description="开始时间 ISO 8601"), end_time: datetime | None = Query(default=None, description="结束时间 ISO 8601"), db: AsyncSession = Depends(get_db), ): """导出考勤记录为 CSV,支持设备/类型/来源/时间筛选。最多导出 50000 条。""" query = select(AttendanceRecord) if device_id is not None: query = query.where(AttendanceRecord.device_id == device_id) if attendance_type: query = query.where(AttendanceRecord.attendance_type == attendance_type) if attendance_source: query = query.where(AttendanceRecord.attendance_source == attendance_source) if start_time: query = query.where(AttendanceRecord.recorded_at >= start_time) if end_time: query = query.where(AttendanceRecord.recorded_at <= end_time) query = query.order_by(AttendanceRecord.recorded_at.desc()).limit(50000) result = await db.execute(query) records = list(result.scalars().all()) headers = ["ID", "设备ID", "IMEI", "考勤类型", "来源", "纬度", "经度", "电量", "信号", "地址", "记录时间"] fields = ["id", "device_id", "imei", "attendance_type", "attendance_source", "latitude", "longitude", "battery_level", "gsm_signal", "address", "recorded_at"] content = build_csv_content(headers, records, fields) return Response( content=content, media_type="text/csv; charset=utf-8", headers={"Content-Disposition": f"attachment; filename={csv_filename('attendance')}"}, ) @router.get( "/stats", response_model=APIResponse[dict], summary="获取考勤统计(增强版)/ Get enhanced attendance statistics", ) async def attendance_stats( device_id: int | None = Query(default=None, description="设备ID / Device ID (optional)"), start_time: datetime | None = Query(default=None, description="开始时间 / Start time"), end_time: datetime | None = Query(default=None, description="结束时间 / End time"), days: int = Query(default=7, ge=1, le=90, description="趋势天数 / Trend days"), db: AsyncSession = Depends(get_db), ): """ 增强版考勤统计:总数、按类型/来源/设备分组、按天趋势、今日统计。 Enhanced: total, by type/source/device, daily trend, today count. """ from app.config import now_cst now = now_cst() base_filter = [] if device_id is not None: base_filter.append(AttendanceRecord.device_id == device_id) if start_time: base_filter.append(AttendanceRecord.recorded_at >= start_time) if end_time: base_filter.append(AttendanceRecord.recorded_at <= end_time) def _where(q): return q.where(*base_filter) if base_filter else q # Total total = (await db.execute(_where(select(func.count(AttendanceRecord.id))))).scalar() or 0 # By type type_result = await db.execute(_where( select(AttendanceRecord.attendance_type, func.count(AttendanceRecord.id)) .group_by(AttendanceRecord.attendance_type) )) by_type = {row[0]: row[1] for row in type_result.all()} # By source source_result = await db.execute(_where( select(AttendanceRecord.attendance_source, func.count(AttendanceRecord.id)) .group_by(AttendanceRecord.attendance_source) )) by_source = {(row[0] or "unknown"): row[1] for row in source_result.all()} # By device (top 20) device_result = await db.execute(_where( select(AttendanceRecord.device_id, AttendanceRecord.imei, func.count(AttendanceRecord.id)) .group_by(AttendanceRecord.device_id, AttendanceRecord.imei) .order_by(func.count(AttendanceRecord.id).desc()) .limit(20) )) by_device = [{"device_id": row[0], "imei": row[1], "count": row[2]} for row in device_result.all()] # Daily trend (last N days) cutoff = now - timedelta(days=days) trend_result = await db.execute( select( func.date(AttendanceRecord.recorded_at).label("day"), func.count(AttendanceRecord.id), ) .where(AttendanceRecord.recorded_at >= cutoff) .group_by("day").order_by("day") ) daily_trend = {str(row[0]): row[1] for row in trend_result.all()} # Today today_start = now.replace(hour=0, minute=0, second=0, microsecond=0) today_count = (await db.execute( select(func.count(AttendanceRecord.id)).where(AttendanceRecord.recorded_at >= today_start) )).scalar() or 0 return APIResponse(data={ "total": total, "today": today_count, "by_type": by_type, "by_source": by_source, "by_device": by_device, "daily_trend": daily_trend, }) @router.get( "/report", response_model=APIResponse[dict], summary="考勤报表 / Attendance report", ) async def attendance_report( device_id: int | None = Query(default=None, description="设备ID (可选,不传则所有设备)"), start_date: str = Query(..., description="开始日期 YYYY-MM-DD"), end_date: str = Query(..., description="结束日期 YYYY-MM-DD"), db: AsyncSession = Depends(get_db), ): """ 考勤报表:按设备+日期聚合,返回每个设备每天的签到次数、首次签到时间、末次签到时间。 Attendance report: per device per day aggregation. """ from datetime import datetime as dt try: s_date = dt.strptime(start_date, "%Y-%m-%d") e_date = dt.strptime(end_date, "%Y-%m-%d").replace(hour=23, minute=59, second=59) except ValueError: raise HTTPException(status_code=400, detail="日期格式需为 YYYY-MM-DD") if s_date > e_date: raise HTTPException(status_code=400, detail="start_date must be <= end_date") filters = [ AttendanceRecord.recorded_at >= s_date, AttendanceRecord.recorded_at <= e_date, ] if device_id is not None: filters.append(AttendanceRecord.device_id == device_id) result = await db.execute( select( AttendanceRecord.device_id, AttendanceRecord.imei, func.date(AttendanceRecord.recorded_at).label("day"), func.count(AttendanceRecord.id).label("punch_count"), func.min(AttendanceRecord.recorded_at).label("first_punch"), func.max(AttendanceRecord.recorded_at).label("last_punch"), func.group_concat(AttendanceRecord.attendance_source).label("sources"), ) .where(*filters) .group_by(AttendanceRecord.device_id, AttendanceRecord.imei, "day") .order_by(AttendanceRecord.device_id, "day") ) rows = result.all() report = [] for row in rows: report.append({ "device_id": row[0], "imei": row[1], "date": str(row[2]), "punch_count": row[3], "first_punch": str(row[4]) if row[4] else None, "last_punch": str(row[5]) if row[5] else None, "sources": list(set(row[6].split(","))) if row[6] else [], }) # Summary: total days in range, devices with records, attendance rate total_days = (e_date - s_date).days + 1 unique_devices = len({r["device_id"] for r in report}) device_days = len(report) from app.models import Device if device_id is not None: total_device_count = 1 else: total_device_count = (await db.execute(select(func.count(Device.id)))).scalar() or 1 expected_device_days = total_days * total_device_count attendance_rate = round(device_days / expected_device_days * 100, 1) if expected_device_days else 0 return APIResponse(data={ "start_date": start_date, "end_date": end_date, "total_days": total_days, "total_devices": unique_devices, "attendance_rate": attendance_rate, "records": report, }) @router.get( "/report/export", summary="导出考勤报表 CSV / Export attendance report CSV", ) async def export_attendance_report( device_id: int | None = Query(default=None, description="设备ID (可选)"), start_date: str = Query(..., description="开始日期 YYYY-MM-DD"), end_date: str = Query(..., description="结束日期 YYYY-MM-DD"), db: AsyncSession = Depends(get_db), ): """导出考勤日报表 CSV(每设备每天汇总)。""" from datetime import datetime as dt try: s_date = dt.strptime(start_date, "%Y-%m-%d") e_date = dt.strptime(end_date, "%Y-%m-%d").replace(hour=23, minute=59, second=59) except ValueError: raise HTTPException(status_code=400, detail="日期格式需为 YYYY-MM-DD") if s_date > e_date: raise HTTPException(status_code=400, detail="start_date must be <= end_date") filters = [ AttendanceRecord.recorded_at >= s_date, AttendanceRecord.recorded_at <= e_date, ] if device_id is not None: filters.append(AttendanceRecord.device_id == device_id) result = await db.execute( select( AttendanceRecord.device_id, AttendanceRecord.imei, func.date(AttendanceRecord.recorded_at).label("day"), func.count(AttendanceRecord.id).label("punch_count"), func.min(AttendanceRecord.recorded_at).label("first_punch"), func.max(AttendanceRecord.recorded_at).label("last_punch"), func.group_concat(AttendanceRecord.attendance_source).label("sources"), ) .where(*filters) .group_by(AttendanceRecord.device_id, AttendanceRecord.imei, "day") .order_by(AttendanceRecord.device_id, "day") ) rows = result.all() headers = ["设备ID", "IMEI", "日期", "打卡次数", "首次打卡", "末次打卡", "来源"] extractors = [ lambda r: r[0], lambda r: r[1], lambda r: str(r[2]), lambda r: r[3], lambda r: r[4], lambda r: r[5], lambda r: ",".join(set(r[6].split(","))) if r[6] else "", ] content = build_csv_content(headers, rows, extractors) return Response( content=content, media_type="text/csv; charset=utf-8", headers={"Content-Disposition": f"attachment; filename={csv_filename('attendance_report')}"}, ) @router.get( "/device/{device_id}", response_model=APIResponse[PaginatedList[AttendanceRecordResponse]], summary="获取设备考勤记录 / Get device attendance records", ) async def device_attendance( device_id: int, start_time: datetime | None = Query(default=None, description="开始时间 / Start time"), end_time: datetime | None = Query(default=None, description="结束时间 / End time"), page: int = Query(default=1, ge=1, description="页码 / Page number"), page_size: int = Query(default=20, ge=1, le=100, description="每页数量 / Items per page"), db: AsyncSession = Depends(get_db), ): """ 获取指定设备的考勤记录。 Get attendance records for a specific device. """ # Verify device exists device = await device_service.get_device(db, device_id) if device is None: raise HTTPException(status_code=404, detail=f"Device {device_id} not found / 未找到设备{device_id}") query = select(AttendanceRecord).where(AttendanceRecord.device_id == device_id) count_query = select(func.count(AttendanceRecord.id)).where(AttendanceRecord.device_id == device_id) if start_time: query = query.where(AttendanceRecord.recorded_at >= start_time) count_query = count_query.where(AttendanceRecord.recorded_at >= start_time) if end_time: query = query.where(AttendanceRecord.recorded_at <= end_time) count_query = count_query.where(AttendanceRecord.recorded_at <= end_time) total_result = await db.execute(count_query) total = total_result.scalar() or 0 offset = (page - 1) * page_size query = query.order_by(AttendanceRecord.recorded_at.desc()).offset(offset).limit(page_size) result = await db.execute(query) records = list(result.scalars().all()) return APIResponse( data=PaginatedList( items=[AttendanceRecordResponse.model_validate(r) for r in records], total=total, page=page, page_size=page_size, total_pages=math.ceil(total / page_size) if total else 0, ) ) @router.post( "/batch-delete", response_model=APIResponse[dict], summary="批量删除考勤记录 / Batch delete attendance records", ) async def batch_delete_attendance( body: dict, db: AsyncSession = Depends(get_db), ): """ 批量删除考勤记录,通过 body 传递 attendance_ids 列表,最多 500 条。 Batch delete attendance records by IDs (max 500). """ attendance_ids = body.get("attendance_ids", []) if not attendance_ids: raise HTTPException(status_code=400, detail="attendance_ids is required") if len(attendance_ids) > 500: raise HTTPException(status_code=400, detail="Max 500 records per request") result = await db.execute( select(AttendanceRecord).where(AttendanceRecord.id.in_(attendance_ids)) ) records = list(result.scalars().all()) for r in records: await db.delete(r) await db.flush() return APIResponse(data={"deleted": len(records)}) # NOTE: /{attendance_id} must be after /stats and /device/{device_id} to avoid route conflicts @router.get( "/{attendance_id}", response_model=APIResponse[AttendanceRecordResponse], summary="获取考勤记录详情 / Get attendance record", ) async def get_attendance(attendance_id: int, db: AsyncSession = Depends(get_db)): """按ID获取考勤记录详情 / Get attendance record details by ID.""" result = await db.execute( select(AttendanceRecord).where(AttendanceRecord.id == attendance_id) ) record = result.scalar_one_or_none() if record is None: raise HTTPException(status_code=404, detail=f"Attendance {attendance_id} not found") return APIResponse(data=AttendanceRecordResponse.model_validate(record)) @router.delete( "/{attendance_id}", response_model=APIResponse[dict], summary="删除单条考勤记录 / Delete attendance record", ) async def delete_attendance(attendance_id: int, db: AsyncSession = Depends(get_db)): """按ID删除单条考勤记录 / Delete a single attendance record by ID.""" result = await db.execute( select(AttendanceRecord).where(AttendanceRecord.id == attendance_id) ) record = result.scalar_one_or_none() if record is None: raise HTTPException(status_code=404, detail=f"Attendance {attendance_id} not found") await db.delete(record) await db.flush() return APIResponse(data={"deleted": 1})