167 lines
5.4 KiB
Python
167 lines
5.4 KiB
Python
"""Tests for Nexus webhook receiver."""
|
|
|
|
from unittest.mock import patch
|
|
|
|
import pytest
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_webhook_rejects_invalid_json(client):
    """A body that is not valid JSON is answered with HTTP 400."""
    malformed_body = "not json"
    json_headers = {"Content-Type": "application/json"}

    response = await client.post(
        "/webhooks/nexus", content=malformed_body, headers=json_headers
    )

    assert response.status_code == 400
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_webhook_ignores_deleted_action(client, sample_nexus_webhook):
    """A payload whose action is DELETED gets a 200 with status "ignored"."""
    payload = sample_nexus_webhook
    payload["action"] = "DELETED"

    response = await client.post("/webhooks/nexus", json=payload)

    assert response.status_code == 200
    body = response.json()
    assert body["status"] == "ignored"
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_webhook_ignores_created_action(client, sample_nexus_webhook):
    """A payload whose action is CREATED gets a 200 with status "ignored"."""
    payload = sample_nexus_webhook
    payload["action"] = "CREATED"

    response = await client.post("/webhooks/nexus", json=payload)

    assert response.status_code == 200
    body = response.json()
    assert body["status"] == "ignored"
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_webhook_accepts_asset_updated(client, sample_nexus_webhook):
    """With the background scan patched out, an UPDATED payload is accepted."""
    payload = sample_nexus_webhook
    payload["action"] = "UPDATED"

    # Replace the scan entry point with a mock for the duration of the request.
    scan_target = "guarddog_nexus.routes.webhooks._scan_in_background"
    with patch(scan_target):
        response = await client.post("/webhooks/nexus", json=payload)

    assert response.status_code == 200
    body = response.json()
    assert body["status"] == "accepted"
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_webhook_skips_metadata_assets(client, sample_nexus_webhook):
    """An asset named "/simple/requests/" gets a 200 with status "ignored"."""
    payload = sample_nexus_webhook
    payload["asset"]["name"] = "/simple/requests/"

    response = await client.post("/webhooks/nexus", json=payload)

    assert response.status_code == 200
    body = response.json()
    assert body["status"] == "ignored"
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_webhook_skips_non_package_extension(client, sample_nexus_webhook):
    """An asset named "/some/path.json" gets a 200 with status "ignored"."""
    payload = sample_nexus_webhook
    payload["asset"]["name"] = "/some/path.json"

    response = await client.post("/webhooks/nexus", json=payload)

    assert response.status_code == 200
    body = response.json()
    assert body["status"] == "ignored"
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_webhook_no_asset_or_component(client):
    """A payload with neither "asset" nor "component" keys is ignored."""
    bare_payload = {"action": "CREATED", "repositoryName": "test"}

    response = await client.post("/webhooks/nexus", json=bare_payload)

    assert response.status_code == 200
    body = response.json()
    assert body["status"] == "ignored"
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_webhook_accepts_component(client, sample_nexus_component_webhook):
    """With the component scan patched out, a component payload is accepted.

    The response must also report the component as "requests==2.31.0".
    """
    # Replace the component scan with a mock for the duration of the request.
    scan_target = "guarddog_nexus.routes.webhooks._scan_component"
    with patch(scan_target):
        response = await client.post(
            "/webhooks/nexus", json=sample_nexus_component_webhook
        )

    assert response.status_code == 200
    body = response.json()
    assert body["status"] == "accepted"
    assert body["component"] == "requests==2.31.0"
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_webhook_component_no_version(client, sample_nexus_component_webhook):
    """A component payload with an empty version string is ignored."""
    payload = sample_nexus_component_webhook
    payload["component"]["version"] = ""

    response = await client.post("/webhooks/nexus", json=payload)

    assert response.status_code == 200
    body = response.json()
    assert body["status"] == "ignored"
|
|
|
|
|
|
# --- Ecosystem detection tests ---
|
|
|
|
|
|
def test_detect_ecosystem_pypi():
    """The formats "pypi", "pip" and "python" all resolve to "pypi"."""
    from guarddog_nexus.routes.webhooks import _detect_ecosystem

    for nexus_format in ("pypi", "pip", "python"):
        assert _detect_ecosystem({"format": nexus_format}) == "pypi"
|
|
|
|
|
|
def test_detect_ecosystem_go():
    """The formats "go" and "golang" both resolve to "go"."""
    from guarddog_nexus.routes.webhooks import _detect_ecosystem

    for nexus_format in ("go", "golang"):
        assert _detect_ecosystem({"format": nexus_format}) == "go"
|
|
|
|
|
|
def test_detect_ecosystem_npm():
    """The formats "npm" and "node" both resolve to "npm"."""
    from guarddog_nexus.routes.webhooks import _detect_ecosystem

    for nexus_format in ("npm", "node"):
        assert _detect_ecosystem({"format": nexus_format}) == "npm"
|
|
|
|
|
|
def test_detect_ecosystem_unknown():
    """A "maven" format and a mapping with no "format" key both yield None."""
    from guarddog_nexus.routes.webhooks import _detect_ecosystem

    for descriptor in ({"format": "maven"}, {}):
        assert _detect_ecosystem(descriptor) is None
|
|
|
|
|
|
# --- Go/npm webhook integration ---
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_webhook_go_asset(client, sample_nexus_go_webhook):
    """With the background scan patched out, the Go sample payload is accepted."""
    # Replace the scan entry point with a mock for the duration of the request.
    scan_target = "guarddog_nexus.routes.webhooks._scan_in_background"
    with patch(scan_target):
        response = await client.post("/webhooks/nexus", json=sample_nexus_go_webhook)

    assert response.status_code == 200
    body = response.json()
    assert body["status"] == "accepted"
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_webhook_npm_asset(client, sample_nexus_npm_webhook):
    """With the background scan patched out, the npm sample payload is accepted."""
    # Replace the scan entry point with a mock for the duration of the request.
    scan_target = "guarddog_nexus.routes.webhooks._scan_in_background"
    with patch(scan_target):
        response = await client.post("/webhooks/nexus", json=sample_nexus_npm_webhook)

    assert response.status_code == 200
    body = response.json()
    assert body["status"] == "accepted"
|
|
|
|
|
|
# --- Webhook signature validation ---
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_webhook_missing_signature_when_required(client, sample_nexus_webhook):
    """With a webhook secret configured, an unsigned request gets HTTP 401.

    The secret is set on the shared ``guarddog_nexus.config.config`` object,
    so it is restored in a ``finally`` block: a failing assertion must not
    leave "test-secret" in place for tests that run afterwards.
    """
    import guarddog_nexus.config

    settings = guarddog_nexus.config.config
    # Remember whatever was configured before instead of assuming it was "".
    previous_secret = getattr(settings, "webhook_secret", "")
    settings.webhook_secret = "test-secret"
    try:
        resp = await client.post("/webhooks/nexus", json=sample_nexus_webhook)
        assert resp.status_code == 401
    finally:
        settings.webhook_secret = previous_secret
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_webhook_invalid_signature(client, sample_nexus_webhook):
    """With a webhook secret configured, a wrong signature gets HTTP 403.

    The secret is set on the shared ``guarddog_nexus.config.config`` object,
    so it is restored in a ``finally`` block: a failing assertion must not
    leave "test-secret" in place for tests that run afterwards.
    """
    import guarddog_nexus.config

    settings = guarddog_nexus.config.config
    # Remember whatever was configured before instead of assuming it was "".
    previous_secret = getattr(settings, "webhook_secret", "")
    settings.webhook_secret = "test-secret"
    try:
        resp = await client.post(
            "/webhooks/nexus",
            json=sample_nexus_webhook,
            headers={"X-Nexus-Webhook-Signature": "badsignature"},
        )
        assert resp.status_code == 403
    finally:
        settings.webhook_secret = previous_secret
|