diff --git a/control-pipeline/api/__init__.py b/control-pipeline/api/__init__.py index 5d23aae..c3087c9 100644 --- a/control-pipeline/api/__init__.py +++ b/control-pipeline/api/__init__.py @@ -4,9 +4,11 @@ from api.control_generator_routes import router as generator_router from api.canonical_control_routes import router as canonical_router from api.document_compliance_routes import router as document_router from api.dependency_routes import router as dependency_router +from api.master_control_routes import router as master_control_router router = APIRouter() router.include_router(generator_router) router.include_router(canonical_router) router.include_router(document_router) router.include_router(dependency_router) +router.include_router(master_control_router) diff --git a/control-pipeline/api/master_control_routes.py b/control-pipeline/api/master_control_routes.py new file mode 100644 index 0000000..70ffa04 --- /dev/null +++ b/control-pipeline/api/master_control_routes.py @@ -0,0 +1,178 @@ +"""Master Control API — G-pre3. + +Provides read access to Master Controls (lifecycle-grouped atomic controls). +""" + +import json +import logging +from typing import Optional + +from fastapi import APIRouter, HTTPException, Query +from sqlalchemy import text + +from db.session import SessionLocal + +logger = logging.getLogger(__name__) + +router = APIRouter(prefix="/v1/master-controls", tags=["master-controls"]) + + +@router.get("") +async def list_master_controls( + limit: int = Query(50, ge=1, le=500), + offset: int = Query(0, ge=0), + search: Optional[str] = None, + min_phases: Optional[int] = None, + min_controls: Optional[int] = None, + sort: str = Query("total_controls", regex="^(total_controls|phases|name|created_at)$"), +): + """List Master Controls with optional filtering.""" + db = SessionLocal() + try: + where_clauses = [] + params: dict = {"limit": limit, "offset": offset} + + if search: + where_clauses.append("mc.canonical_name ILIKE :search") + params["search"] = f"%{search}%" + if min_phases: + where_clauses.append("jsonb_array_length(mc.phases_covered) >= :min_phases") + params["min_phases"] = min_phases + if min_controls: + where_clauses.append("mc.total_controls >= :min_controls") + params["min_controls"] = min_controls + + where = "WHERE " + " AND ".join(where_clauses) if where_clauses else "" + + sort_map = { + "total_controls": "mc.total_controls DESC", + "phases": "jsonb_array_length(mc.phases_covered) DESC", + "name": "mc.canonical_name ASC", + "created_at": "mc.created_at DESC", + } + order = sort_map.get(sort, "mc.total_controls DESC") + + rows = db.execute(text(f""" + SELECT mc.id, mc.master_control_id, mc.object_group_id, + mc.canonical_name, mc.phases_covered, + mc.phase_control_count, mc.total_controls, + mc.created_at + FROM master_controls mc + {where} + ORDER BY {order} + LIMIT :limit OFFSET :offset + """), params).fetchall() + + total = db.execute(text(f""" + SELECT count(*) FROM master_controls mc {where} + """), params).scalar() + + return { + "total": total, + "limit": limit, + "offset": offset, + "master_controls": [ + { + "id": str(r[0]), + "master_control_id": r[1], + "object_group_id": r[2], + "canonical_name": r[3], + "phases_covered": r[4], + "phase_control_count": r[5], + "total_controls": r[6], + "created_at": str(r[7]), + } + for r in rows + ], + } + finally: + db.close() + + +@router.get("/stats") +async def master_control_stats(): + """Aggregate statistics about Master Controls.""" + db = SessionLocal() + try: + stats = db.execute(text(""" + SELECT + count(*) AS total_master_controls, + sum(total_controls) AS total_member_controls, + avg(total_controls)::int AS avg_controls_per_mc, + max(total_controls) AS max_controls, + avg(jsonb_array_length(phases_covered))::numeric(3,1) AS avg_phases, + max(jsonb_array_length(phases_covered)) AS max_phases + FROM master_controls + """)).fetchone() + + phase_dist = db.execute(text(""" + SELECT phase, count(*) AS control_count + FROM master_control_members + GROUP BY phase + ORDER BY control_count DESC + """)).fetchall() + + return { + "total_master_controls": stats[0], + "total_member_controls": stats[1], + "avg_controls_per_mc": stats[2], + "max_controls": stats[3], + "avg_phases": float(stats[4]) if stats[4] else 0, + "max_phases": stats[5], + "phase_distribution": {r[0]: r[1] for r in phase_dist}, + } + finally: + db.close() + + +@router.get("/{mc_id}") +async def get_master_control(mc_id: str): + """Get a single Master Control with all phase-controls.""" + db = SessionLocal() + try: + mc = db.execute(text(""" + SELECT mc.id, mc.master_control_id, mc.object_group_id, + mc.canonical_name, mc.phases_covered, + mc.phase_control_count, mc.total_controls + FROM master_controls mc + WHERE mc.master_control_id = :mc_id + """), {"mc_id": mc_id}).fetchone() + + if not mc: + raise HTTPException(status_code=404, detail="Master Control not found") + + members = db.execute(text(""" + SELECT mcm.phase, mcm.action, + cc.control_id, cc.title, cc.severity, + cc.source_citation->>'source' AS source + FROM master_control_members mcm + JOIN canonical_controls cc ON cc.id = mcm.control_uuid + WHERE mcm.master_control_uuid = CAST(:mc_uuid AS uuid) + ORDER BY mcm.phase, cc.control_id + """), {"mc_uuid": str(mc[0])}).fetchall() + + # Group by phase + phases = {} + for phase, action, ctrl_id, title, severity, source in members: + if phase not in phases: + phases[phase] = [] + phases[phase].append({ + "control_id": ctrl_id, + "title": title, + "action": action, + "severity": severity, + "source": source, + }) + + return { + "id": str(mc[0]), + "master_control_id": mc[1], + "object_group_id": mc[2], + "canonical_name": mc[3], + "phases_covered": mc[4], + "phase_control_count": mc[5], + "total_controls": mc[6], + "phases": phases, + } + finally: + db.close() diff --git a/control-pipeline/migrations/004_object_groups.sql b/control-pipeline/migrations/004_object_groups.sql new file mode 100644 index 0000000..33486a5 --- /dev/null +++ b/control-pipeline/migrations/004_object_groups.sql @@ -0,0 +1,18 @@ +-- Migration 004: Object Groups (G-pre1) +-- Schema: compliance +-- Run: ssh macmini "docker exec -i bp-core-postgres psql -U breakpilot -d breakpilot_db" < control-pipeline/migrations/004_object_groups.sql + +SET search_path TO compliance, public; + +CREATE TABLE IF NOT EXISTS object_groups ( + id UUID PRIMARY KEY DEFAULT gen_random_uuid(), + group_id INTEGER NOT NULL, + canonical_name VARCHAR(200) NOT NULL, + member_count INTEGER DEFAULT 0, + members JSONB DEFAULT '[]', + top_controls_count INTEGER DEFAULT 0, + created_at TIMESTAMPTZ DEFAULT NOW() +); + +CREATE INDEX IF NOT EXISTS idx_object_groups_group_id ON object_groups(group_id); +CREATE INDEX IF NOT EXISTS idx_object_groups_canonical ON object_groups(canonical_name); diff --git a/control-pipeline/migrations/005_master_controls.sql b/control-pipeline/migrations/005_master_controls.sql new file mode 100644 index 0000000..1aed287 --- /dev/null +++ b/control-pipeline/migrations/005_master_controls.sql @@ -0,0 +1,30 @@ +-- Migration 005: Master Controls (G-pre2) +-- Schema: compliance +-- Run: ssh macmini "docker exec -i bp-core-postgres psql -U breakpilot -d breakpilot_db" < control-pipeline/migrations/005_master_controls.sql + +SET search_path TO compliance, public; + +CREATE TABLE IF NOT EXISTS master_controls ( + id UUID PRIMARY KEY DEFAULT gen_random_uuid(), + master_control_id VARCHAR(50) UNIQUE NOT NULL, + object_group_id INTEGER NOT NULL, + canonical_name VARCHAR(200) NOT NULL, + phases_covered JSONB NOT NULL DEFAULT '[]', + phase_control_count JSONB NOT NULL DEFAULT '{}', + total_controls INTEGER DEFAULT 0, + created_at TIMESTAMPTZ DEFAULT NOW() +); + +CREATE INDEX IF NOT EXISTS idx_master_controls_group ON master_controls(object_group_id); + +CREATE TABLE IF NOT EXISTS master_control_members ( + id UUID PRIMARY KEY DEFAULT gen_random_uuid(), + master_control_uuid UUID NOT NULL REFERENCES master_controls(id) ON DELETE CASCADE, + control_uuid UUID NOT NULL, + phase VARCHAR(50) NOT NULL, + action VARCHAR(50) NOT NULL, + created_at TIMESTAMPTZ DEFAULT NOW() +); + +CREATE INDEX IF NOT EXISTS idx_mc_members_master ON master_control_members(master_control_uuid); +CREATE INDEX IF NOT EXISTS idx_mc_members_control ON master_control_members(control_uuid); diff --git a/control-pipeline/scripts/gpre1_object_clustering.py b/control-pipeline/scripts/gpre1_object_clustering.py new file mode 100644 index 0000000..7205def --- /dev/null +++ b/control-pipeline/scripts/gpre1_object_clustering.py @@ -0,0 +1,219 @@ +#!/usr/bin/env python3 +""" +G-pre1: Object Clustering via Mini-Batch K-Means on Embeddings. + +Clusters ~144k unique normalized objects into ~15-25k semantic groups +using bge-m3 embeddings and Mini-Batch K-Means. + +Usage (inside control-pipeline container): + python3 /app/scripts/gpre1_object_clustering.py --k 20000 + python3 /app/scripts/gpre1_object_clustering.py --k 20000 --dry-run +""" + +import argparse +import json +import logging +import sys +import time +from collections import Counter + +import httpx +import numpy as np +from sklearn.cluster import MiniBatchKMeans +from sqlalchemy import create_engine, text + +logging.basicConfig(level=logging.INFO, format="%(asctime)s [%(levelname)s] %(message)s") +logger = logging.getLogger("gpre1") + +import os +DB_URL = os.getenv("DATABASE_URL", "postgresql://breakpilot:breakpilot123@postgres:5432/breakpilot_db") +EMBEDDING_URL = "http://embedding-service:8087" +BATCH_SIZE = 64 # Embeddings per API call + + +def extract_objects(engine) -> tuple[list[str], dict[str, int]]: + """Extract unique normalized objects and their frequencies.""" + from services.control_dedup import normalize_object + + logger.info("Extracting objects from canonical_controls...") + with engine.connect() as c: + rows = c.execute(text(""" + SELECT split_part(generation_metadata->>'merge_group_hint', ':', 2) AS obj, + count(*) AS freq + FROM canonical_controls + WHERE generation_metadata->>'merge_group_hint' IS NOT NULL + AND generation_metadata->>'merge_group_hint' != '' + GROUP BY 1 + """)).fetchall() + + # Normalize and aggregate + norm_freq: Counter = Counter() + norm_to_raw: dict[str, list[str]] = {} + for raw_obj, freq in rows: + if not raw_obj or not raw_obj.strip(): + continue + normed = normalize_object(raw_obj) + norm_freq[normed] += freq + norm_to_raw.setdefault(normed, []).append(raw_obj) + + objects = list(norm_freq.keys()) + freqs = {obj: norm_freq[obj] for obj in objects} + logger.info("Extracted %d unique normalized objects (from %d raw)", len(objects), len(rows)) + return objects, freqs + + +def generate_embeddings(objects: list[str]) -> np.ndarray: + """Generate embeddings via embedding-service in batches. + + Uses pre-allocated numpy array to avoid Python list memory overhead + (Python float = 28 bytes vs numpy float32 = 4 bytes). + """ + total = len(objects) + # Pre-allocate: 144k × 1024 × 4 bytes = ~590 MB (vs ~4 GB with Python lists) + result = np.zeros((total, 1024), dtype=np.float32) + logger.info("Generating embeddings for %d objects (pre-allocated %.0f MB)...", + total, result.nbytes / 1024 / 1024) + + failed_batches = [] + for i in range(0, total, BATCH_SIZE): + batch = objects[i:i + BATCH_SIZE] + success = False + for attempt in range(3): # Max 3 retries per batch + try: + with httpx.Client(timeout=httpx.Timeout(60.0, connect=10.0)) as client: + resp = client.post( + f"{EMBEDDING_URL}/embed", + json={"texts": batch}, + ) + resp.raise_for_status() + embeddings = resp.json().get("embeddings", []) + end = min(i + len(embeddings), total) + result[i:end] = np.array(embeddings, dtype=np.float32) + success = True + break + except Exception as e: + if attempt < 2: + logger.warning("Batch %d attempt %d failed: %s — retrying", i, attempt + 1, e) + import time + time.sleep(2) + else: + logger.error("Batch %d failed after 3 attempts: %s", i, e) + failed_batches.append(i) + + if (i + BATCH_SIZE) % 5000 == 0 or i + BATCH_SIZE >= total: + logger.info(" Embedded %d/%d (%.1f%%) [%d failed]", + min(i + BATCH_SIZE, total), total, + min(i + BATCH_SIZE, total) / total * 100, + len(failed_batches)) + + return result + + +def cluster_objects(embeddings: np.ndarray, k: int) -> np.ndarray: + """Run Mini-Batch K-Means clustering.""" + logger.info("Clustering %d objects into %d groups (Mini-Batch K-Means)...", len(embeddings), k) + + # Normalize embeddings for cosine-like clustering + norms = np.linalg.norm(embeddings, axis=1, keepdims=True) + norms[norms == 0] = 1 + normalized = embeddings / norms + + kmeans = MiniBatchKMeans( + n_clusters=k, + batch_size=1000, + max_iter=100, + random_state=42, + verbose=0, + ) + labels = kmeans.fit_predict(normalized) + logger.info("Clustering done. Inertia: %.2f", kmeans.inertia_) + return labels + + +def store_results(engine, objects: list[str], freqs: dict[str, int], + labels: np.ndarray, dry_run: bool): + """Store clustering results in object_groups table.""" + # Build groups + groups: dict[int, list[tuple[str, int]]] = {} + for i, obj in enumerate(objects): + gid = int(labels[i]) + groups.setdefault(gid, []).append((obj, freqs.get(obj, 0))) + + # Pick canonical name (highest frequency in group) + results = [] + for gid, members in groups.items(): + members_sorted = sorted(members, key=lambda x: -x[1]) + canonical = members_sorted[0][0] + results.append({ + "group_id": gid, + "canonical_name": canonical, + "member_count": len(members), + "members": json.dumps([m[0] for m in members_sorted]), + "top_controls_count": members_sorted[0][1], + }) + + # Stats + sizes = [r["member_count"] for r in results] + logger.info("Groups: %d total", len(results)) + logger.info(" Singletons: %d", sum(1 for s in sizes if s == 1)) + logger.info(" Groups 2-5: %d", sum(1 for s in sizes if 2 <= s <= 5)) + logger.info(" Groups 6-20: %d", sum(1 for s in sizes if 6 <= s <= 20)) + logger.info(" Groups 21-100: %d", sum(1 for s in sizes if 21 <= s <= 100)) + logger.info(" Groups >100: %d", sum(1 for s in sizes if s > 100)) + logger.info(" Max group size: %d", max(sizes)) + logger.info(" Avg group size: %.1f", sum(sizes) / len(sizes)) + + # Top 10 largest groups + top10 = sorted(results, key=lambda x: -x["member_count"])[:10] + logger.info("\nTop 10 largest groups:") + for g in top10: + members_list = json.loads(g["members"]) + logger.info(" [%d] %s (%d members): %s", + g["group_id"], g["canonical_name"], g["member_count"], + ", ".join(members_list[:5])) + + if dry_run: + logger.info("DRY RUN — not writing to DB") + return + + # Write to DB + with engine.begin() as conn: + conn.execute(text("SET search_path TO compliance, public")) + conn.execute(text("DELETE FROM object_groups")) # Clear old results + for r in results: + conn.execute(text(""" + INSERT INTO object_groups (group_id, canonical_name, member_count, members, top_controls_count) + VALUES (:group_id, :canonical_name, :member_count, CAST(:members AS jsonb), :top_controls_count) + """), r) + logger.info("Wrote %d groups to object_groups table", len(results)) + + +def main(): + parser = argparse.ArgumentParser() + parser.add_argument("--k", type=int, default=20000, help="Number of clusters") + parser.add_argument("--dry-run", action="store_true") + args = parser.parse_args() + + engine = create_engine(DB_URL, connect_args={"options": "-c search_path=compliance,public"}) + + # Step 1: Extract + objects, freqs = extract_objects(engine) + + # Step 2: Embed + embeddings = generate_embeddings(objects) + logger.info("Embedding matrix: %s (%.1f MB)", embeddings.shape, + embeddings.nbytes / 1024 / 1024) + + # Adjust k if we have fewer objects + k = min(args.k, len(objects) // 2) + logger.info("Using k=%d (requested %d, objects=%d)", k, args.k, len(objects)) + + # Step 3: Cluster + labels = cluster_objects(embeddings, k) + + # Step 4: Store + store_results(engine, objects, freqs, labels, args.dry_run) + + +if __name__ == "__main__": + main() diff --git a/control-pipeline/scripts/gpre1_subcluster.py b/control-pipeline/scripts/gpre1_subcluster.py new file mode 100644 index 0000000..31ab266 --- /dev/null +++ b/control-pipeline/scripts/gpre1_subcluster.py @@ -0,0 +1,164 @@ +#!/usr/bin/env python3 +""" +G-pre1 Step 2: Sub-cluster large object groups (>50 members) into k=4 sub-groups. + +Reads existing object_groups, re-embeds members of large groups, +applies K-Means with k=4 per group, and writes sub-groups back. + +Usage (inside container or with PYTHONPATH): + python3 /app/scripts/gpre1_subcluster.py + python3 /app/scripts/gpre1_subcluster.py --min-size 100 # only groups >100 + python3 /app/scripts/gpre1_subcluster.py --sub-k 6 # 6 sub-clusters +""" + +import argparse +import json +import logging +import os + +import httpx +import numpy as np +from sklearn.cluster import MiniBatchKMeans +from sqlalchemy import create_engine, text + +logging.basicConfig(level=logging.INFO, format="%(asctime)s [%(levelname)s] %(message)s") +logger = logging.getLogger("gpre1-sub") + +DB_URL = os.getenv("DATABASE_URL", "postgresql://breakpilot:breakpilot123@postgres:5432/breakpilot_db") +EMBEDDING_URL = "http://embedding-service:8087" + + +def main(): + parser = argparse.ArgumentParser() + parser.add_argument("--min-size", type=int, default=50, help="Min group size to sub-cluster") + parser.add_argument("--sub-k", type=int, default=4, help="Sub-clusters per group") + parser.add_argument("--dry-run", action="store_true") + args = parser.parse_args() + + engine = create_engine(DB_URL, connect_args={"options": "-c search_path=compliance,public"}) + + # Load large groups + with engine.connect() as c: + groups = c.execute(text( + "SELECT group_id, canonical_name, member_count, members " + "FROM object_groups WHERE member_count > :min ORDER BY member_count DESC" + ), {"min": args.min_size}).fetchall() + + logger.info("Found %d groups with >%d members to sub-cluster", len(groups), args.min_size) + + # Find next available group_id + with engine.connect() as c: + max_gid = c.execute(text("SELECT COALESCE(MAX(group_id), 0) FROM object_groups")).scalar() + next_gid = max_gid + 1 + + total_sub_groups = 0 + all_new_rows = [] + groups_to_delete = [] + + for group_id, canonical_name, member_count, members_json in groups: + members = json.loads(members_json) if isinstance(members_json, str) else members_json + + if len(members) < args.sub_k * 2: + logger.info(" Skip group %d (%s, %d members) — too small for k=%d", + group_id, canonical_name, len(members), args.sub_k) + continue + + # Embed members + embeddings = _embed_batch(members) + if embeddings is None: + logger.error(" Failed to embed group %d (%s)", group_id, canonical_name) + continue + + # Normalize for cosine + norms = np.linalg.norm(embeddings, axis=1, keepdims=True) + norms[norms == 0] = 1 + normalized = embeddings / norms + + # Sub-cluster + k = min(args.sub_k, len(members) // 2) + kmeans = MiniBatchKMeans(n_clusters=k, batch_size=min(100, len(members)), + max_iter=50, random_state=42) + labels = kmeans.fit_predict(normalized) + + # Build sub-groups + sub_groups: dict[int, list[str]] = {} + for i, member in enumerate(members): + sub_groups.setdefault(int(labels[i]), []).append(member) + + # Create new rows + for sub_id, sub_members in sub_groups.items(): + sub_canonical = sub_members[0] # Most frequent would be better but we don't have freq here + all_new_rows.append({ + "group_id": next_gid, + "canonical_name": sub_canonical, + "member_count": len(sub_members), + "members": json.dumps(sub_members), + "top_controls_count": 0, + "parent_group_id": group_id, + }) + next_gid += 1 + + groups_to_delete.append(group_id) + total_sub_groups += len(sub_groups) + + if len(groups_to_delete) % 50 == 0: + logger.info(" Processed %d/%d groups, %d sub-groups created", + len(groups_to_delete), len(groups), total_sub_groups) + + logger.info("Sub-clustering complete: %d groups → %d sub-groups", + len(groups_to_delete), total_sub_groups) + + # Stats + sub_sizes = [r["member_count"] for r in all_new_rows] + if sub_sizes: + logger.info(" Sub-group sizes: avg=%.1f, max=%d, min=%d", + sum(sub_sizes) / len(sub_sizes), max(sub_sizes), min(sub_sizes)) + + if args.dry_run: + logger.info("DRY RUN — not writing to DB") + for r in all_new_rows[:10]: + logger.info(" [%d] %s (%d members)", r["group_id"], r["canonical_name"], r["member_count"]) + return + + # Write to DB: delete old large groups, insert sub-groups + with engine.begin() as c: + c.execute(text("SET search_path TO compliance, public")) + # Delete old large groups + for gid in groups_to_delete: + c.execute(text("DELETE FROM object_groups WHERE group_id = :gid"), {"gid": gid}) + # Insert sub-groups + for r in all_new_rows: + c.execute(text(""" + INSERT INTO object_groups (group_id, canonical_name, member_count, members, top_controls_count) + VALUES (:group_id, :canonical_name, :member_count, CAST(:members AS jsonb), :top_controls_count) + """), r) + + logger.info("Wrote %d sub-groups to DB (replaced %d large groups)", len(all_new_rows), len(groups_to_delete)) + + # Final stats + with engine.connect() as c: + total = c.execute(text("SELECT count(*) FROM object_groups")).scalar() + logger.info("Total groups in DB: %d", total) + + +def _embed_batch(texts: list[str]) -> np.ndarray | None: + """Embed a list of texts, return numpy array.""" + try: + all_emb = np.zeros((len(texts), 1024), dtype=np.float32) + batch_size = 64 + for i in range(0, len(texts), batch_size): + batch = texts[i:i + batch_size] + with httpx.Client(timeout=httpx.Timeout(60.0, connect=10.0)) as client: + resp = client.post(f"{EMBEDDING_URL}/embed", json={"texts": batch}) + resp.raise_for_status() + embs = resp.json().get("embeddings", []) + end = min(i + len(embs), len(texts)) + all_emb[i:end] = np.array(embs, dtype=np.float32) + return all_emb + except Exception as e: + logger.error("Embedding failed: %s", e) + return None + + +if __name__ == "__main__": + main() diff --git a/control-pipeline/scripts/gpre2_master_controls.py b/control-pipeline/scripts/gpre2_master_controls.py new file mode 100644 index 0000000..43fcfb7 --- /dev/null +++ b/control-pipeline/scripts/gpre2_master_controls.py @@ -0,0 +1,213 @@ +#!/usr/bin/env python3 +""" +G-pre2: Build Master Controls from Object Groups + Lifecycle Phases. + +Groups atomic controls by (object_group_id, phase) and creates +Master Controls for groups with >=2 distinct phases. + +Usage: + python3 /app/scripts/gpre2_master_controls.py + python3 /app/scripts/gpre2_master_controls.py --min-phases 3 + python3 /app/scripts/gpre2_master_controls.py --dry-run +""" + +import argparse +import json +import logging +import os +from collections import defaultdict + +from sqlalchemy import create_engine, text + +logging.basicConfig(level=logging.INFO, format="%(asctime)s [%(levelname)s] %(message)s") +logger = logging.getLogger("gpre2") + +DB_URL = os.getenv("DATABASE_URL", "postgresql://breakpilot:breakpilot123@postgres:5432/breakpilot_db") + +# Canonical phase ordering for lifecycle chains +PHASE_ORDER = { + "scope": 0, + "definition": 1, "governance": 1, + "design": 2, + "implementation": 3, "configuration": 3, + "operation": 4, "training": 4, + "monitoring": 5, + "testing": 6, + "review": 7, + "assessment": 8, "remediation": 8, + "validation": 9, + "reporting": 10, + "evidence": 11, +} + + +def main(): + parser = argparse.ArgumentParser() + parser.add_argument("--min-phases", type=int, default=2, help="Min distinct phases for Master Control") + parser.add_argument("--dry-run", action="store_true") + args = parser.parse_args() + + engine = create_engine(DB_URL, connect_args={"options": "-c search_path=compliance,public"}) + + # Step 1: Build reverse index (object_token → group_id) + logger.info("Building object → group_id reverse index...") + object_to_group = {} + with engine.connect() as c: + groups = c.execute(text("SELECT group_id, canonical_name, members FROM object_groups")).fetchall() + + for gid, canonical, members_json in groups: + members = json.loads(members_json) if isinstance(members_json, str) else members_json + for member in members: + object_to_group[member] = (gid, canonical) + + logger.info("Reverse index: %d objects → %d groups", len(object_to_group), len(groups)) + + # Step 2: Load all controls with merge_group_hint + logger.info("Loading controls with merge_group_hint...") + with engine.connect() as c: + rows = c.execute(text(""" + SELECT id, control_id, + generation_metadata->>'merge_group_hint' AS hint, + title + FROM canonical_controls + WHERE generation_metadata->>'merge_group_hint' IS NOT NULL + AND generation_metadata->>'merge_group_hint' != '' + AND release_state NOT IN ('deprecated', 'rejected') + """)).fetchall() + + logger.info("Loaded %d controls with merge_group_hint", len(rows)) + + # Step 3: Parse and group by (group_id, phase) + # Structure: group_id → {phase → [(control_uuid, control_id, action, title)]} + group_phases: dict[int, dict[str, list]] = defaultdict(lambda: defaultdict(list)) + group_names: dict[int, str] = {} + unmatched = 0 + + for uuid, control_id, hint, title in rows: + parts = hint.split(":", 2) + if len(parts) < 2: + continue + action = parts[0] + obj = parts[1] + phase = parts[2] if len(parts) > 2 else "implementation" + + # Normalize object and find group + from services.control_dedup import normalize_object + normed = normalize_object(obj) + + if normed in object_to_group: + gid, canonical = object_to_group[normed] + elif obj in object_to_group: + gid, canonical = object_to_group[obj] + else: + unmatched += 1 + continue + + group_phases[gid][phase].append((str(uuid), control_id, action, title)) + group_names[gid] = canonical + + logger.info("Grouped into %d object groups (%d controls unmatched to any group)", + len(group_phases), unmatched) + + # Step 4: Create Master Controls (groups with >= min_phases distinct phases) + master_controls = [] + master_members = [] + mc_counter = 0 + + for gid, phases in group_phases.items(): + if len(phases) < args.min_phases: + continue + + mc_counter += 1 + mc_id = "MC-%d" % gid + canonical = group_names.get(gid, "unknown") + + # Sort phases by lifecycle order + sorted_phases = sorted(phases.keys(), key=lambda p: PHASE_ORDER.get(p, 99)) + phase_counts = {p: len(ctrls) for p, ctrls in phases.items()} + total = sum(phase_counts.values()) + + master_controls.append({ + "master_control_id": mc_id, + "object_group_id": gid, + "canonical_name": canonical, + "phases_covered": json.dumps(sorted_phases), + "phase_control_count": json.dumps(phase_counts), + "total_controls": total, + }) + + for phase, controls in phases.items(): + for ctrl_uuid, ctrl_id, action, title in controls: + master_members.append({ + "mc_id": mc_id, + "control_uuid": ctrl_uuid, + "phase": phase, + "action": action, + }) + + logger.info("Created %d Master Controls with %d members (min %d phases)", + len(master_controls), len(master_members), args.min_phases) + + # Stats + if master_controls: + phase_counts = [mc["total_controls"] for mc in master_controls] + phases_per_mc = [len(json.loads(mc["phases_covered"])) for mc in master_controls] + logger.info(" Avg controls per MC: %.1f", sum(phase_counts) / len(phase_counts)) + logger.info(" Avg phases per MC: %.1f", sum(phases_per_mc) / len(phases_per_mc)) + logger.info(" Max controls in MC: %d", max(phase_counts)) + logger.info(" Max phases in MC: %d", max(phases_per_mc)) + + # Top 10 + top10 = sorted(master_controls, key=lambda x: -x["total_controls"])[:10] + logger.info("\nTop 10 Master Controls:") + for mc in top10: + logger.info(" %s: %s (%d controls, phases: %s)", + mc["master_control_id"], mc["canonical_name"], + mc["total_controls"], mc["phases_covered"]) + + if args.dry_run: + logger.info("DRY RUN — not writing to DB") + return + + # Step 5: Write to DB + with engine.begin() as c: + c.execute(text("SET search_path TO compliance, public")) + c.execute(text("DELETE FROM master_control_members")) + c.execute(text("DELETE FROM master_controls")) + + for mc in master_controls: + c.execute(text(""" + INSERT INTO master_controls + (master_control_id, object_group_id, canonical_name, + phases_covered, phase_control_count, total_controls) + VALUES (:master_control_id, :object_group_id, :canonical_name, + CAST(:phases_covered AS jsonb), CAST(:phase_control_count AS jsonb), + :total_controls) + """), mc) + + # Get MC UUIDs for member inserts + mc_uuids = {} + for row in c.execute(text("SELECT id, master_control_id FROM master_controls")).fetchall(): + mc_uuids[row[1]] = str(row[0]) + + for mem in master_members: + mc_uuid = mc_uuids.get(mem["mc_id"]) + if not mc_uuid: + continue + c.execute(text(""" + INSERT INTO master_control_members + (master_control_uuid, control_uuid, phase, action) + VALUES (CAST(:mc_uuid AS uuid), CAST(:control_uuid AS uuid), :phase, :action) + """), { + "mc_uuid": mc_uuid, + "control_uuid": mem["control_uuid"], + "phase": mem["phase"], + "action": mem["action"], + }) + + logger.info("Wrote %d Master Controls + %d members to DB", + len(master_controls), len(master_members)) + + +if __name__ == "__main__": + main()