154 lines
5.0 KiB
Python
154 lines
5.0 KiB
Python
"""LLM analysis client for GuardDog findings.

Supports any OpenAI-compatible API endpoint with a configurable model.
"""
|
|
|
|
import asyncio
|
|
import json
|
|
|
|
import httpx
|
|
|
|
from ..config import config
|
|
from ..constants import LLM_ANALYSIS_SYSTEM_PROMPT, LLM_DEFAULT_TEMPERATURE, LLM_RESPONSE_FORMAT
|
|
from ..logging_setup import log
|
|
|
|
# Module-wide limiter: _attempt_llm_call acquires this around each HTTP request,
# so at most config.llm_max_concurrent requests are in flight at once.
_llm_semaphore = asyncio.Semaphore(config.llm_max_concurrent)
|
|
|
|
_REPORT_DEFAULTS = {
|
|
"verdict": "unknown",
|
|
"summary": "No summary provided",
|
|
"analysis": "No analysis provided",
|
|
"severity_rating": "unknown",
|
|
}
|
|
|
|
|
|
def _validate_report(report: dict) -> dict:
|
|
result = dict(report)
|
|
for field, default in _REPORT_DEFAULTS.items():
|
|
if not result.get(field):
|
|
result[field] = default
|
|
if result["verdict"] not in ("safe", "suspicious", "malicious", "unknown"):
|
|
result["verdict"] = "unknown"
|
|
return result
|
|
|
|
|
|
def _build_user_message(finding: dict) -> str:
|
|
"""Build a concise prompt from a finding's data."""
|
|
rule = finding.get("rule", "unknown")
|
|
severity = finding.get("severity", "unknown")
|
|
message = finding.get("message", "")
|
|
location = finding.get("location", "")
|
|
code = finding.get("code", "")
|
|
|
|
prompt = f"Rule: {rule}\nSeverity: {severity}\nMessage: {message}\n"
|
|
if location:
|
|
prompt += f"Location: {location}\n"
|
|
if code:
|
|
prompt += f"Code snippet:\n```\n{code}\n```\n"
|
|
|
|
prompt += (
|
|
"\nAnalyse this finding and return JSON with keys: "
|
|
"verdict, summary, analysis, severity_rating."
|
|
)
|
|
return prompt
|
|
|
|
|
|
def _extract_message_content(body: object) -> str:
    """Pull the first choice's message text out of a chat-completions response body.

    Raises:
        ValueError: If the body is not shaped as expected or the content is
            empty or not a string.
    """
    if not isinstance(body, dict):
        raise ValueError(f"Unexpected response body type: {type(body).__name__}")
    choices = body.get("choices")
    if not isinstance(choices, list) or not choices:
        raise ValueError("Empty choices list")
    first_choice = choices[0]
    message = first_choice.get("message") if isinstance(first_choice, dict) else None
    content = message.get("content") if isinstance(message, dict) else None
    if not content:
        raise ValueError("Empty message content")
    if not isinstance(content, str):
        raise ValueError(f"Unexpected message content type: {type(content).__name__}")
    return content


def _parse_report_content(content: str) -> dict:
    """Decode the model's reply text into a validated report dict.

    Raises:
        ValueError: If the reply is not valid JSON (even after removing a
            Markdown code fence) or does not decode to a JSON object.
    """
    try:
        parsed = json.loads(content)
    except json.JSONDecodeError:
        text = content.strip()
        # Some models wrap JSON in markdown code blocks
        if not text.startswith("```"):
            raise
        text = text.strip("`").strip()
        # Drop the fence's language tag; a JSON document never starts with
        # the letters "json", so this cannot eat real payload.
        if text[:4].lower() == "json":
            text = text[4:].lstrip()
        parsed = json.loads(text)
    # Check here rather than relying on the validator so a list/scalar reply
    # is reported as a parse error instead of escaping as a TypeError.
    if not isinstance(parsed, dict):
        raise ValueError(f"Expected a JSON object, got {type(parsed).__name__}")
    return _validate_report(parsed)


async def _attempt_llm_call(finding_data: dict) -> dict | None:
    """Make a single chat-completions request for a finding and parse the reply.

    The request is sent to ``<config.llm_api_base>/chat/completions`` while
    holding ``_llm_semaphore``. Every failure mode is logged and reported as
    ``None`` so the caller can decide whether to retry.

    Args:
        finding_data: The finding to analyse; passed to ``_build_user_message``.

    Returns:
        The validated report dict, or ``None`` if the request timed out,
        failed, or the reply could not be parsed into a JSON object.
    """
    url = f"{config.llm_api_base.rstrip('/')}/chat/completions"
    headers = {
        "Authorization": f"Bearer {config.llm_api_key}",
        "Content-Type": "application/json",
    }
    payload = {
        "model": config.llm_model,
        "messages": [
            {"role": "system", "content": LLM_ANALYSIS_SYSTEM_PROMPT},
            {"role": "user", "content": _build_user_message(finding_data)},
        ],
        "temperature": LLM_DEFAULT_TEMPERATURE,
        "response_format": {"type": LLM_RESPONSE_FORMAT},
    }
    rule = finding_data.get("rule")

    try:
        async with _llm_semaphore:
            async with httpx.AsyncClient(timeout=config.llm_timeout, headers=headers) as client:
                resp = await client.post(url, json=payload)
                resp.raise_for_status()
                body = resp.json()
    except httpx.TimeoutException:
        log.error(
            "LLM analysis timed out after %ds for rule=%s",
            config.llm_timeout,
            rule,
        )
        return None
    except Exception as e:
        # Deliberate best-effort boundary: any transport, HTTP-status or
        # body-decoding failure is logged and turned into a retryable None.
        log.warning("LLM analysis failed for rule=%s: %s", rule, e)
        return None

    content = ""
    try:
        content = _extract_message_content(body)
        return _parse_report_content(content)
    except ValueError as e:  # json.JSONDecodeError is a ValueError subclass
        # Fall back to the raw body when no message text could be extracted,
        # so the log still shows what the endpoint actually returned.
        snippet = content or str(body)
        log.warning(
            "LLM response parse error for rule=%s: %s — raw=%s",
            rule,
            e,
            snippet[:200],
        )
        return None
|
|
|
|
|
|
async def analyze_finding(finding_data: dict, max_retries: int = 3) -> dict | None:
    """Ask the LLM to analyse a finding, retrying failed attempts with backoff.

    Up to ``max_retries`` attempts are made. After each failed attempt except
    the last, the coroutine sleeps with a doubling delay (2s, 4s, 8s, ...)
    before trying again.

    Returns the report dict from the first successful attempt, or None when
    no API key is configured or every attempt failed.
    """
    if not config.llm_api_key:
        log.warning("LLM_API_KEY not set — skipping LLM analysis")
        return None

    final_index = max_retries - 1
    for index in range(max_retries):
        report = await _attempt_llm_call(finding_data)
        if report is not None:
            return report
        if index == final_index:
            # Nothing left to retry, so skip the pointless wait.
            break
        backoff_seconds = 2 ** (index + 1)
        await asyncio.sleep(backoff_seconds)
        log.info(
            "Retrying LLM analysis for rule=%s (attempt %d)",
            finding_data.get("rule"),
            index + 2,
        )

    log.error(
        "LLM analysis failed after %d attempts for rule=%s",
        max_retries,
        finding_data.get("rule"),
    )
    return None
|