fix: аудит — 19 фиксов безопасности, надёжности, UI и 16 новых тестов
- S4: bump jinja2>=3.1.4, python-multipart>=0.0.18, httpx>=0.28.0
- S5: _detect_ecosystem — DEFAULT_ECOSYSTEM для неизвестных форматов
- S6: harvester — log.exception() вместо log.error()
- S8: _scan_component — urlencode параметров
- P1: scanner — proc.kill() при таймауте
- P3: api_packages — selectinload(Scan.findings), убран N+1
- P4+P5: утечка _url_locks и _llm_locks при early return
- P6: DB reaper — сброс {'status':'analyzing'} при старте
- UI: htmx-пагинация, фильтры не теряют flagged, 404 с layout
- UI: мобильные таблицы overflow-x, полная стата на дашборде
- UI: i18n статусов в _status_badge, urlencode package_name
- 16 новых тестов: analyze endpoint (6), scanner errors (4),
webhook signature (2), llm client (4)
This commit is contained in:
@@ -12,6 +12,7 @@ async def test_health(client):
|
||||
|
||||
# --- Scans ---
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_list_scans_empty(client):
|
||||
resp = await client.get("/api/v1/scans")
|
||||
@@ -77,6 +78,7 @@ async def test_scans_csv_export_with_filter(client, sample_flagged_scan):
|
||||
|
||||
# --- Packages ---
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_list_packages_empty(client):
|
||||
resp = await client.get("/api/v1/packages")
|
||||
@@ -133,6 +135,7 @@ async def test_package_not_found(client):
|
||||
|
||||
# --- Findings ---
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_list_findings_empty(client):
|
||||
resp = await client.get("/api/v1/findings")
|
||||
@@ -163,6 +166,7 @@ async def test_list_findings_with_filters(client, sample_flagged_scan):
|
||||
|
||||
# --- Web UI ---
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_web_ui_dashboard(client):
|
||||
resp = await client.get("/")
|
||||
|
||||
232
tests/test_llm_analysis.py
Normal file
232
tests/test_llm_analysis.py
Normal file
@@ -0,0 +1,232 @@
|
||||
"""Tests for LLM analysis — endpoint and client."""
|
||||
|
||||
from unittest.mock import MagicMock, patch
|
||||
|
||||
import pytest
|
||||
|
||||
from guarddog_nexus.db.models import Finding
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
async def sample_finding(db_session):
|
||||
from guarddog_nexus.constants import SEVERITY_WARNING
|
||||
|
||||
finding = Finding(
|
||||
scan_id=1,
|
||||
data={
|
||||
"rule": "shady-links",
|
||||
"severity": SEVERITY_WARNING,
|
||||
"message": "Suspicious URL",
|
||||
"location": "setup.py:15",
|
||||
"code": "url = 'http://evil.com'",
|
||||
},
|
||||
)
|
||||
db_session.add(finding)
|
||||
await db_session.commit()
|
||||
await db_session.refresh(finding)
|
||||
return finding
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
async def sample_finding_with_report(db_session):
|
||||
from guarddog_nexus.constants import SEVERITY_WARNING
|
||||
|
||||
report = {
|
||||
"verdict": "safe",
|
||||
"summary": "ok",
|
||||
"analysis": "all good",
|
||||
"severity_rating": "low",
|
||||
}
|
||||
finding = Finding(
|
||||
scan_id=1,
|
||||
data={"rule": "test", "severity": SEVERITY_WARNING, "message": "test"},
|
||||
report=report,
|
||||
)
|
||||
db_session.add(finding)
|
||||
await db_session.commit()
|
||||
await db_session.refresh(finding)
|
||||
return finding
|
||||
|
||||
|
||||
# --- T4: analyze_finding() client ---
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_analyze_finding_no_api_key():
|
||||
from guarddog_nexus.core.llm import analyze_finding
|
||||
|
||||
result = await analyze_finding({"rule": "test", "severity": "WARNING"})
|
||||
assert result is None
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_analyze_finding_timeout():
|
||||
import guarddog_nexus.config
|
||||
from guarddog_nexus.core.llm import analyze_finding
|
||||
|
||||
guarddog_nexus.config.config.llm_api_key = "sk-test"
|
||||
guarddog_nexus.config.config.llm_timeout = 1
|
||||
|
||||
import httpx
|
||||
|
||||
with patch("httpx.AsyncClient.post", side_effect=httpx.TimeoutException("timeout")):
|
||||
result = await analyze_finding({"rule": "test", "severity": "WARNING"})
|
||||
assert result is None
|
||||
|
||||
guarddog_nexus.config.config.llm_api_key = ""
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_analyze_finding_api_error():
|
||||
import guarddog_nexus.config
|
||||
from guarddog_nexus.core.llm import analyze_finding
|
||||
|
||||
guarddog_nexus.config.config.llm_api_key = "sk-test"
|
||||
guarddog_nexus.config.config.llm_timeout = 30
|
||||
|
||||
with patch("httpx.AsyncClient.post", side_effect=Exception("connection refused")):
|
||||
result = await analyze_finding({"rule": "test", "severity": "WARNING"})
|
||||
assert result is None
|
||||
|
||||
guarddog_nexus.config.config.llm_api_key = ""
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_analyze_finding_success():
|
||||
import guarddog_nexus.config
|
||||
from guarddog_nexus.core.llm import analyze_finding
|
||||
|
||||
guarddog_nexus.config.config.llm_api_key = "sk-test"
|
||||
guarddog_nexus.config.config.llm_timeout = 30
|
||||
|
||||
mock_resp = MagicMock()
|
||||
mock_resp.raise_for_status.return_value = None
|
||||
mock_resp.json.return_value = {
|
||||
"choices": [
|
||||
{
|
||||
"message": {
|
||||
"content": '{"verdict":"safe","summary":"ok",'
|
||||
'"analysis":"fine","severity_rating":"low"}',
|
||||
}
|
||||
}
|
||||
]
|
||||
}
|
||||
|
||||
with patch("guarddog_nexus.core.llm.httpx.AsyncClient.post", return_value=mock_resp):
|
||||
result = await analyze_finding({"rule": "test"})
|
||||
assert result is not None
|
||||
assert result["verdict"] == "safe"
|
||||
assert result["severity_rating"] == "low"
|
||||
|
||||
guarddog_nexus.config.config.llm_api_key = ""
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_analyze_finding_markdown_unwrap():
|
||||
import guarddog_nexus.config
|
||||
from guarddog_nexus.core.llm import analyze_finding
|
||||
|
||||
guarddog_nexus.config.config.llm_api_key = "sk-test"
|
||||
|
||||
mock_resp = MagicMock()
|
||||
mock_resp.raise_for_status.return_value = None
|
||||
mock_resp.json.return_value = {
|
||||
"choices": [
|
||||
{
|
||||
"message": {
|
||||
"content": '```json\n{"verdict":"suspicious","summary":"hm",'
|
||||
'"analysis":"...","severity_rating":"medium"}\n```',
|
||||
}
|
||||
}
|
||||
]
|
||||
}
|
||||
|
||||
with patch("guarddog_nexus.core.llm.httpx.AsyncClient.post", return_value=mock_resp):
|
||||
result = await analyze_finding({"rule": "test"})
|
||||
assert result is not None
|
||||
assert result["verdict"] == "suspicious"
|
||||
|
||||
guarddog_nexus.config.config.llm_api_key = ""
|
||||
|
||||
|
||||
# --- T1: analyze_finding_htmx endpoint ---
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_analyze_endpoint_llm_disabled(client, sample_finding):
|
||||
import guarddog_nexus.config
|
||||
|
||||
guarddog_nexus.config.config.llm_enabled = False
|
||||
|
||||
resp = await client.post(f"/api/v1/findings/{sample_finding.id}/analyze")
|
||||
assert resp.status_code == 200
|
||||
assert "disabled" in resp.text.lower()
|
||||
|
||||
guarddog_nexus.config.config.llm_enabled = False
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_analyze_endpoint_not_found(client):
|
||||
import guarddog_nexus.config
|
||||
|
||||
guarddog_nexus.config.config.llm_enabled = True
|
||||
|
||||
resp = await client.post("/api/v1/findings/99999/analyze")
|
||||
assert resp.status_code == 404
|
||||
assert "not found" in resp.text.lower()
|
||||
|
||||
guarddog_nexus.config.config.llm_enabled = False
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_analyze_endpoint_idempotent_already_analyzed(client, sample_finding_with_report):
|
||||
import guarddog_nexus.config
|
||||
|
||||
guarddog_nexus.config.config.llm_enabled = True
|
||||
|
||||
resp = await client.post(f"/api/v1/findings/{sample_finding_with_report.id}/analyze")
|
||||
assert resp.status_code == 200
|
||||
assert "safe" in resp.text
|
||||
|
||||
guarddog_nexus.config.config.llm_enabled = False
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_analyze_endpoint_success(client, sample_finding):
|
||||
import guarddog_nexus.config
|
||||
|
||||
guarddog_nexus.config.config.llm_enabled = True
|
||||
|
||||
fake_report = {
|
||||
"verdict": "malicious",
|
||||
"summary": "bad",
|
||||
"analysis": "evil",
|
||||
"severity_rating": "critical",
|
||||
}
|
||||
|
||||
async def mock_analyze(data):
|
||||
return fake_report
|
||||
|
||||
with patch("guarddog_nexus.core.llm.analyze_finding", mock_analyze):
|
||||
resp = await client.post(f"/api/v1/findings/{sample_finding.id}/analyze")
|
||||
assert resp.status_code == 200
|
||||
assert "malicious" in resp.text
|
||||
|
||||
guarddog_nexus.config.config.llm_enabled = False
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_analyze_endpoint_failure(client, sample_finding):
|
||||
import guarddog_nexus.config
|
||||
|
||||
guarddog_nexus.config.config.llm_enabled = True
|
||||
|
||||
async def mock_analyze(data):
|
||||
return None
|
||||
|
||||
with patch("guarddog_nexus.core.llm.analyze_finding", mock_analyze):
|
||||
resp = await client.post(f"/api/v1/findings/{sample_finding.id}/analyze")
|
||||
assert resp.status_code == 200
|
||||
assert "failed" in resp.text.lower()
|
||||
|
||||
guarddog_nexus.config.config.llm_enabled = False
|
||||
@@ -1,6 +1,5 @@
|
||||
"""Tests for Nexus package info extractors."""
|
||||
|
||||
|
||||
from guarddog_nexus.core.nexus import (
|
||||
extract_go_info,
|
||||
extract_npm_info,
|
||||
@@ -37,9 +36,10 @@ class TestGoExtractor:
|
||||
)
|
||||
|
||||
def test_long_module(self):
|
||||
assert extract_go_info(
|
||||
"/packages/github.com/gin-gonic/gin/@v/v1.9.0.zip"
|
||||
) == ("github.com/gin-gonic/gin", "v1.9.0")
|
||||
assert extract_go_info("/packages/github.com/gin-gonic/gin/@v/v1.9.0.zip") == (
|
||||
"github.com/gin-gonic/gin",
|
||||
"v1.9.0",
|
||||
)
|
||||
|
||||
def test_no_at_v(self):
|
||||
assert extract_go_info("packages/some/pkg/v1.0.0.zip") is None
|
||||
@@ -68,21 +68,22 @@ class TestNpmExtractor:
|
||||
|
||||
class TestDispatchExtractor:
|
||||
def test_pypi(self):
|
||||
assert extract_package_info(
|
||||
"/packages/requests/2.31.0/requests-2.31.0.tar.gz", "pypi"
|
||||
) == ("requests", "2.31.0")
|
||||
assert extract_package_info("/packages/requests/2.31.0/requests-2.31.0.tar.gz", "pypi") == (
|
||||
"requests",
|
||||
"2.31.0",
|
||||
)
|
||||
|
||||
def test_go(self):
|
||||
assert extract_package_info(
|
||||
"github.com/gorilla/mux/@v/v1.8.0.zip", "go"
|
||||
) == ("github.com/gorilla/mux", "v1.8.0")
|
||||
assert extract_package_info("github.com/gorilla/mux/@v/v1.8.0.zip", "go") == (
|
||||
"github.com/gorilla/mux",
|
||||
"v1.8.0",
|
||||
)
|
||||
|
||||
def test_npm(self):
|
||||
assert extract_package_info(
|
||||
"packages/lodash/-/lodash-4.17.21.tgz", "npm"
|
||||
) == ("lodash", "4.17.21")
|
||||
assert extract_package_info("packages/lodash/-/lodash-4.17.21.tgz", "npm") == (
|
||||
"lodash",
|
||||
"4.17.21",
|
||||
)
|
||||
|
||||
def test_unknown_ecosystem(self):
|
||||
assert extract_package_info(
|
||||
"/packages/pkg/1.0/file.tar.gz", "unknown"
|
||||
) == ("pkg", "1.0")
|
||||
assert extract_package_info("/packages/pkg/1.0/file.tar.gz", "unknown") == ("pkg", "1.0")
|
||||
|
||||
@@ -1,6 +1,11 @@
|
||||
"""Tests for GuardDog scanner integration."""
|
||||
|
||||
from guarddog_nexus.core.scanner import _normalize_output
|
||||
import asyncio
|
||||
from unittest.mock import MagicMock, patch
|
||||
|
||||
import pytest
|
||||
|
||||
from guarddog_nexus.core.scanner import _normalize_output, scan_package
|
||||
|
||||
|
||||
def test_normalize_clean_output(guarddog_output_clean):
|
||||
@@ -49,3 +54,47 @@ def test_normalize_semgrep_list():
|
||||
assert len(result["findings"]) == 2
|
||||
assert result["findings"][0]["location"] == "setup.py:10"
|
||||
assert result["findings"][0]["severity"] == "ERROR"
|
||||
|
||||
|
||||
# --- scan_package() error paths ---
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_scan_package_timeout():
|
||||
with patch("asyncio.wait_for", side_effect=asyncio.TimeoutError):
|
||||
result = await scan_package("/tmp/test.tar.gz", "pypi")
|
||||
assert result["findings"] == []
|
||||
assert "timeout" in result["errors"][0]
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_scan_package_binary_not_found():
|
||||
with patch("asyncio.create_subprocess_exec", side_effect=FileNotFoundError):
|
||||
result = await scan_package("/tmp/test.tar.gz", "pypi")
|
||||
assert result["findings"] == []
|
||||
assert "not_found" in result["errors"][0]
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_scan_package_invalid_json():
|
||||
mock_proc = MagicMock()
|
||||
mock_proc.returncode = 0
|
||||
mock_proc.communicate.return_value = (b"not valid json", b"")
|
||||
|
||||
with patch("asyncio.create_subprocess_exec", return_value=mock_proc):
|
||||
with patch("asyncio.wait_for", return_value=(b"not valid json", b"")):
|
||||
result = await scan_package("/tmp/test.tar.gz", "pypi")
|
||||
assert result["findings"] == []
|
||||
assert "json" in result["errors"][0]
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_scan_package_non_zero_exit():
|
||||
mock_proc = MagicMock()
|
||||
mock_proc.returncode = 2
|
||||
|
||||
with patch("asyncio.create_subprocess_exec", return_value=mock_proc):
|
||||
with patch("asyncio.wait_for", return_value=(b"{}", b"guarddog: corrupted")):
|
||||
result = await scan_package("/tmp/test.tar.gz", "pypi")
|
||||
assert result["findings"] == []
|
||||
assert "guarddog" in result["errors"][0]
|
||||
|
||||
@@ -86,6 +86,7 @@ async def test_webhook_component_no_version(client, sample_nexus_component_webho
|
||||
|
||||
# --- Ecosystem detection tests ---
|
||||
|
||||
|
||||
def test_detect_ecosystem_pypi():
|
||||
from guarddog_nexus.routes.webhooks import _detect_ecosystem
|
||||
|
||||
@@ -111,12 +112,13 @@ def test_detect_ecosystem_npm():
|
||||
def test_detect_ecosystem_unknown():
|
||||
from guarddog_nexus.routes.webhooks import _detect_ecosystem
|
||||
|
||||
assert _detect_ecosystem({"format": "maven"}) == "maven"
|
||||
assert _detect_ecosystem({"format": "maven"}) == "pypi" # unknown → default
|
||||
assert _detect_ecosystem({}) == "pypi" # default
|
||||
|
||||
|
||||
# --- Go/npm webhook integration ---
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_webhook_go_asset(client, sample_nexus_go_webhook):
|
||||
with patch("guarddog_nexus.routes.webhooks._scan_in_background") as _mock:
|
||||
@@ -131,3 +133,34 @@ async def test_webhook_npm_asset(client, sample_nexus_npm_webhook):
|
||||
resp = await client.post("/webhooks/nexus", json=sample_nexus_npm_webhook)
|
||||
assert resp.status_code == 200
|
||||
assert resp.json()["status"] == "accepted"
|
||||
|
||||
|
||||
# --- Webhook signature validation ---
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_webhook_missing_signature_when_required(client, sample_nexus_webhook):
|
||||
import guarddog_nexus.config
|
||||
|
||||
guarddog_nexus.config.config.webhook_secret = "test-secret"
|
||||
|
||||
resp = await client.post("/webhooks/nexus", json=sample_nexus_webhook)
|
||||
assert resp.status_code == 401
|
||||
|
||||
guarddog_nexus.config.config.webhook_secret = ""
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_webhook_invalid_signature(client, sample_nexus_webhook):
|
||||
import guarddog_nexus.config
|
||||
|
||||
guarddog_nexus.config.config.webhook_secret = "test-secret"
|
||||
|
||||
resp = await client.post(
|
||||
"/webhooks/nexus",
|
||||
json=sample_nexus_webhook,
|
||||
headers={"X-Nexus-Webhook-Signature": "badsignature"},
|
||||
)
|
||||
assert resp.status_code == 403
|
||||
|
||||
guarddog_nexus.config.config.webhook_secret = ""
|
||||
|
||||
Reference in New Issue
Block a user