Files
guarddog-nexus/guarddog_nexus/routes/api_findings.py
Marker689 8726b65808 refactor: реструктуризация — core/, db/, routes/, web/
guarddog_nexus/
├── core/          scanner, harvester, nexus, llm
├── db/            engine, models, queries
├── routes/        webhooks, api_*, web
└── web/           templates + static

- 11 файлов перемещено (git mv — сохранена история)
- Все импорты обновлены (~15 файлов)
- main.py, tests — исправлены пути
- 50/50 тестов, ruff clean
2026-05-10 07:17:41 +03:00

87 lines
2.4 KiB
Python

"""REST API for findings (across all scans)."""
from fastapi import APIRouter, Depends, Query
from sqlalchemy import func, select
from sqlalchemy.ext.asyncio import AsyncSession
from ..config import config
from ..constants import (
DEFAULT_OFFSET,
DEFAULT_PAGE_SIZE,
JSON_PATH_RULE,
JSON_PATH_SEVERITY,
MAX_PAGE_SIZE,
)
from ..db.engine import get_session
from ..db.models import Finding
router = APIRouter(prefix="/api/v1/findings", tags=["findings"])
@router.get("")
async def list_findings(
limit: int = Query(DEFAULT_PAGE_SIZE, le=MAX_PAGE_SIZE),
offset: int = Query(DEFAULT_OFFSET, ge=0),
rule: str | None = Query(None),
severity: str | None = Query(None),
scan_id: int | None = Query(None),
session: AsyncSession = Depends(get_session),
):
q = select(Finding)
if rule:
q = q.where(func.json_extract(Finding.data, JSON_PATH_RULE) == rule)
if severity:
q = q.where(func.json_extract(Finding.data, JSON_PATH_SEVERITY) == severity)
if scan_id:
q = q.where(Finding.scan_id == scan_id)
total = await session.scalar(select(func.count()).select_from(q.subquery()))
findings = (await session.execute(q.offset(offset).limit(limit))).scalars().all()
return {
"total": total,
"limit": limit,
"offset": offset,
"findings": [
{
"id": f.id,
"scan_id": f.scan_id,
**f.data,
"report": f.report,
"created_at": f.created_at.isoformat() if f.created_at else None,
}
for f in findings
],
}
@router.post("/{finding_id}/analyze")
async def analyze_finding_endpoint(
finding_id: int,
session: AsyncSession = Depends(get_session),
):
"""Manually trigger LLM analysis for a single finding."""
if not config.llm_enabled:
return {"detail": "LLM analysis is disabled"}
finding = await session.scalar(
select(Finding).where(Finding.id == finding_id)
)
if not finding:
return {"detail": "Not found"}
from ..core.llm import analyze_finding
report = await analyze_finding(finding.data)
if report is None:
return {"detail": "LLM analysis failed"}
finding.report = report
await session.commit()
return {
"id": finding.id,
**finding.data,
"report": report,
}