feat(obligation): obligation-level aggregation engine

Erste Ausführung des Legal Obligation Layer v1: aggregiert Bewertungen auf
Kriterium-/Control-Ebene zu Findings auf Obligation-Ebene
(Regulation → Legal Obligation → Control → Criterion).

- regulierungs-agnostisch (obligation_id/tier/met/legal_basis/conditional)
- fail-safe: LM applicable=false→NA · keine erfüllt→FAILED · alle→MET · Teil→PARTIAL;
  BP/OPT covered→MET sonst OPEN (nie FAILED); LM unbewertbar→UNDETERMINED (Legacy behalten)
- Redundanz-Kollaps per OR pro legal_basis-Anforderung → kein künstliches PARTIAL
- Applicability als Hook (Prädikat-Engine folgt separat)

Shadow-Benchmark (Opus-GT, 3 Firmen): 38 Control-Findings → 13 Obligation-Findings
(2,9×); ~23 redundante Falsch-Positive strukturell korrigiert, echte Lücken erhalten,
PARTIAL=0. 16/16 Unit-Tests grün.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
This commit is contained in:
Benjamin Admin
2026-06-24 12:28:03 +02:00
parent b83c3e6e00
commit 402a42d30d
2 changed files with 332 additions and 0 deletions
@@ -0,0 +1,153 @@
"""Unit-Tests Obligation Aggregation Engine (Legal Obligation Layer v1).
Deckt die fail-safe Regeln + den Redundanz-Kollaps ab (echte DSE-Szenarien:
recipients 9×, objection LM+BP, portability OPTIONAL-Format)."""
from compliance.services.obligation_aggregation import (
BP, LM, OPT, CriterionEval, aggregate_obligation, aggregate_obligations,
evals_from_tiered, summarize,
)
def _ce(oid, tier, met, cid, basis="", crit="", cond=None):
return CriterionEval(oid, tier, met, cid, basis, crit, cond)
class TestRedundancyCollapse:
def test_nine_controls_one_confirms_collapses_to_one_met(self):
# recipients_disclosed: 9 Controls, gleiche Anforderung (Art 13(1)(e))
evals = [_ce("recipients_disclosed", LM, i == 4, f"DATA-{i}", "Art. 13(1)(e)")
for i in range(9)]
res = aggregate_obligation("recipients_disclosed", evals)
assert res.status == "MET"
assert res.lm_met == 1 and res.lm_total == 1 # 9 → 1 Anforderung
assert len(res.evidence) == 9
def test_all_nine_absent_fails_once(self):
evals = [_ce("recipients_disclosed", LM, False, f"DATA-{i}", "Art. 13(1)(e)")
for i in range(9)]
res = aggregate_obligation("recipients_disclosed", evals)
assert res.status == "FAILED"
assert res.bucket == "PFLICHT"
class TestPartialMultiFacet:
def test_two_distinct_lm_requirements_one_met_is_partial(self):
evals = [
_ce("transfer", LM, True, "C1", "Art. 13(1)(f)"), # erfüllt
_ce("transfer", LM, False, "C2", "Art. 46"), # fehlt → distinkt
]
res = aggregate_obligation("transfer", evals)
assert res.status == "PARTIAL"
assert res.lm_met == 1 and res.lm_total == 2
def test_both_distinct_requirements_met(self):
evals = [
_ce("transfer", LM, True, "C1", "Art. 13(1)(f)"),
_ce("transfer", LM, True, "C2", "Art. 46"),
]
assert aggregate_obligation("transfer", evals).status == "MET"
class TestApplicability:
def test_conditional_false_is_na(self):
evals = [_ce("transfer", LM, False, "C1", "Art. 44", cond="has_third_country_transfer")]
res = aggregate_obligation("transfer", evals, applicable_fn=lambda c, t: False)
assert res.status == "NA"
assert res.bucket == "NICHT_ANWENDBAR"
assert res.applicable is False
def test_conditional_true_evaluates_normally(self):
evals = [_ce("transfer", LM, False, "C1", "Art. 44", cond="has_third_country_transfer")]
res = aggregate_obligation("transfer", evals, applicable_fn=lambda c, t: True)
assert res.status == "FAILED"
def test_conditional_unknown_defaults_applicable(self):
evals = [_ce("transfer", LM, True, "C1", "Art. 44", cond="x")]
res = aggregate_obligation("transfer", evals, applicable_fn=lambda c, t: None)
assert res.applicable is True and res.status == "MET"
def test_no_predicate_means_applicable(self):
evals = [_ce("transfer", LM, True, "C1", cond="x")]
assert aggregate_obligation("transfer", evals).applicable is True
class TestUndetermined:
def test_all_lm_none_is_undetermined(self):
evals = [_ce("ob", LM, None, "C1", "b"), _ce("ob", LM, None, "C2", "b")]
res = aggregate_obligation("ob", evals)
assert res.status == "UNDETERMINED"
assert res.bucket == "PFLICHT"
def test_one_determinable_requirement_decides(self):
# eine Anforderung unbestimmt, die andere klar erfüllt → MET über die bewertbare
evals = [_ce("ob", LM, None, "C1", "b1"), _ce("ob", LM, True, "C2", "b2")]
res = aggregate_obligation("ob", evals)
assert res.status == "MET"
assert res.lm_total == 1 # nur die bewertbare Anforderung zählt
class TestBestPracticeOnly:
def test_pure_bp_covered_is_met_recommendation_bucket(self):
evals = [_ce("art20_format", OPT, True, "C1")]
res = aggregate_obligation("art20_format", evals)
assert res.status == "MET"
assert res.bucket == "EMPFEHLUNG"
def test_pure_bp_not_covered_is_open_never_failed(self):
evals = [_ce("art20_format", OPT, False, "C1", crit="JSON/CSV")]
res = aggregate_obligation("art20_format", evals)
assert res.status == "OPEN"
assert res.bucket == "EMPFEHLUNG"
assert len(res.recommendations) == 1
class TestRecommendationsWithinLm:
def test_unmet_bp_in_lm_obligation_becomes_recommendation(self):
# objection_direct_marketing: LM erfüllt + 3 BP teils offen
evals = [
_ce("obj_dm", LM, True, "SEC-8410", "Art. 21(2)", "Recht"),
_ce("obj_dm", BP, False, "SEC-8410", "", "Kontaktweg"),
_ce("obj_dm", BP, True, "SEC-8410", "", "kostenlos"),
]
res = aggregate_obligation("obj_dm", evals)
assert res.status == "MET" and res.bucket == "PFLICHT"
assert len(res.recommendations) == 1
assert res.recommendations[0]["criterion"] == "Kontaktweg"
class TestAdapterAndSummary:
def test_evals_from_tiered_zips_and_skips_no_obligation(self):
tc = [
{"criterion": "Recht", "compliance_tier": "LEGAL_MINIMUM",
"legal_basis": "Art. 21(1)", "obligation_id": "obj_gen"},
{"criterion": "Weg", "compliance_tier": "BEST_PRACTICE",
"legal_basis": "", "obligation_id": "obj_gen"},
{"criterion": "ohne", "compliance_tier": "OPTIONAL"}, # kein obligation_id → skip
]
detail = [{"met": True}, {"met": False}, {"met": True}]
evals = evals_from_tiered("AUTH-2051", tc, detail, conditional="x")
assert len(evals) == 2
assert evals[0].met is True and evals[0].conditional == "x"
assert evals[1].tier == BP and evals[1].met is False
def test_aggregate_obligations_groups_by_id(self):
evals = [
_ce("a", LM, True, "C1", "b"),
_ce("a", LM, True, "C2", "b"),
_ce("b", LM, False, "C3", "b"),
]
results = {r.obligation_id: r for r in aggregate_obligations(evals)}
assert set(results) == {"a", "b"}
assert results["a"].status == "MET"
assert results["b"].status == "FAILED"
def test_summarize_counts_buckets_and_failures(self):
evals = [
_ce("a", LM, False, "C1", "b"), # FAILED Pflicht
_ce("c", OPT, False, "C3", crit="x"), # OPEN Empfehlung
]
s = summarize(aggregate_obligations(evals))
assert s["obligations"] == 2
assert s["pflicht_failed"] == 1
assert s["buckets"]["PFLICHT"] == 1
assert s["buckets"]["EMPFEHLUNG"] == 1