# File: guarddog-nexus/tests/test_harvester.py
# (115 lines, 3.9 KiB, Python)

"""Tests for harvester pipeline."""
from unittest.mock import patch
import pytest
from sqlalchemy import select
from guarddog_nexus.harvester import harvest
from guarddog_nexus.models import Finding
@pytest.mark.asyncio
async def test_harvest_new_package(db_session, guarddog_normalized_flagged):
    """Harvest a flagged package and check the stored scan and its findings.

    The download, hashing and scanning helpers of the harvester module are
    replaced with mocks returning canned values, so only ``harvest`` itself
    and the database session are exercised.
    """
    url = "http://nexus:8081/repository/pypi-proxy/packages/requests/2.31.0/requests-2.31.0.tar.gz"
    path = "packages/requests/2.31.0/requests-2.31.0.tar.gz"

    with (
        patch(
            "guarddog_nexus.harvester.download_asset",
            return_value="/tmp/test-package.tar.gz",
        ),
        patch("guarddog_nexus.harvester.compute_sha256", return_value="abc123"),
        patch(
            "guarddog_nexus.harvester.scan_package",
            return_value=guarddog_normalized_flagged,
        ),
    ):
        scan = await harvest(
            download_url=url,
            repository="pypi-proxy",
            format_="pypi",
            asset_path=path,
            session=db_session,
        )

    # The returned scan record must describe the harvested package.
    assert scan is not None
    assert scan.package_name == "requests"
    assert scan.package_version == "2.31.0"
    assert scan.ecosystem == "pypi"
    assert scan.status == "completed"
    assert scan.flagged is True
    assert scan.total_findings == 2
    assert scan.sha256 == "abc123"

    # One Finding row per reported finding must be linked to the scan.
    stmt = select(Finding).where(Finding.scan_id == scan.id)
    result = await db_session.execute(stmt)
    findings = result.scalars().all()
    assert len(findings) == 2
@pytest.mark.asyncio
async def test_harvest_skips_duplicate(db_session, guarddog_normalized_flagged):
    """Harvest the same asset twice; the second call must return ``None``."""
    # Both calls receive exactly the same arguments.
    harvest_args = (
        "http://nexus:8081/repo/pypi-proxy/packages/x/1.0/x-1.0.tar.gz",
        "pypi-proxy",
        "pypi",
        "packages/x/1.0/x-1.0.tar.gz",
        db_session,
    )

    with (
        patch(
            "guarddog_nexus.harvester.download_asset",
            return_value="/tmp/test.tar.gz",
        ),
        patch("guarddog_nexus.harvester.compute_sha256", return_value="abc"),
        patch(
            "guarddog_nexus.harvester.scan_package",
            return_value=guarddog_normalized_flagged,
        ),
    ):
        first = await harvest(*harvest_args)
        second = await harvest(*harvest_args)

    assert first is not None
    assert second is None  # skipped duplicate
@pytest.mark.asyncio
async def test_harvest_clean_package(db_session, guarddog_normalized_clean):
    """Harvest a package whose scan result is clean: not flagged, no findings."""
    url = "http://nexus:8081/repo/pypi-proxy/packages/django/4.2/django-4.2.tar.gz"
    path = "packages/django/4.2/django-4.2.tar.gz"

    with (
        patch(
            "guarddog_nexus.harvester.download_asset",
            return_value="/tmp/test.tar.gz",
        ),
        patch("guarddog_nexus.harvester.compute_sha256", return_value="abc"),
        patch(
            "guarddog_nexus.harvester.scan_package",
            return_value=guarddog_normalized_clean,
        ),
    ):
        scan = await harvest(url, "pypi-proxy", "pypi", path, db_session)

    assert scan is not None
    assert scan.flagged is False
    assert scan.total_findings == 0
@pytest.mark.asyncio
async def test_harvest_download_failure(db_session):
    """When the download helper returns ``None`` the scan is marked failed."""
    url = "http://nexus:8081/repo/pypi-proxy/packages/fail/1.0/fail-1.0.tar.gz"
    path = "packages/fail/1.0/fail-1.0.tar.gz"

    # The mocked download helper hands back None instead of a file path.
    with patch("guarddog_nexus.harvester.download_asset", return_value=None):
        scan = await harvest(url, "pypi-proxy", "pypi", path, db_session)

    assert scan is not None
    assert scan.status == "failed"
    # error_message may be None; normalise so the substring check cannot raise.
    message = scan.error_message or ""
    assert "Download failed" in message
@pytest.mark.asyncio
async def test_harvest_skips_non_package_asset(db_session):
    """Harvesting a non-package asset path returns ``None`` without downloading.

    ``download_asset`` is patched so that a regression in the skip logic
    cannot reach the network from the test run; the mock also lets the test
    verify that no download was attempted for the skipped asset.
    """
    with patch("guarddog_nexus.harvester.download_asset") as mock_dl:
        scan = await harvest(
            "http://nexus:8081/repo/pypi-proxy/simple/index.html",
            "pypi-proxy",
            "pypi",
            "simple/index.html",
            db_session,
        )

    assert scan is None
    # NOTE(review): assumes the skip decision is made before any download is
    # started — confirm against the harvester implementation.
    mock_dl.assert_not_called()