Files
guarddog-nexus/tests/e2e/test_llm_and_edge_cases.py

236 lines
8.6 KiB
Python

"""E2E tests for LLM analysis flow and edge cases."""
from unittest.mock import patch
import pytest
class TestLlmAnalysisE2e:
"""End-to-end tests for LLM analysis functionality."""
@pytest.fixture
async def finding_with_id(self, e2e_db_session):
"""Create a finding with database ID for LLM tests."""
from guarddog_nexus.constants import SEVERITY_WARNING
from guarddog_nexus.db.models import Finding
finding = Finding(
scan_id=1,
data={
"rule": "shady-links",
"severity": SEVERITY_WARNING,
"message": "Suspicious URL detected",
"location": "setup.py:15",
"code": "url = 'http://evil.com'",
},
)
e2e_db_session.add(finding)
await e2e_db_session.commit()
await e2e_db_session.refresh(finding)
return finding
@pytest.mark.asyncio
async def test_e2e_llm_analysis_disabled(self, e2e_client, finding_with_id):
"""Verify LLM analysis endpoint returns disabled message when LLM is disabled."""
import guarddog_nexus.config
original = guarddog_nexus.config.config.llm_enabled
guarddog_nexus.config.config.llm_enabled = False
resp = await e2e_client.post(f"/api/v1/findings/{finding_with_id.id}/analyze")
assert resp.status_code == 200
assert "disabled" in resp.text.lower()
guarddog_nexus.config.config.llm_enabled = original
@pytest.mark.asyncio
async def test_e2e_llm_analysis_success(self, e2e_client, finding_with_id):
"""Verify LLM analysis endpoint works when LLM is enabled."""
import guarddog_nexus.config
original_enabled = guarddog_nexus.config.config.llm_enabled
original_key = guarddog_nexus.config.config.llm_api_key
guarddog_nexus.config.config.llm_enabled = True
guarddog_nexus.config.config.llm_api_key = "sk-test"
fake_report = {
"verdict": "suspicious",
"summary": "Potential security risk",
"analysis": (
"The package contains a URL to an external domain "
"which could be used for data exfiltration."
),
"severity_rating": "medium",
}
async def mock_analyze(data):
return fake_report
with patch("guarddog_nexus.core.llm.analyze_finding", mock_analyze):
resp = await e2e_client.post(f"/api/v1/findings/{finding_with_id.id}/analyze")
assert resp.status_code == 200
assert "suspicious" in resp.text
guarddog_nexus.config.config.llm_enabled = original_enabled
guarddog_nexus.config.config.llm_api_key = original_key
@pytest.mark.asyncio
async def test_e2e_llm_analysis_idempotent(self, e2e_client, finding_with_id, e2e_db_session):
"""Verify that re-analyzing an already analyzed finding returns the cached report."""
from sqlalchemy import select
from guarddog_nexus.config import config
from guarddog_nexus.db.models import Finding
# First, set up a finding with existing report
finding = await e2e_db_session.scalar(
select(Finding).where(Finding.id == finding_with_id.id)
)
if finding:
finding.report = {
"verdict": "safe",
"summary": "No issues found",
"analysis": "Package appears clean",
"severity_rating": "low",
}
await e2e_db_session.commit()
config.llm_enabled = True
resp = await e2e_client.post(f"/api/v1/findings/{finding_with_id.id}/analyze")
assert resp.status_code == 200
# Should return cached report, not make LLM call
assert "safe" in resp.text
config.llm_enabled = False
class TestPaginationE2e:
"""End-to-end tests for pagination functionality."""
@pytest.mark.asyncio
async def test_e2e_scans_pagination(self, e2e_client):
"""Verify that scan list pagination works."""
# First page
resp1 = await e2e_client.get("/api/v1/scans?limit=10&offset=0")
assert resp1.status_code == 200
data1 = resp1.json()
assert data1["limit"] == 10
assert data1["offset"] == 0
# Second page
resp2 = await e2e_client.get("/api/v1/scans?limit=10&offset=10")
assert resp2.status_code == 200
data2 = resp2.json()
assert data2["limit"] == 10
assert data2["offset"] == 10
@pytest.mark.asyncio
async def test_e2e_packages_pagination(self, e2e_client):
"""Verify that package list pagination works."""
resp1 = await e2e_client.get("/api/v1/packages?limit=5&offset=0")
assert resp1.status_code == 200
data1 = resp1.json()
assert data1["limit"] == 5
assert data1["offset"] == 0
class TestFilteringE2e:
"""End-to-end tests for filtering functionality."""
@pytest.mark.asyncio
async def test_e2e_scan_filter_by_status(self, e2e_client):
"""Verify that scans can be filtered by status."""
resp = await e2e_client.get("/api/v1/scans?status=completed")
assert resp.status_code == 200
data = resp.json()
assert all(s["status"] == "completed" for s in data["scans"])
@pytest.mark.asyncio
async def test_e2e_scan_filter_by_flagged(self, e2e_client):
"""Verify that scans can be filtered by flagged status."""
resp = await e2e_client.get("/api/v1/scans?flagged=true")
assert resp.status_code == 200
data = resp.json()
assert all(s["flagged"] is True for s in data["scans"])
@pytest.mark.asyncio
async def test_e2e_scan_filter_by_search(self, e2e_client):
"""Verify that scans can be filtered by search term."""
resp = await e2e_client.get("/api/v1/scans?search=e2e")
assert resp.status_code == 200
data = resp.json()
# If there are matching scans, they should contain the search term
if data["scans"]:
assert any("e2e" in s["package_name"] for s in data["scans"])
class TestErrorHandlingE2e:
"""End-to-end tests for error handling."""
@pytest.mark.asyncio
async def test_e2e_not_found_error(self, e2e_client):
"""Verify that 404 errors are handled correctly."""
resp = await e2e_client.get("/api/v1/scans/999999")
assert resp.status_code == 404
@pytest.mark.asyncio
async def test_e2e_invalid_webhook_action(self, e2e_client):
"""Verify that invalid webhook actions are ignored."""
payload = {
"action": "INVALID_ACTION",
"repositoryName": "test-repo",
}
resp = await e2e_client.post("/webhooks/nexus", json=payload)
assert resp.status_code == 200
data = resp.json()
assert data["status"] == "ignored"
@pytest.mark.asyncio
async def test_e2e_webhook_missing_repository(self, e2e_client):
"""Verify that webhooks without repository are rejected."""
payload = {
"action": "UPDATED",
"asset": {
"format": "pypi",
"name": "/packages/test/1.0/test.tar.gz",
},
}
resp = await e2e_client.post("/webhooks/nexus", json=payload)
assert resp.status_code == 400
@pytest.mark.asyncio
async def test_e2e_webhook_unknown_ecosystem(self, e2e_client):
"""Verify that webhooks with unknown ecosystem are rejected."""
payload = {
"action": "UPDATED",
"repositoryName": "test-repo",
"asset": {
"format": "maven",
"name": "/packages/test/1.0/test-1.0.tar.gz",
},
}
resp = await e2e_client.post("/webhooks/nexus", json=payload)
assert resp.status_code == 200
assert resp.json()["status"] == "ignored"
assert resp.json()["reason"] == "unknown_ecosystem"
class TestWebsocketFragmentE2e:
"""E2E tests for HTMX fragment responses."""
@pytest.mark.asyncio
async def test_e2e_scans_fragment_response(self, e2e_client):
"""Verify that scans page returns fragment when HX-Request header is set."""
resp = await e2e_client.get("/scans", headers={"HX-Request": "true"})
assert resp.status_code == 200
# Fragment should not include full HTML structure
assert "<!DOCTYPE" not in resp.text
@pytest.mark.asyncio
async def test_e2e_packages_fragment_response(self, e2e_client):
"""Verify that packages page returns fragment when HX-Request header is set."""
resp = await e2e_client.get("/packages", headers={"HX-Request": "true"})
assert resp.status_code == 200
assert "<!DOCTYPE" not in resp.text