"""Tests for GuardDog scanner integration.""" import asyncio from unittest.mock import MagicMock, patch import pytest from guarddog_nexus.core.scanner import _normalize_output, scan_package def test_normalize_clean_output(guarddog_output_clean): result = _normalize_output(guarddog_output_clean) assert len(result["findings"]) == 0 assert len(result["errors"]) == 0 def test_normalize_flagged_output(guarddog_output_flagged): result = _normalize_output(guarddog_output_flagged) assert len(result["findings"]) == 3 rules = {f["rule"] for f in result["findings"]} assert "shady-links" in rules assert "exec-base64" in rules assert "empty_information" in rules def test_normalize_skips_null_and_empty_dicts(): data = { "issues": 0, "errors": {}, "results": { "foo": None, "bar": {}, "baz": "metadata finding", }, } result = _normalize_output(data) assert len(result["findings"]) == 1 assert result["findings"][0]["rule"] == "baz" assert result["findings"][0]["message"] == "metadata finding" def test_normalize_semgrep_list(): data = { "issues": 2, "errors": {}, "results": { "code-execution": [ {"message": "Found exec()", "location": "setup.py:10", "severity": "ERROR"}, {"message": "Found eval()", "location": "core.py:5", "severity": "ERROR"}, ], }, } result = _normalize_output(data) assert len(result["findings"]) == 2 assert result["findings"][0]["location"] == "setup.py:10" assert result["findings"][0]["severity"] == "ERROR" # --- scan_package() error paths --- @pytest.mark.asyncio async def test_scan_package_timeout(): with patch("asyncio.wait_for", side_effect=asyncio.TimeoutError): result = await scan_package("/tmp/test.tar.gz", "pypi") assert result["findings"] == [] assert "timeout" in result["errors"][0] @pytest.mark.asyncio async def test_scan_package_binary_not_found(): with patch("asyncio.create_subprocess_exec", side_effect=FileNotFoundError): result = await scan_package("/tmp/test.tar.gz", "pypi") assert result["findings"] == [] assert "not_found" in result["errors"][0] @pytest.mark.asyncio async def test_scan_package_invalid_json(): mock_proc = MagicMock() mock_proc.returncode = 0 mock_proc.communicate.return_value = (b"not valid json", b"") with patch("asyncio.create_subprocess_exec", return_value=mock_proc): with patch("asyncio.wait_for", return_value=(b"not valid json", b"")): result = await scan_package("/tmp/test.tar.gz", "pypi") assert result["findings"] == [] assert "json" in result["errors"][0] @pytest.mark.asyncio async def test_scan_package_non_zero_exit(): mock_proc = MagicMock() mock_proc.returncode = 2 with patch("asyncio.create_subprocess_exec", return_value=mock_proc): with patch("asyncio.wait_for", return_value=(b"{}", b"guarddog: corrupted")): result = await scan_package("/tmp/test.tar.gz", "pypi") assert result["findings"] == [] assert "guarddog" in result["errors"][0]