From 402a42d30d9b929ff1e5a7fb30d1c620e282bf87 Mon Sep 17 00:00:00 2001
From: Benjamin Admin
Date: Wed, 24 Jun 2026 12:28:03 +0200
Subject: [PATCH] feat(obligation): obligation-level aggregation engine
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

Erste Ausführung des Legal Obligation Layer v1: aggregiert Bewertungen auf
Kriterium-/Control-Ebene zu Findings auf Obligation-Ebene
(Regulation → Legal Obligation → Control → Criterion).

- regulierungs-agnostisch (obligation_id/tier/met/legal_basis/conditional)
- fail-safe: LM applicable=false→NA · keine erfüllt→FAILED · alle→MET ·
  Teil→PARTIAL; BP/OPT covered→MET sonst OPEN (nie FAILED);
  LM unbewertbar→UNDETERMINED (Legacy behalten)
- Redundanz-Kollaps per OR pro legal_basis-Anforderung → kein künstliches PARTIAL
- Applicability als Hook (Prädikat-Engine folgt separat)

Shadow-Benchmark (Opus-GT, 3 Firmen): 38 Control-Findings → 13
Obligation-Findings (2,9×); ~23 redundante Falsch-Positive strukturell
korrigiert, echte Lücken erhalten, PARTIAL=0. 16/16 Unit-Tests grün.

Co-Authored-By: Claude Opus 4.7
---
 .../services/obligation_aggregation.py        | 191 ++++++++++++++++++
 .../tests/test_obligation_aggregation.py      | 153 ++++++++++++++
 2 files changed, 344 insertions(+)
 create mode 100644 backend-compliance/compliance/services/obligation_aggregation.py
 create mode 100644 backend-compliance/tests/test_obligation_aggregation.py

diff --git a/backend-compliance/compliance/services/obligation_aggregation.py b/backend-compliance/compliance/services/obligation_aggregation.py
new file mode 100644
index 00000000..ec4d3f3d
--- /dev/null
+++ b/backend-compliance/compliance/services/obligation_aggregation.py
@@ -0,0 +1,191 @@
+"""Obligation Aggregation Engine — execution of the Legal Obligation Layer v1.
+
+Aggregates evaluations at CRITERION level (per control) into results at
+OBLIGATION level. This is the first execution of the model
+
+    Regulation → Legal Obligation → Control → Criterion
+
+— the finding is raised on the OBLIGATION, not per control. This collapses the
+redundancy measured in the catalog (portability 11×, recipients 14×): N controls
+that check the same duty yield ONE obligation finding instead of N control findings.
+
+Regulation-agnostic: only knows obligation_id, tier, met, legal_basis,
+conditional. GDPR/CRA/NIS2/DORA/Machinery Regulation/AI Act feed the same function.
+
+Fail-safe (docs-src/development/legal_obligation_layer_v1.md, §Aggregation):
+  LEGAL_MINIMUM obligation:
+    applicable=false                   → NA (no finding)
+    no LM requirement met              → FAILED (mandatory gap)
+    all LM requirements met            → MET
+    only some met                      → PARTIAL
+    LM not assessable (evaluator down) → UNDETERMINED (caller keeps legacy)
+  BEST_PRACTICE/OPTIONAL obligation (no LM):
+    at least one criterion met         → MET (covered)
+    none                               → OPEN (recommendation only, NEVER FAILED)
+
+Redundancy collapse: the LM criteria of ONE obligation are grouped into
+"requirements" by `legal_basis`; a requirement counts as met as soon as ANY control
+confirms it (OR). 9× recipients_disclosed (all Art 13(1)(e)) = one requirement.
+PARTIAL only arises with several DISTINCT LM requirements (different
+legal_basis) within one obligation.
+"""
+from __future__ import annotations
+
+from collections import Counter, defaultdict
+from dataclasses import dataclass, field
+from typing import Callable, Optional
+
+LM, BP, OPT = "LEGAL_MINIMUM", "BEST_PRACTICE", "OPTIONAL"
+MET, PARTIAL, FAILED = "MET", "PARTIAL", "FAILED"
+NA, UNDETERMINED, OPEN = "NA", "UNDETERMINED", "OPEN"
+PFLICHT, EMPFEHLUNG, NICHT_ANWENDBAR = "PFLICHT", "EMPFEHLUNG", "NICHT_ANWENDBAR"
+
+# Predicate hook: (conditional, doc_text) → True (applicable) / False (→ NA) / None (unknown → applicable)
+ApplicableFn = Callable[[str, str], Optional[bool]]
+
+
+@dataclass(frozen=True)
+class CriterionEval:
+    """One criterion evaluation of a control, assigned to an obligation."""
+    obligation_id: str
+    tier: str  # LEGAL_MINIMUM / BEST_PRACTICE / OPTIONAL
+    met: Optional[bool]  # True met · False missing · None undetermined
+    control_id: str
+    legal_basis: str = ""
+    criterion: str = ""
+    conditional: Optional[str] = None  # applicability predicate of the obligation
+
+
+@dataclass
+class ObligationResult:
+    """Aggregated outcome for one obligation, as built by aggregate_obligation()."""
+    obligation_id: str
+    status: str  # MET / PARTIAL / FAILED / NA / UNDETERMINED / OPEN
+    bucket: str  # PFLICHT / EMPFEHLUNG / NICHT_ANWENDBAR
+    tier: str  # governing tier of the obligation
+    applicable: bool
+    evidence: list[str]  # contributing control_ids
+    lm_met: int  # LM requirements met
+    lm_total: int  # distinct LM requirements (assessable ones; all of them when UNDETERMINED)
+    recommendations: list[dict] = field(default_factory=list)
+
+
+def _governing_tier(evals: list[CriterionEval]) -> str:
+    """Return LM if any eval is LM, else BP if any is BP, else OPT (also for empty/unknown tiers)."""
+    tiers = {e.tier for e in evals}
+    if LM in tiers:
+        return LM
+    return BP if BP in tiers else OPT
+
+
+def _requirement_state(evals: list[CriterionEval]) -> Optional[bool]:
+    """State of ONE LM requirement across all checking controls (OR/redundancy):
+    True (any control confirms) · None (all undetermined) · False (assessed, missing)."""
+    if any(e.met is True for e in evals):
+        return True
+    if all(e.met is None for e in evals):
+        return None
+    return False
+
+
+def _recommendations(evals: list[CriterionEval]) -> list[dict]:
+    """Unmet BEST_PRACTICE/OPTIONAL criteria → recommendations."""
+    return [{"criterion": e.criterion, "tier": e.tier, "legal_basis": e.legal_basis,
+             "control_id": e.control_id}
+            for e in evals if e.tier in (BP, OPT) and e.met is False]
+
+
+def aggregate_obligation(obligation_id: str, evals: list[CriterionEval], *,
+                         applicable_fn: Optional[ApplicableFn] = None,
+                         doc_text: str = "") -> ObligationResult:
+    """Aggregate all criterion evaluations of ONE obligation into one result.
+
+    `applicable_fn` is only consulted when some eval carries a `conditional`; the
+    first non-empty `conditional` is used. A verdict of None counts as applicable.
+    LM requirements whose state is None are left out of lm_met/lm_total, so the
+    status is decided by the assessable requirements alone.
+    """
+    evidence = sorted({e.control_id for e in evals if e.control_id})
+    conditional = next((e.conditional for e in evals if e.conditional), None)
+    tier = _governing_tier(evals)
+    recs = _recommendations(evals)
+
+    applicable = True
+    if applicable_fn is not None and conditional:
+        verdict = applicable_fn(conditional, doc_text)
+        applicable = True if verdict is None else bool(verdict)
+    if not applicable:
+        return ObligationResult(obligation_id, NA, NICHT_ANWENDBAR, tier, False,
+                                evidence, 0, 0, recs)
+
+    lm_evals = [e for e in evals if e.tier == LM]
+    if lm_evals:
+        # An empty legal_basis falls back to the obligation id → one shared requirement.
+        reqs: dict[str, list[CriterionEval]] = defaultdict(list)
+        for e in lm_evals:
+            reqs[e.legal_basis or obligation_id].append(e)
+        states = [_requirement_state(v) for v in reqs.values()]
+        determinable = [s for s in states if s is not None]
+        if not determinable:
+            return ObligationResult(obligation_id, UNDETERMINED, PFLICHT, LM, True,
+                                    evidence, 0, len(states), recs)
+        met = sum(1 for s in determinable if s)
+        total = len(determinable)
+        status = MET if met == total else (FAILED if met == 0 else PARTIAL)
+        return ObligationResult(obligation_id, status, PFLICHT, LM, True,
+                                evidence, met, total, recs)
+
+    # Pure BEST_PRACTICE/OPTIONAL obligation: never mandatory, never FAILED.
+    covered = any(e.met is True for e in evals)
+    return ObligationResult(obligation_id, MET if covered else OPEN, EMPFEHLUNG,
+                            tier, True, evidence, 0, 0, recs)
+
+
+def aggregate_obligations(evals: list[CriterionEval], *,
+                          applicable_fn: Optional[ApplicableFn] = None,
+                          doc_text: str = "") -> list[ObligationResult]:
+    """Flat criterion list → one ObligationResult per obligation_id."""
+    groups: dict[str, list[CriterionEval]] = defaultdict(list)
+    for e in evals:
+        if e.obligation_id:
+            groups[e.obligation_id].append(e)
+    return [aggregate_obligation(oid, g, applicable_fn=applicable_fn, doc_text=doc_text)
+            for oid, g in groups.items()]
+
+
+def evals_from_tiered(control_id: str, tiered_criteria: list[dict],
+                      detail: list[dict], conditional: Optional[str] = None
+                      ) -> list[CriterionEval]:
+    """Adapter: tiered_criteria (obligation_id/tier/legal_basis) + the
+    evaluate_tiered `detail` (met per index, same order) → CriterionEvals.
+    `conditional` comes from the control `applicability` (applies to the obligation).
+    A missing, too short or None-valued `detail` yields met=None (undetermined)."""
+    out: list[CriterionEval] = []
+    for i, c in enumerate(tiered_criteria or []):
+        oid = c.get("obligation_id")
+        if not oid:
+            continue
+        # Tolerate detail=None and None entries, just like tiered_criteria=None above.
+        d = (detail[i] if detail and i < len(detail) else None) or {}
+        out.append(CriterionEval(
+            obligation_id=oid,
+            tier=(c.get("compliance_tier") or "").upper(),
+            met=d.get("met"),
+            control_id=control_id,
+            legal_basis=c.get("legal_basis") or "",
+            criterion=c.get("criterion") or "",
+            conditional=conditional,
+        ))
+    return out
+
+
+def summarize(results: list[ObligationResult]) -> dict:
+    """Phase C metrics: obligation count + distribution by bucket/status."""
+    return {
+        "obligations": len(results),
+        "buckets": dict(Counter(r.bucket for r in results)),
+        "statuses": dict(Counter(r.status for r in results)),
+        "pflicht_failed": sum(1 for r in results if r.bucket == PFLICHT and r.status == FAILED),
+        "pflicht_partial": sum(1 for r in results if r.bucket == PFLICHT and r.status == PARTIAL),
+        "recommendations": sum(len(r.recommendations) for r in results),
+    }
diff --git a/backend-compliance/tests/test_obligation_aggregation.py b/backend-compliance/tests/test_obligation_aggregation.py
new file mode 100644
index 00000000..b99dda4d
--- /dev/null
+++ b/backend-compliance/tests/test_obligation_aggregation.py
@@ -0,0 +1,153 @@
+"""Unit tests for the Obligation Aggregation Engine (Legal Obligation Layer v1).
+
+Covers the fail-safe rules + the redundancy collapse (real DSE scenarios:
+recipients 9×, objection LM+BP, portability OPTIONAL format)."""
+from compliance.services.obligation_aggregation import (
+    BP, LM, OPT, CriterionEval, aggregate_obligation, aggregate_obligations,
+    evals_from_tiered, summarize,
+)
+
+
+def _ce(oid, tier, met, cid, basis="", crit="", cond=None):
+    return CriterionEval(oid, tier, met, cid, basis, crit, cond)
+
+
+class TestRedundancyCollapse:
+    def test_nine_controls_one_confirms_collapses_to_one_met(self):
+        # recipients_disclosed: 9 controls, same requirement (Art 13(1)(e))
+        evals = [_ce("recipients_disclosed", LM, i == 4, f"DATA-{i}", "Art. 13(1)(e)")
+                 for i in range(9)]
+        res = aggregate_obligation("recipients_disclosed", evals)
+        assert res.status == "MET"
+        assert res.lm_met == 1 and res.lm_total == 1  # 9 → 1 requirement
+        assert len(res.evidence) == 9
+
+    def test_all_nine_absent_fails_once(self):
+        evals = [_ce("recipients_disclosed", LM, False, f"DATA-{i}", "Art. 13(1)(e)")
+                 for i in range(9)]
+        res = aggregate_obligation("recipients_disclosed", evals)
+        assert res.status == "FAILED"
+        assert res.bucket == "PFLICHT"
+
+
+class TestPartialMultiFacet:
+    def test_two_distinct_lm_requirements_one_met_is_partial(self):
+        evals = [
+            _ce("transfer", LM, True, "C1", "Art. 13(1)(f)"),  # met
+            _ce("transfer", LM, False, "C2", "Art. 46"),  # missing → distinct
+        ]
+        res = aggregate_obligation("transfer", evals)
+        assert res.status == "PARTIAL"
+        assert res.lm_met == 1 and res.lm_total == 2
+
+    def test_both_distinct_requirements_met(self):
+        evals = [
+            _ce("transfer", LM, True, "C1", "Art. 13(1)(f)"),
+            _ce("transfer", LM, True, "C2", "Art. 46"),
+        ]
+        assert aggregate_obligation("transfer", evals).status == "MET"
+
+
+class TestApplicability:
+    def test_conditional_false_is_na(self):
+        evals = [_ce("transfer", LM, False, "C1", "Art. 44", cond="has_third_country_transfer")]
+        res = aggregate_obligation("transfer", evals, applicable_fn=lambda c, t: False)
+        assert res.status == "NA"
+        assert res.bucket == "NICHT_ANWENDBAR"
+        assert res.applicable is False
+
+    def test_conditional_true_evaluates_normally(self):
+        evals = [_ce("transfer", LM, False, "C1", "Art. 44", cond="has_third_country_transfer")]
+        res = aggregate_obligation("transfer", evals, applicable_fn=lambda c, t: True)
+        assert res.status == "FAILED"
+
+    def test_conditional_unknown_defaults_applicable(self):
+        evals = [_ce("transfer", LM, True, "C1", "Art. 44", cond="x")]
+        res = aggregate_obligation("transfer", evals, applicable_fn=lambda c, t: None)
+        assert res.applicable is True and res.status == "MET"
+
+    def test_no_predicate_means_applicable(self):
+        evals = [_ce("transfer", LM, True, "C1", cond="x")]
+        assert aggregate_obligation("transfer", evals).applicable is True
+
+
+class TestUndetermined:
+    def test_all_lm_none_is_undetermined(self):
+        evals = [_ce("ob", LM, None, "C1", "b"), _ce("ob", LM, None, "C2", "b")]
+        res = aggregate_obligation("ob", evals)
+        assert res.status == "UNDETERMINED"
+        assert res.bucket == "PFLICHT"
+
+    def test_one_determinable_requirement_decides(self):
+        # one requirement undetermined, the other clearly met → MET via the assessable one
+        evals = [_ce("ob", LM, None, "C1", "b1"), _ce("ob", LM, True, "C2", "b2")]
+        res = aggregate_obligation("ob", evals)
+        assert res.status == "MET"
+        assert res.lm_total == 1  # only the assessable requirement counts
+
+
+class TestBestPracticeOnly:
+    def test_pure_bp_covered_is_met_recommendation_bucket(self):
+        evals = [_ce("art20_format", OPT, True, "C1")]
+        res = aggregate_obligation("art20_format", evals)
+        assert res.status == "MET"
+        assert res.bucket == "EMPFEHLUNG"
+
+    def test_pure_bp_not_covered_is_open_never_failed(self):
+        evals = [_ce("art20_format", OPT, False, "C1", crit="JSON/CSV")]
+        res = aggregate_obligation("art20_format", evals)
+        assert res.status == "OPEN"
+        assert res.bucket == "EMPFEHLUNG"
+        assert len(res.recommendations) == 1
+
+
+class TestRecommendationsWithinLm:
+    def test_unmet_bp_in_lm_obligation_becomes_recommendation(self):
+        # objection_direct_marketing: LM met + BP criteria partly open
+        evals = [
+            _ce("obj_dm", LM, True, "SEC-8410", "Art. 21(2)", "Recht"),
+            _ce("obj_dm", BP, False, "SEC-8410", "", "Kontaktweg"),
+            _ce("obj_dm", BP, True, "SEC-8410", "", "kostenlos"),
+        ]
+        res = aggregate_obligation("obj_dm", evals)
+        assert res.status == "MET" and res.bucket == "PFLICHT"
+        assert len(res.recommendations) == 1
+        assert res.recommendations[0]["criterion"] == "Kontaktweg"
+
+
+class TestAdapterAndSummary:
+    def test_evals_from_tiered_zips_and_skips_no_obligation(self):
+        tc = [
+            {"criterion": "Recht", "compliance_tier": "LEGAL_MINIMUM",
+             "legal_basis": "Art. 21(1)", "obligation_id": "obj_gen"},
+            {"criterion": "Weg", "compliance_tier": "BEST_PRACTICE",
+             "legal_basis": "", "obligation_id": "obj_gen"},
+            {"criterion": "ohne", "compliance_tier": "OPTIONAL"},  # no obligation_id → skip
+        ]
+        detail = [{"met": True}, {"met": False}, {"met": True}]
+        evals = evals_from_tiered("AUTH-2051", tc, detail, conditional="x")
+        assert len(evals) == 2
+        assert evals[0].met is True and evals[0].conditional == "x"
+        assert evals[1].tier == BP and evals[1].met is False
+
+    def test_aggregate_obligations_groups_by_id(self):
+        evals = [
+            _ce("a", LM, True, "C1", "b"),
+            _ce("a", LM, True, "C2", "b"),
+            _ce("b", LM, False, "C3", "b"),
+        ]
+        results = {r.obligation_id: r for r in aggregate_obligations(evals)}
+        assert set(results) == {"a", "b"}
+        assert results["a"].status == "MET"
+        assert results["b"].status == "FAILED"
+
+    def test_summarize_counts_buckets_and_failures(self):
+        evals = [
+            _ce("a", LM, False, "C1", "b"),  # FAILED mandatory
+            _ce("c", OPT, False, "C3", crit="x"),  # OPEN recommendation
+        ]
+        s = summarize(aggregate_obligations(evals))
+        assert s["obligations"] == 2
+        assert s["pflicht_failed"] == 1
+        assert s["buckets"]["PFLICHT"] == 1
+        assert s["buckets"]["EMPFEHLUNG"] == 1