diff --git a/control-pipeline/scripts/migrate_jsonb.py b/control-pipeline/scripts/migrate_jsonb.py
new file mode 100644
--- /dev/null
+++ b/control-pipeline/scripts/migrate_jsonb.py
@@ -0,0 +1,166 @@
+#!/usr/bin/env python3
+"""
+Migrate generation_metadata from TEXT (Python dict repr) to valid JSON.
+
+Converts single quotes, None, True, False to JSON equivalents.
+Run this BEFORE altering the column type to JSONB.
+
+Usage:
+    python3 migrate_jsonb.py [--dry-run] [--batch-size 1000]
+"""
+
+import argparse
+import ast
+import json
+import logging
+import os
+from typing import Optional
+
+from sqlalchemy import create_engine, text
+
+logging.basicConfig(level=logging.INFO, format="%(asctime)s %(message)s")
+logger = logging.getLogger(__name__)
+
+DATABASE_URL = os.getenv(
+    "DATABASE_URL",
+    "postgresql://breakpilot:breakpilot123@localhost:5432/breakpilot_db",
+)
+
+
+def parse_args() -> argparse.Namespace:
+    """Parse CLI flags; argparse validates values instead of raw sys.argv peeking."""
+    parser = argparse.ArgumentParser(description=__doc__)
+    parser.add_argument("--dry-run", action="store_true",
+                        help="Report what would change without writing.")
+    parser.add_argument("--batch-size", type=int, default=1000,
+                        help="Rows fetched per round trip (default: 1000).")
+    return parser.parse_args()
+
+
+def convert_python_dict_to_json(text_value: str) -> Optional[str]:
+    """Convert a Python dict repr to a valid JSON string.
+
+    Returns the JSON text, or None when the value cannot be converted.
+    """
+    # ast.literal_eval handles single quotes, None, True, False safely.
+    try:
+        parsed = ast.literal_eval(text_value)
+        return json.dumps(parsed, ensure_ascii=False)
+    except (ValueError, SyntaxError):
+        pass
+
+    # Already valid JSON?
+    try:
+        json.loads(text_value)
+        return text_value
+    except (json.JSONDecodeError, ValueError):
+        pass
+
+    # Last-resort textual fixup. WARNING: blind replace can corrupt values
+    # containing apostrophes or the words None/True/False inside strings;
+    # json.loads below rejects broken syntax but cannot detect semantic damage.
+    try:
+        fixed = (
+            text_value
+            .replace("'", '"')
+            .replace("None", "null")
+            .replace("True", "true")
+            .replace("False", "false")
+        )
+        json.loads(fixed)  # Validate
+        return fixed
+    except (json.JSONDecodeError, ValueError):
+        pass
+
+    return None
+
+
+def main() -> None:
+    args = parse_args()
+    engine = create_engine(DATABASE_URL)
+
+    with engine.connect() as conn:
+        conn.execute(text("SET search_path TO compliance, core, public"))
+
+        # Count rows needing conversion
+        total = conn.execute(text("""
+            SELECT COUNT(*) FROM canonical_controls
+            WHERE generation_metadata IS NOT NULL
+              AND generation_metadata != ''
+              AND LEFT(generation_metadata, 2) != '{"'
+        """)).scalar()
+
+        logger.info("Found %d rows with Python dict format (need conversion)", total)
+        logger.info("Dry run: %s, Batch size: %d", args.dry_run, args.batch_size)
+
+        if total == 0:
+            logger.info("Nothing to convert!")
+            return
+
+        converted = 0
+        failed = 0
+        last_id = ""  # keyset cursor: text form of the last processed id
+
+        # Keyset pagination (id > last processed id). Re-running the same
+        # LIMIT query with no OFFSET would fetch the first batch forever in
+        # --dry-run (rows never stop matching), and re-fetch failed rows
+        # (reset to '{}', which still matches LEFT(...) != '{"'), inflating
+        # the converted/failed counts.
+        while True:
+            rows = conn.execute(text("""
+                SELECT id, generation_metadata FROM canonical_controls
+                WHERE generation_metadata IS NOT NULL
+                  AND generation_metadata != ''
+                  AND LEFT(generation_metadata, 2) != '{"'
+                  AND CAST(id AS text) > :last_id
+                ORDER BY CAST(id AS text)
+                LIMIT :batch
+            """), {"batch": args.batch_size, "last_id": last_id}).fetchall()
+
+            if not rows:
+                break
+
+            for row in rows:
+                result = convert_python_dict_to_json(row.generation_metadata)
+                if result is None:
+                    failed += 1
+                    logger.warning("FAILED id=%s: %s", row.id, row.generation_metadata[:100])
+                    # Set to empty JSON object so ALTER TABLE doesn't fail
+                    if not args.dry_run:
+                        conn.execute(text("""
+                            UPDATE canonical_controls
+                            SET generation_metadata = '{}'
+                            WHERE id = :id
+                        """), {"id": row.id})
+                else:
+                    converted += 1
+                    if not args.dry_run:
+                        conn.execute(text("""
+                            UPDATE canonical_controls
+                            SET generation_metadata = :val
+                            WHERE id = :id
+                        """), {"id": row.id, "val": result})
+
+            if not args.dry_run:
+                conn.commit()
+
+            last_id = str(rows[-1].id)
+            logger.info("Progress: %d/%d converted, %d failed", converted, total, failed)
+
+        # Also set empty strings to NULL
+        if not args.dry_run:
+            nulled = conn.execute(text("""
+                UPDATE canonical_controls
+                SET generation_metadata = NULL
+                WHERE generation_metadata = ''
+            """)).rowcount
+            conn.commit()
+            logger.info("Set %d empty strings to NULL", nulled)
+
+        logger.info("DONE: %d converted, %d failed out of %d total", converted, failed, total)
+
+        if not args.dry_run and failed == 0:
+            logger.info("All rows are now valid JSON. Safe to ALTER COLUMN to JSONB.")
+
+
+if __name__ == "__main__":
+    main()
diff --git a/control-pipeline/services/pipeline_adapter.py b/control-pipeline/services/pipeline_adapter.py
index ceb8c04..1aa704f 100644
--- a/control-pipeline/services/pipeline_adapter.py
+++ b/control-pipeline/services/pipeline_adapter.py
@@ -452,7 +452,7 @@ class MigrationPasses:
         "review": """
             UPDATE canonical_controls
             SET generation_metadata = jsonb_set(
-                COALESCE(generation_metadata::jsonb, '{}'::jsonb),
+                COALESCE(generation_metadata, '{}'::jsonb),
                 '{triage_status}', '"review"'
             )
             WHERE release_state NOT IN ('deprecated')
@@ -462,7 +462,7 @@ class MigrationPasses:
         "needs_obligation": """
             UPDATE canonical_controls
             SET generation_metadata = jsonb_set(
-                COALESCE(generation_metadata::jsonb, '{}'::jsonb),
+                COALESCE(generation_metadata, '{}'::jsonb),
                 '{triage_status}', '"needs_obligation"'
             )
             WHERE release_state NOT IN ('deprecated')
@@ -472,7 +472,7 @@ class MigrationPasses:
         "needs_pattern": """
             UPDATE canonical_controls
             SET generation_metadata = jsonb_set(
-                COALESCE(generation_metadata::jsonb, '{}'::jsonb),
+                COALESCE(generation_metadata, '{}'::jsonb),
                 '{triage_status}', '"needs_pattern"'
             )
             WHERE release_state NOT IN ('deprecated')
@@ -482,7 +482,7 @@ class MigrationPasses:
         "legacy_unlinked": """
             UPDATE canonical_controls
             SET generation_metadata = jsonb_set(
-                COALESCE(generation_metadata::jsonb, '{}'::jsonb),
+                COALESCE(generation_metadata, '{}'::jsonb),
                 '{triage_status}', '"legacy_unlinked"'
             )
             WHERE release_state NOT IN ('deprecated')
@@ -517,7 +517,7 @@ class MigrationPasses:
             )
             SELECT
                 COALESCE(
-                    (generation_metadata::jsonb->>'source_regulation'),
+                    (generation_metadata->>'source_regulation'),
                     ''
                 ) AS regulation_code,
                 obl.value::text AS obligation_id,
@@ -586,7 +586,7 @@ class MigrationPasses:
             UPDATE canonical_controls
             SET release_state = 'deprecated',
                 generation_metadata = jsonb_set(
-                    COALESCE(generation_metadata::jsonb, '{}'::jsonb),
+                    COALESCE(generation_metadata, '{}'::jsonb),
                     '{deprecated_reason}', '"duplicate_same_obligation_pattern"'
                 ) WHERE id = CAST(:uuid AS uuid)