refactor: uv-based deps, no nexus auth, LLM retries, lock cleanup, health checks, e2e tests
This commit is contained in:
@@ -12,8 +12,6 @@ sys.path.insert(0, os.path.join(os.path.dirname(__file__), ".."))
|
||||
|
||||
os.environ["DATABASE_PATH"] = ":memory:"
|
||||
os.environ["NEXUS_URL"] = "http://nexus:8081"
|
||||
os.environ["NEXUS_USERNAME"] = "admin"
|
||||
os.environ["NEXUS_PASSWORD"] = "admin123"
|
||||
os.environ["LOG_SYSLOG_HOST"] = ""
|
||||
os.environ["TEMP_DIR"] = "/tmp/guarddog-nexus-test"
|
||||
|
||||
|
||||
160
tests/e2e/conftest.py
Normal file
160
tests/e2e/conftest.py
Normal file
@@ -0,0 +1,160 @@
|
||||
"""E2E test fixtures for GuardDog Nexus end-to-end tests."""
|
||||
|
||||
import os
|
||||
import sys
|
||||
from pathlib import Path
|
||||
|
||||
import pytest
|
||||
import pytest_asyncio
|
||||
from httpx import ASGITransport, AsyncClient
|
||||
from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker, create_async_engine
|
||||
|
||||
# Add project root to path
|
||||
sys.path.insert(0, str(Path(__file__).parent.parent.parent))
|
||||
|
||||
# Set environment for testing
|
||||
os.environ["DATABASE_PATH"] = ":memory:"
|
||||
os.environ["NEXUS_URL"] = "http://nexus:8081"
|
||||
os.environ["LOG_SYSLOG_HOST"] = ""
|
||||
os.environ["TEMP_DIR"] = "/tmp/guarddog-nexus-e2e"
|
||||
os.environ["LLM_ENABLED"] = "0"
|
||||
os.environ["LLM_AUTO_ANALYZE"] = "0"
|
||||
os.environ["LLM_API_KEY"] = ""
|
||||
|
||||
from guarddog_nexus.constants import DEFAULT_ECOSYSTEM, SEVERITY_WARNING # noqa: E402
|
||||
from guarddog_nexus.db.engine import Base, get_session # noqa: E402
|
||||
from guarddog_nexus.db.models import Finding, Scan, ScanStatus # noqa: E402
|
||||
from guarddog_nexus.main import app # noqa: E402
|
||||
|
||||
|
||||
@pytest_asyncio.fixture
|
||||
async def e2e_db_engine():
|
||||
"""Create shared database engine for e2e tests."""
|
||||
engine = create_async_engine(
|
||||
"sqlite+aiosqlite:///file:e2e_test?mode=memory&cache=shared&uri=true"
|
||||
)
|
||||
async with engine.begin() as conn:
|
||||
await conn.run_sync(Base.metadata.create_all)
|
||||
yield engine
|
||||
async with engine.begin() as conn:
|
||||
await conn.run_sync(Base.metadata.drop_all)
|
||||
await engine.dispose()
|
||||
|
||||
|
||||
@pytest_asyncio.fixture
|
||||
async def e2e_db_session(e2e_db_engine):
|
||||
"""Create database session for e2e tests."""
|
||||
maker = async_sessionmaker(e2e_db_engine, class_=AsyncSession, expire_on_commit=False)
|
||||
async with maker() as session:
|
||||
yield session
|
||||
|
||||
|
||||
@pytest_asyncio.fixture
|
||||
async def e2e_client(e2e_db_engine):
|
||||
"""Create HTTP client for e2e tests."""
|
||||
maker = async_sessionmaker(e2e_db_engine, class_=AsyncSession, expire_on_commit=False)
|
||||
|
||||
async def override_get_session():
|
||||
async with maker() as session:
|
||||
yield session
|
||||
|
||||
app.dependency_overrides[get_session] = override_get_session
|
||||
|
||||
transport = ASGITransport(app=app)
|
||||
async with AsyncClient(transport=transport, base_url="http://test") as ac:
|
||||
yield ac
|
||||
|
||||
app.dependency_overrides.clear()
|
||||
|
||||
|
||||
@pytest_asyncio.fixture
|
||||
async def sample_e2e_scan(e2e_db_session):
|
||||
"""Create a sample scan with findings for e2e tests."""
|
||||
scan = Scan(
|
||||
package_name="test-e2e-pkg",
|
||||
package_version="1.0.0",
|
||||
ecosystem=DEFAULT_ECOSYSTEM,
|
||||
repository="pypi-proxy",
|
||||
nexus_asset_url="http://nexus:8081/repository/pypi-proxy/packages/test-e2e-pkg/1.0.0/test-e2e-pkg-1.0.0.tar.gz",
|
||||
sha256="e2e1234567890abcdef",
|
||||
status=ScanStatus.COMPLETED.value,
|
||||
total_findings=2,
|
||||
flagged=True,
|
||||
)
|
||||
e2e_db_session.add(scan)
|
||||
await e2e_db_session.commit()
|
||||
await e2e_db_session.refresh(scan)
|
||||
|
||||
# Add findings
|
||||
for i, rule in enumerate(["shady-links", "exec-base64"]):
|
||||
finding = Finding(
|
||||
scan_id=scan.id,
|
||||
data={
|
||||
"rule": rule,
|
||||
"severity": SEVERITY_WARNING,
|
||||
"message": f"E2E test finding {i + 1}",
|
||||
"location": f"test.py:{i + 1}",
|
||||
"code": f"print('test {i + 1}')",
|
||||
},
|
||||
)
|
||||
e2e_db_session.add(finding)
|
||||
await e2e_db_session.commit()
|
||||
await e2e_db_session.refresh(scan)
|
||||
return scan
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def e2e_webhook_payload():
|
||||
"""Create a sample Nexus webhook payload."""
|
||||
return {
|
||||
"timestamp": "2026-05-11T12:00:00.000+00:00",
|
||||
"nodeId": "e2e-test-node",
|
||||
"initiator": "e2e-test",
|
||||
"action": "UPDATED",
|
||||
"repositoryName": "pypi-proxy",
|
||||
"asset": {
|
||||
"id": "e2e123",
|
||||
"assetId": "dGVzdGUyZTFFMjM=",
|
||||
"format": "pypi",
|
||||
"name": "/packages/e2e-test-pkg/1.0.0/e2e-test-pkg-1.0.0.tar.gz",
|
||||
"downloadUrl": "http://nexus:8081/repository/pypi-proxy/packages/e2e-test-pkg/1.0.0/e2e-test-pkg-1.0.0.tar.gz",
|
||||
},
|
||||
}
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def e2e_go_webhook_payload():
|
||||
"""Create a sample Go webhook payload."""
|
||||
return {
|
||||
"timestamp": "2026-05-11T12:00:00.000+00:00",
|
||||
"nodeId": "e2e-test-node",
|
||||
"initiator": "e2e-test",
|
||||
"action": "UPDATED",
|
||||
"repositoryName": "go-proxy",
|
||||
"asset": {
|
||||
"id": "e2ego123",
|
||||
"assetId": "Z29lMjFFMjM=",
|
||||
"format": "go",
|
||||
"name": "/packages/github.com/e2e/test-go/@v/v1.0.0.zip",
|
||||
"downloadUrl": "http://nexus:8081/repository/go-proxy/github.com/e2e/test-go/@v/v1.0.0.zip",
|
||||
},
|
||||
}
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def e2e_npm_webhook_payload():
|
||||
"""Create a sample npm webhook payload."""
|
||||
return {
|
||||
"timestamp": "2026-05-11T12:00:00.000+00:00",
|
||||
"nodeId": "e2e-test-node",
|
||||
"initiator": "e2e-test",
|
||||
"action": "UPDATED",
|
||||
"repositoryName": "npm-proxy",
|
||||
"asset": {
|
||||
"id": "e2enpm123",
|
||||
"assetId": "bnBtZTJFRTIz",
|
||||
"format": "npm",
|
||||
"name": "/packages/e2e-test-npm/-/e2e-test-npm-1.0.0.tgz",
|
||||
"downloadUrl": "http://nexus:8081/repository/npm-proxy/e2e-test-npm/-/e2e-test-npm-1.0.0.tgz",
|
||||
},
|
||||
}
|
||||
221
tests/e2e/test_llm_and_edge_cases.py
Normal file
221
tests/e2e/test_llm_and_edge_cases.py
Normal file
@@ -0,0 +1,221 @@
|
||||
"""E2E tests for LLM analysis flow and edge cases."""
|
||||
|
||||
from unittest.mock import patch
|
||||
|
||||
import pytest
|
||||
|
||||
|
||||
class TestLlmAnalysisE2e:
|
||||
"""End-to-end tests for LLM analysis functionality."""
|
||||
|
||||
@pytest.fixture
|
||||
async def finding_with_id(self, e2e_db_session):
|
||||
"""Create a finding with database ID for LLM tests."""
|
||||
from guarddog_nexus.constants import SEVERITY_WARNING
|
||||
from guarddog_nexus.db.models import Finding
|
||||
|
||||
finding = Finding(
|
||||
scan_id=1,
|
||||
data={
|
||||
"rule": "shady-links",
|
||||
"severity": SEVERITY_WARNING,
|
||||
"message": "Suspicious URL detected",
|
||||
"location": "setup.py:15",
|
||||
"code": "url = 'http://evil.com'",
|
||||
},
|
||||
)
|
||||
e2e_db_session.add(finding)
|
||||
await e2e_db_session.commit()
|
||||
await e2e_db_session.refresh(finding)
|
||||
return finding
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_e2e_llm_analysis_disabled(self, e2e_client, finding_with_id):
|
||||
"""Verify LLM analysis endpoint returns disabled message when LLM is disabled."""
|
||||
import guarddog_nexus.config
|
||||
|
||||
original = guarddog_nexus.config.config.llm_enabled
|
||||
guarddog_nexus.config.config.llm_enabled = False
|
||||
|
||||
resp = await e2e_client.post(f"/api/v1/findings/{finding_with_id.id}/analyze")
|
||||
assert resp.status_code == 200
|
||||
assert "disabled" in resp.text.lower()
|
||||
|
||||
guarddog_nexus.config.config.llm_enabled = original
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_e2e_llm_analysis_success(self, e2e_client, finding_with_id):
|
||||
"""Verify LLM analysis endpoint works when LLM is enabled."""
|
||||
import guarddog_nexus.config
|
||||
|
||||
original_enabled = guarddog_nexus.config.config.llm_enabled
|
||||
original_key = guarddog_nexus.config.config.llm_api_key
|
||||
guarddog_nexus.config.config.llm_enabled = True
|
||||
guarddog_nexus.config.config.llm_api_key = "sk-test"
|
||||
|
||||
fake_report = {
|
||||
"verdict": "suspicious",
|
||||
"summary": "Potential security risk",
|
||||
"analysis": (
|
||||
"The package contains a URL to an external domain "
|
||||
"which could be used for data exfiltration."
|
||||
),
|
||||
"severity_rating": "medium",
|
||||
}
|
||||
|
||||
async def mock_analyze(data):
|
||||
return fake_report
|
||||
|
||||
with patch("guarddog_nexus.core.llm.analyze_finding", mock_analyze):
|
||||
resp = await e2e_client.post(f"/api/v1/findings/{finding_with_id.id}/analyze")
|
||||
|
||||
assert resp.status_code == 200
|
||||
assert "suspicious" in resp.text
|
||||
assert "security risk" in resp.text.lower()
|
||||
|
||||
guarddog_nexus.config.config.llm_enabled = original_enabled
|
||||
guarddog_nexus.config.config.llm_api_key = original_key
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_e2e_llm_analysis_idempotent(self, e2e_client, finding_with_id, e2e_db_session):
|
||||
"""Verify that re-analyzing an already analyzed finding returns the cached report."""
|
||||
from sqlalchemy import select
|
||||
|
||||
from guarddog_nexus.config import config
|
||||
from guarddog_nexus.db.models import Finding
|
||||
|
||||
# First, set up a finding with existing report
|
||||
finding = await e2e_db_session.scalar(
|
||||
select(Finding).where(Finding.id == finding_with_id.id)
|
||||
)
|
||||
if finding:
|
||||
finding.report = {
|
||||
"verdict": "safe",
|
||||
"summary": "No issues found",
|
||||
"analysis": "Package appears clean",
|
||||
"severity_rating": "low",
|
||||
}
|
||||
await e2e_db_session.commit()
|
||||
|
||||
config.llm_enabled = True
|
||||
|
||||
resp = await e2e_client.post(f"/api/v1/findings/{finding_with_id.id}/analyze")
|
||||
assert resp.status_code == 200
|
||||
# Should return cached report, not make LLM call
|
||||
assert "safe" in resp.text
|
||||
|
||||
config.llm_enabled = False
|
||||
|
||||
|
||||
class TestPaginationE2e:
|
||||
"""End-to-end tests for pagination functionality."""
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_e2e_scans_pagination(self, e2e_client):
|
||||
"""Verify that scan list pagination works."""
|
||||
# First page
|
||||
resp1 = await e2e_client.get("/api/v1/scans?limit=10&offset=0")
|
||||
assert resp1.status_code == 200
|
||||
data1 = resp1.json()
|
||||
assert data1["limit"] == 10
|
||||
assert data1["offset"] == 0
|
||||
|
||||
# Second page
|
||||
resp2 = await e2e_client.get("/api/v1/scans?limit=10&offset=10")
|
||||
assert resp2.status_code == 200
|
||||
data2 = resp2.json()
|
||||
assert data2["limit"] == 10
|
||||
assert data2["offset"] == 10
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_e2e_packages_pagination(self, e2e_client):
|
||||
"""Verify that package list pagination works."""
|
||||
resp1 = await e2e_client.get("/api/v1/packages?limit=5&offset=0")
|
||||
assert resp1.status_code == 200
|
||||
data1 = resp1.json()
|
||||
assert data1["limit"] == 5
|
||||
assert data1["offset"] == 0
|
||||
|
||||
|
||||
class TestFilteringE2e:
|
||||
"""End-to-end tests for filtering functionality."""
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_e2e_scan_filter_by_status(self, e2e_client):
|
||||
"""Verify that scans can be filtered by status."""
|
||||
resp = await e2e_client.get("/api/v1/scans?status=completed")
|
||||
assert resp.status_code == 200
|
||||
data = resp.json()
|
||||
assert all(s["status"] == "completed" for s in data["scans"])
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_e2e_scan_filter_by_flagged(self, e2e_client):
|
||||
"""Verify that scans can be filtered by flagged status."""
|
||||
resp = await e2e_client.get("/api/v1/scans?flagged=true")
|
||||
assert resp.status_code == 200
|
||||
data = resp.json()
|
||||
assert all(s["flagged"] is True for s in data["scans"])
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_e2e_scan_filter_by_search(self, e2e_client):
|
||||
"""Verify that scans can be filtered by search term."""
|
||||
resp = await e2e_client.get("/api/v1/scans?search=e2e")
|
||||
assert resp.status_code == 200
|
||||
data = resp.json()
|
||||
# If there are matching scans, they should contain the search term
|
||||
if data["scans"]:
|
||||
assert any("e2e" in s["package_name"] for s in data["scans"])
|
||||
|
||||
|
||||
class TestErrorHandlingE2e:
|
||||
"""End-to-end tests for error handling."""
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_e2e_not_found_error(self, e2e_client):
|
||||
"""Verify that 404 errors are handled correctly."""
|
||||
resp = await e2e_client.get("/api/v1/scans/999999")
|
||||
assert resp.status_code == 404
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_e2e_invalid_webhook_action(self, e2e_client):
|
||||
"""Verify that invalid webhook actions are ignored."""
|
||||
payload = {
|
||||
"action": "INVALID_ACTION",
|
||||
"repositoryName": "test-repo",
|
||||
}
|
||||
resp = await e2e_client.post("/webhooks/nexus", json=payload)
|
||||
assert resp.status_code == 200
|
||||
data = resp.json()
|
||||
assert data["status"] == "ignored"
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_e2e_webhook_missing_repository(self, e2e_client):
|
||||
"""Verify that webhooks without repository are rejected."""
|
||||
payload = {
|
||||
"action": "UPDATED",
|
||||
"asset": {
|
||||
"format": "pypi",
|
||||
"name": "/packages/test/1.0/test.tar.gz",
|
||||
},
|
||||
}
|
||||
resp = await e2e_client.post("/webhooks/nexus", json=payload)
|
||||
assert resp.status_code == 400
|
||||
|
||||
|
||||
class TestWebsocketFragmentE2e:
|
||||
"""E2E tests for HTMX fragment responses."""
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_e2e_scans_fragment_response(self, e2e_client):
|
||||
"""Verify that scans page returns fragment when HX-Request header is set."""
|
||||
resp = await e2e_client.get("/scans", headers={"HX-Request": "true"})
|
||||
assert resp.status_code == 200
|
||||
# Fragment should not include full HTML structure
|
||||
assert "<!DOCTYPE" not in resp.text
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_e2e_packages_fragment_response(self, e2e_client):
|
||||
"""Verify that packages page returns fragment when HX-Request header is set."""
|
||||
resp = await e2e_client.get("/packages", headers={"HX-Request": "true"})
|
||||
assert resp.status_code == 200
|
||||
assert "<!DOCTYPE" not in resp.text
|
||||
308
tests/e2e/test_webhook_flow.py
Normal file
308
tests/e2e/test_webhook_flow.py
Normal file
@@ -0,0 +1,308 @@
|
||||
"""E2E tests for the complete webhook-to-scan flow."""
|
||||
|
||||
from unittest.mock import patch
|
||||
|
||||
import pytest
|
||||
|
||||
|
||||
class TestWebhookToScanFlow:
|
||||
"""End-to-end tests for the complete webhook to scan pipeline."""
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_e2e_webhook_triggers_scan_creation(
|
||||
self, e2e_client, e2e_webhook_payload, e2e_db_session
|
||||
):
|
||||
"""Verify that receiving a webhook triggers scan creation."""
|
||||
|
||||
# Mock the scan to avoid actual download and GuardDog execution
|
||||
async def mock_harvest(*args, **kwargs):
|
||||
from guarddog_nexus.db.models import Scan, ScanStatus
|
||||
|
||||
scan = Scan(
|
||||
package_name="e2e-test-pkg",
|
||||
package_version="1.0.0",
|
||||
ecosystem="pypi",
|
||||
repository="pypi-proxy",
|
||||
nexus_asset_url=args[0],
|
||||
status=ScanStatus.COMPLETED.value,
|
||||
total_findings=0,
|
||||
flagged=False,
|
||||
)
|
||||
e2e_db_session.add(scan)
|
||||
await e2e_db_session.commit()
|
||||
await e2e_db_session.refresh(scan)
|
||||
return scan
|
||||
|
||||
with patch("guarddog_nexus.routes.webhooks._scan_in_background", mock_harvest):
|
||||
resp = await e2e_client.post("/webhooks/nexus", json=e2e_webhook_payload)
|
||||
|
||||
assert resp.status_code == 200
|
||||
data = resp.json()
|
||||
assert data["status"] == "accepted"
|
||||
assert "e2e-test-pkg" in data.get("asset", "")
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_e2e_webhook_accepts_go_asset(
|
||||
self, e2e_client, e2e_go_webhook_payload, e2e_db_session
|
||||
):
|
||||
"""Verify that Go assets are accepted and processed."""
|
||||
|
||||
async def mock_harvest(*args, **kwargs):
|
||||
from guarddog_nexus.db.models import Scan, ScanStatus
|
||||
|
||||
scan = Scan(
|
||||
package_name="github.com/e2e/test-go",
|
||||
package_version="v1.0.0",
|
||||
ecosystem="go",
|
||||
repository="go-proxy",
|
||||
nexus_asset_url=args[0],
|
||||
status=ScanStatus.COMPLETED.value,
|
||||
total_findings=0,
|
||||
flagged=False,
|
||||
)
|
||||
e2e_db_session.add(scan)
|
||||
await e2e_db_session.commit()
|
||||
await e2e_db_session.refresh(scan)
|
||||
return scan
|
||||
|
||||
with patch("guarddog_nexus.routes.webhooks._scan_in_background", mock_harvest):
|
||||
resp = await e2e_client.post("/webhooks/nexus", json=e2e_go_webhook_payload)
|
||||
|
||||
assert resp.status_code == 200
|
||||
data = resp.json()
|
||||
assert data["status"] == "accepted"
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_e2e_webhook_accepts_npm_asset(
|
||||
self, e2e_client, e2e_npm_webhook_payload, e2e_db_session
|
||||
):
|
||||
"""Verify that npm assets are accepted and processed."""
|
||||
|
||||
async def mock_harvest(*args, **kwargs):
|
||||
from guarddog_nexus.db.models import Scan, ScanStatus
|
||||
|
||||
scan = Scan(
|
||||
package_name="e2e-test-npm",
|
||||
package_version="1.0.0",
|
||||
ecosystem="npm",
|
||||
repository="npm-proxy",
|
||||
nexus_asset_url=args[0],
|
||||
status=ScanStatus.COMPLETED.value,
|
||||
total_findings=0,
|
||||
flagged=False,
|
||||
)
|
||||
e2e_db_session.add(scan)
|
||||
await e2e_db_session.commit()
|
||||
await e2e_db_session.refresh(scan)
|
||||
return scan
|
||||
|
||||
with patch("guarddog_nexus.routes.webhooks._scan_in_background", mock_harvest):
|
||||
resp = await e2e_client.post("/webhooks/nexus", json=e2e_npm_webhook_payload)
|
||||
|
||||
assert resp.status_code == 200
|
||||
data = resp.json()
|
||||
assert data["status"] == "accepted"
|
||||
|
||||
|
||||
class TestWebhookSignatureValidation:
|
||||
"""E2E tests for webhook signature validation."""
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_e2e_webhook_with_valid_signature(self, e2e_client, e2e_webhook_payload):
|
||||
"""Verify that webhooks with valid signatures are accepted."""
|
||||
import hashlib
|
||||
import hmac
|
||||
import json
|
||||
|
||||
from guarddog_nexus.config import config
|
||||
|
||||
original_secret = config.webhook_secret
|
||||
config.webhook_secret = "test-secret"
|
||||
|
||||
# Calculate valid signature
|
||||
payload_bytes = json.dumps(e2e_webhook_payload).encode("utf-8")
|
||||
signature = hmac.new(b"test-secret", payload_bytes, hashlib.sha256).hexdigest()
|
||||
|
||||
resp = await e2e_client.post(
|
||||
"/webhooks/nexus",
|
||||
content=payload_bytes,
|
||||
headers={"X-Nexus-Webhook-Signature": signature, "Content-Type": "application/json"},
|
||||
)
|
||||
|
||||
# Should be accepted (signature matches)
|
||||
assert resp.status_code == 200
|
||||
|
||||
config.webhook_secret = original_secret
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_e2e_webhook_with_invalid_signature(self, e2e_client, e2e_webhook_payload):
|
||||
"""Verify that webhooks with invalid signatures are rejected."""
|
||||
import json
|
||||
|
||||
from guarddog_nexus.config import config
|
||||
|
||||
original_secret = config.webhook_secret
|
||||
config.webhook_secret = "test-secret"
|
||||
|
||||
payload_bytes = json.dumps(e2e_webhook_payload).encode("utf-8")
|
||||
resp = await e2e_client.post(
|
||||
"/webhooks/nexus",
|
||||
content=payload_bytes,
|
||||
headers={
|
||||
"X-Nexus-Webhook-Signature": "invalid-signature",
|
||||
"Content-Type": "application/json",
|
||||
},
|
||||
)
|
||||
|
||||
# Should be rejected when secret is set
|
||||
assert resp.status_code == 403
|
||||
|
||||
config.webhook_secret = original_secret
|
||||
|
||||
|
||||
class TestApiIntegration:
|
||||
"""E2E tests for API endpoint integration with database."""
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_e2e_api_scan_list_with_data(self, e2e_client, sample_e2e_scan):
|
||||
"""Verify that API returns scan data from database."""
|
||||
resp = await e2e_client.get("/api/v1/scans")
|
||||
assert resp.status_code == 200
|
||||
data = resp.json()
|
||||
assert data["total"] >= 1
|
||||
assert len(data["scans"]) >= 1
|
||||
assert data["scans"][0]["package_name"] == "test-e2e-pkg"
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_e2e_api_scan_detail_with_findings(self, e2e_client, sample_e2e_scan):
|
||||
"""Verify that scan detail includes findings."""
|
||||
resp = await e2e_client.get(f"/api/v1/scans/{sample_e2e_scan.id}")
|
||||
assert resp.status_code == 200
|
||||
data = resp.json()
|
||||
assert data["package_name"] == "test-e2e-pkg"
|
||||
assert len(data["findings"]) == 2
|
||||
assert data["total_findings"] == 2
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_e2e_api_package_list(self, e2e_client, sample_e2e_scan):
|
||||
"""Verify that package list shows aggregated data."""
|
||||
resp = await e2e_client.get("/api/v1/packages")
|
||||
assert resp.status_code == 200
|
||||
data = resp.json()
|
||||
assert data["total"] >= 1
|
||||
assert any(p["name"] == "test-e2e-pkg" for p in data["packages"])
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_e2e_api_findings_list(self, e2e_client, sample_e2e_scan):
|
||||
"""Verify that findings list returns all findings."""
|
||||
resp = await e2e_client.get("/api/v1/findings")
|
||||
assert resp.status_code == 200
|
||||
data = resp.json()
|
||||
assert data["total"] >= 2
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_e2e_api_findings_filter_by_rule(self, e2e_client, sample_e2e_scan):
|
||||
"""Verify that findings can be filtered by rule."""
|
||||
resp = await e2e_client.get("/api/v1/findings?rule=shady-links")
|
||||
assert resp.status_code == 200
|
||||
data = resp.json()
|
||||
assert data["total"] >= 1
|
||||
assert all(f["rule"] == "shady-links" for f in data["findings"])
|
||||
|
||||
|
||||
class TestWebUiIntegration:
|
||||
"""E2E tests for web UI integration."""
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_e2e_dashboard_page(self, e2e_client, sample_e2e_scan):
|
||||
"""Verify that dashboard page renders with data."""
|
||||
resp = await e2e_client.get("/")
|
||||
assert resp.status_code == 200
|
||||
assert "GuardDog Nexus" in resp.text
|
||||
assert "Dashboard" in resp.text or "Панель" in resp.text
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_e2e_scans_page(self, e2e_client, sample_e2e_scan):
|
||||
"""Verify that scans page renders with data."""
|
||||
resp = await e2e_client.get("/scans")
|
||||
assert resp.status_code == 200
|
||||
assert "Scans" in resp.text or "Сканирования" in resp.text
|
||||
assert "test-e2e-pkg" in resp.text
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_e2e_scans_detail_page(self, e2e_client, sample_e2e_scan):
|
||||
"""Verify that scan detail page shows findings."""
|
||||
resp = await e2e_client.get(f"/scans/{sample_e2e_scan.id}")
|
||||
assert resp.status_code == 200
|
||||
assert "Scan" in resp.text or "Сканирование" in resp.text
|
||||
assert "shady-links" in resp.text
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_e2e_packages_page(self, e2e_client, sample_e2e_scan):
|
||||
"""Verify that packages page renders with data."""
|
||||
resp = await e2e_client.get("/packages")
|
||||
assert resp.status_code == 200
|
||||
assert "Packages" in resp.text or "Пакеты" in resp.text
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_e2e_package_detail_page(self, e2e_client, sample_e2e_scan):
|
||||
"""Verify that package detail page shows all scans and findings."""
|
||||
resp = await e2e_client.get("/packages/test-e2e-pkg/1.0.0")
|
||||
assert resp.status_code == 200
|
||||
assert "test-e2e-pkg" in resp.text
|
||||
assert "shady-links" in resp.text
|
||||
|
||||
|
||||
class TestHealthAndMetrics:
|
||||
"""E2E tests for health and metrics endpoints."""
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_e2e_health_endpoint(self, e2e_client):
|
||||
"""Verify that health endpoint returns status."""
|
||||
resp = await e2e_client.get("/health")
|
||||
assert resp.status_code == 200
|
||||
data = resp.json()
|
||||
assert data["status"] == "ok"
|
||||
assert "version" in data
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_e2e_metrics_endpoint(self, e2e_client, sample_e2e_scan):
|
||||
"""Verify that metrics endpoint returns Prometheus format."""
|
||||
resp = await e2e_client.get("/metrics")
|
||||
assert resp.status_code == 200
|
||||
assert "text/plain" in resp.headers["content-type"]
|
||||
assert "guarddog_scans_total" in resp.text
|
||||
assert "guarddog_scans_flagged_total" in resp.text
|
||||
assert "# HELP" in resp.text
|
||||
assert "# TYPE" in resp.text
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_e2e_health_dependencies_endpoint(self, e2e_client):
|
||||
"""Verify that dependency health checks work."""
|
||||
resp = await e2e_client.get("/health/dependencies")
|
||||
assert resp.status_code in [200, 503] # 503 if Nexus not reachable
|
||||
data = resp.json()
|
||||
assert "database" in data
|
||||
assert "nexus" in data
|
||||
|
||||
|
||||
class TestCsvExport:
|
||||
"""E2E tests for CSV export functionality."""
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_e2e_scans_csv_export(self, e2e_client, sample_e2e_scan):
|
||||
"""Verify that scans CSV export works."""
|
||||
resp = await e2e_client.get("/api/v1/scans/export")
|
||||
assert resp.status_code == 200
|
||||
assert "text/csv" in resp.headers["content-type"]
|
||||
assert "id,package_name" in resp.text
|
||||
assert "test-e2e-pkg" in resp.text
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_e2e_packages_csv_export(self, e2e_client, sample_e2e_scan):
|
||||
"""Verify that packages CSV export works."""
|
||||
resp = await e2e_client.get("/api/v1/packages/export")
|
||||
assert resp.status_code == 200
|
||||
assert "text/csv" in resp.headers["content-type"]
|
||||
assert "name,version" in resp.text
|
||||
assert "test-e2e-pkg" in resp.text
|
||||
@@ -1,7 +1,8 @@
|
||||
"""Tests for GuardDog scanner integration."""
|
||||
|
||||
import asyncio
|
||||
from unittest.mock import MagicMock, patch
|
||||
import warnings
|
||||
from unittest.mock import AsyncMock, MagicMock, patch
|
||||
|
||||
import pytest
|
||||
|
||||
@@ -61,10 +62,16 @@ def test_normalize_semgrep_list():
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_scan_package_timeout():
|
||||
with patch("asyncio.wait_for", side_effect=asyncio.TimeoutError):
|
||||
result = await scan_package("/tmp/test.tar.gz", "pypi")
|
||||
assert result["findings"] == []
|
||||
assert "timeout" in result["errors"][0]
|
||||
mock_proc = MagicMock()
|
||||
mock_proc.communicate = AsyncMock(return_value=(b"", b""))
|
||||
|
||||
with warnings.catch_warnings():
|
||||
warnings.simplefilter("ignore", RuntimeWarning)
|
||||
with patch("asyncio.create_subprocess_exec", return_value=mock_proc):
|
||||
with patch("asyncio.wait_for", side_effect=asyncio.TimeoutError):
|
||||
result = await scan_package("/tmp/test.tar.gz", "pypi")
|
||||
assert result["findings"] == []
|
||||
assert "timeout" in result["errors"][0]
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
|
||||
Reference in New Issue
Block a user