"""Obligation Aggregation Engine — Ausführung des Legal Obligation Layer v1. Aggregiert Bewertungen auf KRITERIUM-Ebene (pro Control) zu Ergebnissen auf OBLIGATION-Ebene. Das ist die erstmalige Ausführung des Modells Regulation → Legal Obligation → Control → Criterion — das Finding entsteht auf der OBLIGATION, nicht pro Control. Damit kollabiert die im Katalog gemessene Redundanz (portability 11×, recipients 14×): N Controls, die dieselbe Pflicht prüfen, ergeben EIN Obligation-Finding statt N Control-Findings. Regulierungs-agnostisch: kennt nur obligation_id, tier, met, legal_basis, conditional. DSGVO/CRA/NIS2/DORA/MaschVO/AI-Act speisen dieselbe Funktion. Fail-safe (docs-src/development/legal_obligation_layer_v1.md, §Aggregation): LEGAL_MINIMUM-Obligation: applicable=false → NA (kein Finding) keine LM-Anforderung erfüllt → FAILED (Pflicht-Lücke) alle LM-Anforderungen erfüllt → MET nur ein Teil erfüllt → PARTIAL LM nicht bewertbar (Prüfer down) → UNDETERMINED (Aufrufer behält Legacy) BEST_PRACTICE/OPTIONAL-Obligation (kein LM): mind. ein Kriterium erfüllt → MET (abgedeckt) keines → OPEN (nur Empfehlung, NIE FAILED) Redundanz-Kollaps: LM-Kriterien EINER Obligation werden zu „Anforderungen" nach `legal_basis` gruppiert; eine Anforderung gilt als erfüllt, sobald IRGENDEIN Control sie bestätigt (OR). 9× recipients_disclosed (alle Art 13(1)(e)) = eine Anforderung. PARTIAL entsteht nur bei mehreren DISTINKTEN LM-Anforderungen (verschiedene legal_basis) innerhalb einer Obligation. """ from __future__ import annotations from collections import Counter, defaultdict from dataclasses import dataclass, field from typing import Callable, Optional LM, BP, OPT = "LEGAL_MINIMUM", "BEST_PRACTICE", "OPTIONAL" MET, PARTIAL, FAILED = "MET", "PARTIAL", "FAILED" NA, UNDETERMINED, OPEN = "NA", "UNDETERMINED", "OPEN" PFLICHT, EMPFEHLUNG, NICHT_ANWENDBAR = "PFLICHT", "EMPFEHLUNG", "NICHT_ANWENDBAR" # Predikat-Hook: (conditional, doc_text) → True (anwendbar) / False (→ NA) / None (unbekannt → anwendbar) ApplicableFn = Callable[[str, str], Optional[bool]] @dataclass(frozen=True) class CriterionEval: """Eine Kriteriums-Bewertung eines Controls, einer Obligation zugeordnet.""" obligation_id: str tier: str # LEGAL_MINIMUM / BEST_PRACTICE / OPTIONAL met: Optional[bool] # True erfüllt · False fehlt · None unbestimmt control_id: str legal_basis: str = "" criterion: str = "" conditional: Optional[str] = None # Applicability-Prädikat der Obligation @dataclass class ObligationResult: obligation_id: str status: str # MET / PARTIAL / FAILED / NA / UNDETERMINED / OPEN bucket: str # PFLICHT / EMPFEHLUNG / NICHT_ANWENDBAR tier: str # bestimmende Tier der Obligation applicable: bool evidence: list[str] # beitragende control_ids lm_met: int # erfüllte LM-Anforderungen lm_total: int # distinkte LM-Anforderungen (bewertbar) recommendations: list[dict] = field(default_factory=list) def _governing_tier(evals: list[CriterionEval]) -> str: tiers = {e.tier for e in evals} if LM in tiers: return LM return BP if BP in tiers else OPT def _requirement_state(evals: list[CriterionEval]) -> Optional[bool]: """Zustand EINER LM-Anforderung über alle prüfenden Controls (OR/Redundanz): True (irgendwer bestätigt) · None (alle unbestimmt) · False (bewertet, fehlt).""" if any(e.met is True for e in evals): return True if all(e.met is None for e in evals): return None return False def _recommendations(evals: list[CriterionEval]) -> list[dict]: """Nicht erfüllte BEST_PRACTICE/OPTIONAL-Kriterien → Empfehlungen.""" return [{"criterion": e.criterion, "tier": e.tier, "legal_basis": e.legal_basis, "control_id": e.control_id} for e in evals if e.tier in (BP, OPT) and e.met is False] def aggregate_obligation(obligation_id: str, evals: list[CriterionEval], *, applicable_fn: Optional[ApplicableFn] = None, doc_text: str = "") -> ObligationResult: evidence = sorted({e.control_id for e in evals if e.control_id}) conditional = next((e.conditional for e in evals if e.conditional), None) tier = _governing_tier(evals) recs = _recommendations(evals) applicable = True if applicable_fn is not None and conditional: verdict = applicable_fn(conditional, doc_text) applicable = True if verdict is None else bool(verdict) if not applicable: return ObligationResult(obligation_id, NA, NICHT_ANWENDBAR, tier, False, evidence, 0, 0, recs) lm_evals = [e for e in evals if e.tier == LM] if lm_evals: reqs: dict[str, list[CriterionEval]] = defaultdict(list) for e in lm_evals: reqs[e.legal_basis or obligation_id].append(e) states = [_requirement_state(v) for v in reqs.values()] determinable = [s for s in states if s is not None] if not determinable: return ObligationResult(obligation_id, UNDETERMINED, PFLICHT, LM, True, evidence, 0, len(states), recs) met = sum(1 for s in determinable if s) total = len(determinable) status = MET if met == total else (FAILED if met == 0 else PARTIAL) return ObligationResult(obligation_id, status, PFLICHT, LM, True, evidence, met, total, recs) # Reine BEST_PRACTICE/OPTIONAL-Obligation: nie Pflicht, nie FAILED. covered = any(e.met is True for e in evals) return ObligationResult(obligation_id, MET if covered else OPEN, EMPFEHLUNG, tier, True, evidence, 0, 0, recs) def aggregate_obligations(evals: list[CriterionEval], *, applicable_fn: Optional[ApplicableFn] = None, doc_text: str = "") -> list[ObligationResult]: """Flache Kriteriums-Liste → ein ObligationResult je obligation_id.""" groups: dict[str, list[CriterionEval]] = defaultdict(list) for e in evals: if e.obligation_id: groups[e.obligation_id].append(e) return [aggregate_obligation(oid, g, applicable_fn=applicable_fn, doc_text=doc_text) for oid, g in groups.items()] def evals_from_tiered(control_id: str, tiered_criteria: list[dict], detail: list[dict], conditional: Optional[str] = None ) -> list[CriterionEval]: """Adapter: tiered_criteria (obligation_id/tier/legal_basis) + das evaluate_tiered-`detail` (met pro Index, gleiche Reihenfolge) → CriterionEvals. `conditional` kommt aus der Control-`applicability` (gilt für die Obligation).""" out: list[CriterionEval] = [] for i, c in enumerate(tiered_criteria or []): oid = c.get("obligation_id") if not oid: continue d = detail[i] if i < len(detail) else {} out.append(CriterionEval( obligation_id=oid, tier=(c.get("compliance_tier") or "").upper(), met=d.get("met"), control_id=control_id, legal_basis=c.get("legal_basis") or "", criterion=c.get("criterion") or "", conditional=conditional, )) return out def summarize(results: list[ObligationResult]) -> dict: """Phase-C-Kennzahlen: Obligation-Anzahl + Verteilung nach Bucket/Status.""" return { "obligations": len(results), "buckets": dict(Counter(r.bucket for r in results)), "statuses": dict(Counter(r.status for r in results)), "pflicht_failed": sum(1 for r in results if r.bucket == PFLICHT and r.status == FAILED), "pflicht_partial": sum(1 for r in results if r.bucket == PFLICHT and r.status == PARTIAL), "recommendations": sum(len(r.recommendations) for r in results), }