"""REST API for scans.""" from fastapi import APIRouter, Depends, Query from sqlalchemy import func, select, text from sqlalchemy.ext.asyncio import AsyncSession from sqlalchemy.orm import selectinload from guarddog_nexus.database import get_session from guarddog_nexus.models import Finding, Scan router = APIRouter(prefix="/api/v1/scans", tags=["scans"]) VALID_SORT_FIELDS = { "id": Scan.id, "package_name": Scan.package_name, "started_at": Scan.started_at, "status": Scan.status, "total_findings": Scan.total_findings, "flagged": Scan.flagged, } @router.get("") async def list_scans( limit: int = Query(50, le=200), offset: int = Query(0, ge=0), flagged: bool | None = Query(None), search: str | None = Query(None), status: str | None = Query(None), repository: str | None = Query(None), sort_by: str = Query("started_at"), sort_dir: str = Query("desc"), session: AsyncSession = Depends(get_session), ): q = select(Scan) if flagged is not None: q = q.where(Scan.flagged == flagged) if status: q = q.where(Scan.status == status) if repository: q = q.where(Scan.repository == repository) if search: pattern = f"%{search}%" q = q.where( Scan.package_name.ilike(pattern) | Scan.package_version.ilike(pattern) ) sort_field = VALID_SORT_FIELDS.get(sort_by, Scan.started_at) sort_dir = "asc" if sort_dir.lower() == "asc" else "desc" q = q.order_by(sort_field.desc() if sort_dir == "desc" else sort_field.asc()) q = q.offset(offset).limit(limit) total = await session.scalar(select(func.count(Scan.id))) scans = (await session.execute(q)).scalars().all() return { "total": total, "limit": limit, "offset": offset, "scans": [ { "id": s.id, "package_name": s.package_name, "package_version": s.package_version, "ecosystem": s.ecosystem, "repository": s.repository, "status": s.status, "total_findings": s.total_findings, "flagged": s.flagged, "started_at": s.started_at.isoformat() if s.started_at else None, "finished_at": s.finished_at.isoformat() if s.finished_at else None, "error_message": s.error_message, } for s in scans ], } @router.get("/stats") async def scan_stats(session: AsyncSession = Depends(get_session)): total_scans = await session.scalar(select(func.count(Scan.id))) flagged_scans = await session.scalar(select(func.count(Scan.id)).where(Scan.flagged == True)) recent_flagged = await session.scalar( select(func.count(Scan.id)).where( Scan.flagged == True, Scan.started_at >= func.datetime("now", "-7 days"), ) ) total_findings = await session.scalar(select(func.count(Finding.id))) top_rules = ( await session.execute( select( func.json_extract(Finding.data, "$.rule").label("rule"), func.count(Finding.id).label("cnt"), ) .group_by(text("rule")) .order_by(text("cnt DESC")) .limit(10) ) ).all() latest_scan = await session.scalar(select(Scan).order_by(Scan.started_at.desc()).limit(1)) return { "total_scans": total_scans, "flagged_scans": flagged_scans, "recent_flagged": recent_flagged, "total_findings": total_findings, "top_rules": [{"rule": r.rule, "count": r.cnt} for r in top_rules], "latest_scan_at": latest_scan.started_at.isoformat() if latest_scan else None, } @router.get("/{scan_id}") async def get_scan(scan_id: int, session: AsyncSession = Depends(get_session)): scan = await session.scalar( select(Scan).where(Scan.id == scan_id).options(selectinload(Scan.findings)) ) if not scan: return {"detail": "Not found"} return { "id": scan.id, "package_name": scan.package_name, "package_version": scan.package_version, "ecosystem": scan.ecosystem, "repository": scan.repository, "nexus_asset_url": scan.nexus_asset_url, "sha256": scan.sha256, "status": scan.status, "total_findings": scan.total_findings, "flagged": scan.flagged, "started_at": scan.started_at.isoformat() if scan.started_at else None, "finished_at": scan.finished_at.isoformat() if scan.finished_at else None, "error_message": scan.error_message, "findings": [{"id": f.id, **f.data} for f in scan.findings], }