327 lines
9.4 KiB
Python
327 lines
9.4 KiB
Python
"""Tests for LLM analysis — endpoint and client."""
|
|
|
|
from unittest.mock import MagicMock, patch
|
|
|
|
import pytest
|
|
|
|
from guarddog_nexus.db.models import Finding
|
|
|
|
|
|
@pytest.fixture
|
|
async def sample_finding(db_session):
|
|
from guarddog_nexus.constants import SEVERITY_WARNING
|
|
|
|
finding = Finding(
|
|
scan_id=1,
|
|
data={
|
|
"rule": "shady-links",
|
|
"severity": SEVERITY_WARNING,
|
|
"message": "Suspicious URL",
|
|
"location": "setup.py:15",
|
|
"code": "url = 'http://evil.com'",
|
|
},
|
|
)
|
|
db_session.add(finding)
|
|
await db_session.commit()
|
|
await db_session.refresh(finding)
|
|
return finding
|
|
|
|
|
|
@pytest.fixture
|
|
async def sample_finding_with_report(db_session):
|
|
from guarddog_nexus.constants import SEVERITY_WARNING
|
|
|
|
report = {
|
|
"verdict": "safe",
|
|
"summary": "ok",
|
|
"analysis": "all good",
|
|
"severity_rating": "low",
|
|
}
|
|
finding = Finding(
|
|
scan_id=1,
|
|
data={"rule": "test", "severity": SEVERITY_WARNING, "message": "test"},
|
|
report=report,
|
|
)
|
|
db_session.add(finding)
|
|
await db_session.commit()
|
|
await db_session.refresh(finding)
|
|
return finding
|
|
|
|
|
|
# --- T4: analyze_finding() client ---
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_analyze_finding_no_api_key():
|
|
from guarddog_nexus.core.llm import analyze_finding
|
|
|
|
result = await analyze_finding({"rule": "test", "severity": "WARNING"})
|
|
assert result is None
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_analyze_finding_timeout():
|
|
import guarddog_nexus.config
|
|
from guarddog_nexus.core.llm import analyze_finding
|
|
|
|
guarddog_nexus.config.config.llm_api_key = "sk-test"
|
|
guarddog_nexus.config.config.llm_timeout = 1
|
|
|
|
import httpx
|
|
|
|
with patch("httpx.AsyncClient.post", side_effect=httpx.TimeoutException("timeout")):
|
|
result = await analyze_finding({"rule": "test", "severity": "WARNING"})
|
|
assert result is None
|
|
|
|
guarddog_nexus.config.config.llm_api_key = ""
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_analyze_finding_api_error():
|
|
import guarddog_nexus.config
|
|
from guarddog_nexus.core.llm import analyze_finding
|
|
|
|
guarddog_nexus.config.config.llm_api_key = "sk-test"
|
|
guarddog_nexus.config.config.llm_timeout = 30
|
|
|
|
with patch("httpx.AsyncClient.post", side_effect=Exception("connection refused")):
|
|
result = await analyze_finding({"rule": "test", "severity": "WARNING"})
|
|
assert result is None
|
|
|
|
guarddog_nexus.config.config.llm_api_key = ""
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_analyze_finding_success():
|
|
import guarddog_nexus.config
|
|
from guarddog_nexus.core.llm import analyze_finding
|
|
|
|
guarddog_nexus.config.config.llm_api_key = "sk-test"
|
|
guarddog_nexus.config.config.llm_timeout = 30
|
|
|
|
mock_resp = MagicMock()
|
|
mock_resp.raise_for_status.return_value = None
|
|
mock_resp.json.return_value = {
|
|
"choices": [
|
|
{
|
|
"message": {
|
|
"content": '{"verdict":"safe","summary":"ok",'
|
|
'"analysis":"fine","severity_rating":"low"}',
|
|
}
|
|
}
|
|
]
|
|
}
|
|
|
|
with patch("guarddog_nexus.core.llm.httpx.AsyncClient.post", return_value=mock_resp):
|
|
result = await analyze_finding({"rule": "test"})
|
|
assert result is not None
|
|
assert result["verdict"] == "safe"
|
|
assert result["severity_rating"] == "low"
|
|
|
|
guarddog_nexus.config.config.llm_api_key = ""
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_analyze_finding_markdown_unwrap():
|
|
import guarddog_nexus.config
|
|
from guarddog_nexus.core.llm import analyze_finding
|
|
|
|
guarddog_nexus.config.config.llm_api_key = "sk-test"
|
|
|
|
mock_resp = MagicMock()
|
|
mock_resp.raise_for_status.return_value = None
|
|
mock_resp.json.return_value = {
|
|
"choices": [
|
|
{
|
|
"message": {
|
|
"content": '```json\n{"verdict":"suspicious","summary":"hm",'
|
|
'"analysis":"...","severity_rating":"medium"}\n```',
|
|
}
|
|
}
|
|
]
|
|
}
|
|
|
|
with patch("guarddog_nexus.core.llm.httpx.AsyncClient.post", return_value=mock_resp):
|
|
result = await analyze_finding({"rule": "test"})
|
|
assert result is not None
|
|
assert result["verdict"] == "suspicious"
|
|
|
|
guarddog_nexus.config.config.llm_api_key = ""
|
|
|
|
|
|
# --- T1: analyze_finding_htmx endpoint ---
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_analyze_endpoint_llm_disabled(client, sample_finding):
|
|
import guarddog_nexus.config
|
|
|
|
guarddog_nexus.config.config.llm_enabled = False
|
|
|
|
resp = await client.post(f"/api/v1/findings/{sample_finding.id}/analyze")
|
|
assert resp.status_code == 200
|
|
assert "disabled" in resp.text.lower()
|
|
|
|
guarddog_nexus.config.config.llm_enabled = False
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_analyze_endpoint_not_found(client):
|
|
import guarddog_nexus.config
|
|
|
|
guarddog_nexus.config.config.llm_enabled = True
|
|
|
|
resp = await client.post("/api/v1/findings/99999/analyze")
|
|
assert resp.status_code == 404
|
|
assert "not found" in resp.text.lower()
|
|
|
|
guarddog_nexus.config.config.llm_enabled = False
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_analyze_endpoint_idempotent_already_analyzed(client, sample_finding_with_report):
|
|
import guarddog_nexus.config
|
|
|
|
guarddog_nexus.config.config.llm_enabled = True
|
|
|
|
resp = await client.post(f"/api/v1/findings/{sample_finding_with_report.id}/analyze")
|
|
assert resp.status_code == 200
|
|
assert "safe" in resp.text
|
|
|
|
guarddog_nexus.config.config.llm_enabled = False
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_analyze_endpoint_success(client, sample_finding):
|
|
import guarddog_nexus.config
|
|
|
|
guarddog_nexus.config.config.llm_enabled = True
|
|
|
|
fake_report = {
|
|
"verdict": "malicious",
|
|
"summary": "bad",
|
|
"analysis": "evil",
|
|
"severity_rating": "critical",
|
|
}
|
|
|
|
async def mock_analyze(data):
|
|
return fake_report
|
|
|
|
with patch("guarddog_nexus.core.llm.analyze_finding", mock_analyze):
|
|
resp = await client.post(f"/api/v1/findings/{sample_finding.id}/analyze")
|
|
assert resp.status_code == 200
|
|
assert "malicious" in resp.text
|
|
|
|
guarddog_nexus.config.config.llm_enabled = False
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_analyze_endpoint_failure(client, sample_finding):
|
|
import guarddog_nexus.config
|
|
|
|
guarddog_nexus.config.config.llm_enabled = True
|
|
|
|
async def mock_analyze(data):
|
|
return None
|
|
|
|
with patch("guarddog_nexus.core.llm.analyze_finding", mock_analyze):
|
|
resp = await client.post(f"/api/v1/findings/{sample_finding.id}/analyze")
|
|
assert resp.status_code == 200
|
|
assert "failed" in resp.text.lower()
|
|
|
|
guarddog_nexus.config.config.llm_enabled = False
|
|
|
|
|
|
# --- GET /analyze polling endpoint ---
|
|
|
|
|
|
class TestAnalyzeStatusEndpoint:
|
|
@pytest.mark.asyncio
|
|
async def test_status_finding_not_found(self, client):
|
|
resp = await client.get("/api/v1/findings/99999/analyze")
|
|
assert resp.status_code == 404
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_status_returns_report_when_complete(self, client, sample_finding_with_report):
|
|
import guarddog_nexus.config
|
|
|
|
guarddog_nexus.config.config.llm_enabled = True
|
|
|
|
resp = await client.get(f"/api/v1/findings/{sample_finding_with_report.id}/analyze")
|
|
assert resp.status_code == 200
|
|
assert "safe" in resp.text
|
|
|
|
guarddog_nexus.config.config.llm_enabled = False
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_status_returns_spinner_when_no_report(self, client, sample_finding):
|
|
import guarddog_nexus.config
|
|
|
|
guarddog_nexus.config.config.llm_enabled = True
|
|
|
|
resp = await client.get(f"/api/v1/findings/{sample_finding.id}/analyze")
|
|
assert resp.status_code == 200
|
|
assert "hx-get" in resp.text.lower()
|
|
|
|
guarddog_nexus.config.config.llm_enabled = False
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_status_returns_spinner_when_analyzing(self, client, db_session, sample_finding):
|
|
from sqlalchemy import select
|
|
|
|
from guarddog_nexus.db.models import Finding
|
|
|
|
finding = await db_session.scalar(select(Finding).where(Finding.id == sample_finding.id))
|
|
finding.report = {"status": "analyzing"}
|
|
await db_session.commit()
|
|
|
|
resp = await client.get(f"/api/v1/findings/{sample_finding.id}/analyze")
|
|
assert resp.status_code == 200
|
|
assert "hx-get" in resp.text.lower()
|
|
|
|
|
|
# --- LLM retry exhaustion ---
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_analyze_finding_exhausts_all_retries():
|
|
import guarddog_nexus.config
|
|
from guarddog_nexus.core.llm import analyze_finding
|
|
|
|
guarddog_nexus.config.config.llm_api_key = "sk-test"
|
|
|
|
with patch("guarddog_nexus.core.llm._attempt_llm_call", return_value=None):
|
|
with patch("guarddog_nexus.core.llm.asyncio.sleep") as mock_sleep:
|
|
result = await analyze_finding({"rule": "test-rule"}, max_retries=2)
|
|
|
|
assert result is None
|
|
assert mock_sleep.call_count == 1
|
|
|
|
guarddog_nexus.config.config.llm_api_key = ""
|
|
|
|
|
|
# --- LLM lock cleanup ---
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_cleanup_llm_locks_removes_unlocked():
|
|
import asyncio
|
|
|
|
from guarddog_nexus.routes.web import _llm_lock, _llm_locks
|
|
|
|
async with _llm_lock:
|
|
_llm_locks[100] = asyncio.Lock()
|
|
_llm_locks[200] = asyncio.Lock()
|
|
|
|
await _llm_locks[100].acquire()
|
|
|
|
for key in list(_llm_locks.keys()):
|
|
if not _llm_locks[key].locked():
|
|
_llm_locks.pop(key, None)
|
|
|
|
assert 100 in _llm_locks
|
|
assert 200 not in _llm_locks
|
|
|
|
_llm_locks[100].release()
|
|
_llm_locks.clear()
|