Files
guarddog-nexus/guarddog_nexus/core/nexus.py

166 lines
5.0 KiB
Python

"""Sonatype Nexus REST API client using httpx async."""
import asyncio
import hashlib
import os
from urllib.parse import unquote, urlparse
import httpx
from ..config import config
from ..constants import (
PKG_PATH_PREFIX,
SHA256_CHUNK_SIZE,
)
from ..logging_setup import log
def _validate_download_url(url: str) -> bool:
    """Return True when *url* is an http(s) URL whose host is allow-listed.

    Used as an SSRF guard: only hosts listed in ``config.nexus_allowed_hosts``
    may be contacted.

    Args:
        url: Absolute URL to check.

    Returns:
        True if the scheme is ``http`` or ``https`` and the hostname is in
        the allow-list; False otherwise, including when the URL cannot be
        parsed or carries no hostname.
    """
    try:
        parsed = urlparse(url)
        host = parsed.hostname
    except ValueError:
        # urlparse rejects malformed netlocs (e.g. unbalanced IPv6 brackets)
        # by raising; an unparseable URL is never a permitted target.
        return False
    if parsed.scheme not in ("http", "https"):
        return False
    if not host:
        # No hostname at all (e.g. "http:///path") can never be allow-listed.
        return False
    return host in config.nexus_allowed_hosts
def extract_pypi_info(asset_path: str) -> tuple[str, str] | None:
"""Extract package name and version from a PyPI asset path.
Path format: packages/requests/2.31.0/requests-2.31.0.tar.gz
"""
parts = asset_path.strip("/").split("/")
if len(parts) >= 3 and parts[0] == PKG_PATH_PREFIX:
return parts[1], parts[2]
return None
def extract_go_info(asset_path: str) -> tuple[str, str] | None:
"""Extract module and version from a Go proxy asset path.
Path format: packages/github.com/gin-gonic/gin/@v/v1.9.0.zip
"""
cleaned = asset_path.strip("/")
# Find @v/ marker
idx = cleaned.find("/@v/")
if idx == -1:
return None
if cleaned.startswith(PKG_PATH_PREFIX + "/"):
module = cleaned[len(PKG_PATH_PREFIX) + 1 : idx]
else:
module = cleaned[:idx]
if not module:
return None
# Version: after @v/ up to the next / or end
ver_start = idx + 4 # len("/@v/")
rest = cleaned[ver_start:]
version = rest.split("/")[0] if "/" in rest else rest
if version.endswith(".zip"):
version = version[:-4]
return module, version
def extract_npm_info(asset_path: str) -> tuple[str, str] | None:
"""Extract package name and version from an npm proxy asset path.
Path format:
packages/react/-/react-18.2.0.tgz
packages/@angular/core/-/core-18.0.0.tgz (scoped)
"""
parts = asset_path.strip("/").split("/")
if len(parts) < 4 or parts[0] != PKG_PATH_PREFIX:
return None
# Scoped package: @scope/name
if parts[1].startswith("@"):
if len(parts) < 5:
return None
name = f"{parts[1]}/{parts[2]}"
short_name = parts[2]
else:
name = parts[1]
short_name = name
last = parts[-1]
if last.startswith(short_name + "-"):
raw = last[len(short_name) + 1 :]
for ext in (".tgz", ".tar.gz"):
if raw.endswith(ext):
return name, raw[: -len(ext)]
return None
# Dispatch table: ecosystem identifier -> function that parses an asset path
# into (name, version) for that ecosystem. Looked up by extract_package_info;
# ecosystems missing from this table fall through to its generic path split.
EXTRACTORS = {
    "pypi": extract_pypi_info,
    "go": extract_go_info,
    "npm": extract_npm_info,
}
def extract_package_info(asset_path: str, ecosystem: str) -> tuple[str, str] | None:
    """Return ``(name, version)`` for *asset_path* using the ecosystem's parser.

    Ecosystems registered in ``EXTRACTORS`` are delegated to their dedicated
    extractor. Any other ecosystem gets a generic split: the second and
    third path segments are returned, or None if there are fewer than three.
    """
    handler = EXTRACTORS.get(ecosystem)
    if not handler:
        # Unknown ecosystem: best-effort positional split.
        segments = asset_path.strip("/").split("/")
        return (segments[1], segments[2]) if len(segments) >= 3 else None
    return handler(asset_path)
def parse_package_path(path: str) -> tuple[str, str]:
    """Split a URL path into ``(package_name, package_version)``.

    The split happens at the last ``/`` and both halves are then
    percent-decoded, e.g. ``'eviltest/0.1.0'`` or
    ``'github.com/attacker/evilmodule/v0.1.0'``. A path without any ``/``
    is treated as a bare name with an empty version.
    """
    name, slash, version = path.rpartition("/")
    if not slash:
        # rpartition puts the whole string in the last slot when there is
        # no separator; it is the name, not the version.
        name, version = path, ""
    return unquote(name), unquote(version)
async def download_asset(download_url: str, dest_dir: str) -> str | None:
    """Download an asset from Nexus into *dest_dir* using async httpx.

    The URL is checked with ``_validate_download_url`` before any request is
    sent. Redirects are followed manually (httpx auto-follow is disabled) so
    that every hop is re-checked against the same allow-list; otherwise an
    allowed host could bounce the request to an arbitrary target.

    Args:
        download_url: Absolute URL of the asset to fetch.
        dest_dir: Directory the file is written into. The file name is the
            last path segment of *download_url* with any query string
            removed.

    Returns:
        The path of the written file, or None when the URL (or a redirect
        target) is blocked, the redirect chain is too long, or the request
        or the write fails. Failures are logged, never raised.
    """
    max_redirects = 5

    if not _download_url_allowed(download_url):
        log.warning(
            "SSRF prevention: blocked download from %s",
            _download_host_for_log(download_url),
        )
        return None

    dest_path = os.path.join(dest_dir, os.path.basename(download_url.split("?")[0]))
    async with httpx.AsyncClient(
        timeout=config.nexus_download_timeout, follow_redirects=False
    ) as client:
        try:
            current_url = download_url
            for _ in range(max_redirects + 1):
                response = await client.get(current_url)
                if not response.has_redirect_location:
                    break
                # With auto-follow off, httpx pre-builds the request it
                # would have sent next; its URL is the resolved target.
                next_url = str(response.next_request.url)
                if not _download_url_allowed(next_url):
                    log.warning(
                        "SSRF prevention: blocked redirect from %s to %s",
                        _download_host_for_log(current_url),
                        _download_host_for_log(next_url),
                    )
                    return None
                current_url = next_url
            else:
                log.warning(
                    "Failed to download %s: %s", download_url, "too many redirects"
                )
                return None

            response.raise_for_status()
            content = response.content
            await asyncio.to_thread(_write_file, dest_path, content)
            return dest_path
        except Exception as e:
            # Deliberate best-effort boundary: callers only see None.
            log.warning("Failed to download %s: %s", download_url, e)
            return None


def _download_url_allowed(url: str) -> bool:
    """Run the allow-list check, treating an unparseable URL as blocked."""
    try:
        return _validate_download_url(url)
    except ValueError:
        return False


def _download_host_for_log(url: str) -> str:
    """Return the URL's hostname for log output, or 'unknown' if unavailable."""
    try:
        return urlparse(url).hostname or "unknown"
    except ValueError:
        return "unknown"
def _write_file(path: str, content: bytes) -> None:
    """Write *content* to *path* in binary mode, truncating any existing file."""
    with open(path, mode="wb") as sink:
        sink.write(content)
async def nexus_get(path: str) -> httpx.Response:
    """Issue a GET against the Nexus REST API without credentials.

    *path* is appended verbatim to ``config.nexus_url`` (minus any trailing
    slashes), so it is expected to start with ``/``. The response is
    returned as-is; no status check is performed here.
    """
    async with httpx.AsyncClient(timeout=config.nexus_api_timeout) as session:
        base_url = config.nexus_url.rstrip("/")
        endpoint = f"{base_url}{path}"
        return await session.get(endpoint)
async def compute_sha256(filepath: str) -> str:
    """Return the hex SHA-256 digest of *filepath*, hashing in a worker thread.

    The blocking file read is pushed to ``asyncio.to_thread`` so the calling
    coroutine does not block while the file is read and hashed.
    """
    digest = await asyncio.to_thread(_compute_sha256_sync, filepath)
    return digest
def _compute_sha256_sync(filepath: str) -> str:
    """Return the hex SHA-256 digest of the file at *filepath*.

    The file is read in ``SHA256_CHUNK_SIZE``-byte pieces so the whole file
    never has to be held in memory at once.
    """
    digest = hashlib.sha256()
    with open(filepath, "rb") as stream:
        while block := stream.read(SHA256_CHUNK_SIZE):
            digest.update(block)
    return digest.hexdigest()