refactor: вынос хардкода + LLM-анализ finding'ов
## Часть A: Вынос хардкода
- Новый модуль constants.py — все magic strings, лимиты, severity, ключи
(104 хардкод-значения централизованы)
- Новый модуль queries.py — общие SQL-запросы (build_scan_list_query,
build_package_list_query, get_dashboard_stats)
Убрана дупликация между api/*.py и web/routes.py (~90%)
- config.py: добавлены NLP_ENABLED, nexus_timeout, guarddog_binary,
log_syslog_facility, LLM-переменные
- nexus_client.py: таймауты из конфига, SHA256_CHUNK_SIZE из constants
- scanner.py: error-ключи из constants, GUARDDOG_OUTPUT_FORMAT из constants
- webhooks.py: RELEVANT_WEBHOOK_ACTIONS, METADATA_PATTERNS, ignore-строки
из constants
- logging_setup.py: конфигурируемый syslog facility, APP_PACKAGE из constants
- main.py: APP_NAME, APP_DESCRIPTION, APP_PACKAGE из constants
- models.py: поле report: JSON | None в Finding для LLM-отчётов
- harvester.py: авто-очистка tmpdir через finally; ERROR_MESSAGE_MAX_LENGTH
из constants; PACKAGE_EXTENSIONS вместо SUPPORTED_EXTENSIONS (с .gem)
- api/*.py + web/routes.py: используют build_*_query из queries.py,
константы для лимитов и сортировок
- tests/conftest.py: SEVERITY_WARNING, DEFAULT_ECOSYSTEM из constants
## Часть B: LLM-анализ finding'ов
- llm.py: клиент для OpenAI-совместимых API с промптом security-аналитика
- harvester.py: авто-триггер после flagged scan, сохранение report в БД
- api/findings.py: POST /{id}/analyze — ручной триггер
- web/routes.py: POST /api/v1/findings/{id}/analyze — HTMX-фрагмент
- _llm_report_fragment.html: шаблон фрагмента с вердиктом
- scan_detail.html, package_detail.html: кнопка Analyze with LLM
(htmx-post, spinner, inline-замена на LLM-отчёт)
- style.css: стили для .llm-report .verdict-safe/suspicious/malicious
## Часть C: Тесты
- 50 тестов, все зелёные
- Линтер чистый
- Тесты используют constants где нужно
This commit is contained in:
@@ -2,17 +2,25 @@
|
||||
|
||||
import asyncio
|
||||
import json
|
||||
import shutil
|
||||
|
||||
from guarddog_nexus.config import config
|
||||
from guarddog_nexus.constants import (
|
||||
DEFAULT_ECOSYSTEM,
|
||||
DEFAULT_FINDING_SEVERITY,
|
||||
GUARDDOG_OUTPUT_FORMAT,
|
||||
GUARDDOG_OUTPUT_KEY,
|
||||
GUARDDOG_RESULTS_KEY,
|
||||
SCAN_ERROR_BINARY_NOT_FOUND,
|
||||
SCAN_ERROR_JSON_PARSE,
|
||||
SCAN_ERROR_TIMEOUT,
|
||||
)
|
||||
from guarddog_nexus.logging_setup import log
|
||||
|
||||
GUARDDOG_BIN = shutil.which("guarddog") or "guarddog"
|
||||
|
||||
|
||||
async def scan_package(filepath: str, ecosystem: str = "pypi") -> dict:
|
||||
async def scan_package(filepath: str, ecosystem: str = DEFAULT_ECOSYSTEM) -> dict:
|
||||
"""Run guarddog scan on a downloaded package file. Returns normalized dict."""
|
||||
cmd = [GUARDDOG_BIN, ecosystem, "scan", filepath, "--output-format", "json"]
|
||||
guarddog_bin = config.guarddog_binary
|
||||
cmd = [guarddog_bin, ecosystem, "scan", filepath, GUARDDOG_OUTPUT_KEY, GUARDDOG_OUTPUT_FORMAT]
|
||||
log.info("Running: %s", " ".join(cmd))
|
||||
|
||||
try:
|
||||
@@ -26,10 +34,10 @@ async def scan_package(filepath: str, ecosystem: str = "pypi") -> dict:
|
||||
)
|
||||
except asyncio.TimeoutError:
|
||||
log.error("GuardDog scan timed out for %s", filepath)
|
||||
return {"findings": [], "errors": ["timeout"]}
|
||||
return {"findings": [], "errors": [SCAN_ERROR_TIMEOUT]}
|
||||
except FileNotFoundError:
|
||||
log.error("GuardDog binary not found at %s", GUARDDOG_BIN)
|
||||
return {"findings": [], "errors": ["guarddog_not_found"]}
|
||||
log.error("GuardDog binary not found at %s", guarddog_bin)
|
||||
return {"findings": [], "errors": [SCAN_ERROR_BINARY_NOT_FOUND]}
|
||||
|
||||
if proc.returncode not in (0, 1):
|
||||
log.error("GuardDog exited %d: %s", proc.returncode, stderr.decode())
|
||||
@@ -39,7 +47,7 @@ async def scan_package(filepath: str, ecosystem: str = "pypi") -> dict:
|
||||
data = json.loads(stdout.decode())
|
||||
except json.JSONDecodeError:
|
||||
log.error("GuardDog returned invalid JSON for %s", filepath)
|
||||
return {"findings": [], "errors": ["json_parse_error"]}
|
||||
return {"findings": [], "errors": [SCAN_ERROR_JSON_PARSE]}
|
||||
|
||||
return _normalize_output(data)
|
||||
|
||||
@@ -56,7 +64,7 @@ def _normalize_output(data: dict) -> dict:
|
||||
- list → semgrep findings [{message, location, code}]
|
||||
"""
|
||||
findings = []
|
||||
results = data.get("results", {})
|
||||
results = data.get(GUARDDOG_RESULTS_KEY, {})
|
||||
|
||||
if isinstance(results, list):
|
||||
results = {}
|
||||
@@ -68,7 +76,7 @@ def _normalize_output(data: dict) -> dict:
|
||||
findings.append(
|
||||
{
|
||||
"rule": rule_name,
|
||||
"severity": "WARNING",
|
||||
"severity": DEFAULT_FINDING_SEVERITY,
|
||||
"message": value,
|
||||
"location": "",
|
||||
"code": "",
|
||||
@@ -80,7 +88,7 @@ def _normalize_output(data: dict) -> dict:
|
||||
findings.append(
|
||||
{
|
||||
"rule": rule_name,
|
||||
"severity": item.get("severity", "WARNING"),
|
||||
"severity": item.get("severity", DEFAULT_FINDING_SEVERITY),
|
||||
"message": item.get("message", ""),
|
||||
"location": item.get("location", ""),
|
||||
"code": item.get("code", ""),
|
||||
|
||||
Reference in New Issue
Block a user