"""LLM analysis client for GuardDog findings. Supports any OpenAI-compatible API endpoint with configurable model. """ import json import httpx from guarddog_nexus.config import config from guarddog_nexus.constants import LLM_ANALYSIS_SYSTEM_PROMPT from guarddog_nexus.logging_setup import log def _build_user_message(finding: dict) -> str: """Build a concise prompt from a finding's data.""" rule = finding.get("rule", "unknown") severity = finding.get("severity", "unknown") message = finding.get("message", "") location = finding.get("location", "") code = finding.get("code", "") prompt = ( f"Rule: {rule}\n" f"Severity: {severity}\n" f"Message: {message}\n" ) if location: prompt += f"Location: {location}\n" if code: prompt += f"Code snippet:\n```\n{code}\n```\n" prompt += ( "\nAnalyse this finding and return JSON with keys: " "verdict, summary, analysis, severity_rating." ) return prompt async def analyze_finding(finding_data: dict) -> dict | None: """Send a finding to the LLM for security analysis. Returns parsed JSON dict on success, or None on failure. """ if not config.llm_api_key: log.warning("LLM_API_KEY not set — skipping LLM analysis") return None url = f"{config.llm_api_base.rstrip('/')}/chat/completions" headers = { "Authorization": f"Bearer {config.llm_api_key}", "Content-Type": "application/json", } payload = { "model": config.llm_model, "messages": [ {"role": "system", "content": LLM_ANALYSIS_SYSTEM_PROMPT}, {"role": "user", "content": _build_user_message(finding_data)}, ], "temperature": 0.3, "response_format": {"type": "json_object"}, } try: async with httpx.AsyncClient( timeout=config.llm_timeout, headers=headers ) as client: resp = await client.post(url, json=payload) resp.raise_for_status() body = resp.json() except httpx.TimeoutException: log.error( "LLM analysis timed out after %ds for rule=%s", config.llm_timeout, finding_data.get("rule"), ) return None except Exception as e: log.warning("LLM analysis failed for rule=%s: %s", finding_data.get("rule"), e) return None try: content = body["choices"][0]["message"]["content"] return json.loads(content) except (KeyError, IndexError, json.JSONDecodeError) as e: log.warning( "LLM response parse error for rule=%s: %s", finding_data.get("rule"), e, ) return None