Files
breakpilot-core/control-pipeline/scripts/migrate_jsonb.py
Benjamin Admin 4716023abc feat(control-pipeline): JSONB migration for generation_metadata
- Add migration script (scripts/migrate_jsonb.py) that converts
  89,443 Python dict repr rows to valid JSON via ast.literal_eval
- Column altered from TEXT to native JSONB
- Index created on generation_metadata->>'merge_group_hint'
- Remove unnecessary ::jsonb casts in pipeline_adapter.py

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-22 07:49:11 +02:00

145 lines
4.6 KiB
Python

#!/usr/bin/env python3
"""
Migrate generation_metadata from TEXT (Python dict repr) to valid JSON.
Converts single quotes, None, True, False to JSON equivalents.
Run this BEFORE altering the column type to JSONB.
Usage:
python3 migrate_jsonb.py [--dry-run] [--batch-size 1000]
"""
import ast
import json
import logging
import os
import sys
from sqlalchemy import create_engine, text
logging.basicConfig(level=logging.INFO, format="%(asctime)s %(message)s")
logger = logging.getLogger(__name__)
DATABASE_URL = os.getenv(
"DATABASE_URL",
"postgresql://breakpilot:breakpilot123@localhost:5432/breakpilot_db",
)
BATCH_SIZE = int(sys.argv[sys.argv.index("--batch-size") + 1]) if "--batch-size" in sys.argv else 1000
DRY_RUN = "--dry-run" in sys.argv
def convert_python_dict_to_json(text_value: str) -> str:
"""Convert Python dict repr to valid JSON string."""
# Try ast.literal_eval first (handles single quotes, None, True, False)
try:
parsed = ast.literal_eval(text_value)
return json.dumps(parsed, ensure_ascii=False)
except (ValueError, SyntaxError):
pass
# Already valid JSON?
try:
json.loads(text_value)
return text_value
except (json.JSONDecodeError, ValueError):
pass
# Manual replacement as fallback
try:
fixed = text_value
fixed = fixed.replace("'", '"')
fixed = fixed.replace("None", "null")
fixed = fixed.replace("True", "true")
fixed = fixed.replace("False", "false")
json.loads(fixed) # Validate
return fixed
except (json.JSONDecodeError, ValueError):
pass
return None
def main():
engine = create_engine(DATABASE_URL)
with engine.connect() as conn:
conn.execute(text("SET search_path TO compliance, core, public"))
# Count rows needing conversion
total = conn.execute(text("""
SELECT COUNT(*) FROM canonical_controls
WHERE generation_metadata IS NOT NULL
AND generation_metadata != ''
AND LEFT(generation_metadata, 2) != '{"'
""")).scalar()
logger.info("Found %d rows with Python dict format (need conversion)", total)
logger.info("Dry run: %s, Batch size: %d", DRY_RUN, BATCH_SIZE)
if total == 0:
logger.info("Nothing to convert!")
return
converted = 0
failed = 0
offset = 0
while offset < total + BATCH_SIZE:
rows = conn.execute(text("""
SELECT id, generation_metadata FROM canonical_controls
WHERE generation_metadata IS NOT NULL
AND generation_metadata != ''
AND LEFT(generation_metadata, 2) != '{"'
ORDER BY id
LIMIT :batch
"""), {"batch": BATCH_SIZE}).fetchall()
if not rows:
break
for row in rows:
result = convert_python_dict_to_json(row.generation_metadata)
if result is None:
failed += 1
logger.warning("FAILED id=%s: %s", row.id, row.generation_metadata[:100])
# Set to empty JSON object so ALTER TABLE doesn't fail
if not DRY_RUN:
conn.execute(text("""
UPDATE canonical_controls
SET generation_metadata = '{}'
WHERE id = :id
"""), {"id": row.id})
else:
converted += 1
if not DRY_RUN:
conn.execute(text("""
UPDATE canonical_controls
SET generation_metadata = :val
WHERE id = :id
"""), {"id": row.id, "val": result})
if not DRY_RUN:
conn.commit()
offset += len(rows)
logger.info("Progress: %d/%d converted, %d failed", converted, total, failed)
# Also set empty strings to NULL
if not DRY_RUN:
nulled = conn.execute(text("""
UPDATE canonical_controls
SET generation_metadata = NULL
WHERE generation_metadata = ''
""")).rowcount
conn.commit()
logger.info("Set %d empty strings to NULL", nulled)
logger.info("DONE: %d converted, %d failed out of %d total", converted, failed, total)
if not DRY_RUN and failed == 0:
logger.info("All rows are now valid JSON. Safe to ALTER COLUMN to JSONB.")
if __name__ == "__main__":
main()