## Часть A: Вынос хардкода
- Новый модуль constants.py — все magic strings, лимиты, severity, ключи
(104 хардкод-значения централизованы)
- Новый модуль queries.py — общие SQL-запросы (build_scan_list_query,
build_package_list_query, get_dashboard_stats)
Убрана дупликация между api/*.py и web/routes.py (~90%)
- config.py: добавлены NLP_ENABLED, nexus_timeout, guarddog_binary,
log_syslog_facility, LLM-переменные
- nexus_client.py: таймауты из конфига, SHA256_CHUNK_SIZE из constants
- scanner.py: error-ключи из constants, GUARDDOG_OUTPUT_FORMAT из constants
- webhooks.py: RELEVANT_WEBHOOK_ACTIONS, METADATA_PATTERNS, ignore-строки
из constants
- logging_setup.py: конфигурируемый syslog facility, APP_PACKAGE из constants
- main.py: APP_NAME, APP_DESCRIPTION, APP_PACKAGE из constants
- models.py: поле report: JSON | None в Finding для LLM-отчётов
- harvester.py: авто-очистка tmpdir через finally; ERROR_MESSAGE_MAX_LENGTH
из constants; PACKAGE_EXTENSIONS вместо SUPPORTED_EXTENSIONS (с .gem)
- api/*.py + web/routes.py: используют build_*_query из queries.py,
константы для лимитов и сортировок
- tests/conftest.py: SEVERITY_WARNING, DEFAULT_ECOSYSTEM из constants
## Часть B: LLM-анализ finding'ов
- llm.py: клиент для OpenAI-совместимых API с промптом security-аналитика
- harvester.py: авто-триггер после flagged scan, сохранение report в БД
- api/findings.py: POST /{id}/analyze — ручной триггер
- web/routes.py: POST /api/v1/findings/{id}/analyze — HTMX-фрагмент
- _llm_report_fragment.html: шаблон фрагмента с вердиктом
- scan_detail.html, package_detail.html: кнопка Analyze with LLM
(htmx-post, spinner, inline-замена на LLM-отчёт)
- style.css: стили для .llm-report .verdict-safe/suspicious/malicious
## Часть C: Тесты
- 50 тестов, все зелёные
- Линтер чистый
- Тесты используют constants где нужно
160 lines
4.4 KiB
Python
160 lines
4.4 KiB
Python
"""REST API for packages (distinct packages across scans)."""
|
|
|
|
import csv
|
|
import io
|
|
|
|
from fastapi import APIRouter, Depends, Query, Response
|
|
from sqlalchemy import select
|
|
from sqlalchemy.ext.asyncio import AsyncSession
|
|
|
|
from guarddog_nexus.constants import (
|
|
CSV_MEDIA_TYPE,
|
|
DEFAULT_OFFSET,
|
|
DEFAULT_PAGE_SIZE,
|
|
DEFAULT_SORT_BY_PACKAGES,
|
|
DEFAULT_SORT_DIR,
|
|
MAX_PAGE_SIZE,
|
|
)
|
|
from guarddog_nexus.database import get_session
|
|
from guarddog_nexus.models import Finding, Scan
|
|
from guarddog_nexus.queries import build_package_list_query
|
|
|
|
router = APIRouter(prefix="/api/v1/packages", tags=["packages"])
|
|
|
|
|
|
@router.get("")
|
|
async def list_packages(
|
|
limit: int = Query(DEFAULT_PAGE_SIZE, le=MAX_PAGE_SIZE),
|
|
offset: int = Query(DEFAULT_OFFSET, ge=0),
|
|
ecosystem: str | None = Query(None),
|
|
flagged: bool | None = Query(None),
|
|
search: str | None = Query(None),
|
|
repository: str | None = Query(None),
|
|
sort_by: str = Query(DEFAULT_SORT_BY_PACKAGES),
|
|
sort_dir: str = Query(DEFAULT_SORT_DIR),
|
|
session: AsyncSession = Depends(get_session),
|
|
):
|
|
rows_q, total_q = build_package_list_query(
|
|
flagged=flagged,
|
|
ecosystem=ecosystem,
|
|
repository=repository,
|
|
search=search,
|
|
sort_by=sort_by,
|
|
sort_dir=sort_dir,
|
|
limit=limit,
|
|
offset=offset,
|
|
)
|
|
total = await session.scalar(total_q)
|
|
rows = (await session.execute(rows_q)).all()
|
|
|
|
return {
|
|
"total": total,
|
|
"limit": limit,
|
|
"offset": offset,
|
|
"packages": [
|
|
{
|
|
"name": r.pkg_name,
|
|
"version": r.pkg_ver,
|
|
"ecosystem": r.ecosystem,
|
|
"repository": r.repository,
|
|
"last_scanned_at": r.last_scan.isoformat() if r.last_scan else None,
|
|
"flagged": bool(r.is_flagged),
|
|
"total_findings": r.findings_sum,
|
|
"latest_scan_id": r.sid,
|
|
}
|
|
for r in rows
|
|
],
|
|
}
|
|
|
|
|
|
@router.get("/export")
|
|
async def export_packages_csv(
|
|
flagged: bool | None = Query(None),
|
|
search: str | None = Query(None),
|
|
session: AsyncSession = Depends(get_session),
|
|
):
|
|
rows_q, _total_q = build_package_list_query(
|
|
flagged=flagged,
|
|
search=search,
|
|
sort_by=DEFAULT_SORT_BY_PACKAGES,
|
|
sort_dir=DEFAULT_SORT_DIR,
|
|
limit=MAX_PAGE_SIZE,
|
|
offset=0,
|
|
)
|
|
rows = (await session.execute(rows_q)).all()
|
|
|
|
output = io.StringIO()
|
|
writer = csv.writer(output)
|
|
writer.writerow(
|
|
[
|
|
"name", "version", "ecosystem", "repository",
|
|
"last_scanned_at", "flagged", "total_findings",
|
|
]
|
|
)
|
|
for r in rows:
|
|
writer.writerow(
|
|
[
|
|
r.pkg_name, r.pkg_ver, r.ecosystem, r.repository,
|
|
r.last_scan.isoformat() if r.last_scan else "",
|
|
bool(r.is_flagged),
|
|
r.findings_sum,
|
|
]
|
|
)
|
|
|
|
return Response(
|
|
content=output.getvalue(),
|
|
media_type=CSV_MEDIA_TYPE,
|
|
headers={"Content-Disposition": "attachment; filename=packages_export.csv"},
|
|
)
|
|
|
|
|
|
@router.get("/{name}/{version}")
|
|
async def get_package(
|
|
name: str,
|
|
version: str,
|
|
session: AsyncSession = Depends(get_session),
|
|
):
|
|
scans = (
|
|
(
|
|
await session.execute(
|
|
select(Scan)
|
|
.where(Scan.package_name == name, Scan.package_version == version)
|
|
.order_by(Scan.started_at.desc())
|
|
)
|
|
)
|
|
.scalars()
|
|
.all()
|
|
)
|
|
|
|
if not scans:
|
|
return {"detail": "Not found"}
|
|
|
|
all_findings: list[dict] = []
|
|
for s in scans:
|
|
findings = (
|
|
(await session.execute(select(Finding).where(Finding.scan_id == s.id)))
|
|
.scalars()
|
|
.all()
|
|
)
|
|
for f in findings:
|
|
all_findings.append({"id": f.id, **f.data, "report": f.report})
|
|
|
|
return {
|
|
"name": scans[0].package_name,
|
|
"version": scans[0].package_version,
|
|
"ecosystem": scans[0].ecosystem,
|
|
"repository": scans[0].repository,
|
|
"flagged": any(s.flagged for s in scans),
|
|
"scans": [
|
|
{
|
|
"id": s.id,
|
|
"status": s.status,
|
|
"total_findings": s.total_findings,
|
|
"flagged": s.flagged,
|
|
"started_at": s.started_at.isoformat() if s.started_at else None,
|
|
}
|
|
for s in scans
|
|
],
|
|
"findings": all_findings,
|
|
}
|