"""Stage 3 — LLM synthesis: REVIEW UNITS → Obligation Registry.

Output schema: obligation_registry_v1.

Sharpened prompt: produce the smallest set of regulatorily DISTINCT
obligations. The hard tier rule is enforced in code. Provenance is recorded
per obligation.

ANTHROPIC_API_KEY is read from the environment (never hard-coded).
Large calls use STREAMING (the SDK blocks non-streaming calls > 10 min).

  ANTHROPIC_API_KEY=… python3 scripts/obligation_discovery/synthesize_obligations.py \
    --units /tmp/auth_review_units.json --regulation CRA --theme "Authentisierung" --out /tmp/auth_registry.json
"""
from __future__ import annotations

import argparse
import json
import os
import re
from collections import Counter

from _core import SEMANTIC_EDGE_TYPES

# System prompt template. The text is runtime behavior (it is sent to the
# model verbatim), so the German wording is intentional. __REGULATION__ and
# __THEME__ are substituted in synthesize().
SYS = """Du bist Knowledge Engineer und baust eine LEGAL OBLIGATION REGISTRY fuer __REGULATION__ (Thema: __THEME__).
Input: REVIEW UNITS (algorithmisch vor-gebuendelte Control-Gruppen), jede kann MEHRERE unterschiedliche Pflichten enthalten.

AUFGABE: Zerlege die Review Units in die KLEINSTE MENGE regulatorisch UNTERSCHIEDLICHER Legal Obligations.
Regeln:
- Nichts zusammenfuehren nur wegen aehnlicher Woerter.
- Unterschiedliche Rechtsgrundlage => unterschiedliche Obligation.
- Unterschiedliche Applicability => unterschiedliche Obligation.
- Unterschiedliche Evidence-Facette (governance/capability/evidence) => GLEICHE Obligation, andere Facette.
- Unterschiedliche Umsetzung (NIST/OWASP/ISO/BSI) => guidance_basis, KEINE neue Obligation.
- Gleiche Pflicht ueber mehrere Review Units => EINE Obligation (mehrere member_review_units).

Gib AUSSCHLIESSLICH JSON aus:
{"obligations":[{"id":"snake_case","name":"","description":"","tier":"LEGAL_MINIMUM|BEST_PRACTICE|IMPLEMENTATION_GUIDANCE|EVIDENCE","applicability":"universal|conditional:|domain:","evidence_facets":{"governance":true,"capability":true,"evidence":false},"source_role":"LEGAL_BASIS|GUIDANCE|EVIDENCE|IMPLEMENTATION","legal_basis":[{"source":"__REGULATION__","anchor":"","citation":""}],"guidance_basis":[{"source":"NIST|OWASP|ISO|BSI","anchor":"","role":"best_practice"}],"subdomain":"","member_review_units":["M0"],"source_meta_cluster":"M0","discovery_confidence":0.9}],
 "relationships":[{"type":"depends_on|supports|produces_evidence_for|implements|derived_from","from":"id","to":"id","note":""},{"type":"out_of_scope","review_units":["M0"],"note":""}]}

HARTE REGELN:
- tier=LEGAL_MINIMUM NUR mit legal_basis (Primaerrecht). Sonst tier=BEST_PRACTICE, legal_basis=[].
- legal_basis NUR Primaerrecht der Regulierung; NIST/OWASP/ISO/BSI => guidance_basis.
- relationships SPARSAM, gerichtet, nur klar belegbar.
- out_of_scope: Review Units, die NICHT zum Thema gehoeren (andere Regulierung/Domaene)."""


def build_user(units: list[dict]) -> str:
    """Render the review units as the user message sent to the model.

    Each unit becomes one line holding its ``review_unit_id``, its
    ``n_controls`` count and up to six ``sample_titles`` (each cut to 46
    characters) joined by " | ".

    Raises:
        KeyError: if a unit lacks ``review_unit_id`` or ``n_controls``.
    """
    lines = []
    for unit in units:
        # ``or []`` also covers a key that is present but null.
        sample_titles = (unit.get("sample_titles") or [])[:6]
        titles = " | ".join(str(title)[:46] for title in sample_titles)
        lines.append(
            f"{unit['review_unit_id']} (controls={unit['n_controls']}): {titles}"
        )
    return "Review Units:\n" + "\n".join(lines)


def _response_text(msg) -> str:
    """Concatenate the text of all ``text`` content blocks of a model message.

    Raises:
        ValueError: if the message carries no text block at all.
    """
    parts = [
        block.text
        for block in (msg.content or [])
        if getattr(block, "type", None) == "text"
    ]
    if not parts:
        raise ValueError(
            "model response contains no text block "
            f"(stop_reason={getattr(msg, 'stop_reason', None)!r})"
        )
    return "".join(parts)


def _extract_json(txt: str):
    """Parse the JSON object embedded in ``txt``.

    The span from the first "{" to the last "}" is parsed, so text the model
    puts around the object is ignored. If no such span exists, the whole
    string is handed to the JSON parser.

    Raises:
        json.JSONDecodeError: if the selected text is not valid JSON.
    """
    match = re.search(r"\{.*\}", txt, re.DOTALL)
    return json.loads(match.group(0) if match else txt)


def synthesize(units, regulation, theme, model):
    """Ask the model to turn review units into raw obligation-registry JSON.

    Args:
        units: review-unit dicts as consumed by build_user().
        regulation: substituted for ``__REGULATION__`` in the system prompt.
        theme: substituted for ``__THEME__`` in the system prompt.
        model: model identifier passed to the Anthropic API.

    Returns:
        The decoded JSON value of the model's answer.

    Raises:
        KeyError: if ANTHROPIC_API_KEY is not set in the environment.
        ValueError: if the answer has no text block or is not valid JSON
            (json.JSONDecodeError is a ValueError subclass).
    """
    # Imported lazily so the rest of the module works without the SDK.
    import anthropic

    key = os.environ["ANTHROPIC_API_KEY"]
    # Named system_prompt (not ``sys``) to avoid shadowing the stdlib module.
    system_prompt = SYS.replace("__REGULATION__", regulation).replace(
        "__THEME__", theme
    )
    client = anthropic.Anthropic(api_key=key)
    # Streaming is used because the SDK blocks long non-streaming calls.
    with client.messages.stream(
        model=model,
        max_tokens=24000,
        system=system_prompt,
        messages=[{"role": "user", "content": build_user(units)}],
    ) as stream:
        msg = stream.get_final_message()
    return _extract_json(_response_text(msg))


def post_process(data, units, regulation, model):
    """Normalize raw model output into an ``obligation_registry_v1`` document.

    For every obligation this resolves its member review units to control
    ids, enforces the hard tier rule (LEGAL_MINIMUM without a legal basis is
    downgraded to BEST_PRACTICE and flagged ``needs_legal_basis``) and
    attaches provenance. Relationships are kept only if their type is in
    SEMANTIC_EDGE_TYPES or equals ``out_of_scope``.

    Args:
        data: decoded model output with ``obligations`` and ``relationships``.
        units: review-unit dicts carrying ``review_unit_id``, ``control_ids``
            and ``n_controls``.
        regulation: regulation name written into the registry header.
        model: model identifier recorded in the provenance.

    Returns:
        The registry as a JSON-serializable dict.

    Raises:
        KeyError: if an obligation has no ``id`` or a unit lacks a required key.
    """
    controls_by_unit = {u["review_unit_id"]: u["control_ids"] for u in units}
    size_by_unit = {u["review_unit_id"]: u["n_controls"] for u in units}

    obligations = []
    # ``or []`` tolerates an explicit null from the model.
    for raw in data.get("obligations") or []:
        # Drop review-unit ids the model invented.
        review_units = [
            ru for ru in (raw.get("member_review_units") or [])
            if ru in controls_by_unit
        ]
        member_controls = sorted(
            {ctrl for ru in review_units for ctrl in controls_by_unit[ru]}
        )
        legal_basis = raw.get("legal_basis") or []

        # A missing, null or empty tier falls back to BEST_PRACTICE.
        tier = raw.get("tier") or "BEST_PRACTICE"
        review_status = "draft"
        if tier == "LEGAL_MINIMUM" and not legal_basis:
            # Hard tier rule: no legal minimum without a legal basis.
            tier, review_status = "BEST_PRACTICE", "needs_legal_basis"

        source_cluster = raw.get("source_meta_cluster") or (
            review_units[0] if review_units else ""
        )
        obligations.append({
            "id": raw["id"],
            "name": raw.get("name", ""),
            "description": raw.get("description", ""),
            "tier": tier,
            "subdomain": raw.get("subdomain", ""),
            "applicability": raw.get("applicability", "universal"),
            "evidence_facets": raw.get("evidence_facets", {}),
            "source_role": raw.get("source_role", ""),
            "legal_basis": legal_basis,
            "guidance_basis": raw.get("guidance_basis") or [],
            "member_review_units": review_units,
            "member_controls": member_controls,
            "member_count": len(member_controls),
            "relationships": [],
            "citation_anchor_ids": [],
            "citation_status": "pending_span_anchor",
            "review_status": review_status,
            "provenance": {
                "discovery_confidence": raw.get("discovery_confidence"),
                "source_meta_cluster": source_cluster,
                "cluster_size": size_by_unit.get(source_cluster),
                "llm_model": model,
                "synthesis_version": "v1",
            },
        })

    relationships = [
        rel for rel in (data.get("relationships") or [])
        if rel.get("type") in SEMANTIC_EDGE_TYPES
        or rel.get("type") == "out_of_scope"
    ]
    return {
        "schema_version": "obligation_registry_v1",
        "regulation": regulation,
        "generated_by": f"obligation_discovery/{model}",
        "synthesis_version": "v1",
        "citation_status": "pending_span_anchor",
        "obligations": obligations,
        "relationships": relationships,
    }


def main() -> None:
    """CLI entry point: read review units, synthesize, write the registry."""
    ap = argparse.ArgumentParser()
    ap.add_argument("--units", required=True)
    ap.add_argument("--regulation", default="CRA")
    ap.add_argument("--theme", default="")
    ap.add_argument("--model", default="claude-opus-4-8")
    ap.add_argument("--out", required=True)
    a = ap.parse_args()

    # Context managers close the handles deterministically; the output file
    # in particular must be flushed before "written" is reported.
    with open(a.units, encoding="utf-8") as fh:
        units = json.load(fh)

    data = synthesize(units, a.regulation, a.theme, a.model)
    reg = post_process(data, units, a.regulation, a.model)

    with open(a.out, "w", encoding="utf-8") as fh:
        json.dump(reg, fh, ensure_ascii=False, indent=1)

    obligations = reg["obligations"]
    tier_counts = dict(Counter(x["tier"] for x in obligations))
    print(f"obligations: {len(obligations)} | tier: {tier_counts}")
    print(f"written: {a.out}")


if __name__ == "__main__":
    main()