#!/usr/bin/env python3
"""
G-pre2: Build Master Controls from Object Groups + Lifecycle Phases.

Groups atomic controls by (object_group_id, phase) and creates
Master Controls for groups with >=2 distinct phases.

Usage:
    python3 /app/scripts/gpre2_master_controls.py
    python3 /app/scripts/gpre2_master_controls.py --min-phases 3
    python3 /app/scripts/gpre2_master_controls.py --dry-run
"""
import argparse
import json
import logging
import os
from collections import defaultdict

from sqlalchemy import create_engine, text

logging.basicConfig(level=logging.INFO, format="%(asctime)s [%(levelname)s] %(message)s")
logger = logging.getLogger("gpre2")

# NOTE(review): the fallback URL embeds a username and password in source.
# It is left unchanged so existing deployments keep working, but DATABASE_URL
# should be set explicitly and this default removed once that is guaranteed.
DB_URL = os.getenv(
    "DATABASE_URL",
    "postgresql://breakpilot:breakpilot123@postgres:5432/breakpilot_db",
)

# Canonical phase ordering for lifecycle chains.
# Phases sharing a rank are considered the same lifecycle step for sorting;
# phases missing from this table sort last (rank 99, see _build_master_controls).
PHASE_ORDER = {
    "scope": 0,
    "definition": 1,
    "governance": 1,
    "design": 2,
    "implementation": 3,
    "configuration": 3,
    "operation": 4,
    "training": 4,
    "monitoring": 5,
    "testing": 6,
    "review": 7,
    "assessment": 8,
    "remediation": 8,
    "validation": 9,
    "reporting": 10,
    "evidence": 11,
}

# Phase assumed when a merge_group_hint has no (or an empty) phase segment.
_DEFAULT_PHASE = "implementation"


def _build_object_index(groups):
    """Return a reverse index ``member token -> (group_id, canonical_name)``.

    Args:
        groups: iterable of ``(group_id, canonical_name, members)`` rows, where
            ``members`` is either a JSON string or an already-decoded value.

    A NULL / JSON ``null`` members value is treated as "no members" instead of
    raising ``TypeError``. If a token appears in several groups, the last row
    wins (same as a plain dict assignment in row order).
    """
    object_to_group = {}
    for gid, canonical, members_json in groups:
        members = json.loads(members_json) if isinstance(members_json, str) else members_json
        for member in members or ():
            object_to_group[member] = (gid, canonical)
    return object_to_group


def _parse_hint(hint):
    """Split a ``action:object[:phase]`` hint into ``(action, obj, phase)``.

    Returns ``None`` when the hint has no ``:`` separator at all. Only the
    first two colons split, so the phase segment may itself contain colons.
    A missing or empty phase segment falls back to ``_DEFAULT_PHASE`` so that
    ``"a:b"`` and ``"a:b:"`` land in the same phase bucket.
    """
    parts = hint.split(":", 2)
    if len(parts) < 2:
        return None
    action, obj = parts[0], parts[1]
    phase = parts[2] if len(parts) > 2 and parts[2] else _DEFAULT_PHASE
    return action, obj, phase


def _group_controls(rows, object_to_group, normalize_object):
    """Bucket controls by object group and lifecycle phase.

    Args:
        rows: iterable of ``(uuid, control_id, hint, title)`` rows.
        object_to_group: reverse index from ``_build_object_index``.
        normalize_object: callable applied to the hint's object token before
            the lookup; the raw token is tried as a fallback.

    Returns:
        ``(group_phases, group_names, unmatched)`` where ``group_phases`` maps
        ``group_id -> {phase -> [(control_uuid, control_id, action, title)]}``,
        ``group_names`` maps ``group_id -> canonical_name`` and ``unmatched``
        counts controls whose object token belongs to no group. Hints without
        a ``:`` separator are skipped silently and are not counted.
    """
    group_phases: dict[int, dict[str, list]] = defaultdict(lambda: defaultdict(list))
    group_names: dict[int, str] = {}
    unmatched = 0

    for uuid, control_id, hint, title in rows:
        parsed = _parse_hint(hint)
        if parsed is None:
            continue
        action, obj, phase = parsed

        # Prefer the normalized token; fall back to the raw one.
        match = object_to_group.get(normalize_object(obj))
        if match is None:
            match = object_to_group.get(obj)
        if match is None:
            unmatched += 1
            continue

        gid, canonical = match
        group_phases[gid][phase].append((str(uuid), control_id, action, title))
        group_names[gid] = canonical

    return group_phases, group_names, unmatched


def _build_master_controls(group_phases, group_names, min_phases):
    """Create Master Control records for groups with enough distinct phases.

    Returns:
        ``(master_controls, master_members)``: two lists of dicts whose keys
        match the bind parameters used by ``_write_master_controls``.
    """
    master_controls = []
    master_members = []

    for gid, phases in group_phases.items():
        if len(phases) < min_phases:
            continue

        mc_id = "MC-%d" % gid
        # Sort phases by lifecycle order; unknown phases go last.
        sorted_phases = sorted(phases, key=lambda p: PHASE_ORDER.get(p, 99))
        phase_counts = {p: len(ctrls) for p, ctrls in phases.items()}

        master_controls.append({
            "master_control_id": mc_id,
            "object_group_id": gid,
            "canonical_name": group_names.get(gid, "unknown"),
            "phases_covered": json.dumps(sorted_phases),
            "phase_control_count": json.dumps(phase_counts),
            "total_controls": sum(phase_counts.values()),
        })

        for phase, controls in phases.items():
            for ctrl_uuid, _ctrl_id, action, _title in controls:
                master_members.append({
                    "mc_id": mc_id,
                    "control_uuid": ctrl_uuid,
                    "phase": phase,
                    "action": action,
                })

    return master_controls, master_members


def _log_stats(master_controls):
    """Log averages, maxima and the ten largest Master Controls (no-op if empty)."""
    if not master_controls:
        return

    totals = [mc["total_controls"] for mc in master_controls]
    phases_per_mc = [len(json.loads(mc["phases_covered"])) for mc in master_controls]
    logger.info("  Avg controls per MC: %.1f", sum(totals) / len(totals))
    logger.info("  Avg phases per MC: %.1f", sum(phases_per_mc) / len(phases_per_mc))
    logger.info("  Max controls in MC: %d", max(totals))
    logger.info("  Max phases in MC: %d", max(phases_per_mc))

    # Top 10
    top10 = sorted(master_controls, key=lambda x: -x["total_controls"])[:10]
    logger.info("\nTop 10 Master Controls:")
    for mc in top10:
        logger.info(
            "  %s: %s (%d controls, phases: %s)",
            mc["master_control_id"],
            mc["canonical_name"],
            mc["total_controls"],
            mc["phases_covered"],
        )


def _write_master_controls(engine, master_controls, master_members):
    """Replace the contents of both master-control tables in one transaction.

    Existing rows are deleted first (also when ``master_controls`` is empty).
    Inserts are sent as one executemany call per table; the calls are skipped
    for empty lists because an empty parameter list would execute the
    statement without any bound values.

    Returns:
        Number of member rows actually written. Members whose master control
        cannot be resolved to a database id are skipped and reported.
    """
    insert_mc = text("""
        INSERT INTO master_controls
            (master_control_id, object_group_id, canonical_name,
             phases_covered, phase_control_count, total_controls)
        VALUES
            (:master_control_id, :object_group_id, :canonical_name,
             CAST(:phases_covered AS jsonb), CAST(:phase_control_count AS jsonb),
             :total_controls)
    """)
    insert_member = text("""
        INSERT INTO master_control_members
            (master_control_uuid, control_uuid, phase, action)
        VALUES
            (CAST(:mc_uuid AS uuid), CAST(:control_uuid AS uuid), :phase, :action)
    """)

    skipped = 0
    member_params = []
    with engine.begin() as c:
        c.execute(text("SET search_path TO compliance, public"))
        c.execute(text("DELETE FROM master_control_members"))
        c.execute(text("DELETE FROM master_controls"))

        if master_controls:
            c.execute(insert_mc, master_controls)

        # Get MC UUIDs for member inserts
        id_rows = c.execute(text("SELECT id, master_control_id FROM master_controls")).fetchall()
        mc_uuids = {row[1]: str(row[0]) for row in id_rows}

        for mem in master_members:
            mc_uuid = mc_uuids.get(mem["mc_id"])
            if not mc_uuid:
                skipped += 1
                continue
            member_params.append({
                "mc_uuid": mc_uuid,
                "control_uuid": mem["control_uuid"],
                "phase": mem["phase"],
                "action": mem["action"],
            })

        if member_params:
            c.execute(insert_member, member_params)

    if skipped:
        logger.warning("Skipped %d members without a matching Master Control row", skipped)
    return len(member_params)


def main():
    """CLI entry point: load groups and controls, build Master Controls, persist them.

    Flags:
        --min-phases N  minimum number of distinct phases a group needs (default 2)
        --dry-run       compute and log everything but do not touch the DB tables
    """
    parser = argparse.ArgumentParser()
    parser.add_argument("--min-phases", type=int, default=2, help="Min distinct phases for Master Control")
    parser.add_argument("--dry-run", action="store_true")
    args = parser.parse_args()

    engine = create_engine(DB_URL, connect_args={"options": "-c search_path=compliance,public"})

    # Step 1: Build reverse index (object_token → group_id)
    logger.info("Building object → group_id reverse index...")
    with engine.connect() as c:
        groups = c.execute(text("SELECT group_id, canonical_name, members FROM object_groups")).fetchall()
    object_to_group = _build_object_index(groups)
    logger.info("Reverse index: %d objects → %d groups", len(object_to_group), len(groups))

    # Step 2: Load all controls with merge_group_hint
    logger.info("Loading controls with merge_group_hint...")
    with engine.connect() as c:
        rows = c.execute(text("""
            SELECT id, control_id,
                   generation_metadata->>'merge_group_hint' AS hint,
                   title
            FROM canonical_controls
            WHERE generation_metadata->>'merge_group_hint' IS NOT NULL
              AND generation_metadata->>'merge_group_hint' != ''
              AND release_state NOT IN ('deprecated', 'rejected')
        """)).fetchall()
    logger.info("Loaded %d controls with merge_group_hint", len(rows))

    # Step 3: Parse and group by (group_id, phase)
    # Project-local import stays function-scoped, but is resolved once here
    # instead of once per control row.
    from services.control_dedup import normalize_object

    group_phases, group_names, unmatched = _group_controls(rows, object_to_group, normalize_object)
    logger.info("Grouped into %d object groups (%d controls unmatched to any group)",
                len(group_phases), unmatched)

    # Step 4: Create Master Controls (groups with >= min_phases distinct phases)
    master_controls, master_members = _build_master_controls(
        group_phases, group_names, args.min_phases
    )
    logger.info("Created %d Master Controls with %d members (min %d phases)",
                len(master_controls), len(master_members), args.min_phases)

    # Stats
    _log_stats(master_controls)

    if args.dry_run:
        logger.info("DRY RUN — not writing to DB")
        return

    # Step 5: Write to DB
    written = _write_master_controls(engine, master_controls, master_members)
    logger.info("Wrote %d Master Controls + %d members to DB", len(master_controls), written)


if __name__ == "__main__":
    main()