"""REST API for packages (distinct packages across scans).""" import csv import io from fastapi import APIRouter, Depends, Query, Response from sqlalchemy import select from sqlalchemy.ext.asyncio import AsyncSession from guarddog_nexus.constants import ( CSV_MEDIA_TYPE, DEFAULT_OFFSET, DEFAULT_PAGE_SIZE, DEFAULT_SORT_BY_PACKAGES, DEFAULT_SORT_DIR, MAX_PAGE_SIZE, ) from guarddog_nexus.database import get_session from guarddog_nexus.models import Finding, Scan from guarddog_nexus.queries import build_package_list_query router = APIRouter(prefix="/api/v1/packages", tags=["packages"]) @router.get("") async def list_packages( limit: int = Query(DEFAULT_PAGE_SIZE, le=MAX_PAGE_SIZE), offset: int = Query(DEFAULT_OFFSET, ge=0), ecosystem: str | None = Query(None), flagged: bool | None = Query(None), search: str | None = Query(None), repository: str | None = Query(None), sort_by: str = Query(DEFAULT_SORT_BY_PACKAGES), sort_dir: str = Query(DEFAULT_SORT_DIR), session: AsyncSession = Depends(get_session), ): rows_q, total_q = build_package_list_query( flagged=flagged, ecosystem=ecosystem, repository=repository, search=search, sort_by=sort_by, sort_dir=sort_dir, limit=limit, offset=offset, ) total = await session.scalar(total_q) rows = (await session.execute(rows_q)).all() return { "total": total, "limit": limit, "offset": offset, "packages": [ { "name": r.pkg_name, "version": r.pkg_ver, "ecosystem": r.ecosystem, "repository": r.repository, "last_scanned_at": r.last_scan.isoformat() if r.last_scan else None, "flagged": bool(r.is_flagged), "total_findings": r.findings_sum, "latest_scan_id": r.sid, } for r in rows ], } @router.get("/export") async def export_packages_csv( flagged: bool | None = Query(None), search: str | None = Query(None), session: AsyncSession = Depends(get_session), ): rows_q, _total_q = build_package_list_query( flagged=flagged, search=search, sort_by=DEFAULT_SORT_BY_PACKAGES, sort_dir=DEFAULT_SORT_DIR, limit=MAX_PAGE_SIZE, offset=0, ) rows = (await session.execute(rows_q)).all() output = io.StringIO() writer = csv.writer(output) writer.writerow( [ "name", "version", "ecosystem", "repository", "last_scanned_at", "flagged", "total_findings", ] ) for r in rows: writer.writerow( [ r.pkg_name, r.pkg_ver, r.ecosystem, r.repository, r.last_scan.isoformat() if r.last_scan else "", bool(r.is_flagged), r.findings_sum, ] ) return Response( content=output.getvalue(), media_type=CSV_MEDIA_TYPE, headers={"Content-Disposition": "attachment; filename=packages_export.csv"}, ) @router.get("/{name}/{version}") async def get_package( name: str, version: str, session: AsyncSession = Depends(get_session), ): scans = ( ( await session.execute( select(Scan) .where(Scan.package_name == name, Scan.package_version == version) .order_by(Scan.started_at.desc()) ) ) .scalars() .all() ) if not scans: return {"detail": "Not found"} all_findings: list[dict] = [] for s in scans: findings = ( (await session.execute(select(Finding).where(Finding.scan_id == s.id))) .scalars() .all() ) for f in findings: all_findings.append({"id": f.id, **f.data, "report": f.report}) return { "name": scans[0].package_name, "version": scans[0].package_version, "ecosystem": scans[0].ecosystem, "repository": scans[0].repository, "flagged": any(s.flagged for s in scans), "scans": [ { "id": s.id, "status": s.status, "total_findings": s.total_findings, "flagged": s.flagged, "started_at": s.started_at.isoformat() if s.started_at else None, } for s in scans ], "findings": all_findings, }