feat(control-pipeline): JSONB migration for generation_metadata
- Add migration script (scripts/migrate_jsonb.py) that converts 89,443 Python dict repr rows to valid JSON via ast.literal_eval
- Column altered from TEXT to native JSONB
- Index created on generation_metadata->>'merge_group_hint'
- Remove unnecessary ::jsonb casts in pipeline_adapter.py

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
144
control-pipeline/scripts/migrate_jsonb.py
Normal file
144
control-pipeline/scripts/migrate_jsonb.py
Normal file
@@ -0,0 +1,144 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
Migrate generation_metadata from TEXT (Python dict repr) to valid JSON.
|
||||
|
||||
Converts single quotes, None, True, False to JSON equivalents.
|
||||
Run this BEFORE altering the column type to JSONB.
|
||||
|
||||
Usage:
|
||||
python3 migrate_jsonb.py [--dry-run] [--batch-size 1000]
|
||||
"""
|
||||
|
||||
import ast
|
||||
import json
|
||||
import logging
|
||||
import os
|
||||
import sys
|
||||
|
||||
from sqlalchemy import create_engine, text
|
||||
|
||||
logging.basicConfig(level=logging.INFO, format="%(asctime)s %(message)s")
logger = logging.getLogger(__name__)

# NOTE(review): the hard-coded credentials are a local-development fallback
# only; set DATABASE_URL in the environment for any real deployment.
DATABASE_URL = os.getenv(
    "DATABASE_URL",
    "postgresql://breakpilot:breakpilot123@localhost:5432/breakpilot_db",
)


def _parse_batch_size(argv, default: int = 1000) -> int:
    """Return the integer following ``--batch-size`` in *argv*, else *default*.

    The previous inline expression crashed with IndexError when --batch-size
    was the last argument, and with ValueError when its value was not an
    integer; both cases now fall back to the default.
    """
    try:
        return int(argv[argv.index("--batch-size") + 1])
    except (ValueError, IndexError):
        return default


# Rows processed per SELECT/UPDATE round trip; tunable via --batch-size.
BATCH_SIZE = _parse_batch_size(sys.argv)
# --dry-run reports what would change without issuing any UPDATEs.
DRY_RUN = "--dry-run" in sys.argv
|
||||
|
||||
|
||||
def convert_python_dict_to_json(text_value: str) -> "str | None":
    """Convert a Python ``repr`` of a literal (usually a dict) to a JSON string.

    Tries three strategies in order:

    1. ``ast.literal_eval`` + ``json.dumps`` — handles single quotes,
       ``None``/``True``/``False`` and nested structures without executing code.
    2. ``json.loads`` round-trip — the input is already valid JSON and is
       returned unchanged.
    3. Naive textual substitution, validated with ``json.loads``.

    Returns the JSON string, or ``None`` when every strategy fails (the
    original annotation claimed ``str`` but the function has always been
    able to return ``None``).
    """
    # Strategy 1: parse as a Python literal, re-serialize as JSON.
    try:
        parsed = ast.literal_eval(text_value)
        return json.dumps(parsed, ensure_ascii=False)
    except (ValueError, SyntaxError):
        pass

    # Strategy 2: already valid JSON? Keep the original bytes.
    try:
        json.loads(text_value)
        return text_value
    except (json.JSONDecodeError, ValueError):
        pass

    # Strategy 3: last-resort textual patching.
    # NOTE(review): these blind replaces can corrupt string *values* that
    # contain apostrophes or the words None/True/False; the json.loads
    # validation below catches most breakage but cannot detect a value that
    # was altered yet still parses — confirm acceptable for this data set.
    try:
        fixed = (
            text_value.replace("'", '"')
            .replace("None", "null")
            .replace("True", "true")
            .replace("False", "false")
        )
        json.loads(fixed)  # validate before returning
        return fixed
    except (json.JSONDecodeError, ValueError):
        pass

    return None
|
||||
|
||||
|
||||
def main():
    """Convert Python-dict-repr ``generation_metadata`` rows to valid JSON.

    Walks `canonical_controls` in id order using keyset pagination, rewrites
    each matching row via :func:`convert_python_dict_to_json`, and finally
    NULLs out empty strings so a later ``ALTER COLUMN ... TYPE jsonb`` can
    succeed. Honors the module-level DRY_RUN and BATCH_SIZE flags.
    """
    engine = create_engine(DATABASE_URL)

    with engine.connect() as conn:
        conn.execute(text("SET search_path TO compliance, core, public"))

        # Filter: non-empty TEXT values that do not already start with '{"'
        # (the signature of json.dumps output for a non-empty dict).
        needs_fix = """
            generation_metadata IS NOT NULL
            AND generation_metadata != ''
            AND LEFT(generation_metadata, 2) != '{"'
        """

        total = conn.execute(text(
            f"SELECT COUNT(*) FROM canonical_controls WHERE {needs_fix}"
        )).scalar()

        logger.info("Found %d rows with Python dict format (need conversion)", total)
        logger.info("Dry run: %s, Batch size: %d", DRY_RUN, BATCH_SIZE)

        if total == 0:
            logger.info("Nothing to convert!")
            return

        converted = 0
        failed = 0
        # Keyset pagination: rewritten rows may STILL match the filter above
        # (the failure path writes '{}', and converted lists/scalars do not
        # start with '{"'), so re-running the same unpaged query would visit
        # them again every batch — and a dry run would re-read the first
        # batch forever. Paging forward by id visits each row exactly once.
        last_id = None

        while True:
            params = {"batch": BATCH_SIZE}
            keyset = ""
            if last_id is not None:
                keyset = "AND id > :last_id"
                params["last_id"] = last_id

            rows = conn.execute(text(f"""
                SELECT id, generation_metadata FROM canonical_controls
                WHERE {needs_fix}
                {keyset}
                ORDER BY id
                LIMIT :batch
            """), params).fetchall()

            if not rows:
                break
            last_id = rows[-1].id

            for row in rows:
                result = convert_python_dict_to_json(row.generation_metadata)
                if result is None:
                    failed += 1
                    logger.warning("FAILED id=%s: %s", row.id, row.generation_metadata[:100])
                    # Set to empty JSON object so ALTER TABLE doesn't fail.
                    if not DRY_RUN:
                        conn.execute(text("""
                            UPDATE canonical_controls
                            SET generation_metadata = '{}'
                            WHERE id = :id
                        """), {"id": row.id})
                else:
                    converted += 1
                    if not DRY_RUN:
                        conn.execute(text("""
                            UPDATE canonical_controls
                            SET generation_metadata = :val
                            WHERE id = :id
                        """), {"id": row.id, "val": result})

            # Commit once per batch so a crash loses at most one batch.
            if not DRY_RUN:
                conn.commit()

            logger.info("Progress: %d/%d converted, %d failed", converted, total, failed)

        # Empty strings cannot be cast to jsonb; normalize them to NULL.
        if not DRY_RUN:
            nulled = conn.execute(text("""
                UPDATE canonical_controls
                SET generation_metadata = NULL
                WHERE generation_metadata = ''
            """)).rowcount
            conn.commit()
            logger.info("Set %d empty strings to NULL", nulled)

        logger.info("DONE: %d converted, %d failed out of %d total", converted, failed, total)

        if not DRY_RUN and failed == 0:
            logger.info("All rows are now valid JSON. Safe to ALTER COLUMN to JSONB.")


if __name__ == "__main__":
    main()
|
||||
@@ -452,7 +452,7 @@ class MigrationPasses:
|
||||
"review": """
|
||||
UPDATE canonical_controls
|
||||
SET generation_metadata = jsonb_set(
|
||||
COALESCE(generation_metadata::jsonb, '{}'::jsonb),
|
||||
COALESCE(generation_metadata, '{}'::jsonb),
|
||||
'{triage_status}', '"review"'
|
||||
)
|
||||
WHERE release_state NOT IN ('deprecated')
|
||||
@@ -462,7 +462,7 @@ class MigrationPasses:
|
||||
"needs_obligation": """
|
||||
UPDATE canonical_controls
|
||||
SET generation_metadata = jsonb_set(
|
||||
COALESCE(generation_metadata::jsonb, '{}'::jsonb),
|
||||
COALESCE(generation_metadata, '{}'::jsonb),
|
||||
'{triage_status}', '"needs_obligation"'
|
||||
)
|
||||
WHERE release_state NOT IN ('deprecated')
|
||||
@@ -472,7 +472,7 @@ class MigrationPasses:
|
||||
"needs_pattern": """
|
||||
UPDATE canonical_controls
|
||||
SET generation_metadata = jsonb_set(
|
||||
COALESCE(generation_metadata::jsonb, '{}'::jsonb),
|
||||
COALESCE(generation_metadata, '{}'::jsonb),
|
||||
'{triage_status}', '"needs_pattern"'
|
||||
)
|
||||
WHERE release_state NOT IN ('deprecated')
|
||||
@@ -482,7 +482,7 @@ class MigrationPasses:
|
||||
"legacy_unlinked": """
|
||||
UPDATE canonical_controls
|
||||
SET generation_metadata = jsonb_set(
|
||||
COALESCE(generation_metadata::jsonb, '{}'::jsonb),
|
||||
COALESCE(generation_metadata, '{}'::jsonb),
|
||||
'{triage_status}', '"legacy_unlinked"'
|
||||
)
|
||||
WHERE release_state NOT IN ('deprecated')
|
||||
@@ -517,7 +517,7 @@ class MigrationPasses:
|
||||
)
|
||||
SELECT
|
||||
COALESCE(
|
||||
(generation_metadata::jsonb->>'source_regulation'),
|
||||
(generation_metadata->>'source_regulation'),
|
||||
''
|
||||
) AS regulation_code,
|
||||
obl.value::text AS obligation_id,
|
||||
@@ -586,7 +586,7 @@ class MigrationPasses:
|
||||
UPDATE canonical_controls
|
||||
SET release_state = 'deprecated',
|
||||
generation_metadata = jsonb_set(
|
||||
COALESCE(generation_metadata::jsonb, '{}'::jsonb),
|
||||
COALESCE(generation_metadata, '{}'::jsonb),
|
||||
'{deprecated_reason}', '"duplicate_same_obligation_pattern"'
|
||||
)
|
||||
WHERE id = CAST(:uuid AS uuid)
|
||||
|
||||
Reference in New Issue
Block a user