"""Harvester: download a package from Nexus, scan it, store results."""

import asyncio
import datetime
import os
import shutil
import tempfile

from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession

from ..config import config
from ..constants import (
    DEFAULT_ECOSYSTEM,
    ERROR_MESSAGE_MAX_LENGTH,
    PACKAGE_EXTENSIONS,
    SCAN_ERROR_DOWNLOAD_FAILED,
)
from ..db.models import Finding, Scan, ScanStatus
from ..logging_setup import log
from .nexus import compute_sha256, download_asset, extract_package_info
from .scanner import scan_package

# Per-URL locks to avoid parallel scans of the same asset.
# `_url_lock` guards mutations of the `_url_locks` registry itself.
_url_locks: dict[str, asyncio.Lock] = {}
_url_lock = asyncio.Lock()

# Global semaphore to limit concurrent GuardDog processes
_scan_semaphore = asyncio.Semaphore(config.max_concurrent_scans)


def _utcnow() -> datetime.datetime:
    """Return the current time as a timezone-aware UTC datetime."""
    return datetime.datetime.now(datetime.timezone.utc)


async def harvest(
    download_url: str,
    repository: str,
    format_: str,
    asset_path: str,
    session: AsyncSession,
    initiator: str | None = None,
    source_ip: str | None = None,
) -> Scan | None:
    """Download one asset, scan it, and persist the scan and its findings.

    Args:
        download_url: URL of the asset; also the key used for de-duplicating
            concurrent requests.
        repository: Repository name stored on the scan row.
        format_: Ecosystem of the asset; ``DEFAULT_ECOSYSTEM`` is used when
            this is empty.
        asset_path: Asset path handed to ``extract_package_info`` to derive
            the package name and version.
        session: Async database session used for every read and write.
        initiator: Optional initiator stored on the scan row.
        source_ip: Optional source IP stored on the scan row.

    Returns:
        ``None`` when the asset is skipped: its filename does not end with
        one of ``PACKAGE_EXTENSIONS``, the package info cannot be parsed,
        or the URL is already being processed / has a PENDING or SCANNING
        row. Otherwise the ``Scan`` row, in COMPLETED or FAILED state.

    Raises:
        Exceptions raised while checking for an active scan or while
        creating the initial PENDING row propagate to the caller. Failures
        after that point are recorded on the scan row instead.
    """
    ecosystem = format_ or DEFAULT_ECOSYSTEM

    # Strip any query string before looking at the file extension.
    filename = os.path.basename(download_url.split("?")[0])
    if not filename.endswith(PACKAGE_EXTENSIONS):
        log.info("Skipping non-package asset: %s", filename)
        return None

    info = extract_package_info(asset_path, ecosystem)
    if info is None:
        log.warning("Could not parse package info from path: %s", asset_path)
        return None
    package_name, package_version = info

    # Acquire per-URL lock to prevent parallel scans of the same asset
    async with _url_lock:
        lock = _url_locks.get(download_url)
        if lock is None:
            lock = _url_locks[download_url] = asyncio.Lock()

    if lock.locked():
        # The registry entry belongs to the task holding the lock, which
        # removes it when done. Removing it here would let a later caller
        # create a fresh lock for the same URL while the holder is running.
        log.info("URL already being processed, skipping: %s", download_url)
        return None

    async with lock:
        try:
            # Re-check DB in case another task already created a scan
            active = await session.scalar(
                select(Scan.id).where(
                    Scan.nexus_asset_url == download_url,
                    Scan.status.in_(
                        [ScanStatus.PENDING.value, ScanStatus.SCANNING.value]
                    ),
                )
            )
            if active is not None:
                log.info("Already scanning this URL, skipping")
                return None

            # Commit the PENDING row while the lock is still held, so the
            # check above is effective for any task that comes after us.
            scan = Scan(
                package_name=package_name,
                package_version=package_version,
                ecosystem=ecosystem,
                repository=repository,
                nexus_asset_url=download_url,
                initiator=initiator,
                source_ip=source_ip,
                status=ScanStatus.PENDING.value,
            )
            session.add(scan)
            await session.commit()
            await session.refresh(scan)
            scan_id = scan.id
        finally:
            async with _url_lock:
                _url_locks.pop(download_url, None)

    # Bound before the try so the finally clause can always reference it,
    # even when creating the temp directory is what failed.
    tmpdir: str | None = None
    try:
        os.makedirs(config.temp_dir, exist_ok=True)
        tmpdir = tempfile.mkdtemp(dir=config.temp_dir)

        scan.status = ScanStatus.SCANNING.value
        await session.commit()

        downloaded = await download_asset(download_url, tmpdir)
        if not downloaded:
            scan.status = ScanStatus.FAILED.value
            scan.error_message = SCAN_ERROR_DOWNLOAD_FAILED
            scan.finished_at = _utcnow()
            await session.commit()
            return scan

        sha256 = await compute_sha256(downloaded)
        scan.sha256 = sha256
        await session.commit()

        # Identical content already recorded by another scan row: finish
        # this one without running the scanner again.
        existing = await session.scalar(
            select(Scan.id).where(
                Scan.sha256 == sha256,
                Scan.id != scan_id,
            )
        )
        if existing is not None:
            log.info(
                "SHA256 already seen in scan #%d for %s==%s, skipping",
                existing,
                package_name,
                package_version,
            )
            scan.status = ScanStatus.COMPLETED.value
            scan.finished_at = _utcnow()
            await session.commit()
            return scan

        log.info("Scanning %s==%s", package_name, package_version)
        async with _scan_semaphore:
            result = await scan_package(downloaded, ecosystem)

        findings_list = result.get("findings", [])
        created_findings: list[Finding] = [
            Finding(scan_id=scan_id, data=fdata) for fdata in findings_list
        ]
        session.add_all(created_findings)

        total_findings = len(findings_list)
        flagged = total_findings > 0
        scan.total_findings = total_findings
        scan.flagged = flagged
        scan.status = ScanStatus.COMPLETED.value
        scan.finished_at = _utcnow()
        await session.commit()

        # Refresh to get IDs
        for finding in created_findings:
            await session.refresh(finding)

        # Auto-trigger LLM analysis for flagged packages. A failure here is
        # logged only; the scan itself stays COMPLETED.
        llm_reports: list[dict] = []
        if flagged and config.llm_enabled and config.llm_auto_analyze:
            try:
                llm_reports = await _run_llm_analysis(created_findings, session)
            except Exception as e:
                log.error(
                    "LLM analysis failed for %s==%s: %s",
                    package_name,
                    package_version,
                    e,
                )
                llm_reports = []

        if flagged:
            # NOTE(review): structured fields (scan id, package, findings
            # count, repository, LLM reports) could be attached to this
            # record via the logger's `extra=` if the log pipeline wants
            # them - confirm what `log` supports before wiring that in.
            log.warning(
                "FLAGGED %s==%s: %d findings in repo %s",
                package_name,
                package_version,
                total_findings,
                repository,
            )
            if llm_reports:
                log.info(
                    "LLM analysis complete for %s==%s: %d reports",
                    package_name,
                    package_version,
                    len(llm_reports),
                )

        log.info(
            "Scan complete: %s==%s (%d findings)",
            package_name,
            package_version,
            total_findings,
        )
        return scan

    except Exception as e:
        log.exception("Scan failed for %s==%s", package_name, package_version)
        # If the failure came from a flush/commit the session must be rolled
        # back before it can be used again; otherwise the commit below would
        # fail too and the row would never be marked FAILED.
        await session.rollback()
        scan.status = ScanStatus.FAILED.value
        scan.error_message = str(e)[:ERROR_MESSAGE_MAX_LENGTH]
        scan.finished_at = _utcnow()
        await session.commit()
        return scan
    finally:
        if tmpdir is not None:
            shutil.rmtree(tmpdir, ignore_errors=True)


async def _run_llm_analysis(findings: list[Finding], session: AsyncSession) -> list[dict]:
    """Run LLM analysis on findings and persist reports to the database.

    Every finding is first committed with an "analyzing" placeholder report.
    Each finding then gets the report returned by ``analyze_finding``, or
    ``None`` when that returns something falsy.

    Args:
        findings: Findings to analyze; ``finding.data`` is what gets sent to
            ``analyze_finding``.
        session: Async database session used to commit the reports.

    Returns:
        The truthy reports, in the order of ``findings``.

    Raises:
        Whatever ``analyze_finding`` raises. Before re-raising, the
        placeholder is cleared on every finding that was not processed and
        the reports obtained so far are committed.
    """
    from .llm import analyze_finding

    # Mark all as analyzing so the UI shows a spinner
    for finding in findings:
        finding.report = {"status": "analyzing"}
    await session.commit()

    reports: list[dict] = []
    processed = 0
    try:
        for finding in findings:
            report = await analyze_finding(finding.data)
            if report:
                finding.report = report
                reports.append(report)
            else:
                finding.report = None
            processed += 1
    except Exception:
        # Do not leave the committed placeholder behind on findings that
        # were never analyzed.
        for finding in findings[processed:]:
            finding.report = None
        await session.commit()
        raise

    await session.commit()
    return reports