Files
breakpilot-compliance/scripts/obligation_discovery/synthesize_obligations.py
T
Benjamin Admin e1b270c36e Add obligation discovery pipeline tooling
Sichert die validierte Obligation Discovery Pipeline aus /tmp als dauerhaftes,
committetes Tooling (scripts/obligation_discovery/) — der eigentliche Vermögenswert.

Stufen: precluster (Embedding-Cache + Mikro-Cluster) → meta_cluster (Review Units,
Skalierungs-Fix) → synthesize_obligations (Opus, Key aus ENV, Streaming, harte Tier-Regel,
Provenance) → validate_registry → merge_review_diff. Reine Helfer in _core.py, 16 Unit-Tests.

Doku docs-src/development/obligation_discovery_pipeline_v1.md mit Meilensteinen
(SBOM/Vuln reproduziert, Auth 4408→170 Review Units→54→kuriert 29) und der Architekturregel:
Runtime deterministisch, Discovery LLM-gestützt.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-06-25 07:41:45 +02:00

114 lines
6.2 KiB
Python

"""Stufe 3 — LLM-Synthese: REVIEW UNITS → Obligation Registry (Schema obligation_registry_v1).
Geschärfter Prompt: kleinste Menge regulatorisch UNTERSCHIEDLICHER Obligations. Harte Tier-
Regel in Code erzwungen. Provenance pro Obligation. ANTHROPIC_API_KEY aus ENV (nie hartcodiert).
Große Calls → STREAMING (SDK blockt non-streaming >10min).
ANTHROPIC_API_KEY=… python3 scripts/obligation_discovery/synthesize_obligations.py \
--units /tmp/auth_review_units.json --regulation CRA --theme "Authentisierung" --out /tmp/auth_registry.json
"""
from __future__ import annotations
import argparse
import json
import os
import re
from collections import Counter
from _core import SEMANTIC_EDGE_TYPES
SYS = """Du bist Knowledge Engineer und baust eine LEGAL OBLIGATION REGISTRY fuer __REGULATION__
(Thema: __THEME__). Input: REVIEW UNITS (algorithmisch vor-gebuendelte Control-Gruppen), jede
kann MEHRERE unterschiedliche Pflichten enthalten.
AUFGABE: Zerlege die Review Units in die KLEINSTE MENGE regulatorisch UNTERSCHIEDLICHER Legal
Obligations. Regeln:
- Nichts zusammenfuehren nur wegen aehnlicher Woerter.
- Unterschiedliche Rechtsgrundlage => unterschiedliche Obligation.
- Unterschiedliche Applicability => unterschiedliche Obligation.
- Unterschiedliche Evidence-Facette (governance/capability/evidence) => GLEICHE Obligation, andere Facette.
- Unterschiedliche Umsetzung (NIST/OWASP/ISO/BSI) => guidance_basis, KEINE neue Obligation.
- Gleiche Pflicht ueber mehrere Review Units => EINE Obligation (mehrere member_review_units).
Gib AUSSCHLIESSLICH JSON aus:
{"obligations":[{"id":"snake_case","name":"","description":"","tier":"LEGAL_MINIMUM|BEST_PRACTICE|IMPLEMENTATION_GUIDANCE|EVIDENCE","applicability":"universal|conditional:<pred>|domain:<x>","evidence_facets":{"governance":true,"capability":true,"evidence":false},"source_role":"LEGAL_BASIS|GUIDANCE|EVIDENCE|IMPLEMENTATION","legal_basis":[{"source":"__REGULATION__","anchor":"","citation":""}],"guidance_basis":[{"source":"NIST|OWASP|ISO|BSI","anchor":"","role":"best_practice"}],"subdomain":"","member_review_units":["M0"],"source_meta_cluster":"M0","discovery_confidence":0.9}],
"relationships":[{"type":"depends_on|supports|produces_evidence_for|implements|derived_from","from":"id","to":"id","note":""},{"type":"out_of_scope","review_units":["M0"],"note":""}]}
HARTE REGELN:
- tier=LEGAL_MINIMUM NUR mit legal_basis (Primaerrecht). Sonst tier=BEST_PRACTICE, legal_basis=[].
- legal_basis NUR Primaerrecht der Regulierung; NIST/OWASP/ISO/BSI => guidance_basis.
- relationships SPARSAM, gerichtet, nur klar belegbar.
- out_of_scope: Review Units, die NICHT zum Thema gehoeren (andere Regulierung/Domaene)."""
def build_user(units: list[dict]) -> str:
lines = []
for u in units:
t = " | ".join(str(x)[:46] for x in u.get("sample_titles", [])[:6])
lines.append(f"{u['review_unit_id']} (controls={u['n_controls']}): {t}")
return "Review Units:\n" + "\n".join(lines)
def synthesize(units, regulation, theme, model):
import anthropic
key = os.environ["ANTHROPIC_API_KEY"]
sys = SYS.replace("__REGULATION__", regulation).replace("__THEME__", theme)
client = anthropic.Anthropic(api_key=key)
with client.messages.stream(model=model, max_tokens=24000, system=sys,
messages=[{"role": "user", "content": build_user(units)}]) as st:
msg = st.get_final_message()
txt = msg.content[0].text
m = re.search(r"\{.*\}", txt, re.DOTALL)
return json.loads(m.group(0) if m else txt)
def post_process(data, units, regulation, model):
cmap = {u["review_unit_id"]: u["control_ids"] for u in units}
size = {u["review_unit_id"]: u["n_controls"] for u in units}
obls = []
for o in data.get("obligations", []):
rus = [r for r in (o.get("member_review_units") or []) if r in cmap]
members = sorted({c for ru in rus for c in cmap[ru]})
lb = o.get("legal_basis") or []
tier, review = o.get("tier", "BEST_PRACTICE"), "draft"
if tier == "LEGAL_MINIMUM" and not lb:
tier, review = "BEST_PRACTICE", "needs_legal_basis"
smc = o.get("source_meta_cluster") or (rus[0] if rus else "")
obls.append({
"id": o["id"], "name": o.get("name", ""), "description": o.get("description", ""),
"tier": tier, "subdomain": o.get("subdomain", ""),
"applicability": o.get("applicability", "universal"),
"evidence_facets": o.get("evidence_facets", {}), "source_role": o.get("source_role", ""),
"legal_basis": lb, "guidance_basis": o.get("guidance_basis") or [],
"member_review_units": rus, "member_controls": members, "member_count": len(members),
"relationships": [], "citation_anchor_ids": [], "citation_status": "pending_span_anchor",
"review_status": review,
"provenance": {"discovery_confidence": o.get("discovery_confidence"),
"source_meta_cluster": smc, "cluster_size": size.get(smc),
"llm_model": model, "synthesis_version": "v1"}})
rels = [r for r in data.get("relationships", [])
if r.get("type") in SEMANTIC_EDGE_TYPES or r.get("type") == "out_of_scope"]
return {"schema_version": "obligation_registry_v1", "regulation": regulation,
"generated_by": f"obligation_discovery/{model}", "synthesis_version": "v1",
"citation_status": "pending_span_anchor", "obligations": obls, "relationships": rels}
def main() -> None:
ap = argparse.ArgumentParser()
ap.add_argument("--units", required=True)
ap.add_argument("--regulation", default="CRA")
ap.add_argument("--theme", default="")
ap.add_argument("--model", default="claude-opus-4-8")
ap.add_argument("--out", required=True)
a = ap.parse_args()
units = json.load(open(a.units, encoding="utf-8"))
data = synthesize(units, a.regulation, a.theme, a.model)
reg = post_process(data, units, a.regulation, a.model)
json.dump(reg, open(a.out, "w", encoding="utf-8"), ensure_ascii=False, indent=1)
o = reg["obligations"]
print(f"obligations: {len(o)} | tier: {dict(Counter(x['tier'] for x in o))}")
print(f"written: {a.out}")
if __name__ == "__main__":
main()