"""Sonatype Nexus REST API client."""

import hashlib
import os
import subprocess

from guarddog_nexus.config import config
from guarddog_nexus.logging_setup import log

SUPPORTED_EXTENSIONS = (".tar.gz", ".tgz", ".whl", ".zip")
PACKAGE_FILE_PATTERNS = ("packages/",)

# Repository format -> ecosystem name. "raw" is listed explicitly and maps to
# None, the same result an unknown format produces.
_FORMAT_TO_ECOSYSTEM = {
    "pypi": "pypi",
    "npm": "npm",
    "rubygems": "rubygems",
    "go": "go",
    "raw": None,
}

# Escape sequences curl recognises inside a double-quoted config-file value.
_CURL_CONFIG_ESCAPES = str.maketrans(
    {
        "\\": "\\\\",
        '"': '\\"',
        "\t": "\\t",
        "\n": "\\n",
        "\r": "\\r",
        "\v": "\\v",
    }
)


def get_ecosystem_from_format(fmt: str | None) -> str | None:
    """Return the ecosystem name for a repository format.

    The lookup is case-insensitive. ``None`` is returned for an empty or
    missing format, for ``"raw"``, and for any format not in the table.
    """
    key = fmt.lower() if fmt else ""
    return _FORMAT_TO_ECOSYSTEM.get(key)


def extract_pypi_info(asset_path: str) -> tuple[str, str] | None:
    """Extract package name and version from a PyPI asset path.

    Path format: packages/requests/2.31.0/requests-2.31.0.tar.gz

    Returns ``(name, version)`` taken from the second and third path
    segments, or ``None`` when the path does not start with ``packages``,
    has fewer than three segments, or has an empty name/version segment.
    """
    parts = asset_path.strip("/").split("/")
    if len(parts) < 3 or parts[0] != "packages":
        return None
    name, version = parts[1], parts[2]
    # A doubled slash ("packages//1.0") yields an empty segment; treat such a
    # path as unparseable rather than returning an empty name or version.
    if not name or not version:
        return None
    return name, version


def _curl_credentials_config(username: str, password: str) -> str:
    """Build a curl config snippet carrying the basic-auth credentials."""
    quoted = f"{username}:{password}".translate(_CURL_CONFIG_ESCAPES)
    return f'user = "{quoted}"\n'


def download_asset(download_url: str, dest_dir: str) -> str | None:
    """Download an asset from Nexus using curl (available in Docker).

    The file is saved in ``dest_dir`` under the last path component of
    ``download_url`` (query string removed).

    Returns the destination path on success, or ``None`` when the URL has no
    file name, curl exits non-zero, or running curl raises (including the
    120-second timeout). Failures are logged, never raised.
    """
    filename = os.path.basename(download_url.split("?")[0])
    if not filename:
        # Without a file name the output path would be dest_dir itself.
        log.warning(
            "Failed to download %s: %s", download_url, "URL has no file name"
        )
        return None
    dest_path = os.path.join(dest_dir, filename)

    try:
        result = subprocess.run(
            [
                "curl",
                "-sfSL",
                # Read the credentials from stdin instead of passing them via
                # "-u user:pass", which would expose them in the process list.
                "--config",
                "-",
                "-o",
                dest_path,
                # End of options: the URL can never be parsed as a curl flag.
                "--",
                download_url,
            ],
            input=_curl_credentials_config(
                config.nexus_username, config.nexus_password
            ),
            capture_output=True,
            text=True,
            timeout=120,
        )
    except Exception as e:
        # Deliberate best-effort boundary: callers only see None on failure.
        log.error("Download error for %s: %s", download_url, e)
        return None

    if result.returncode != 0:
        log.warning("Failed to download %s: %s", download_url, result.stderr)
        return None
    return dest_path


def compute_sha256(filepath: str) -> str:
    """Return the hex SHA-256 digest of the file at ``filepath``.

    The file is read in 8 KiB chunks so it is never loaded whole into memory.
    """
    digest = hashlib.sha256()
    with open(filepath, "rb") as stream:
        while chunk := stream.read(8192):
            digest.update(chunk)
    return digest.hexdigest()