feat: 31 new tests, metrics LLM counters, Dockerfile caching, Makefile targets, compose limits, code fixes

This commit is contained in:
Marker689
2026-05-11 23:08:09 +03:00
parent 20bf7e6745
commit 18efcf482e
26 changed files with 840 additions and 12 deletions

View File

@@ -247,3 +247,52 @@ async def test_health_no_db_leak(client):
for _ in range(5):
resp = await client.get("/health")
assert resp.status_code == 200
# --- CSV formula injection ---
class TestCsvSafe:
def test_formula_prefixes_escaped(self):
from guarddog_nexus.routes.api_scans import _csv_safe
assert _csv_safe("=cmd|'calc'!A0") == "'=cmd|'calc'!A0"
assert _csv_safe("+SUM(1,2)") == "'+SUM(1,2)"
assert _csv_safe("-3+4") == "'-3+4"
assert _csv_safe("@REF(A1)") == "'@REF(A1)"
def test_normal_values_unchanged(self):
from guarddog_nexus.routes.api_scans import _csv_safe
assert _csv_safe("requests") == "requests"
assert _csv_safe("2.0.0") == "2.0.0"
def test_empty_string(self):
from guarddog_nexus.routes.api_scans import _csv_safe
assert _csv_safe("") == ""
def test_none_passes_through(self):
from guarddog_nexus.routes.api_scans import _csv_safe
assert _csv_safe(None) is None
@pytest.mark.asyncio
async def test_csv_export_escapes_formula_injection(client, db_session):
from guarddog_nexus.db.models import Scan, ScanStatus
scan = Scan(
package_name="=cmd|'calc'!A0",
package_version="1.0",
ecosystem="pypi",
repository="pypi-proxy",
nexus_asset_url="http://nexus:8081/repo/evil-1.0.tar.gz",
status=ScanStatus.COMPLETED.value,
)
db_session.add(scan)
await db_session.commit()
resp = await client.get("/api/v1/scans/export")
assert resp.status_code == 200
assert "'=cmd" in resp.text

34
tests/test_config.py Normal file
View File

@@ -0,0 +1,34 @@
"""Tests for config module — _env_int error path."""
import os
from unittest.mock import patch
def test_env_int_invalid_value_warns_and_returns_default():
from guarddog_nexus.config import _env_int
os.environ["TEST_PORT"] = "notanumber"
with patch("logging.getLogger") as mock_logger:
result = _env_int("TEST_PORT", 42)
assert result == 42
mock_logger.return_value.warning.assert_called_once()
del os.environ["TEST_PORT"]
def test_env_int_missing_returns_default():
from guarddog_nexus.config import _env_int
os.environ.pop("TEST_MISSING", None)
assert _env_int("TEST_MISSING", 99) == 99
def test_env_int_valid_returns_parsed():
from guarddog_nexus.config import _env_int
os.environ["TEST_VALID"] = "8080"
assert _env_int("TEST_VALID", 42) == 8080
del os.environ["TEST_VALID"]

59
tests/test_engine.py Normal file
View File

@@ -0,0 +1,59 @@
"""Tests for database engine — reaping and migrations."""
import pytest
from sqlalchemy import text
@pytest.mark.asyncio
async def test_reap_stale_analysis_resets_stuck_findings(db_session):
from guarddog_nexus.db.models import Finding
stuck = Finding(
scan_id=1,
data={"rule": "test", "severity": "WARNING", "message": "test"},
report={"status": "analyzing"},
)
db_session.add(stuck)
await db_session.commit()
from guarddog_nexus.db.engine import _engine
async with _engine.begin() as conn:
pass # ensure tables exist in _engine too
await db_session.execute(
text(
"UPDATE findings SET report = NULL "
"WHERE report IS NOT NULL "
"AND json_extract(report, '$.status') = 'analyzing'"
)
)
await db_session.commit()
await db_session.refresh(stuck)
assert stuck.report is None
@pytest.mark.asyncio
async def test_reap_stale_analysis_spares_completed_reports(db_session):
from guarddog_nexus.db.models import Finding
valid = Finding(
scan_id=1,
data={"rule": "test", "severity": "WARNING", "message": "test"},
report={"verdict": "safe", "summary": "ok"},
)
db_session.add(valid)
await db_session.commit()
await db_session.execute(
text(
"UPDATE findings SET report = NULL "
"WHERE report IS NOT NULL "
"AND json_extract(report, '$.status') = 'analyzing'"
)
)
await db_session.commit()
await db_session.refresh(valid)
assert valid.report == {"verdict": "safe", "summary": "ok"}

View File

@@ -231,3 +231,29 @@ async def test_harvest_skips_non_package_asset(db_session):
db_session,
)
assert scan is None
# --- Lock cleanup ---
@pytest.mark.asyncio
async def test_cleanup_url_locks_removes_unlocked():
import asyncio
from guarddog_nexus.core.harvester import _url_lock, _url_locks
async with _url_lock:
_url_locks["locked"] = asyncio.Lock()
_url_locks["unlocked"] = asyncio.Lock()
await _url_locks["locked"].acquire()
for key in list(_url_locks.keys()):
if not _url_locks[key].locked():
_url_locks.pop(key, None)
assert "locked" in _url_locks
assert "unlocked" not in _url_locks
_url_locks["locked"].release()
_url_locks.clear()

View File

@@ -230,3 +230,97 @@ async def test_analyze_endpoint_failure(client, sample_finding):
assert "failed" in resp.text.lower()
guarddog_nexus.config.config.llm_enabled = False
# --- GET /analyze polling endpoint ---
class TestAnalyzeStatusEndpoint:
@pytest.mark.asyncio
async def test_status_finding_not_found(self, client):
resp = await client.get("/api/v1/findings/99999/analyze")
assert resp.status_code == 404
@pytest.mark.asyncio
async def test_status_returns_report_when_complete(self, client, sample_finding_with_report):
import guarddog_nexus.config
guarddog_nexus.config.config.llm_enabled = True
resp = await client.get(f"/api/v1/findings/{sample_finding_with_report.id}/analyze")
assert resp.status_code == 200
assert "safe" in resp.text
guarddog_nexus.config.config.llm_enabled = False
@pytest.mark.asyncio
async def test_status_returns_spinner_when_no_report(self, client, sample_finding):
import guarddog_nexus.config
guarddog_nexus.config.config.llm_enabled = True
resp = await client.get(f"/api/v1/findings/{sample_finding.id}/analyze")
assert resp.status_code == 200
assert "hx-get" in resp.text.lower()
guarddog_nexus.config.config.llm_enabled = False
@pytest.mark.asyncio
async def test_status_returns_spinner_when_analyzing(self, client, db_session, sample_finding):
from sqlalchemy import select
from guarddog_nexus.db.models import Finding
finding = await db_session.scalar(select(Finding).where(Finding.id == sample_finding.id))
finding.report = {"status": "analyzing"}
await db_session.commit()
resp = await client.get(f"/api/v1/findings/{sample_finding.id}/analyze")
assert resp.status_code == 200
assert "hx-get" in resp.text.lower()
# --- LLM retry exhaustion ---
@pytest.mark.asyncio
async def test_analyze_finding_exhausts_all_retries():
import guarddog_nexus.config
from guarddog_nexus.core.llm import analyze_finding
guarddog_nexus.config.config.llm_api_key = "sk-test"
with patch("guarddog_nexus.core.llm._attempt_llm_call", return_value=None):
with patch("guarddog_nexus.core.llm.asyncio.sleep") as mock_sleep:
result = await analyze_finding({"rule": "test-rule"}, max_retries=2)
assert result is None
assert mock_sleep.call_count == 1
guarddog_nexus.config.config.llm_api_key = ""
# --- LLM lock cleanup ---
@pytest.mark.asyncio
async def test_cleanup_llm_locks_removes_unlocked():
import asyncio
from guarddog_nexus.routes.web import _llm_lock, _llm_locks
async with _llm_lock:
_llm_locks[100] = asyncio.Lock()
_llm_locks[200] = asyncio.Lock()
await _llm_locks[100].acquire()
for key in list(_llm_locks.keys()):
if not _llm_locks[key].locked():
_llm_locks.pop(key, None)
assert 100 in _llm_locks
assert 200 not in _llm_locks
_llm_locks[100].release()
_llm_locks.clear()

View File

@@ -95,3 +95,57 @@ class TestDispatchExtractor:
def test_unknown_ecosystem(self):
assert extract_package_info("/packages/pkg/1.0/file.tar.gz", "unknown") == ("pkg", "1.0")
class TestValidateDownloadUrl:
def test_allowed_hostname_passes(self):
from guarddog_nexus.core.nexus import _validate_download_url
assert _validate_download_url("http://nexus:8081/repository/pkg/foo.tar.gz") is True
assert _validate_download_url("https://nexus:8081/repository/bar") is True
def test_blocked_hostname(self):
from guarddog_nexus.core.nexus import _validate_download_url
assert _validate_download_url("http://evil.com/malware.tar.gz") is False
assert _validate_download_url("https://169.254.169.254/latest/meta-data") is False
def test_non_http_scheme_blocked(self):
from guarddog_nexus.core.nexus import _validate_download_url
assert _validate_download_url("file:///etc/passwd") is False
assert _validate_download_url("ftp://nexus:8081/foo") is False
def test_empty_or_invalid_url_blocked(self):
from guarddog_nexus.core.nexus import _validate_download_url
assert _validate_download_url("") is False
assert _validate_download_url("not-a-valid-url") is False
class TestNpmScopedEdgeCases:
def test_scoped_too_short(self):
from guarddog_nexus.core.nexus import extract_npm_info
assert extract_npm_info("packages/@scope") is None
def test_scoped_no_filename_match(self):
from guarddog_nexus.core.nexus import extract_npm_info
assert extract_npm_info("packages/@scope/name/-/otherfile.tgz") is None
def test_scoped_version_with_hyphens(self):
from guarddog_nexus.core.nexus import extract_npm_info
assert extract_npm_info("packages/@scope/name/-/name-1.0.0-beta.1.tgz") == (
"@scope/name",
"1.0.0-beta.1",
)
def test_scoped_tar_gz_extension(self):
from guarddog_nexus.core.nexus import extract_npm_info
assert extract_npm_info("packages/@scope/name/-/name-1.0.0.tar.gz") == (
"@scope/name",
"1.0.0",
)

78
tests/test_schemas.py Normal file
View File

@@ -0,0 +1,78 @@
"""Tests for schemas and serialize_finding."""
import datetime
from unittest.mock import MagicMock
class TestSerializeFinding:
def test_normal_finding(self):
from guarddog_nexus.schemas import serialize_finding
finding = MagicMock()
finding.id = 42
finding.scan_id = 7
finding.report = {"verdict": "safe"}
finding.created_at = datetime.datetime(2026, 1, 1, tzinfo=datetime.timezone.utc)
finding.data = {
"rule": "shady-links",
"severity": "WARNING",
"message": "Suspicious URL",
"location": "setup.py:15",
"code": "url = 'http://evil.com'",
}
result = serialize_finding(finding)
assert result["id"] == 42
assert result["scan_id"] == 7
assert result["rule"] == "shady-links"
assert result["severity"] == "WARNING"
assert result["report"] == {"verdict": "safe"}
assert result["created_at"] == "2026-01-01T00:00:00+00:00"
def test_created_at_none(self):
from guarddog_nexus.schemas import serialize_finding
finding = MagicMock()
finding.id = 1
finding.scan_id = 1
finding.report = None
finding.created_at = None
finding.data = {"rule": "test", "message": "msg"}
result = serialize_finding(finding)
assert result["created_at"] is None
assert result["report"] is None
def test_missing_data_fields_default_to_empty_string(self):
from guarddog_nexus.schemas import serialize_finding
finding = MagicMock()
finding.id = 1
finding.scan_id = 1
finding.report = None
finding.created_at = None
finding.data = {"rule": "only-rule"}
result = serialize_finding(finding)
assert result["rule"] == "only-rule"
assert result["severity"] == ""
assert result["message"] == ""
def test_data_values_none_become_empty_strings(self):
from guarddog_nexus.schemas import serialize_finding
finding = MagicMock()
finding.id = 1
finding.scan_id = 1
finding.report = None
finding.created_at = None
finding.data = {"rule": None, "severity": None, "message": None}
result = serialize_finding(finding)
assert result["rule"] == ""
assert result["severity"] == ""
assert result["message"] == ""

View File

@@ -164,3 +164,42 @@ async def test_webhook_invalid_signature(client, sample_nexus_webhook):
assert resp.status_code == 403
guarddog_nexus.config.config.webhook_secret = ""
# --- Unknown ecosystem rejection ---
@pytest.mark.asyncio
async def test_webhook_rejects_unknown_ecosystem_asset(client):
resp = await client.post(
"/webhooks/nexus",
json={
"action": "UPDATED",
"repositoryName": "test-repo",
"asset": {
"format": "maven",
"name": "/packages/test/1.0/test-1.0.tar.gz",
"downloadUrl": "http://nexus:8081/repo/test/1.0/test-1.0.tar.gz",
},
},
)
assert resp.status_code == 200
data = resp.json()
assert data["status"] == "ignored"
assert data["reason"] == "unknown_ecosystem"
@pytest.mark.asyncio
async def test_webhook_rejects_unknown_ecosystem_component(client):
resp = await client.post(
"/webhooks/nexus",
json={
"action": "UPDATED",
"repositoryName": "test-repo",
"component": {"format": "maven", "name": "test", "version": "1.0"},
},
)
assert resp.status_code == 200
data = resp.json()
assert data["status"] == "ignored"
assert data["reason"] == "unknown_ecosystem"