"""REST API for scans.""" import csv import io from fastapi import APIRouter, Depends, Query, Response from sqlalchemy import func, select, text from sqlalchemy.ext.asyncio import AsyncSession from sqlalchemy.orm import selectinload from guarddog_nexus.database import get_session from guarddog_nexus.models import Finding, Scan router = APIRouter(prefix="/api/v1/scans", tags=["scans"]) VALID_SORT_FIELDS = { "id": Scan.id, "package_name": Scan.package_name, "started_at": Scan.started_at, "status": Scan.status, "total_findings": Scan.total_findings, "flagged": Scan.flagged, } @router.get("") async def list_scans( limit: int = Query(50, le=200), offset: int = Query(0, ge=0), flagged: bool | None = Query(None), search: str | None = Query(None), status: str | None = Query(None), repository: str | None = Query(None), sort_by: str = Query("started_at"), sort_dir: str = Query("desc"), session: AsyncSession = Depends(get_session), ): q = select(Scan) if flagged is not None: q = q.where(Scan.flagged == flagged) if status: q = q.where(Scan.status == status) if repository: q = q.where(Scan.repository == repository) if search: pattern = f"%{search}%" q = q.where( Scan.package_name.ilike(pattern) | Scan.package_version.ilike(pattern) ) sort_field = VALID_SORT_FIELDS.get(sort_by, Scan.started_at) sort_dir = "asc" if sort_dir.lower() == "asc" else "desc" q = q.order_by(sort_field.desc() if sort_dir == "desc" else sort_field.asc()) q = q.offset(offset).limit(limit) total = await session.scalar(select(func.count(Scan.id))) scans = (await session.execute(q)).scalars().all() return { "total": total, "limit": limit, "offset": offset, "scans": [ { "id": s.id, "package_name": s.package_name, "package_version": s.package_version, "ecosystem": s.ecosystem, "repository": s.repository, "status": s.status, "total_findings": s.total_findings, "flagged": s.flagged, "started_at": s.started_at.isoformat() if s.started_at else None, "finished_at": s.finished_at.isoformat() if s.finished_at else None, "error_message": s.error_message, } for s in scans ], } @router.get("/export") async def export_scans_csv( flagged: bool | None = Query(None), search: str | None = Query(None), status: str | None = Query(None), session: AsyncSession = Depends(get_session), ): q = select(Scan) if flagged is not None: q = q.where(Scan.flagged == flagged) if status: q = q.where(Scan.status == status) if search: pattern = f"%{search}%" q = q.where( Scan.package_name.ilike(pattern) | Scan.package_version.ilike(pattern) ) q = q.order_by(Scan.started_at.desc()) scans = (await session.execute(q)).scalars().all() output = io.StringIO() writer = csv.writer(output) writer.writerow([ "id", "package_name", "package_version", "ecosystem", "repository", "status", "total_findings", "flagged", "started_at", "finished_at", "error_message", "sha256" ]) for s in scans: writer.writerow([ s.id, s.package_name, s.package_version, s.ecosystem, s.repository, s.status, s.total_findings, s.flagged, s.started_at.isoformat() if s.started_at else "", s.finished_at.isoformat() if s.finished_at else "", s.error_message or "", s.sha256 or "", ]) return Response( content=output.getvalue(), media_type="text/csv", headers={"Content-Disposition": "attachment; filename=scans_export.csv"}, ) @router.get("/stats") async def scan_stats(session: AsyncSession = Depends(get_session)): total_scans = await session.scalar(select(func.count(Scan.id))) flagged_scans = await session.scalar(select(func.count(Scan.id)).where(Scan.flagged == True)) recent_flagged = await session.scalar( select(func.count(Scan.id)).where( Scan.flagged == True, Scan.started_at >= func.datetime("now", "-7 days"), ) ) total_findings = await session.scalar(select(func.count(Finding.id))) top_rules = ( await session.execute( select( func.json_extract(Finding.data, "$.rule").label("rule"), func.count(Finding.id).label("cnt"), ) .group_by(text("rule")) .order_by(text("cnt DESC")) .limit(10) ) ).all() latest_scan = await session.scalar(select(Scan).order_by(Scan.started_at.desc()).limit(1)) return { "total_scans": total_scans, "flagged_scans": flagged_scans, "recent_flagged": recent_flagged, "total_findings": total_findings, "top_rules": [{"rule": r.rule, "count": r.cnt} for r in top_rules], "latest_scan_at": latest_scan.started_at.isoformat() if latest_scan else None, } @router.get("/{scan_id}") async def get_scan(scan_id: int, session: AsyncSession = Depends(get_session)): scan = await session.scalar( select(Scan).where(Scan.id == scan_id).options(selectinload(Scan.findings)) ) if not scan: return {"detail": "Not found"} return { "id": scan.id, "package_name": scan.package_name, "package_version": scan.package_version, "ecosystem": scan.ecosystem, "repository": scan.repository, "nexus_asset_url": scan.nexus_asset_url, "sha256": scan.sha256, "status": scan.status, "total_findings": scan.total_findings, "flagged": scan.flagged, "started_at": scan.started_at.isoformat() if scan.started_at else None, "finished_at": scan.finished_at.isoformat() if scan.finished_at else None, "error_message": scan.error_message, "findings": [{"id": f.id, **f.data} for f in scan.findings], }