# Source: guarddog-nexus/tests/e2e/test_webhook_flow.py (Python, 309 lines, 12 KiB)

"""E2E tests for the complete webhook-to-scan flow."""
from unittest.mock import patch
import pytest
class TestWebhookToScanFlow:
    """End-to-end tests for the complete webhook to scan pipeline."""

    @staticmethod
    def _make_scan_stub(db_session, *, package_name, package_version, ecosystem, repository):
        """Build an async stand-in for ``_scan_in_background``.

        The stand-in avoids the actual download and GuardDog execution: it
        stores a completed ``Scan`` row with zero findings through
        ``db_session`` and returns it.

        Args:
            db_session: Session used to add, commit and refresh the scan row.
            package_name: Value stored in ``Scan.package_name``.
            package_version: Value stored in ``Scan.package_version``.
            ecosystem: Value stored in ``Scan.ecosystem``.
            repository: Value stored in ``Scan.repository``.

        Returns:
            A coroutine function accepting arbitrary arguments; its first
            positional argument is recorded as ``nexus_asset_url``.
        """

        async def scan_stub(*args, **kwargs):
            # Imported lazily, as in the original per-test mocks.
            from guarddog_nexus.db.models import Scan, ScanStatus

            scan = Scan(
                package_name=package_name,
                package_version=package_version,
                ecosystem=ecosystem,
                repository=repository,
                nexus_asset_url=args[0],
                status=ScanStatus.COMPLETED.value,
                total_findings=0,
                flagged=False,
            )
            db_session.add(scan)
            await db_session.commit()
            await db_session.refresh(scan)
            return scan

        return scan_stub

    @pytest.mark.asyncio
    async def test_e2e_webhook_triggers_scan_creation(
        self, e2e_client, e2e_webhook_payload, e2e_db_session
    ):
        """Verify that receiving a webhook triggers scan creation."""
        scan_stub = self._make_scan_stub(
            e2e_db_session,
            package_name="e2e-test-pkg",
            package_version="1.0.0",
            ecosystem="pypi",
            repository="pypi-proxy",
        )

        with patch("guarddog_nexus.routes.webhooks._scan_in_background", scan_stub):
            resp = await e2e_client.post("/webhooks/nexus", json=e2e_webhook_payload)

        assert resp.status_code == 200
        data = resp.json()
        assert data["status"] == "accepted"
        assert "e2e-test-pkg" in data.get("asset", "")

    @pytest.mark.asyncio
    async def test_e2e_webhook_accepts_go_asset(
        self, e2e_client, e2e_go_webhook_payload, e2e_db_session
    ):
        """Verify that Go assets are accepted and processed."""
        scan_stub = self._make_scan_stub(
            e2e_db_session,
            package_name="github.com/e2e/test-go",
            package_version="v1.0.0",
            ecosystem="go",
            repository="go-proxy",
        )

        with patch("guarddog_nexus.routes.webhooks._scan_in_background", scan_stub):
            resp = await e2e_client.post("/webhooks/nexus", json=e2e_go_webhook_payload)

        assert resp.status_code == 200
        data = resp.json()
        assert data["status"] == "accepted"

    @pytest.mark.asyncio
    async def test_e2e_webhook_accepts_npm_asset(
        self, e2e_client, e2e_npm_webhook_payload, e2e_db_session
    ):
        """Verify that npm assets are accepted and processed."""
        scan_stub = self._make_scan_stub(
            e2e_db_session,
            package_name="e2e-test-npm",
            package_version="1.0.0",
            ecosystem="npm",
            repository="npm-proxy",
        )

        with patch("guarddog_nexus.routes.webhooks._scan_in_background", scan_stub):
            resp = await e2e_client.post("/webhooks/nexus", json=e2e_npm_webhook_payload)

        assert resp.status_code == 200
        data = resp.json()
        assert data["status"] == "accepted"
class TestWebhookSignatureValidation:
    """E2E tests for webhook signature validation.

    Both tests temporarily overwrite ``config.webhook_secret``. The original
    value is restored in a ``finally`` clause so that a failing assertion or
    a raising request cannot leave the test secret in the shared config.
    """

    @pytest.mark.asyncio
    async def test_e2e_webhook_with_valid_signature(self, e2e_client, e2e_webhook_payload):
        """Verify that webhooks with valid signatures are accepted."""
        import hashlib
        import hmac
        import json

        from guarddog_nexus.config import config

        # Sign the exact bytes that are sent, so the HMAC-SHA256 hex digest
        # matches the request body.
        payload_bytes = json.dumps(e2e_webhook_payload).encode("utf-8")
        signature = hmac.new(b"test-secret", payload_bytes, hashlib.sha256).hexdigest()

        original_secret = config.webhook_secret
        config.webhook_secret = "test-secret"
        try:
            # NOTE(review): `_scan_in_background` is not patched in this test;
            # confirm the fixtures keep an accepted webhook from starting a
            # real scan.
            resp = await e2e_client.post(
                "/webhooks/nexus",
                content=payload_bytes,
                headers={
                    "X-Nexus-Webhook-Signature": signature,
                    "Content-Type": "application/json",
                },
            )
            # Should be accepted (signature matches)
            assert resp.status_code == 200
        finally:
            config.webhook_secret = original_secret

    @pytest.mark.asyncio
    async def test_e2e_webhook_with_invalid_signature(self, e2e_client, e2e_webhook_payload):
        """Verify that webhooks with invalid signatures are rejected."""
        import json

        from guarddog_nexus.config import config

        payload_bytes = json.dumps(e2e_webhook_payload).encode("utf-8")

        original_secret = config.webhook_secret
        config.webhook_secret = "test-secret"
        try:
            resp = await e2e_client.post(
                "/webhooks/nexus",
                content=payload_bytes,
                headers={
                    "X-Nexus-Webhook-Signature": "invalid-signature",
                    "Content-Type": "application/json",
                },
            )
            # Should be rejected when secret is set
            assert resp.status_code == 403
        finally:
            config.webhook_secret = original_secret
class TestApiIntegration:
    """End-to-end checks of the JSON API endpoints against the database."""

    @pytest.mark.asyncio
    async def test_e2e_api_scan_list_with_data(self, e2e_client, sample_e2e_scan):
        """The scan listing includes data when the sample scan fixture is present."""
        response = await e2e_client.get("/api/v1/scans")
        assert response.status_code == 200

        body = response.json()
        assert body["total"] >= 1
        scans = body["scans"]
        assert len(scans) >= 1
        first_scan = scans[0]
        assert first_scan["package_name"] == "test-e2e-pkg"

    @pytest.mark.asyncio
    async def test_e2e_api_scan_detail_with_findings(self, e2e_client, sample_e2e_scan):
        """The scan detail response carries the scan's findings."""
        detail_url = f"/api/v1/scans/{sample_e2e_scan.id}"
        response = await e2e_client.get(detail_url)
        assert response.status_code == 200

        body = response.json()
        assert body["package_name"] == "test-e2e-pkg"
        findings = body["findings"]
        assert len(findings) == 2
        assert body["total_findings"] == 2

    @pytest.mark.asyncio
    async def test_e2e_api_package_list(self, e2e_client, sample_e2e_scan):
        """The package listing contains the sample package."""
        response = await e2e_client.get("/api/v1/packages")
        assert response.status_code == 200

        body = response.json()
        assert body["total"] >= 1
        assert any(package["name"] == "test-e2e-pkg" for package in body["packages"])

    @pytest.mark.asyncio
    async def test_e2e_api_findings_list(self, e2e_client, sample_e2e_scan):
        """The findings listing reports at least two findings."""
        response = await e2e_client.get("/api/v1/findings")
        assert response.status_code == 200

        body = response.json()
        assert body["total"] >= 2

    @pytest.mark.asyncio
    async def test_e2e_api_findings_filter_by_rule(self, e2e_client, sample_e2e_scan):
        """Filtering findings by rule returns only findings for that rule."""
        response = await e2e_client.get("/api/v1/findings?rule=shady-links")
        assert response.status_code == 200

        body = response.json()
        assert body["total"] >= 1
        assert all(finding["rule"] == "shady-links" for finding in body["findings"])
class TestWebUiIntegration:
    """End-to-end checks that the HTML pages render with data present."""

    @pytest.mark.asyncio
    async def test_e2e_dashboard_page(self, e2e_client, sample_e2e_scan):
        """The dashboard renders with the product name and a page title."""
        response = await e2e_client.get("/")
        assert response.status_code == 200

        html = response.text
        assert "GuardDog Nexus" in html
        # The title may be rendered in English or in Russian.
        assert any(title in html for title in ("Dashboard", "Панель"))

    @pytest.mark.asyncio
    async def test_e2e_scans_page(self, e2e_client, sample_e2e_scan):
        """The scans page renders and lists the sample package."""
        response = await e2e_client.get("/scans")
        assert response.status_code == 200

        html = response.text
        assert any(title in html for title in ("Scans", "Сканирования"))
        assert "test-e2e-pkg" in html

    @pytest.mark.asyncio
    async def test_e2e_scans_detail_page(self, e2e_client, sample_e2e_scan):
        """The scan detail page shows the scan's findings."""
        page_url = f"/scans/{sample_e2e_scan.id}"
        response = await e2e_client.get(page_url)
        assert response.status_code == 200

        html = response.text
        assert any(title in html for title in ("Scan", "Сканирование"))
        assert "shady-links" in html

    @pytest.mark.asyncio
    async def test_e2e_packages_page(self, e2e_client, sample_e2e_scan):
        """The packages page renders with its title."""
        response = await e2e_client.get("/packages")
        assert response.status_code == 200

        html = response.text
        assert any(title in html for title in ("Packages", "Пакеты"))

    @pytest.mark.asyncio
    async def test_e2e_package_detail_page(self, e2e_client, sample_e2e_scan):
        """The package detail page shows the package name and a finding rule."""
        response = await e2e_client.get("/packages/test-e2e-pkg/1.0.0")
        assert response.status_code == 200

        html = response.text
        for expected in ("test-e2e-pkg", "shady-links"):
            assert expected in html
class TestHealthAndMetrics:
    """End-to-end checks of the health and metrics endpoints."""

    @pytest.mark.asyncio
    async def test_e2e_health_endpoint(self, e2e_client):
        """The health endpoint reports an ok status and a version."""
        response = await e2e_client.get("/health")
        assert response.status_code == 200

        body = response.json()
        assert body["status"] == "ok"
        assert "version" in body

    @pytest.mark.asyncio
    async def test_e2e_metrics_endpoint(self, e2e_client, sample_e2e_scan):
        """The metrics endpoint serves plain text with the expected metric names."""
        response = await e2e_client.get("/metrics")
        assert response.status_code == 200
        assert "text/plain" in response.headers["content-type"]

        exposition = response.text
        expected_fragments = (
            "guarddog_scans_total",
            "guarddog_scans_flagged_total",
            "# HELP",
            "# TYPE",
        )
        for fragment in expected_fragments:
            assert fragment in exposition

    @pytest.mark.asyncio
    async def test_e2e_health_dependencies_endpoint(self, e2e_client):
        """The dependency health endpoint reports on the database and Nexus."""
        response = await e2e_client.get("/health/dependencies")
        # 503 if Nexus not reachable
        assert response.status_code in (200, 503)

        body = response.json()
        for dependency in ("database", "nexus"):
            assert dependency in body
class TestCsvExport:
    """End-to-end checks of the CSV export endpoints."""

    @pytest.mark.asyncio
    async def test_e2e_scans_csv_export(self, e2e_client, sample_e2e_scan):
        """The scans export returns CSV containing the header and the sample package."""
        response = await e2e_client.get("/api/v1/scans/export")
        assert response.status_code == 200
        assert "text/csv" in response.headers["content-type"]

        csv_text = response.text
        assert "id,package_name" in csv_text
        assert "test-e2e-pkg" in csv_text

    @pytest.mark.asyncio
    async def test_e2e_packages_csv_export(self, e2e_client, sample_e2e_scan):
        """The packages export returns CSV containing the header and the sample package."""
        response = await e2e_client.get("/api/v1/packages/export")
        assert response.status_code == 200
        assert "text/csv" in response.headers["content-type"]

        csv_text = response.text
        assert "name,version" in csv_text
        assert "test-e2e-pkg" in csv_text