#!/usr/bin/env python3 """ Migrate generation_metadata from TEXT (Python dict repr) to valid JSON. Converts single quotes, None, True, False to JSON equivalents. Run this BEFORE altering the column type to JSONB. Usage: python3 migrate_jsonb.py [--dry-run] [--batch-size 1000] """ import ast import json import logging import os import sys from sqlalchemy import create_engine, text logging.basicConfig(level=logging.INFO, format="%(asctime)s %(message)s") logger = logging.getLogger(__name__) DATABASE_URL = os.getenv( "DATABASE_URL", "postgresql://breakpilot:breakpilot123@localhost:5432/breakpilot_db", ) BATCH_SIZE = int(sys.argv[sys.argv.index("--batch-size") + 1]) if "--batch-size" in sys.argv else 1000 DRY_RUN = "--dry-run" in sys.argv def convert_python_dict_to_json(text_value: str) -> str: """Convert Python dict repr to valid JSON string.""" # Try ast.literal_eval first (handles single quotes, None, True, False) try: parsed = ast.literal_eval(text_value) return json.dumps(parsed, ensure_ascii=False) except (ValueError, SyntaxError): pass # Already valid JSON? try: json.loads(text_value) return text_value except (json.JSONDecodeError, ValueError): pass # Manual replacement as fallback try: fixed = text_value fixed = fixed.replace("'", '"') fixed = fixed.replace("None", "null") fixed = fixed.replace("True", "true") fixed = fixed.replace("False", "false") json.loads(fixed) # Validate return fixed except (json.JSONDecodeError, ValueError): pass return None def main(): engine = create_engine(DATABASE_URL) with engine.connect() as conn: conn.execute(text("SET search_path TO compliance, core, public")) # Count rows needing conversion total = conn.execute(text(""" SELECT COUNT(*) FROM canonical_controls WHERE generation_metadata IS NOT NULL AND generation_metadata != '' AND LEFT(generation_metadata, 2) != '{"' """)).scalar() logger.info("Found %d rows with Python dict format (need conversion)", total) logger.info("Dry run: %s, Batch size: %d", DRY_RUN, BATCH_SIZE) if total == 0: logger.info("Nothing to convert!") return converted = 0 failed = 0 offset = 0 while offset < total + BATCH_SIZE: rows = conn.execute(text(""" SELECT id, generation_metadata FROM canonical_controls WHERE generation_metadata IS NOT NULL AND generation_metadata != '' AND LEFT(generation_metadata, 2) != '{"' ORDER BY id LIMIT :batch """), {"batch": BATCH_SIZE}).fetchall() if not rows: break for row in rows: result = convert_python_dict_to_json(row.generation_metadata) if result is None: failed += 1 logger.warning("FAILED id=%s: %s", row.id, row.generation_metadata[:100]) # Set to empty JSON object so ALTER TABLE doesn't fail if not DRY_RUN: conn.execute(text(""" UPDATE canonical_controls SET generation_metadata = '{}' WHERE id = :id """), {"id": row.id}) else: converted += 1 if not DRY_RUN: conn.execute(text(""" UPDATE canonical_controls SET generation_metadata = :val WHERE id = :id """), {"id": row.id, "val": result}) if not DRY_RUN: conn.commit() offset += len(rows) logger.info("Progress: %d/%d converted, %d failed", converted, total, failed) # Also set empty strings to NULL if not DRY_RUN: nulled = conn.execute(text(""" UPDATE canonical_controls SET generation_metadata = NULL WHERE generation_metadata = '' """)).rowcount conn.commit() logger.info("Set %d empty strings to NULL", nulled) logger.info("DONE: %d converted, %d failed out of %d total", converted, failed, total) if not DRY_RUN and failed == 0: logger.info("All rows are now valid JSON. Safe to ALTER COLUMN to JSONB.") if __name__ == "__main__": main()