309 lines
12 KiB
Python
309 lines
12 KiB
Python
"""E2E tests for the complete webhook-to-scan flow."""
|
|
|
|
from unittest.mock import patch
|
|
|
|
import pytest
|
|
|
|
|
|
class TestWebhookToScanFlow:
|
|
"""End-to-end tests for the complete webhook to scan pipeline."""
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_e2e_webhook_triggers_scan_creation(
|
|
self, e2e_client, e2e_webhook_payload, e2e_db_session
|
|
):
|
|
"""Verify that receiving a webhook triggers scan creation."""
|
|
|
|
# Mock the scan to avoid actual download and GuardDog execution
|
|
async def mock_harvest(*args, **kwargs):
|
|
from guarddog_nexus.db.models import Scan, ScanStatus
|
|
|
|
scan = Scan(
|
|
package_name="e2e-test-pkg",
|
|
package_version="1.0.0",
|
|
ecosystem="pypi",
|
|
repository="pypi-proxy",
|
|
nexus_asset_url=args[0],
|
|
status=ScanStatus.COMPLETED.value,
|
|
total_findings=0,
|
|
flagged=False,
|
|
)
|
|
e2e_db_session.add(scan)
|
|
await e2e_db_session.commit()
|
|
await e2e_db_session.refresh(scan)
|
|
return scan
|
|
|
|
with patch("guarddog_nexus.routes.webhooks._scan_in_background", mock_harvest):
|
|
resp = await e2e_client.post("/webhooks/nexus", json=e2e_webhook_payload)
|
|
|
|
assert resp.status_code == 200
|
|
data = resp.json()
|
|
assert data["status"] == "accepted"
|
|
assert "e2e-test-pkg" in data.get("asset", "")
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_e2e_webhook_accepts_go_asset(
|
|
self, e2e_client, e2e_go_webhook_payload, e2e_db_session
|
|
):
|
|
"""Verify that Go assets are accepted and processed."""
|
|
|
|
async def mock_harvest(*args, **kwargs):
|
|
from guarddog_nexus.db.models import Scan, ScanStatus
|
|
|
|
scan = Scan(
|
|
package_name="github.com/e2e/test-go",
|
|
package_version="v1.0.0",
|
|
ecosystem="go",
|
|
repository="go-proxy",
|
|
nexus_asset_url=args[0],
|
|
status=ScanStatus.COMPLETED.value,
|
|
total_findings=0,
|
|
flagged=False,
|
|
)
|
|
e2e_db_session.add(scan)
|
|
await e2e_db_session.commit()
|
|
await e2e_db_session.refresh(scan)
|
|
return scan
|
|
|
|
with patch("guarddog_nexus.routes.webhooks._scan_in_background", mock_harvest):
|
|
resp = await e2e_client.post("/webhooks/nexus", json=e2e_go_webhook_payload)
|
|
|
|
assert resp.status_code == 200
|
|
data = resp.json()
|
|
assert data["status"] == "accepted"
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_e2e_webhook_accepts_npm_asset(
|
|
self, e2e_client, e2e_npm_webhook_payload, e2e_db_session
|
|
):
|
|
"""Verify that npm assets are accepted and processed."""
|
|
|
|
async def mock_harvest(*args, **kwargs):
|
|
from guarddog_nexus.db.models import Scan, ScanStatus
|
|
|
|
scan = Scan(
|
|
package_name="e2e-test-npm",
|
|
package_version="1.0.0",
|
|
ecosystem="npm",
|
|
repository="npm-proxy",
|
|
nexus_asset_url=args[0],
|
|
status=ScanStatus.COMPLETED.value,
|
|
total_findings=0,
|
|
flagged=False,
|
|
)
|
|
e2e_db_session.add(scan)
|
|
await e2e_db_session.commit()
|
|
await e2e_db_session.refresh(scan)
|
|
return scan
|
|
|
|
with patch("guarddog_nexus.routes.webhooks._scan_in_background", mock_harvest):
|
|
resp = await e2e_client.post("/webhooks/nexus", json=e2e_npm_webhook_payload)
|
|
|
|
assert resp.status_code == 200
|
|
data = resp.json()
|
|
assert data["status"] == "accepted"
|
|
|
|
|
|
class TestWebhookSignatureValidation:
|
|
"""E2E tests for webhook signature validation."""
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_e2e_webhook_with_valid_signature(self, e2e_client, e2e_webhook_payload):
|
|
"""Verify that webhooks with valid signatures are accepted."""
|
|
import hashlib
|
|
import hmac
|
|
import json
|
|
|
|
from guarddog_nexus.config import config
|
|
|
|
original_secret = config.webhook_secret
|
|
config.webhook_secret = "test-secret"
|
|
|
|
# Calculate valid signature
|
|
payload_bytes = json.dumps(e2e_webhook_payload).encode("utf-8")
|
|
signature = hmac.new(b"test-secret", payload_bytes, hashlib.sha256).hexdigest()
|
|
|
|
resp = await e2e_client.post(
|
|
"/webhooks/nexus",
|
|
content=payload_bytes,
|
|
headers={"X-Nexus-Webhook-Signature": signature, "Content-Type": "application/json"},
|
|
)
|
|
|
|
# Should be accepted (signature matches)
|
|
assert resp.status_code == 200
|
|
|
|
config.webhook_secret = original_secret
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_e2e_webhook_with_invalid_signature(self, e2e_client, e2e_webhook_payload):
|
|
"""Verify that webhooks with invalid signatures are rejected."""
|
|
import json
|
|
|
|
from guarddog_nexus.config import config
|
|
|
|
original_secret = config.webhook_secret
|
|
config.webhook_secret = "test-secret"
|
|
|
|
payload_bytes = json.dumps(e2e_webhook_payload).encode("utf-8")
|
|
resp = await e2e_client.post(
|
|
"/webhooks/nexus",
|
|
content=payload_bytes,
|
|
headers={
|
|
"X-Nexus-Webhook-Signature": "invalid-signature",
|
|
"Content-Type": "application/json",
|
|
},
|
|
)
|
|
|
|
# Should be rejected when secret is set
|
|
assert resp.status_code == 403
|
|
|
|
config.webhook_secret = original_secret
|
|
|
|
|
|
class TestApiIntegration:
|
|
"""E2E tests for API endpoint integration with database."""
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_e2e_api_scan_list_with_data(self, e2e_client, sample_e2e_scan):
|
|
"""Verify that API returns scan data from database."""
|
|
resp = await e2e_client.get("/api/v1/scans")
|
|
assert resp.status_code == 200
|
|
data = resp.json()
|
|
assert data["total"] >= 1
|
|
assert len(data["scans"]) >= 1
|
|
assert data["scans"][0]["package_name"] == "test-e2e-pkg"
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_e2e_api_scan_detail_with_findings(self, e2e_client, sample_e2e_scan):
|
|
"""Verify that scan detail includes findings."""
|
|
resp = await e2e_client.get(f"/api/v1/scans/{sample_e2e_scan.id}")
|
|
assert resp.status_code == 200
|
|
data = resp.json()
|
|
assert data["package_name"] == "test-e2e-pkg"
|
|
assert len(data["findings"]) == 2
|
|
assert data["total_findings"] == 2
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_e2e_api_package_list(self, e2e_client, sample_e2e_scan):
|
|
"""Verify that package list shows aggregated data."""
|
|
resp = await e2e_client.get("/api/v1/packages")
|
|
assert resp.status_code == 200
|
|
data = resp.json()
|
|
assert data["total"] >= 1
|
|
assert any(p["name"] == "test-e2e-pkg" for p in data["packages"])
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_e2e_api_findings_list(self, e2e_client, sample_e2e_scan):
|
|
"""Verify that findings list returns all findings."""
|
|
resp = await e2e_client.get("/api/v1/findings")
|
|
assert resp.status_code == 200
|
|
data = resp.json()
|
|
assert data["total"] >= 2
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_e2e_api_findings_filter_by_rule(self, e2e_client, sample_e2e_scan):
|
|
"""Verify that findings can be filtered by rule."""
|
|
resp = await e2e_client.get("/api/v1/findings?rule=shady-links")
|
|
assert resp.status_code == 200
|
|
data = resp.json()
|
|
assert data["total"] >= 1
|
|
assert all(f["rule"] == "shady-links" for f in data["findings"])
|
|
|
|
|
|
class TestWebUiIntegration:
|
|
"""E2E tests for web UI integration."""
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_e2e_dashboard_page(self, e2e_client, sample_e2e_scan):
|
|
"""Verify that dashboard page renders with data."""
|
|
resp = await e2e_client.get("/")
|
|
assert resp.status_code == 200
|
|
assert "GuardDog Nexus" in resp.text
|
|
assert "Dashboard" in resp.text or "Панель" in resp.text
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_e2e_scans_page(self, e2e_client, sample_e2e_scan):
|
|
"""Verify that scans page renders with data."""
|
|
resp = await e2e_client.get("/scans")
|
|
assert resp.status_code == 200
|
|
assert "Scans" in resp.text or "Сканирования" in resp.text
|
|
assert "test-e2e-pkg" in resp.text
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_e2e_scans_detail_page(self, e2e_client, sample_e2e_scan):
|
|
"""Verify that scan detail page shows findings."""
|
|
resp = await e2e_client.get(f"/scans/{sample_e2e_scan.id}")
|
|
assert resp.status_code == 200
|
|
assert "Scan" in resp.text or "Сканирование" in resp.text
|
|
assert "shady-links" in resp.text
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_e2e_packages_page(self, e2e_client, sample_e2e_scan):
|
|
"""Verify that packages page renders with data."""
|
|
resp = await e2e_client.get("/packages")
|
|
assert resp.status_code == 200
|
|
assert "Packages" in resp.text or "Пакеты" in resp.text
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_e2e_package_detail_page(self, e2e_client, sample_e2e_scan):
|
|
"""Verify that package detail page shows all scans and findings."""
|
|
resp = await e2e_client.get("/packages/test-e2e-pkg/1.0.0")
|
|
assert resp.status_code == 200
|
|
assert "test-e2e-pkg" in resp.text
|
|
assert "shady-links" in resp.text
|
|
|
|
|
|
class TestHealthAndMetrics:
|
|
"""E2E tests for health and metrics endpoints."""
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_e2e_health_endpoint(self, e2e_client):
|
|
"""Verify that health endpoint returns status."""
|
|
resp = await e2e_client.get("/health")
|
|
assert resp.status_code == 200
|
|
data = resp.json()
|
|
assert data["status"] == "ok"
|
|
assert "version" in data
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_e2e_metrics_endpoint(self, e2e_client, sample_e2e_scan):
|
|
"""Verify that metrics endpoint returns Prometheus format."""
|
|
resp = await e2e_client.get("/metrics")
|
|
assert resp.status_code == 200
|
|
assert "text/plain" in resp.headers["content-type"]
|
|
assert "guarddog_scans_total" in resp.text
|
|
assert "guarddog_scans_flagged_total" in resp.text
|
|
assert "# HELP" in resp.text
|
|
assert "# TYPE" in resp.text
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_e2e_health_dependencies_endpoint(self, e2e_client):
|
|
"""Verify that dependency health checks work."""
|
|
resp = await e2e_client.get("/health/dependencies")
|
|
assert resp.status_code in [200, 503] # 503 if Nexus not reachable
|
|
data = resp.json()
|
|
assert "database" in data
|
|
assert "nexus" in data
|
|
|
|
|
|
class TestCsvExport:
|
|
"""E2E tests for CSV export functionality."""
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_e2e_scans_csv_export(self, e2e_client, sample_e2e_scan):
|
|
"""Verify that scans CSV export works."""
|
|
resp = await e2e_client.get("/api/v1/scans/export")
|
|
assert resp.status_code == 200
|
|
assert "text/csv" in resp.headers["content-type"]
|
|
assert "id,package_name" in resp.text
|
|
assert "test-e2e-pkg" in resp.text
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_e2e_packages_csv_export(self, e2e_client, sample_e2e_scan):
|
|
"""Verify that packages CSV export works."""
|
|
resp = await e2e_client.get("/api/v1/packages/export")
|
|
assert resp.status_code == 200
|
|
assert "text/csv" in resp.headers["content-type"]
|
|
assert "name,version" in resp.text
|
|
assert "test-e2e-pkg" in resp.text
|