Files
guarddog-nexus/tests/test_api.py

299 lines
8.4 KiB
Python

"""Tests for REST API endpoints."""
import pytest
@pytest.mark.asyncio
async def test_health(client):
    """GET /health answers 200 with a JSON body whose ``status`` is ``"ok"``."""
    response = await client.get("/health")
    assert response.status_code == 200
    payload = response.json()
    assert payload["status"] == "ok"
# --- Scans ---
@pytest.mark.asyncio
async def test_list_scans_empty(client):
    """The scans listing returns 200, a ``total`` of 0 and an empty ``scans`` list."""
    response = await client.get("/api/v1/scans")
    assert response.status_code == 200

    payload = response.json()
    assert payload["total"] == 0
    assert len(payload["scans"]) == 0
@pytest.mark.asyncio
async def test_scan_stats_empty(client):
    """The scan stats endpoint returns 200 with zero total and zero flagged scans."""
    response = await client.get("/api/v1/scans/stats")
    assert response.status_code == 200

    stats = response.json()
    assert stats["total_scans"] == 0
    assert stats["flagged_scans"] == 0
@pytest.mark.asyncio
async def test_scan_not_found(client):
    """Requesting scan id 99999 through the API yields a 404."""
    missing_scan_url = "/api/v1/scans/99999"
    response = await client.get(missing_scan_url)
    assert response.status_code == 404
@pytest.mark.asyncio
async def test_list_scans_with_filters(client):
    """Smoke-test the scans listing with several query strings; each must return 200."""
    # Filter parameters smoke test — should not 500
    query_strings = (
        "?flagged=true&search=test&status=completed&sort_by=id&sort_dir=asc",
        "?flagged=false&search=nonexistent&sort_by=total_findings",
        "?sort_by=invalid_key",
        "?limit=10&offset=0",
    )
    for query in query_strings:
        response = await client.get("/api/v1/scans" + query)
        assert response.status_code == 200, f"Failed on: {query}"
@pytest.mark.asyncio
async def test_scan_stats_with_data(client, sample_flagged_scan):
    """With the ``sample_flagged_scan`` fixture active, each stats counter equals 1."""
    response = await client.get("/api/v1/scans/stats")
    assert response.status_code == 200

    stats = response.json()
    assert stats["total_scans"] == 1
    assert stats["flagged_scans"] == 1
    assert stats["total_findings"] == 1
@pytest.mark.asyncio
async def test_scans_csv_export_empty(client):
    """The scans export returns 200, a CSV content type and the ``id,package_name`` header."""
    response = await client.get("/api/v1/scans/export")
    assert response.status_code == 200

    content_type = response.headers["content-type"]
    assert "text/csv" in content_type
    assert "id,package_name" in response.text
@pytest.mark.asyncio
async def test_scans_csv_export_with_filter(client, sample_flagged_scan):
    """The scans export filtered by ``flagged=true`` contains the fixture's package name."""
    response = await client.get("/api/v1/scans/export?flagged=true")
    assert response.status_code == 200

    expected_name = sample_flagged_scan.package_name
    assert expected_name in response.text
# --- Packages ---
@pytest.mark.asyncio
async def test_list_packages_empty(client):
    """The packages listing returns 200 and a ``total`` of 0."""
    response = await client.get("/api/v1/packages")
    assert response.status_code == 200

    payload = response.json()
    assert payload["total"] == 0
@pytest.mark.asyncio
async def test_list_packages_with_filters(client):
    """Smoke-test the packages listing with several query strings; each must return 200."""
    query_strings = (
        "?search=test&sort_by=name&sort_dir=asc",
        "?flagged=false&sort_by=last_scanned_at",
        "?ecosystem=pypi",
        "?sort_by=invalid",
    )
    for query in query_strings:
        response = await client.get("/api/v1/packages" + query)
        assert response.status_code == 200, f"Failed on: {query}"
@pytest.mark.asyncio
async def test_packages_csv_export_empty(client):
    """The packages export returns 200, a CSV content type and the ``name,version`` header."""
    response = await client.get("/api/v1/packages/export")
    assert response.status_code == 200

    content_type = response.headers["content-type"]
    assert "text/csv" in content_type
    assert "name,version" in response.text
@pytest.mark.asyncio
async def test_packages_csv_export_with_filter(client, sample_flagged_scan):
    """The packages export filtered by ``flagged=true`` contains the fixture's package name."""
    response = await client.get("/api/v1/packages/export?flagged=true")
    assert response.status_code == 200

    expected_name = sample_flagged_scan.package_name
    assert expected_name in response.text
@pytest.mark.asyncio
async def test_package_with_data(client, sample_flagged_scan):
    """Package detail for the fixture's name/version reports one scan and ``flagged`` True."""
    name = sample_flagged_scan.package_name
    version = sample_flagged_scan.package_version

    response = await client.get(f"/api/v1/packages/{name}/{version}")
    assert response.status_code == 200

    payload = response.json()
    assert payload["name"] == name
    assert len(payload["scans"]) == 1
    assert payload["flagged"] is True
@pytest.mark.asyncio
async def test_package_not_found(client):
    """Requesting package ``nonexistent`` version ``1.0`` through the API yields a 404."""
    missing_package_url = "/api/v1/packages/nonexistent/1.0"
    response = await client.get(missing_package_url)
    assert response.status_code == 404
# --- Findings ---
@pytest.mark.asyncio
async def test_list_findings_empty(client):
    """The findings listing returns 200 and a ``total`` of 0."""
    response = await client.get("/api/v1/findings")
    assert response.status_code == 200

    payload = response.json()
    assert payload["total"] == 0
@pytest.mark.asyncio
async def test_list_findings_with_data(client, sample_flagged_scan):
    """With the ``sample_flagged_scan`` fixture active, exactly one finding is listed."""
    response = await client.get("/api/v1/findings")
    assert response.status_code == 200

    payload = response.json()
    assert payload["total"] == 1
    assert len(payload["findings"]) == 1
@pytest.mark.asyncio
async def test_list_findings_with_filters(client, sample_flagged_scan):
    """Smoke-test the findings listing with scan-id, severity and rule filters (each 200)."""
    query_strings = (
        f"?scan_id={sample_flagged_scan.id}",
        "?severity=WARNING",
        "?rule=test_rule",
    )
    for query in query_strings:
        response = await client.get("/api/v1/findings" + query)
        assert response.status_code == 200, f"Failed on: {query}"
# --- Web UI ---
@pytest.mark.asyncio
async def test_web_ui_dashboard(client):
    """The root page returns 200 and its body mentions ``GuardDog Nexus``."""
    page = await client.get("/")
    assert page.status_code == 200
    assert "GuardDog Nexus" in page.text
@pytest.mark.asyncio
async def test_web_ui_dashboard_stats_fragment(client):
    """``/dashboard/stats`` returns 200 and its body contains "scans" in any letter case."""
    fragment = await client.get("/dashboard/stats")
    assert fragment.status_code == 200

    lowered_body = fragment.text.lower()
    assert "scans" in lowered_body
@pytest.mark.asyncio
async def test_web_ui_scans(client):
    """The ``/scans`` page returns 200 and its body contains ``Scans``."""
    page = await client.get("/scans")
    assert page.status_code == 200
    assert "Scans" in page.text
@pytest.mark.asyncio
async def test_web_ui_scans_with_search(client):
    """The ``/scans`` page still returns 200 with search, status and sort parameters."""
    url = "/scans?search=nonexistent&status=completed&sort_by=id&sort_dir=asc"
    page = await client.get(url)
    assert page.status_code == 200
@pytest.mark.asyncio
async def test_web_ui_scans_page_out_of_range(client):
    """Requesting ``page=999`` on the ``/scans`` page still returns 200."""
    url = "/scans?page=999"
    page = await client.get(url)
    assert page.status_code == 200
@pytest.mark.asyncio
async def test_web_ui_scan_not_found(client):
    """The web UI detail page for scan id 99999 yields a 404."""
    missing_scan_url = "/scans/99999"
    page = await client.get(missing_scan_url)
    assert page.status_code == 404
@pytest.mark.asyncio
async def test_web_ui_scan_detail(client, sample_flagged_scan):
    """The scan detail page shows the fixture's package name and the text ``test_rule``."""
    detail_url = "/scans/{}".format(sample_flagged_scan.id)
    page = await client.get(detail_url)
    assert page.status_code == 200

    body = page.text
    assert sample_flagged_scan.package_name in body
    assert "test_rule" in body
@pytest.mark.asyncio
async def test_web_ui_packages(client):
    """The ``/packages`` page returns 200 and its body contains ``Packages``."""
    page = await client.get("/packages")
    assert page.status_code == 200
    assert "Packages" in page.text
@pytest.mark.asyncio
async def test_web_ui_packages_with_search(client):
    """The ``/packages`` page still returns 200 with search and sort parameters."""
    url = "/packages?search=test&sort_by=name&sort_dir=asc"
    page = await client.get(url)
    assert page.status_code == 200
@pytest.mark.asyncio
async def test_web_ui_package_not_found(client):
    """The web UI detail page for package ``nonexistent`` version ``1.0`` yields a 404."""
    missing_package_url = "/packages/nonexistent/1.0"
    page = await client.get(missing_package_url)
    assert page.status_code == 404
@pytest.mark.asyncio
async def test_web_ui_package_detail(client, sample_flagged_scan):
    """The package detail page shows the fixture's package name and the text ``test_rule``."""
    name = sample_flagged_scan.package_name
    version = sample_flagged_scan.package_version

    page = await client.get(f"/packages/{name}/{version}")
    assert page.status_code == 200

    body = page.text
    assert name in body
    assert "test_rule" in body
@pytest.mark.asyncio
async def test_health_no_db_leak(client):
    """Call /health five times back to back and require a 200 on every call."""
    # Rapid health checks should not exhaust connections
    attempts = 5
    for _ in range(attempts):
        response = await client.get("/health")
        assert response.status_code == 200
# --- CSV formula injection ---
class TestCsvSafe:
    """Unit checks for ``_csv_safe`` from ``guarddog_nexus.routes.api_scans``."""

    def test_formula_prefixes_escaped(self):
        """Values starting with ``=``, ``+``, ``-`` or ``@`` come back apostrophe-prefixed."""
        from guarddog_nexus.routes.api_scans import _csv_safe

        expected_by_input = {
            "=cmd|'calc'!A0": "'=cmd|'calc'!A0",
            "+SUM(1,2)": "'+SUM(1,2)",
            "-3+4": "'-3+4",
            "@REF(A1)": "'@REF(A1)",
        }
        for raw, escaped in expected_by_input.items():
            assert _csv_safe(raw) == escaped

    def test_normal_values_unchanged(self):
        """Ordinary package names and version strings are returned as given."""
        from guarddog_nexus.routes.api_scans import _csv_safe

        for plain in ("requests", "2.0.0"):
            assert _csv_safe(plain) == plain

    def test_empty_string(self):
        """An empty string is returned as an empty string."""
        from guarddog_nexus.routes.api_scans import _csv_safe

        result = _csv_safe("")
        assert result == ""

    def test_none_passes_through(self):
        """``None`` is returned as ``None``."""
        from guarddog_nexus.routes.api_scans import _csv_safe

        result = _csv_safe(None)
        assert result is None
@pytest.mark.asyncio
async def test_csv_export_escapes_formula_injection(client, db_session):
    """Store a scan whose package name starts with ``=`` and check the export escapes it.

    The exported CSV text must contain the apostrophe-prefixed form ``'=cmd``.
    """
    from guarddog_nexus.db.models import Scan, ScanStatus

    hostile_fields = {
        "package_name": "=cmd|'calc'!A0",
        "package_version": "1.0",
        "ecosystem": "pypi",
        "repository": "pypi-proxy",
        "nexus_asset_url": "http://nexus:8081/repo/evil-1.0.tar.gz",
        "status": ScanStatus.COMPLETED.value,
    }
    db_session.add(Scan(**hostile_fields))
    await db_session.commit()

    export = await client.get("/api/v1/scans/export")
    assert export.status_code == 200
    assert "'=cmd" in export.text