"""GuardDog Nexus — FastAPI application entry point.""" import asyncio import os import time from contextlib import asynccontextmanager import uvicorn from fastapi import FastAPI, Request from fastapi.responses import JSONResponse from fastapi.staticfiles import StaticFiles from starlette.middleware.base import BaseHTTPMiddleware from guarddog_nexus.config import config from guarddog_nexus.constants import ( APP_DESCRIPTION, APP_NAME, APP_PACKAGE, APP_VERSION, STATIC_MOUNT_PATH, ) from guarddog_nexus.db.engine import init_db from guarddog_nexus.i18n import DEFAULT_LANG, LANGUAGES from guarddog_nexus.logging_setup import log from guarddog_nexus.routes import api_findings, api_packages, api_scans from guarddog_nexus.routes.metrics import router as metrics_router from guarddog_nexus.routes.web import router as web_router from guarddog_nexus.routes.webhooks import router as webhook_router STATIC_DIR = os.path.join(os.path.dirname(__file__), "web", "static") class LangMiddleware(BaseHTTPMiddleware): async def dispatch(self, request: Request, call_next): # Cookie takes priority, then query param, then default cookie_lang = request.cookies.get("lang") query_lang = request.query_params.get("lang") if query_lang and query_lang in LANGUAGES and query_lang != cookie_lang: lang = query_lang elif cookie_lang and cookie_lang in LANGUAGES: lang = cookie_lang else: lang = DEFAULT_LANG request.state.lang = lang response = await call_next(request) if query_lang and query_lang in LANGUAGES: response.set_cookie("lang", query_lang, max_age=365 * 24 * 3600, httponly=True) return response @asynccontextmanager async def lifespan(app: FastAPI): await init_db() log.info("%s started on %s:%s", APP_NAME, config.host, config.port) # Start background lock cleanup tasks asyncio.create_task(_start_lock_cleanup()) yield log.info("%s shutting down", APP_NAME) async def _start_lock_cleanup(): """Start background tasks for cleanup of unused locks.""" from guarddog_nexus.core.harvester import _cleanup_url_locks from guarddog_nexus.routes.web import _cleanup_llm_locks asyncio.create_task(_cleanup_url_locks()) asyncio.create_task(_cleanup_llm_locks()) class RequestLoggingMiddleware(BaseHTTPMiddleware): async def dispatch(self, request: Request, call_next): start = time.monotonic() response = await call_next(request) duration = (time.monotonic() - start) * 1000 log.info( "%s %s %s %.1fms", request.method, request.url.path, response.status_code, duration, ) return response class SecurityHeadersMiddleware(BaseHTTPMiddleware): async def dispatch(self, request: Request, call_next): response = await call_next(request) response.headers["X-Content-Type-Options"] = "nosniff" response.headers["X-Frame-Options"] = "DENY" response.headers["X-XSS-Protection"] = "1; mode=block" response.headers["Referrer-Policy"] = "no-referrer" response.headers["Permissions-Policy"] = "geolocation=(), microphone=()" return response app = FastAPI( title=APP_NAME, version=APP_VERSION, description=APP_DESCRIPTION, lifespan=lifespan, ) app.add_middleware(LangMiddleware) app.add_middleware(SecurityHeadersMiddleware) app.add_middleware(RequestLoggingMiddleware) @app.exception_handler(Exception) async def global_exception_handler(request: Request, exc: Exception): log.exception("Unhandled exception on %s %s", request.method, request.url.path) return JSONResponse(status_code=500, content={"detail": "Internal server error"}) app.include_router(webhook_router) app.include_router(metrics_router) app.include_router(api_scans.router) app.include_router(api_packages.router) app.include_router(api_findings.router) app.include_router(web_router) if os.path.isdir(STATIC_DIR): app.mount(STATIC_MOUNT_PATH, StaticFiles(directory=STATIC_DIR), name="static") @app.get("/health") async def health() -> dict: return {"status": "ok", "version": APP_VERSION} @app.get("/health/dependencies") async def health_dependencies() -> JSONResponse: """Check health of external dependencies.""" checks = { "database": await _check_db_health(), "nexus": await _check_nexus_connectivity(), } status = 200 if all(checks.values()) else 503 return JSONResponse(status_code=status, content=checks) async def _check_db_health() -> bool: """Check if database is accessible.""" from sqlalchemy import text try: from guarddog_nexus.db.engine import _engine async with _engine.connect() as conn: await conn.execute(text("SELECT 1")) return True except Exception: return False async def _check_nexus_connectivity() -> bool: """Check if Nexus API is reachable.""" import httpx try: async with httpx.AsyncClient(timeout=10) as client: resp = await client.get(f"{config.nexus_url.rstrip('/')}/service/rest/v1/status") return resp.status_code == 200 except Exception: return False def main(): uvicorn.run( f"{APP_PACKAGE}.main:app", host=config.host, port=config.port, log_level=config.log_level.lower(), reload=False, ) if __name__ == "__main__": main()