#!/usr/bin/env python3
"""
G-pre2 v2: Build Master Controls directly from canonical tokens.

No K-Means needed — Phase 2 already normalized merge_group_hints
to 74 canonical tokens. Each token = one object group.

Groups controls by (canonical_token, phase) and creates MCs
for tokens with >=2 distinct phases.

Usage:
    python3 /app/scripts/gpre2_direct_mc.py --dry-run
    python3 /app/scripts/gpre2_direct_mc.py --min-phases 2
"""

import argparse
import json
import logging
import os
from collections import defaultdict

from sqlalchemy import create_engine, text

logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [%(levelname)s] %(message)s"
)
logger = logging.getLogger("gpre2-direct")

# NOTE(review): the fallback URL embeds credentials in source. Behaviour is
# kept as-is here; consider requiring DATABASE_URL to be set instead.
DB_URL = os.getenv(
    "DATABASE_URL",
    "postgresql://breakpilot:breakpilot123@postgres:5432/breakpilot_db",
)

# Sort rank used to order the phases of a Master Control; phases missing
# from this table are ranked last (99).
PHASE_ORDER = {
    "scope": 0,
    "definition": 1,
    "governance": 1,
    "design": 2,
    "implementation": 3,
    "configuration": 3,
    "operation": 4,
    "training": 4,
    "monitoring": 5,
    "testing": 6,
    "review": 7,
    "assessment": 8,
    "remediation": 8,
    "validation": 9,
    "reporting": 10,
    "evidence": 11,
}

# Phase assigned to hints of the form "action:object" (no phase part).
DEFAULT_PHASE = "implementation"

# Number of member rows sent per executemany call when writing to the DB.
MEMBER_BATCH_SIZE = 1000


def _parse_hint(hint: str) -> "tuple[str, str, str] | None":
    """Split an ``action:object[:phase]`` hint into its three parts.

    Returns ``(action, object_token, phase)``, or ``None`` when the hint has
    fewer than two ``:``-separated parts or an empty object token. A missing
    or empty phase falls back to ``DEFAULT_PHASE``.
    """
    parts = hint.split(":", 2)
    if len(parts) < 2:
        return None
    action = parts[0].strip()
    obj = parts[1].strip()
    if not obj:
        # An empty token would otherwise collect unrelated controls under a
        # Master Control named "".
        return None
    phase = parts[2].strip() if len(parts) > 2 else ""
    return action, obj, phase or DEFAULT_PHASE


def _group_controls(rows) -> "tuple[dict[str, dict[str, list]], int]":
    """Group ``(id, control_id, hint)`` rows by object token and phase.

    Returns ``(token_phases, skipped)`` where ``token_phases[token][phase]``
    is a list of ``(uuid_str, control_id, action)`` tuples and ``skipped`` is
    the number of rows whose hint could not be parsed.
    """
    token_phases: dict[str, dict[str, list]] = defaultdict(
        lambda: defaultdict(list)
    )
    skipped = 0
    for row_id, control_id, hint in rows:
        parsed = _parse_hint(hint)
        if parsed is None:
            skipped += 1
            continue
        action, obj, phase = parsed
        token_phases[obj][phase].append((str(row_id), control_id, action))
    return token_phases, skipped


def _build_master_controls(
    token_phases: "dict[str, dict[str, list]]", min_phases: int
) -> "tuple[list[dict], list[dict]]":
    """Build Master Control and membership records from grouped controls.

    Only tokens covering at least ``min_phases`` distinct phases produce a
    Master Control. Returns ``(master_controls, master_members)`` as lists of
    plain dicts.
    """
    master_controls = []
    master_members = []
    # Iterate tokens in sorted order so the sequential MC ids assigned at
    # write time do not depend on the row order of an unordered SELECT.
    for token in sorted(token_phases):
        phases = token_phases[token]
        if len(phases) < min_phases:
            continue

        sorted_phases = sorted(phases, key=lambda p: PHASE_ORDER.get(p, 99))
        phase_counts = {p: len(ctrls) for p, ctrls in phases.items()}

        master_controls.append({
            "canonical_name": token,
            "phases_covered": json.dumps(sorted_phases),
            "phase_control_count": json.dumps(phase_counts),
            "total_controls": sum(phase_counts.values()),
            # Kept alongside the JSON so stats need not re-parse it.
            "phase_count": len(sorted_phases),
        })

        for phase, controls in phases.items():
            master_members.extend(
                {
                    "canonical_name": token,
                    "control_uuid": ctrl_uuid,
                    "phase": phase,
                    "action": action,
                }
                for ctrl_uuid, _ctrl_id, action in controls
            )
    return master_controls, master_members


def _log_stats(master_controls: "list[dict]") -> None:
    """Log averages, maxima, a size histogram and the 15 largest MCs."""
    if not master_controls:
        return

    counts = [mc["total_controls"] for mc in master_controls]
    phases_per = [mc["phase_count"] for mc in master_controls]

    logger.info(" Avg controls/MC: %.1f", sum(counts) / len(counts))
    logger.info(" Max controls/MC: %d", max(counts))
    logger.info(" Avg phases/MC: %.1f", sum(phases_per) / len(phases_per))
    logger.info(" Max phases/MC: %d", max(phases_per))

    # Size distribution
    logger.info("\n Size distribution:")
    logger.info(" ≤10: %d", sum(1 for c in counts if c <= 10))
    logger.info(" 11-50: %d", sum(1 for c in counts if 11 <= c <= 50))
    logger.info(" 51-200: %d", sum(1 for c in counts if 51 <= c <= 200))
    logger.info(" 201-500: %d", sum(1 for c in counts if 201 <= c <= 500))
    logger.info(" 501-2K: %d", sum(1 for c in counts if 501 <= c <= 2000))
    logger.info(" >2K: %d", sum(1 for c in counts if c > 2000))

    # Top 15
    top = sorted(master_controls, key=lambda x: -x["total_controls"])[:15]
    logger.info("\n Top 15 Master Controls:")
    for mc in top:
        logger.info(
            " %6d %s (%d phases)",
            mc["total_controls"],
            mc["canonical_name"],
            mc["phase_count"],
        )


def _write_master_controls(
    engine, master_controls: "list[dict]", master_members: "list[dict]"
) -> int:
    """Replace the contents of the master-control tables in one transaction.

    Deletes all existing rows from ``master_control_members`` and
    ``master_controls``, inserts the new Master Controls with ids numbered
    after ``MAX(object_groups.group_id)``, then inserts the member rows.
    Returns the number of member rows written.
    """
    insert_mc = text("""
        INSERT INTO master_controls
            (master_control_id, object_group_id, canonical_name,
             phases_covered, phase_control_count, total_controls)
        VALUES (:mcid, :gid, :name,
                CAST(:phases AS jsonb), CAST(:pcounts AS jsonb), :total)
        RETURNING id
    """)
    insert_member = text("""
        INSERT INTO master_control_members
            (master_control_uuid, control_uuid, phase, action)
        VALUES (CAST(:mc AS uuid), CAST(:ctrl AS uuid), :phase, :action)
    """)

    with engine.begin() as c:
        c.execute(text("SET search_path TO compliance, public"))
        c.execute(text("DELETE FROM master_control_members"))
        c.execute(text("DELETE FROM master_controls"))

        # Get next object_group_id
        max_gid = c.execute(
            text("SELECT COALESCE(MAX(group_id), 0) FROM object_groups")
        ).scalar()

        mc_uuids = {}
        for gid, mc in enumerate(master_controls, start=max_gid + 1):
            # RETURNING hands back the generated id without a second query.
            mc_uuid = c.execute(insert_mc, {
                "mcid": f"MC-{gid}",
                "gid": gid,
                "name": mc["canonical_name"],
                "phases": mc["phases_covered"],
                "pcounts": mc["phase_control_count"],
                "total": mc["total_controls"],
            }).scalar()
            mc_uuids[mc["canonical_name"]] = str(mc_uuid)

        # Insert members
        member_params = [
            {
                "mc": mc_uuids[mem["canonical_name"]],
                "ctrl": mem["control_uuid"],
                "phase": mem["phase"],
                "action": mem["action"],
            }
            for mem in master_members
            if mem["canonical_name"] in mc_uuids
        ]
        # A list of parameter dicts makes SQLAlchemy use executemany; the
        # range is empty when there are no members, so no call is issued
        # with an empty parameter list.
        for start in range(0, len(member_params), MEMBER_BATCH_SIZE):
            c.execute(
                insert_member,
                member_params[start:start + MEMBER_BATCH_SIZE],
            )

    return len(member_params)


def main():
    """Load controls, derive Master Controls, log stats and persist them.

    Command-line flags:
        --min-phases N  minimum number of distinct phases a token must cover
                        to become a Master Control (default 2)
        --dry-run       compute and log everything but do not write to the DB
    """
    parser = argparse.ArgumentParser()
    parser.add_argument("--min-phases", type=int, default=2)
    parser.add_argument("--dry-run", action="store_true")
    args = parser.parse_args()

    engine = create_engine(
        DB_URL,
        connect_args={"options": "-c search_path=compliance,public"}
    )

    # Step 1: Load all controls with merge_group_hint
    logger.info("Loading controls...")
    with engine.connect() as c:
        rows = c.execute(text("""
            SELECT id, control_id,
                   generation_metadata->>'merge_group_hint' AS hint
            FROM canonical_controls
            WHERE generation_metadata->>'merge_group_hint' IS NOT NULL
              AND generation_metadata->>'merge_group_hint' != ''
              AND release_state NOT IN ('deprecated', 'rejected')
        """)).fetchall()

    logger.info("Loaded %d controls", len(rows))

    # Step 2: Group by (object_token, phase)
    token_phases, skipped = _group_controls(rows)
    if skipped:
        logger.warning(
            "Skipped %d controls with malformed merge_group_hint", skipped
        )
    logger.info("Found %d unique object tokens", len(token_phases))

    # Step 3: Create Master Controls
    master_controls, master_members = _build_master_controls(
        token_phases, args.min_phases
    )
    logger.info(
        "Created %d Master Controls with %d members (min %d phases)",
        len(master_controls), len(master_members), args.min_phases,
    )

    # Stats
    _log_stats(master_controls)

    if args.dry_run:
        logger.info("\nDRY RUN — not writing to DB")
        return

    # Step 4: Write to DB
    mem_count = _write_master_controls(engine, master_controls, master_members)
    logger.info(
        "Wrote %d MCs + %d members to DB", len(master_controls), mem_count
    )


if __name__ == "__main__":
    main()