"""Tests for LLM analysis — endpoint and client.""" from unittest.mock import MagicMock, patch import pytest from guarddog_nexus.db.models import Finding @pytest.fixture async def sample_finding(db_session): from guarddog_nexus.constants import SEVERITY_WARNING finding = Finding( scan_id=1, data={ "rule": "shady-links", "severity": SEVERITY_WARNING, "message": "Suspicious URL", "location": "setup.py:15", "code": "url = 'http://evil.com'", }, ) db_session.add(finding) await db_session.commit() await db_session.refresh(finding) return finding @pytest.fixture async def sample_finding_with_report(db_session): from guarddog_nexus.constants import SEVERITY_WARNING report = { "verdict": "safe", "summary": "ok", "analysis": "all good", "severity_rating": "low", } finding = Finding( scan_id=1, data={"rule": "test", "severity": SEVERITY_WARNING, "message": "test"}, report=report, ) db_session.add(finding) await db_session.commit() await db_session.refresh(finding) return finding # --- T4: analyze_finding() client --- @pytest.mark.asyncio async def test_analyze_finding_no_api_key(): from guarddog_nexus.core.llm import analyze_finding result = await analyze_finding({"rule": "test", "severity": "WARNING"}) assert result is None @pytest.mark.asyncio async def test_analyze_finding_timeout(): import guarddog_nexus.config from guarddog_nexus.core.llm import analyze_finding guarddog_nexus.config.config.llm_api_key = "sk-test" guarddog_nexus.config.config.llm_timeout = 1 import httpx with patch("httpx.AsyncClient.post", side_effect=httpx.TimeoutException("timeout")): result = await analyze_finding({"rule": "test", "severity": "WARNING"}) assert result is None guarddog_nexus.config.config.llm_api_key = "" @pytest.mark.asyncio async def test_analyze_finding_api_error(): import guarddog_nexus.config from guarddog_nexus.core.llm import analyze_finding guarddog_nexus.config.config.llm_api_key = "sk-test" guarddog_nexus.config.config.llm_timeout = 30 with patch("httpx.AsyncClient.post", side_effect=Exception("connection refused")): result = await analyze_finding({"rule": "test", "severity": "WARNING"}) assert result is None guarddog_nexus.config.config.llm_api_key = "" @pytest.mark.asyncio async def test_analyze_finding_success(): import guarddog_nexus.config from guarddog_nexus.core.llm import analyze_finding guarddog_nexus.config.config.llm_api_key = "sk-test" guarddog_nexus.config.config.llm_timeout = 30 mock_resp = MagicMock() mock_resp.raise_for_status.return_value = None mock_resp.json.return_value = { "choices": [ { "message": { "content": '{"verdict":"safe","summary":"ok",' '"analysis":"fine","severity_rating":"low"}', } } ] } with patch("guarddog_nexus.core.llm.httpx.AsyncClient.post", return_value=mock_resp): result = await analyze_finding({"rule": "test"}) assert result is not None assert result["verdict"] == "safe" assert result["severity_rating"] == "low" guarddog_nexus.config.config.llm_api_key = "" @pytest.mark.asyncio async def test_analyze_finding_markdown_unwrap(): import guarddog_nexus.config from guarddog_nexus.core.llm import analyze_finding guarddog_nexus.config.config.llm_api_key = "sk-test" mock_resp = MagicMock() mock_resp.raise_for_status.return_value = None mock_resp.json.return_value = { "choices": [ { "message": { "content": '```json\n{"verdict":"suspicious","summary":"hm",' '"analysis":"...","severity_rating":"medium"}\n```', } } ] } with patch("guarddog_nexus.core.llm.httpx.AsyncClient.post", return_value=mock_resp): result = await analyze_finding({"rule": "test"}) assert result is not None assert result["verdict"] == "suspicious" guarddog_nexus.config.config.llm_api_key = "" # --- T1: analyze_finding_htmx endpoint --- @pytest.mark.asyncio async def test_analyze_endpoint_llm_disabled(client, sample_finding): import guarddog_nexus.config guarddog_nexus.config.config.llm_enabled = False resp = await client.post(f"/api/v1/findings/{sample_finding.id}/analyze") assert resp.status_code == 200 assert "disabled" in resp.text.lower() guarddog_nexus.config.config.llm_enabled = False @pytest.mark.asyncio async def test_analyze_endpoint_not_found(client): import guarddog_nexus.config guarddog_nexus.config.config.llm_enabled = True resp = await client.post("/api/v1/findings/99999/analyze") assert resp.status_code == 404 assert "not found" in resp.text.lower() guarddog_nexus.config.config.llm_enabled = False @pytest.mark.asyncio async def test_analyze_endpoint_idempotent_already_analyzed(client, sample_finding_with_report): import guarddog_nexus.config guarddog_nexus.config.config.llm_enabled = True resp = await client.post(f"/api/v1/findings/{sample_finding_with_report.id}/analyze") assert resp.status_code == 200 assert "safe" in resp.text guarddog_nexus.config.config.llm_enabled = False @pytest.mark.asyncio async def test_analyze_endpoint_success(client, sample_finding): import guarddog_nexus.config guarddog_nexus.config.config.llm_enabled = True fake_report = { "verdict": "malicious", "summary": "bad", "analysis": "evil", "severity_rating": "critical", } async def mock_analyze(data): return fake_report with patch("guarddog_nexus.core.llm.analyze_finding", mock_analyze): resp = await client.post(f"/api/v1/findings/{sample_finding.id}/analyze") assert resp.status_code == 200 assert "malicious" in resp.text guarddog_nexus.config.config.llm_enabled = False @pytest.mark.asyncio async def test_analyze_endpoint_failure(client, sample_finding): import guarddog_nexus.config guarddog_nexus.config.config.llm_enabled = True async def mock_analyze(data): return None with patch("guarddog_nexus.core.llm.analyze_finding", mock_analyze): resp = await client.post(f"/api/v1/findings/{sample_finding.id}/analyze") assert resp.status_code == 200 assert "failed" in resp.text.lower() guarddog_nexus.config.config.llm_enabled = False