refactor: реструктуризация — core/, db/, routes/, web/
guarddog_nexus/ ├── core/ scanner, harvester, nexus, llm ├── db/ engine, models, queries ├── routes/ webhooks, api_*, web └── web/ templates + static - 11 файлов перемещено (git mv — сохранена история) - Все импорты обновлены (~15 файлов) - main.py, tests — исправлены пути - 50/50 тестов, ruff clean
This commit is contained in:
233
guarddog_nexus/routes/web.py
Normal file
233
guarddog_nexus/routes/web.py
Normal file
@@ -0,0 +1,233 @@
|
||||
"""Web UI routes — Jinja2 + htmx pages."""
|
||||
|
||||
from urllib.parse import unquote
|
||||
|
||||
from fastapi import APIRouter, Depends, Request
|
||||
from fastapi.responses import HTMLResponse
|
||||
from jinja2 import Environment, PackageLoader, select_autoescape
|
||||
from sqlalchemy import select
|
||||
from sqlalchemy.ext.asyncio import AsyncSession
|
||||
|
||||
from ..constants import (
|
||||
APP_PACKAGE,
|
||||
DEFAULT_SORT_BY_PACKAGES,
|
||||
DEFAULT_SORT_BY_SCANS,
|
||||
DEFAULT_SORT_DIR,
|
||||
WEB_PER_PAGE,
|
||||
)
|
||||
from ..db.engine import get_session
|
||||
from ..db.models import Finding, Scan
|
||||
from ..db.queries import (
|
||||
build_package_list_query,
|
||||
build_scan_list_query,
|
||||
get_dashboard_stats,
|
||||
)
|
||||
|
||||
router = APIRouter(tags=["web"])
|
||||
|
||||
_jinja_env = Environment(
|
||||
loader=PackageLoader(APP_PACKAGE, "web/templates"),
|
||||
autoescape=select_autoescape(),
|
||||
)
|
||||
|
||||
|
||||
def _render(name: str, **context) -> HTMLResponse:
|
||||
template = _jinja_env.get_template(name)
|
||||
return HTMLResponse(template.render(**context))
|
||||
|
||||
|
||||
@router.get("/", response_class=HTMLResponse)
|
||||
async def dashboard(request: Request, session: AsyncSession = Depends(get_session)):
|
||||
ctx = await get_dashboard_stats(session)
|
||||
return _render("dashboard.html", **ctx, request=request)
|
||||
|
||||
|
||||
@router.get("/dashboard/stats", response_class=HTMLResponse)
|
||||
async def dashboard_stats_fragment(session: AsyncSession = Depends(get_session)):
|
||||
ctx = await get_dashboard_stats(session)
|
||||
return _render("dashboard_stats.html", **ctx)
|
||||
|
||||
|
||||
@router.get("/scans", response_class=HTMLResponse)
|
||||
async def scans_list(
|
||||
request: Request,
|
||||
page: int = 1,
|
||||
flagged: str = "",
|
||||
search: str = "",
|
||||
status: str = "",
|
||||
sort_by: str = DEFAULT_SORT_BY_SCANS,
|
||||
sort_dir: str = DEFAULT_SORT_DIR,
|
||||
session: AsyncSession = Depends(get_session),
|
||||
):
|
||||
per_page = WEB_PER_PAGE
|
||||
offset = (page - 1) * per_page
|
||||
|
||||
flagged_bool = None
|
||||
if flagged == "1":
|
||||
flagged_bool = True
|
||||
|
||||
q, count_q = build_scan_list_query(
|
||||
flagged=flagged_bool,
|
||||
status=status or None,
|
||||
search=search or None,
|
||||
sort_by=sort_by,
|
||||
sort_dir=sort_dir,
|
||||
limit=per_page,
|
||||
offset=offset,
|
||||
)
|
||||
scans = (await session.execute(q)).scalars().all()
|
||||
total = await session.scalar(count_q)
|
||||
|
||||
template = "_scans_table.html" if request.headers.get("HX-Request") else "scans_list.html"
|
||||
|
||||
return _render(
|
||||
template,
|
||||
scans=scans,
|
||||
page=page,
|
||||
per_page=per_page,
|
||||
total=total,
|
||||
flagged_filter=flagged,
|
||||
search=search,
|
||||
status_filter=status,
|
||||
sort_by=sort_by,
|
||||
sort_dir=sort_dir,
|
||||
request=request,
|
||||
)
|
||||
|
||||
|
||||
@router.get("/scans/{scan_id}", response_class=HTMLResponse)
|
||||
async def scan_detail(
|
||||
scan_id: int, request: Request, session: AsyncSession = Depends(get_session)
|
||||
):
|
||||
from sqlalchemy.orm import selectinload
|
||||
|
||||
scan = await session.scalar(
|
||||
select(Scan)
|
||||
.where(Scan.id == scan_id)
|
||||
.options(selectinload(Scan.findings))
|
||||
)
|
||||
if not scan:
|
||||
return HTMLResponse("<h1>Not found</h1>", status_code=404)
|
||||
|
||||
return _render("scan_detail.html", scan=scan, request=request)
|
||||
|
||||
|
||||
@router.get("/packages", response_class=HTMLResponse)
|
||||
async def packages_list(
|
||||
request: Request,
|
||||
page: int = 1,
|
||||
flagged: str = "",
|
||||
search: str = "",
|
||||
sort_by: str = DEFAULT_SORT_BY_PACKAGES,
|
||||
sort_dir: str = DEFAULT_SORT_DIR,
|
||||
session: AsyncSession = Depends(get_session),
|
||||
):
|
||||
per_page = WEB_PER_PAGE
|
||||
offset = (page - 1) * per_page
|
||||
|
||||
flagged_bool = None
|
||||
if flagged == "1":
|
||||
flagged_bool = True
|
||||
|
||||
rows_q, total_q = build_package_list_query(
|
||||
flagged=flagged_bool,
|
||||
search=search or None,
|
||||
sort_by=sort_by,
|
||||
sort_dir=sort_dir,
|
||||
limit=per_page,
|
||||
offset=offset,
|
||||
)
|
||||
total = await session.scalar(total_q)
|
||||
rows = (await session.execute(rows_q)).all()
|
||||
|
||||
template = "_packages_table.html" if request.headers.get("HX-Request") else "packages_list.html"
|
||||
|
||||
return _render(
|
||||
template,
|
||||
packages=rows,
|
||||
page=page,
|
||||
per_page=per_page,
|
||||
total=total,
|
||||
flagged_filter=flagged,
|
||||
search=search,
|
||||
sort_by=sort_by,
|
||||
sort_dir=sort_dir,
|
||||
request=request,
|
||||
)
|
||||
|
||||
|
||||
@router.get("/packages/{name:path}", response_class=HTMLResponse)
|
||||
async def package_detail(
|
||||
name: str,
|
||||
request: Request,
|
||||
session: AsyncSession = Depends(get_session),
|
||||
):
|
||||
# name:path captures the entire path after /packages/
|
||||
# e.g. "eviltest/0.1.0" or "github.com/attacker/evilmodule/v0.1.0"
|
||||
parts = name.rsplit("/", 1)
|
||||
pkg_name = unquote(parts[0])
|
||||
pkg_version = unquote(parts[1]) if len(parts) == 2 else ""
|
||||
|
||||
from sqlalchemy.orm import selectinload
|
||||
|
||||
scans = (
|
||||
(
|
||||
await session.execute(
|
||||
select(Scan)
|
||||
.where(Scan.package_name == pkg_name, Scan.package_version == pkg_version)
|
||||
.options(selectinload(Scan.findings))
|
||||
.order_by(Scan.started_at.desc())
|
||||
)
|
||||
)
|
||||
.scalars()
|
||||
.all()
|
||||
)
|
||||
|
||||
if not scans:
|
||||
return HTMLResponse("<h1>Not found</h1>", status_code=404)
|
||||
|
||||
all_findings = []
|
||||
for s in scans:
|
||||
all_findings.extend(s.findings)
|
||||
|
||||
return _render(
|
||||
"package_detail.html",
|
||||
pkg_name=pkg_name,
|
||||
pkg_version=pkg_version,
|
||||
scans=scans,
|
||||
findings=all_findings,
|
||||
request=request,
|
||||
)
|
||||
|
||||
|
||||
@router.post("/api/v1/findings/{finding_id}/analyze", response_class=HTMLResponse)
|
||||
async def analyze_finding_htmx(
|
||||
finding_id: int,
|
||||
session: AsyncSession = Depends(get_session),
|
||||
):
|
||||
"""HTMX fragment: trigger LLM analysis and return styled result HTML."""
|
||||
from ..config import config
|
||||
from ..core.llm import analyze_finding
|
||||
|
||||
if not config.llm_enabled:
|
||||
return HTMLResponse(
|
||||
'<div class="llm-actions"><small class="flagged">LLM analysis is disabled</small></div>'
|
||||
)
|
||||
|
||||
finding = await session.scalar(select(Finding).where(Finding.id == finding_id))
|
||||
if not finding:
|
||||
return HTMLResponse(
|
||||
'<div class="llm-actions"><small class="flagged">Finding not found</small></div>',
|
||||
status_code=404,
|
||||
)
|
||||
|
||||
report = await analyze_finding(finding.data)
|
||||
if report is None:
|
||||
return HTMLResponse(
|
||||
'<div class="llm-actions"><small class="flagged">LLM analysis failed</small></div>'
|
||||
)
|
||||
|
||||
finding.report = report
|
||||
await session.commit()
|
||||
|
||||
return _render("_llm_report_fragment.html", report=report)
|
||||
Reference in New Issue
Block a user