Files
guarddog-nexus/guarddog_nexus/web/routes.py

259 lines
7.8 KiB
Python

"""Web UI routes — Jinja2 + htmx pages."""
import datetime
from fastapi import APIRouter, Depends, Request
from fastapi.responses import HTMLResponse
from sqlalchemy import Integer, cast, func, select
from sqlalchemy.ext.asyncio import AsyncSession
from guarddog_nexus.database import get_session
from guarddog_nexus.models import Finding, Scan
router = APIRouter(tags=["web"])
TEMPLATES: dict[str, str] = {}
def _render(name: str, **context) -> HTMLResponse:
from jinja2 import Environment, PackageLoader, select_autoescape
env = Environment(
loader=PackageLoader("guarddog_nexus", "web/templates"),
autoescape=select_autoescape(),
)
template = env.get_template(name)
return HTMLResponse(template.render(**context))
@router.get("/", response_class=HTMLResponse)
async def dashboard(request: Request, session: AsyncSession = Depends(get_session)):
ctx = await _dashboard_data(session)
return _render("dashboard.html", **ctx, request=request)
@router.get("/dashboard/stats", response_class=HTMLResponse)
async def dashboard_stats_fragment(session: AsyncSession = Depends(get_session)):
ctx = await _dashboard_data(session)
return _render("dashboard_stats.html", **ctx)
async def _dashboard_data(session: AsyncSession) -> dict:
total_scans = await session.scalar(select(func.count(Scan.id)))
flagged_scans = await session.scalar(select(func.count(Scan.id)).where(Scan.flagged == True))
recent_flagged = await session.scalar(
select(func.count(Scan.id)).where(
Scan.flagged == True,
Scan.started_at >= func.datetime("now", "-7 days"),
)
)
completed_scans = await session.scalar(
select(func.count(Scan.id)).where(Scan.status == "completed")
)
failed_scans = await session.scalar(select(func.count(Scan.id)).where(Scan.status == "failed"))
total_findings = await session.scalar(select(func.count(Finding.id)))
warnings_count = await session.scalar(
select(func.count(Finding.id)).where(Finding.severity == "WARNING")
)
errors_count = await session.scalar(
select(func.count(Finding.id)).where(Finding.severity == "ERROR")
)
latest_flagged = (
(
await session.execute(
select(Scan).where(Scan.flagged == True).order_by(Scan.started_at.desc()).limit(8)
)
)
.scalars()
.all()
)
latest_scans = (
(await session.execute(select(Scan).order_by(Scan.started_at.desc()).limit(10)))
.scalars()
.all()
)
top_rules = (
await session.execute(
select(Finding.rule, func.count(Finding.id).label("cnt"))
.group_by(Finding.rule)
.order_by(func.count(Finding.id).desc())
.limit(10)
)
).all()
most_flagged = (
await session.execute(
select(
Scan.package_name,
Scan.package_version,
func.sum(Scan.total_findings).label("total"),
func.max(Scan.started_at).label("last_scan"),
)
.where(Scan.flagged == True)
.group_by(Scan.package_name, Scan.package_version)
.order_by(func.sum(Scan.total_findings).desc())
.limit(8)
)
).all()
max_findings = max((r.total for r in most_flagged), default=1)
# Heatmap: scans per day for last 14 days
days_raw = (
await session.execute(
select(
func.date(Scan.started_at).label("day"),
func.count(Scan.id).label("cnt"),
func.sum(cast(Scan.flagged, Integer)).label("flagged_cnt"),
)
.where(Scan.started_at >= func.datetime("now", "-14 days"))
.group_by("day")
.order_by("day")
)
).all()
return {
"total_scans": total_scans or 0,
"flagged_scans": flagged_scans or 0,
"recent_flagged": recent_flagged or 0,
"completed_scans": completed_scans or 0,
"failed_scans": failed_scans or 0,
"total_findings": total_findings or 0,
"warnings_count": warnings_count or 0,
"errors_count": errors_count or 0,
"latest_flagged": latest_flagged,
"latest_scans": latest_scans,
"top_rules": [(r.rule, r.cnt) for r in top_rules],
"most_flagged": most_flagged,
"max_findings": max_findings,
"days": [(d.day, d.cnt, d.flagged_cnt) for d in days_raw],
"now": datetime.datetime.now(datetime.timezone.utc),
}
@router.get("/scans", response_class=HTMLResponse)
async def scans_list(
request: Request,
page: int = 1,
flagged: str = "",
session: AsyncSession = Depends(get_session),
):
per_page = 50
offset = (page - 1) * per_page
q = select(Scan)
if flagged == "1":
q = q.where(Scan.flagged == True)
q = q.order_by(Scan.started_at.desc()).offset(offset).limit(per_page)
scans = (await session.execute(q)).scalars().all()
total = await session.scalar(select(func.count(Scan.id)))
return _render(
"scans_list.html",
scans=scans,
page=page,
per_page=per_page,
total=total,
flagged_filter=flagged,
request=request,
)
@router.get("/scans/{scan_id}", response_class=HTMLResponse)
async def scan_detail(scan_id: int, request: Request, session: AsyncSession = Depends(get_session)):
from sqlalchemy.orm import selectinload
scan = await session.scalar(
select(Scan).where(Scan.id == scan_id).options(selectinload(Scan.findings))
)
if not scan:
return HTMLResponse("<h1>Not found</h1>", status_code=404)
return _render("scan_detail.html", scan=scan, request=request)
@router.get("/packages", response_class=HTMLResponse)
async def packages_list(
request: Request,
page: int = 1,
flagged: str = "",
session: AsyncSession = Depends(get_session),
):
per_page = 50
offset = (page - 1) * per_page
subq = select(
Scan.package_name.label("pkg_name"),
Scan.package_version.label("pkg_ver"),
Scan.ecosystem,
Scan.repository,
func.max(Scan.started_at).label("last_scan"),
func.max(Scan.flagged).label("is_flagged"),
func.sum(Scan.total_findings).label("findings_sum"),
func.max(Scan.id).label("sid"),
).group_by(Scan.package_name, Scan.package_version)
if flagged == "1":
subq = subq.having(func.max(Scan.flagged) == True)
subq = subq.subquery()
total = await session.scalar(select(func.count()).select_from(subq))
rows = (
await session.execute(
select(subq).order_by(subq.c.last_scan.desc()).offset(offset).limit(per_page)
)
).all()
return _render(
"packages_list.html",
packages=rows,
page=page,
per_page=per_page,
total=total,
flagged_filter=flagged,
request=request,
)
@router.get("/packages/{name}/{version}", response_class=HTMLResponse)
async def package_detail(
name: str,
version: str,
request: Request,
session: AsyncSession = Depends(get_session),
):
from sqlalchemy.orm import selectinload
scans = (
(
await session.execute(
select(Scan)
.where(Scan.package_name == name, Scan.package_version == version)
.options(selectinload(Scan.findings))
.order_by(Scan.started_at.desc())
)
)
.scalars()
.all()
)
if not scans:
return HTMLResponse("<h1>Not found</h1>", status_code=404)
all_findings = []
for s in scans:
all_findings.extend(s.findings)
return _render(
"package_detail.html",
pkg_name=name,
pkg_version=version,
scans=scans,
findings=all_findings,
request=request,
)