Files
guarddog-nexus/guarddog_nexus/harvester.py
Marker689 6523f55dcd feat: поддержка Go и npm экосистем
- setup-nexus.sh: создание go-proxy (proxy.golang.org) и npm-proxy (registry.npmjs.org)
- nexus_client.py: extract_go_info() и extract_npm_info() для парсинга путей
  Go:  packages/github.com/gorilla/mux/@v/v1.8.0.zip → name=github.com/gorilla/mux ver=v1.8.0
  npm: packages/lodash/-/lodash-4.17.21.tgz → name=lodash ver=4.17.21
- nexus_client.py: EXTRACTORS dict + extract_package_info() универсальный extractor
- webhooks.py: _detect_ecosystem() — определяет экосистему из asset.format
- harvester.py: использует extract_package_info() вместо extract_pypi_info()
- Всё в Docker-контейнере, на хосте ничего не ставится
- GuardDog поддерживает go и npm из коробки
2026-05-10 06:29:34 +03:00

206 lines
6.5 KiB
Python

"""Harvester: download a package from Nexus, scan it, store results."""
import asyncio
import datetime
import os
import shutil
import tempfile
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
from guarddog_nexus.config import config
from guarddog_nexus.constants import (
DEFAULT_ECOSYSTEM,
ERROR_MESSAGE_MAX_LENGTH,
PACKAGE_EXTENSIONS,
SCAN_ERROR_DOWNLOAD_FAILED,
)
from guarddog_nexus.logging_setup import log
from guarddog_nexus.models import Finding, Scan, ScanStatus
from guarddog_nexus.nexus_client import compute_sha256, download_asset, extract_package_info
from guarddog_nexus.scanner import scan_package
# Per-URL locks to avoid parallel scans of the same asset
_url_locks: dict[str, asyncio.Lock] = {}
_url_lock = asyncio.Lock()
# Global semaphore to limit concurrent GuardDog processes
_scan_semaphore = asyncio.Semaphore(config.max_concurrent_scans)
async def harvest(
download_url: str,
repository: str,
format_: str,
asset_path: str,
session: AsyncSession,
) -> Scan | None:
ecosystem = format_ if format_ else DEFAULT_ECOSYSTEM
filename = os.path.basename(download_url.split("?")[0])
if not filename.endswith(PACKAGE_EXTENSIONS):
log.info("Skipping non-package asset: %s", filename)
return None
info = extract_package_info(asset_path, ecosystem)
if info is None:
log.warning("Could not parse package info from path: %s", asset_path)
return None
package_name, package_version = info
# Acquire per-URL lock to prevent parallel scans of the same asset
async with _url_lock:
if download_url not in _url_locks:
_url_locks[download_url] = asyncio.Lock()
lock = _url_locks[download_url]
if lock.locked():
log.info("URL already being processed, skipping: %s", download_url)
return None
async with lock:
# Re-check DB in case another task already created and finished a scan
active = await session.scalar(
select(Scan.id).where(
Scan.nexus_asset_url == download_url,
Scan.status.in_([ScanStatus.PENDING.value, ScanStatus.SCANNING.value]),
)
)
if active:
log.info("Already scanning this URL, skipping")
return None
scan = Scan(
package_name=package_name,
package_version=package_version,
ecosystem=ecosystem,
repository=repository,
nexus_asset_url=download_url,
status=ScanStatus.PENDING.value,
)
session.add(scan)
await session.commit()
await session.refresh(scan)
os.makedirs(config.temp_dir, exist_ok=True)
tmpdir = tempfile.mkdtemp(dir=config.temp_dir)
try:
scan.status = ScanStatus.SCANNING.value
await session.commit()
downloaded = await download_asset(download_url, tmpdir)
if not downloaded:
scan.status = ScanStatus.FAILED.value
scan.error_message = SCAN_ERROR_DOWNLOAD_FAILED
scan.finished_at = datetime.datetime.now(datetime.timezone.utc)
await session.commit()
return scan
scan.sha256 = compute_sha256(downloaded)
await session.commit()
existing = await session.scalar(
select(Scan.id).where(
Scan.sha256 == scan.sha256,
Scan.id != scan.id,
)
)
if existing:
log.info(
"SHA256 already seen in scan #%d for %s==%s, skipping",
existing,
package_name,
package_version,
)
scan.status = ScanStatus.COMPLETED.value
scan.finished_at = datetime.datetime.now(datetime.timezone.utc)
await session.commit()
return scan
log.info("Scanning %s==%s", package_name, package_version)
async with _scan_semaphore:
result = await scan_package(downloaded, ecosystem)
findings_list = result.get("findings", [])
created_findings: list[Finding] = []
for fdata in findings_list:
f = Finding(scan_id=scan.id, data=fdata)
session.add(f)
created_findings.append(f)
scan.total_findings = len(findings_list)
scan.flagged = len(findings_list) > 0
scan.status = ScanStatus.COMPLETED.value
scan.finished_at = datetime.datetime.now(datetime.timezone.utc)
await session.commit()
# Refresh to get IDs
for f in created_findings:
await session.refresh(f)
# Auto-trigger LLM analysis for flagged packages
llm_reports = []
if scan.flagged and config.llm_enabled:
llm_reports = await _run_llm_analysis(created_findings, session)
if scan.flagged:
extra = {
"scan_id": scan.id,
"package": f"{package_name}=={package_version}",
"findings_count": scan.total_findings,
"repository": repository,
}
if llm_reports:
extra["llm_analysis"] = llm_reports
log.warning(
"FLAGGED %s==%s: %d findings in repo %s",
package_name,
package_version,
scan.total_findings,
repository,
)
if llm_reports:
log.info(
"LLM analysis complete for %s==%s: %d reports",
package_name,
package_version,
len(llm_reports),
)
log.info(
"Scan complete: %s==%s (%d findings)",
package_name,
package_version,
scan.total_findings,
)
return scan
except Exception as e:
log.error("Scan failed for %s==%s: %s", package_name, package_version, e)
scan.status = ScanStatus.FAILED.value
scan.error_message = str(e)[:ERROR_MESSAGE_MAX_LENGTH]
scan.finished_at = datetime.datetime.now(datetime.timezone.utc)
await session.commit()
return scan
finally:
shutil.rmtree(tmpdir, ignore_errors=True)
async def _run_llm_analysis(findings: list[Finding], session: AsyncSession) -> list[dict]:
"""Run LLM analysis on findings and persist reports to the database."""
from guarddog_nexus.llm import analyze_finding
reports = []
for finding in findings:
report = await analyze_finding(finding.data)
if report:
finding.report = report
reports.append(report)
await session.commit()
return reports