Files
guarddog-nexus/guarddog_nexus/main.py

181 lines
5.4 KiB
Python

"""GuardDog Nexus — FastAPI application entry point."""
import asyncio
import os
import time
from contextlib import asynccontextmanager
import uvicorn
from fastapi import FastAPI, Request
from fastapi.responses import JSONResponse
from fastapi.staticfiles import StaticFiles
from starlette.middleware.base import BaseHTTPMiddleware
from guarddog_nexus.config import config
from guarddog_nexus.constants import (
APP_DESCRIPTION,
APP_NAME,
APP_PACKAGE,
APP_VERSION,
STATIC_MOUNT_PATH,
)
from guarddog_nexus.db.engine import init_db
from guarddog_nexus.i18n import DEFAULT_LANG, LANGUAGES
from guarddog_nexus.logging_setup import log
from guarddog_nexus.routes import api_findings, api_packages, api_scans
from guarddog_nexus.routes.metrics import router as metrics_router
from guarddog_nexus.routes.web import router as web_router
from guarddog_nexus.routes.webhooks import router as webhook_router
# Directory holding the bundled static assets: ``web/static`` next to this module.
_MODULE_DIR = os.path.dirname(__file__)
STATIC_DIR = os.path.join(_MODULE_DIR, "web", "static")
class LangMiddleware(BaseHTTPMiddleware):
    """Resolve the UI language for each request and persist explicit choices.

    Resolution order (first match wins):

    1. a ``lang`` query parameter that is a member of ``LANGUAGES``;
    2. a ``lang`` cookie that is a member of ``LANGUAGES``;
    3. ``DEFAULT_LANG``.

    The resolved value is exposed to downstream handlers as
    ``request.state.lang``. Whenever the request carried a valid ``lang``
    query parameter, the response also (re)sets the ``lang`` cookie to it.
    """

    async def dispatch(self, request: Request, call_next):
        """Attach the resolved language to *request* and forward it.

        Returns the downstream response, with the ``lang`` cookie set when a
        valid ``lang`` query parameter was supplied.
        """
        cookie_lang = request.cookies.get("lang")
        query_lang = request.query_params.get("lang")
        # Evaluated once: it drives both the resolution and the cookie write.
        query_is_valid = bool(query_lang) and query_lang in LANGUAGES

        # An explicit, valid query parameter overrides the stored cookie.
        if query_is_valid:
            lang = query_lang
        elif cookie_lang and cookie_lang in LANGUAGES:
            lang = cookie_lang
        else:
            lang = DEFAULT_LANG
        request.state.lang = lang

        response = await call_next(request)
        if query_is_valid:
            # Remember the explicit choice for one year.
            response.set_cookie("lang", query_lang, max_age=365 * 24 * 3600, httponly=True)
        return response
@asynccontextmanager
async def lifespan(app: FastAPI):
    """Run application startup and shutdown steps around the serving phase.

    Startup initialises the database, logs the bind address and schedules the
    background lock-cleanup starter. Shutdown cancels the starter if it is
    still pending and logs that the application is stopping.
    """
    await init_db()
    log.info("%s started on %s:%s", APP_NAME, config.host, config.port)
    # Hold a strong reference to the task: the event loop only keeps weak
    # references, so a discarded task may be garbage collected mid-execution.
    cleanup_starter = asyncio.create_task(_start_lock_cleanup())
    try:
        yield
    finally:
        if not cleanup_starter.done():
            cleanup_starter.cancel()
        log.info("%s shutting down", APP_NAME)
# Strong references to the scheduled cleanup tasks. The event loop itself only
# keeps weak references to tasks, so an otherwise unreferenced task could be
# garbage collected before it finishes.
_LOCK_CLEANUP_TASKS: set = set()


async def _start_lock_cleanup():
    """Start background tasks for cleanup of unused locks.

    Schedules ``_cleanup_url_locks`` and ``_cleanup_llm_locks`` as asyncio
    tasks and returns without awaiting them. The imports are local to this
    function, as in the original code.
    """
    from guarddog_nexus.core.harvester import _cleanup_url_locks
    from guarddog_nexus.routes.web import _cleanup_llm_locks

    for cleanup in (_cleanup_url_locks, _cleanup_llm_locks):
        task = asyncio.create_task(cleanup())
        _LOCK_CLEANUP_TASKS.add(task)
        # Drop the reference once the task finishes so the set cannot grow.
        task.add_done_callback(_LOCK_CLEANUP_TASKS.discard)
class RequestLoggingMiddleware(BaseHTTPMiddleware):
    """Log one line per request: method, path, status code and duration."""

    async def dispatch(self, request: Request, call_next):
        """Time the downstream call and log the outcome at INFO level.

        The duration is measured with the monotonic clock and reported in
        milliseconds. Nothing is logged if the downstream call raises.
        """
        began = time.monotonic()
        response = await call_next(request)
        elapsed_ms = 1000 * (time.monotonic() - began)
        log.info(
            "%s %s %s %.1fms", request.method, request.url.path, response.status_code, elapsed_ms
        )
        return response
class SecurityHeadersMiddleware(BaseHTTPMiddleware):
    """Stamp a fixed set of security-related headers on every response."""

    async def dispatch(self, request: Request, call_next):
        """Forward the request, then overwrite the hardening headers."""
        response = await call_next(request)
        hardening_headers = (
            ("X-Content-Type-Options", "nosniff"),
            ("X-Frame-Options", "DENY"),
            ("X-XSS-Protection", "1; mode=block"),
            ("Referrer-Policy", "no-referrer"),
            ("Permissions-Policy", "geolocation=(), microphone=()"),
        )
        for header_name, header_value in hardening_headers:
            response.headers[header_name] = header_value
        return response
# The ASGI application object served by uvicorn (see ``main``).
app = FastAPI(title=APP_NAME, version=APP_VERSION, description=APP_DESCRIPTION, lifespan=lifespan)

# Middleware is registered in this exact order; do not reorder.
for _middleware_cls in (LangMiddleware, SecurityHeadersMiddleware, RequestLoggingMiddleware):
    app.add_middleware(_middleware_cls)
del _middleware_cls
@app.exception_handler(Exception)
async def global_exception_handler(request: Request, exc: Exception):
    """Log an unhandled exception and answer with a generic HTTP 500.

    The response body never includes details of *exc*; only the request
    method and path are logged alongside the traceback.
    """
    method, path = request.method, request.url.path
    log.exception("Unhandled exception on %s %s", method, path)
    error_body = {"detail": "Internal server error"}
    return JSONResponse(status_code=500, content=error_body)
# Routers are registered in this exact order; do not reorder.
for _router in (
    webhook_router,
    metrics_router,
    api_scans.router,
    api_packages.router,
    api_findings.router,
    web_router,
):
    app.include_router(_router)
del _router

# Static assets are only served when the directory exists on disk.
if os.path.isdir(STATIC_DIR):
    app.mount(STATIC_MOUNT_PATH, StaticFiles(directory=STATIC_DIR), name="static")
@app.get("/health")
async def health() -> dict:
    """Liveness probe: report a constant ``ok`` status and the app version."""
    payload = {"status": "ok", "version": APP_VERSION}
    return payload
@app.get("/health/dependencies")
async def health_dependencies() -> JSONResponse:
    """Check health of external dependencies.

    Probes the database and the Nexus API and returns a JSON object mapping
    ``"database"`` and ``"nexus"`` to booleans. The HTTP status is 200 when
    every probe succeeded and 503 otherwise.
    """
    # The probes are independent, so run them concurrently: the endpoint then
    # takes as long as the slowest probe rather than the sum of both.
    db_ok, nexus_ok = await asyncio.gather(
        _check_db_health(),
        _check_nexus_connectivity(),
    )
    checks = {"database": db_ok, "nexus": nexus_ok}
    status = 200 if all(checks.values()) else 503
    return JSONResponse(status_code=status, content=checks)
async def _check_db_health() -> bool:
    """Check if database is accessible.

    Opens a connection on the module-level engine and runs ``SELECT 1``.

    Returns:
        True when the query succeeds. False on any exception, which is
        logged as a warning so the cause of a failing probe is visible.
    """
    from sqlalchemy import text

    try:
        # Imported at call time so the current value of ``_engine`` is used.
        from guarddog_nexus.db.engine import _engine

        async with _engine.connect() as conn:
            await conn.execute(text("SELECT 1"))
        return True
    except Exception as exc:
        # Best-effort probe: report unhealthy instead of raising, but keep
        # a trace of why.
        log.warning("Database health check failed: %s", exc)
        return False
async def _check_nexus_connectivity() -> bool:
    """Check if Nexus API is reachable.

    Issues a GET to ``<config.nexus_url>/service/rest/v1/status`` with a
    10-second timeout.

    Returns:
        True only when the response status is 200. False on any other
        status or on any exception; both cases are logged as warnings.
    """
    import httpx

    try:
        status_url = f"{config.nexus_url.rstrip('/')}/service/rest/v1/status"
        async with httpx.AsyncClient(timeout=10) as client:
            resp = await client.get(status_url)
    except Exception as exc:
        # Best-effort probe: report unreachable instead of raising, but keep
        # a trace of why.
        log.warning("Nexus health check failed: %s", exc)
        return False

    if resp.status_code != 200:
        log.warning("Nexus health check returned HTTP %s", resp.status_code)
        return False
    return True
def main():
    """Serve the application with uvicorn using the configured host and port.

    The app is passed as an import string built from ``APP_PACKAGE``, the log
    level comes from the configuration (lower-cased), and auto-reload is
    disabled.
    """
    app_import_path = f"{APP_PACKAGE}.main:app"
    server_options = {
        "host": config.host,
        "port": config.port,
        "log_level": config.log_level.lower(),
        "reload": False,
    }
    uvicorn.run(app_import_path, **server_options)


# Allow running this module directly as a script.
if __name__ == "__main__":
    main()