- Все 18 роутов получили return type annotations
- Создан schemas.py с Pydantic-моделями (ScanOut, PackageOut, FindingOut, ...)
- API-роуты: response_model на list/detail/export/stats
- 404 через HTTPException(404) вместо {'detail':'Not found'} (200)
- RequestLoggingMiddleware: method, path, status, duration_ms
- Глобальный exception handler: ловит необработанные исключения → 500
- _parse_flagged(): вынесено дублирующееся преобразование string→bool
- parse_package_path(): общий для web.py и api_packages.py
- selectinload: импорты вынесены в top-level imports
- harvester: makedirs/mkdtemp/rmtree обёрнуты в asyncio.to_thread()
250 lines
7.0 KiB
Python
250 lines
7.0 KiB
Python
"""Tests for REST API endpoints."""
|
|
|
|
import pytest
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_health(client):
    """GET /health answers 200 with a JSON body whose "status" is "ok"."""
    response = await client.get("/health")
    assert response.status_code == 200
    body = response.json()
    assert body["status"] == "ok"
|
|
|
|
|
|
# --- Scans ---
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_list_scans_empty(client):
    """The scan listing reports a total of 0 and no scan entries."""
    response = await client.get("/api/v1/scans")
    assert response.status_code == 200

    payload = response.json()
    assert payload["total"] == 0
    scans = payload["scans"]
    assert len(scans) == 0
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_scan_stats_empty(client):
    """The scan stats endpoint reports zero total and zero flagged scans."""
    response = await client.get("/api/v1/scans/stats")
    assert response.status_code == 200

    stats = response.json()
    assert stats["total_scans"] == 0
    assert stats["flagged_scans"] == 0
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_scan_not_found(client):
    """Requesting scan id 99999 through the API yields a 404."""
    response = await client.get("/api/v1/scans/99999")
    assert response.status_code == 404
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_list_scans_with_filters(client):
    """Smoke-test the scan listing with several query strings.

    Every combination, including an unknown ``sort_by`` key, must answer 200
    rather than fail with a server error.
    """
    query_strings = (
        "?flagged=true&search=test&status=completed&sort_by=id&sort_dir=asc",
        "?flagged=false&search=nonexistent&sort_by=total_findings",
        "?sort_by=invalid_key",
        "?limit=10&offset=0",
    )
    for params in query_strings:
        response = await client.get(f"/api/v1/scans{params}")
        assert response.status_code == 200, f"Failed on: {params}"
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_scan_stats_with_data(client, sample_flagged_scan):
    """With the ``sample_flagged_scan`` fixture loaded, every counter is 1."""
    response = await client.get("/api/v1/scans/stats")
    assert response.status_code == 200

    stats = response.json()
    assert stats["total_scans"] == 1
    assert stats["flagged_scans"] == 1
    assert stats["total_findings"] == 1
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_scans_csv_export_empty(client):
    """The scans export answers 200 as text/csv and contains the header row."""
    response = await client.get("/api/v1/scans/export")
    assert response.status_code == 200

    content_type = response.headers["content-type"]
    assert "text/csv" in content_type
    assert "id,package_name" in response.text
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_scans_csv_export_with_filter(client, sample_flagged_scan):
    """The flagged-only scans export mentions the fixture's package name."""
    response = await client.get("/api/v1/scans/export?flagged=true")
    assert response.status_code == 200

    csv_text = response.text
    assert sample_flagged_scan.package_name in csv_text
|
|
|
|
|
|
# --- Packages ---
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_list_packages_empty(client):
    """The package listing answers 200 and reports a total of 0."""
    response = await client.get("/api/v1/packages")
    assert response.status_code == 200

    payload = response.json()
    assert payload["total"] == 0
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_list_packages_with_filters(client):
    """Smoke-test the package listing with several query strings.

    Each query, including an unknown ``sort_by`` value, must answer 200.
    """
    query_strings = (
        "?search=test&sort_by=name&sort_dir=asc",
        "?flagged=false&sort_by=last_scanned_at",
        "?ecosystem=pypi",
        "?sort_by=invalid",
    )
    for params in query_strings:
        response = await client.get(f"/api/v1/packages{params}")
        assert response.status_code == 200, f"Failed on: {params}"
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_packages_csv_export_empty(client):
    """The packages export answers 200 as text/csv and contains the header row."""
    response = await client.get("/api/v1/packages/export")
    assert response.status_code == 200

    content_type = response.headers["content-type"]
    assert "text/csv" in content_type
    assert "name,version" in response.text
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_packages_csv_export_with_filter(client, sample_flagged_scan):
    """The flagged-only packages export mentions the fixture's package name."""
    response = await client.get("/api/v1/packages/export?flagged=true")
    assert response.status_code == 200

    csv_text = response.text
    assert sample_flagged_scan.package_name in csv_text
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_package_with_data(client, sample_flagged_scan):
    """The package detail endpoint returns the fixture's package.

    The response must carry the package name, exactly one scan, and
    ``flagged`` set to ``True``.
    """
    scan = sample_flagged_scan
    url = f"/api/v1/packages/{scan.package_name}/{scan.package_version}"
    response = await client.get(url)
    assert response.status_code == 200

    package = response.json()
    assert package["name"] == scan.package_name
    assert len(package["scans"]) == 1
    assert package["flagged"] is True
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_package_not_found(client):
    """Requesting package nonexistent/1.0 through the API yields a 404."""
    response = await client.get("/api/v1/packages/nonexistent/1.0")
    assert response.status_code == 404
|
|
|
|
|
|
# --- Findings ---
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_list_findings_empty(client):
    """The findings listing answers 200 and reports a total of 0."""
    response = await client.get("/api/v1/findings")
    assert response.status_code == 200

    payload = response.json()
    assert payload["total"] == 0
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_list_findings_with_data(client, sample_flagged_scan):
    """With the fixture loaded, the listing reports exactly one finding."""
    response = await client.get("/api/v1/findings")
    assert response.status_code == 200

    payload = response.json()
    assert payload["total"] == 1
    findings = payload["findings"]
    assert len(findings) == 1
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_list_findings_with_filters(client, sample_flagged_scan):
    """Smoke-test the findings listing filtered by scan id, severity and rule.

    Each filter must answer 200.
    """
    query_strings = (
        f"?scan_id={sample_flagged_scan.id}",
        "?severity=WARNING",
        "?rule=test_rule",
    )
    for params in query_strings:
        response = await client.get(f"/api/v1/findings{params}")
        assert response.status_code == 200, f"Failed on: {params}"
|
|
|
|
|
|
# --- Web UI ---
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_web_ui_dashboard(client):
    """The root page answers 200 and its text contains "GuardDog Nexus"."""
    response = await client.get("/")
    assert response.status_code == 200

    page = response.text
    assert "GuardDog Nexus" in page
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_web_ui_dashboard_stats_fragment(client):
    """The /dashboard/stats response answers 200 and mentions "scans".

    The match is case-insensitive: the response text is lowercased first.
    """
    response = await client.get("/dashboard/stats")
    assert response.status_code == 200

    lowered = response.text.lower()
    assert "scans" in lowered
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_web_ui_scans(client):
    """The /scans page answers 200 and its text contains "Scans"."""
    response = await client.get("/scans")
    assert response.status_code == 200

    page = response.text
    assert "Scans" in page
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_web_ui_scans_with_search(client):
    """The /scans page answers 200 with search, status and sort parameters."""
    url = "/scans?search=nonexistent&status=completed&sort_by=id&sort_dir=asc"
    response = await client.get(url)
    assert response.status_code == 200
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_web_ui_scans_page_out_of_range(client):
    """The /scans page answers 200 when asked for page 999."""
    response = await client.get("/scans?page=999")
    assert response.status_code == 200
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_web_ui_scan_not_found(client):
    """Requesting /scans/99999 in the web UI yields a 404."""
    response = await client.get("/scans/99999")
    assert response.status_code == 404
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_web_ui_scan_detail(client, sample_flagged_scan):
    """The scan detail page shows the fixture's package name and "test_rule"."""
    detail_url = f"/scans/{sample_flagged_scan.id}"
    response = await client.get(detail_url)
    assert response.status_code == 200

    assert sample_flagged_scan.package_name in response.text
    assert "test_rule" in response.text
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_web_ui_packages(client):
    """The /packages page answers 200 and its text contains "Packages"."""
    response = await client.get("/packages")
    assert response.status_code == 200

    page = response.text
    assert "Packages" in page
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_web_ui_packages_with_search(client):
    """The /packages page answers 200 with search and sort parameters."""
    url = "/packages?search=test&sort_by=name&sort_dir=asc"
    response = await client.get(url)
    assert response.status_code == 200
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_web_ui_package_not_found(client):
    """Requesting /packages/nonexistent/1.0 in the web UI yields a 404."""
    response = await client.get("/packages/nonexistent/1.0")
    assert response.status_code == 404
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_web_ui_package_detail(client, sample_flagged_scan):
    """The package detail page shows the fixture's package name and "test_rule"."""
    scan = sample_flagged_scan
    detail_url = f"/packages/{scan.package_name}/{scan.package_version}"
    response = await client.get(detail_url)
    assert response.status_code == 200

    assert scan.package_name in response.text
    assert "test_rule" in response.text
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_health_no_db_leak(client):
    """Five consecutive GET /health calls must each answer 200."""
    # Rapid health checks should not exhaust connections
    for _attempt in range(5):
        response = await client.get("/health")
        assert response.status_code == 200
|