"""Harvester: download a package from Nexus, scan it, store results.""" import asyncio import datetime import os import shutil import tempfile from sqlalchemy import select from sqlalchemy.ext.asyncio import AsyncSession from guarddog_nexus.config import config from guarddog_nexus.constants import ( DEFAULT_ECOSYSTEM, ERROR_MESSAGE_MAX_LENGTH, PACKAGE_EXTENSIONS, SCAN_ERROR_DOWNLOAD_FAILED, ) from guarddog_nexus.logging_setup import log from guarddog_nexus.models import Finding, Scan, ScanStatus from guarddog_nexus.nexus_client import compute_sha256, download_asset, extract_pypi_info from guarddog_nexus.scanner import scan_package # Per-URL locks to avoid parallel scans of the same asset _url_locks: dict[str, asyncio.Lock] = {} _url_lock = asyncio.Lock() async def harvest( download_url: str, repository: str, format_: str, asset_path: str, session: AsyncSession, ) -> Scan | None: ecosystem = DEFAULT_ECOSYSTEM if format_ in (DEFAULT_ECOSYSTEM,) else format_ filename = os.path.basename(download_url.split("?")[0]) if not filename.endswith(PACKAGE_EXTENSIONS): log.info("Skipping non-package asset: %s", filename) return None info = extract_pypi_info(asset_path) if info is None: log.warning("Could not parse package info from path: %s", asset_path) return None package_name, package_version = info # Acquire per-URL lock to prevent parallel scans of the same asset async with _url_lock: if download_url not in _url_locks: _url_locks[download_url] = asyncio.Lock() lock = _url_locks[download_url] if lock.locked(): log.info("URL already being processed, skipping: %s", download_url) return None async with lock: # Re-check DB in case another task already created and finished a scan active = await session.scalar( select(Scan.id).where( Scan.nexus_asset_url == download_url, Scan.status.in_([ScanStatus.PENDING.value, ScanStatus.SCANNING.value]), ) ) if active: log.info("Already scanning this URL, skipping") return None scan = Scan( package_name=package_name, package_version=package_version, ecosystem=ecosystem, repository=repository, nexus_asset_url=download_url, status=ScanStatus.PENDING.value, ) session.add(scan) await session.commit() await session.refresh(scan) os.makedirs(config.temp_dir, exist_ok=True) tmpdir = tempfile.mkdtemp(dir=config.temp_dir) try: scan.status = ScanStatus.SCANNING.value await session.commit() downloaded = await download_asset(download_url, tmpdir) if not downloaded: scan.status = ScanStatus.FAILED.value scan.error_message = SCAN_ERROR_DOWNLOAD_FAILED scan.finished_at = datetime.datetime.now(datetime.timezone.utc) await session.commit() return scan scan.sha256 = compute_sha256(downloaded) await session.commit() existing = await session.scalar( select(Scan.id).where( Scan.sha256 == scan.sha256, Scan.id != scan.id, ) ) if existing: log.info( "SHA256 already seen in scan #%d for %s==%s, skipping", existing, package_name, package_version, ) scan.status = ScanStatus.COMPLETED.value scan.finished_at = datetime.datetime.now(datetime.timezone.utc) await session.commit() return scan log.info("Scanning %s==%s", package_name, package_version) result = await scan_package(downloaded, ecosystem) findings_list = result.get("findings", []) created_findings: list[Finding] = [] for fdata in findings_list: f = Finding(scan_id=scan.id, data=fdata) session.add(f) created_findings.append(f) scan.total_findings = len(findings_list) scan.flagged = len(findings_list) > 0 scan.status = ScanStatus.COMPLETED.value scan.finished_at = datetime.datetime.now(datetime.timezone.utc) await session.commit() # Refresh to get IDs for f in created_findings: await session.refresh(f) # Auto-trigger LLM analysis for flagged packages llm_reports = [] if scan.flagged and config.llm_enabled: llm_reports = await _run_llm_analysis(created_findings, session) if scan.flagged: extra = { "scan_id": scan.id, "package": f"{package_name}=={package_version}", "findings_count": scan.total_findings, "repository": repository, } if llm_reports: extra["llm_analysis"] = llm_reports log.warning( "FLAGGED %s==%s: %d findings in repo %s", package_name, package_version, scan.total_findings, repository, ) if llm_reports: log.info( "LLM analysis complete for %s==%s: %d reports", package_name, package_version, len(llm_reports), ) log.info( "Scan complete: %s==%s (%d findings)", package_name, package_version, scan.total_findings, ) return scan except Exception as e: log.error("Scan failed for %s==%s: %s", package_name, package_version, e) scan.status = ScanStatus.FAILED.value scan.error_message = str(e)[:ERROR_MESSAGE_MAX_LENGTH] scan.finished_at = datetime.datetime.now(datetime.timezone.utc) await session.commit() return scan finally: shutil.rmtree(tmpdir, ignore_errors=True) async def _run_llm_analysis(findings: list[Finding], session: AsyncSession) -> list[dict]: """Run LLM analysis on findings and persist reports to the database.""" from guarddog_nexus.llm import analyze_finding reports = [] for finding in findings: report = await analyze_finding(finding.data) if report: finding.report = report reports.append(report) await session.commit() return reports