feat(optimization): Regulatory Optimization — Roadmap/Management renderer over the Capability Delta

Roadmap item 5. GAP analysis and measure-prioritisation are the SAME computation: Required −
Known = the Capability Delta. The Capability Delta Engine (RS-005) computes it once; renderers
read that ONE delta. Interview Renderer (missing info → questions) was already built; this adds
the Roadmap/Management Renderer (missing capabilities → measures ranked by regulatory leverage).

- compliance/optimization/: regulatory_leverage() + select_within_budget() (pure leverage math)
  + roadmap_from_delta(assessment, ...) — the keystone binding optimization to the RS-005 delta
  (dependency optimization → transition_reasoning, acyclic; the delta engine stays hermetic).
  leverage(measure) = number of regulatory requirements it closes at once (e.g. patch management
  → CRA+MaschinenVO+IEC62443+ISO27001 = 4). No new corpus, no new meta-model class (freeze v1.0).
- Welt-1 honesty: percentages are exact count ratios over the IDENTIFIED requirements (the known
  delta), never "% gesetzeskonform".
- reference suite: "Regulatory Optimization" section runs the SAME convergence delta → ranked
  measures + budget answer + the management sentence "of N identified requirements you close M
  with the top-K measures (X%) — highest regulatory leverage".
- ADR-003: Capability Delta Engine — one delta, many renderers; rename Gap → Capability Delta.

13 optimization tests (31 with transition+company), mypy --strict clean, check-loc 0.
Product code with no app caller + ADR/reference = non-runtime → no deploy (ADR-001).

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
This commit is contained in:
Benjamin Admin
2026-06-27 09:49:38 +02:00
parent ffff9bb592
commit cfafa31ea2
7 changed files with 448 additions and 2 deletions
@@ -0,0 +1,21 @@
"""Regulatory Optimization — the Roadmap / Management renderer of the Capability Delta Engine.
Ranks the OPEN Capability Delta (from RS-005) by regulatory leverage: which measure closes the
most regulatory requirements at once. Answers the Geschäftsführer question "Womit anfangen?".
Pure, deterministic, computed-not-stored. Consumes the RS-005 delta (acyclic dependency); the
delta engine stays hermetic. No new corpus, no new meta-model class (freeze v1.0).
"""
from __future__ import annotations
from .engine import regulatory_leverage, roadmap_from_delta, select_within_budget
from .schemas import BudgetPlan, OptimizationPlan, RankedMeasure
__all__ = [
"regulatory_leverage",
"select_within_budget",
"roadmap_from_delta",
"OptimizationPlan",
"RankedMeasure",
"BudgetPlan",
]
@@ -0,0 +1,134 @@
"""Regulatory Optimization — the Roadmap / Management RENDERER of the Capability Delta Engine.
GAP analysis and measure-prioritisation are TWO VIEWS OF THE SAME COMPUTATION. The Capability
Delta Engine (`compliance/transition_reasoning`, RS-005) computes Required - Known = the
Capability Delta once. Renderers read that ONE delta:
- Interview Renderer (missing INFORMATION -> questions) = `TransitionQuestionRequest` (built)
- Roadmap / Management Renderer (missing CAPABILITIES -> measures by leverage) = THIS module
- Evidence Renderer (missing EVIDENCE -> upload requests) = later
There is one truth, not a Gap engine and a separate Roadmap engine.
A measure (a capability to implement) has *regulatory leverage* = the number of distinct
regulatory requirements it closes AT ONCE (e.g. patch management closes a CRA, a MaschinenVO,
an IEC 62443 and an ISO 27001 requirement -> leverage 4). The product turns from "you have N
obligations" into "of N identified requirements you only need M measures — and these K first".
Fully deterministic, computed-not-stored, NO new corpus. `regulatory_leverage`/`select_within_budget`
are pure math over `capability -> requirements`; `roadmap_from_delta` binds them to the RS-005
delta (dependency optimization -> transition_reasoning, acyclic; the delta engine stays hermetic).
No new graph/meta-model class (freeze v1.0). Python 3.9 compatible.
Honesty (Welt-1): the percentages are exact count ratios over the IDENTIFIED requirements from
the known patterns — never "% gesetzeskonform". Label outputs as "der identifizierten Anforderungen".
"""
from __future__ import annotations
from typing import Dict, List, Optional
from ..transition_reasoning import CoverageStatus, TransitionAssessment
from .schemas import BudgetPlan, OptimizationPlan, RankedMeasure
def _ranked(
capability_requirements: Dict[str, List[str]], in_scope: Optional[List[str]]
) -> List[RankedMeasure]:
"""Rank measures: leverage desc, then capability_id asc (deterministic). Empty covers dropped."""
scope = (
set(in_scope)
if in_scope is not None
else {r for reqs in capability_requirements.values() for r in reqs}
)
measures: List[RankedMeasure] = []
for cap, reqs in capability_requirements.items():
covers = sorted({r for r in reqs if r in scope})
if not covers:
continue # this capability closes nothing in scope -> not a measure here
measures.append(RankedMeasure(capability_id=cap, covers=covers, leverage=len(covers)))
measures.sort(key=lambda m: (-m.leverage, m.capability_id))
total = sum(m.leverage for m in measures)
running = 0
for m in measures:
running += m.leverage
m.cumulative_requirements = running
m.cumulative_coverage = (running / total) if total else 0.0
return measures
def regulatory_leverage(
capability_requirements: Dict[str, List[str]], in_scope: Optional[List[str]] = None
) -> OptimizationPlan:
"""Rank measures by regulatory leverage; report the compression (requirements -> measures).
`capability_requirements`: measure (capability_id) -> the requirement keys it satisfies. A
requirement key is currently a regulation (via `covers_targets`); finer obligation granularity
is a future extension. `in_scope`: restrict the requirement keys counted (default: all seen).
"""
measures = _ranked(capability_requirements, in_scope)
scope = sorted(
set(in_scope)
if in_scope is not None
else {r for reqs in capability_requirements.values() for r in reqs}
)
total = sum(m.leverage for m in measures)
avg = (total / len(measures)) if measures else 0.0
headline = (
"%d identifizierte Anforderungen aus %d Regelwerken -> %d Massnahmen (Ø Hebel %.1f)."
% (total, len(scope), len(measures), avg)
)
return OptimizationPlan(
in_scope_requirements=scope,
total_measures=len(measures),
total_requirements=total,
ranked_measures=measures,
headline=headline,
)
def select_within_budget(
capability_requirements: Dict[str, List[str]],
budget: int,
in_scope: Optional[List[str]] = None,
) -> BudgetPlan:
"""The budget answer: with K measures, pick the K highest-leverage ones and report coverage.
Because each requirement key is closed by exactly one measure here, greedy-by-leverage is the
optimal cover, so ranking == selection. (When requirements become shared across capabilities,
this becomes weighted set-cover; the signature is ready for that.)
"""
measures = _ranked(capability_requirements, in_scope)
total = sum(m.leverage for m in measures)
k = max(0, budget)
selected = measures[:k]
closed = selected[-1].cumulative_requirements if selected else 0
ratio = (closed / total) if total else 0.0
headline = (
"Mit den Top-%d Massnahmen (nach regulatorischem Hebel) schliessen Sie %d von %d "
"identifizierten Anforderungen (%.0f%%)." % (len(selected), closed, total, ratio * 100)
)
return BudgetPlan(
budget=budget,
selected_capabilities=[m.capability_id for m in selected],
requirements_closed=closed,
total_requirements=total,
coverage_ratio=ratio,
headline=headline,
)
def roadmap_from_delta(
assessment: TransitionAssessment,
capability_requirements: Dict[str, List[str]],
in_scope: Optional[List[str]] = None,
open_statuses: Optional[List[CoverageStatus]] = None,
) -> OptimizationPlan:
"""Render the Roadmap view FROM a Capability Delta (an RS-005 `TransitionAssessment`).
Takes the OPEN capabilities of the delta — MISSING by default — and ranks them by regulatory
leverage. This is the same delta the Interview Renderer turns into questions; here it becomes
prioritised measures. The binding that makes "one truth, two renderers" real in code.
"""
statuses = set(open_statuses) if open_statuses is not None else {CoverageStatus.MISSING}
open_caps = [c.capability_id for c in assessment.coverage if c.status in statuses]
delta_reqs = {cap: capability_requirements.get(cap, []) for cap in open_caps}
return regulatory_leverage(delta_reqs, in_scope)
@@ -0,0 +1,48 @@
"""Schemas for the Regulatory Optimization Engine.
These DTOs are *derived views* (computed-not-stored): nothing here is persisted; every value
is recomputed from the input each call. No new meta-model class, no graph (freeze v1.0).
Python 3.9 compatible (no `|` unions).
"""
from __future__ import annotations
from typing import List
from pydantic import BaseModel, Field
class RankedMeasure(BaseModel):
"""One measure (a capability to implement) ranked by its regulatory leverage."""
capability_id: str
covers: List[str] = Field(default_factory=list) # the in-scope requirements it satisfies
leverage: int = 0 # = len(covers): how many it closes at once
cumulative_requirements: int = 0 # running total of requirements closed (ranked order)
cumulative_coverage: float = 0.0 # cumulative_requirements / total_requirements (0..1)
class OptimizationPlan(BaseModel):
"""Measures ranked by regulatory leverage — greatest regulatory effect first.
`total_requirements` counts the IDENTIFIED requirements in scope (the known delta from the
patterns), NOT a company's total legal duties. The percentages are exact count ratios over
this identified set — never a compliance verdict (Welt-1 discipline).
"""
in_scope_requirements: List[str] = Field(default_factory=list) # the distinct requirement keys counted
total_measures: int = 0 # number of distinct measures (delta capabilities)
total_requirements: int = 0 # Sum of leverage = identified requirements closable
ranked_measures: List[RankedMeasure] = Field(default_factory=list)
headline: str = "" # "N identifizierte Anforderungen -> M Massnahmen ..."
class BudgetPlan(BaseModel):
"""The budget answer: with a budget of K measures, which K and how much do they close?"""
budget: int = 0
selected_capabilities: List[str] = Field(default_factory=list)
requirements_closed: int = 0
total_requirements: int = 0
coverage_ratio: float = 0.0 # requirements_closed / total_requirements (0..1)
headline: str = ""
@@ -38,6 +38,7 @@ from compliance.transition_reasoning import (
TransitionContext, TransitionGoal, TargetType, TargetRequirement, assess_transition, CoverageStatus,
regulatory_convergence,
)
from compliance.optimization import roadmap_from_delta, select_within_budget
import os
import yaml
@@ -411,6 +412,36 @@ coverage_table([
("Cross-Regulation Capability Mapping", "PASS", _conv.headline),
])
# ── Regulatory Optimization — Roadmap-Renderer über DEMSELBEN Capability Delta ───
w("## Regulatory Optimization — größter regulatorischer Hebel zuerst")
w("")
w("_Dieselbe Berechnung wie die GAP-Analyse, anderer Renderer: das **Capability Delta** (RS-005) wird nach **regulatorischem Hebel** priorisiert (eine Maßnahme schließt N Regelwerke gleichzeitig). Welt-1: % über die IDENTIFIZIERTEN Anforderungen, kein Compliance-Urteil._")
w("")
_opt = roadmap_from_delta(_cp_a, _delta_t) # SAME delta the Interview Renderer turns into questions
_open_reqs = {_m.capability_id: _m.covers for _m in _opt.ranked_measures}
w("**Kompression:** %s" % _opt.headline)
w("")
w("**Top-Maßnahmen nach regulatorischem Hebel (Roadmap):**")
w("")
w("| # | Maßnahme | Hebel | deckt | kumuliert |")
w("|---|---|---|---|---|")
for _i, _m in enumerate(_opt.ranked_measures[:6], 1):
w("| %d | `%s` | **%d** | %s | %d/%d (%.0f%%) |" % (
_i, _m.capability_id, _m.leverage, "+".join(_m.covers),
_m.cumulative_requirements, _opt.total_requirements, _m.cumulative_coverage * 100))
w("")
_bud = select_within_budget(_open_reqs, 5)
w('**Managementsatz:** „Wenn Sie zuerst diese %d Maßnahmen umsetzen, schließen Sie %d von %d identifizierten Anforderungen (%.0f%%) — höchster regulatorischer Hebel." (Hebel skaliert mit jedem weiteren Regelwerk/Convergence-Pattern.)'
% (len(_bud.selected_capabilities), _bud.requirements_closed, _bud.total_requirements, _bud.coverage_ratio * 100))
w("")
w("_Eine Wahrheit, zwei Renderer: dasselbe Capability Delta liefert dem Auditor **Fragen** (Interview) und dem GF **Maßnahmen** (Roadmap)._")
w("")
coverage_table([
("Capability Delta Engine (RS-005)", "PASS", "ein Delta, mehrere Renderer"),
("Roadmap/Management Renderer (Hebel)", "PASS", _opt.headline),
("Budget-Priorisierung", "PASS", "Top-5 → %.0f%% der identifizierten Anforderungen" % (_bud.coverage_ratio * 100)),
])
# ── Epics + roll-up ───────────────────────────────────────────────────────
w("## Gaps → Epics (Backlog — nur erfasst, NICHT implementiert)")
w("")
@@ -233,6 +233,35 @@ _Der USP: welche Capability deckt MEHRERE Regelwerke gleichzeitig? (Convergence
| Regulatory Convergence Pattern | **PASS** | 2 Targets, 12 Delta-Capabilities |
| Cross-Regulation Capability Mapping | **PASS** | 4 von 12 Capabilities decken >= 2 Regelwerke gleichzeitig ab (CRA + MaschinenVO). |
## Regulatory Optimization — größter regulatorischer Hebel zuerst
_Dieselbe Berechnung wie die GAP-Analyse, anderer Renderer: das **Capability Delta** (RS-005) wird nach **regulatorischem Hebel** priorisiert (eine Maßnahme schließt N Regelwerke gleichzeitig). Welt-1: % über die IDENTIFIZIERTEN Anforderungen, kein Compliance-Urteil._
**Kompression:** 16 identifizierte Anforderungen aus 2 Regelwerken -> 12 Massnahmen (Ø Hebel 1.3).
**Top-Maßnahmen nach regulatorischem Hebel (Roadmap):**
| # | Maßnahme | Hebel | deckt | kumuliert |
|---|---|---|---|---|
| 1 | `ce_conformity_assessment_and_technical_documentation` | **2** | CRA+MaschinenVO | 2/16 (12%) |
| 2 | `product_cyber_risk_assessment` | **2** | CRA+MaschinenVO | 4/16 (25%) |
| 3 | `protection_against_corruption_of_safety_functions` | **2** | CRA+MaschinenVO | 6/16 (38%) |
| 4 | `secure_signed_update_distribution` | **2** | CRA+MaschinenVO | 8/16 (50%) |
| 5 | `coordinated_vulnerability_disclosure` | **1** | CRA | 9/16 (56%) |
| 6 | `exploited_vuln_and_incident_reporting` | **1** | CRA | 10/16 (62%) |
**Managementsatz:** „Wenn Sie zuerst diese 5 Maßnahmen umsetzen, schließen Sie 9 von 16 identifizierten Anforderungen (56%) — höchster regulatorischer Hebel." (Hebel skaliert mit jedem weiteren Regelwerk/Convergence-Pattern.)
_Eine Wahrheit, zwei Renderer: dasselbe Capability Delta liefert dem Auditor **Fragen** (Interview) und dem GF **Maßnahmen** (Roadmap)._
**Architecture Coverage**
| Layer | Status | Hinweis |
|---|---|---|
| Capability Delta Engine (RS-005) | **PASS** | ein Delta, mehrere Renderer |
| Roadmap/Management Renderer (Hebel) | **PASS** | 16 identifizierte Anforderungen aus 2 Regelwerken -> 12 Massnahmen (Ø Hebel 1.3). |
| Budget-Priorisierung | **PASS** | Top-5 → 56% der identifizierten Anforderungen |
## Gaps → Epics (Backlog — nur erfasst, NICHT implementiert)
| Epic | Titel | schliesst Coverage-Luecke |
@@ -244,6 +273,6 @@ _Der USP: welche Capability deckt MEHRERE Regelwerke gleichzeitig? (Convergence
## Suite-Status (Roll-up)
- Coverage-Zellen gesamt: **29**
- PASS: **21** · PARTIAL: 3 · UNSUPPORTED: 1 · TODO: 3 · N/A: 1 · NEEDS_FACTS: 0
- Coverage-Zellen gesamt: **32**
- PASS: **24** · PARTIAL: 3 · UNSUPPORTED: 1 · TODO: 3 · N/A: 1 · NEEDS_FACTS: 0
- Fortschritt = PASS-Anteil steigt, wenn Epics RS-001…004 landen (objektiver Maßstab, kein LOC).
@@ -0,0 +1,127 @@
"""Tests for the Regulatory Optimization renderer (Roadmap / Management view of the Capability Delta).
Acceptance: rank measures by regulatory leverage (most regulatory requirements closed at once),
report the compression (identified requirements -> measures), answer the budget question, and bind
to the SAME RS-005 Capability Delta the Interview Renderer uses. Percentages are over IDENTIFIED
requirements (Welt-1), never "% gesetzeskonform".
"""
from __future__ import annotations
from compliance.optimization import (
BudgetPlan, OptimizationPlan, regulatory_leverage, roadmap_from_delta, select_within_budget,
)
from compliance.transition_reasoning import (
CapabilityCoverage, CoverageStatus, TransitionAssessment, TransitionSummary,
)
# Illustrative leverage spread (the user's patch_management example reaches leverage 4).
CAPS = {
"patch_management": ["CRA", "MaschinenVO", "IEC62443", "ISO27001"], # leverage 4
"access_control": ["CRA", "ISO27001"], # leverage 2
"sbom": ["CRA"], # leverage 1
"machine_guards": ["MaschinenVO"], # leverage 1
}
def test_ranked_by_leverage_desc():
plan = regulatory_leverage(CAPS)
order = [m.capability_id for m in plan.ranked_measures]
assert order[0] == "patch_management" # leverage 4 first
assert order[1] == "access_control" # leverage 2 next
assert plan.ranked_measures[0].leverage == 4
def test_compression_counts():
plan = regulatory_leverage(CAPS)
# total requirements = 4 + 2 + 1 + 1 = 8 closed by 4 measures
assert plan.total_requirements == 8 and plan.total_measures == 4
assert "8 identifizierte Anforderungen" in plan.headline
def test_cumulative_coverage_monotone_to_one():
plan = regulatory_leverage(CAPS)
cums = [m.cumulative_coverage for m in plan.ranked_measures]
assert cums == sorted(cums) # non-decreasing
assert abs(cums[-1] - 1.0) < 1e-9 # full set -> 100%
assert plan.ranked_measures[0].cumulative_requirements == 4
def test_tie_break_deterministic_by_id():
# machine_guards vs sbom both leverage 1 -> alphabetical: machine_guards before sbom
plan = regulatory_leverage(CAPS)
tail = [m.capability_id for m in plan.ranked_measures if m.leverage == 1]
assert tail == ["machine_guards", "sbom"]
def test_budget_picks_highest_leverage():
b = select_within_budget(CAPS, 2)
assert b.selected_capabilities == ["patch_management", "access_control"]
assert b.requirements_closed == 6 and b.total_requirements == 8
assert abs(b.coverage_ratio - 0.75) < 1e-9
assert "6 von 8" in b.headline and "75%" in b.headline
def test_budget_over_and_zero():
assert select_within_budget(CAPS, 99).requirements_closed == 8 # capped at all
z = select_within_budget(CAPS, 0)
assert z.selected_capabilities == [] and z.requirements_closed == 0
def test_in_scope_filter():
# restrict to CRA + MaschinenVO: patch=2, access=1(CRA), sbom=1, guards=1 -> total 5
plan = regulatory_leverage(CAPS, in_scope=["CRA", "MaschinenVO"])
assert plan.total_requirements == 5
assert plan.ranked_measures[0].capability_id == "patch_management"
assert plan.ranked_measures[0].leverage == 2 # only CRA+MaschinenVO counted
def test_deterministic():
a, b = regulatory_leverage(CAPS), regulatory_leverage(CAPS)
assert [m.capability_id for m in a.ranked_measures] == [m.capability_id for m in b.ranked_measures]
assert a.headline == b.headline
def test_empty():
plan = regulatory_leverage({})
assert plan.total_requirements == 0 and plan.ranked_measures == []
assert isinstance(plan, OptimizationPlan)
def test_capability_with_no_in_scope_requirement_dropped():
plan = regulatory_leverage({"x": ["CRA"], "y": ["DataAct"]}, in_scope=["CRA"])
assert [m.capability_id for m in plan.ranked_measures] == ["x"] # y covers nothing in scope
# The keystone: optimization renders the SAME RS-005 delta the interview uses.
def _delta():
return TransitionAssessment(
target_id="CRA+MaschinenVO",
coverage=[
CapabilityCoverage(capability_id="patch_management", status=CoverageStatus.MISSING),
CapabilityCoverage(capability_id="sbom", status=CoverageStatus.MISSING),
CapabilityCoverage(capability_id="access_control", status=CoverageStatus.ALREADY_COVERED),
CapabilityCoverage(capability_id="machine_guards", status=CoverageStatus.MISSING),
],
summary=TransitionSummary(),
)
def test_roadmap_from_delta_uses_only_open_capabilities():
plan = roadmap_from_delta(_delta(), CAPS)
ids = [m.capability_id for m in plan.ranked_measures]
assert "access_control" not in ids # ALREADY_COVERED is not an open measure
assert ids == ["patch_management", "machine_guards", "sbom"] # MISSING ranked by leverage
assert isinstance(plan, OptimizationPlan)
def test_roadmap_from_delta_honours_status_filter():
# include NEEDS_CONFIRMATION too -> still only those present in CAPS contribute requirements
a = _delta()
a.coverage.append(CapabilityCoverage(capability_id="access_control", status=CoverageStatus.NEEDS_CONFIRMATION))
plan = roadmap_from_delta(a, CAPS, open_statuses=[CoverageStatus.MISSING, CoverageStatus.NEEDS_CONFIRMATION])
assert "access_control" in [m.capability_id for m in plan.ranked_measures]
def test_budget_returns_budgetplan_type():
assert isinstance(select_within_budget(CAPS, 1), BudgetPlan)