fix: real nexus webhook format, atomic dedup, tested live
This commit is contained in:
@@ -18,55 +18,70 @@ async def test_webhook_rejects_invalid_json(client):
|
||||
@pytest.mark.asyncio
|
||||
async def test_webhook_ignores_deleted_action(client, sample_nexus_webhook):
|
||||
sample_nexus_webhook["action"] = "DELETED"
|
||||
resp = await client.post(
|
||||
"/webhooks/nexus",
|
||||
json=sample_nexus_webhook,
|
||||
)
|
||||
resp = await client.post("/webhooks/nexus", json=sample_nexus_webhook)
|
||||
assert resp.status_code == 200
|
||||
assert resp.json()["status"] == "ignored"
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_webhook_accepts_created(client, sample_nexus_webhook):
|
||||
with patch("guarddog_nexus.webhooks._scan_in_background") as _mock_scan:
|
||||
resp = await client.post(
|
||||
"/webhooks/nexus",
|
||||
json=sample_nexus_webhook,
|
||||
)
|
||||
async def test_webhook_accepts_asset_created(client, sample_nexus_webhook):
|
||||
with patch("guarddog_nexus.webhooks._scan_in_background") as _mock:
|
||||
resp = await client.post("/webhooks/nexus", json=sample_nexus_webhook)
|
||||
assert resp.status_code == 200
|
||||
data = resp.json()
|
||||
assert data["status"] == "accepted"
|
||||
assert data["package"] == "requests-2.31.0.tar.gz"
|
||||
assert data["action"] == "CREATED"
|
||||
assert "/packages/requests/2.31.0/requests-2.31.0.tar.gz" in data["asset"]
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_webhook_accepts_updated(client, sample_nexus_webhook):
|
||||
async def test_webhook_accepts_asset_updated(client, sample_nexus_webhook):
|
||||
sample_nexus_webhook["action"] = "UPDATED"
|
||||
with patch("guarddog_nexus.webhooks._scan_in_background") as _mock_scan:
|
||||
resp = await client.post(
|
||||
"/webhooks/nexus",
|
||||
json=sample_nexus_webhook,
|
||||
)
|
||||
with patch("guarddog_nexus.webhooks._scan_in_background") as _mock:
|
||||
resp = await client.post("/webhooks/nexus", json=sample_nexus_webhook)
|
||||
assert resp.status_code == 200
|
||||
assert resp.json()["status"] == "accepted"
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_webhook_skips_metadata_assets(client, sample_nexus_webhook):
|
||||
sample_nexus_webhook["asset"]["name"] = "index.html"
|
||||
sample_nexus_webhook["asset"]["name"] = "/simple/requests/"
|
||||
resp = await client.post("/webhooks/nexus", json=sample_nexus_webhook)
|
||||
assert resp.status_code == 200
|
||||
assert resp.json()["status"] == "ignored"
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_webhook_skips_non_package_extension(client, sample_nexus_webhook):
|
||||
sample_nexus_webhook["asset"]["name"] = "/some/path.json"
|
||||
resp = await client.post("/webhooks/nexus", json=sample_nexus_webhook)
|
||||
assert resp.status_code == 200
|
||||
assert resp.json()["status"] == "ignored"
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_webhook_no_asset_or_component(client):
|
||||
resp = await client.post(
|
||||
"/webhooks/nexus",
|
||||
json=sample_nexus_webhook,
|
||||
json={"action": "CREATED", "repositoryName": "test"},
|
||||
)
|
||||
assert resp.status_code == 200
|
||||
assert resp.json()["status"] == "ignored"
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_webhook_missing_asset(client):
|
||||
resp = await client.post(
|
||||
"/webhooks/nexus",
|
||||
json={"action": "CREATED", "repositoryName": "test"},
|
||||
)
|
||||
assert resp.status_code == 400
|
||||
async def test_webhook_accepts_component(client, sample_nexus_component_webhook):
|
||||
with patch("guarddog_nexus.webhooks._scan_component") as _mock:
|
||||
resp = await client.post("/webhooks/nexus", json=sample_nexus_component_webhook)
|
||||
assert resp.status_code == 200
|
||||
data = resp.json()
|
||||
assert data["status"] == "accepted"
|
||||
assert data["component"] == "requests==2.31.0"
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_webhook_component_no_version(client, sample_nexus_component_webhook):
|
||||
sample_nexus_component_webhook["component"]["version"] = ""
|
||||
resp = await client.post("/webhooks/nexus", json=sample_nexus_component_webhook)
|
||||
assert resp.status_code == 200
|
||||
assert resp.json()["status"] == "ignored"
|
||||
|
||||
Reference in New Issue
Block a user