"""
Step 3: Apply article mappings to all controls + detect duplicates.

1. Update source_citation article/paragraph for controls that have a better mapping
2. Identify duplicate controls (same regulation + article + paragraph)

Inputs:
    DATABASE_URL (environment variable) -- connection URL for the database.
    /tmp/all_article_mappings.json      -- JSON object keyed by chunk hash; each
                                           value is read for its "article" and
                                           "paragraph" entries.

Outputs:
    /tmp/dedup_plan.json -- list of duplicate groups found in step 3c.

Flags:
    --dry-run  Compute and print the statistics without issuing any UPDATE.
"""
import json
import os
import sys
from collections import defaultdict

from sqlalchemy import create_engine, text as sql_text

DB_URL = os.environ['DATABASE_URL']
engine = create_engine(DB_URL, connect_args={"options": "-c search_path=compliance,public"})
DRY_RUN = '--dry-run' in sys.argv

# Load mappings (explicit encoding: do not depend on the locale default)
with open("/tmp/all_article_mappings.json", encoding="utf-8") as f:
    article_mapping = json.load(f)
print(f"Loaded {len(article_mapping)} article mappings")

print(f"\n{'=' * 70}")
print("STEP 3a: UPDATE CONTROLS WITH IMPROVED ARTICLE MAPPINGS")
print(f"{'=' * 70}")

with engine.begin() as conn:
    # Fast approach: load all chunk→control mappings at once
    print(" Loading chunk→control mappings...")
    chunk_rows = conn.execute(sql_text("""
        SELECT chunk_hash,
               jsonb_array_elements_text(generated_control_ids) as control_id
        FROM compliance.canonical_processed_chunks
        WHERE jsonb_array_length(COALESCE(generated_control_ids, '[]'::jsonb)) > 0
    """)).fetchall()

    # If a control id appears under several chunks, the last row read wins.
    control_to_hash = {}
    for row in chunk_rows:
        control_to_hash[row[1]] = row[0]
    print(f" Unique controls with chunk: {len(control_to_hash)}")

    # Get current article info for controls with citations (skip v1/v2 without citation)
    print(" Loading control article data...")
    ctrl_rows = conn.execute(sql_text("""
        SELECT id,
               source_citation->>'article' as current_article,
               source_citation->>'paragraph' as current_paragraph
        FROM compliance.canonical_controls
        WHERE source_citation IS NOT NULL
          AND release_state NOT IN ('rejected')
    """)).fetchall()
    print(f" Controls with citation: {len(ctrl_rows)}")

    updated = 0   # controls whose citation differs from the mapping
    improved = 0  # ... of which the article was previously empty
    changed = 0   # ... of which a non-empty article was replaced
    for row in ctrl_rows:
        ctrl_id = str(row[0])
        current_art = row[1] or ""
        current_para = row[2] or ""

        chunk_hash = control_to_hash.get(ctrl_id)
        if not chunk_hash:
            continue

        mapping = article_mapping.get(chunk_hash)
        # .get(): a mapping entry without these keys is skipped, not a KeyError
        if not mapping or not mapping.get("article"):
            continue

        new_art = mapping["article"]
        new_para = mapping.get("paragraph")

        # Skip controls that already carry exactly this mapping. Both sides are
        # normalised to "" so that a NULL paragraph in the DB and a null
        # paragraph in the mapping compare equal instead of being rewritten
        # (and counted) on every run.
        if current_art == new_art and current_para == (new_para or ""):
            continue

        # Paragraph-only changes are updated but fall into neither counter.
        if not current_art and new_art:
            improved += 1
        elif current_art != new_art:
            changed += 1

        if not DRY_RUN:
            citation_patch = json.dumps({"article": new_art, "paragraph": new_para})
            meta_patch = json.dumps({"source_article": new_art, "source_paragraph": new_para})
            # jsonb `||` merges the patch into the existing object, keeping
            # every other key of source_citation / generation_metadata.
            conn.execute(sql_text("""
                UPDATE compliance.canonical_controls
                SET source_citation = COALESCE(source_citation, '{}'::jsonb) || CAST(:citation AS jsonb),
                    generation_metadata = COALESCE(generation_metadata, '{}'::jsonb) || CAST(:meta AS jsonb)
                WHERE id = :id
            """), {"id": row[0], "citation": citation_patch, "meta": meta_patch})

        # Counted in dry-run mode as well, so the report shows what would change.
        updated += 1

    print(f"\n Updated: {updated}")
    print(f" New article (was empty): {improved}")
    print(f" Changed article: {changed}")
    print(f" Dry run: {DRY_RUN}")

    # ── Step 3b: Verification — article coverage after update ─────────
    print(f"\n{'=' * 70}")
    print("STEP 3b: ARTICLE COVERAGE AFTER UPDATE")
    print(f"{'=' * 70}")

    r = conn.execute(sql_text("""
        SELECT generation_metadata->>'source_regulation' as reg,
               count(*) as total,
               count(*) FILTER (WHERE source_citation->>'article' != ''
                                  AND source_citation->>'article' IS NOT NULL) as with_art,
               count(*) FILTER (WHERE source_citation IS NULL) as no_cit
        FROM compliance.canonical_controls
        WHERE release_state NOT IN ('rejected')
        GROUP BY generation_metadata->>'source_regulation'
        HAVING count(*) >= 3
        ORDER BY count(*) DESC
    """))

    print(f"\n {'Regulation':35s} {'Total':>6s} {'WithArt':>7s} {'%':>5s}")
    print(f" {'-' * 60}")
    grand_total = 0
    grand_art = 0
    for row in r.fetchall():
        reg = str(row[0])[:35] if row[0] else "(none/v1v2)"
        pct = f"{row[2]/row[1]*100:.0f}%" if row[1] > 0 else ""
        print(f" {reg:35s} {row[1]:6d} {row[2]:7d} {pct:>5s}")
        grand_total += row[1]
        grand_art += row[2]

    # Guard the division: with no qualifying regulation group the total is 0,
    # and an exception here would roll back the updates made above.
    grand_pct = grand_art / grand_total * 100 if grand_total else 0
    print(f"\n TOTAL: {grand_total} controls, {grand_art} with article ({grand_pct:.0f}%)")

    # ── Step 3c: Duplicate analysis ──────────────────────────────────
    print(f"\n{'=' * 70}")
    print("STEP 3c: DUPLICATE CONTROLS (same reg + article + paragraph, >1)")
    print(f"{'=' * 70}")

    r2 = conn.execute(sql_text("""
        SELECT generation_metadata->>'source_regulation' as reg,
               source_citation->>'article' as article,
               source_citation->>'paragraph' as paragraph,
               count(*) as cnt,
               array_agg(id ORDER BY created_at) as ids,
               array_agg(title ORDER BY created_at) as titles,
               array_agg(release_state ORDER BY created_at) as states
        FROM compliance.canonical_controls
        WHERE release_state NOT IN ('rejected', 'too_close')
          AND source_citation->>'article' IS NOT NULL
          AND source_citation->>'article' != ''
        GROUP BY generation_metadata->>'source_regulation',
                 source_citation->>'article',
                 source_citation->>'paragraph'
        HAVING count(*) > 1
        ORDER BY count(*) DESC
    """))

    dup_groups = []
    total_dup_controls = 0
    total_removable = 0
    for row in r2.fetchall():
        group = {
            "reg": row[0],
            "article": row[1],
            "paragraph": row[2],
            "count": row[3],
            "ids": [str(i) for i in row[4]],
            "titles": row[5],
            "states": row[6],
        }
        dup_groups.append(group)
        total_dup_controls += row[3]
        total_removable += row[3] - 1  # Keep the oldest

    print(f"\n Duplicate groups: {len(dup_groups)}")
    print(f" Controls in groups: {total_dup_controls}")
    print(f" Removable (keep oldest): {total_removable}")

    # Show the 30 largest groups, with up to three titles each
    print(f"\n {'Reg':25s} {'Article':15s} {'Para':10s} {'Count':>5s}")
    print(f" {'-' * 60}")
    for g in dup_groups[:30]:
        print(f" {str(g['reg']):25s} {str(g['article']):15s} {str(g['paragraph']):10s} {g['count']:5d}")
        for i, title in enumerate(g['titles'][:3]):
            state = g['states'][i] if i < len(g['states']) else '?'
            # Arrays are ordered by created_at, so index 0 is the oldest control.
            marker = "KEEP" if i == 0 else "DUP "
            # A NULL title is printed as empty text instead of raising TypeError.
            print(f" [{marker}][{state:6s}] {(title or '')[:70]}")
        if g['count'] > 3:
            print(f" ... +{g['count'] - 3} more")

# Save dedup plan. Written after the transaction block has exited so that a
# file-system error cannot roll back the database updates.
with open("/tmp/dedup_plan.json", "w", encoding="utf-8") as f:
    json.dump(dup_groups, f, indent=2, default=str)
print(f"\n Saved dedup plan to /tmp/dedup_plan.json")