"""Tests for REST API endpoints.""" import pytest @pytest.mark.asyncio async def test_health(client): resp = await client.get("/health") assert resp.status_code == 200 assert resp.json()["status"] == "ok" # --- Scans --- @pytest.mark.asyncio async def test_list_scans_empty(client): resp = await client.get("/api/v1/scans") assert resp.status_code == 200 data = resp.json() assert data["total"] == 0 assert len(data["scans"]) == 0 @pytest.mark.asyncio async def test_scan_stats_empty(client): resp = await client.get("/api/v1/scans/stats") assert resp.status_code == 200 data = resp.json() assert data["total_scans"] == 0 assert data["flagged_scans"] == 0 @pytest.mark.asyncio async def test_scan_not_found(client): resp = await client.get("/api/v1/scans/99999") assert resp.status_code == 404 @pytest.mark.asyncio async def test_list_scans_with_filters(client): # Filter parameters smoke test — should not 500 for params in [ "?flagged=true&search=test&status=completed&sort_by=id&sort_dir=asc", "?flagged=false&search=nonexistent&sort_by=total_findings", "?sort_by=invalid_key", "?limit=10&offset=0", ]: resp = await client.get(f"/api/v1/scans{params}") assert resp.status_code == 200, f"Failed on: {params}" @pytest.mark.asyncio async def test_scan_stats_with_data(client, sample_flagged_scan): resp = await client.get("/api/v1/scans/stats") assert resp.status_code == 200 data = resp.json() assert data["total_scans"] == 1 assert data["flagged_scans"] == 1 assert data["total_findings"] == 1 @pytest.mark.asyncio async def test_scans_csv_export_empty(client): resp = await client.get("/api/v1/scans/export") assert resp.status_code == 200 assert "text/csv" in resp.headers["content-type"] assert "id,package_name" in resp.text @pytest.mark.asyncio async def test_scans_csv_export_with_filter(client, sample_flagged_scan): resp = await client.get("/api/v1/scans/export?flagged=true") assert resp.status_code == 200 assert sample_flagged_scan.package_name in resp.text # --- Packages --- @pytest.mark.asyncio async def test_list_packages_empty(client): resp = await client.get("/api/v1/packages") assert resp.status_code == 200 data = resp.json() assert data["total"] == 0 @pytest.mark.asyncio async def test_list_packages_with_filters(client): for params in [ "?search=test&sort_by=name&sort_dir=asc", "?flagged=false&sort_by=last_scanned_at", "?ecosystem=pypi", "?sort_by=invalid", ]: resp = await client.get(f"/api/v1/packages{params}") assert resp.status_code == 200, f"Failed on: {params}" @pytest.mark.asyncio async def test_packages_csv_export_empty(client): resp = await client.get("/api/v1/packages/export") assert resp.status_code == 200 assert "text/csv" in resp.headers["content-type"] assert "name,version" in resp.text @pytest.mark.asyncio async def test_packages_csv_export_with_filter(client, sample_flagged_scan): resp = await client.get("/api/v1/packages/export?flagged=true") assert resp.status_code == 200 assert sample_flagged_scan.package_name in resp.text @pytest.mark.asyncio async def test_package_with_data(client, sample_flagged_scan): resp = await client.get( f"/api/v1/packages/{sample_flagged_scan.package_name}/{sample_flagged_scan.package_version}" ) assert resp.status_code == 200 data = resp.json() assert data["name"] == sample_flagged_scan.package_name assert len(data["scans"]) == 1 assert data["flagged"] is True @pytest.mark.asyncio async def test_package_not_found(client): resp = await client.get("/api/v1/packages/nonexistent/1.0") assert resp.status_code == 404 # --- Findings --- @pytest.mark.asyncio async def test_list_findings_empty(client): resp = await client.get("/api/v1/findings") assert resp.status_code == 200 data = resp.json() assert data["total"] == 0 @pytest.mark.asyncio async def test_list_findings_with_data(client, sample_flagged_scan): resp = await client.get("/api/v1/findings") assert resp.status_code == 200 data = resp.json() assert data["total"] == 1 assert len(data["findings"]) == 1 @pytest.mark.asyncio async def test_list_findings_with_filters(client, sample_flagged_scan): for params in [ f"?scan_id={sample_flagged_scan.id}", "?severity=WARNING", "?rule=test_rule", ]: resp = await client.get(f"/api/v1/findings{params}") assert resp.status_code == 200, f"Failed on: {params}" # --- Web UI --- @pytest.mark.asyncio async def test_web_ui_dashboard(client): resp = await client.get("/") assert resp.status_code == 200 assert "GuardDog Nexus" in resp.text @pytest.mark.asyncio async def test_web_ui_dashboard_stats_fragment(client): resp = await client.get("/dashboard/stats") assert resp.status_code == 200 assert "scans" in resp.text.lower() @pytest.mark.asyncio async def test_web_ui_scans(client): resp = await client.get("/scans") assert resp.status_code == 200 assert "Scans" in resp.text @pytest.mark.asyncio async def test_web_ui_scans_with_search(client): resp = await client.get("/scans?search=nonexistent&status=completed&sort_by=id&sort_dir=asc") assert resp.status_code == 200 @pytest.mark.asyncio async def test_web_ui_scans_page_out_of_range(client): resp = await client.get("/scans?page=999") assert resp.status_code == 200 @pytest.mark.asyncio async def test_web_ui_scan_not_found(client): resp = await client.get("/scans/99999") assert resp.status_code == 404 @pytest.mark.asyncio async def test_web_ui_scan_detail(client, sample_flagged_scan): resp = await client.get(f"/scans/{sample_flagged_scan.id}") assert resp.status_code == 200 assert sample_flagged_scan.package_name in resp.text assert "test_rule" in resp.text @pytest.mark.asyncio async def test_web_ui_packages(client): resp = await client.get("/packages") assert resp.status_code == 200 assert "Packages" in resp.text @pytest.mark.asyncio async def test_web_ui_packages_with_search(client): resp = await client.get("/packages?search=test&sort_by=name&sort_dir=asc") assert resp.status_code == 200 @pytest.mark.asyncio async def test_web_ui_package_not_found(client): resp = await client.get("/packages/nonexistent/1.0") assert resp.status_code == 404 @pytest.mark.asyncio async def test_web_ui_package_detail(client, sample_flagged_scan): resp = await client.get( f"/packages/{sample_flagged_scan.package_name}/{sample_flagged_scan.package_version}" ) assert resp.status_code == 200 assert sample_flagged_scan.package_name in resp.text assert "test_rule" in resp.text @pytest.mark.asyncio async def test_health_no_db_leak(client): # Rapid health checks should not exhaust connections for _ in range(5): resp = await client.get("/health") assert resp.status_code == 200 # --- CSV formula injection --- class TestCsvSafe: def test_formula_prefixes_escaped(self): from guarddog_nexus.routes.api_scans import _csv_safe assert _csv_safe("=cmd|'calc'!A0") == "'=cmd|'calc'!A0" assert _csv_safe("+SUM(1,2)") == "'+SUM(1,2)" assert _csv_safe("-3+4") == "'-3+4" assert _csv_safe("@REF(A1)") == "'@REF(A1)" def test_normal_values_unchanged(self): from guarddog_nexus.routes.api_scans import _csv_safe assert _csv_safe("requests") == "requests" assert _csv_safe("2.0.0") == "2.0.0" def test_empty_string(self): from guarddog_nexus.routes.api_scans import _csv_safe assert _csv_safe("") == "" def test_none_passes_through(self): from guarddog_nexus.routes.api_scans import _csv_safe assert _csv_safe(None) is None @pytest.mark.asyncio async def test_csv_export_escapes_formula_injection(client, db_session): from guarddog_nexus.db.models import Scan, ScanStatus scan = Scan( package_name="=cmd|'calc'!A0", package_version="1.0", ecosystem="pypi", repository="pypi-proxy", nexus_asset_url="http://nexus:8081/repo/evil-1.0.tar.gz", status=ScanStatus.COMPLETED.value, ) db_session.add(scan) await db_session.commit() resp = await client.get("/api/v1/scans/export") assert resp.status_code == 200 assert "'=cmd" in resp.text