Files
breakpilot-compliance/backend-compliance/compliance/services/obligation_aggregation.py
T
Benjamin Admin 402a42d30d feat(obligation): obligation-level aggregation engine
Erste Ausführung des Legal Obligation Layer v1: aggregiert Bewertungen auf
Kriterium-/Control-Ebene zu Findings auf Obligation-Ebene
(Regulation → Legal Obligation → Control → Criterion).

- regulierungs-agnostisch (obligation_id/tier/met/legal_basis/conditional)
- fail-safe: LM applicable=false→NA · keine erfüllt→FAILED · alle→MET · Teil→PARTIAL;
  BP/OPT covered→MET sonst OPEN (nie FAILED); LM unbewertbar→UNDETERMINED (Legacy behalten)
- Redundanz-Kollaps per OR pro legal_basis-Anforderung → kein künstliches PARTIAL
- Applicability als Hook (Prädikat-Engine folgt separat)

Shadow-Benchmark (Opus-GT, 3 Firmen): 38 Control-Findings → 13 Obligation-Findings
(2,9×); ~23 redundante Falsch-Positive strukturell korrigiert, echte Lücken erhalten,
PARTIAL=0. 16/16 Unit-Tests grün.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-06-24 12:28:03 +02:00

180 lines
8.1 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""Obligation Aggregation Engine — Ausführung des Legal Obligation Layer v1.
Aggregiert Bewertungen auf KRITERIUM-Ebene (pro Control) zu Ergebnissen auf
OBLIGATION-Ebene. Das ist die erstmalige Ausführung des Modells
Regulation → Legal Obligation → Control → Criterion
— das Finding entsteht auf der OBLIGATION, nicht pro Control. Damit kollabiert
die im Katalog gemessene Redundanz (portability 11×, recipients 14×): N Controls,
die dieselbe Pflicht prüfen, ergeben EIN Obligation-Finding statt N Control-Findings.
Regulierungs-agnostisch: kennt nur obligation_id, tier, met, legal_basis,
conditional. DSGVO/CRA/NIS2/DORA/MaschVO/AI-Act speisen dieselbe Funktion.
Fail-safe (docs-src/development/legal_obligation_layer_v1.md, §Aggregation):
LEGAL_MINIMUM-Obligation:
applicable=false → NA (kein Finding)
keine LM-Anforderung erfüllt → FAILED (Pflicht-Lücke)
alle LM-Anforderungen erfüllt → MET
nur ein Teil erfüllt → PARTIAL
LM nicht bewertbar (Prüfer down) → UNDETERMINED (Aufrufer behält Legacy)
BEST_PRACTICE/OPTIONAL-Obligation (kein LM):
mind. ein Kriterium erfüllt → MET (abgedeckt)
keines → OPEN (nur Empfehlung, NIE FAILED)
Redundanz-Kollaps: LM-Kriterien EINER Obligation werden zu „Anforderungen" nach
`legal_basis` gruppiert; eine Anforderung gilt als erfüllt, sobald IRGENDEIN Control
sie bestätigt (OR). 9× recipients_disclosed (alle Art 13(1)(e)) = eine Anforderung.
PARTIAL entsteht nur bei mehreren DISTINKTEN LM-Anforderungen (verschiedene
legal_basis) innerhalb einer Obligation.
"""
from __future__ import annotations
from collections import Counter, defaultdict
from dataclasses import dataclass, field
from typing import Callable, Optional
LM, BP, OPT = "LEGAL_MINIMUM", "BEST_PRACTICE", "OPTIONAL"
MET, PARTIAL, FAILED = "MET", "PARTIAL", "FAILED"
NA, UNDETERMINED, OPEN = "NA", "UNDETERMINED", "OPEN"
PFLICHT, EMPFEHLUNG, NICHT_ANWENDBAR = "PFLICHT", "EMPFEHLUNG", "NICHT_ANWENDBAR"
# Predikat-Hook: (conditional, doc_text) → True (anwendbar) / False (→ NA) / None (unbekannt → anwendbar)
ApplicableFn = Callable[[str, str], Optional[bool]]
@dataclass(frozen=True)
class CriterionEval:
"""Eine Kriteriums-Bewertung eines Controls, einer Obligation zugeordnet."""
obligation_id: str
tier: str # LEGAL_MINIMUM / BEST_PRACTICE / OPTIONAL
met: Optional[bool] # True erfüllt · False fehlt · None unbestimmt
control_id: str
legal_basis: str = ""
criterion: str = ""
conditional: Optional[str] = None # Applicability-Prädikat der Obligation
@dataclass
class ObligationResult:
obligation_id: str
status: str # MET / PARTIAL / FAILED / NA / UNDETERMINED / OPEN
bucket: str # PFLICHT / EMPFEHLUNG / NICHT_ANWENDBAR
tier: str # bestimmende Tier der Obligation
applicable: bool
evidence: list[str] # beitragende control_ids
lm_met: int # erfüllte LM-Anforderungen
lm_total: int # distinkte LM-Anforderungen (bewertbar)
recommendations: list[dict] = field(default_factory=list)
def _governing_tier(evals: list[CriterionEval]) -> str:
tiers = {e.tier for e in evals}
if LM in tiers:
return LM
return BP if BP in tiers else OPT
def _requirement_state(evals: list[CriterionEval]) -> Optional[bool]:
"""Zustand EINER LM-Anforderung über alle prüfenden Controls (OR/Redundanz):
True (irgendwer bestätigt) · None (alle unbestimmt) · False (bewertet, fehlt)."""
if any(e.met is True for e in evals):
return True
if all(e.met is None for e in evals):
return None
return False
def _recommendations(evals: list[CriterionEval]) -> list[dict]:
"""Nicht erfüllte BEST_PRACTICE/OPTIONAL-Kriterien → Empfehlungen."""
return [{"criterion": e.criterion, "tier": e.tier, "legal_basis": e.legal_basis,
"control_id": e.control_id}
for e in evals if e.tier in (BP, OPT) and e.met is False]
def aggregate_obligation(obligation_id: str, evals: list[CriterionEval], *,
applicable_fn: Optional[ApplicableFn] = None,
doc_text: str = "") -> ObligationResult:
evidence = sorted({e.control_id for e in evals if e.control_id})
conditional = next((e.conditional for e in evals if e.conditional), None)
tier = _governing_tier(evals)
recs = _recommendations(evals)
applicable = True
if applicable_fn is not None and conditional:
verdict = applicable_fn(conditional, doc_text)
applicable = True if verdict is None else bool(verdict)
if not applicable:
return ObligationResult(obligation_id, NA, NICHT_ANWENDBAR, tier, False,
evidence, 0, 0, recs)
lm_evals = [e for e in evals if e.tier == LM]
if lm_evals:
reqs: dict[str, list[CriterionEval]] = defaultdict(list)
for e in lm_evals:
reqs[e.legal_basis or obligation_id].append(e)
states = [_requirement_state(v) for v in reqs.values()]
determinable = [s for s in states if s is not None]
if not determinable:
return ObligationResult(obligation_id, UNDETERMINED, PFLICHT, LM, True,
evidence, 0, len(states), recs)
met = sum(1 for s in determinable if s)
total = len(determinable)
status = MET if met == total else (FAILED if met == 0 else PARTIAL)
return ObligationResult(obligation_id, status, PFLICHT, LM, True,
evidence, met, total, recs)
# Reine BEST_PRACTICE/OPTIONAL-Obligation: nie Pflicht, nie FAILED.
covered = any(e.met is True for e in evals)
return ObligationResult(obligation_id, MET if covered else OPEN, EMPFEHLUNG,
tier, True, evidence, 0, 0, recs)
def aggregate_obligations(evals: list[CriterionEval], *,
applicable_fn: Optional[ApplicableFn] = None,
doc_text: str = "") -> list[ObligationResult]:
"""Flache Kriteriums-Liste → ein ObligationResult je obligation_id."""
groups: dict[str, list[CriterionEval]] = defaultdict(list)
for e in evals:
if e.obligation_id:
groups[e.obligation_id].append(e)
return [aggregate_obligation(oid, g, applicable_fn=applicable_fn, doc_text=doc_text)
for oid, g in groups.items()]
def evals_from_tiered(control_id: str, tiered_criteria: list[dict],
detail: list[dict], conditional: Optional[str] = None
) -> list[CriterionEval]:
"""Adapter: tiered_criteria (obligation_id/tier/legal_basis) + das
evaluate_tiered-`detail` (met pro Index, gleiche Reihenfolge) → CriterionEvals.
`conditional` kommt aus der Control-`applicability` (gilt für die Obligation)."""
out: list[CriterionEval] = []
for i, c in enumerate(tiered_criteria or []):
oid = c.get("obligation_id")
if not oid:
continue
d = detail[i] if i < len(detail) else {}
out.append(CriterionEval(
obligation_id=oid,
tier=(c.get("compliance_tier") or "").upper(),
met=d.get("met"),
control_id=control_id,
legal_basis=c.get("legal_basis") or "",
criterion=c.get("criterion") or "",
conditional=conditional,
))
return out
def summarize(results: list[ObligationResult]) -> dict:
"""Phase-C-Kennzahlen: Obligation-Anzahl + Verteilung nach Bucket/Status."""
return {
"obligations": len(results),
"buckets": dict(Counter(r.bucket for r in results)),
"statuses": dict(Counter(r.status for r in results)),
"pflicht_failed": sum(1 for r in results if r.bucket == PFLICHT and r.status == FAILED),
"pflicht_partial": sum(1 for r in results if r.bucket == PFLICHT and r.status == PARTIAL),
"recommendations": sum(len(r.recommendations) for r in results),
}