"""GuardDog CLI integration via asyncio subprocess."""

import asyncio
import json

from ..config import config
from ..constants import (
    DEFAULT_ECOSYSTEM,
    DEFAULT_FINDING_SEVERITY,
    GUARDDOG_OUTPUT_FORMAT,
    GUARDDOG_OUTPUT_KEY,
    GUARDDOG_RESULTS_KEY,
    SCAN_ERROR_BINARY_NOT_FOUND,
    SCAN_ERROR_JSON_PARSE,
    SCAN_ERROR_TIMEOUT,
)
from ..logging_setup import log


def _decode(raw: bytes) -> str:
    """Decode subprocess output as UTF-8, replacing undecodable bytes.

    Tool output is not guaranteed to be valid UTF-8; a strict decode would
    raise UnicodeDecodeError and abort the scan instead of reporting it.
    """
    return raw.decode("utf-8", errors="replace")


async def scan_package(filepath: str, ecosystem: str = DEFAULT_ECOSYSTEM) -> dict:
    """Run guarddog scan on a downloaded package file. Returns normalized dict.

    Args:
        filepath: Path of the package file passed to ``guarddog ... scan``.
        ecosystem: GuardDog ecosystem sub-command placed before ``scan``.

    Returns:
        ``{"findings": [...], "errors": [...]}``. Failures (timeout, missing
        binary, unexpected exit code, unparsable output) are reported through
        ``errors`` with an empty ``findings`` list rather than raised.
    """
    guarddog_bin = config.guarddog_binary
    cmd = [
        guarddog_bin,
        ecosystem,
        "scan",
        filepath,
        GUARDDOG_OUTPUT_KEY,
        GUARDDOG_OUTPUT_FORMAT,
    ]
    log.info("Running: %s", " ".join(cmd))

    try:
        proc = await asyncio.create_subprocess_exec(
            *cmd,
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE,
        )
    except FileNotFoundError:
        log.error("GuardDog binary not found at %s", guarddog_bin)
        return {"findings": [], "errors": [SCAN_ERROR_BINARY_NOT_FOUND]}

    try:
        stdout, stderr = await asyncio.wait_for(
            proc.communicate(), timeout=config.scan_timeout_seconds
        )
    except asyncio.TimeoutError:
        log.error("GuardDog scan timed out for %s", filepath)
        # wait_for only cancels communicate(); the child itself keeps running
        # unless it is killed explicitly.
        try:
            proc.kill()
        except ProcessLookupError:
            # The process exited between the timeout firing and the kill.
            pass
        return {"findings": [], "errors": [SCAN_ERROR_TIMEOUT]}

    stderr_text = _decode(stderr)

    # Exit codes 0 and 1 are both treated as a completed scan whose stdout
    # should be parsed; anything else is reported as a failure.
    if proc.returncode not in (0, 1):
        log.error("GuardDog exited %d: %s", proc.returncode, stderr_text)
        return {"findings": [], "errors": [stderr_text.strip()]}

    if proc.returncode == 1 and stderr:
        log.warning("GuardDog stderr (exit 1): %s", stderr_text.strip())

    try:
        data = json.loads(_decode(stdout))
    except json.JSONDecodeError:
        log.error("GuardDog returned invalid JSON for %s", filepath)
        return {"findings": [], "errors": [SCAN_ERROR_JSON_PARSE]}

    # Valid JSON that is not an object (e.g. ``null`` or a bare list) cannot
    # be normalized; report it the same way as unparsable output.
    if not isinstance(data, dict):
        log.error("GuardDog returned invalid JSON for %s", filepath)
        return {"findings": [], "errors": [SCAN_ERROR_JSON_PARSE]}

    return _normalize_output(data)


def _make_finding(rule_name: str, details: dict) -> dict:
    """Build one normalized finding for *rule_name* from a raw detail dict.

    Missing keys fall back to the default severity or an empty string.
    """
    return {
        "rule": rule_name,
        "severity": details.get("severity", DEFAULT_FINDING_SEVERITY),
        "message": details.get("message", ""),
        "location": details.get("location", ""),
        "code": details.get("code", ""),
    }


def _normalize_output(data: dict) -> dict:
    """Normalize guarddog JSON into consistent format.

    GuardDog v2 JSON:
        {"package": "...", "issues": N, "errors": {},
         "results": {"rule": null|{}|str|list}}

    Rules mapped as:
    - null → not applicable, skip
    - {} → active but no findings, skip
    - str → metadata finding (description)
    - list → semgrep findings [{message, location, code}]

    A non-empty dict is treated as a single finding. Values of any other
    type, and non-dict entries inside a list, are ignored.

    Returns:
        ``{"findings": [...], "errors": [...]}`` where each finding has the
        keys ``rule``, ``severity``, ``message``, ``location`` and ``code``.
    """
    if not isinstance(data, dict):
        # Nothing to normalize; avoid AttributeError on .get() below.
        return {"findings": [], "errors": []}

    results = data.get(GUARDDOG_RESULTS_KEY, {})
    if not isinstance(results, dict):
        # A list, null or scalar here carries no per-rule mapping to walk.
        results = {}

    findings = []
    for rule_name, value in results.items():
        if isinstance(value, str):
            findings.append(_make_finding(rule_name, {"message": value}))
        elif isinstance(value, list):
            findings.extend(
                _make_finding(rule_name, item)
                for item in value
                if isinstance(item, dict)
            )
        elif isinstance(value, dict) and value:
            # Non-empty dict — treat as a single finding
            findings.append(_make_finding(rule_name, value))
        # None, {} and any other type produce no finding.

    errors = data.get("errors", {})
    if isinstance(errors, dict):
        # Keep only entries that actually carry an error value.
        errors_list = [f"{k}: {v}" for k, v in errors.items() if v]
    elif isinstance(errors, list):
        errors_list = errors
    else:
        errors_list = []

    return {
        "findings": findings,
        "errors": errors_list,
    }