"""
Auto-Migration Runner for BreakPilot Compliance Backend.

Runs all SQL migrations from the migrations/ directory on startup.
Tracks which migrations have already been applied in a _migration_history table.
Migrations are executed in filename order (001_, 002_, ...).

Safe for repeated runs: already-applied migrations are skipped.
Uses raw DBAPI connections to handle SQL files with explicit BEGIN/COMMIT.
"""

import logging
import os
import re
from pathlib import Path

from sqlalchemy import text

from database import engine, SCHEMA_SEARCH_PATH

logger = logging.getLogger(__name__)

MIGRATIONS_DIR = Path(__file__).parent / "migrations"
MIGRATION_PATTERN = re.compile(r"^(\d{3})_.+\.sql$")

# Migrations that existed before the auto-runner was introduced.
# These are assumed to have been applied manually on all environments.
_PRE_RUNNER_CUTOFF = 45


def run_migrations() -> None:
    """Run all pending SQL migrations in filename order.

    Does nothing when the ``SKIP_MIGRATIONS`` environment variable is set to
    ``1``, ``true`` or ``yes`` (case-insensitive), or when ``MIGRATIONS_DIR``
    does not exist.

    Each pending migration is executed and recorded in ``_migration_history``
    within a single transaction. A failing migration is rolled back, logged,
    and the runner continues with the next file; failures are summarised in
    the log at the end and are not raised to the caller.
    """
    if os.getenv("SKIP_MIGRATIONS", "").lower() in ("1", "true", "yes"):
        logger.info("SKIP_MIGRATIONS is set — skipping auto-migration")
        return

    if not MIGRATIONS_DIR.is_dir():
        logger.warning("Migrations directory not found: %s", MIGRATIONS_DIR)
        return

    # Use raw DBAPI connection for full SQL execution (BEGIN/COMMIT support)
    raw_conn = engine.raw_connection()
    try:
        cursor = raw_conn.cursor()
        # SCHEMA_SEARCH_PATH is application configuration, not user input;
        # SET does not accept bind parameters, hence the interpolation.
        cursor.execute(f"SET search_path TO {SCHEMA_SEARCH_PATH}")
        _ensure_history_table(cursor, raw_conn)

        applied = _fetch_applied_filenames(cursor)

        # Discover and sort migration files
        migration_files = sorted(
            f
            for f in MIGRATIONS_DIR.iterdir()
            if f.is_file() and MIGRATION_PATTERN.match(f.name)
        )

        # First run: if _migration_history is empty but DB already has tables
        # from manually applied migrations, seed the history so we don't
        # re-run them.
        if not applied:
            _seed_existing_migrations(cursor, raw_conn, migration_files)
            applied = _fetch_applied_filenames(cursor)

        pending = [f for f in migration_files if f.name not in applied]
        if not pending:
            logger.info("All %d migrations already applied", len(applied))
            return

        logger.info(
            "%d pending migrations (of %d total)",
            len(pending),
            len(migration_files),
        )

        failed = []
        for migration_file in pending:
            logger.info("Applying migration: %s", migration_file.name)
            try:
                _apply_migration(cursor, raw_conn, migration_file)
                logger.info(" OK: %s", migration_file.name)
            except Exception as e:
                # Top-level boundary: any error (I/O, SQL) rolls back this
                # migration only.
                raw_conn.rollback()
                logger.error(" FAILED: %s — %s", migration_file.name, e)
                failed.append((migration_file.name, str(e)))
                # Continue with remaining migrations instead of aborting

        if failed:
            names = ", ".join(name for name, _ in failed)
            logger.error("Some migrations failed: %s", names)
        else:
            logger.info("All migrations applied successfully")
    finally:
        raw_conn.close()


def _ensure_history_table(cursor, conn) -> None:
    """Create the ``_migration_history`` tracking table if missing and commit."""
    cursor.execute("""
        CREATE TABLE IF NOT EXISTS _migration_history (
            id SERIAL PRIMARY KEY,
            filename VARCHAR(255) NOT NULL UNIQUE,
            applied_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
        )
    """)
    conn.commit()


def _fetch_applied_filenames(cursor) -> set:
    """Return the set of filenames currently recorded in ``_migration_history``."""
    cursor.execute("SELECT filename FROM _migration_history")
    return {row[0] for row in cursor.fetchall()}


def _strip_transaction_statements(sql: str) -> str:
    """Remove standalone ``BEGIN;`` / ``COMMIT;`` lines from *sql*.

    The runner manages the transaction itself, so explicit transaction
    control inside a migration file is dropped before execution.
    """
    sql = re.sub(r'(?mi)^\s*BEGIN\s*;\s*$', '', sql)
    return re.sub(r'(?mi)^\s*COMMIT\s*;\s*$', '', sql)


def _apply_migration(cursor, conn, migration_file: Path) -> None:
    """Execute one migration file and record it, in a single transaction.

    The history row is inserted before the commit so that the schema change
    and its bookkeeping succeed or fail together; a migration can never be
    applied without being recorded. Any exception propagates to the caller,
    which is responsible for rolling back.
    """
    sql = _strip_transaction_statements(
        migration_file.read_text(encoding="utf-8")
    )
    cursor.execute(sql)
    cursor.execute(
        "INSERT INTO _migration_history (filename) VALUES (%s)",
        (migration_file.name,),
    )
    conn.commit()


def _seed_existing_migrations(cursor, conn, migration_files: list[Path]) -> None:
    """Seed _migration_history with pre-existing migrations.

    On first run the history table is empty. We check if the DB already has
    tables from earlier migrations (e.g. canonical_controls from 044).
    If yes, we mark all migrations up to _PRE_RUNNER_CUTOFF as applied
    so the runner doesn't try to re-run them.

    Args:
        cursor: Open DBAPI cursor on ``conn``.
        conn: DBAPI connection; committed once after seeding.
        migration_files: Candidate migration files; names that do not match
            ``MIGRATION_PATTERN`` are ignored.
    """
    # NOTE(review): this lookup is not filtered by table_schema, so a table
    # named canonical_controls in any schema visible to this role counts as
    # "existing database" — confirm that is intended.
    cursor.execute("""
        SELECT EXISTS (
            SELECT 1 FROM information_schema.tables
            WHERE table_name = 'canonical_controls'
        )
    """)
    db_has_tables = cursor.fetchone()[0]

    if not db_has_tables:
        logger.info("Fresh database — no seeding needed, will run all migrations")
        return

    logger.info(
        "Existing database detected — seeding migration history for pre-runner migrations (001-%03d)",
        _PRE_RUNNER_CUTOFF,
    )

    seeded = 0
    for f in migration_files:
        match = MIGRATION_PATTERN.match(f.name)
        if not match:
            continue
        if int(match.group(1)) <= _PRE_RUNNER_CUTOFF:
            cursor.execute(
                "INSERT INTO _migration_history (filename) VALUES (%s) ON CONFLICT (filename) DO NOTHING",
                (f.name,),
            )
            seeded += 1

    conn.commit()
    # Report the number of files actually marked as applied rather than the
    # cutoff constant, which only bounds the migration numbers.
    logger.info("Seeded %d pre-existing migrations", seeded)