"""LLM analysis client for GuardDog findings. Supports any OpenAI-compatible API endpoint with configurable model. """ import json import httpx from ..config import config from ..constants import LLM_ANALYSIS_SYSTEM_PROMPT from ..logging_setup import log def _build_user_message(finding: dict) -> str: """Build a concise prompt from a finding's data.""" rule = finding.get("rule", "unknown") severity = finding.get("severity", "unknown") message = finding.get("message", "") location = finding.get("location", "") code = finding.get("code", "") prompt = ( f"Rule: {rule}\n" f"Severity: {severity}\n" f"Message: {message}\n" ) if location: prompt += f"Location: {location}\n" if code: prompt += f"Code snippet:\n```\n{code}\n```\n" prompt += ( "\nAnalyse this finding and return JSON with keys: " "verdict, summary, analysis, severity_rating." ) return prompt async def analyze_finding(finding_data: dict) -> dict | None: """Send a finding to the LLM for security analysis. Returns parsed JSON dict on success, or None on failure. """ if not config.llm_api_key: log.warning("LLM_API_KEY not set — skipping LLM analysis") return None url = f"{config.llm_api_base.rstrip('/')}/chat/completions" headers = { "Authorization": f"Bearer {config.llm_api_key}", "Content-Type": "application/json", } payload = { "model": config.llm_model, "messages": [ {"role": "system", "content": LLM_ANALYSIS_SYSTEM_PROMPT}, {"role": "user", "content": _build_user_message(finding_data)}, ], "temperature": 0.3, "response_format": {"type": "json_object"}, } try: async with httpx.AsyncClient( timeout=config.llm_timeout, headers=headers ) as client: resp = await client.post(url, json=payload) resp.raise_for_status() body = resp.json() except httpx.TimeoutException: log.error( "LLM analysis timed out after %ds for rule=%s", config.llm_timeout, finding_data.get("rule"), ) return None except Exception as e: log.warning("LLM analysis failed for rule=%s: %s", finding_data.get("rule"), e) return None try: content = body["choices"][0]["message"]["content"] return json.loads(content) except (KeyError, IndexError, json.JSONDecodeError) as e: raw = "" try: raw = body["choices"][0]["message"]["content"] except (KeyError, IndexError): raw = str(body)[:300] # Some models wrap JSON in markdown code blocks if isinstance(raw, str) and raw.strip().startswith("```"): try: stripped = raw.strip().strip("`").strip() if stripped.startswith("json\n"): stripped = stripped[5:] return json.loads(stripped) except json.JSONDecodeError: pass log.warning( "LLM response parse error for rule=%s: %s — raw=%s", finding_data.get("rule"), e, raw[:200] if isinstance(raw, str) else str(raw)[:200], ) return None