"""Tests for the harvester pipeline."""
|
|
|
|
from unittest.mock import patch
|
|
|
|
import pytest
|
|
from sqlalchemy import select
|
|
|
|
from guarddog_nexus.harvester import harvest
|
|
from guarddog_nexus.models import Finding
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_harvest_new_package(db_session, guarddog_normalized_flagged):
    """A flagged package yields a completed scan plus its persisted findings.

    ``download_asset``, ``compute_sha256`` and ``scan_package`` are patched in
    ``guarddog_nexus.harvester`` so the test controls the local path, the
    digest and the scanner output that ``harvest`` receives.
    """
    with (
        patch(
            "guarddog_nexus.harvester.download_asset",
            return_value="/tmp/test-package.tar.gz",
        ),
        patch("guarddog_nexus.harvester.compute_sha256", return_value="abc123"),
        patch(
            "guarddog_nexus.harvester.scan_package",
            return_value=guarddog_normalized_flagged,
        ),
    ):
        scan = await harvest(
            download_url="http://nexus:8081/repository/pypi-proxy/packages/requests/2.31.0/requests-2.31.0.tar.gz",
            repository="pypi-proxy",
            format_="pypi",
            asset_path="packages/requests/2.31.0/requests-2.31.0.tar.gz",
            session=db_session,
        )

    # Scan row: package identity, then outcome, then the mocked digest.
    assert scan is not None
    identity = (scan.package_name, scan.package_version, scan.ecosystem)
    assert identity == ("requests", "2.31.0", "pypi")
    assert scan.status == "completed"
    assert scan.flagged is True
    assert scan.total_findings == 3
    assert scan.sha256 == "abc123"

    # Finding rows attached to that scan.
    query = select(Finding).where(Finding.scan_id == scan.id)
    result = await db_session.execute(query)
    rows = result.scalars().all()
    assert len(rows) == 3

    shady = [row for row in rows if row.data["rule"] == "shady-links"]
    assert shady, "expected at least one 'shady-links' finding"
    # Check code is preserved
    assert all(row.data["code"] == "url = 'http://evil.com'" for row in shady)
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_harvest_skips_duplicate(db_session, guarddog_normalized_flagged):
    """Harvesting the same asset twice returns a scan first, then ``None``."""
    # Identical positional arguments are used for both calls.
    same_asset = (
        "http://nexus:8081/repo/pypi-proxy/packages/x/1.0/x-1.0.tar.gz",
        "pypi-proxy",
        "pypi",
        "packages/x/1.0/x-1.0.tar.gz",
        db_session,
    )

    with (
        patch(
            "guarddog_nexus.harvester.download_asset",
            return_value="/tmp/test.tar.gz",
        ),
        patch("guarddog_nexus.harvester.compute_sha256", return_value="abc"),
        patch(
            "guarddog_nexus.harvester.scan_package",
            return_value=guarddog_normalized_flagged,
        ),
    ):
        first = await harvest(*same_asset)
        second = await harvest(*same_asset)

    assert first is not None
    assert second is None  # skipped duplicate
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_harvest_clean_package(db_session, guarddog_normalized_clean):
    """A clean scanner result produces an unflagged scan with zero findings."""
    url = "http://nexus:8081/repo/pypi-proxy/packages/django/4.2/django-4.2.tar.gz"
    path = "packages/django/4.2/django-4.2.tar.gz"

    with (
        patch(
            "guarddog_nexus.harvester.download_asset",
            return_value="/tmp/test.tar.gz",
        ),
        patch("guarddog_nexus.harvester.compute_sha256", return_value="abc"),
        patch(
            "guarddog_nexus.harvester.scan_package",
            return_value=guarddog_normalized_clean,
        ),
    ):
        scan = await harvest(url, "pypi-proxy", "pypi", path, db_session)

    assert scan is not None
    assert scan.flagged is False
    assert scan.total_findings == 0
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_harvest_download_failure(db_session):
    """When ``download_asset`` returns ``None`` the scan is recorded as failed."""
    url = "http://nexus:8081/repo/pypi-proxy/packages/fail/1.0/fail-1.0.tar.gz"
    path = "packages/fail/1.0/fail-1.0.tar.gz"

    # The mocked download hands back None instead of a file path.
    with patch("guarddog_nexus.harvester.download_asset", return_value=None):
        scan = await harvest(url, "pypi-proxy", "pypi", path, db_session)

    assert scan is not None
    assert scan.status == "failed"
    # A missing message is treated as an empty string for the substring check.
    message = scan.error_message or ""
    assert "Download failed" in message
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_harvest_skips_non_package_asset(db_session):
    """``harvest`` returns ``None`` for the ``simple/index.html`` asset."""
    index_url = "http://nexus:8081/repo/pypi-proxy/simple/index.html"
    index_path = "simple/index.html"

    # Nothing is patched here; the test expects harvest to return None.
    outcome = await harvest(index_url, "pypi-proxy", "pypi", index_path, db_session)

    assert outcome is None
|