feat(rci): Regulatory Change Intelligence foundation (delta over the stored map)

RCI/Delta as a read-/reasoning layer ON TOP of the product-first pipeline. Answers
"what changes relative to my existing Regulatory Map?" — NOT "what does the new
law say in general". No UI, no ingestion (newsletter/mailbox), no RAG, no new
regulations/controls, no legal evaluation outside the stored map.

- 4 core objects (compliance/rci/schemas.py): ComplianceBaseline (snapshot of
  profile + map + registry obligations + required/present evidence), RegulatoryChange
  (simulated/provided INPUT), ObligationDelta (delta_type NEW|CHANGED|REMOVED|
  ALREADY_COVERED|NEEDS_REVIEW|NOT_APPLICABLE), ChangeImpactSummary. delta_type is a
  THIRD vocabulary, disjoint from ClaimCoverage (Welt 1) and ComplianceStatus (Welt 2).
- create_baseline() snapshots the existing pipeline once; assess_change() computes
  deltas deterministically against the snapshot (no re-evaluation).
- 12 tests = the 5 acceptance questions (affects product? new/changed? already
  covered by evidence? needs human review? not relevant?) + repeal/uncertain-reg/
  missing-evidence/boundary. Existing pipeline tests stay green; mypy clean; LOC ok.
- App/reasoning types only — no compliance-meta-model classes (freeze v1.0 untouched).

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
This commit is contained in:
Benjamin Admin
2026-06-26 13:45:23 +02:00
parent da466b3821
commit a5687bbc65
5 changed files with 432 additions and 0 deletions
@@ -0,0 +1,34 @@
"""Regulatory Change Intelligence (RCI) — delta layer over the product-first map.
Answers "what changes relative to my existing Regulatory Map?" — NOT "what does
the new law say in general". Snapshot the pipeline into a ComplianceBaseline, then
assess a (simulated/provided) RegulatoryChange into per-obligation deltas + a
management ChangeImpactSummary. Read/reasoning only — no UI, no ingestion, no RAG,
no new regulations/controls, no legal evaluation outside the stored map.
"""
from __future__ import annotations
from .baseline import create_baseline
from .delta_engine import assess_change
from .schemas import (
ChangeAssessment,
ChangeImpactSummary,
ChangeType,
ComplianceBaseline,
DeltaType,
ObligationDelta,
RegulatoryChange,
)
__all__ = [
"create_baseline",
"assess_change",
"ComplianceBaseline",
"RegulatoryChange",
"ObligationDelta",
"ChangeImpactSummary",
"ChangeAssessment",
"DeltaType",
"ChangeType",
]
@@ -0,0 +1,44 @@
"""Snapshot the current product-first pipeline into a ComplianceBaseline.
This is the ONLY place RCI runs the pipeline — to freeze a point-in-time map +
registry-linked obligations + their required evidence. Everything downstream
(delta computation) works purely against this snapshot, never re-evaluating.
"""
from __future__ import annotations
from typing import Dict, List, Optional
from compliance.profile.canonical import CanonicalProductRegulatoryProfile
from compliance.profile.to_reasoning import to_reasoning_profile
from compliance.reasoning.obligation_engine import derive_obligations
from compliance.regulatory_map.renderer import render_regulatory_map
from .schemas import ComplianceBaseline
def create_baseline(
profile: CanonicalProductRegulatoryProfile,
evidence_refs: Optional[Dict[str, List[str]]] = None,
baseline_id: str = "baseline",
created_at: Optional[str] = None,
) -> ComplianceBaseline:
reg_map = render_regulatory_map(profile)
obligations = derive_obligations(to_reasoning_profile(profile)).applicable_obligations
applicable: List[str] = []
required: Dict[str, List[str]] = {}
for ob in obligations:
if ob.registry_anchor: # only registry-linked obligations enter the baseline
applicable.append(ob.obligation_id)
required[ob.obligation_id] = list(ob.required_evidence)
return ComplianceBaseline(
baseline_id=baseline_id,
product_profile_snapshot=profile,
regulatory_map_snapshot=reg_map,
applicable_obligations=applicable,
obligation_evidence_required=required,
evidence_refs=dict(evidence_refs or {}),
created_at=created_at,
)
@@ -0,0 +1,114 @@
"""RCI delta engine — assess a RegulatoryChange against a ComplianceBaseline.
Answers "what changes relative to my existing Map?" deterministically, working
ONLY against the stored baseline (no re-evaluation of scope, no new legal
assessment outside the map). Per-obligation classification -> ObligationDelta;
aggregate -> ChangeImpactSummary.
"""
from __future__ import annotations
from typing import List, Tuple
from compliance.reasoning.enums import Confidence
from .schemas import (
ChangeAssessment,
ChangeImpactSummary,
ChangeType,
ComplianceBaseline,
DeltaType,
ObligationDelta,
RegulatoryChange,
)
_ACTION = {DeltaType.NEW, DeltaType.CHANGED, DeltaType.NEEDS_REVIEW}
def _classify(
in_base: bool, has_ev: bool, change_type: ChangeType, rel_app: bool, rel_unc: bool
) -> Tuple[DeltaType, str, Confidence]:
if not (rel_app or rel_unc):
return DeltaType.NOT_APPLICABLE, "Die Änderung betrifft kein Regelwerk Ihrer Map.", Confidence.HIGH
if rel_unc and not rel_app:
return (
DeltaType.NEEDS_REVIEW,
"Betrifft ein für Ihr Produkt noch UNSICHERES Regelwerk — erst Anwendbarkeit klären.",
Confidence.LOW,
)
if change_type == ChangeType.REPEAL:
if in_base:
return DeltaType.REMOVED, "Regelwerk/Pflicht aufgehoben — entfällt für Ihr Produkt.", Confidence.HIGH
return DeltaType.NOT_APPLICABLE, "Aufhebung betrifft keine Ihrer bestehenden Pflichten.", Confidence.HIGH
if not in_base:
return DeltaType.NEW, "Neue Pflicht durch die Änderung — bisher nicht in Ihrer Map.", Confidence.MEDIUM
if change_type == ChangeType.GUIDANCE_UPDATE:
if has_ev:
return (
DeltaType.ALREADY_COVERED,
"Bestehende Pflicht mit vorhandenen Nachweisen — Leitlinien-Update vermutlich abgedeckt.",
Confidence.MEDIUM,
)
return DeltaType.NEEDS_REVIEW, "Bestehende Pflicht ohne Nachweis — Leitlinien-Update prüfen.", Confidence.MEDIUM
return DeltaType.CHANGED, "Bestehende Pflicht inhaltlich geändert — Umsetzung und Nachweis prüfen.", Confidence.MEDIUM
def assess_change(baseline: ComplianceBaseline, change: RegulatoryChange) -> ChangeAssessment:
snap = baseline.regulatory_map_snapshot
app_regs = {v.regulation_id for v in snap.applicable_regulations}
unc_regs = {v.regulation_id for v in snap.uncertain_regulations}
base_obs = set(baseline.applicable_obligations)
affected = set(change.affected_regulations)
rel_app = bool(affected & app_regs)
rel_unc = bool(affected & unc_regs)
affects_product = rel_app or rel_unc
deltas: List[ObligationDelta] = []
for ob in change.affected_obligations:
present = baseline.evidence_refs.get(ob, [])
required = baseline.obligation_evidence_required.get(ob, [])
dt, reason, conf = _classify(ob in base_obs, bool(present), change.change_type, rel_app, rel_unc)
missing = [e for e in required if e not in present] if dt in _ACTION else []
deltas.append(
ObligationDelta(
obligation_id=ob,
delta_type=dt,
reason=reason,
affected_evidence=list(present),
missing_evidence=missing,
confidence=conf,
)
)
return ChangeAssessment(
change_id=change.change_id,
affects_product=affects_product,
deltas=deltas,
summary=_summary(deltas, [d.domain for d in snap.unsupported_domains]),
)
def _ids(deltas: List[ObligationDelta], *types: DeltaType) -> List[str]:
wanted = set(types)
return [d.obligation_id for d in deltas if d.delta_type in wanted]
def _summary(deltas: List[ObligationDelta], unsupported: List[str]) -> ChangeImpactSummary:
n_new = len(_ids(deltas, DeltaType.NEW))
n_changed = len(_ids(deltas, DeltaType.CHANGED))
n_removed = len(_ids(deltas, DeltaType.REMOVED))
n_covered = len(_ids(deltas, DeltaType.ALREADY_COVERED))
n_review = len(_ids(deltas, DeltaType.NEEDS_REVIEW, DeltaType.CHANGED))
n_na = len(_ids(deltas, DeltaType.NOT_APPLICABLE))
return ChangeImpactSummary(
what_changed=(
"%d neu, %d geändert, %d entfällt, %d bereits abgedeckt, %d zu prüfen, %d nicht relevant."
% (n_new, n_changed, n_removed, n_covered, n_review, n_na)
),
what_matters_for_this_product=_ids(deltas, *_ACTION),
already_covered=_ids(deltas, DeltaType.ALREADY_COVERED),
needs_review=_ids(deltas, DeltaType.NEEDS_REVIEW, DeltaType.CHANGED),
not_relevant=_ids(deltas, DeltaType.NOT_APPLICABLE),
unsupported_domains=unsupported,
)
@@ -0,0 +1,92 @@
"""Regulatory Change Intelligence (RCI) — domain objects.
RCI is a read-/reasoning layer ON TOP of the product-first pipeline. It answers
"what changes relative to my existing Regulatory Map?" — NOT "what does the new
law say in general". A RegulatoryChange is simulated/provided INPUT (no ingestion,
no newsletter/mailbox, no RAG); the delta is computed against a stored
ComplianceBaseline (snapshot of the map).
`delta_type` is a THIRD vocabulary — distinct from `ClaimCoverage` (Welt 1, what
the customer claims) and `ComplianceStatus` (Welt 2, verified evidence). The three
must never be conflated. These are application/reasoning types, NOT
compliance-meta-model classes (architecture freeze v1.0 untouched).
"""
from __future__ import annotations
from enum import Enum
from typing import Dict, List, Optional
from pydantic import BaseModel, Field
from compliance.profile.canonical import CanonicalProductRegulatoryProfile
from compliance.reasoning.enums import AuthorityLevel, Confidence
from compliance.regulatory_map.schemas import RegulatoryMap
class DeltaType(str, Enum):
NEW = "new" # obligation now applies that was not in the baseline
CHANGED = "changed" # existing obligation substantively modified
REMOVED = "removed" # obligation no longer applies (repeal)
ALREADY_COVERED = "already_covered" # existing obligation, evidence likely suffices
NEEDS_REVIEW = "needs_review" # a human must check
NOT_APPLICABLE = "not_applicable" # change does not touch this product's map
class ChangeType(str, Enum):
NEW_REGULATION = "new_regulation"
AMENDMENT = "amendment"
REPEAL = "repeal"
GUIDANCE_UPDATE = "guidance_update"
# ── stored snapshot ──────────────────────────────────────────────────────
class ComplianceBaseline(BaseModel):
baseline_id: str
product_profile_snapshot: CanonicalProductRegulatoryProfile
regulatory_map_snapshot: RegulatoryMap
applicable_obligations: List[str] = Field(default_factory=list) # registry-linked obligation_ids
# required evidence per obligation (derived) — to compute missing_evidence
obligation_evidence_required: Dict[str, List[str]] = Field(default_factory=dict)
# evidence the customer ALREADY has, per obligation (provided)
evidence_refs: Dict[str, List[str]] = Field(default_factory=dict)
created_at: Optional[str] = None
# ── simulated/provided change (INPUT — never ingested) ───────────────────
class RegulatoryChange(BaseModel):
change_id: str
source: str = "simulated"
affected_regulations: List[str] = Field(default_factory=list)
affected_obligations: List[str] = Field(default_factory=list)
change_type: ChangeType
effective_date: Optional[str] = None
authority_level: AuthorityLevel = AuthorityLevel.LEGAL_TEXT
summary: str = ""
# ── per-obligation delta ─────────────────────────────────────────────────
class ObligationDelta(BaseModel):
obligation_id: str
delta_type: DeltaType
reason: str
affected_evidence: List[str] = Field(default_factory=list) # evidence already present for it
missing_evidence: List[str] = Field(default_factory=list) # required but not yet present
confidence: Confidence
# ── management-level summary ──────────────────────────────────────────────
class ChangeImpactSummary(BaseModel):
what_changed: str = ""
what_matters_for_this_product: List[str] = Field(default_factory=list) # need action
already_covered: List[str] = Field(default_factory=list)
needs_review: List[str] = Field(default_factory=list)
not_relevant: List[str] = Field(default_factory=list)
unsupported_domains: List[str] = Field(default_factory=list)
class ChangeAssessment(BaseModel):
change_id: str
affects_product: bool
deltas: List[ObligationDelta] = Field(default_factory=list)
summary: ChangeImpactSummary
+148
View File
@@ -0,0 +1,148 @@
"""Tests for Regulatory Change Intelligence (RCI).
Acceptance: a simulated RegulatoryChange against a stored ComplianceBaseline can
answer: (1) does it affect this product? (2) which obligations are new/changed?
(3) which are likely already covered by existing evidence? (4) what must a human
review? (5) what is not relevant?
"""
from __future__ import annotations
from compliance.profile.canonical import (
CanonicalLifecyclePhase,
CanonicalProductRegulatoryProfile,
CanonicalProductType,
EconomicOperatorRole,
)
from compliance.rci import (
ChangeType,
DeltaType,
RegulatoryChange,
assess_change,
create_baseline,
)
PROFILE = CanonicalProductRegulatoryProfile(
name="Industriespülmaschine",
product_type=CanonicalProductType.MACHINERY,
markets=["EU", "DE"],
economic_operator_role=EconomicOperatorRole.MANUFACTURER,
lifecycle_phase=CanonicalLifecyclePhase.PLACING_ON_MARKET,
is_machine=True,
is_component=False,
has_software_updates=True,
has_embedded_software=True,
has_remote_access=True,
technologies=["cloud", "ota_updates"],
)
# Evidence the customer already has, per obligation.
EVIDENCE = {"provide_security_updates": ["policy", "ticket"], "sbom_creation": ["sbom"]}
BASELINE = create_baseline(PROFILE, EVIDENCE, baseline_id="b1")
def _change(obs, regs=("CRA",), ctype=ChangeType.AMENDMENT, cid="c"):
return RegulatoryChange(
change_id=cid, affected_regulations=list(regs), affected_obligations=list(obs), change_type=ctype
)
def _by_id(assessment):
return {d.obligation_id: d for d in assessment.deltas}
# Baseline snapshots the registry-linked obligations from the frozen map.
def test_baseline_snapshots_registry_obligations():
assert "sbom_creation" in BASELINE.applicable_obligations
assert "provide_security_updates" in BASELINE.applicable_obligations
assert BASELINE.regulatory_map_snapshot.scope_resolved is True
# 1 + 2. affects the product + flags a NEW obligation.
def test_affects_product_and_new_obligation():
a = assess_change(BASELINE, _change(["cra_new_requirement_xyz"], cid="c1"))
assert a.affects_product is True
assert _by_id(a)["cra_new_requirement_xyz"].delta_type == DeltaType.NEW
# 2. an existing obligation amended -> CHANGED.
def test_existing_obligation_changed():
a = assess_change(BASELINE, _change(["sbom_creation"], cid="c2"))
assert _by_id(a)["sbom_creation"].delta_type == DeltaType.CHANGED
# 3. existing obligation with evidence + guidance update -> ALREADY_COVERED.
def test_already_covered_by_evidence():
a = assess_change(BASELINE, _change(["provide_security_updates"], ctype=ChangeType.GUIDANCE_UPDATE, cid="c3"))
assert _by_id(a)["provide_security_updates"].delta_type == DeltaType.ALREADY_COVERED
assert a.summary.already_covered == ["provide_security_updates"]
# 4. what a human must review (existing obligation without evidence).
def test_needs_review():
a = assess_change(BASELINE, _change(["vuln_handling_process"], ctype=ChangeType.GUIDANCE_UPDATE, cid="c4"))
assert _by_id(a)["vuln_handling_process"].delta_type == DeltaType.NEEDS_REVIEW
assert "vuln_handling_process" in a.summary.needs_review
assert "vuln_handling_process" in a.summary.what_matters_for_this_product
# 5. a change to a regulation NOT in the map -> not relevant.
def test_not_relevant_offmap_regulation():
a = assess_change(BASELINE, _change(["psd2_strong_customer_auth"], regs=["PSD2"], ctype=ChangeType.NEW_REGULATION, cid="c5"))
assert a.affects_product is False
assert _by_id(a)["psd2_strong_customer_auth"].delta_type == DeltaType.NOT_APPLICABLE
assert a.summary.not_relevant == ["psd2_strong_customer_auth"]
# repeal removes an existing obligation.
def test_repeal_removes_existing():
a = assess_change(BASELINE, _change(["sbom_creation"], ctype=ChangeType.REPEAL, cid="c6"))
assert _by_id(a)["sbom_creation"].delta_type == DeltaType.REMOVED
# missing evidence is computed against the obligation's required evidence.
def test_missing_evidence_on_changed():
a = assess_change(BASELINE, _change(["sbom_creation"], cid="c7")) # requires sbom+repo_scan, has sbom
d = _by_id(a)["sbom_creation"]
assert "sbom" in d.affected_evidence
assert "repo_scan" in d.missing_evidence
# a change to an UNCERTAIN regulation -> needs review (resolve applicability first).
def test_uncertain_regulation_needs_review():
a = assess_change(BASELINE, _change(["red_cyber_req"], regs=["RED"], cid="c8"))
assert a.affects_product is True # RED is in the map's uncertain bucket
assert _by_id(a)["red_cyber_req"].delta_type == DeltaType.NEEDS_REVIEW
# RCI answers "vs my map", not "what does the law say" — and works only on the snapshot.
def test_works_against_stored_map_no_reevaluation():
# a change with no affected_obligations still resolves affects_product from the map
a = assess_change(BASELINE, RegulatoryChange(change_id="c9", affected_regulations=["CRA"], affected_obligations=[], change_type=ChangeType.AMENDMENT))
assert a.affects_product is True
assert a.deltas == []
# delta_type is a THIRD vocabulary, disjoint from ClaimCoverage (Welt 1).
def test_delta_vocabulary_distinct_from_claimcoverage():
from compliance.reasoning.enums import ClaimCoverage
assert {d.value for d in DeltaType}.isdisjoint({c.value for c in ClaimCoverage})
# the management summary aggregates the five buckets coherently.
def test_summary_buckets():
a = assess_change(
BASELINE,
RegulatoryChange(
change_id="c10",
affected_regulations=["CRA"],
affected_obligations=["cra_new_one", "sbom_creation", "provide_security_updates"],
change_type=ChangeType.AMENDMENT,
),
)
s = a.summary
assert "cra_new_one" in s.what_matters_for_this_product # NEW
assert "sbom_creation" in s.needs_review # CHANGED -> review
assert s.what_changed # non-empty management line