Files
guarddog-nexus/tests/test_scanner.py
Marker689 1341404568 fix: аудит — 19 фиксов безопасности, надёжности, UI и 16 новых тестов
- S4: bump jinja2>=3.1.4, python-multipart>=0.0.18, httpx>=0.28.0
- S5: _detect_ecosystem — DEFAULT_ECOSYSTEM для неизвестных форматов
- S6: harvester — log.exception() вместо log.error()
- S8: _scan_component — urlencode параметров
- P1: scanner — proc.kill() при таймауте
- P3: api_packages — selectinload(Scan.findings), убран N+1
- P4+P5: утечка _url_locks и _llm_locks при early return
- P6: DB reaper — сброс {'status':'analyzing'} при старте
- UI: htmx-пагинация, фильтры не теряют flagged, 404 с layout
- UI: мобильные таблицы overflow-x, полная стата на дашборде
- UI: i18n статусов в _status_badge, urlencode package_name
- 16 новых тестов: analyze endpoint (6), scanner errors (4),
  webhook signature (2), llm client (4)
2026-05-10 10:45:44 +03:00

101 lines
3.2 KiB
Python

"""Tests for GuardDog scanner integration."""
import asyncio
from unittest.mock import MagicMock, patch
import pytest
from guarddog_nexus.core.scanner import _normalize_output, scan_package
def test_normalize_clean_output(guarddog_output_clean):
result = _normalize_output(guarddog_output_clean)
assert len(result["findings"]) == 0
assert len(result["errors"]) == 0
def test_normalize_flagged_output(guarddog_output_flagged):
result = _normalize_output(guarddog_output_flagged)
assert len(result["findings"]) == 3
rules = {f["rule"] for f in result["findings"]}
assert "shady-links" in rules
assert "exec-base64" in rules
assert "empty_information" in rules
def test_normalize_skips_null_and_empty_dicts():
data = {
"issues": 0,
"errors": {},
"results": {
"foo": None,
"bar": {},
"baz": "metadata finding",
},
}
result = _normalize_output(data)
assert len(result["findings"]) == 1
assert result["findings"][0]["rule"] == "baz"
assert result["findings"][0]["message"] == "metadata finding"
def test_normalize_semgrep_list():
data = {
"issues": 2,
"errors": {},
"results": {
"code-execution": [
{"message": "Found exec()", "location": "setup.py:10", "severity": "ERROR"},
{"message": "Found eval()", "location": "core.py:5", "severity": "ERROR"},
],
},
}
result = _normalize_output(data)
assert len(result["findings"]) == 2
assert result["findings"][0]["location"] == "setup.py:10"
assert result["findings"][0]["severity"] == "ERROR"
# --- scan_package() error paths ---
@pytest.mark.asyncio
async def test_scan_package_timeout():
with patch("asyncio.wait_for", side_effect=asyncio.TimeoutError):
result = await scan_package("/tmp/test.tar.gz", "pypi")
assert result["findings"] == []
assert "timeout" in result["errors"][0]
@pytest.mark.asyncio
async def test_scan_package_binary_not_found():
with patch("asyncio.create_subprocess_exec", side_effect=FileNotFoundError):
result = await scan_package("/tmp/test.tar.gz", "pypi")
assert result["findings"] == []
assert "not_found" in result["errors"][0]
@pytest.mark.asyncio
async def test_scan_package_invalid_json():
mock_proc = MagicMock()
mock_proc.returncode = 0
mock_proc.communicate.return_value = (b"not valid json", b"")
with patch("asyncio.create_subprocess_exec", return_value=mock_proc):
with patch("asyncio.wait_for", return_value=(b"not valid json", b"")):
result = await scan_package("/tmp/test.tar.gz", "pypi")
assert result["findings"] == []
assert "json" in result["errors"][0]
@pytest.mark.asyncio
async def test_scan_package_non_zero_exit():
mock_proc = MagicMock()
mock_proc.returncode = 2
with patch("asyncio.create_subprocess_exec", return_value=mock_proc):
with patch("asyncio.wait_for", return_value=(b"{}", b"guarddog: corrupted")):
result = await scan_package("/tmp/test.tar.gz", "pypi")
assert result["findings"] == []
assert "guarddog" in result["errors"][0]