"""Async SQLite database setup via SQLAlchemy.""" from sqlalchemy import inspect, text from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker, create_async_engine from sqlalchemy.orm import DeclarativeBase from guarddog_nexus.config import config from guarddog_nexus.logging_setup import log DATABASE_URL = f"sqlite+aiosqlite:///{config.database_path}" _engine = create_async_engine(DATABASE_URL, echo=False) _async_session = async_sessionmaker(_engine, class_=AsyncSession, expire_on_commit=False) class Base(DeclarativeBase): pass async def _migrate(): """Add any missing columns from model definitions to existing SQLite tables.""" import guarddog_nexus.db.models # noqa: F401 async with _engine.connect() as conn: for table in Base.metadata.sorted_tables: # Get existing columns in DB col_names = [] try: existing = await conn.run_sync( lambda c: [col["name"] for col in inspect(c).get_columns(table.name)] ) col_names = existing except Exception: continue # Add missing model columns for col in table.columns: if col.name not in col_names: col_type = col.type.compile(_engine.dialect) nullable = "" if col.nullable else " NOT NULL" default = "" if col.default and col.default.arg is not None: default_val = col.default.arg if isinstance(default_val, str): default = f" DEFAULT '{default_val}'" else: default = f" DEFAULT {default_val}" if col.server_default: # Skip — func.now() etc. not trivially stringable pass sql = ( f"ALTER TABLE {table.name} ADD COLUMN " f"{col.name} {col_type}{nullable}{default}" ) log.info("Migration: %s", sql) try: await conn.execute(text(sql)) await conn.commit() except Exception as e: log.warning("Migration skipped (may already exist): %s", e) async def init_db(): import guarddog_nexus.db.models # noqa: F401 async with _engine.begin() as conn: await conn.run_sync(Base.metadata.create_all) await _migrate() await _ensure_indexes() await _reap_stale_analysis() async def get_session() -> AsyncSession: async with _async_session() as session: yield session async def _ensure_indexes(): """Create indexes that are not covered by ORM model definitions.""" indexes = [ "CREATE INDEX IF NOT EXISTS idx_scans_status ON scans(status)", "CREATE INDEX IF NOT EXISTS idx_scans_sha256 ON scans(sha256)", "CREATE INDEX IF NOT EXISTS idx_scans_package_name ON scans(package_name)", "CREATE INDEX IF NOT EXISTS idx_scans_package_version ON scans(package_version)", "CREATE INDEX IF NOT EXISTS idx_scans_flagged ON scans(flagged)", "CREATE INDEX IF NOT EXISTS idx_scans_nexus_asset_url ON scans(nexus_asset_url)", "CREATE INDEX IF NOT EXISTS idx_findings_scan_id ON findings(scan_id)", ] async with _engine.begin() as conn: for sql in indexes: await conn.execute(text(sql)) async def _reap_stale_analysis(): """Reset stuck 'analyzing' statuses left from crashes.""" sql = ( "UPDATE findings SET report = NULL " "WHERE report IS NOT NULL " "AND json_extract(report, '$.status') = 'analyzing'" ) async with _engine.begin() as conn: result = await conn.execute(text(sql)) count = result.rowcount if count: log.warning("Reset %d stale LLM analysis statuses", count)