"""REST API for scans.""" import csv import io from fastapi import APIRouter, Depends, Query, Response from sqlalchemy import select from sqlalchemy.ext.asyncio import AsyncSession from sqlalchemy.orm import selectinload from ..constants import ( CSV_MEDIA_TYPE, DEFAULT_OFFSET, DEFAULT_PAGE_SIZE, DEFAULT_SORT_BY_SCANS, DEFAULT_SORT_DIR, MAX_PAGE_SIZE, ) from ..db.engine import get_session from ..db.models import Scan from ..db.queries import build_scan_list_query, get_dashboard_stats router = APIRouter(prefix="/api/v1/scans", tags=["scans"]) @router.get("") async def list_scans( limit: int = Query(DEFAULT_PAGE_SIZE, le=MAX_PAGE_SIZE), offset: int = Query(DEFAULT_OFFSET, ge=0), flagged: bool | None = Query(None), search: str | None = Query(None), status: str | None = Query(None), repository: str | None = Query(None), sort_by: str = Query(DEFAULT_SORT_BY_SCANS), sort_dir: str = Query(DEFAULT_SORT_DIR), session: AsyncSession = Depends(get_session), ): q, count_q = build_scan_list_query( flagged=flagged, status=status, repository=repository, search=search, sort_by=sort_by, sort_dir=sort_dir, limit=limit, offset=offset, ) scans = (await session.execute(q)).scalars().all() total = await session.scalar(count_q) return { "total": total, "limit": limit, "offset": offset, "scans": [ { "id": s.id, "package_name": s.package_name, "package_version": s.package_version, "ecosystem": s.ecosystem, "repository": s.repository, "status": s.status, "total_findings": s.total_findings, "flagged": s.flagged, "started_at": s.started_at.isoformat() if s.started_at else None, "finished_at": s.finished_at.isoformat() if s.finished_at else None, "error_message": s.error_message, } for s in scans ], } @router.get("/export") async def export_scans_csv( flagged: bool | None = Query(None), search: str | None = Query(None), status: str | None = Query(None), session: AsyncSession = Depends(get_session), ): q, _count_q = build_scan_list_query( flagged=flagged, status=status, search=search, sort_by=DEFAULT_SORT_BY_SCANS, sort_dir=DEFAULT_SORT_DIR, limit=MAX_PAGE_SIZE, offset=0, ) scans = (await session.execute(q)).scalars().all() output = io.StringIO() writer = csv.writer(output) writer.writerow( [ "id", "package_name", "package_version", "ecosystem", "repository", "status", "total_findings", "flagged", "started_at", "finished_at", "error_message", "sha256", ] ) for s in scans: writer.writerow( [ s.id, s.package_name, s.package_version, s.ecosystem, s.repository, s.status, s.total_findings, s.flagged, s.started_at.isoformat() if s.started_at else "", s.finished_at.isoformat() if s.finished_at else "", s.error_message or "", s.sha256 or "", ] ) return Response( content=output.getvalue(), media_type=CSV_MEDIA_TYPE, headers={"Content-Disposition": "attachment; filename=scans_export.csv"}, ) @router.get("/stats") async def scan_stats(session: AsyncSession = Depends(get_session)): dashboard = await get_dashboard_stats(session) return { "total_scans": dashboard["total_scans"], "flagged_scans": dashboard["flagged_scans"], "recent_flagged": dashboard["recent_flagged"], "total_findings": dashboard["total_findings"], "top_rules": dashboard["top_rules"], "latest_scan_at": dashboard["latest_flagged"][0].started_at.isoformat() if dashboard["latest_flagged"] else None, } @router.get("/{scan_id}") async def get_scan(scan_id: int, session: AsyncSession = Depends(get_session)): scan = await session.scalar( select(Scan).where(Scan.id == scan_id).options(selectinload(Scan.findings)) ) if not scan: return {"detail": "Not found"} return { "id": scan.id, "package_name": scan.package_name, "package_version": scan.package_version, "ecosystem": scan.ecosystem, "repository": scan.repository, "nexus_asset_url": scan.nexus_asset_url, "sha256": scan.sha256, "status": scan.status, "total_findings": scan.total_findings, "flagged": scan.flagged, "started_at": scan.started_at.isoformat() if scan.started_at else None, "finished_at": scan.finished_at.isoformat() if scan.finished_at else None, "error_message": scan.error_message, "initiator": scan.initiator, "source_ip": scan.source_ip, "findings": [{"id": f.id, **f.data, "report": f.report} for f in scan.findings], }