260 lines
8.4 KiB
Python
260 lines
8.4 KiB
Python
"""Tests for harvester pipeline."""
|
|
|
|
from unittest.mock import patch
|
|
|
|
import pytest
|
|
from sqlalchemy import select
|
|
|
|
from guarddog_nexus.core.harvester import harvest
|
|
from guarddog_nexus.db.models import Finding
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_harvest_new_package(db_session, guarddog_normalized_flagged):
    """Harvest a previously unseen package and verify the stored scan and findings.

    The harvester's download, hashing and scanning helpers are patched, so the
    scan result comes entirely from the ``guarddog_normalized_flagged`` fixture.
    """
    with (
        patch(
            "guarddog_nexus.core.harvester.download_asset",
            return_value="/tmp/test-package.tar.gz",
        ),
        patch(
            "guarddog_nexus.core.harvester.compute_sha256",
            return_value="abc123",
        ),
        patch(
            "guarddog_nexus.core.harvester.scan_package",
            return_value=guarddog_normalized_flagged,
        ),
    ):
        result = await harvest(
            download_url="http://nexus/repo/pypi-proxy/packages/requests/2.31.0/requests-2.31.0.tar.gz",
            repository="pypi-proxy",
            format_="pypi",
            asset_path="packages/requests/2.31.0/requests-2.31.0.tar.gz",
            session=db_session,
        )

        # The returned scan record describes the package and the patched hash.
        assert result is not None
        assert result.package_name == "requests"
        assert result.package_version == "2.31.0"
        assert result.status == "completed"
        assert result.flagged is True
        assert result.total_findings == 3
        assert result.sha256 == "abc123"

        # The findings linked to this scan must also be queryable from the DB.
        query = select(Finding).where(Finding.scan_id == result.id)
        rows = (await db_session.execute(query)).scalars().all()
        assert len(rows) == 3
        seen_rules = {row.data["rule"] for row in rows}
        assert "shady-links" in seen_rules
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_harvest_same_sha256_skips(db_session, guarddog_normalized_flagged):
    """An asset hashing to an already-scanned SHA256 is not scanned again."""
    with (
        patch(
            "guarddog_nexus.core.harvester.download_asset",
            return_value="/tmp/test.tar.gz",
        ),
        patch(
            "guarddog_nexus.core.harvester.compute_sha256",
            return_value="deadbeef",
        ),
        patch(
            "guarddog_nexus.core.harvester.scan_package",
            return_value=guarddog_normalized_flagged,
        ) as scanner,
    ):
        original = await harvest(
            "http://nexus/repo/pkg/x/1.0/x-1.0.tar.gz",
            "pypi-proxy",
            "pypi",
            "packages/x/1.0/x-1.0.tar.gz",
            db_session,
        )
        assert original is not None
        assert original.total_findings == 3

        # Different URL and asset path, but the patched hash is unchanged.
        duplicate = await harvest(
            "http://nexus/repo/pkg/x/1.0/x-1.0-evil.tar.gz",
            "pypi-proxy",
            "pypi",
            "packages/x/1.0/x-1.0-evil.tar.gz",
            db_session,
        )
        assert duplicate is not None
        assert duplicate.total_findings == 0  # skipped on sha256 match; findings not copied
        assert duplicate.status == "completed"
        assert duplicate.sha256 == "deadbeef"
        assert scanner.call_count == 1  # only the first harvest reached the scanner
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_harvest_different_sha256_scans_again(db_session, guarddog_normalized_flagged):
    """Same package name and version with a new SHA256 triggers a fresh scan."""
    with (
        patch(
            "guarddog_nexus.core.harvester.download_asset",
            return_value="/tmp/test.tar.gz",
        ),
        patch("guarddog_nexus.core.harvester.compute_sha256") as hasher,
        patch(
            "guarddog_nexus.core.harvester.scan_package",
            return_value=guarddog_normalized_flagged,
        ) as scanner,
    ):
        # First artifact hashes to "aaa".
        hasher.return_value = "aaa"
        initial = await harvest(
            "http://nexus/repo/pkg/y/1.0/y-1.0.tar.gz",
            "pypi-proxy",
            "pypi",
            "packages/y/1.0/y-1.0.tar.gz",
            db_session,
        )
        assert initial is not None
        assert initial.sha256 == "aaa"

        # Second artifact sits under the same y/1.0 path but hashes to "bbb".
        hasher.return_value = "bbb"
        replaced = await harvest(
            "http://nexus/repo/pkg/y/1.0/y-1.0-malicious.tar.gz",
            "pypi-proxy",
            "pypi",
            "packages/y/1.0/y-1.0-malicious.tar.gz",
            db_session,
        )
        assert replaced is not None
        assert replaced.sha256 == "bbb"
        assert replaced.package_name == "y"
        assert replaced.package_version == "1.0"
        assert scanner.call_count == 2  # scanner ran for both harvests
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_harvest_skips_active_scan_same_url(db_session, guarddog_normalized_flagged):
    """Harvest a single URL once and check that the scan ends up completed.

    NOTE(review): the test name refers to skipping an already-active scan for
    the same URL, but only one ``harvest`` call is made here, so no concurrent
    or repeated request is exercised. Confirm whether a second call was meant
    to be part of this test.
    """
    with (
        patch(
            "guarddog_nexus.core.harvester.download_asset",
            return_value="/tmp/test.tar.gz",
        ),
        patch(
            "guarddog_nexus.core.harvester.compute_sha256",
            return_value="aaa",
        ),
        patch(
            "guarddog_nexus.core.harvester.scan_package",
            return_value=guarddog_normalized_flagged,
        ),
    ):
        url = "http://nexus/repo/pkg/z/1.0/z-1.0.tar.gz"
        outcome = await harvest(
            url,
            "pypi-proxy",
            "pypi",
            "packages/z/1.0/z-1.0.tar.gz",
            db_session,
        )
        assert outcome is not None
        assert outcome.status == "completed"
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_harvest_same_url_sha256_dedup(db_session, guarddog_normalized_flagged):
    """Harvesting one URL twice in sequence runs the scanner only once."""
    with (
        patch(
            "guarddog_nexus.core.harvester.download_asset",
            return_value="/tmp/test.tar.gz",
        ),
        patch(
            "guarddog_nexus.core.harvester.compute_sha256",
            return_value="ccc",
        ),
        patch(
            "guarddog_nexus.core.harvester.scan_package",
            return_value=guarddog_normalized_flagged,
        ) as scanner,
    ):
        url = "http://nexus/repo/pkg/w/1.0/w-1.0.tar.gz"

        # First pass: the asset is new, so the scanner is invoked.
        initial = await harvest(
            url,
            "pypi-proxy",
            "pypi",
            "packages/w/1.0/w-1.0.tar.gz",
            db_session,
        )
        assert initial is not None
        assert initial.status == "completed"
        assert scanner.call_count == 1

        # Second pass: identical URL and hash, after the first has finished.
        repeat = await harvest(
            url,
            "pypi-proxy",
            "pypi",
            "packages/w/1.0/w-1.0.tar.gz",
            db_session,
        )
        assert repeat is not None
        assert repeat.status == "completed"
        assert scanner.call_count == 1  # unchanged: sha256 match, no new scan
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_harvest_clean_package(db_session, guarddog_normalized_clean):
    """A scan result from the clean fixture yields an unflagged scan with no findings."""
    with (
        patch(
            "guarddog_nexus.core.harvester.download_asset",
            return_value="/tmp/test.tar.gz",
        ),
        patch(
            "guarddog_nexus.core.harvester.compute_sha256",
            return_value="abc",
        ),
        patch(
            "guarddog_nexus.core.harvester.scan_package",
            return_value=guarddog_normalized_clean,
        ),
    ):
        result = await harvest(
            "http://nexus/repo/pkg/django/4.2/django-4.2.tar.gz",
            "pypi-proxy",
            "pypi",
            "packages/django/4.2/django-4.2.tar.gz",
            db_session,
        )

        assert result is not None
        assert result.flagged is False
        assert result.total_findings == 0
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_harvest_download_failure(db_session):
    """When the patched download yields ``None`` the scan is recorded as failed."""
    with patch(
        "guarddog_nexus.core.harvester.download_asset",
        return_value=None,
    ):
        result = await harvest(
            "http://nexus/repo/pkg/fail/1.0/fail-1.0.tar.gz",
            "pypi-proxy",
            "pypi",
            "packages/fail/1.0/fail-1.0.tar.gz",
            db_session,
        )

        assert result is not None
        assert result.status == "failed"
        # error_message may be unset; fall back to "" so the check fails cleanly.
        message = result.error_message or ""
        assert "Download failed" in message
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_harvest_skips_non_package_asset(db_session):
    """An index page asset (``simple/index.html``) produces no scan at all."""
    outcome = await harvest(
        "http://nexus/repo/simple/index.html",
        "pypi-proxy",
        "pypi",
        "simple/index.html",
        db_session,
    )
    assert outcome is None
|
|
|
|
|
|
# --- Lock cleanup ---
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_cleanup_url_locks_removes_unlocked():
    """Pruning idle entries from ``_url_locks`` keeps locks that are still held.

    Two entries are registered in the harvester's module-level ``_url_locks``
    mapping, one of which is acquired. After dropping every entry whose lock
    is not held, only the acquired one must remain.

    NOTE(review): the pruning loop is written out inline in this test rather
    than calling a harvester function, so it checks the pruning rule itself,
    not the production cleanup code. Confirm whether that is intended.
    """
    import asyncio

    from guarddog_nexus.core.harvester import _url_lock, _url_locks

    # Keep a direct reference so the lock can be released even if the
    # "locked" entry is unexpectedly removed from the mapping.
    held = asyncio.Lock()

    async with _url_lock:
        _url_locks["locked"] = held
        _url_locks["unlocked"] = asyncio.Lock()

    await held.acquire()
    try:
        # Iterate over a snapshot of the keys: the mapping is mutated in the loop.
        for key in list(_url_locks):
            if not _url_locks[key].locked():
                _url_locks.pop(key, None)

        assert "locked" in _url_locks
        assert "unlocked" not in _url_locks
    finally:
        # Always restore the shared module state, even when an assertion
        # fails, so a held lock or stale entries cannot leak into other tests.
        held.release()
        _url_locks.clear()
|