From e1b270c36ef2a2cba8cc1eb42541624b9ed91b00 Mon Sep 17 00:00:00 2001 From: Benjamin Admin Date: Thu, 25 Jun 2026 07:41:45 +0200 Subject: [PATCH] Add obligation discovery pipeline tooling MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Sichert die validierte Obligation Discovery Pipeline aus /tmp als dauerhaftes, committetes Tooling (scripts/obligation_discovery/) — der eigentliche Vermögenswert. Stufen: precluster (Embedding-Cache + Mikro-Cluster) → meta_cluster (Review Units, Skalierungs-Fix) → synthesize_obligations (Opus, Key aus ENV, Streaming, harte Tier-Regel, Provenance) → validate_registry → merge_review_diff. Reine Helfer in _core.py, 16 Unit-Tests. Doku docs-src/development/obligation_discovery_pipeline_v1.md mit Meilensteinen (SBOM/Vuln reproduziert, Auth 4408→170 Review Units→54→kuriert 29) und der Architekturregel: Runtime deterministisch, Discovery LLM-gestützt. Co-Authored-By: Claude Opus 4.7 --- .../tests/test_obligation_discovery.py | 92 ++++++++++++++ .../obligation_discovery_pipeline_v1.md | 77 ++++++++++++ scripts/obligation_discovery/_core.py | 114 ++++++++++++++++++ .../obligation_discovery/merge_review_diff.py | 36 ++++++ scripts/obligation_discovery/meta_cluster.py | 55 +++++++++ scripts/obligation_discovery/precluster.py | 73 +++++++++++ .../synthesize_obligations.py | 113 +++++++++++++++++ .../obligation_discovery/validate_registry.py | 35 ++++++ 8 files changed, 595 insertions(+) create mode 100644 backend-compliance/tests/test_obligation_discovery.py create mode 100644 docs-src/development/obligation_discovery_pipeline_v1.md create mode 100644 scripts/obligation_discovery/_core.py create mode 100644 scripts/obligation_discovery/merge_review_diff.py create mode 100644 scripts/obligation_discovery/meta_cluster.py create mode 100644 scripts/obligation_discovery/precluster.py create mode 100644 scripts/obligation_discovery/synthesize_obligations.py create mode 100644 scripts/obligation_discovery/validate_registry.py diff --git a/backend-compliance/tests/test_obligation_discovery.py b/backend-compliance/tests/test_obligation_discovery.py new file mode 100644 index 00000000..fc713f13 --- /dev/null +++ b/backend-compliance/tests/test_obligation_discovery.py @@ -0,0 +1,92 @@ +"""Unit-Tests für die reinen Helfer der Obligation Discovery Pipeline (scripts/obligation_discovery/_core.py).""" +import pathlib +import sys + +sys.path.insert(0, str(pathlib.Path(__file__).resolve().parents[2] / "scripts" / "obligation_discovery")) + +from _core import ( # noqa: E402 + centroid, cosine, greedy_cluster, merge_edges, parse_req, validate_registry, +) + + +class TestParseReq: + def test_list_passthrough(self): + assert parse_req(["a", "b"]) == ["a", "b"] + + def test_python_repr_string(self): + assert parse_req("['x', 'y']") == ["x", "y"] + + def test_json_string(self): + assert parse_req('["x", "y"]') == ["x", "y"] + + def test_plain_string(self): + assert parse_req("just text") == ["just text"] + + +class TestCosine: + def test_identical(self): + assert cosine([1.0, 2.0, 3.0], [1.0, 2.0, 3.0]) > 0.999 + + def test_orthogonal(self): + assert abs(cosine([1.0, 0.0], [0.0, 1.0])) < 1e-9 + + def test_empty(self): + assert cosine([], [1.0]) == 0.0 + + +class TestGreedyCluster: + def test_near_vectors_cluster_far_separate(self): + vecs = [[1.0, 0.0], [0.99, 0.01], [0.0, 1.0]] + clusters = greedy_cluster(vecs, 0.9) + assert len(clusters) == 2 + assert clusters[0]["members"] == [0, 1] + assert clusters[1]["members"] == [2] + + def test_deterministic(self): + vecs = [[1.0, 0.0], [0.5, 0.5], [0.99, 0.0]] + assert greedy_cluster(vecs, 0.8) == greedy_cluster(vecs, 0.8) + + def test_none_vector_isolated(self): + clusters = greedy_cluster([[1.0, 0.0], None], 0.5) + assert clusters[1]["members"] == [1] and clusters[1]["seed"] is None + + +class TestCentroid: + def test_mean(self): + assert centroid([0, 1], [[0.0, 2.0], [2.0, 4.0]]) == [1.0, 3.0] + + +class TestValidateRegistry: + def _reg(self, obls, rels=None): + return {"obligations": obls, "relationships": rels or []} + + def test_lm_without_legal_basis_fails(self): + r = self._reg([{"id": "x", "tier": "LEGAL_MINIMUM", "legal_basis": [], "member_controls": ["C1"]}]) + v = validate_registry(r) + assert v["lm_without_legal_basis"] == ["x"] and v["passed"] is False + + def test_clean_passes(self): + r = self._reg([{"id": "x", "tier": "LEGAL_MINIMUM", "legal_basis": [{"source": "CRA"}], + "member_controls": ["C1"], "provenance": {"source_meta_cluster": "M0"}}]) + assert validate_registry(r)["passed"] is True + + def test_over8_per_review_unit_flagged(self): + obls = [{"id": f"o{i}", "tier": "BEST_PRACTICE", "member_controls": ["C"], + "provenance": {"source_meta_cluster": "M0"}} for i in range(9)] + v = validate_registry(self._reg(obls)) + assert v["over8_per_review_unit"] == {"M0": 9} and v["passed"] is False + + def test_empty_member_controls_flagged(self): + v = validate_registry(self._reg([{"id": "x", "tier": "BEST_PRACTICE", "member_controls": []}])) + assert v["empty_member_controls"] == ["x"] and v["passed"] is False + + +class TestMergeEdges: + def test_dedup_and_semantic_only(self): + existing = [{"type": "supports", "from": "a", "to": "b"}] + proposed = [{"type": "supports", "from": "a", "to": "b"}, # dup + {"type": "depends_on", "from": "c", "to": "d"}, # new + {"type": "out_of_scope", "clusters": [1]}] # not semantic + merged, added = merge_edges(existing, proposed) + assert added == 1 + assert {"type": "depends_on", "from": "c", "to": "d"} in merged diff --git a/docs-src/development/obligation_discovery_pipeline_v1.md b/docs-src/development/obligation_discovery_pipeline_v1.md new file mode 100644 index 00000000..0961ef26 --- /dev/null +++ b/docs-src/development/obligation_discovery_pipeline_v1.md @@ -0,0 +1,77 @@ +# Obligation Discovery Pipeline v1 + +Ein **generisches Verfahren zur Ableitung einer regulatorischen Ontologie** (Legal Obligation +Registry) aus großen Compliance-Korpora. Validiert über drei Domänen (SBOM, Vulnerability +Handling, Authentication). Erzeugt die zitierfähige Mitte aus +[obligation_registry_v1.md](obligation_registry_v1.md). + +## Architekturregel (nicht verhandelbar) + +``` +RUNTIME bleibt deterministisch (Document → Embedding → LLM-Judge → Finding) +DISCOVERY darf LLM-gestützt sein (Controls → … → LLM-Synthese → Obligation Registry) +``` +Discovery läuft **einmalig/offline** mit dem stärksten Modell; die Runtime-Prüf-Engine wird +davon nicht berührt. Zwei getrennte Probleme, eine gemeinsame Sprache (die Obligation). + +## Stufen (`scripts/obligation_discovery/`) + +| Stufe | Skript | Aufgabe | Key | +|---|---|---|---| +| 1 | `precluster.py` | Controls (scope) → Embedding (gecacht) → **Mikro-Cluster** | – | +| 2 | `meta_cluster.py` | Mikro → **Review Units** (Skalierungs-Fix für große Domänen) | – | +| 3 | `synthesize_obligations.py` | Review Units → Opus → **Obligation Candidates** | ENV | +| 4 | `validate_registry.py` | Belastbarkeits-Checks | – | +| 5 | `merge_review_diff.py` | vorgeschlagene Beziehungskanten dedupliziert mergen | – | + +Reine, unit-getestete Helfer in `_core.py`. Ausführung im `bp-compliance-backend`-Container +(`PYTHONPATH=/app`); der Key kommt aus `ANTHROPIC_API_KEY` (nie hartcodiert). + +## Zwei-Stufen-Clustering = der Skalierungs-Fix + +Ein flacher Single-Threshold-Pre-Cluster + EIN LLM-Synthese-Call skaliert NICHT auf große +Domänen. Lösung: eine Hierarchiestufe. **Review Unit ≠ Meta-Cluster** — die Review Unit ist +das, was der LLM sieht (entkoppelt vom Clustering, später merge/split-bar). + +## Belegte Meilensteine + +| Domäne | Controls | → Cluster/Review Units | → Obligations | vs Ground Truth | +|---|---|---|---|---| +| **SBOM** | 258 | 86 Mikro | 12 (→ 11 final) | manuell ~10 — **reproduziert + verfeinert** | +| **Vulnerability** | 531 | 200 Mikro | 8 | manuell ~7 — **reproduziert** | +| **Authentication** | 4408 | 2134 Mikro → **170 Review Units** | 54 → Kuration **29** | Skalierung — **generalisiert** | + +## Harte Tier-Regel generalisiert + +`LEGAL_MINIMUM` nur mit Primärrechts-Anker (`legal_basis`), sonst `BEST_PRACTICE` / +`IMPLEMENTATION_GUIDANCE` / `EVIDENCE`. Authentication zeigt den Wert: nur **6** harte +Pflichten (CRA fordert „angemessene Authentisierung"), MFA/Passwort/Session/Krypto sind +`guidance_basis`. So kann der Advisor sagen: *„Gesetzlich gefordert ist Schutz vor unbefugtem +Zugriff; MFA ist anerkannte Umsetzung, aber keine CRA-Wortlautpflicht."* + +## Kuration (große Domänen) + +Die Synthese darf über-splitten; ein **key-freier, regelbasierter Kurations-Pass** verdichtet: +Krypto-Mikro-Mechanismen → `guidance_basis`; Prüf-/Nachweis-Themen → `evidence`-Facette; +Mechanismus-Familien bleiben; domänenfremdes (eID/PSD2) → `out_of_scope`; LEGAL_MINIMUM +unangetastet. + +## Lessons + +- Große Opus-Calls brauchen **Streaming** (`messages.stream`); der SDK blockt non-streaming + bei `max_tokens` > ~8k mit „Streaming is required for operations that may take longer than 10 minutes". +- Provenance pro Obligation (`source_meta_cluster`, `discovery_confidence`, `llm_model`, + `synthesis_version`) — für spätere Evolution (CRA-Update, Modellwechsel). +- `>8 Obligations / Review Unit` → automatische Review-Warnung (Over-Split-Indikator). +- Embedding-Cache (pickle) → THR2-Sweeps ohne Re-Embed. + +## End-to-End-Beispiel + +```bash +# im bp-compliance-backend-Container, PYTHONPATH=/app, cwd = scripts/obligation_discovery +python3 precluster.py --scope auth +python3 meta_cluster.py --scope auth --meta-thr 0.62 # → /tmp/auth_review_units.json (inspizieren!) +ANTHROPIC_API_KEY=… python3 synthesize_obligations.py \ + --units /tmp/auth_review_units.json --regulation CRA --theme "Authentisierung" --out /tmp/auth_registry.json +python3 validate_registry.py /tmp/auth_registry.json +``` diff --git a/scripts/obligation_discovery/_core.py b/scripts/obligation_discovery/_core.py new file mode 100644 index 00000000..1d694246 --- /dev/null +++ b/scripts/obligation_discovery/_core.py @@ -0,0 +1,114 @@ +"""Reine Helfer der Obligation Discovery Pipeline (keine schweren Imports → unit-testbar). + +Die Pipeline leitet aus großen Compliance-Korpora eine regulatorische Ontologie ab: + Controls → Mikro-Cluster → Meta-Cluster/Review-Units → LLM-Synthese → Obligation Registry. +Architekturregel: RUNTIME bleibt deterministisch; DISCOVERY (dieses Tooling) darf LLM-gestützt +sein und läuft EINMALIG/offline. Siehe docs-src/development/obligation_discovery_pipeline_v1.md. +""" +from __future__ import annotations + +import ast +import json +import math +from typing import Optional + +SEMANTIC_EDGE_TYPES = ("depends_on", "supports", "produces_evidence_for", + "implements", "derived_from") + + +def parse_req(req) -> list: + """requirements-Spalte (JSON ODER Python-Repr ODER String) robust zu Liste.""" + if isinstance(req, list): + return req + if isinstance(req, str): + for fn in (json.loads, ast.literal_eval): + try: + v = fn(req) + return v if isinstance(v, list) else [str(v)] + except Exception: + pass + return [req] + return [] + + +def cosine(a, b) -> float: + if not a or not b: + return 0.0 + dot = sum(x * y for x, y in zip(a, b)) + na = math.sqrt(sum(x * x for x in a)) + nb = math.sqrt(sum(y * y for y in b)) + return dot / (na * nb) if na and nb else 0.0 + + +def greedy_cluster(vecs: list, thr: float) -> list[dict]: + """Single-Pass-Greedy-Clustering: jeder Vektor joint den ersten Cluster, dessen Seed + cosine ≥ thr ist, sonst neuer Cluster. Deterministisch (stabile Reihenfolge).""" + clusters: list[dict] = [] + for i, v in enumerate(vecs): + if not v: + clusters.append({"seed": None, "members": [i]}) + continue + best, best_sim = None, thr + for c in clusters: + if c["seed"] is None: + continue + s = cosine(v, c["seed"]) + if s >= best_sim: + best_sim, best = s, c + if best: + best["members"].append(i) + else: + clusters.append({"seed": v, "members": [i]}) + return clusters + + +def centroid(idxs: list[int], vecs: list) -> Optional[list]: + vs = [vecs[i] for i in idxs if vecs[i]] + if not vs: + return None + n = len(vs) + return [sum(col) / n for col in zip(*vs)] + + +def validate_registry(reg: dict) -> dict: + """Belastbarkeits-Checks (User-Regeln): LEGAL_MINIMUM braucht legal_basis, + member_controls vollständig, out_of_scope separat, >8-Obligations/Review-Unit-Warnung.""" + obls = reg.get("obligations", []) + lm = [o for o in obls if o.get("tier") == "LEGAL_MINIMUM"] + lm_without_basis = [o["id"] for o in lm if not o.get("legal_basis")] + empty_members = [o["id"] for o in obls if not o.get("member_controls")] + per_unit: dict[str, int] = {} + for o in obls: + ru = (o.get("provenance") or {}).get("source_meta_cluster") + if ru: + per_unit[ru] = per_unit.get(ru, 0) + 1 + over8 = {ru: n for ru, n in per_unit.items() if n > 8} + rels = reg.get("relationships", []) + return { + "obligations": len(obls), + "legal_minimum": len(lm), + "lm_without_legal_basis": lm_without_basis, + "empty_member_controls": empty_members, + "over8_per_review_unit": over8, + "out_of_scope": sum(1 for r in rels if r.get("type") == "out_of_scope"), + "semantic_edges": sum(1 for r in rels if r.get("type") in SEMANTIC_EDGE_TYPES), + "passed": not lm_without_basis and not empty_members and not over8, + } + + +def merge_edges(relationships: list[dict], proposed: list[dict]) -> tuple[list[dict], int]: + """Proposed semantische Kanten dedupliziert in relationships mergen. Gibt (merged, added).""" + existing = {(r.get("type"), r.get("from"), r.get("to")) + for r in relationships if r.get("from")} + added = 0 + out = list(relationships) + for e in proposed: + if e.get("type") not in SEMANTIC_EDGE_TYPES: + continue + key = (e["type"], e.get("from"), e.get("to")) + if key in existing or not e.get("from") or not e.get("to"): + continue + out.append(e) + existing.add(key) + added += 1 + return out, added diff --git a/scripts/obligation_discovery/merge_review_diff.py b/scripts/obligation_discovery/merge_review_diff.py new file mode 100644 index 00000000..bc770a5e --- /dev/null +++ b/scripts/obligation_discovery/merge_review_diff.py @@ -0,0 +1,36 @@ +"""Stufe 5 — Review-Diff mergen: vorgeschlagene Beziehungskanten (review_status=proposed) +dedupliziert in die Registry mergen (kein LLM/Key). Kleine Beziehungs-Sprache: +depends_on/supports/produces_evidence_for/implements/derived_from. + + python3 scripts/obligation_discovery/merge_review_diff.py obligations/cra.json /tmp/cra_edges_review.json +""" +from __future__ import annotations + +import argparse +import json + +from _core import SEMANTIC_EDGE_TYPES, merge_edges + + +def main() -> None: + ap = argparse.ArgumentParser() + ap.add_argument("registry") + ap.add_argument("review_diff") + ap.add_argument("--write", action="store_true", help="in die Registry schreiben (sonst dry-run)") + a = ap.parse_args() + reg = json.load(open(a.registry, encoding="utf-8")) + diff = json.load(open(a.review_diff, encoding="utf-8")) + proposed = diff.get("proposed_edges", diff if isinstance(diff, list) else []) + merged, added = merge_edges(reg.get("relationships", []), proposed) + print(f"proposed: {len(proposed)} | added (dedupliziert): {added}") + if a.write: + reg["relationships"] = merged + reg["relationship_types"] = list(SEMANTIC_EDGE_TYPES) + json.dump(reg, open(a.registry, "w", encoding="utf-8"), ensure_ascii=False, indent=1) + print(f"written: {a.registry}") + else: + print("dry-run (use --write to apply)") + + +if __name__ == "__main__": + main() diff --git a/scripts/obligation_discovery/meta_cluster.py b/scripts/obligation_discovery/meta_cluster.py new file mode 100644 index 00000000..92900e58 --- /dev/null +++ b/scripts/obligation_discovery/meta_cluster.py @@ -0,0 +1,55 @@ +"""Stufe 2 — Meta-Cluster (der Skalierungs-Fix für große Domänen): Mikro-Cluster → +REVIEW UNITS. Review Unit = das, was der LLM-Synthese-Pass sieht (entkoppelt vom Clustering, +später merge/split-bar). Nutzt den Embedding-Cache aus precluster (kein Re-Embed). + + python3 scripts/obligation_discovery/meta_cluster.py --scope auth --meta-thr 0.62 +""" +from __future__ import annotations + +import argparse +import json +import os +import pickle + +from _core import centroid, greedy_cluster + + +def run(scope: str, meta_thr: float, outdir: str) -> None: + micro = json.load(open(os.path.join(outdir, f"{scope}_micro_clusters.json"), encoding="utf-8")) + vecs = pickle.load(open(os.path.join(outdir, f"{scope}_vecs.pkl"), "rb")) + centroids = [centroid(m["member_indices"], vecs) for m in micro] + meta = greedy_cluster(centroids, meta_thr) + print(f"scope={scope} pass-2 (meta-thr={meta_thr}): {len(micro)} micro → {len(meta)} review-units") + + out = [] + for mi, m in enumerate(meta): + ctrl_ids, titles = [], [] + for micro_idx in m["members"]: + mc = micro[micro_idx] + ctrl_ids += mc["control_ids"] + titles.append(mc["titles"][0] if mc["titles"] else "") + out.append({"review_unit_id": f"M{mi}", "n_micro": len(m["members"]), + "n_controls": len(ctrl_ids), "control_ids": ctrl_ids, + "sample_titles": titles[:8]}) + out.sort(key=lambda x: -x["n_controls"]) + path = os.path.join(outdir, f"{scope}_review_units.json") + json.dump(out, open(path, "w", encoding="utf-8"), ensure_ascii=False, indent=1) + + print("=== top review units (inspect for cross-domain mixing BEFORE synthesis) ===") + for m in out[:12]: + print(f" {m['review_unit_id']:5} ctrl={m['n_controls']:4} micro={m['n_micro']:3} " + f"| {' || '.join(t[:30] for t in m['sample_titles'][:3])}") + print(f"written: {path} ({len(out)} review units)") + + +def main() -> None: + ap = argparse.ArgumentParser() + ap.add_argument("--scope", default="auth") + ap.add_argument("--meta-thr", type=float, default=0.62) + ap.add_argument("--outdir", default="/tmp") + a = ap.parse_args() + run(a.scope, a.meta_thr, a.outdir) + + +if __name__ == "__main__": + main() diff --git a/scripts/obligation_discovery/precluster.py b/scripts/obligation_discovery/precluster.py new file mode 100644 index 00000000..254ea5cb --- /dev/null +++ b/scripts/obligation_discovery/precluster.py @@ -0,0 +1,73 @@ +"""Stufe 1 — Pre-Cluster: Controls (scope) → BGE-M3-Embedding (gecacht) → Mikro-Cluster. +Deterministisch. Im bp-compliance-backend-Container ausführen (PYTHONPATH=/app). + + python3 scripts/obligation_discovery/precluster.py --scope sbom + python3 scripts/obligation_discovery/precluster.py --patterns '%sbom%,%software bill%' --micro-thr 0.78 +""" +from __future__ import annotations + +import argparse +import asyncio +import json +import os +import pickle + +from _core import greedy_cluster, parse_req + +SCOPES = { + "sbom": ["%SBOM%", "%software bill%", "%stückliste%", "%komponentenliste%"], + "vuln": ["%schwachstellenbehandl%", "%schwachstellenmanagement%", "%vulnerability handling%", + "%coordinated vulnerab%", "%vulnerability disclosure%", "%cvd-konzept%"], + "auth": ["%authentisierung%", "%authentifizierung%", "%authentication%"], +} + + +async def run(scope: str, patterns: list[str], micro_thr: float, outdir: str) -> None: + import asyncpg + from compliance.services.mc_embedding_matcher import _embed_texts + + dsn = os.getenv("DATABASE_URL") or os.getenv("COMPLIANCE_DATABASE_URL") + conn = await asyncpg.connect(dsn) + where = " or ".join(f"title ilike ${i+1}" for i in range(len(patterns))) + rows = await conn.fetch( + f"select control_id, title, requirements from compliance.canonical_controls " + f"where {where} order by control_id", *patterns) + await conn.close() + items = [{"control_id": r["control_id"], "title": r["title"] or "", + "embed_text": (r["title"] or "") + ". " + " ".join(parse_req(r["requirements"])[:2])} + for r in rows] + print(f"scope={scope}: {len(items)} controls") + + cache = os.path.join(outdir, f"{scope}_vecs.pkl") + if os.path.exists(cache): + vecs = pickle.load(open(cache, "rb")) + print(f"embeddings from cache ({len(vecs)})") + else: + vecs = await _embed_texts([it["embed_text"] for it in items]) + pickle.dump(vecs, open(cache, "wb")) + print(f"embeddings fresh+cached ({len(vecs)})") + + micro = greedy_cluster(vecs, micro_thr) + print(f"pass-1 (micro-thr={micro_thr}): {len(items)} → {len(micro)} micro-clusters") + out = [{"micro_id": i, "size": len(c["members"]), "member_indices": c["members"], + "control_ids": [items[j]["control_id"] for j in c["members"]], + "titles": [items[j]["title"] for j in c["members"][:6]]} + for i, c in enumerate(micro)] + path = os.path.join(outdir, f"{scope}_micro_clusters.json") + json.dump(out, open(path, "w", encoding="utf-8"), ensure_ascii=False, indent=1) + print(f"written: {path}") + + +def main() -> None: + ap = argparse.ArgumentParser() + ap.add_argument("--scope", default="sbom") + ap.add_argument("--patterns", default="", help="comma-separated SQL ILIKE patterns (overrides --scope)") + ap.add_argument("--micro-thr", type=float, default=0.78) + ap.add_argument("--outdir", default="/tmp") + a = ap.parse_args() + patterns = [p for p in a.patterns.split(",") if p] or SCOPES[a.scope] + asyncio.run(run(a.scope, patterns, a.micro_thr, a.outdir)) + + +if __name__ == "__main__": + main() diff --git a/scripts/obligation_discovery/synthesize_obligations.py b/scripts/obligation_discovery/synthesize_obligations.py new file mode 100644 index 00000000..027b63d8 --- /dev/null +++ b/scripts/obligation_discovery/synthesize_obligations.py @@ -0,0 +1,113 @@ +"""Stufe 3 — LLM-Synthese: REVIEW UNITS → Obligation Registry (Schema obligation_registry_v1). +Geschärfter Prompt: kleinste Menge regulatorisch UNTERSCHIEDLICHER Obligations. Harte Tier- +Regel in Code erzwungen. Provenance pro Obligation. ANTHROPIC_API_KEY aus ENV (nie hartcodiert). +Große Calls → STREAMING (SDK blockt non-streaming >10min). + + ANTHROPIC_API_KEY=… python3 scripts/obligation_discovery/synthesize_obligations.py \ + --units /tmp/auth_review_units.json --regulation CRA --theme "Authentisierung" --out /tmp/auth_registry.json +""" +from __future__ import annotations + +import argparse +import json +import os +import re +from collections import Counter + +from _core import SEMANTIC_EDGE_TYPES + +SYS = """Du bist Knowledge Engineer und baust eine LEGAL OBLIGATION REGISTRY fuer __REGULATION__ +(Thema: __THEME__). Input: REVIEW UNITS (algorithmisch vor-gebuendelte Control-Gruppen), jede +kann MEHRERE unterschiedliche Pflichten enthalten. + +AUFGABE: Zerlege die Review Units in die KLEINSTE MENGE regulatorisch UNTERSCHIEDLICHER Legal +Obligations. Regeln: +- Nichts zusammenfuehren nur wegen aehnlicher Woerter. +- Unterschiedliche Rechtsgrundlage => unterschiedliche Obligation. +- Unterschiedliche Applicability => unterschiedliche Obligation. +- Unterschiedliche Evidence-Facette (governance/capability/evidence) => GLEICHE Obligation, andere Facette. +- Unterschiedliche Umsetzung (NIST/OWASP/ISO/BSI) => guidance_basis, KEINE neue Obligation. +- Gleiche Pflicht ueber mehrere Review Units => EINE Obligation (mehrere member_review_units). + +Gib AUSSCHLIESSLICH JSON aus: +{"obligations":[{"id":"snake_case","name":"","description":"","tier":"LEGAL_MINIMUM|BEST_PRACTICE|IMPLEMENTATION_GUIDANCE|EVIDENCE","applicability":"universal|conditional:|domain:","evidence_facets":{"governance":true,"capability":true,"evidence":false},"source_role":"LEGAL_BASIS|GUIDANCE|EVIDENCE|IMPLEMENTATION","legal_basis":[{"source":"__REGULATION__","anchor":"","citation":""}],"guidance_basis":[{"source":"NIST|OWASP|ISO|BSI","anchor":"","role":"best_practice"}],"subdomain":"","member_review_units":["M0"],"source_meta_cluster":"M0","discovery_confidence":0.9}], + "relationships":[{"type":"depends_on|supports|produces_evidence_for|implements|derived_from","from":"id","to":"id","note":""},{"type":"out_of_scope","review_units":["M0"],"note":""}]} + +HARTE REGELN: +- tier=LEGAL_MINIMUM NUR mit legal_basis (Primaerrecht). Sonst tier=BEST_PRACTICE, legal_basis=[]. +- legal_basis NUR Primaerrecht der Regulierung; NIST/OWASP/ISO/BSI => guidance_basis. +- relationships SPARSAM, gerichtet, nur klar belegbar. +- out_of_scope: Review Units, die NICHT zum Thema gehoeren (andere Regulierung/Domaene).""" + + +def build_user(units: list[dict]) -> str: + lines = [] + for u in units: + t = " | ".join(str(x)[:46] for x in u.get("sample_titles", [])[:6]) + lines.append(f"{u['review_unit_id']} (controls={u['n_controls']}): {t}") + return "Review Units:\n" + "\n".join(lines) + + +def synthesize(units, regulation, theme, model): + import anthropic + key = os.environ["ANTHROPIC_API_KEY"] + sys = SYS.replace("__REGULATION__", regulation).replace("__THEME__", theme) + client = anthropic.Anthropic(api_key=key) + with client.messages.stream(model=model, max_tokens=24000, system=sys, + messages=[{"role": "user", "content": build_user(units)}]) as st: + msg = st.get_final_message() + txt = msg.content[0].text + m = re.search(r"\{.*\}", txt, re.DOTALL) + return json.loads(m.group(0) if m else txt) + + +def post_process(data, units, regulation, model): + cmap = {u["review_unit_id"]: u["control_ids"] for u in units} + size = {u["review_unit_id"]: u["n_controls"] for u in units} + obls = [] + for o in data.get("obligations", []): + rus = [r for r in (o.get("member_review_units") or []) if r in cmap] + members = sorted({c for ru in rus for c in cmap[ru]}) + lb = o.get("legal_basis") or [] + tier, review = o.get("tier", "BEST_PRACTICE"), "draft" + if tier == "LEGAL_MINIMUM" and not lb: + tier, review = "BEST_PRACTICE", "needs_legal_basis" + smc = o.get("source_meta_cluster") or (rus[0] if rus else "") + obls.append({ + "id": o["id"], "name": o.get("name", ""), "description": o.get("description", ""), + "tier": tier, "subdomain": o.get("subdomain", ""), + "applicability": o.get("applicability", "universal"), + "evidence_facets": o.get("evidence_facets", {}), "source_role": o.get("source_role", ""), + "legal_basis": lb, "guidance_basis": o.get("guidance_basis") or [], + "member_review_units": rus, "member_controls": members, "member_count": len(members), + "relationships": [], "citation_anchor_ids": [], "citation_status": "pending_span_anchor", + "review_status": review, + "provenance": {"discovery_confidence": o.get("discovery_confidence"), + "source_meta_cluster": smc, "cluster_size": size.get(smc), + "llm_model": model, "synthesis_version": "v1"}}) + rels = [r for r in data.get("relationships", []) + if r.get("type") in SEMANTIC_EDGE_TYPES or r.get("type") == "out_of_scope"] + return {"schema_version": "obligation_registry_v1", "regulation": regulation, + "generated_by": f"obligation_discovery/{model}", "synthesis_version": "v1", + "citation_status": "pending_span_anchor", "obligations": obls, "relationships": rels} + + +def main() -> None: + ap = argparse.ArgumentParser() + ap.add_argument("--units", required=True) + ap.add_argument("--regulation", default="CRA") + ap.add_argument("--theme", default="") + ap.add_argument("--model", default="claude-opus-4-8") + ap.add_argument("--out", required=True) + a = ap.parse_args() + units = json.load(open(a.units, encoding="utf-8")) + data = synthesize(units, a.regulation, a.theme, a.model) + reg = post_process(data, units, a.regulation, a.model) + json.dump(reg, open(a.out, "w", encoding="utf-8"), ensure_ascii=False, indent=1) + o = reg["obligations"] + print(f"obligations: {len(o)} | tier: {dict(Counter(x['tier'] for x in o))}") + print(f"written: {a.out}") + + +if __name__ == "__main__": + main() diff --git a/scripts/obligation_discovery/validate_registry.py b/scripts/obligation_discovery/validate_registry.py new file mode 100644 index 00000000..bd0707a4 --- /dev/null +++ b/scripts/obligation_discovery/validate_registry.py @@ -0,0 +1,35 @@ +"""Stufe 4 — Validierung: belastbare Registry-Checks (kein LLM/Key). +Prüft die User-Regeln: LEGAL_MINIMUM braucht legal_basis · member_controls vollständig · +out_of_scope separat · >8-Obligations/Review-Unit-Warnung. Exit-Code 1 bei hartem Fehler. + + python3 scripts/obligation_discovery/validate_registry.py obligations/cra_authentication.json +""" +from __future__ import annotations + +import argparse +import json +import sys + +from _core import validate_registry + + +def main() -> None: + ap = argparse.ArgumentParser() + ap.add_argument("registry") + a = ap.parse_args() + reg = json.load(open(a.registry, encoding="utf-8")) + v = validate_registry(reg) + print(f"=== validate {a.registry} ===") + print(f" obligations: {v['obligations']}") + print(f" LEGAL_MINIMUM: {v['legal_minimum']}") + print(f" LM ohne legal_basis: {v['lm_without_legal_basis'] or 'keine'}") + print(f" member_controls leer: {v['empty_member_controls'] or 'keine'}") + print(f" >8 Obligations/Review-Unit: {v['over8_per_review_unit'] or 'keine'}") + print(f" out_of_scope: {v['out_of_scope']}") + print(f" semantische Kanten: {v['semantic_edges']}") + print(f" PASSED: {v['passed']}") + sys.exit(0 if v["passed"] else 1) + + +if __name__ == "__main__": + main()