Files
guarddog-nexus/guarddog_nexus/api/scans.py
Marker689 00424b494a ui: добавить поиск, фильтрацию и сортировку на списки
API:
- scans.py: добавить search, status, sort_by, sort_dir параметры
- packages.py: добавить search, sort_by, sort_dir параметры
- web/routes.py: передать новые параметры в шаблоны

UI:
- scans_list.html: search input (htmx debounce), status filter dropdown,
  sortable columns с hx-get, empty state
- packages_list.html: search input (htmx debounce), sortable columns,
  empty state
- pagination сохраняет все параметры фильтрации/сортировки
2026-05-10 03:12:26 +03:00

140 lines
4.6 KiB
Python

"""REST API for scans."""
from fastapi import APIRouter, Depends, Query
from sqlalchemy import func, select, text
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.orm import selectinload
from guarddog_nexus.database import get_session
from guarddog_nexus.models import Finding, Scan
router = APIRouter(prefix="/api/v1/scans", tags=["scans"])
VALID_SORT_FIELDS = {
"id": Scan.id,
"package_name": Scan.package_name,
"started_at": Scan.started_at,
"status": Scan.status,
"total_findings": Scan.total_findings,
"flagged": Scan.flagged,
}
@router.get("")
async def list_scans(
limit: int = Query(50, le=200),
offset: int = Query(0, ge=0),
flagged: bool | None = Query(None),
search: str | None = Query(None),
status: str | None = Query(None),
repository: str | None = Query(None),
sort_by: str = Query("started_at"),
sort_dir: str = Query("desc"),
session: AsyncSession = Depends(get_session),
):
q = select(Scan)
if flagged is not None:
q = q.where(Scan.flagged == flagged)
if status:
q = q.where(Scan.status == status)
if repository:
q = q.where(Scan.repository == repository)
if search:
pattern = f"%{search}%"
q = q.where(
Scan.package_name.ilike(pattern) | Scan.package_version.ilike(pattern)
)
sort_field = VALID_SORT_FIELDS.get(sort_by, Scan.started_at)
sort_dir = "asc" if sort_dir.lower() == "asc" else "desc"
q = q.order_by(sort_field.desc() if sort_dir == "desc" else sort_field.asc())
q = q.offset(offset).limit(limit)
total = await session.scalar(select(func.count(Scan.id)))
scans = (await session.execute(q)).scalars().all()
return {
"total": total,
"limit": limit,
"offset": offset,
"scans": [
{
"id": s.id,
"package_name": s.package_name,
"package_version": s.package_version,
"ecosystem": s.ecosystem,
"repository": s.repository,
"status": s.status,
"total_findings": s.total_findings,
"flagged": s.flagged,
"started_at": s.started_at.isoformat() if s.started_at else None,
"finished_at": s.finished_at.isoformat() if s.finished_at else None,
"error_message": s.error_message,
}
for s in scans
],
}
@router.get("/stats")
async def scan_stats(session: AsyncSession = Depends(get_session)):
total_scans = await session.scalar(select(func.count(Scan.id)))
flagged_scans = await session.scalar(select(func.count(Scan.id)).where(Scan.flagged == True))
recent_flagged = await session.scalar(
select(func.count(Scan.id)).where(
Scan.flagged == True,
Scan.started_at >= func.datetime("now", "-7 days"),
)
)
total_findings = await session.scalar(select(func.count(Finding.id)))
top_rules = (
await session.execute(
select(
func.json_extract(Finding.data, "$.rule").label("rule"),
func.count(Finding.id).label("cnt"),
)
.group_by(text("rule"))
.order_by(text("cnt DESC"))
.limit(10)
)
).all()
latest_scan = await session.scalar(select(Scan).order_by(Scan.started_at.desc()).limit(1))
return {
"total_scans": total_scans,
"flagged_scans": flagged_scans,
"recent_flagged": recent_flagged,
"total_findings": total_findings,
"top_rules": [{"rule": r.rule, "count": r.cnt} for r in top_rules],
"latest_scan_at": latest_scan.started_at.isoformat() if latest_scan else None,
}
@router.get("/{scan_id}")
async def get_scan(scan_id: int, session: AsyncSession = Depends(get_session)):
scan = await session.scalar(
select(Scan).where(Scan.id == scan_id).options(selectinload(Scan.findings))
)
if not scan:
return {"detail": "Not found"}
return {
"id": scan.id,
"package_name": scan.package_name,
"package_version": scan.package_version,
"ecosystem": scan.ecosystem,
"repository": scan.repository,
"nexus_asset_url": scan.nexus_asset_url,
"sha256": scan.sha256,
"status": scan.status,
"total_findings": scan.total_findings,
"flagged": scan.flagged,
"started_at": scan.started_at.isoformat() if scan.started_at else None,
"finished_at": scan.finished_at.isoformat() if scan.finished_at else None,
"error_message": scan.error_message,
"findings": [{"id": f.id, **f.data} for f in scan.findings],
}