breakpilot-compliance/scripts/obligation_discovery/meta_cluster.py
Benjamin Admin e1b270c36e Add obligation discovery pipeline tooling
Moves the validated Obligation Discovery Pipeline out of /tmp into permanent,
committed tooling (scripts/obligation_discovery/), which is the actual asset.

Stages: precluster (embedding cache + micro-clusters) → meta_cluster (review units,
scaling fix) → synthesize_obligations (Opus, key from ENV, streaming, hard tier rule,
provenance) → validate_registry → merge_review_diff. Pure helpers live in _core.py, with 16 unit tests.

Docs in docs-src/development/obligation_discovery_pipeline_v1.md cover the milestones
(SBOM/Vuln reproduced, Auth 4408→170 review units→54→curated 29) and the architecture rule:
runtime is deterministic, discovery is LLM-assisted.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-06-25 07:41:45 +02:00

56 lines
2.2 KiB
Python

"""Stage 2: meta-cluster (the scaling fix for large domains): micro-clusters →
REVIEW UNITS. A review unit is what the LLM synthesis pass sees (decoupled from the
clustering, so it can be merged or split later). Uses the embedding cache from
precluster (no re-embedding).

    python3 scripts/obligation_discovery/meta_cluster.py --scope auth --meta-thr 0.62
"""
from __future__ import annotations

import argparse
import json
import os
import pickle

from _core import centroid, greedy_cluster


def run(scope: str, meta_thr: float, outdir: str) -> None:
    # Load the micro-clusters and the cached embeddings written by precluster.
    with open(os.path.join(outdir, f"{scope}_micro_clusters.json"), encoding="utf-8") as fh:
        micro = json.load(fh)
    # pickle can execute arbitrary code on load: only read caches this pipeline wrote itself.
    with open(os.path.join(outdir, f"{scope}_vecs.pkl"), "rb") as fh:
        vecs = pickle.load(fh)

    # One centroid per micro-cluster, then a second clustering pass over the centroids.
    centroids = [centroid(m["member_indices"], vecs) for m in micro]
    meta = greedy_cluster(centroids, meta_thr)
    print(f"scope={scope} pass-2 (meta-thr={meta_thr}): {len(micro)} micro → {len(meta)} review-units")

    out = []
    for mi, m in enumerate(meta):
        ctrl_ids, titles = [], []
        for micro_idx in m["members"]:
            mc = micro[micro_idx]
            ctrl_ids += mc["control_ids"]
            # Keep the first title of each micro-cluster as a sample label.
            titles.append(mc["titles"][0] if mc["titles"] else "")
        out.append({"review_unit_id": f"M{mi}", "n_micro": len(m["members"]),
                    "n_controls": len(ctrl_ids), "control_ids": ctrl_ids,
                    "sample_titles": titles[:8]})
    # Largest review units first.
    out.sort(key=lambda x: -x["n_controls"])

    path = os.path.join(outdir, f"{scope}_review_units.json")
    with open(path, "w", encoding="utf-8") as fh:
        json.dump(out, fh, ensure_ascii=False, indent=1)

    print("=== top review units (inspect for cross-domain mixing BEFORE synthesis) ===")
    for m in out[:12]:
        print(f" {m['review_unit_id']:5} ctrl={m['n_controls']:4} micro={m['n_micro']:3} "
              f"| {' || '.join(t[:30] for t in m['sample_titles'][:3])}")
    print(f"written: {path} ({len(out)} review units)")


def main() -> None:
    ap = argparse.ArgumentParser()
    ap.add_argument("--scope", default="auth")
    ap.add_argument("--meta-thr", type=float, default=0.62)
    ap.add_argument("--outdir", default="/tmp")
    a = ap.parse_args()
    run(a.scope, a.meta_thr, a.outdir)


if __name__ == "__main__":
    main()
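
The script depends on two helpers from `_core.py` that this page does not show. The sketch below is one plausible reading of them, inferred only from how they are called above: `centroid(member_indices, vecs)` returns a single vector, and `greedy_cluster(vectors, thr)` returns a list of dicts with a `members` key. Everything else is an assumption and may differ from the real `_core.py`: cosine similarity as the metric, L2-normalised centroids, comparison against the first member of each cluster, and `vecs` being indexable by integer.

```python
from __future__ import annotations

import math


def cosine(a: list[float], b: list[float]) -> float:
    """Cosine similarity; returns 0.0 if either vector has zero length."""
    dot = sum(x * y for x, y in zip(a, b))
    na = math.sqrt(sum(x * x for x in a))
    nb = math.sqrt(sum(y * y for y in b))
    return dot / (na * nb) if na and nb else 0.0


def centroid(member_indices: list[int], vecs: list[list[float]]) -> list[float]:
    """Mean of the selected vectors, L2-normalised (normalisation is an assumption)."""
    dim = len(vecs[member_indices[0]])
    n = len(member_indices)
    mean = [sum(vecs[i][d] for i in member_indices) / n for d in range(dim)]
    norm = math.sqrt(sum(x * x for x in mean)) or 1.0
    return [x / norm for x in mean]


def greedy_cluster(vectors: list[list[float]], thr: float) -> list[dict]:
    """Single pass: attach each vector to the first cluster whose seed vector
    is at least `thr` similar, otherwise open a new cluster with it as seed."""
    clusters: list[dict] = []
    for idx, v in enumerate(vectors):
        for c in clusters:
            if cosine(v, c["seed"]) >= thr:
                c["members"].append(idx)
                break
        else:
            # No existing cluster was close enough.
            clusters.append({"seed": v, "members": [idx]})
    # Expose only the key the script above reads.
    return [{"members": c["members"]} for c in clusters]
```

With helpers like these, `--meta-thr` controls how many review units come out: a higher threshold yields more, smaller units, and a lower one merges micro-clusters more aggressively. That trade-off is why the script prints the largest units for manual inspection before synthesis.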