299 lines
8.4 KiB
Python
299 lines
8.4 KiB
Python
"""Tests for REST API endpoints."""
|
|
|
|
import pytest
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_health(client):
    """GET /health answers 200 and reports a status of "ok"."""
    response = await client.get("/health")
    assert response.status_code == 200
    body = response.json()
    assert body["status"] == "ok"
|
|
|
|
|
|
# --- Scans ---
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_list_scans_empty(client):
    """Listing scans without any scan fixture expects a zero total and no rows."""
    response = await client.get("/api/v1/scans")
    assert response.status_code == 200
    payload = response.json()
    assert payload["total"] == 0
    scans = payload["scans"]
    assert len(scans) == 0
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_scan_stats_empty(client):
    """Scan stats without any scan fixture expect zero total and zero flagged scans."""
    response = await client.get("/api/v1/scans/stats")
    assert response.status_code == 200
    stats = response.json()
    assert stats["total_scans"] == 0
    assert stats["flagged_scans"] == 0
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_scan_not_found(client):
    """Requesting scan id 99999 through the API is expected to return 404."""
    response = await client.get("/api/v1/scans/99999")
    status = response.status_code
    assert status == 404
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_list_scans_with_filters(client):
    """Smoke-test the scan list filters: each query string must return 200, not 500."""
    filter_queries = (
        "?flagged=true&search=test&status=completed&sort_by=id&sort_dir=asc",
        "?flagged=false&search=nonexistent&sort_by=total_findings",
        "?sort_by=invalid_key",
        "?limit=10&offset=0",
    )
    for params in filter_queries:
        response = await client.get(f"/api/v1/scans{params}")
        # The message names the query string that produced the unexpected status.
        assert response.status_code == 200, f"Failed on: {params}"
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_scan_stats_with_data(client, sample_flagged_scan):
    """With the sample flagged scan fixture requested, each stats counter should be 1."""
    response = await client.get("/api/v1/scans/stats")
    assert response.status_code == 200
    stats = response.json()
    # Checked in the same order as the individual counters are reported.
    for counter in ("total_scans", "flagged_scans", "total_findings"):
        assert stats[counter] == 1
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_scans_csv_export_empty(client):
    """The scans export responds with a CSV content type and the header columns."""
    response = await client.get("/api/v1/scans/export")
    assert response.status_code == 200
    content_type = response.headers["content-type"]
    assert "text/csv" in content_type
    csv_text = response.text
    assert "id,package_name" in csv_text
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_scans_csv_export_with_filter(client, sample_flagged_scan):
    """Exporting scans with ``flagged=true`` includes the sample scan's package name."""
    response = await client.get("/api/v1/scans/export?flagged=true")
    assert response.status_code == 200
    expected_name = sample_flagged_scan.package_name
    assert expected_name in response.text
|
|
|
|
|
|
# --- Packages ---
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_list_packages_empty(client):
    """Listing packages without any scan fixture expects a total of zero."""
    response = await client.get("/api/v1/packages")
    assert response.status_code == 200
    payload = response.json()
    total = payload["total"]
    assert total == 0
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_list_packages_with_filters(client):
    """Smoke-test the package list filters: each query string must return 200."""
    filter_queries = (
        "?search=test&sort_by=name&sort_dir=asc",
        "?flagged=false&sort_by=last_scanned_at",
        "?ecosystem=pypi",
        "?sort_by=invalid",
    )
    for params in filter_queries:
        response = await client.get(f"/api/v1/packages{params}")
        # The message names the query string that produced the unexpected status.
        assert response.status_code == 200, f"Failed on: {params}"
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_packages_csv_export_empty(client):
    """The packages export responds with a CSV content type and the header columns."""
    response = await client.get("/api/v1/packages/export")
    assert response.status_code == 200
    content_type = response.headers["content-type"]
    assert "text/csv" in content_type
    csv_text = response.text
    assert "name,version" in csv_text
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_packages_csv_export_with_filter(client, sample_flagged_scan):
    """Exporting packages with ``flagged=true`` includes the sample scan's package name."""
    response = await client.get("/api/v1/packages/export?flagged=true")
    assert response.status_code == 200
    expected_name = sample_flagged_scan.package_name
    assert expected_name in response.text
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_package_with_data(client, sample_flagged_scan):
    """The package detail API returns the sample package with one scan, flagged."""
    detail_url = (
        f"/api/v1/packages/{sample_flagged_scan.package_name}/{sample_flagged_scan.package_version}"
    )
    response = await client.get(detail_url)
    assert response.status_code == 200
    package = response.json()
    assert package["name"] == sample_flagged_scan.package_name
    scans = package["scans"]
    assert len(scans) == 1
    assert package["flagged"] is True
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_package_not_found(client):
    """Requesting package ``nonexistent`` version ``1.0`` through the API expects 404."""
    response = await client.get("/api/v1/packages/nonexistent/1.0")
    status = response.status_code
    assert status == 404
|
|
|
|
|
|
# --- Findings ---
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_list_findings_empty(client):
    """Listing findings without any scan fixture expects a total of zero."""
    response = await client.get("/api/v1/findings")
    assert response.status_code == 200
    payload = response.json()
    total = payload["total"]
    assert total == 0
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_list_findings_with_data(client, sample_flagged_scan):
    """With the sample flagged scan fixture requested, exactly one finding is listed."""
    response = await client.get("/api/v1/findings")
    assert response.status_code == 200
    payload = response.json()
    assert payload["total"] == 1
    findings = payload["findings"]
    assert len(findings) == 1
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_list_findings_with_filters(client, sample_flagged_scan):
    """Smoke-test the findings filters (scan id, severity, rule): each must return 200."""
    filter_queries = (
        f"?scan_id={sample_flagged_scan.id}",
        "?severity=WARNING",
        "?rule=test_rule",
    )
    for params in filter_queries:
        response = await client.get(f"/api/v1/findings{params}")
        # The message names the query string that produced the unexpected status.
        assert response.status_code == 200, f"Failed on: {params}"
|
|
|
|
|
|
# --- Web UI ---
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_web_ui_dashboard(client):
    """The root page returns 200 and its text contains "GuardDog Nexus"."""
    response = await client.get("/")
    assert response.status_code == 200
    page = response.text
    assert "GuardDog Nexus" in page
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_web_ui_dashboard_stats_fragment(client):
    """The /dashboard/stats response returns 200 and mentions "scans" in any letter case."""
    response = await client.get("/dashboard/stats")
    assert response.status_code == 200
    lowered = response.text.lower()
    assert "scans" in lowered
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_web_ui_scans(client):
    """The /scans page returns 200 and its text contains "Scans"."""
    response = await client.get("/scans")
    assert response.status_code == 200
    page = response.text
    assert "Scans" in page
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_web_ui_scans_with_search(client):
    """The /scans page accepts search, status and sort query parameters and returns 200."""
    url = "/scans?search=nonexistent&status=completed&sort_by=id&sort_dir=asc"
    response = await client.get(url)
    assert response.status_code == 200
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_web_ui_scans_page_out_of_range(client):
    """Requesting page 999 of /scans is expected to return 200 rather than an error."""
    response = await client.get("/scans?page=999")
    status = response.status_code
    assert status == 200
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_web_ui_scan_not_found(client):
    """Requesting scan id 99999 through the web UI is expected to return 404."""
    response = await client.get("/scans/99999")
    status = response.status_code
    assert status == 404
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_web_ui_scan_detail(client, sample_flagged_scan):
    """The scan detail page shows the sample scan's package name and "test_rule"."""
    detail_url = f"/scans/{sample_flagged_scan.id}"
    response = await client.get(detail_url)
    assert response.status_code == 200
    expected_name = sample_flagged_scan.package_name
    assert expected_name in response.text
    assert "test_rule" in response.text
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_web_ui_packages(client):
    """The /packages page returns 200 and its text contains "Packages"."""
    response = await client.get("/packages")
    assert response.status_code == 200
    page = response.text
    assert "Packages" in page
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_web_ui_packages_with_search(client):
    """The /packages page accepts search and sort query parameters and returns 200."""
    url = "/packages?search=test&sort_by=name&sort_dir=asc"
    response = await client.get(url)
    assert response.status_code == 200
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_web_ui_package_not_found(client):
    """Requesting package ``nonexistent`` version ``1.0`` through the web UI expects 404."""
    response = await client.get("/packages/nonexistent/1.0")
    status = response.status_code
    assert status == 404
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_web_ui_package_detail(client, sample_flagged_scan):
    """The package detail page shows the sample package name and "test_rule"."""
    detail_url = (
        f"/packages/{sample_flagged_scan.package_name}/{sample_flagged_scan.package_version}"
    )
    response = await client.get(detail_url)
    assert response.status_code == 200
    expected_name = sample_flagged_scan.package_name
    assert expected_name in response.text
    assert "test_rule" in response.text
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_health_no_db_leak(client):
    """Five back-to-back health checks must each return 200.

    Rapid health checks should not exhaust connections.
    """
    attempts = 5
    for _ in range(attempts):
        response = await client.get("/health")
        assert response.status_code == 200
|
|
|
|
|
|
# --- CSV formula injection ---
|
|
|
|
|
|
class TestCsvSafe:
    """Unit tests for the ``_csv_safe`` helper in ``guarddog_nexus.routes.api_scans``."""

    def test_formula_prefixes_escaped(self):
        """Values starting with ``=``, ``+``, ``-`` or ``@`` gain a leading apostrophe."""
        from guarddog_nexus.routes.api_scans import _csv_safe

        # (raw value, expected escaped value), checked in the listed order.
        expectations = (
            ("=cmd|'calc'!A0", "'=cmd|'calc'!A0"),
            ("+SUM(1,2)", "'+SUM(1,2)"),
            ("-3+4", "'-3+4"),
            ("@REF(A1)", "'@REF(A1)"),
        )
        for raw, escaped in expectations:
            assert _csv_safe(raw) == escaped

    def test_normal_values_unchanged(self):
        """Ordinary values such as a package name or version come back unmodified."""
        from guarddog_nexus.routes.api_scans import _csv_safe

        for plain in ("requests", "2.0.0"):
            assert _csv_safe(plain) == plain

    def test_empty_string(self):
        """An empty string is returned as an empty string."""
        from guarddog_nexus.routes.api_scans import _csv_safe

        result = _csv_safe("")
        assert result == ""

    def test_none_passes_through(self):
        """``None`` is returned as ``None``."""
        from guarddog_nexus.routes.api_scans import _csv_safe

        result = _csv_safe(None)
        assert result is None
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_csv_export_escapes_formula_injection(client, db_session):
    """A stored package name starting with ``=`` is exported with a leading apostrophe."""
    from guarddog_nexus.db.models import Scan, ScanStatus

    # Persist a completed scan whose package name looks like a spreadsheet formula.
    scan_fields = {
        "package_name": "=cmd|'calc'!A0",
        "package_version": "1.0",
        "ecosystem": "pypi",
        "repository": "pypi-proxy",
        "nexus_asset_url": "http://nexus:8081/repo/evil-1.0.tar.gz",
        "status": ScanStatus.COMPLETED.value,
    }
    malicious_scan = Scan(**scan_fields)
    db_session.add(malicious_scan)
    await db_session.commit()

    response = await client.get("/api/v1/scans/export")
    assert response.status_code == 200
    exported = response.text
    assert "'=cmd" in exported
|