Files
guarddog-nexus/guarddog_nexus/routes/api_packages.py
Marker689 8726b65808 refactor: реструктуризация — core/, db/, routes/, web/
guarddog_nexus/
├── core/          scanner, harvester, nexus, llm
├── db/            engine, models, queries
├── routes/        webhooks, api_*, web
└── web/           templates + static

- 11 файлов перемещено (git mv — сохранена история)
- Все импорты обновлены (~15 файлов)
- main.py, tests — исправлены пути
- 50/50 тестов, ruff clean
2026-05-10 07:17:41 +03:00

164 lines
4.5 KiB
Python

"""REST API for packages (distinct packages across scans)."""
import csv
import io
from urllib.parse import unquote

from fastapi import APIRouter, Depends, HTTPException, Query, Response
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession

from ..constants import (
    CSV_MEDIA_TYPE,
    DEFAULT_OFFSET,
    DEFAULT_PAGE_SIZE,
    DEFAULT_SORT_BY_PACKAGES,
    DEFAULT_SORT_DIR,
    MAX_PAGE_SIZE,
)
from ..db.engine import get_session
from ..db.models import Finding, Scan
from ..db.queries import build_package_list_query
# Router for the package endpoints; every route below is served under /api/v1/packages.
router = APIRouter(prefix="/api/v1/packages", tags=["packages"])
@router.get("")
async def list_packages(
    # ge=0 keeps a negative value from ever reaching the SQL LIMIT clause,
    # where some backends treat it as "no limit" and others reject it.
    limit: int = Query(DEFAULT_PAGE_SIZE, ge=0, le=MAX_PAGE_SIZE),
    offset: int = Query(DEFAULT_OFFSET, ge=0),
    ecosystem: str | None = Query(None),
    flagged: bool | None = Query(None),
    search: str | None = Query(None),
    repository: str | None = Query(None),
    sort_by: str = Query(DEFAULT_SORT_BY_PACKAGES),
    sort_dir: str = Query(DEFAULT_SORT_DIR),
    session: AsyncSession = Depends(get_session),
):
    """Return one page of packages together with the total match count.

    All filter, sort and paging arguments are forwarded unchanged to
    ``build_package_list_query``, which produces both the page query and the
    count query.

    Args:
        limit: Page size, from 0 up to ``MAX_PAGE_SIZE``.
        offset: Number of rows to skip; must be non-negative.
        ecosystem: Optional ecosystem filter.
        flagged: Optional flagged-state filter.
        search: Optional search term.
        repository: Optional repository filter.
        sort_by: Sort key understood by ``build_package_list_query``.
        sort_dir: Sort direction understood by ``build_package_list_query``.
        session: Database session injected by FastAPI.

    Returns:
        A dict with ``total``, the echoed ``limit`` and ``offset``, and a
        ``packages`` list holding one dict per row.
    """
    rows_q, total_q = build_package_list_query(
        flagged=flagged,
        ecosystem=ecosystem,
        repository=repository,
        search=search,
        sort_by=sort_by,
        sort_dir=sort_dir,
        limit=limit,
        offset=offset,
    )
    total = await session.scalar(total_q)
    rows = (await session.execute(rows_q)).all()
    return {
        "total": total,
        "limit": limit,
        "offset": offset,
        "packages": [
            {
                "name": r.pkg_name,
                "version": r.pkg_ver,
                "ecosystem": r.ecosystem,
                "repository": r.repository,
                # A missing timestamp is serialised as null rather than failing.
                "last_scanned_at": r.last_scan.isoformat() if r.last_scan else None,
                "flagged": bool(r.is_flagged),
                "total_findings": r.findings_sum,
                "latest_scan_id": r.sid,
            }
            for r in rows
        ],
    }
@router.get("/export")
async def export_packages_csv(
    flagged: bool | None = Query(None),
    search: str | None = Query(None),
    session: AsyncSession = Depends(get_session),
):
    """Export the package list as a downloadable CSV file.

    The rows come from ``build_package_list_query`` using the default sort
    and a single page of ``MAX_PAGE_SIZE`` rows starting at offset 0, so the
    export holds at most ``MAX_PAGE_SIZE`` packages.

    Args:
        flagged: Optional flagged-state filter.
        search: Optional search term.
        session: Database session injected by FastAPI.

    Returns:
        A ``Response`` carrying the CSV text as an attachment named
        ``packages_export.csv``.
    """
    # Only the page query is needed here; the count query is discarded.
    page_query, _ = build_package_list_query(
        flagged=flagged,
        search=search,
        sort_by=DEFAULT_SORT_BY_PACKAGES,
        sort_dir=DEFAULT_SORT_DIR,
        limit=MAX_PAGE_SIZE,
        offset=0,
    )
    result = await session.execute(page_query)
    packages = result.all()

    buffer = io.StringIO()
    csv_out = csv.writer(buffer)
    csv_out.writerow(
        [
            "name",
            "version",
            "ecosystem",
            "repository",
            "last_scanned_at",
            "flagged",
            "total_findings",
        ]
    )
    csv_out.writerows(
        [
            pkg.pkg_name,
            pkg.pkg_ver,
            pkg.ecosystem,
            pkg.repository,
            # A missing timestamp becomes an empty cell.
            pkg.last_scan.isoformat() if pkg.last_scan else "",
            bool(pkg.is_flagged),
            pkg.findings_sum,
        ]
        for pkg in packages
    )

    download_headers = {
        "Content-Disposition": "attachment; filename=packages_export.csv",
    }
    return Response(
        content=buffer.getvalue(),
        media_type=CSV_MEDIA_TYPE,
        headers=download_headers,
    )
@router.get("/{name:path}")
async def get_package(
    name: str,
    session: AsyncSession = Depends(get_session),
):
    """Return all scans and findings recorded for one package version.

    The path is split on its last ``/`` into a package name and a version,
    and both halves are percent-decoded. A path with no ``/`` is looked up
    with an empty version string.

    Args:
        name: The ``<package name>/<version>`` path segment.
        session: Database session injected by FastAPI.

    Returns:
        A dict describing the package (taken from the most recently started
        scan), a ``scans`` list ordered by ``started_at`` descending, and a
        flat ``findings`` list grouped in the same scan order.

    Raises:
        HTTPException: 404 when no scan matches the name/version pair.
    """
    parts = name.rsplit("/", 1)
    pkg_name = unquote(parts[0])
    pkg_version = unquote(parts[1]) if len(parts) == 2 else ""

    scan_filter = (
        Scan.package_name == pkg_name,
        Scan.package_version == pkg_version,
    )
    scans = (
        (
            await session.execute(
                select(Scan).where(*scan_filter).order_by(Scan.started_at.desc())
            )
        )
        .scalars()
        .all()
    )
    if not scans:
        # Report a missing package with a real 404 so clients can rely on the
        # status code; the response body is still {"detail": "Not found"}.
        raise HTTPException(status_code=404, detail="Not found")

    # Load the findings of every matching scan in one query instead of one
    # query per scan; ordering by id keeps the output stable between calls.
    finding_rows = (
        (
            await session.execute(
                select(Finding)
                .where(Finding.scan_id.in_(select(Scan.id).where(*scan_filter)))
                .order_by(Finding.id)
            )
        )
        .scalars()
        .all()
    )
    findings_by_scan: dict = {}
    for f in finding_rows:
        findings_by_scan.setdefault(f.scan_id, []).append(f)

    # Emit findings grouped in the same order as the scans list.
    all_findings: list[dict] = []
    for s in scans:
        for f in findings_by_scan.get(s.id, []):
            all_findings.append({"id": f.id, **f.data, "report": f.report})

    latest = scans[0]
    return {
        "name": latest.package_name,
        "version": latest.package_version,
        "ecosystem": latest.ecosystem,
        "repository": latest.repository,
        # The package counts as flagged if any of its scans was flagged.
        "flagged": any(s.flagged for s in scans),
        "scans": [
            {
                "id": s.id,
                "status": s.status,
                "total_findings": s.total_findings,
                "flagged": s.flagged,
                "started_at": s.started_at.isoformat() if s.started_at else None,
            }
            for s in scans
        ],
        "findings": all_findings,
    }