- S4: bump jinja2>=3.1.4, python-multipart>=0.0.18, httpx>=0.28.0
- S5: _detect_ecosystem — DEFAULT_ECOSYSTEM для неизвестных форматов
- S6: harvester — log.exception() вместо log.error()
- S8: _scan_component — urlencode параметров
- P1: scanner — proc.kill() при таймауте
- P3: api_packages — selectinload(Scan.findings), убран N+1
- P4+P5: утечка _url_locks и _llm_locks при early return
- P6: DB reaper — сброс {'status':'analyzing'} при старте
- UI: htmx-пагинация, фильтры не теряют flagged, 404 с layout
- UI: мобильные таблицы overflow-x, полная статистика на дашборде
- UI: i18n статусов в _status_badge, urlencode package_name
- 16 новых тестов: analyze endpoint (6), scanner errors (4),
webhook signature (2), llm client (4)
101 lines
3.2 KiB
Python
101 lines
3.2 KiB
Python
"""Tests for GuardDog scanner integration."""
|
|
|
|
import asyncio
|
|
from unittest.mock import MagicMock, patch
|
|
|
|
import pytest
|
|
|
|
from guarddog_nexus.core.scanner import _normalize_output, scan_package
|
|
|
|
|
|
def test_normalize_clean_output(guarddog_output_clean):
    """Normalizing the clean fixture yields zero findings and zero errors."""
    normalized = _normalize_output(guarddog_output_clean)

    # Both collections must be present and empty; findings is checked first.
    for key in ("findings", "errors"):
        assert len(normalized[key]) == 0
|
|
|
|
|
|
def test_normalize_flagged_output(guarddog_output_flagged):
    """Normalizing the flagged fixture yields three findings with known rules."""
    findings = _normalize_output(guarddog_output_flagged)["findings"]
    assert len(findings) == 3

    # Collect the distinct rule names reported across all findings.
    seen_rules = set()
    for finding in findings:
        seen_rules.add(finding["rule"])

    for expected_rule in ("shady-links", "exec-base64", "empty_information"):
        assert expected_rule in seen_rules
|
|
|
|
|
|
def test_normalize_skips_null_and_empty_dicts():
    """``None`` and ``{}`` rule results are dropped; a string result is kept."""
    rule_results = {
        "foo": None,
        "bar": {},
        "baz": "metadata finding",
    }
    report = {"issues": 0, "errors": {}, "results": rule_results}

    findings = _normalize_output(report)["findings"]

    # Only the string-valued rule should survive normalization.
    assert len(findings) == 1
    survivor = findings[0]
    assert survivor["rule"] == "baz"
    assert survivor["message"] == "metadata finding"
|
|
|
|
|
|
def test_normalize_semgrep_list():
    """A rule whose result is a list of match dicts yields one finding per entry."""
    matches = [
        {"message": "Found exec()", "location": "setup.py:10", "severity": "ERROR"},
        {"message": "Found eval()", "location": "core.py:5", "severity": "ERROR"},
    ]
    report = {
        "issues": 2,
        "errors": {},
        "results": {"code-execution": matches},
    }

    findings = _normalize_output(report)["findings"]

    assert len(findings) == 2
    # The first finding must carry over the first match's location and severity.
    first = findings[0]
    assert first["location"] == "setup.py:10"
    assert first["severity"] == "ERROR"
|
|
|
|
|
|
# --- scan_package() error paths ---
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_scan_package_timeout():
    """A ``TimeoutError`` from ``asyncio.wait_for`` is reported as a timeout error."""
    # NOTE(review): asyncio.create_subprocess_exec is not patched here, so the
    # scanner presumably attempts a real spawn before reaching wait_for --
    # confirm this test does not depend on a guarddog binary being installed.
    timeout_patch = patch("asyncio.wait_for", side_effect=asyncio.TimeoutError)
    with timeout_patch:
        outcome = await scan_package("/tmp/test.tar.gz", "pypi")

    assert outcome["findings"] == []
    assert "timeout" in outcome["errors"][0]
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_scan_package_binary_not_found():
    """A ``FileNotFoundError`` when spawning is reported as a ``not_found`` error."""
    spawn_patch = patch(
        "asyncio.create_subprocess_exec",
        side_effect=FileNotFoundError,
    )
    with spawn_patch:
        outcome = await scan_package("/tmp/test.tar.gz", "pypi")

    assert outcome["findings"] == []
    assert "not_found" in outcome["errors"][0]
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_scan_package_invalid_json():
    """Unparseable stdout with exit code 0 is reported as a ``json`` error."""
    garbage_output = (b"not valid json", b"")

    # Fake process handle: zero exit code, communicate() hands back the garbage.
    fake_proc = MagicMock(returncode=0)
    fake_proc.communicate.return_value = garbage_output

    spawn_patch = patch("asyncio.create_subprocess_exec", return_value=fake_proc)
    wait_patch = patch("asyncio.wait_for", return_value=garbage_output)
    with spawn_patch, wait_patch:
        outcome = await scan_package("/tmp/test.tar.gz", "pypi")

    assert outcome["findings"] == []
    assert "json" in outcome["errors"][0]
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_scan_package_non_zero_exit():
    """Exit code 2 with stderr text yields an error mentioning ``guarddog``."""
    # Fake process handle that reports a failing exit status.
    fake_proc = MagicMock(returncode=2)
    streams = (b"{}", b"guarddog: corrupted")

    spawn_patch = patch("asyncio.create_subprocess_exec", return_value=fake_proc)
    wait_patch = patch("asyncio.wait_for", return_value=streams)
    with spawn_patch, wait_patch:
        outcome = await scan_package("/tmp/test.tar.gz", "pypi")

    assert outcome["findings"] == []
    # NOTE(review): presumably the error text is taken from stderr -- confirm
    # against the scanner implementation.
    assert "guarddog" in outcome["errors"][0]
|