Files
breakpilot-compliance/backend-compliance/reference_scenarios/generate.py
T
Benjamin Admin a0f72fc39b feat(rts): extend Reference Transition Scenarios to multi-regulation (CRA + MaschinenVO)
Roadmap item 2: the RTS now pin MaschinenVO + convergence Expected Outcomes, so the
convergence USP is a living regression, not just a one-off section.

- RTS-003 (machine + ISMS, networked): full multi-regulation archetype — maschinenvo
  expected_delta + convergence expected_multi_target (links TP-ISO27001-CRA-MaschinenVO-v1).
  Generator runs the convergence pattern through RS-005: 4/4 machine-safety delta MISSING +
  4/4 expected multi-target caps converge. PASS.
- RTS-001 (component): MaschinenVO modeled as `uncertain` (a pure component is usually not a
  machine; deciding question is_safety_component) — engine must never assert it applies. Honest,
  parallel to the Data-Act handling.
- RTS-002 (machine, QMS-only): MaschinenVO `applies` (is_machine) but LOW convergence — no ISMS
  means the cyber side is entirely delta, so few caps are shared. The honest contrast that the
  convergence USP rewards companies who already run an ISMS.
- generator: per-RTS maschinenvo/convergence Soll-Ist checks; convergence pattern run once and
  reused. Data Act stays `uncertain` everywhere, never asserted.

All 3 RTS PASS. 18 tests (transition+company), mypy --strict clean, check-loc 0.
Non-runtime (knowledge + reference harness) -> no deploy (ADR-001).

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-06-27 09:26:01 +02:00

435 lines
26 KiB
Python

# ruff: noqa
# mypy: ignore-errors
"""Reference Scenario Suite v1 — generator (living regression reference).
Runs the REAL deployed engines for three real customer scenarios and emits a
markdown reference artifact. Per scenario it derives an "Architecture Coverage"
table from the actual run, so when a new domain lands the cells flip automatically
(e.g. Sz2/Environmental UNSUPPORTED -> PASS). This is the objective measure of
"is BreakPilot better than six months ago" — not LOC, not features.
Run: cd backend-compliance && PYTHONPATH=. python3 reference_scenarios/generate.py
Not product code; not imported by the app.
"""
from __future__ import annotations
from typing import List, Tuple
from compliance.profile.canonical import (
CanonicalProductRegulatoryProfile as P, CanonicalProductType as PT,
EconomicOperatorRole as Role, CanonicalLifecyclePhase as LP,
ProductComponent as Comp, ComponentKind as CK, EnvironmentalImpact as Env,
)
from compliance.product_scope.orchestrator import resolve_product_scope
from compliance.product_scope.schemas import ScopeStatus
from compliance.regulatory_map.renderer import render_regulatory_map
from compliance.interpretation_map.adapter import interpret_in_map
from compliance.rci import create_baseline, assess_change, RegulatoryChange, ChangeType
from compliance.company import (
CompanyContext, Certification, Declaration, ExistingEvidence,
CapabilityMappingEntry, build_company_profile,
)
from compliance.capability import (
CapabilityRegistry, CapabilityCandidate, CapabilityRelation, RelationType,
EvidenceKind, mint_capability, evaluate_relation,
)
from compliance.reasoning.enums import Confidence
from compliance.transition_reasoning import (
TransitionContext, TransitionGoal, TargetType, TargetRequirement, assess_transition, CoverageStatus,
regulatory_convergence,
)
import os
import yaml
Row = Tuple[str, str, str]
OUT: List[str] = []
ROLLUP: List[str] = []
def w(s: str = "") -> None:
OUT.append(s)
def coverage_table(rows: List[Row]) -> None:
w("**Architecture Coverage**")
w("")
w("| Layer | Status | Hinweis |")
w("|---|---|---|")
for layer, status, note in rows:
w("| %s | **%s** | %s |" % (layer, status, note))
ROLLUP.append(status)
w("")
def reg_map_block(rmap) -> None:
w("**Expected Regulatory Map**")
w("")
w("> " + rmap.executive_summary)
w("")
for v in rmap.applicable_regulations:
obs = ", ".join(o.obligation_id for o in v.obligations) or v.obligations_note
w("- **%s** (%s) — Pflichten: %s" % (v.regulation_id, v.name, obs))
for u in rmap.uncertain_regulations:
w("- _unsicher_ %s — fehlt: %s" % (u.regulation_id, ", ".join(u.missing_facts) or "-"))
for ov in rmap.overlaps:
w("- Overlap %s: %s" % (ov.overlap_group_id, ", ".join(ov.shared_obligations)))
for ev, ids in rmap.shared_evidence.items():
w("- 1 Nachweis `%s` => %d Pflichten" % (ev, len(ids)))
w("")
def unsupported_block(rmap) -> None:
w("**Expected Unsupported Domains**")
w("")
if not rmap.unsupported_domains:
w("- keine — alle getriggerten Domaenen sind im Korpus")
for d in rmap.unsupported_domains:
w("- `%s` (Trigger: %s) -> %s" % (d.domain, d.trigger, d.note))
w("")
ISO_MAP = {"ISO27001": CapabilityMappingEntry(
capability_ids=["cap_incident_response", "cap_supplier_management", "cap_asset_management"],
confidence=Confidence.MEDIUM)}
def interp_status(verdict_value: str) -> str:
return "PARTIAL" if verdict_value in ("uncertain", "unsupported") else "PASS"
w("# Reference Scenario Suite v1")
w("")
w("> **Kein Doku-Artefakt — die erste Ground Truth / Living Reference Suite.** Erzeugt aus den "
"REALEN deployten Engines (aktueller deployter main) via `reference_scenarios/generate.py`. Jede "
"`Architecture Coverage`-Zelle ist aus dem echten Lauf ABGELEITET; sobald eine Domaene landet, "
"kippt die Zelle automatisch (z. B. Sz2/Environmental UNSUPPORTED -> PASS). Beantwortet dauerhaft: "
'„Ist BreakPilot besser als vor sechs Monaten?" — anhand echter Kundensituationen, nicht LOC.')
w("")
w("Synthetische `Cert->Capability`-Mappings sind als ILLUSTRATIV markiert; die echte Tabelle gehoert "
"Compliance Execution (siehe Master Capability Registry).")
w("")
# ── Scenario 1 ────────────────────────────────────────────────────────────
w("## Szenario 1 — Maschinenbauer mit ISMS + SBOM + Remote Access")
w("")
w('**Frage:** „Was gilt fuer uns, und reicht das?"')
w("")
p1 = P(name="Industrielle Verpackungsmaschine", product_type=PT.MACHINERY, markets=["EU", "DE"],
economic_operator_role=Role.MANUFACTURER, lifecycle_phase=LP.PLACING_ON_MARKET,
is_machine=True, is_component=False, has_software_updates=True, has_embedded_software=True,
has_remote_access=True, connected_to_internet=True, has_security_function=True,
has_sbom=True, has_incident_response=True, technologies=["cloud", "ota_updates"],
existing_certifications=["ISO27001"])
sc1 = resolve_product_scope(p1)
rmap1 = render_regulatory_map(p1)
w("**Input:** Maschine, vernetzt (Remote/Cloud), Firmware, Rolle Hersteller, Maerkte EU/DE; "
"Company: ISMS (ISO27001) + SBOM + Incident Response.")
w("")
reg_map_block(rmap1)
w("**Input — Company Context** _(ILLUSTRATIVES Mapping: ISO27001 -> incident_response, supplier_management, asset_management)_")
w("")
ctx1 = CompanyContext(company_id="maschinenbau",
certifications=[Certification(certification_id="ISO27001", name="ISO/IEC 27001")],
declarations=[Declaration(capability_id="cap_patch_management")],
evidence=[ExistingEvidence(evidence_id="sbom.json", evidence_type="sbom", proves_capability_id="cap_sbom_management")])
prof1 = build_company_profile(ctx1, ISO_MAP)
for c in prof1.candidate_capabilities:
w("- candidate: %s%s (%s)" % (c.capability_id, c.verification_status.value, c.source))
for c in prof1.confirmed_capabilities:
w("- CONFIRMED: %s%s (Nachweis: %s)" % (c.capability_id, c.verification_status.value, ", ".join(c.sources)))
w("")
ie1 = interpret_in_map(rmap1, "Wir liefern Sicherheitsupdates fuenf Jahre lang, damit ist der CRA erfuellt.")
w("**Expected Interpretation**")
w("")
w('> Auslegung „5 Jahre Updates -> CRA erfuellt" -> **%s**' % ie1.assessment.value)
w("> " + ie1.explanation)
w("")
base1 = create_baseline(p1, {"sbom_creation": ["sbom"], "provide_security_updates": ["policy"]}, baseline_id="s1")
a1 = assess_change(base1, RegulatoryChange(change_id="cra-2026-amd", affected_regulations=["CRA"],
affected_obligations=["cra_new_disclosure_duty", "sbom_creation", "vuln_handling_process"],
change_type=ChangeType.AMENDMENT))
w("**Expected RCI** _(CRA-Novelle gegen gespeicherte Baseline)_")
w("")
w("> affects_product = %s%s" % (a1.affects_product, a1.summary.what_changed))
for d in a1.deltas:
w("- %s -> **%s** (fehlende Nachweise: %s)" % (d.obligation_id, d.delta_type.value, ", ".join(d.missing_evidence) or "-"))
w("")
unsupported_block(rmap1)
w("**Known Gaps:** Interpretation kennt kein CRA-Muster (RS-001) · MaschinenVO/EMV-Pflichten nicht "
"registry-verlinkt (RS-004) · cap/MCAP/Pflicht-Evidence nicht gejoint = Company-Gap (RS-003).")
w("")
coverage_table([
("Company Context", "PASS", "ISO27001 + SBOM + Declaration"),
("Product Profile", "PASS", "Maschine, vernetzt, Firmware"),
("Navigator", "PASS", "ready_for_scope"),
("Scope", "PASS" if sc1.status == ScopeStatus.RESOLVED else "NEEDS_FACTS", "CRA/MaschinenVO/EMV + 3 unsicher"),
("Regulatory Map", "PASS", "Overlaps + 1-Nachweis-N-Pflichten"),
("CRA Obligations", "PASS", "12 registry-verlinkt"),
("MaschinenVO/EMV Obligations", "PARTIAL", "Scope ja, Pflichten nicht verlinkt → RS-004"),
("Interpretation", interp_status(ie1.assessment.value), "kein CRA-Muster → RS-001"),
("RCI", "PASS", "1 neu, 2 geaendert"),
("Company Gap", "TODO", "cap↔MCAP↔Pflicht nicht gejoint → RS-003"),
("Environmental", "N/A", "keine Umwelt-Trigger"),
])
# ── Scenario 2 ────────────────────────────────────────────────────────────
w("## Szenario 2 — Industriespuelmaschine mit Abwasser/Chemikalien")
w("")
w('**Frage:** „Welche Umweltbereiche sind noch nicht abgedeckt?"')
w("")
p2 = P(name="Industriespuelmaschine", product_type=PT.MACHINERY, markets=["EU", "DE"],
economic_operator_role=Role.MANUFACTURER, lifecycle_phase=LP.PLACING_ON_MARKET,
is_machine=True, is_component=False, has_software_updates=True, has_embedded_software=True,
components=[Comp(name="Dosierpumpe", kind=CK.CHEMICAL_DOSING), Comp(name="Abwasserauslass", kind=CK.WASTEWATER_OUTLET)],
environmental=Env(discharges_to_wastewater=True, uses_cleaning_chemicals=True, consumes_energy_or_water=True))
sc2 = resolve_product_scope(p2)
rmap2 = render_regulatory_map(p2)
w("**Input:** Maschine mit Chemikalien-Dosierung + Abwasserauslass; Umwelt-Trigger gesetzt.")
w("")
reg_map_block(rmap2)
unsupported_block(rmap2)
ie2 = interpret_in_map(rmap2, "Unsere Abwassereinleitung mit Reinigungschemikalien ist sicher abgedeckt.")
w("**Expected Interpretation** _(Umwelt -> bewusst nicht bewertet)_")
w("")
w("> " + ie2.explanation)
w("")
w("**Known Gaps:** Abwasser/Chemikalien/Energie sind `unsupported_domain` — Environmental Corpus fehlt (RS-002).")
w("")
env_status = "UNSUPPORTED" if rmap2.unsupported_domains else "PASS"
coverage_table([
("Product Profile", "PASS", "Maschine + Umwelt-Komponenten"),
("Scope", "PASS" if sc2.status == ScopeStatus.RESOLVED else "NEEDS_FACTS", ""),
("Regulatory Map", "PASS", "CRA/MaschinenVO/EMV"),
("Environmental (Abwasser/Chemikalien/Energie)", env_status, 'ehrlich „noch nicht im Korpus" → RS-002'),
("Interpretation (Umwelt)", "PARTIAL", "future_corpus_needed statt Scheinsicherheit"),
])
# ── Scenario 3 ────────────────────────────────────────────────────────────
w("## Szenario 3 — ISO27001-zertifiziertes Unternehmen")
w("")
w('**Frage:** „Welche Capabilities sind inferred, declared oder confirmed?"')
w("")
w("_ILLUSTRATIVES Mapping: ISO27001 -> incident_response, supplier_management, asset_management_")
w("")
ctx3 = CompanyContext(company_id="iso-kunde",
certifications=[Certification(certification_id="ISO27001", name="ISO/IEC 27001")],
declarations=[Declaration(capability_id="cap_patch_management", statement="Wir machen Patch Management")],
evidence=[ExistingEvidence(evidence_id="patch-policy.pdf", evidence_type="policy", proves_capability_id="cap_patch_management")])
prof3 = build_company_profile(ctx3, ISO_MAP)
w("**Expected Company Capability Profile** _(4-Zustands-Trust-Model)_")
w("")
for c in prof3.candidate_capabilities:
w("- %s — **%s** (Quelle: %s)" % (c.capability_id, c.verification_status.value, c.source))
for c in prof3.confirmed_capabilities:
w("- %s — **%s** (Nachweis: %s)" % (c.capability_id, c.verification_status.value, ", ".join(c.sources)))
w("")
reg = CapabilityRegistry()
m = mint_capability(reg, CapabilityCandidate(raw_term="Incident Response"), category="Security")
ca = evaluate_relation(CapabilityRelation(relation_id="r1", source="certification:ISO27001", target_capability_id=m.capability_id, relationship_type=RelationType.SUPPORTS, evidence_kind=EvidenceKind.CERTIFICATION))
cb = evaluate_relation(CapabilityRelation(relation_id="r2", source="ir-runbook.pdf", target_capability_id=m.capability_id, relationship_type=RelationType.CONFIRMS, evidence_kind=EvidenceKind.ARTIFACT))
w("**Expected Master Capability Registry** _(computed confidence, policy-versioniert)_")
w("")
w("- ISO27001 *supports* %s -> %s/%s (policy %s) — computed, nicht gespeichert" % (m.capability_id, ca.status.value, ca.confidence.value, ca.policy_version))
w("- ir-runbook.pdf *confirms* %s -> %s/%s — nur echtes Artefakt erreicht confirmed" % (m.capability_id, cb.status.value, cb.confidence.value))
w("")
w("**Known Gaps:** `cap_*` (Company 2A) und `MCAP-*` (Registry) sind noch nicht verlinkt (RS-003).")
w("")
coverage_table([
("Company Context", "PASS", "ISO27001 + Declaration + Evidence"),
("Trust-State (declared/inferred/confirmed)", "PASS", "Zertifizierung nie confirmed"),
("Master Capability Registry", "PASS", "computed confidence, policy-versioniert"),
("cap ↔ MCAP Linking", "TODO", "zwei Vokabulare unverbunden → RS-003"),
])
# ── Scenario 4 — Transition (RS-005 Planning Engine + Knowledge Pattern) ───
w("## Szenario 4 — Transition (RS-005 fährt JEDEN Knowledge Pattern)")
w("")
w("_Genericity-Beweis: derselbe Algorithmus trägt jeden Transition Knowledge Pattern, nicht nur den CRA._")
w("")
_pat_dir = os.path.join(os.path.dirname(__file__), "..", "knowledge", "transition_patterns")
_pat_files = sorted(f for f in os.listdir(_pat_dir)
if f.startswith("transition_pattern_") and f.endswith(".yaml"))
_t_rows: List[Row] = []
for _pf in _pat_files:
with open(os.path.join(_pat_dir, _pf), encoding="utf-8") as _f:
PAT = yaml.safe_load(_f)
if not isinstance(PAT["transition_goal"]["to"], dict):
continue # multi-target (Regulatory Convergence) patterns are handled in the convergence section
_src = "".join(c for c in PAT["transition_goal"]["from"]["standard"] if c.isalnum()) # e.g. ISOIEC27001
_tgt = PAT["transition_goal"]["to"].get("regulation") or PAT["transition_goal"]["to"].get("framework") or "TARGET"
_have = [a["capability"] for a in PAT["likely_covered"]]
_map = {_src: CapabilityMappingEntry(capability_ids=_have, confidence=Confidence.MEDIUM)}
_profile = build_company_profile(
CompanyContext(company_id="kunde", certifications=[Certification(certification_id=_src)]), _map)
_reqs = [TargetRequirement(capability_id=a["capability"], question_intent="verify_existence",
expected_evidence=a.get("expected_evidence", [])) for a in PAT["likely_covered"]]
_reqs += [TargetRequirement(capability_id=d["capability"], question_intent=d.get("needed_information", "verify_existence"),
expected_evidence=d.get("expected_evidence", [])) for d in PAT["delta_requirements"]]
_tc = TransitionContext(company_id="kunde", known_certifications=[_src],
target=TransitionGoal(target_id=_tgt, target_type=TargetType.REGULATION, label=_tgt))
_a = assess_transition(_tc, _reqs, _profile)
_carried = len(_a.coverage) == len(_reqs) and len(_a.question_requests) > 0
_n_high = sum(1 for _r in _a.question_requests if _r.priority.value == "high")
w("**%s%s** _(%s, status=%s)_" % (PAT["transition_goal"]["from"]["standard"], _tgt, PAT["id"], PAT["status"]))
w("> %s" % _a.summary.headline)
w("- Delta zuerst (HIGH): %s" % (", ".join(_r.capability_id for _r in _a.question_requests if _r.priority.value == "high") or "—"))
w("- vermutlich abgedeckt: %s" % (", ".join(_a.summary.probably_covered) or "—"))
w("- Pattern getragen: **%s** (%d caps → %d coverage + %d requests)"
% ("ja" if _carried else "NEIN", len(_reqs), len(_a.coverage), len(_a.question_requests)))
w("")
_t_rows.append(("Transition %s%s" % (_src, _tgt), "PASS" if _carried else "PARTIAL",
"%s · %d HIGH-Delta + %d zu bestätigen" % (PAT["status"], _n_high, len(_a.summary.probably_covered))))
_t_rows.append(("RS-005.1 Renderer (Fragetext)", "TODO", "verschoben — Engine liefert nur Requests"))
coverage_table(_t_rows)
# ── Reference Transition Scenarios (RTS) — canonical regression (Soll/Ist + knowledge) ───
w("## Reference Transition Scenarios (RTS) — kanonische Regression (Soll/Ist)")
w("")
w('_Anonymisierte Archetypen (KEINE Firmennamen). Jeder RTS pinnt ein Expected Outcome; jeder Commit muss es reproduzieren (identisch oder besser). Data Act = `uncertain`, nie fix „gilt/gilt-nicht"._')
w("")
_rts_dir = os.path.join(os.path.dirname(__file__), "..", "knowledge", "reference_transition_scenarios")
_pat_d2 = os.path.join(os.path.dirname(__file__), "..", "knowledge", "transition_patterns")
_pat_by_id = {}
for _pf in sorted(os.listdir(_pat_d2)):
if _pf.startswith("transition_pattern_") and _pf.endswith(".yaml"):
with open(os.path.join(_pat_d2, _pf), encoding="utf-8") as _h:
_pp = yaml.safe_load(_h)
_pat_by_id[_pp["id"]] = _pp
# Convergence pattern (multi-target) — run once through RS-005; reused by RTS-003 + the convergence section.
_CONV_ID = "TP-ISO27001-CRA-MaschinenVO-v1"
_MV_IDS = {"MaschinenVO", "MachineryRegulation", "Maschinenverordnung"}
CP = _pat_by_id.get(_CONV_ID)
_conv = None
_cp_missing = set() # type: ignore[var-annotated]
_all_t = [] # type: ignore[var-annotated]
_delta_t = {} # type: ignore[var-annotated]
if CP:
_all_t = sorted({t for it in CP["likely_covered"] + CP["delta_requirements"] for t in it.get("covers_targets", [])})
_delta_t = {d["capability"]: d.get("covers_targets", []) for d in CP["delta_requirements"]}
_conv = regulatory_convergence(_delta_t, _all_t)
_cp_have = [a["capability"] for a in CP["likely_covered"]]
_cp_map = {"ISO27001": CapabilityMappingEntry(capability_ids=_cp_have, confidence=Confidence.MEDIUM)}
_cp_prof = build_company_profile(
CompanyContext(company_id="conv", certifications=[Certification(certification_id="ISO27001")]), _cp_map)
_cp_reqs = [TargetRequirement(capability_id=a["capability"]) for a in CP["likely_covered"]]
_cp_reqs += [TargetRequirement(capability_id=d["capability"], question_intent=d.get("needed_information", "verify_existence"))
for d in CP["delta_requirements"]]
_cp_a = assess_transition(TransitionContext(company_id="conv", target=TransitionGoal(target_id="CRA+MaschinenVO")), _cp_reqs, _cp_prof)
_cp_missing = {c.capability_id for c in _cp_a.coverage if c.status == CoverageStatus.MISSING}
_rts_rows: List[Row] = []
for _rf in sorted(f for f in os.listdir(_rts_dir) if f.startswith("RTS-") and f.endswith(".yaml")):
with open(os.path.join(_rts_dir, _rf), encoding="utf-8") as _f:
RTS = yaml.safe_load(_f)
_cra = RTS["expected_outcome"]["cra"]
_pat = _pat_by_id[_cra["pattern"]]
_src = RTS["reference_company"]["known_certifications"][0]
_have = [a["capability"] for a in _pat["likely_covered"]]
_map = {_src: CapabilityMappingEntry(capability_ids=_have, confidence=Confidence.MEDIUM)}
_profile = build_company_profile(
CompanyContext(company_id="rts", certifications=[Certification(certification_id=_src)]), _map)
_reqs = [TargetRequirement(capability_id=a["capability"]) for a in _pat["likely_covered"]]
_reqs += [TargetRequirement(capability_id=d["capability"], question_intent=d.get("needed_information", "verify_existence"))
for d in _pat["delta_requirements"]]
_a = assess_transition(TransitionContext(company_id="rts", target=TransitionGoal(target_id="CRA")), _reqs, _profile)
_actual_missing = {c.capability_id for c in _a.coverage if c.status == CoverageStatus.MISSING}
_exp_delta = set(_cra["expected_delta_at_least"])
_exp_cov = set(_cra["expected_likely_covered_at_least"])
_delta_ok = _exp_delta <= _actual_missing
_cov_ok = _exp_cov <= set(_a.summary.probably_covered)
# Data Act: the engine must SURFACE uncertainty, never ASSERT "applies".
_tr = RTS["reference_company"]["product_traits"]
_P = P(name="rts", product_type=PT.MACHINERY if _tr.get("is_machine") else PT.HARDWARE,
markets=_tr.get("market", ["EU"]), economic_operator_role=Role.MANUFACTURER,
lifecycle_phase=LP.PLACING_ON_MARKET, is_machine=_tr.get("is_machine"),
is_component=_tr.get("is_component"), has_embedded_software=_tr.get("has_embedded_software"),
connected_to_internet=_tr.get("connected_to_internet"), has_remote_access=_tr.get("has_remote_access"),
generates_usage_data=_tr.get("generates_usage_data"))
_rmap = render_regulatory_map(_P)
_appl = {v.regulation_id for v in _rmap.applicable_regulations}
_unc = {v.regulation_id for v in _rmap.uncertain_regulations}
_da = "applicable" if "DataAct" in _appl else ("uncertain" if "DataAct" in _unc else "excluded/absent")
_da_ok = "DataAct" not in _appl # never wrongly asserted as applicable
_ok = _delta_ok and _cov_ok and _da_ok
w("**%s** — %s" % (RTS["id"], RTS["archetype"]))
w("> Start %s → CRA. %s" % ("+".join(RTS["reference_company"]["known_certifications"]), _a.summary.headline))
w("- Expected Delta erfüllt: **%s** (%d/%d Soll-Delta in der Ist-Lücke)" % ("ja" if _delta_ok else "NEIN", len(_exp_delta & _actual_missing), len(_exp_delta)))
w("- Expected likely_covered erfüllt: **%s**" % ("ja" if _cov_ok else "NEIN"))
w("- Data Act: Engine sagt **%s** (Soll: uncertain; nie asserted) → %s" % (_da, "ok" if _da_ok else "FEHLER (asserted!)"))
# MaschinenVO — multi-regulation target. uncertain (component) vs applies (machine); convergence only with an ISMS.
_mv = RTS["expected_outcome"].get("maschinenvo")
_mv_ok = True
_mv_tag = "—"
if _mv and _mv.get("expectation") == "uncertain":
_mv_ok = not (_appl & _MV_IDS) # a component: never wrongly asserted as in-scope
_mv_tag = "uncertain(ok)" if _mv_ok else "uncertain(FEHLER:asserted)"
w("- MaschinenVO: Soll **uncertain** (Komponente, deciding: is_safety_component) → Engine asserted nicht: %s" % ("ok" if _mv_ok else "FEHLER"))
elif _mv: # expectation: applies (is_machine: true)
_mv_exp = set(_mv.get("expected_delta_at_least", []))
if _mv.get("convergence_pattern") == _CONV_ID and CP:
_mv_ok = _mv_exp <= _cp_missing
_mv_tag = "applies %d/%d Safety-Delta" % (len(_mv_exp & _cp_missing), len(_mv_exp))
w("- MaschinenVO **gilt** (is_machine): %d/%d Safety-Delta in der Ist-Lücke (Convergence-Pattern) → %s" % (len(_mv_exp & _cp_missing), len(_mv_exp), "ok" if _mv_ok else "NEIN"))
else:
_mv_ok = bool(_mv_exp)
_mv_tag = "applies (geringe Konvergenz, kein ISMS)"
w("- MaschinenVO **gilt** (is_machine): Safety-Delta %s — **geringe Konvergenz ohne ISMS** (RS-004 reg-map-Gate offen)" % (", ".join(sorted(_mv_exp)) or "—"))
# Convergence — the USP: capabilities covering CRA AND MaschinenVO at once.
_cv = RTS["expected_outcome"].get("convergence")
_cv_ok = True
if _cv and _conv:
_cv_exp = set(_cv.get("expected_multi_target_at_least", []))
_cv_hit = _cv_exp & set(_conv.multi_target_capabilities)
_cv_ok = _cv_exp <= set(_conv.multi_target_capabilities)
w("- Konvergenz CRA∩MaschinenVO: %d/%d erwartete Multi-Target-Caps → %s (%s)" % (len(_cv_hit), len(_cv_exp), "ok" if _cv_ok else "NEIN", _conv.headline))
_ok = _ok and _mv_ok and _cv_ok
w("")
_rts_rows.append(("%s (%s→CRA%s)" % (RTS["id"], _src, "+MaschVO" if _mv else ""), "PASS" if _ok else "PARTIAL",
"%d/%d Delta-Soll · likely_covered %s · DataAct=%s · MaschVO=%s%s" % (
len(_exp_delta & _actual_missing), len(_exp_delta), "ok" if _cov_ok else "NEIN", _da, _mv_tag,
" · Konvergenz ok" if (_cv and _cv_ok) else "")))
coverage_table(_rts_rows)
# ── Regulatory Convergence — CRA + MaschinenVO (the multi-regulation USP) ───
w("## Regulatory Convergence — CRA + MaschinenVO (Cross-Regulation Capability Mapping)")
w("")
w("_Der USP: welche Capability deckt MEHRERE Regelwerke gleichzeitig? (Convergence Pattern, RTS-003-Archetyp.)_")
w("")
assert _conv is not None and CP is not None # precomputed before the RTS loop (convergence pattern on disk)
w("**Cross-Regulation Capability Mapping (Delta):** %s" % _conv.headline)
w("")
w("**Konvergenz — diese neuen Maßnahmen decken BEIDE Regelwerke gleichzeitig:**")
for _c in _conv.multi_target_capabilities:
w("- `%s`" % _c)
w("")
w("**Pro Regelwerk benötigt (Delta):** " + ", ".join("%s=%d" % (k, v) for k, v in _conv.per_target_count.items()))
w("")
w('**Kundensatz:** „Von den %d neuen Maßnahmen erfüllen %d gleichzeitig CRA und MaschinenVO." (heute liefert das praktisch kein Tool)'
% (len(_delta_t), len(_conv.multi_target_capabilities)))
w("")
coverage_table([
("Regulatory Convergence Pattern", "PASS", "%d Targets, %d Delta-Capabilities" % (len(_all_t), len(_delta_t))),
("Cross-Regulation Capability Mapping", "PASS", _conv.headline),
])
# ── Epics + roll-up ───────────────────────────────────────────────────────
w("## Gaps → Epics (Backlog — nur erfasst, NICHT implementiert)")
w("")
w("| Epic | Titel | schliesst Coverage-Luecke |")
w("|---|---|---|")
w("| RS-001 | Interpretation Pattern Library | Sz1 Interpretation PARTIAL -> PASS (CRA-Muster) |")
w("| RS-002 | Environmental Corpus (Pilotdomaene) | Sz2 Environmental UNSUPPORTED -> PASS |")
w("| RS-003 | Capability Linking (cap↔MCAP) + Company-Gap | Sz1/Sz3 Company Gap TODO -> PASS |")
w("| RS-004 | MaschinenVO/EMV Registry Linking | Sz1/Sz2 MaschinenVO/EMV PARTIAL -> PASS |")
w("")
total = len(ROLLUP)
npass = ROLLUP.count("PASS")
w("## Suite-Status (Roll-up)")
w("")
w("- Coverage-Zellen gesamt: **%d**" % total)
w("- PASS: **%d** · PARTIAL: %d · UNSUPPORTED: %d · TODO: %d · N/A: %d · NEEDS_FACTS: %d"
% (npass, ROLLUP.count("PARTIAL"), ROLLUP.count("UNSUPPORTED"), ROLLUP.count("TODO"),
ROLLUP.count("N/A"), ROLLUP.count("NEEDS_FACTS")))
w("- Fortschritt = PASS-Anteil steigt, wenn Epics RS-001…004 landen (objektiver Maßstab, kein LOC).")
print("\n".join(OUT))