144 lines
3.1 KiB
Python
144 lines
3.1 KiB
Python
"""Pydantic schemas for API request/response models."""
|
|
|
|
from datetime import datetime
|
|
|
|
from pydantic import BaseModel
|
|
|
|
|
|
class ScanOut(BaseModel):
|
|
id: int
|
|
package_name: str
|
|
package_version: str
|
|
ecosystem: str
|
|
repository: str
|
|
status: str
|
|
total_findings: int
|
|
flagged: bool
|
|
started_at: datetime | None = None
|
|
finished_at: datetime | None = None
|
|
error_message: str | None = None
|
|
|
|
model_config = {"from_attributes": True}
|
|
|
|
|
|
class ScanListResponse(BaseModel):
|
|
total: int
|
|
limit: int
|
|
offset: int
|
|
scans: list[ScanOut]
|
|
|
|
|
|
class ScanDetailOut(ScanOut):
|
|
nexus_asset_url: str | None = None
|
|
sha256: str | None = None
|
|
initiator: str | None = None
|
|
source_ip: str | None = None
|
|
findings: list[dict] = []
|
|
|
|
|
|
class FindingOut(BaseModel):
|
|
id: int
|
|
scan_id: int
|
|
rule: str = ""
|
|
severity: str = ""
|
|
message: str = ""
|
|
location: str = ""
|
|
code: str = ""
|
|
report: dict | None = None
|
|
created_at: datetime | None = None
|
|
|
|
model_config = {"from_attributes": True}
|
|
|
|
|
|
class FindingsListResponse(BaseModel):
|
|
total: int
|
|
limit: int
|
|
offset: int
|
|
findings: list[FindingOut]
|
|
|
|
|
|
class PackageOut(BaseModel):
|
|
name: str
|
|
version: str
|
|
ecosystem: str
|
|
repository: str
|
|
last_scanned_at: datetime | None = None
|
|
flagged: bool
|
|
total_findings: int
|
|
latest_scan_id: int
|
|
|
|
|
|
class PackageListResponse(BaseModel):
|
|
total: int
|
|
limit: int
|
|
offset: int
|
|
packages: list[PackageOut]
|
|
|
|
|
|
class PackageScanOut(BaseModel):
|
|
id: int
|
|
status: str
|
|
total_findings: int
|
|
flagged: bool
|
|
started_at: datetime | None = None
|
|
|
|
|
|
class PackageDetailOut(BaseModel):
|
|
name: str
|
|
version: str
|
|
ecosystem: str
|
|
repository: str
|
|
flagged: bool
|
|
scans: list[PackageScanOut]
|
|
findings: list[dict]
|
|
|
|
|
|
class StatsResponse(BaseModel):
|
|
total_scans: int
|
|
flagged_scans: int
|
|
recent_flagged: int
|
|
total_findings: int
|
|
top_rules: list[dict]
|
|
latest_scan_at: datetime | None = None
|
|
|
|
|
|
# Webhook payload models
|
|
class WebhookAsset(BaseModel):
|
|
id: str | None = None
|
|
format: str = ""
|
|
path: str | None = None
|
|
name: str | None = None
|
|
downloadUrl: str | None = None
|
|
|
|
|
|
class WebhookComponent(BaseModel):
|
|
id: str | None = None
|
|
format: str = ""
|
|
name: str = ""
|
|
version: str = ""
|
|
|
|
|
|
class WebhookPayload(BaseModel):
|
|
action: str = ""
|
|
repositoryName: str = ""
|
|
initiator: str | None = None
|
|
asset: WebhookAsset | None = None
|
|
component: WebhookComponent | None = None
|
|
|
|
|
|
# Finding data known fields (prevents **f.data from overwriting id/scan_id)
|
|
_FINDING_DATA_FIELDS = ("rule", "severity", "message", "location", "code")
|
|
|
|
|
|
def serialize_finding(finding) -> dict:
|
|
"""Extract known fields from a Finding, preventing data field injection."""
|
|
result = {
|
|
"id": finding.id,
|
|
"scan_id": finding.scan_id,
|
|
"report": finding.report,
|
|
"created_at": finding.created_at.isoformat() if finding.created_at else None,
|
|
}
|
|
for field in _FINDING_DATA_FIELDS:
|
|
result[field] = finding.data.get(field, "")
|
|
return result
|