"""Pydantic schemas for API request/response models.""" from datetime import datetime from pydantic import BaseModel class ScanOut(BaseModel): id: int package_name: str package_version: str ecosystem: str repository: str status: str total_findings: int flagged: bool started_at: datetime | None = None finished_at: datetime | None = None error_message: str | None = None model_config = {"from_attributes": True} class ScanListResponse(BaseModel): total: int limit: int offset: int scans: list[ScanOut] class ScanDetailOut(ScanOut): nexus_asset_url: str | None = None sha256: str | None = None initiator: str | None = None source_ip: str | None = None findings: list[dict] = [] class FindingOut(BaseModel): id: int scan_id: int rule: str = "" severity: str = "" message: str = "" location: str = "" code: str = "" report: dict | None = None created_at: datetime | None = None model_config = {"from_attributes": True} class FindingsListResponse(BaseModel): total: int limit: int offset: int findings: list[FindingOut] class PackageOut(BaseModel): name: str version: str ecosystem: str repository: str last_scanned_at: datetime | None = None flagged: bool total_findings: int latest_scan_id: int class PackageListResponse(BaseModel): total: int limit: int offset: int packages: list[PackageOut] class PackageScanOut(BaseModel): id: int status: str total_findings: int flagged: bool started_at: datetime | None = None class PackageDetailOut(BaseModel): name: str version: str ecosystem: str repository: str flagged: bool scans: list[PackageScanOut] findings: list[dict] class StatsResponse(BaseModel): total_scans: int flagged_scans: int recent_flagged: int total_findings: int top_rules: list[dict] latest_scan_at: datetime | None = None # Webhook payload models class WebhookAsset(BaseModel): id: str | None = None format: str = "" path: str | None = None name: str | None = None downloadUrl: str | None = None class WebhookComponent(BaseModel): id: str | None = None format: str = "" name: str = "" version: str = "" class WebhookPayload(BaseModel): action: str = "" repositoryName: str = "" initiator: str | None = None asset: WebhookAsset | None = None component: WebhookComponent | None = None # Finding data known fields (prevents **f.data from overwriting id/scan_id) _FINDING_DATA_FIELDS = ("rule", "severity", "message", "location", "code") def serialize_finding(finding) -> dict: """Extract known fields from a Finding, preventing data field injection.""" result = { "id": finding.id, "scan_id": finding.scan_id, "report": finding.report, "created_at": finding.created_at.isoformat() if finding.created_at else None, } for field in _FINDING_DATA_FIELDS: result[field] = finding.data.get(field) or "" return result