## Часть A: Вынос хардкода
- Новый модуль constants.py — все magic strings, лимиты, severity, ключи
(104 хардкод-значения централизованы)
- Новый модуль queries.py — общие SQL-запросы (build_scan_list_query,
build_package_list_query, get_dashboard_stats)
Убрано дублирование между api/*.py и web/routes.py (~90%)
- config.py: добавлены NLP_ENABLED, nexus_timeout, guarddog_binary,
log_syslog_facility, LLM-переменные
- nexus_client.py: таймауты из конфига, SHA256_CHUNK_SIZE из constants
- scanner.py: error-ключи из constants, GUARDDOG_OUTPUT_FORMAT из constants
- webhooks.py: RELEVANT_WEBHOOK_ACTIONS, METADATA_PATTERNS, ignore-строки
из constants
- logging_setup.py: конфигурируемый syslog facility, APP_PACKAGE из constants
- main.py: APP_NAME, APP_DESCRIPTION, APP_PACKAGE из constants
- models.py: поле report: JSON | None в Finding для LLM-отчётов
- harvester.py: авто-очистка tmpdir через finally; ERROR_MESSAGE_MAX_LENGTH
из constants; PACKAGE_EXTENSIONS вместо SUPPORTED_EXTENSIONS (с .gem)
- api/*.py + web/routes.py: используют build_*_query из queries.py,
константы для лимитов и сортировок
- tests/conftest.py: SEVERITY_WARNING, DEFAULT_ECOSYSTEM из constants
## Часть B: LLM-анализ finding'ов
- llm.py: клиент для OpenAI-совместимых API с промптом security-аналитика
- harvester.py: авто-триггер после flagged scan, сохранение report в БД
- api/findings.py: POST /{id}/analyze — ручной триггер
- web/routes.py: POST /api/v1/findings/{id}/analyze — HTMX-фрагмент
- _llm_report_fragment.html: шаблон фрагмента с вердиктом
- scan_detail.html, package_detail.html: кнопка Analyze with LLM
(htmx-post, spinner, inline-замена на LLM-отчёт)
- style.css: стили для .llm-report .verdict-safe/suspicious/malicious
## Часть C: Тесты
- 50 тестов, все зелёные
- Линтер чистый
- Тесты используют constants где нужно
223 lines
6.0 KiB
Python
223 lines
6.0 KiB
Python
"""Web UI routes — Jinja2 + htmx pages."""
|
|
|
|
from fastapi import APIRouter, Depends, Request
|
|
from fastapi.responses import HTMLResponse
|
|
from jinja2 import Environment, PackageLoader, select_autoescape
|
|
from sqlalchemy import select
|
|
from sqlalchemy.ext.asyncio import AsyncSession
|
|
|
|
from guarddog_nexus.constants import (
|
|
APP_PACKAGE,
|
|
DEFAULT_SORT_BY_PACKAGES,
|
|
DEFAULT_SORT_BY_SCANS,
|
|
DEFAULT_SORT_DIR,
|
|
WEB_PER_PAGE,
|
|
)
|
|
from guarddog_nexus.database import get_session
|
|
from guarddog_nexus.models import Finding, Scan
|
|
from guarddog_nexus.queries import (
|
|
build_package_list_query,
|
|
build_scan_list_query,
|
|
get_dashboard_stats,
|
|
)
|
|
|
|
# Router that collects every HTML (Jinja2 + htmx) endpoint of the web UI.
router = APIRouter(tags=["web"])

# Shared template environment: templates are loaded from the application
# package's "web/templates" directory, with select_autoescape() defaults.
_jinja_env = Environment(
    autoescape=select_autoescape(),
    loader=PackageLoader(APP_PACKAGE, "web/templates"),
)
|
|
|
|
|
|
def _render(name: str, **context) -> HTMLResponse:
    """Render the template *name* with *context* and wrap it in an HTMLResponse."""
    html = _jinja_env.get_template(name).render(**context)
    return HTMLResponse(html)
|
|
|
|
|
|
@router.get("/", response_class=HTMLResponse)
async def dashboard(request: Request, session: AsyncSession = Depends(get_session)):
    """Serve the full dashboard page.

    The context returned by ``get_dashboard_stats`` is passed to the
    ``dashboard.html`` template together with the current request.
    """
    stats = await get_dashboard_stats(session)
    return _render("dashboard.html", request=request, **stats)
|
|
|
|
|
|
@router.get("/dashboard/stats", response_class=HTMLResponse)
async def dashboard_stats_fragment(session: AsyncSession = Depends(get_session)):
    """Serve only the statistics fragment of the dashboard.

    Uses the same ``get_dashboard_stats`` context as the full dashboard page
    but renders ``dashboard_stats.html`` and passes no request object.
    """
    stats = await get_dashboard_stats(session)
    return _render("dashboard_stats.html", **stats)
|
|
|
|
|
|
@router.get("/scans", response_class=HTMLResponse)
async def scans_list(
    request: Request,
    page: int = 1,
    flagged: str = "",
    search: str = "",
    status: str = "",
    sort_by: str = DEFAULT_SORT_BY_SCANS,
    sort_dir: str = DEFAULT_SORT_DIR,
    session: AsyncSession = Depends(get_session),
):
    """Serve one page of the scan list.

    Args:
        request: Incoming request, forwarded to the template.
        page: 1-based page number; values below 1 are treated as page 1.
        flagged: ``"1"`` restricts the list to flagged scans; any other
            value leaves the flagged filter unset.
        search: Search text; an empty string means "no search".
        status: Status filter; an empty string means "no status filter".
        sort_by: Sort column passed through to the query builder.
        sort_dir: Sort direction passed through to the query builder.
        session: Database session injected by FastAPI.

    Returns:
        The rendered ``scans_list.html`` page.
    """
    # A zero or negative page would produce a negative offset, which some
    # SQL backends reject outright; clamp to the first page instead.
    page = max(page, 1)
    per_page = WEB_PER_PAGE
    offset = (page - 1) * per_page

    # Only the literal "1" enables the filter; everything else means "any".
    flagged_bool = True if flagged == "1" else None

    q, count_q = build_scan_list_query(
        flagged=flagged_bool,
        status=status or None,
        search=search or None,
        sort_by=sort_by,
        sort_dir=sort_dir,
        limit=per_page,
        offset=offset,
    )
    scans = (await session.execute(q)).scalars().all()
    total = await session.scalar(count_q)

    # The raw filter strings are echoed back to the template unchanged.
    return _render(
        "scans_list.html",
        scans=scans,
        page=page,
        per_page=per_page,
        total=total,
        flagged_filter=flagged,
        search=search,
        status_filter=status,
        sort_by=sort_by,
        sort_dir=sort_dir,
        request=request,
    )
|
|
|
|
|
|
@router.get("/scans/{scan_id}", response_class=HTMLResponse)
async def scan_detail(
    scan_id: int, request: Request, session: AsyncSession = Depends(get_session)
):
    """Serve the detail page of one scan, with its findings loaded.

    Responds with a plain 404 HTML page when no scan has the given id.
    """
    from sqlalchemy.orm import selectinload

    # Load the findings together with the scan (selectinload) for the template.
    stmt = select(Scan).where(Scan.id == scan_id).options(selectinload(Scan.findings))
    scan = await session.scalar(stmt)

    if scan:
        return _render("scan_detail.html", scan=scan, request=request)
    return HTMLResponse("<h1>Not found</h1>", status_code=404)
|
|
|
|
|
|
@router.get("/packages", response_class=HTMLResponse)
async def packages_list(
    request: Request,
    page: int = 1,
    flagged: str = "",
    search: str = "",
    sort_by: str = DEFAULT_SORT_BY_PACKAGES,
    sort_dir: str = DEFAULT_SORT_DIR,
    session: AsyncSession = Depends(get_session),
):
    """Serve one page of the package list.

    Args:
        request: Incoming request, forwarded to the template.
        page: 1-based page number; values below 1 are treated as page 1.
        flagged: ``"1"`` restricts the list to flagged packages; any other
            value leaves the flagged filter unset.
        search: Search text; an empty string means "no search".
        sort_by: Sort column passed through to the query builder.
        sort_dir: Sort direction passed through to the query builder.
        session: Database session injected by FastAPI.

    Returns:
        The rendered ``packages_list.html`` page.
    """
    # A zero or negative page would produce a negative offset, which some
    # SQL backends reject outright; clamp to the first page instead.
    page = max(page, 1)
    per_page = WEB_PER_PAGE
    offset = (page - 1) * per_page

    # Only the literal "1" enables the filter; everything else means "any".
    flagged_bool = True if flagged == "1" else None

    rows_q, total_q = build_package_list_query(
        flagged=flagged_bool,
        search=search or None,
        sort_by=sort_by,
        sort_dir=sort_dir,
        limit=per_page,
        offset=offset,
    )
    total = await session.scalar(total_q)
    rows = (await session.execute(rows_q)).all()

    # The raw filter strings are echoed back to the template unchanged.
    return _render(
        "packages_list.html",
        packages=rows,
        page=page,
        per_page=per_page,
        total=total,
        flagged_filter=flagged,
        search=search,
        sort_by=sort_by,
        sort_dir=sort_dir,
        request=request,
    )
|
|
|
|
|
|
@router.get("/packages/{name}/{version}", response_class=HTMLResponse)
async def package_detail(
    name: str,
    version: str,
    request: Request,
    session: AsyncSession = Depends(get_session),
):
    """Serve the detail page for one package version.

    Collects every scan recorded for the given name/version pair, newest
    first by ``started_at``, plus a flat list of all their findings.
    Responds with a plain 404 HTML page when no scan matches.
    """
    from sqlalchemy.orm import selectinload

    stmt = (
        select(Scan)
        .where(Scan.package_name == name, Scan.package_version == version)
        .options(selectinload(Scan.findings))
        .order_by(Scan.started_at.desc())
    )
    result = await session.execute(stmt)
    scans = result.scalars().all()

    if not scans:
        return HTMLResponse("<h1>Not found</h1>", status_code=404)

    # Flatten the findings of every scan, preserving scan order.
    all_findings = [finding for scan in scans for finding in scan.findings]

    return _render(
        "package_detail.html",
        pkg_name=name,
        pkg_version=version,
        scans=scans,
        findings=all_findings,
        request=request,
    )
|
|
|
|
|
|
@router.post("/api/v1/findings/{finding_id}/analyze", response_class=HTMLResponse)
async def analyze_finding_htmx(
    finding_id: int,
    session: AsyncSession = Depends(get_session),
):
    """HTMX fragment: run LLM analysis for a finding and return result HTML.

    Returns a short notice fragment when LLM analysis is disabled, when the
    finding does not exist (status 404), or when the analysis yields no
    report. Otherwise the report is stored on the finding, committed, and
    rendered through ``_llm_report_fragment.html``.
    """
    from guarddog_nexus.config import config
    from guarddog_nexus.llm import analyze_finding

    if not config.llm_enabled:
        disabled_html = (
            '<div class="llm-actions"><small class="flagged">LLM analysis is disabled</small></div>'
        )
        return HTMLResponse(disabled_html)

    lookup = select(Finding).where(Finding.id == finding_id)
    finding = await session.scalar(lookup)
    if not finding:
        missing_html = (
            '<div class="llm-actions"><small class="flagged">Finding not found</small></div>'
        )
        return HTMLResponse(missing_html, status_code=404)

    report = await analyze_finding(finding.data)
    if report is None:
        failed_html = (
            '<div class="llm-actions"><small class="flagged">LLM analysis failed</small></div>'
        )
        return HTMLResponse(failed_html)

    # Persist the report on the finding before rendering it.
    finding.report = report
    await session.commit()

    return _render("_llm_report_fragment.html", report=report)
|