"""Web UI routes — Jinja2 + htmx pages."""
from urllib.parse import unquote
from fastapi import APIRouter, Depends, Request
from fastapi.responses import HTMLResponse
from jinja2 import Environment, PackageLoader, select_autoescape
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
from guarddog_nexus.constants import (
APP_PACKAGE,
DEFAULT_SORT_BY_PACKAGES,
DEFAULT_SORT_BY_SCANS,
DEFAULT_SORT_DIR,
WEB_PER_PAGE,
)
from guarddog_nexus.database import get_session
from guarddog_nexus.models import Finding, Scan
from guarddog_nexus.queries import (
build_package_list_query,
build_scan_list_query,
get_dashboard_stats,
)
router = APIRouter(tags=["web"])
_jinja_env = Environment(
loader=PackageLoader(APP_PACKAGE, "web/templates"),
autoescape=select_autoescape(),
)
def _render(name: str, **context) -> HTMLResponse:
template = _jinja_env.get_template(name)
return HTMLResponse(template.render(**context))
@router.get("/", response_class=HTMLResponse)
async def dashboard(request: Request, session: AsyncSession = Depends(get_session)):
ctx = await get_dashboard_stats(session)
return _render("dashboard.html", **ctx, request=request)
@router.get("/dashboard/stats", response_class=HTMLResponse)
async def dashboard_stats_fragment(session: AsyncSession = Depends(get_session)):
ctx = await get_dashboard_stats(session)
return _render("dashboard_stats.html", **ctx)
@router.get("/scans", response_class=HTMLResponse)
async def scans_list(
request: Request,
page: int = 1,
flagged: str = "",
search: str = "",
status: str = "",
sort_by: str = DEFAULT_SORT_BY_SCANS,
sort_dir: str = DEFAULT_SORT_DIR,
session: AsyncSession = Depends(get_session),
):
per_page = WEB_PER_PAGE
offset = (page - 1) * per_page
flagged_bool = None
if flagged == "1":
flagged_bool = True
q, count_q = build_scan_list_query(
flagged=flagged_bool,
status=status or None,
search=search or None,
sort_by=sort_by,
sort_dir=sort_dir,
limit=per_page,
offset=offset,
)
scans = (await session.execute(q)).scalars().all()
total = await session.scalar(count_q)
template = "_scans_table.html" if request.headers.get("HX-Request") else "scans_list.html"
return _render(
template,
scans=scans,
page=page,
per_page=per_page,
total=total,
flagged_filter=flagged,
search=search,
status_filter=status,
sort_by=sort_by,
sort_dir=sort_dir,
request=request,
)
@router.get("/scans/{scan_id}", response_class=HTMLResponse)
async def scan_detail(
scan_id: int, request: Request, session: AsyncSession = Depends(get_session)
):
from sqlalchemy.orm import selectinload
scan = await session.scalar(
select(Scan)
.where(Scan.id == scan_id)
.options(selectinload(Scan.findings))
)
if not scan:
return HTMLResponse("
Not found
", status_code=404)
return _render("scan_detail.html", scan=scan, request=request)
@router.get("/packages", response_class=HTMLResponse)
async def packages_list(
request: Request,
page: int = 1,
flagged: str = "",
search: str = "",
sort_by: str = DEFAULT_SORT_BY_PACKAGES,
sort_dir: str = DEFAULT_SORT_DIR,
session: AsyncSession = Depends(get_session),
):
per_page = WEB_PER_PAGE
offset = (page - 1) * per_page
flagged_bool = None
if flagged == "1":
flagged_bool = True
rows_q, total_q = build_package_list_query(
flagged=flagged_bool,
search=search or None,
sort_by=sort_by,
sort_dir=sort_dir,
limit=per_page,
offset=offset,
)
total = await session.scalar(total_q)
rows = (await session.execute(rows_q)).all()
template = "_packages_table.html" if request.headers.get("HX-Request") else "packages_list.html"
return _render(
template,
packages=rows,
page=page,
per_page=per_page,
total=total,
flagged_filter=flagged,
search=search,
sort_by=sort_by,
sort_dir=sort_dir,
request=request,
)
@router.get("/packages/{name:path}", response_class=HTMLResponse)
async def package_detail(
name: str,
request: Request,
session: AsyncSession = Depends(get_session),
):
# name:path captures the entire path after /packages/
# e.g. "eviltest/0.1.0" or "github.com/attacker/evilmodule/v0.1.0"
parts = name.rsplit("/", 1)
pkg_name = unquote(parts[0])
pkg_version = unquote(parts[1]) if len(parts) == 2 else ""
from sqlalchemy.orm import selectinload
scans = (
(
await session.execute(
select(Scan)
.where(Scan.package_name == pkg_name, Scan.package_version == pkg_version)
.options(selectinload(Scan.findings))
.order_by(Scan.started_at.desc())
)
)
.scalars()
.all()
)
if not scans:
return HTMLResponse("Not found
", status_code=404)
all_findings = []
for s in scans:
all_findings.extend(s.findings)
return _render(
"package_detail.html",
pkg_name=pkg_name,
pkg_version=pkg_version,
scans=scans,
findings=all_findings,
request=request,
)
@router.post("/api/v1/findings/{finding_id}/analyze", response_class=HTMLResponse)
async def analyze_finding_htmx(
finding_id: int,
session: AsyncSession = Depends(get_session),
):
"""HTMX fragment: trigger LLM analysis and return styled result HTML."""
from guarddog_nexus.config import config
from guarddog_nexus.llm import analyze_finding
if not config.llm_enabled:
return HTMLResponse(
'LLM analysis is disabled
'
)
finding = await session.scalar(select(Finding).where(Finding.id == finding_id))
if not finding:
return HTMLResponse(
'Finding not found
',
status_code=404,
)
report = await analyze_finding(finding.data)
if report is None:
return HTMLResponse(
'LLM analysis failed
'
)
finding.report = report
await session.commit()
return _render("_llm_report_fragment.html", report=report)