Files
guarddog-nexus/guarddog_nexus/api/scans.py
Marker689 97b89d3fdf ui: добавить экспорт в CSV для сканирований и пакетов
- api/scans.py: добавить /api/v1/scans/export endpoint с фильтрами
- api/packages.py: добавить /api/v1/packages/export endpoint с фильтрами
- scans_list.html: добавить кнопку Export CSV в filter bar
- packages_list.html: добавить кнопку Export CSV в filter bar
- CSV включает все поля с правильным форматированием
2026-05-10 03:16:58 +03:00

189 lines
6.1 KiB
Python

"""REST API for scans."""
import csv
import io

from fastapi import APIRouter, Depends, HTTPException, Query, Response
from sqlalchemy import func, select, text
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.orm import selectinload

from guarddog_nexus.database import get_session
from guarddog_nexus.models import Finding, Scan
# Router that owns every scan endpoint; handler paths are relative to the prefix.
router = APIRouter(prefix="/api/v1/scans", tags=["scans"])

# Whitelist of accepted ``sort_by`` values, mapped to the Scan column of the
# same name. Handlers look the requested key up here instead of using it
# directly, so only these columns can be used for ordering.
VALID_SORT_FIELDS = {
    column_name: getattr(Scan, column_name)
    for column_name in (
        "id",
        "package_name",
        "started_at",
        "status",
        "total_findings",
        "flagged",
    )
}
@router.get("")
async def list_scans(
    limit: int = Query(50, ge=0, le=200),
    offset: int = Query(0, ge=0),
    flagged: bool | None = Query(None),
    search: str | None = Query(None),
    status: str | None = Query(None),
    repository: str | None = Query(None),
    sort_by: str = Query("started_at"),
    sort_dir: str = Query("desc"),
    session: AsyncSession = Depends(get_session),
):
    """Return one page of scans together with the number of matching scans.

    Args:
        limit: Page size, between 0 and 200 (negative values are rejected).
        offset: Number of matching rows to skip.
        flagged: When given, keep only scans whose ``flagged`` equals it.
        search: Case-insensitive substring matched against the package name
            or the package version.
        status: When non-empty, keep only scans with this exact status.
        repository: When non-empty, keep only scans from this repository.
        sort_by: Key of ``VALID_SORT_FIELDS``; unknown keys fall back to
            ``started_at``.
        sort_dir: ``"asc"`` (any letter case) for ascending order; every
            other value means descending.
        session: Database session supplied by the ``get_session`` dependency.

    Returns:
        A dict with ``total`` (number of scans matching the filters),
        the echoed ``limit`` and ``offset``, and ``scans`` (the page rows).
    """
    # Collect the filter conditions once so that the page query and the count
    # query are guaranteed to describe the same set of rows.
    filters = []
    if flagged is not None:
        filters.append(Scan.flagged == flagged)
    if status:
        filters.append(Scan.status == status)
    if repository:
        filters.append(Scan.repository == repository)
    if search:
        pattern = f"%{search}%"
        filters.append(
            Scan.package_name.ilike(pattern) | Scan.package_version.ilike(pattern)
        )

    sort_field = VALID_SORT_FIELDS.get(sort_by, Scan.started_at)
    ascending = sort_dir.lower() == "asc"
    ordering = sort_field.asc() if ascending else sort_field.desc()

    # The total must honour the filters; counting the whole table would
    # report more rows than a client can ever page through.
    total = await session.scalar(select(func.count(Scan.id)).where(*filters))

    page_query = (
        select(Scan)
        .where(*filters)
        .order_by(ordering)
        .offset(offset)
        .limit(limit)
    )
    scans = (await session.execute(page_query)).scalars().all()

    return {
        "total": total,
        "limit": limit,
        "offset": offset,
        "scans": [
            {
                "id": s.id,
                "package_name": s.package_name,
                "package_version": s.package_version,
                "ecosystem": s.ecosystem,
                "repository": s.repository,
                "status": s.status,
                "total_findings": s.total_findings,
                "flagged": s.flagged,
                "started_at": s.started_at.isoformat() if s.started_at else None,
                "finished_at": s.finished_at.isoformat() if s.finished_at else None,
                "error_message": s.error_message,
            }
            for s in scans
        ],
    }
@router.get("/export")
async def export_scans_csv(
    flagged: bool | None = Query(None),
    search: str | None = Query(None),
    status: str | None = Query(None),
    session: AsyncSession = Depends(get_session),
    # Declared last so the order of the pre-existing parameters is unchanged.
    repository: str | None = Query(None),
):
    """Export every scan matching the filters as a CSV attachment.

    The filters mirror the ones accepted by the scan listing endpoint, so an
    export reflects the same selection the listing shows.

    Args:
        flagged: When given, keep only scans whose ``flagged`` equals it.
        search: Case-insensitive substring matched against the package name
            or the package version.
        status: When non-empty, keep only scans with this exact status.
        session: Database session supplied by the ``get_session`` dependency.
        repository: When non-empty, keep only scans from this repository.

    Returns:
        A ``text/csv`` response named ``scans_export.csv`` containing a header
        row followed by one row per scan, newest ``started_at`` first.
    """
    filters = []
    if flagged is not None:
        filters.append(Scan.flagged == flagged)
    if status:
        filters.append(Scan.status == status)
    if repository:
        filters.append(Scan.repository == repository)
    if search:
        pattern = f"%{search}%"
        filters.append(
            Scan.package_name.ilike(pattern) | Scan.package_version.ilike(pattern)
        )

    q = select(Scan).where(*filters).order_by(Scan.started_at.desc())
    scans = (await session.execute(q)).scalars().all()

    output = io.StringIO()
    writer = csv.writer(output)
    writer.writerow([
        "id", "package_name", "package_version", "ecosystem", "repository",
        "status", "total_findings", "flagged", "started_at", "finished_at",
        "error_message", "sha256",
    ])
    # NOTE(review): cell values are written verbatim. If exports are opened in
    # spreadsheet software, consider neutralising values that start with
    # "=", "+", "-" or "@" — confirm whether that matters for this deployment.
    for s in scans:
        writer.writerow([
            s.id, s.package_name, s.package_version, s.ecosystem, s.repository,
            s.status, s.total_findings, s.flagged,
            # Missing timestamps and optional text become empty cells.
            s.started_at.isoformat() if s.started_at else "",
            s.finished_at.isoformat() if s.finished_at else "",
            s.error_message or "",
            s.sha256 or "",
        ])

    return Response(
        content=output.getvalue(),
        media_type="text/csv",
        headers={"Content-Disposition": "attachment; filename=scans_export.csv"},
    )
@router.get("/stats")
async def scan_stats(session: AsyncSession = Depends(get_session)):
    """Return aggregate counters describing all recorded scans.

    Args:
        session: Database session supplied by the ``get_session`` dependency.

    Returns:
        A dict with ``total_scans``, ``flagged_scans``, ``recent_flagged``
        (flagged scans started within the last seven days), ``total_findings``,
        ``top_rules`` (up to ten ``{"rule", "count"}`` entries ordered by
        count, highest first) and ``latest_scan_at`` (ISO-8601 string, or
        ``None`` when no start time is available).
    """
    is_flagged = Scan.flagged.is_(True)

    total_scans = await session.scalar(select(func.count(Scan.id)))
    flagged_scans = await session.scalar(
        select(func.count(Scan.id)).where(is_flagged)
    )
    # NOTE(review): this emits a datetime('now', '-7 days') SQL call, which
    # looks SQLite-specific and compares against the database's notion of
    # "now" — confirm the backend and the timezone ``started_at`` is stored in.
    recent_flagged = await session.scalar(
        select(func.count(Scan.id)).where(
            is_flagged,
            Scan.started_at >= func.datetime("now", "-7 days"),
        )
    )
    total_findings = await session.scalar(select(func.count(Finding.id)))

    # Group findings by the "rule" value stored inside their JSON payload.
    top_rules = (
        await session.execute(
            select(
                func.json_extract(Finding.data, "$.rule").label("rule"),
                func.count(Finding.id).label("cnt"),
            )
            .group_by(text("rule"))
            .order_by(text("cnt DESC"))
            .limit(10)
        )
    ).all()

    # Fetch only the timestamp. It is None both when there are no scans and
    # when the selected row has no start time, so one check covers both cases
    # instead of calling .isoformat() on a missing value.
    latest_started_at = await session.scalar(
        select(Scan.started_at).order_by(Scan.started_at.desc()).limit(1)
    )

    return {
        "total_scans": total_scans,
        "flagged_scans": flagged_scans,
        "recent_flagged": recent_flagged,
        "total_findings": total_findings,
        "top_rules": [{"rule": r.rule, "count": r.cnt} for r in top_rules],
        "latest_scan_at": latest_started_at.isoformat() if latest_started_at else None,
    }
@router.get("/{scan_id}")
async def get_scan(scan_id: int, session: AsyncSession = Depends(get_session)):
    """Return a single scan, including its findings.

    Args:
        scan_id: Primary key of the scan to load.
        session: Database session supplied by the ``get_session`` dependency.

    Returns:
        A dict with the scan's fields and a ``findings`` list in which each
        entry is the finding's ``id`` merged with its stored ``data`` mapping.

    Raises:
        HTTPException: With status 404 and detail ``"Not found"`` when no scan
            has the given id.
    """
    scan = await session.scalar(
        select(Scan).where(Scan.id == scan_id).options(selectinload(Scan.findings))
    )
    if not scan:
        # Signal the miss through the status code; the response body is still
        # {"detail": "Not found"}, so clients inspecting it keep working.
        raise HTTPException(status_code=404, detail="Not found")

    return {
        "id": scan.id,
        "package_name": scan.package_name,
        "package_version": scan.package_version,
        "ecosystem": scan.ecosystem,
        "repository": scan.repository,
        "nexus_asset_url": scan.nexus_asset_url,
        "sha256": scan.sha256,
        "status": scan.status,
        "total_findings": scan.total_findings,
        "flagged": scan.flagged,
        "started_at": scan.started_at.isoformat() if scan.started_at else None,
        "finished_at": scan.finished_at.isoformat() if scan.finished_at else None,
        "error_message": scan.error_message,
        # A finding without stored data contributes just its id instead of
        # failing the whole response.
        "findings": [{"id": f.id, **(f.data or {})} for f in scan.findings],
    }