Files
Benjamin Admin ad24835940 feat(pipeline): G-pre1/2/3 — Object Clustering + Master Controls + API
G-pre1: 144k objects clustered into 7,466 groups via Mini-Batch K-Means
  on bge-m3 embeddings. Two-stage: k=5000 base + sub-cluster groups >50.
G-pre2: 5,114 Master Controls from lifecycle phase chains
  (define→implement→test→monitor), linking 172,504 atomic controls.
G-pre3: REST API for Master Controls
  GET /v1/master-controls (list, search, filter)
  GET /v1/master-controls/stats
  GET /v1/master-controls/{mc_id} (detail with phase-controls)

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-05-06 15:11:38 +02:00

165 lines
6.3 KiB
Python

#!/usr/bin/env python3
"""
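G-pre1 Step 2: Sub-cluster large object groups (more than --min-size members,
default 50) into up to --sub-k sub-groups each (default 4).
Reads existing object_groups, re-embeds the members of each large group,
applies Mini-Batch K-Means per group, and replaces each large group with its
sub-groups. Pass --dry-run to preview the result without writing to the DB.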
Usage (inside container or with PYTHONPATH):
python3 /app/scripts/gpre1_subcluster.py
python3 /app/scripts/gpre1_subcluster.py --min-size 100 # only groups >100
python3 /app/scripts/gpre1_subcluster.py --sub-k 6 # 6 sub-clusters
"""
import argparse
import json
import logging
import os
import httpx
import numpy as np
from sklearn.cluster import MiniBatchKMeans
from sqlalchemy import create_engine, text
# Configure root logging once for this script and create its named logger.
logging.basicConfig(level=logging.INFO, format="%(asctime)s [%(levelname)s] %(message)s")
logger = logging.getLogger("gpre1-sub")

# Database connection string; override with the DATABASE_URL environment variable.
# NOTE(review): the fallback DSN embeds a username and password in source.
# Confirm this is a development-only default and not used in production.
DB_URL = os.getenv("DATABASE_URL", "postgresql://breakpilot:breakpilot123@postgres:5432/breakpilot_db")

# Base URL of the embedding service. Overridable with the EMBEDDING_URL
# environment variable (same pattern as DB_URL); the default is unchanged.
EMBEDDING_URL = os.getenv("EMBEDDING_URL", "http://embedding-service:8087")
def _split_members(members: list[str], sub_k: int) -> list[list[str]] | None:
    """Embed *members* and partition them into sub-groups with Mini-Batch K-Means.

    Args:
        members: Member strings of one object group.
        sub_k: Requested number of sub-clusters; capped at ``len(members) // 2``.

    Returns:
        The sub-groups as lists of members, ordered by the first appearance of
        each cluster label, or ``None`` when embedding failed.
    """
    embeddings = _embed_batch(members)
    if embeddings is None:
        return None

    # L2-normalise the rows ("normalize for cosine"); rows with zero norm are
    # divided by 1 instead, which avoids a division by zero.
    norms = np.linalg.norm(embeddings, axis=1, keepdims=True)
    norms[norms == 0] = 1
    normalized = embeddings / norms

    k = min(sub_k, len(members) // 2)
    kmeans = MiniBatchKMeans(
        n_clusters=k,
        batch_size=min(100, len(members)),
        max_iter=50,
        random_state=42,  # fixed seed so repeated runs give the same split
    )
    labels = kmeans.fit_predict(normalized)

    buckets: dict[int, list[str]] = {}
    for member, label in zip(members, labels):
        buckets.setdefault(int(label), []).append(member)
    return list(buckets.values())


def main():
    """Sub-cluster large object groups and replace them with their sub-groups.

    Steps:
      1. Parse ``--min-size``, ``--sub-k`` and ``--dry-run``.
      2. Load every ``object_groups`` row with ``member_count > min-size`` and
         the current maximum ``group_id``.
      3. For each group with at least ``2 * sub-k`` members, embed and cluster
         its members; groups that fail to embed are logged and left untouched.
      4. Unless ``--dry-run`` is set, delete the clustered groups and insert
         their sub-groups in a single transaction.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument("--min-size", type=int, default=50, help="Min group size to sub-cluster")
    parser.add_argument("--sub-k", type=int, default=4, help="Sub-clusters per group")
    parser.add_argument("--dry-run", action="store_true")
    args = parser.parse_args()

    # k <= 0 would crash inside scikit-learn, and k == 1 would delete every
    # large group only to re-insert an identical copy under a new id.
    if args.sub_k < 2:
        parser.error("--sub-k must be at least 2")

    engine = create_engine(DB_URL, connect_args={"options": "-c search_path=compliance,public"})

    # Load the large groups and the next free group_id over one connection.
    with engine.connect() as conn:
        groups = conn.execute(text(
            "SELECT group_id, canonical_name, member_count, members "
            "FROM object_groups WHERE member_count > :min ORDER BY member_count DESC"
        ), {"min": args.min_size}).fetchall()
        max_gid = conn.execute(text("SELECT COALESCE(MAX(group_id), 0) FROM object_groups")).scalar()
    logger.info("Found %d groups with >%d members to sub-cluster", len(groups), args.min_size)

    next_gid = max_gid + 1
    new_rows = []
    replaced_ids = []

    for group_id, canonical_name, _member_count, members_json in groups:
        # The column may arrive as a JSON string or as an already-decoded list.
        members = json.loads(members_json) if isinstance(members_json, str) else members_json
        # A NULL members column is treated as an empty group (skipped below).
        members = members or []

        if len(members) < args.sub_k * 2:
            logger.info("  Skip group %d (%s, %d members) — too small for k=%d",
                        group_id, canonical_name, len(members), args.sub_k)
            continue

        sub_groups = _split_members(members, args.sub_k)
        if sub_groups is None:
            logger.error("  Failed to embed group %d (%s)", group_id, canonical_name)
            continue

        for sub_members in sub_groups:
            new_rows.append({
                "group_id": next_gid,
                # First member stands in as the name; no frequency data is available here.
                "canonical_name": sub_members[0],
                "member_count": len(sub_members),
                "members": json.dumps(sub_members),
                "top_controls_count": 0,
                # NOTE(review): parent_group_id is recorded on the row but is not
                # part of the INSERT below, so it is not persisted — confirm
                # whether object_groups should store it.
                "parent_group_id": group_id,
            })
            next_gid += 1

        replaced_ids.append(group_id)
        if len(replaced_ids) % 50 == 0:
            logger.info("  Processed %d/%d groups, %d sub-groups created",
                        len(replaced_ids), len(groups), len(new_rows))

    logger.info("Sub-clustering complete: %d groups → %d sub-groups",
                len(replaced_ids), len(new_rows))

    # Stats
    sub_sizes = [row["member_count"] for row in new_rows]
    if sub_sizes:
        logger.info("  Sub-group sizes: avg=%.1f, max=%d, min=%d",
                    sum(sub_sizes) / len(sub_sizes), max(sub_sizes), min(sub_sizes))

    if args.dry_run:
        logger.info("DRY RUN — not writing to DB")
        for row in new_rows[:10]:
            logger.info("  [%d] %s (%d members)", row["group_id"], row["canonical_name"], row["member_count"])
        return

    # Replace the old large groups with their sub-groups atomically. Skipped
    # when nothing was clustered, because a multi-parameter execute needs at
    # least one parameter set.
    if replaced_ids:
        with engine.begin() as conn:
            conn.execute(text("SET search_path TO compliance, public"))
            conn.execute(
                text("DELETE FROM object_groups WHERE group_id = :gid"),
                [{"gid": gid} for gid in replaced_ids],
            )
            conn.execute(text("""
                INSERT INTO object_groups (group_id, canonical_name, member_count, members, top_controls_count)
                VALUES (:group_id, :canonical_name, :member_count, CAST(:members AS jsonb), :top_controls_count)
            """), new_rows)
    logger.info("Wrote %d sub-groups to DB (replaced %d large groups)", len(new_rows), len(replaced_ids))

    # Final stats
    with engine.connect() as conn:
        total = conn.execute(text("SELECT count(*) FROM object_groups")).scalar()
    logger.info("Total groups in DB: %d", total)
def _embed_batch(texts: list[str]) -> np.ndarray | None:
    """Embed *texts* through the embedding service and return a float32 matrix.

    The texts are POSTed to ``{EMBEDDING_URL}/embed`` in batches of 64 over a
    single reused HTTP client. Every response must contain exactly one
    embedding per text sent; a short or malformed response is treated as a
    failure rather than leaving silent all-zero rows in the result.

    Args:
        texts: Strings to embed.

    Returns:
        Array with one row per input text, in input order, or ``None`` when any
        request fails or a response is malformed (the error is logged).
    """
    if not texts:
        # Nothing to send; keep the (0, 1024) shape previously returned for empty input.
        return np.zeros((0, 1024), dtype=np.float32)

    batch_size = 64
    chunks = []
    try:
        # One client for all batches so the connection is reused.
        with httpx.Client(timeout=httpx.Timeout(60.0, connect=10.0)) as client:
            for start in range(0, len(texts), batch_size):
                batch = texts[start:start + batch_size]
                resp = client.post(f"{EMBEDDING_URL}/embed", json={"texts": batch})
                resp.raise_for_status()
                embs = np.asarray(resp.json().get("embeddings", []), dtype=np.float32)
                if embs.ndim != 2 or embs.shape[0] != len(batch):
                    raise ValueError(
                        f"expected {len(batch)} embeddings, got array of shape {embs.shape}"
                    )
                chunks.append(embs)
        # Raises (and is reported below) if batches disagree on the embedding width.
        return np.concatenate(chunks, axis=0)
    except Exception as e:
        # Best-effort boundary: the caller skips the group when None is returned.
        logger.error("Embedding failed: %s", e)
        return None
# Script entry point: run the sub-clustering job only when executed directly,
# not when this module is imported.
if __name__ == "__main__":
    main()