"""GuardDog CLI integration via asyncio subprocess.""" import asyncio import json import shutil from guarddog_nexus.config import config from guarddog_nexus.logging_setup import log GUARDDOG_BIN = shutil.which("guarddog") or "guarddog" async def scan_package(filepath: str, ecosystem: str = "pypi") -> dict: """Run guarddog scan on a downloaded package file. Returns normalized dict.""" cmd = [GUARDDOG_BIN, ecosystem, "scan", filepath, "--output-format", "json"] log.info("Running: %s", " ".join(cmd)) try: proc = await asyncio.create_subprocess_exec( *cmd, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE, ) stdout, stderr = await asyncio.wait_for( proc.communicate(), timeout=config.scan_timeout_seconds ) except asyncio.TimeoutError: log.error("GuardDog scan timed out for %s", filepath) return {"findings": [], "errors": ["timeout"]} except FileNotFoundError: log.error("GuardDog binary not found at %s", GUARDDOG_BIN) return {"findings": [], "errors": ["guarddog_not_found"]} if proc.returncode not in (0, 1): log.error("GuardDog exited %d: %s", proc.returncode, stderr.decode()) return {"findings": [], "errors": [stderr.decode().strip()]} try: data = json.loads(stdout.decode()) except json.JSONDecodeError: log.error("GuardDog returned invalid JSON for %s", filepath) return {"findings": [], "errors": ["json_parse_error"]} return _normalize_output(data) def _normalize_output(data: dict) -> dict: """Normalize guarddog JSON into consistent format. GuardDog v2 JSON: {"package": "...", "issues": N, "errors": {}, "results": {"rule": null|{}|str|list}} Rules mapped as: - null → not applicable, skip - {} → active but no findings, skip - str → metadata finding (description) - list → semgrep findings [{message, location, code}] """ findings = [] results = data.get("results", {}) if isinstance(results, list): results = {} for rule_name, value in results.items(): if value is None: continue if isinstance(value, str): findings.append( { "rule": rule_name, "severity": "WARNING", "message": value, "location": "", "code": "", } ) elif isinstance(value, list): for item in value: if isinstance(item, dict): findings.append( { "rule": rule_name, "severity": item.get("severity", "WARNING"), "message": item.get("message", ""), "location": item.get("location", ""), "code": item.get("code", ""), } ) elif isinstance(value, dict) and not value: continue errors = data.get("errors", {}) if isinstance(errors, dict): errors_list = [f"{k}: {v}" for k, v in errors.items() if v] else: errors_list = errors if isinstance(errors, list) else [] return { "findings": findings, "errors": errors_list, }