Files
breakpilot-compliance/backend-compliance/compliance/rci/delta_engine.py
T
Benjamin Admin a5687bbc65 feat(rci): Regulatory Change Intelligence foundation (delta over the stored map)
RCI/Delta as a read-/reasoning layer ON TOP of the product-first pipeline. Answers
"what changes relative to my existing Regulatory Map?" — NOT "what does the new
law say in general". No UI, no ingestion (newsletter/mailbox), no RAG, no new
regulations/controls, no legal evaluation outside the stored map.

- 4 core objects (compliance/rci/schemas.py): ComplianceBaseline (snapshot of
  profile + map + registry obligations + required/present evidence), RegulatoryChange
  (simulated/provided INPUT), ObligationDelta (delta_type NEW|CHANGED|REMOVED|
  ALREADY_COVERED|NEEDS_REVIEW|NOT_APPLICABLE), ChangeImpactSummary. delta_type is a
  THIRD vocabulary, disjoint from ClaimCoverage (Welt 1) and ComplianceStatus (Welt 2).
- create_baseline() snapshots the existing pipeline once; assess_change() computes
  deltas deterministically against the snapshot (no re-evaluation).
- 12 tests = the 5 acceptance questions (affects product? new/changed? already
  covered by evidence? needs human review? not relevant?) + repeal/uncertain-reg/
  missing-evidence/boundary. Existing pipeline tests stay green; mypy clean; LOC ok.
- App/reasoning types only — no compliance-meta-model classes (freeze v1.0 untouched).

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-06-26 13:45:23 +02:00

115 lines
4.7 KiB
Python

"""RCI delta engine — assess a RegulatoryChange against a ComplianceBaseline.
Answers "what changes relative to my existing Map?" deterministically, working
ONLY against the stored baseline (no re-evaluation of scope, no new legal
assessment outside the map). Per-obligation classification -> ObligationDelta;
aggregate -> ChangeImpactSummary.
"""
from __future__ import annotations
from typing import List, Tuple
from compliance.reasoning.enums import Confidence
from .schemas import (
ChangeAssessment,
ChangeImpactSummary,
ChangeType,
ComplianceBaseline,
DeltaType,
ObligationDelta,
RegulatoryChange,
)
_ACTION = {DeltaType.NEW, DeltaType.CHANGED, DeltaType.NEEDS_REVIEW}
def _classify(
in_base: bool, has_ev: bool, change_type: ChangeType, rel_app: bool, rel_unc: bool
) -> Tuple[DeltaType, str, Confidence]:
if not (rel_app or rel_unc):
return DeltaType.NOT_APPLICABLE, "Die Änderung betrifft kein Regelwerk Ihrer Map.", Confidence.HIGH
if rel_unc and not rel_app:
return (
DeltaType.NEEDS_REVIEW,
"Betrifft ein für Ihr Produkt noch UNSICHERES Regelwerk — erst Anwendbarkeit klären.",
Confidence.LOW,
)
if change_type == ChangeType.REPEAL:
if in_base:
return DeltaType.REMOVED, "Regelwerk/Pflicht aufgehoben — entfällt für Ihr Produkt.", Confidence.HIGH
return DeltaType.NOT_APPLICABLE, "Aufhebung betrifft keine Ihrer bestehenden Pflichten.", Confidence.HIGH
if not in_base:
return DeltaType.NEW, "Neue Pflicht durch die Änderung — bisher nicht in Ihrer Map.", Confidence.MEDIUM
if change_type == ChangeType.GUIDANCE_UPDATE:
if has_ev:
return (
DeltaType.ALREADY_COVERED,
"Bestehende Pflicht mit vorhandenen Nachweisen — Leitlinien-Update vermutlich abgedeckt.",
Confidence.MEDIUM,
)
return DeltaType.NEEDS_REVIEW, "Bestehende Pflicht ohne Nachweis — Leitlinien-Update prüfen.", Confidence.MEDIUM
return DeltaType.CHANGED, "Bestehende Pflicht inhaltlich geändert — Umsetzung und Nachweis prüfen.", Confidence.MEDIUM
def assess_change(baseline: ComplianceBaseline, change: RegulatoryChange) -> ChangeAssessment:
snap = baseline.regulatory_map_snapshot
app_regs = {v.regulation_id for v in snap.applicable_regulations}
unc_regs = {v.regulation_id for v in snap.uncertain_regulations}
base_obs = set(baseline.applicable_obligations)
affected = set(change.affected_regulations)
rel_app = bool(affected & app_regs)
rel_unc = bool(affected & unc_regs)
affects_product = rel_app or rel_unc
deltas: List[ObligationDelta] = []
for ob in change.affected_obligations:
present = baseline.evidence_refs.get(ob, [])
required = baseline.obligation_evidence_required.get(ob, [])
dt, reason, conf = _classify(ob in base_obs, bool(present), change.change_type, rel_app, rel_unc)
missing = [e for e in required if e not in present] if dt in _ACTION else []
deltas.append(
ObligationDelta(
obligation_id=ob,
delta_type=dt,
reason=reason,
affected_evidence=list(present),
missing_evidence=missing,
confidence=conf,
)
)
return ChangeAssessment(
change_id=change.change_id,
affects_product=affects_product,
deltas=deltas,
summary=_summary(deltas, [d.domain for d in snap.unsupported_domains]),
)
def _ids(deltas: List[ObligationDelta], *types: DeltaType) -> List[str]:
wanted = set(types)
return [d.obligation_id for d in deltas if d.delta_type in wanted]
def _summary(deltas: List[ObligationDelta], unsupported: List[str]) -> ChangeImpactSummary:
n_new = len(_ids(deltas, DeltaType.NEW))
n_changed = len(_ids(deltas, DeltaType.CHANGED))
n_removed = len(_ids(deltas, DeltaType.REMOVED))
n_covered = len(_ids(deltas, DeltaType.ALREADY_COVERED))
n_review = len(_ids(deltas, DeltaType.NEEDS_REVIEW, DeltaType.CHANGED))
n_na = len(_ids(deltas, DeltaType.NOT_APPLICABLE))
return ChangeImpactSummary(
what_changed=(
"%d neu, %d geändert, %d entfällt, %d bereits abgedeckt, %d zu prüfen, %d nicht relevant."
% (n_new, n_changed, n_removed, n_covered, n_review, n_na)
),
what_matters_for_this_product=_ids(deltas, *_ACTION),
already_covered=_ids(deltas, DeltaType.ALREADY_COVERED),
needs_review=_ids(deltas, DeltaType.NEEDS_REVIEW, DeltaType.CHANGED),
not_relevant=_ids(deltas, DeltaType.NOT_APPLICABLE),
unsupported_domains=unsupported,
)