238 lines
8.7 KiB
Python
238 lines
8.7 KiB
Python
"""E2E tests for LLM analysis flow and edge cases."""
|
|
|
|
from unittest.mock import patch
|
|
|
|
import pytest
|
|
|
|
|
|
class TestLlmAnalysisE2e:
    """End-to-end tests for the LLM analysis endpoint.

    The tests toggle attributes on the shared ``guarddog_nexus.config.config``
    object. Every toggle is undone in a ``finally`` clause so that a failing
    assertion cannot leak modified settings into tests that run afterwards.
    """

    @pytest.fixture
    async def finding_with_id(self, e2e_db_session):
        """Create a finding with database ID for LLM tests.

        The finding is added to ``e2e_db_session``, committed, refreshed and
        returned so tests can address it through ``finding.id``.
        """
        from guarddog_nexus.constants import SEVERITY_WARNING
        from guarddog_nexus.db.models import Finding

        finding = Finding(
            scan_id=1,
            data={
                "rule": "shady-links",
                "severity": SEVERITY_WARNING,
                "message": "Suspicious URL detected",
                "location": "setup.py:15",
                "code": "url = 'http://evil.com'",
            },
        )
        e2e_db_session.add(finding)
        await e2e_db_session.commit()
        await e2e_db_session.refresh(finding)
        return finding

    @pytest.mark.asyncio
    async def test_e2e_llm_analysis_disabled(self, e2e_client, finding_with_id):
        """Verify LLM analysis endpoint returns disabled message when LLM is disabled."""
        import guarddog_nexus.config

        settings = guarddog_nexus.config.config
        original = settings.llm_enabled
        settings.llm_enabled = False
        try:
            resp = await e2e_client.post(f"/api/v1/findings/{finding_with_id.id}/analyze")
            assert resp.status_code == 200
            assert "disabled" in resp.text.lower()
        finally:
            # Restore even when an assertion above fails.
            settings.llm_enabled = original

    @pytest.mark.asyncio
    async def test_e2e_llm_analysis_success(self, e2e_client, finding_with_id):
        """Verify LLM analysis endpoint works when LLM is enabled."""
        import guarddog_nexus.config

        settings = guarddog_nexus.config.config
        original_enabled = settings.llm_enabled
        original_key = settings.llm_api_key
        settings.llm_enabled = True
        settings.llm_api_key = "sk-test"

        fake_report = {
            "verdict": "suspicious",
            "summary": "Potential security risk",
            "analysis": (
                "The package contains a URL to an external domain "
                "which could be used for data exfiltration."
            ),
            "severity_rating": "medium",
        }

        async def mock_analyze(data):
            """Stand in for the real analyzer and return the canned report."""
            return fake_report

        try:
            with patch("guarddog_nexus.core.llm.analyze_finding", mock_analyze):
                resp = await e2e_client.post(f"/api/v1/findings/{finding_with_id.id}/analyze")

            assert resp.status_code == 200
            assert "suspicious" in resp.text
            assert "security risk" in resp.text.lower()
        finally:
            # Restore both settings even when the request or an assertion fails.
            settings.llm_enabled = original_enabled
            settings.llm_api_key = original_key

    @pytest.mark.asyncio
    async def test_e2e_llm_analysis_idempotent(self, e2e_client, finding_with_id, e2e_db_session):
        """Verify that re-analyzing an already analyzed finding returns the cached report."""
        from sqlalchemy import select

        from guarddog_nexus.config import config
        from guarddog_nexus.db.models import Finding

        # First, set up a finding with existing report
        finding = await e2e_db_session.scalar(
            select(Finding).where(Finding.id == finding_with_id.id)
        )
        # Fail loudly instead of silently skipping the setup: without a stored
        # report the rest of the test would not exercise the cached path.
        assert finding is not None, "fixture finding was not found in the database"
        finding.report = {
            "verdict": "safe",
            "summary": "No issues found",
            "analysis": "Package appears clean",
            "severity_rating": "low",
        }
        await e2e_db_session.commit()

        llm_calls = []

        async def unexpected_analyze(*args, **kwargs):
            """Record the call and fail: the cached report must be used instead."""
            llm_calls.append((args, kwargs))
            raise AssertionError("LLM analysis was invoked for a finding with a cached report")

        original_enabled = config.llm_enabled
        config.llm_enabled = True
        try:
            # Same patch target as the success test, so an accidental LLM call
            # is caught here rather than reaching a real backend.
            with patch("guarddog_nexus.core.llm.analyze_finding", unexpected_analyze):
                resp = await e2e_client.post(f"/api/v1/findings/{finding_with_id.id}/analyze")

            assert resp.status_code == 200
            # Should return cached report, not make LLM call
            assert "safe" in resp.text
            assert not llm_calls
        finally:
            # Restore the previous value rather than forcing False.
            config.llm_enabled = original_enabled
|
|
|
|
|
|
class TestPaginationE2e:
    """End-to-end checks that list endpoints echo their pagination parameters."""

    @pytest.mark.asyncio
    async def test_e2e_scans_pagination(self, e2e_client):
        """Request two consecutive scan pages and check the echoed limit/offset."""
        # (url, expected offset) for the first and second page, in that order.
        pages = (
            ("/api/v1/scans?limit=10&offset=0", 0),
            ("/api/v1/scans?limit=10&offset=10", 10),
        )
        for url, expected_offset in pages:
            response = await e2e_client.get(url)
            assert response.status_code == 200
            body = response.json()
            assert body["limit"] == 10
            assert body["offset"] == expected_offset

    @pytest.mark.asyncio
    async def test_e2e_packages_pagination(self, e2e_client):
        """Request the first package page and check the echoed limit/offset."""
        response = await e2e_client.get("/api/v1/packages?limit=5&offset=0")
        assert response.status_code == 200
        body = response.json()
        assert body["limit"] == 5
        assert body["offset"] == 0
|
|
|
|
|
|
class TestFilteringE2e:
    """End-to-end checks for the query filters of the scan list endpoint."""

    @pytest.mark.asyncio
    async def test_e2e_scan_filter_by_status(self, e2e_client):
        """Every scan returned for ``status=completed`` must have that status."""
        response = await e2e_client.get("/api/v1/scans?status=completed")
        assert response.status_code == 200
        for scan in response.json()["scans"]:
            assert scan["status"] == "completed"

    @pytest.mark.asyncio
    async def test_e2e_scan_filter_by_flagged(self, e2e_client):
        """Every scan returned for ``flagged=true`` must be flagged."""
        response = await e2e_client.get("/api/v1/scans?flagged=true")
        assert response.status_code == 200
        for scan in response.json()["scans"]:
            assert scan["flagged"] is True

    @pytest.mark.asyncio
    async def test_e2e_scan_filter_by_search(self, e2e_client):
        """A non-empty ``search=e2e`` result must include a matching package name."""
        response = await e2e_client.get("/api/v1/scans?search=e2e")
        assert response.status_code == 200
        scans = response.json()["scans"]
        if not scans:
            # An empty result set is accepted; there is nothing to verify.
            return
        # NOTE(review): only one matching package name is required here, not
        # all of them - confirm whether the search also matches other fields.
        assert any("e2e" in scan["package_name"] for scan in scans)
|
|
|
|
|
|
class TestErrorHandlingE2e:
    """End-to-end checks for error and ignore paths of the API and webhook."""

    @pytest.mark.asyncio
    async def test_e2e_not_found_error(self, e2e_client):
        """Fetching scan 999999 must answer with HTTP 404."""
        response = await e2e_client.get("/api/v1/scans/999999")
        assert response.status_code == 404

    @pytest.mark.asyncio
    async def test_e2e_invalid_webhook_action(self, e2e_client):
        """A webhook with an unrecognised action is answered 200 with status "ignored"."""
        response = await e2e_client.post(
            "/webhooks/nexus",
            json={"action": "INVALID_ACTION", "repositoryName": "test-repo"},
        )
        assert response.status_code == 200
        assert response.json()["status"] == "ignored"

    @pytest.mark.asyncio
    async def test_e2e_webhook_missing_repository(self, e2e_client):
        """A webhook payload without ``repositoryName`` is answered with HTTP 400."""
        asset = {"format": "pypi", "name": "/packages/test/1.0/test.tar.gz"}
        response = await e2e_client.post(
            "/webhooks/nexus",
            json={"action": "UPDATED", "asset": asset},
        )
        assert response.status_code == 400

    @pytest.mark.asyncio
    async def test_e2e_webhook_unknown_ecosystem(self, e2e_client):
        """A webhook for a ``maven`` asset is ignored with reason "unknown_ecosystem"."""
        asset = {"format": "maven", "name": "/packages/test/1.0/test-1.0.tar.gz"}
        response = await e2e_client.post(
            "/webhooks/nexus",
            json={"action": "UPDATED", "repositoryName": "test-repo", "asset": asset},
        )
        assert response.status_code == 200
        body = response.json()
        assert body["status"] == "ignored"
        assert body["reason"] == "unknown_ecosystem"
|
|
|
|
|
|
class TestWebsocketFragmentE2e:
    """End-to-end checks that pages answer HTMX requests with HTML fragments."""

    # Header/value pair that marks a request as coming from HTMX.
    _HTMX_HEADER = ("HX-Request", "true")

    @pytest.mark.asyncio
    async def test_e2e_scans_fragment_response(self, e2e_client):
        """``/scans`` with the HX-Request header must not return a full document."""
        response = await e2e_client.get("/scans", headers=dict([self._HTMX_HEADER]))
        assert response.status_code == 200
        # A fragment carries no doctype declaration.
        assert "<!DOCTYPE" not in response.text

    @pytest.mark.asyncio
    async def test_e2e_packages_fragment_response(self, e2e_client):
        """``/packages`` with the HX-Request header must not return a full document."""
        response = await e2e_client.get("/packages", headers=dict([self._HTMX_HEADER]))
        assert response.status_code == 200
        assert "<!DOCTYPE" not in response.text
|