176 lines
5.5 KiB
Python
176 lines
5.5 KiB
Python
"""REST API for scans."""
|
|
|
|
import csv
|
|
import io
|
|
|
|
from fastapi import APIRouter, Depends, HTTPException, Query, Response
|
|
from sqlalchemy import select
|
|
from sqlalchemy.ext.asyncio import AsyncSession
|
|
from sqlalchemy.orm import selectinload
|
|
|
|
from ..constants import (
|
|
CSV_MEDIA_TYPE,
|
|
DEFAULT_OFFSET,
|
|
DEFAULT_PAGE_SIZE,
|
|
DEFAULT_SORT_BY_SCANS,
|
|
DEFAULT_SORT_DIR,
|
|
MAX_PAGE_SIZE,
|
|
)
|
|
from ..db.engine import get_session
|
|
from ..db.models import Scan
|
|
from ..db.queries import build_scan_list_query, get_dashboard_stats
|
|
from ..schemas import ScanDetailOut, ScanListResponse, StatsResponse, serialize_finding
|
|
|
|
router = APIRouter(prefix="/api/v1/scans", tags=["scans"])
|
|
|
|
|
|
@router.get("", response_model=ScanListResponse)
|
|
async def list_scans(
|
|
limit: int = Query(DEFAULT_PAGE_SIZE, le=MAX_PAGE_SIZE),
|
|
offset: int = Query(DEFAULT_OFFSET, ge=0),
|
|
flagged: bool | None = Query(None),
|
|
search: str | None = Query(None),
|
|
status: str | None = Query(None),
|
|
repository: str | None = Query(None),
|
|
sort_by: str = Query(DEFAULT_SORT_BY_SCANS),
|
|
sort_dir: str = Query(DEFAULT_SORT_DIR),
|
|
session: AsyncSession = Depends(get_session),
|
|
) -> dict:
|
|
q, count_q = build_scan_list_query(
|
|
flagged=flagged,
|
|
status=status,
|
|
repository=repository,
|
|
search=search,
|
|
sort_by=sort_by,
|
|
sort_dir=sort_dir,
|
|
limit=limit,
|
|
offset=offset,
|
|
)
|
|
scans = (await session.execute(q)).scalars().all()
|
|
total = await session.scalar(count_q)
|
|
|
|
return {
|
|
"total": total,
|
|
"limit": limit,
|
|
"offset": offset,
|
|
"scans": [
|
|
{
|
|
"id": s.id,
|
|
"package_name": s.package_name,
|
|
"package_version": s.package_version,
|
|
"ecosystem": s.ecosystem,
|
|
"repository": s.repository,
|
|
"status": s.status,
|
|
"total_findings": s.total_findings,
|
|
"flagged": s.flagged,
|
|
"started_at": s.started_at.isoformat() if s.started_at else None,
|
|
"finished_at": s.finished_at.isoformat() if s.finished_at else None,
|
|
"error_message": s.error_message,
|
|
}
|
|
for s in scans
|
|
],
|
|
}
|
|
|
|
|
|
@router.get("/export")
|
|
async def export_scans_csv(
|
|
flagged: bool | None = Query(None),
|
|
search: str | None = Query(None),
|
|
status: str | None = Query(None),
|
|
session: AsyncSession = Depends(get_session),
|
|
) -> Response:
|
|
q, _count_q = build_scan_list_query(
|
|
flagged=flagged,
|
|
status=status,
|
|
search=search,
|
|
sort_by=DEFAULT_SORT_BY_SCANS,
|
|
sort_dir=DEFAULT_SORT_DIR,
|
|
limit=MAX_PAGE_SIZE,
|
|
offset=0,
|
|
)
|
|
scans = (await session.execute(q)).scalars().all()
|
|
|
|
output = io.StringIO()
|
|
writer = csv.writer(output)
|
|
writer.writerow(
|
|
[
|
|
"id",
|
|
"package_name",
|
|
"package_version",
|
|
"ecosystem",
|
|
"repository",
|
|
"status",
|
|
"total_findings",
|
|
"flagged",
|
|
"started_at",
|
|
"finished_at",
|
|
"error_message",
|
|
"sha256",
|
|
]
|
|
)
|
|
for s in scans:
|
|
writer.writerow(
|
|
[
|
|
s.id,
|
|
s.package_name,
|
|
s.package_version,
|
|
s.ecosystem,
|
|
s.repository,
|
|
s.status,
|
|
s.total_findings,
|
|
s.flagged,
|
|
s.started_at.isoformat() if s.started_at else "",
|
|
s.finished_at.isoformat() if s.finished_at else "",
|
|
s.error_message or "",
|
|
s.sha256 or "",
|
|
]
|
|
)
|
|
|
|
return Response(
|
|
content=output.getvalue(),
|
|
media_type=CSV_MEDIA_TYPE,
|
|
headers={"Content-Disposition": "attachment; filename=scans_export.csv"},
|
|
)
|
|
|
|
|
|
@router.get("/stats", response_model=StatsResponse)
|
|
async def scan_stats(session: AsyncSession = Depends(get_session)) -> dict:
|
|
dashboard = await get_dashboard_stats(session)
|
|
return {
|
|
"total_scans": dashboard["total_scans"],
|
|
"flagged_scans": dashboard["flagged_scans"],
|
|
"recent_flagged": dashboard["recent_flagged"],
|
|
"total_findings": dashboard["total_findings"],
|
|
"top_rules": dashboard["top_rules"],
|
|
"latest_scan_at": dashboard["latest_flagged"][0].started_at.isoformat()
|
|
if dashboard["latest_flagged"] and dashboard["latest_flagged"][0].started_at
|
|
else None,
|
|
}
|
|
|
|
|
|
@router.get("/{scan_id}", response_model=ScanDetailOut)
|
|
async def get_scan(scan_id: int, session: AsyncSession = Depends(get_session)) -> dict:
|
|
scan = await session.scalar(
|
|
select(Scan).where(Scan.id == scan_id).options(selectinload(Scan.findings))
|
|
)
|
|
if not scan:
|
|
raise HTTPException(status_code=404, detail="Scan not found")
|
|
return {
|
|
"id": scan.id,
|
|
"package_name": scan.package_name,
|
|
"package_version": scan.package_version,
|
|
"ecosystem": scan.ecosystem,
|
|
"repository": scan.repository,
|
|
"nexus_asset_url": scan.nexus_asset_url,
|
|
"sha256": scan.sha256,
|
|
"status": scan.status,
|
|
"total_findings": scan.total_findings,
|
|
"flagged": scan.flagged,
|
|
"started_at": scan.started_at.isoformat() if scan.started_at else None,
|
|
"finished_at": scan.finished_at.isoformat() if scan.finished_at else None,
|
|
"error_message": scan.error_message,
|
|
"initiator": scan.initiator,
|
|
"source_ip": scan.source_ip,
|
|
"findings": [serialize_finding(f) for f in scan.findings],
|
|
}
|