"""Web UI routes — Jinja2 + htmx pages."""

import datetime
import functools

from fastapi import APIRouter, Depends, Request
from fastapi.responses import HTMLResponse
from sqlalchemy import Integer, cast, func, select, text
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.orm import selectinload

from guarddog_nexus.database import get_session
from guarddog_nexus.models import Finding, Scan

router = APIRouter(tags=["web"])

TEMPLATES: dict[str, str] = {}

# Page size shared by the paginated list views.
_PER_PAGE = 50


@functools.lru_cache(maxsize=1)
def _template_env():
    """Build the Jinja2 environment once and reuse it for every render.

    jinja2 is imported lazily so importing this module does not require it
    until a page is actually rendered.
    """
    from jinja2 import Environment, PackageLoader, select_autoescape

    return Environment(
        loader=PackageLoader("guarddog_nexus", "web/templates"),
        autoescape=select_autoescape(),
    )


def _render(name: str, **context) -> HTMLResponse:
    """Render template *name* with *context* and wrap it in an HTMLResponse."""
    template = _template_env().get_template(name)
    return HTMLResponse(template.render(**context))


def _not_found() -> HTMLResponse:
    """Return the 404 response shared by the detail pages."""
    # The body text is kept exactly as it appears in the original literal,
    # with the line breaks written as escapes so the literal is valid Python.
    return HTMLResponse("\n\nNot found\n\n", status_code=404)


async def _count(session: AsyncSession, column, *criteria) -> int:
    """Return COUNT(column) restricted by *criteria*, or 0 when the result is empty."""
    stmt = select(func.count(column))
    if criteria:
        stmt = stmt.where(*criteria)
    return (await session.scalar(stmt)) or 0


@router.get("/", response_class=HTMLResponse)
async def dashboard(request: Request, session: AsyncSession = Depends(get_session)):
    """Render the full dashboard page."""
    ctx = await _dashboard_data(session)
    return _render("dashboard.html", **ctx, request=request)


@router.get("/dashboard/stats", response_class=HTMLResponse)
async def dashboard_stats_fragment(session: AsyncSession = Depends(get_session)):
    """Render only the stats fragment of the dashboard."""
    ctx = await _dashboard_data(session)
    return _render("dashboard_stats.html", **ctx)


async def _dashboard_data(session: AsyncSession) -> dict:
    """Collect every aggregate the dashboard templates need.

    Returns a dict of counters, recent scans, top rules, most-flagged
    packages, per-day scan counts for the last 14 days, and the current
    UTC time under ``"now"``.
    """
    is_flagged = Scan.flagged == True  # noqa: E712 — SQL expression, not a Python bool test
    severity = func.json_extract(Finding.data, "$.severity")

    total_scans = await _count(session, Scan.id)
    flagged_scans = await _count(session, Scan.id, is_flagged)
    recent_flagged = await _count(
        session,
        Scan.id,
        is_flagged,
        Scan.started_at >= func.datetime("now", "-7 days"),
    )
    completed_scans = await _count(session, Scan.id, Scan.status == "completed")
    failed_scans = await _count(session, Scan.id, Scan.status == "failed")
    total_findings = await _count(session, Finding.id)
    warnings_count = await _count(session, Finding.id, severity == "WARNING")
    errors_count = await _count(session, Finding.id, severity == "ERROR")

    latest_flagged_result = await session.execute(
        select(Scan).where(is_flagged).order_by(Scan.started_at.desc()).limit(8)
    )
    latest_flagged = latest_flagged_result.scalars().all()

    latest_scans_result = await session.execute(
        select(Scan).order_by(Scan.started_at.desc()).limit(10)
    )
    latest_scans = latest_scans_result.scalars().all()

    top_rules = (
        await session.execute(
            select(
                func.json_extract(Finding.data, "$.rule").label("rule"),
                func.count(Finding.id).label("cnt"),
            )
            .group_by(text("rule"))
            .order_by(text("cnt DESC"))
            .limit(10)
        )
    ).all()

    most_flagged = (
        await session.execute(
            select(
                Scan.package_name,
                Scan.package_version,
                func.sum(Scan.total_findings).label("total"),
                func.max(Scan.started_at).label("last_scan"),
            )
            .where(is_flagged)
            .group_by(Scan.package_name, Scan.package_version)
            .order_by(func.sum(Scan.total_findings).desc())
            .limit(8)
        )
    ).all()

    # Keep the value at least 1: SUM() may yield NULL or 0, and a zero here
    # would be unusable if a template divides by it.
    max_findings = max((row.total or 0 for row in most_flagged), default=1) or 1

    # Heatmap: scans per day for last 14 days
    days_raw = (
        await session.execute(
            select(
                func.date(Scan.started_at).label("day"),
                func.count(Scan.id).label("cnt"),
                func.sum(cast(Scan.flagged, Integer)).label("flagged_cnt"),
            )
            .where(Scan.started_at >= func.datetime("now", "-14 days"))
            .group_by("day")
            .order_by("day")
        )
    ).all()

    return {
        "total_scans": total_scans,
        "flagged_scans": flagged_scans,
        "recent_flagged": recent_flagged,
        "completed_scans": completed_scans,
        "failed_scans": failed_scans,
        "total_findings": total_findings,
        "warnings_count": warnings_count,
        "errors_count": errors_count,
        "latest_flagged": latest_flagged,
        "latest_scans": latest_scans,
        "top_rules": [(r.rule, r.cnt) for r in top_rules],
        "most_flagged": most_flagged,
        "max_findings": max_findings,
        "days": [(d.day, d.cnt, d.flagged_cnt) for d in days_raw],
        "now": datetime.datetime.now(datetime.timezone.utc),
    }


@router.get("/scans", response_class=HTMLResponse)
async def scans_list(
    request: Request,
    page: int = 1,
    flagged: str = "",
    session: AsyncSession = Depends(get_session),
):
    """Render one page of scans, newest first.

    ``flagged == "1"`` restricts both the listing and the reported total to
    flagged scans. ``page`` values below 1 are treated as page 1.
    """
    page = max(page, 1)  # a non-positive page would give a negative OFFSET
    offset = (page - 1) * _PER_PAGE

    q = select(Scan)
    count_q = select(func.count(Scan.id))
    if flagged == "1":
        only_flagged = Scan.flagged == True  # noqa: E712
        q = q.where(only_flagged)
        # The total must use the same filter as the listing, otherwise the
        # pager advertises pages that do not exist.
        count_q = count_q.where(only_flagged)

    q = q.order_by(Scan.started_at.desc()).offset(offset).limit(_PER_PAGE)
    scans = (await session.execute(q)).scalars().all()
    total = await session.scalar(count_q)

    return _render(
        "scans_list.html",
        scans=scans,
        page=page,
        per_page=_PER_PAGE,
        total=total,
        flagged_filter=flagged,
        request=request,
    )


@router.get("/scans/{scan_id}", response_class=HTMLResponse)
async def scan_detail(scan_id: int, request: Request, session: AsyncSession = Depends(get_session)):
    """Render a single scan with its findings, or a 404 page if it does not exist."""
    scan = await session.scalar(
        select(Scan).where(Scan.id == scan_id).options(selectinload(Scan.findings))
    )
    if not scan:
        return _not_found()
    return _render("scan_detail.html", scan=scan, request=request)


@router.get("/packages", response_class=HTMLResponse)
async def packages_list(
    request: Request,
    page: int = 1,
    flagged: str = "",
    session: AsyncSession = Depends(get_session),
):
    """Render one page of packages (grouped by name and version), most recently scanned first.

    ``flagged == "1"`` keeps only groups with at least one flagged scan.
    ``page`` values below 1 are treated as page 1.
    """
    page = max(page, 1)  # a non-positive page would give a negative OFFSET
    offset = (page - 1) * _PER_PAGE

    # NOTE(review): ecosystem and repository are selected without an
    # aggregate while grouping only by name/version; this relies on the
    # database accepting bare columns in a GROUP BY query — confirm if
    # backends other than SQLite are targeted.
    grouped = select(
        Scan.package_name.label("pkg_name"),
        Scan.package_version.label("pkg_ver"),
        Scan.ecosystem,
        Scan.repository,
        func.max(Scan.started_at).label("last_scan"),
        func.max(Scan.flagged).label("is_flagged"),
        func.sum(Scan.total_findings).label("findings_sum"),
        func.max(Scan.id).label("sid"),
    ).group_by(Scan.package_name, Scan.package_version)
    if flagged == "1":
        grouped = grouped.having(func.max(Scan.flagged) == True)  # noqa: E712
    subq = grouped.subquery()

    total = await session.scalar(select(func.count()).select_from(subq))
    rows = (
        await session.execute(
            select(subq).order_by(subq.c.last_scan.desc()).offset(offset).limit(_PER_PAGE)
        )
    ).all()

    return _render(
        "packages_list.html",
        packages=rows,
        page=page,
        per_page=_PER_PAGE,
        total=total,
        flagged_filter=flagged,
        request=request,
    )


@router.get("/packages/{name}/{version}", response_class=HTMLResponse)
async def package_detail(
    name: str,
    version: str,
    request: Request,
    session: AsyncSession = Depends(get_session),
):
    """Render every scan of one package version plus the combined findings, or a 404 page."""
    result = await session.execute(
        select(Scan)
        .where(Scan.package_name == name, Scan.package_version == version)
        .options(selectinload(Scan.findings))
        .order_by(Scan.started_at.desc())
    )
    scans = result.scalars().all()
    if not scans:
        return _not_found()

    # Flatten findings across scans, keeping the newest-scan-first order.
    all_findings = [finding for scan in scans for finding in scan.findings]

    return _render(
        "package_detail.html",
        pkg_name=name,
        pkg_version=version,
        scans=scans,
        findings=all_findings,
        request=request,
    )