"""Nexus webhook receiver — handles component/asset webhooks.""" import hashlib import hmac import json import re from fastapi import APIRouter, BackgroundTasks, Header, HTTPException, Request, status from guarddog_nexus.config import config from guarddog_nexus.database import get_session from guarddog_nexus.harvester import harvest from guarddog_nexus.logging_setup import log router = APIRouter(prefix="/webhooks", tags=["webhooks"]) RELEVANT_ACTIONS = {"CREATED", "UPDATED"} METADATA_PATTERNS = [ re.compile(p) for p in [ r"^/?simple/", r"\.html$", r"\.json$", r"\.xml$", r"/?index\.", r"\.rss$", r"\.atom$", ] ] PACKAGE_EXTENSIONS = (".tar.gz", ".tgz", ".whl", ".zip", ".gem") def _is_package_asset(name: str) -> bool: for pat in METADATA_PATTERNS: if pat.search(name): return False return name.endswith(PACKAGE_EXTENSIONS) def _build_download_url(repo: str, asset_path: str) -> str: base = config.nexus_url.rstrip("/") asset_path = asset_path.lstrip("/") return f"{base}/repository/{repo}/{asset_path}" def _extract_asset_path(asset: dict) -> str | None: for key in ("path", "name"): val = asset.get(key) if val: return val return None @router.post("/nexus") async def nexus_webhook( request: Request, background_tasks: BackgroundTasks, x_nexus_webhook_signature: str | None = Header(None, alias="X-Nexus-Webhook-Signature"), ): payload = await request.body() payload_str = payload.decode("utf-8") if config.webhook_secret: if not x_nexus_webhook_signature: log.warning("Webhook rejected: missing signature header") raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="Missing signature" ) expected = hmac.new(config.webhook_secret.encode(), payload, hashlib.sha256).hexdigest() if not hmac.compare_digest(x_nexus_webhook_signature, expected): log.warning("Webhook rejected: invalid signature") raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Invalid signature") try: data = json.loads(payload_str) except json.JSONDecodeError: log.warning("Webhook received invalid JSON") raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="Invalid JSON") action = data.get("action", "").upper() if action not in RELEVANT_ACTIONS: return {"status": "ignored", "action": action} repository = data.get("repositoryName", "") asset = data.get("asset") component = data.get("component") if asset: asset_path = _extract_asset_path(asset) if not asset_path or not _is_package_asset(asset_path): return {"status": "ignored", "reason": "non_package_asset"} download_url = asset.get("downloadUrl") or _build_download_url(repository, asset_path) log.info("Webhook: %s asset %s in %s", action, asset_path, repository) background_tasks.add_task(_scan_in_background, download_url, repository, "pypi", asset_path) return {"status": "accepted", "asset": asset_path, "action": action} if component: name = component.get("name", "") version = component.get("version", "") if not name or not version: return {"status": "ignored", "reason": "no_name_or_version"} # For component events, look up assets via Nexus REST API background_tasks.add_task(_scan_component, repository, name, version) return {"status": "accepted", "component": f"{name}=={version}", "action": action} return {"status": "ignored", "reason": "no_asset_or_component"} async def _scan_component(repository: str, name: str, version: str): """Look up component assets via Nexus API, then scan each package file.""" import subprocess api_url = ( f"{config.nexus_url.rstrip('/')}/service/rest/v1/search" f"?repository={repository}&name={name}&version={version}&format=pypi" ) try: result = subprocess.run( ["curl", "-sf", "-u", f"{config.nexus_username}:{config.nexus_password}", api_url], capture_output=True, text=True, timeout=30, ) if result.returncode != 0: log.warning("Component lookup failed for %s==%s: %s", name, version, result.stderr) return data = json.loads(result.stdout) except Exception as e: log.warning("Component lookup error for %s==%s: %s", name, version, e) return items = data.get("items", []) if not items: log.warning("No items found in search for %s==%s", name, version) return for item in items: for asset in item.get("assets", []): asset_path = _extract_asset_path(asset) if not asset_path or not _is_package_asset(asset_path): continue download_url = asset.get("downloadUrl") or _build_download_url(repository, asset_path) log.info("Scanning component asset: %s", asset_path) async for session in get_session(): await harvest(download_url, repository, "pypi", asset_path, session) break async def _scan_in_background( download_url: str, repository: str, format_: str, asset_path: str, ): try: async for session in get_session(): await harvest(download_url, repository, format_, asset_path, session) break except Exception as e: log.error("Background scan failed: %s", e)