Files
breakpilot-compliance/backend-compliance/reference_scenarios/generate.py
T
Benjamin Admin b6cfc0a503 feat(knowledge-production): Playbook Draft Generator — prepare the corpus deterministically
The bottleneck is not content, it is knowledge PRODUCTION. Instead of writing 200 playbooks by
hand, generate drafts deterministically from data the software already owns, then have an expert
review them. Mirrors the legal pipeline (Gesetz -> Parser -> Obligation -> Review) for BreakPilot's
own knowledge: new Capability -> Registry -> Transition Pattern -> Playbook Draft Generator ->
Expert Review -> versioned Playbook.

- compliance/knowledge_production/: generate_playbook_draft(capability, requirement, control_links)
  + drafts_from_pattern(pattern) -> one PlaybookDraft per delta capability. Owned fields (why /
  closes_regulations / expected_evidence / typical_controls) are assembled with per-field provenance;
  the practitioner know-how (tools / process_steps / how_others) is left as an explicit TODO.
- DraftStatus lifecycle (Freigabestatus): draft_generated -> in_review -> reviewed -> validated ->
  proven. Deterministic, NO LLM in the core (any model enrichment stays offline/advisory/propose-only).
- ADR-005: extends "the engine does not change, the corpus grows" with "and the corpus is not written
  by hand — it is deterministically prepared, then curated".
- reference suite: "Knowledge Production" section turns the convergence pattern into 12 auto-assembled
  drafts (why/closes/evidence filled, tools/steps TODO) -> review 12 drafts, don't write 12 playbooks.

10 tests (50 with playbook/optimization/transition/company), mypy --strict clean, check-loc 0.
Product code with no app caller + ADR/reference = non-runtime -> no deploy (ADR-001). Freeze-safe.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-06-27 13:31:31 +02:00

487 lines
31 KiB
Python

# ruff: noqa
# mypy: ignore-errors
"""Reference Scenario Suite v1 — generator (living regression reference).
Runs the REAL deployed engines for three real customer scenarios and emits a
markdown reference artifact. Per scenario it derives an "Architecture Coverage"
table from the actual run, so when a new domain lands the cells flip automatically
(e.g. Sz2/Environmental UNSUPPORTED -> PASS). This is the objective measure of
"is BreakPilot better than six months ago" — not LOC, not features.
Run: cd backend-compliance && PYTHONPATH=. python3 reference_scenarios/generate.py
Not product code; not imported by the app.
"""
from __future__ import annotations
from typing import List, Tuple
from compliance.profile.canonical import (
CanonicalProductRegulatoryProfile as P, CanonicalProductType as PT,
EconomicOperatorRole as Role, CanonicalLifecyclePhase as LP,
ProductComponent as Comp, ComponentKind as CK, EnvironmentalImpact as Env,
)
from compliance.product_scope.orchestrator import resolve_product_scope
from compliance.product_scope.schemas import ScopeStatus
from compliance.regulatory_map.renderer import render_regulatory_map
from compliance.interpretation_map.adapter import interpret_in_map
from compliance.rci import create_baseline, assess_change, RegulatoryChange, ChangeType
from compliance.company import (
CompanyContext, Certification, Declaration, ExistingEvidence,
CapabilityMappingEntry, build_company_profile,
)
from compliance.capability import (
CapabilityRegistry, CapabilityCandidate, CapabilityRelation, RelationType,
EvidenceKind, mint_capability, evaluate_relation,
)
from compliance.reasoning.enums import Confidence
from compliance.transition_reasoning import (
TransitionContext, TransitionGoal, TargetType, TargetRequirement, assess_transition, CoverageStatus,
regulatory_convergence,
)
from compliance.optimization import roadmap_from_delta, select_within_budget
from compliance.playbook import playbooks_for_plan
from compliance.knowledge_production import drafts_from_pattern
import os
import yaml
from _helpers import ( # noqa: E402 (script-dir module; keeps generate.py under the LOC budget)
OUT, ROLLUP, Row, w, coverage_table, reg_map_block, unsupported_block, interp_status,
)
ISO_MAP = {"ISO27001": CapabilityMappingEntry(
capability_ids=["cap_incident_response", "cap_supplier_management", "cap_asset_management"],
confidence=Confidence.MEDIUM)}
w("# Reference Scenario Suite v1")
w("")
w("> **Kein Doku-Artefakt — die erste Ground Truth / Living Reference Suite.** Erzeugt aus den "
"REALEN deployten Engines (aktueller deployter main) via `reference_scenarios/generate.py`. Jede "
"`Architecture Coverage`-Zelle ist aus dem echten Lauf ABGELEITET; sobald eine Domaene landet, "
"kippt die Zelle automatisch (z. B. Sz2/Environmental UNSUPPORTED -> PASS). Beantwortet dauerhaft: "
'„Ist BreakPilot besser als vor sechs Monaten?" — anhand echter Kundensituationen, nicht LOC.')
w("")
w("Synthetische `Cert->Capability`-Mappings sind als ILLUSTRATIV markiert; die echte Tabelle gehoert "
"Compliance Execution (siehe Master Capability Registry).")
w("")
# ── Scenario 1 ────────────────────────────────────────────────────────────
w("## Szenario 1 — Maschinenbauer mit ISMS + SBOM + Remote Access")
w("")
w('**Frage:** „Was gilt fuer uns, und reicht das?"')
w("")
p1 = P(name="Industrielle Verpackungsmaschine", product_type=PT.MACHINERY, markets=["EU", "DE"],
economic_operator_role=Role.MANUFACTURER, lifecycle_phase=LP.PLACING_ON_MARKET,
is_machine=True, is_component=False, has_software_updates=True, has_embedded_software=True,
has_remote_access=True, connected_to_internet=True, has_security_function=True,
has_sbom=True, has_incident_response=True, technologies=["cloud", "ota_updates"],
existing_certifications=["ISO27001"])
sc1 = resolve_product_scope(p1)
rmap1 = render_regulatory_map(p1)
w("**Input:** Maschine, vernetzt (Remote/Cloud), Firmware, Rolle Hersteller, Maerkte EU/DE; "
"Company: ISMS (ISO27001) + SBOM + Incident Response.")
w("")
reg_map_block(rmap1)
w("**Input — Company Context** _(ILLUSTRATIVES Mapping: ISO27001 -> incident_response, supplier_management, asset_management)_")
w("")
ctx1 = CompanyContext(company_id="maschinenbau",
certifications=[Certification(certification_id="ISO27001", name="ISO/IEC 27001")],
declarations=[Declaration(capability_id="cap_patch_management")],
evidence=[ExistingEvidence(evidence_id="sbom.json", evidence_type="sbom", proves_capability_id="cap_sbom_management")])
prof1 = build_company_profile(ctx1, ISO_MAP)
for c in prof1.candidate_capabilities:
w("- candidate: %s%s (%s)" % (c.capability_id, c.verification_status.value, c.source))
for c in prof1.confirmed_capabilities:
w("- CONFIRMED: %s%s (Nachweis: %s)" % (c.capability_id, c.verification_status.value, ", ".join(c.sources)))
w("")
ie1 = interpret_in_map(rmap1, "Wir liefern Sicherheitsupdates fuenf Jahre lang, damit ist der CRA erfuellt.")
w("**Expected Interpretation**")
w("")
w('> Auslegung „5 Jahre Updates -> CRA erfuellt" -> **%s**' % ie1.assessment.value)
w("> " + ie1.explanation)
w("")
base1 = create_baseline(p1, {"sbom_creation": ["sbom"], "provide_security_updates": ["policy"]}, baseline_id="s1")
a1 = assess_change(base1, RegulatoryChange(change_id="cra-2026-amd", affected_regulations=["CRA"],
affected_obligations=["cra_new_disclosure_duty", "sbom_creation", "vuln_handling_process"],
change_type=ChangeType.AMENDMENT))
w("**Expected RCI** _(CRA-Novelle gegen gespeicherte Baseline)_")
w("")
w("> affects_product = %s%s" % (a1.affects_product, a1.summary.what_changed))
for d in a1.deltas:
w("- %s -> **%s** (fehlende Nachweise: %s)" % (d.obligation_id, d.delta_type.value, ", ".join(d.missing_evidence) or "-"))
w("")
unsupported_block(rmap1)
w("**Known Gaps:** Interpretation kennt kein CRA-Muster (RS-001) · MaschinenVO/EMV-Pflichten nicht "
"registry-verlinkt (RS-004) · cap/MCAP/Pflicht-Evidence nicht gejoint = Company-Gap (RS-003).")
w("")
coverage_table([
("Company Context", "PASS", "ISO27001 + SBOM + Declaration"),
("Product Profile", "PASS", "Maschine, vernetzt, Firmware"),
("Navigator", "PASS", "ready_for_scope"),
("Scope", "PASS" if sc1.status == ScopeStatus.RESOLVED else "NEEDS_FACTS", "CRA/MaschinenVO/EMV + 3 unsicher"),
("Regulatory Map", "PASS", "Overlaps + 1-Nachweis-N-Pflichten"),
("CRA Obligations", "PASS", "12 registry-verlinkt"),
("MaschinenVO/EMV Obligations", "PARTIAL", "Scope ja, Pflichten nicht verlinkt → RS-004"),
("Interpretation", interp_status(ie1.assessment.value), "kein CRA-Muster → RS-001"),
("RCI", "PASS", "1 neu, 2 geaendert"),
("Company Gap", "TODO", "cap↔MCAP↔Pflicht nicht gejoint → RS-003"),
("Environmental", "N/A", "keine Umwelt-Trigger"),
])
# ── Scenario 2 ────────────────────────────────────────────────────────────
w("## Szenario 2 — Industriespuelmaschine mit Abwasser/Chemikalien")
w("")
w('**Frage:** „Welche Umweltbereiche sind noch nicht abgedeckt?"')
w("")
p2 = P(name="Industriespuelmaschine", product_type=PT.MACHINERY, markets=["EU", "DE"],
economic_operator_role=Role.MANUFACTURER, lifecycle_phase=LP.PLACING_ON_MARKET,
is_machine=True, is_component=False, has_software_updates=True, has_embedded_software=True,
components=[Comp(name="Dosierpumpe", kind=CK.CHEMICAL_DOSING), Comp(name="Abwasserauslass", kind=CK.WASTEWATER_OUTLET)],
environmental=Env(discharges_to_wastewater=True, uses_cleaning_chemicals=True, consumes_energy_or_water=True))
sc2 = resolve_product_scope(p2)
rmap2 = render_regulatory_map(p2)
w("**Input:** Maschine mit Chemikalien-Dosierung + Abwasserauslass; Umwelt-Trigger gesetzt.")
w("")
reg_map_block(rmap2)
unsupported_block(rmap2)
ie2 = interpret_in_map(rmap2, "Unsere Abwassereinleitung mit Reinigungschemikalien ist sicher abgedeckt.")
w("**Expected Interpretation** _(Umwelt -> bewusst nicht bewertet)_")
w("")
w("> " + ie2.explanation)
w("")
w("**Known Gaps:** Abwasser/Chemikalien/Energie sind `unsupported_domain` — Environmental Corpus fehlt (RS-002).")
w("")
env_status = "UNSUPPORTED" if rmap2.unsupported_domains else "PASS"
coverage_table([
("Product Profile", "PASS", "Maschine + Umwelt-Komponenten"),
("Scope", "PASS" if sc2.status == ScopeStatus.RESOLVED else "NEEDS_FACTS", ""),
("Regulatory Map", "PASS", "CRA/MaschinenVO/EMV"),
("Environmental (Abwasser/Chemikalien/Energie)", env_status, 'ehrlich „noch nicht im Korpus" → RS-002'),
("Interpretation (Umwelt)", "PARTIAL", "future_corpus_needed statt Scheinsicherheit"),
])
# ── Scenario 3 ────────────────────────────────────────────────────────────
w("## Szenario 3 — ISO27001-zertifiziertes Unternehmen")
w("")
w('**Frage:** „Welche Capabilities sind inferred, declared oder confirmed?"')
w("")
w("_ILLUSTRATIVES Mapping: ISO27001 -> incident_response, supplier_management, asset_management_")
w("")
ctx3 = CompanyContext(company_id="iso-kunde",
certifications=[Certification(certification_id="ISO27001", name="ISO/IEC 27001")],
declarations=[Declaration(capability_id="cap_patch_management", statement="Wir machen Patch Management")],
evidence=[ExistingEvidence(evidence_id="patch-policy.pdf", evidence_type="policy", proves_capability_id="cap_patch_management")])
prof3 = build_company_profile(ctx3, ISO_MAP)
w("**Expected Company Capability Profile** _(4-Zustands-Trust-Model)_")
w("")
for c in prof3.candidate_capabilities:
w("- %s — **%s** (Quelle: %s)" % (c.capability_id, c.verification_status.value, c.source))
for c in prof3.confirmed_capabilities:
w("- %s — **%s** (Nachweis: %s)" % (c.capability_id, c.verification_status.value, ", ".join(c.sources)))
w("")
reg = CapabilityRegistry()
m = mint_capability(reg, CapabilityCandidate(raw_term="Incident Response"), category="Security")
ca = evaluate_relation(CapabilityRelation(relation_id="r1", source="certification:ISO27001", target_capability_id=m.capability_id, relationship_type=RelationType.SUPPORTS, evidence_kind=EvidenceKind.CERTIFICATION))
cb = evaluate_relation(CapabilityRelation(relation_id="r2", source="ir-runbook.pdf", target_capability_id=m.capability_id, relationship_type=RelationType.CONFIRMS, evidence_kind=EvidenceKind.ARTIFACT))
w("**Expected Master Capability Registry** _(computed confidence, policy-versioniert)_")
w("")
w("- ISO27001 *supports* %s -> %s/%s (policy %s) — computed, nicht gespeichert" % (m.capability_id, ca.status.value, ca.confidence.value, ca.policy_version))
w("- ir-runbook.pdf *confirms* %s -> %s/%s — nur echtes Artefakt erreicht confirmed" % (m.capability_id, cb.status.value, cb.confidence.value))
w("")
w("**Known Gaps:** `cap_*` (Company 2A) und `MCAP-*` (Registry) sind noch nicht verlinkt (RS-003).")
w("")
coverage_table([
("Company Context", "PASS", "ISO27001 + Declaration + Evidence"),
("Trust-State (declared/inferred/confirmed)", "PASS", "Zertifizierung nie confirmed"),
("Master Capability Registry", "PASS", "computed confidence, policy-versioniert"),
("cap ↔ MCAP Linking", "TODO", "zwei Vokabulare unverbunden → RS-003"),
])
# ── Scenario 4 — Transition (RS-005 Planning Engine + Knowledge Pattern) ───
w("## Szenario 4 — Transition (RS-005 fährt JEDEN Knowledge Pattern)")
w("")
w("_Genericity-Beweis: derselbe Algorithmus trägt jeden Transition Knowledge Pattern, nicht nur den CRA._")
w("")
_pat_dir = os.path.join(os.path.dirname(__file__), "..", "knowledge", "transition_patterns")
_pat_files = sorted(f for f in os.listdir(_pat_dir)
if f.startswith("transition_pattern_") and f.endswith(".yaml"))
_t_rows: List[Row] = []
for _pf in _pat_files:
with open(os.path.join(_pat_dir, _pf), encoding="utf-8") as _f:
PAT = yaml.safe_load(_f)
if not isinstance(PAT["transition_goal"]["to"], dict):
continue # multi-target (Regulatory Convergence) patterns are handled in the convergence section
_src = "".join(c for c in PAT["transition_goal"]["from"]["standard"] if c.isalnum()) # e.g. ISOIEC27001
_tgt = PAT["transition_goal"]["to"].get("regulation") or PAT["transition_goal"]["to"].get("framework") or "TARGET"
_have = [a["capability"] for a in PAT["likely_covered"]]
_map = {_src: CapabilityMappingEntry(capability_ids=_have, confidence=Confidence.MEDIUM)}
_profile = build_company_profile(
CompanyContext(company_id="kunde", certifications=[Certification(certification_id=_src)]), _map)
_reqs = [TargetRequirement(capability_id=a["capability"], question_intent="verify_existence",
expected_evidence=a.get("expected_evidence", [])) for a in PAT["likely_covered"]]
_reqs += [TargetRequirement(capability_id=d["capability"], question_intent=d.get("needed_information", "verify_existence"),
expected_evidence=d.get("expected_evidence", [])) for d in PAT["delta_requirements"]]
_tc = TransitionContext(company_id="kunde", known_certifications=[_src],
target=TransitionGoal(target_id=_tgt, target_type=TargetType.REGULATION, label=_tgt))
_a = assess_transition(_tc, _reqs, _profile)
_carried = len(_a.coverage) == len(_reqs) and len(_a.question_requests) > 0
_n_high = sum(1 for _r in _a.question_requests if _r.priority.value == "high")
w("**%s%s** _(%s, status=%s)_" % (PAT["transition_goal"]["from"]["standard"], _tgt, PAT["id"], PAT["status"]))
w("> %s" % _a.summary.headline)
w("- Delta zuerst (HIGH): %s" % (", ".join(_r.capability_id for _r in _a.question_requests if _r.priority.value == "high") or "—"))
w("- vermutlich abgedeckt: %s" % (", ".join(_a.summary.probably_covered) or "—"))
w("- Pattern getragen: **%s** (%d caps → %d coverage + %d requests)"
% ("ja" if _carried else "NEIN", len(_reqs), len(_a.coverage), len(_a.question_requests)))
w("")
_t_rows.append(("Transition %s%s" % (_src, _tgt), "PASS" if _carried else "PARTIAL",
"%s · %d HIGH-Delta + %d zu bestätigen" % (PAT["status"], _n_high, len(_a.summary.probably_covered))))
_t_rows.append(("RS-005.1 Renderer (Fragetext)", "TODO", "verschoben — Engine liefert nur Requests"))
coverage_table(_t_rows)
# ── Reference Transition Scenarios (RTS) — canonical regression (Soll/Ist + knowledge) ───
w("## Reference Transition Scenarios (RTS) — kanonische Regression (Soll/Ist)")
w("")
w('_Anonymisierte Archetypen (KEINE Firmennamen). Jeder RTS pinnt ein Expected Outcome; jeder Commit muss es reproduzieren (identisch oder besser). Data Act = `uncertain`, nie fix „gilt/gilt-nicht"._')
w("")
_rts_dir = os.path.join(os.path.dirname(__file__), "..", "knowledge", "reference_transition_scenarios")
_pat_d2 = os.path.join(os.path.dirname(__file__), "..", "knowledge", "transition_patterns")
_pat_by_id = {}
for _pf in sorted(os.listdir(_pat_d2)):
if _pf.startswith("transition_pattern_") and _pf.endswith(".yaml"):
with open(os.path.join(_pat_d2, _pf), encoding="utf-8") as _h:
_pp = yaml.safe_load(_h)
_pat_by_id[_pp["id"]] = _pp
# Convergence pattern (multi-target) — run once through RS-005; reused by RTS-003 + the convergence section.
_CONV_ID = "TP-ISO27001-CRA-MaschinenVO-v1"
_MV_IDS = {"MaschinenVO", "MachineryRegulation", "Maschinenverordnung"}
CP = _pat_by_id.get(_CONV_ID)
_conv = None
_cp_missing = set() # type: ignore[var-annotated]
_all_t = [] # type: ignore[var-annotated]
_delta_t = {} # type: ignore[var-annotated]
if CP:
_all_t = sorted({t for it in CP["likely_covered"] + CP["delta_requirements"] for t in it.get("covers_targets", [])})
_delta_t = {d["capability"]: d.get("covers_targets", []) for d in CP["delta_requirements"]}
_conv = regulatory_convergence(_delta_t, _all_t)
_cp_have = [a["capability"] for a in CP["likely_covered"]]
_cp_map = {"ISO27001": CapabilityMappingEntry(capability_ids=_cp_have, confidence=Confidence.MEDIUM)}
_cp_prof = build_company_profile(
CompanyContext(company_id="conv", certifications=[Certification(certification_id="ISO27001")]), _cp_map)
_cp_reqs = [TargetRequirement(capability_id=a["capability"]) for a in CP["likely_covered"]]
_cp_reqs += [TargetRequirement(capability_id=d["capability"], question_intent=d.get("needed_information", "verify_existence"))
for d in CP["delta_requirements"]]
_cp_a = assess_transition(TransitionContext(company_id="conv", target=TransitionGoal(target_id="CRA+MaschinenVO")), _cp_reqs, _cp_prof)
_cp_missing = {c.capability_id for c in _cp_a.coverage if c.status == CoverageStatus.MISSING}
_rts_rows: List[Row] = []
for _rf in sorted(f for f in os.listdir(_rts_dir) if f.startswith("RTS-") and f.endswith(".yaml")):
with open(os.path.join(_rts_dir, _rf), encoding="utf-8") as _f:
RTS = yaml.safe_load(_f)
_cra = RTS["expected_outcome"]["cra"]
_pat = _pat_by_id[_cra["pattern"]]
_src = RTS["reference_company"]["known_certifications"][0]
_have = [a["capability"] for a in _pat["likely_covered"]]
_map = {_src: CapabilityMappingEntry(capability_ids=_have, confidence=Confidence.MEDIUM)}
_profile = build_company_profile(
CompanyContext(company_id="rts", certifications=[Certification(certification_id=_src)]), _map)
_reqs = [TargetRequirement(capability_id=a["capability"]) for a in _pat["likely_covered"]]
_reqs += [TargetRequirement(capability_id=d["capability"], question_intent=d.get("needed_information", "verify_existence"))
for d in _pat["delta_requirements"]]
_a = assess_transition(TransitionContext(company_id="rts", target=TransitionGoal(target_id="CRA")), _reqs, _profile)
_actual_missing = {c.capability_id for c in _a.coverage if c.status == CoverageStatus.MISSING}
_exp_delta = set(_cra["expected_delta_at_least"])
_exp_cov = set(_cra["expected_likely_covered_at_least"])
_delta_ok = _exp_delta <= _actual_missing
_cov_ok = _exp_cov <= set(_a.summary.probably_covered)
# Data Act: the engine must SURFACE uncertainty, never ASSERT "applies".
_tr = RTS["reference_company"]["product_traits"]
_P = P(name="rts", product_type=PT.MACHINERY if _tr.get("is_machine") else PT.HARDWARE,
markets=_tr.get("market", ["EU"]), economic_operator_role=Role.MANUFACTURER,
lifecycle_phase=LP.PLACING_ON_MARKET, is_machine=_tr.get("is_machine"),
is_component=_tr.get("is_component"), has_embedded_software=_tr.get("has_embedded_software"),
connected_to_internet=_tr.get("connected_to_internet"), has_remote_access=_tr.get("has_remote_access"),
generates_usage_data=_tr.get("generates_usage_data"))
_rmap = render_regulatory_map(_P)
_appl = {v.regulation_id for v in _rmap.applicable_regulations}
_unc = {v.regulation_id for v in _rmap.uncertain_regulations}
_da = "applicable" if "DataAct" in _appl else ("uncertain" if "DataAct" in _unc else "excluded/absent")
_da_ok = "DataAct" not in _appl # never wrongly asserted as applicable
_ok = _delta_ok and _cov_ok and _da_ok
w("**%s** — %s" % (RTS["id"], RTS["archetype"]))
w("> Start %s → CRA. %s" % ("+".join(RTS["reference_company"]["known_certifications"]), _a.summary.headline))
w("- Expected Delta erfüllt: **%s** (%d/%d Soll-Delta in der Ist-Lücke)" % ("ja" if _delta_ok else "NEIN", len(_exp_delta & _actual_missing), len(_exp_delta)))
w("- Expected likely_covered erfüllt: **%s**" % ("ja" if _cov_ok else "NEIN"))
w("- Data Act: Engine sagt **%s** (Soll: uncertain; nie asserted) → %s" % (_da, "ok" if _da_ok else "FEHLER (asserted!)"))
# MaschinenVO — multi-regulation target. uncertain (component) vs applies (machine); convergence only with an ISMS.
_mv = RTS["expected_outcome"].get("maschinenvo")
_mv_ok = True
_mv_tag = "—"
if _mv and _mv.get("expectation") == "uncertain":
_mv_ok = not (_appl & _MV_IDS) # a component: never wrongly asserted as in-scope
_mv_tag = "uncertain(ok)" if _mv_ok else "uncertain(FEHLER:asserted)"
w("- MaschinenVO: Soll **uncertain** (Komponente, deciding: is_safety_component) → Engine asserted nicht: %s" % ("ok" if _mv_ok else "FEHLER"))
elif _mv: # expectation: applies (is_machine: true)
_mv_exp = set(_mv.get("expected_delta_at_least", []))
if _mv.get("convergence_pattern") == _CONV_ID and CP:
_mv_ok = _mv_exp <= _cp_missing
_mv_tag = "applies %d/%d Safety-Delta" % (len(_mv_exp & _cp_missing), len(_mv_exp))
w("- MaschinenVO **gilt** (is_machine): %d/%d Safety-Delta in der Ist-Lücke (Convergence-Pattern) → %s" % (len(_mv_exp & _cp_missing), len(_mv_exp), "ok" if _mv_ok else "NEIN"))
else:
_mv_ok = bool(_mv_exp)
_mv_tag = "applies (geringe Konvergenz, kein ISMS)"
w("- MaschinenVO **gilt** (is_machine): Safety-Delta %s — **geringe Konvergenz ohne ISMS** (RS-004 reg-map-Gate offen)" % (", ".join(sorted(_mv_exp)) or "—"))
# Convergence — the USP: capabilities covering CRA AND MaschinenVO at once.
_cv = RTS["expected_outcome"].get("convergence")
_cv_ok = True
if _cv and _conv:
_cv_exp = set(_cv.get("expected_multi_target_at_least", []))
_cv_hit = _cv_exp & set(_conv.multi_target_capabilities)
_cv_ok = _cv_exp <= set(_conv.multi_target_capabilities)
w("- Konvergenz CRA∩MaschinenVO: %d/%d erwartete Multi-Target-Caps → %s (%s)" % (len(_cv_hit), len(_cv_exp), "ok" if _cv_ok else "NEIN", _conv.headline))
_ok = _ok and _mv_ok and _cv_ok
w("")
_rts_rows.append(("%s (%s→CRA%s)" % (RTS["id"], _src, "+MaschVO" if _mv else ""), "PASS" if _ok else "PARTIAL",
"%d/%d Delta-Soll · likely_covered %s · DataAct=%s · MaschVO=%s%s" % (
len(_exp_delta & _actual_missing), len(_exp_delta), "ok" if _cov_ok else "NEIN", _da, _mv_tag,
" · Konvergenz ok" if (_cv and _cv_ok) else "")))
coverage_table(_rts_rows)
# ── Regulatory Convergence — CRA + MaschinenVO (the multi-regulation USP) ───
w("## Regulatory Convergence — CRA + MaschinenVO (Cross-Regulation Capability Mapping)")
w("")
w("_Der USP: welche Capability deckt MEHRERE Regelwerke gleichzeitig? (Convergence Pattern, RTS-003-Archetyp.)_")
w("")
assert _conv is not None and CP is not None # precomputed before the RTS loop (convergence pattern on disk)
w("**Cross-Regulation Capability Mapping (Delta):** %s" % _conv.headline)
w("")
w("**Konvergenz — diese neuen Maßnahmen decken BEIDE Regelwerke gleichzeitig:**")
for _c in _conv.multi_target_capabilities:
w("- `%s`" % _c)
w("")
w("**Pro Regelwerk benötigt (Delta):** " + ", ".join("%s=%d" % (k, v) for k, v in _conv.per_target_count.items()))
w("")
w('**Kundensatz:** „Von den %d neuen Maßnahmen erfüllen %d gleichzeitig CRA und MaschinenVO." (heute liefert das praktisch kein Tool)'
% (len(_delta_t), len(_conv.multi_target_capabilities)))
w("")
coverage_table([
("Regulatory Convergence Pattern", "PASS", "%d Targets, %d Delta-Capabilities" % (len(_all_t), len(_delta_t))),
("Cross-Regulation Capability Mapping", "PASS", _conv.headline),
])
# ── Regulatory Optimization — Roadmap-Renderer über DEMSELBEN Capability Delta ───
w("## Regulatory Optimization — größter regulatorischer Hebel zuerst")
w("")
w("_Dieselbe Berechnung wie die GAP-Analyse, anderer Renderer: das **Capability Delta** (RS-005) wird nach **regulatorischem Hebel** priorisiert (eine Maßnahme schließt N Regelwerke gleichzeitig). Welt-1: % über die IDENTIFIZIERTEN Anforderungen, kein Compliance-Urteil._")
w("")
_opt = roadmap_from_delta(_cp_a, _delta_t) # SAME delta the Interview Renderer turns into questions
_open_reqs = {_m.capability_id: _m.covers for _m in _opt.ranked_measures}
w("**Kompression:** %s" % _opt.headline)
w("")
w("**Top-Maßnahmen nach regulatorischem Hebel (Roadmap):**")
w("")
w("| # | Maßnahme | Hebel | deckt | kumuliert |")
w("|---|---|---|---|---|")
for _i, _m in enumerate(_opt.ranked_measures[:6], 1):
w("| %d | `%s` | **%d** | %s | %d/%d (%.0f%%) |" % (
_i, _m.capability_id, _m.leverage, "+".join(_m.covers),
_m.cumulative_requirements, _opt.total_requirements, _m.cumulative_coverage * 100))
w("")
_bud = select_within_budget(_open_reqs, 5)
w('**Managementsatz:** „Wenn Sie zuerst diese %d Maßnahmen umsetzen, schließen Sie %d von %d identifizierten Anforderungen (%.0f%%) — höchster regulatorischer Hebel." (Hebel skaliert mit jedem weiteren Regelwerk/Convergence-Pattern.)'
% (len(_bud.selected_capabilities), _bud.requirements_closed, _bud.total_requirements, _bud.coverage_ratio * 100))
w("")
w("_Eine Wahrheit, zwei Renderer: dasselbe Capability Delta liefert dem Auditor **Fragen** (Interview) und dem GF **Maßnahmen** (Roadmap)._")
w("")
coverage_table([
("Capability Delta Engine (RS-005)", "PASS", "ein Delta, mehrere Renderer"),
("Roadmap/Management Renderer (Hebel)", "PASS", _opt.headline),
("Budget-Priorisierung", "PASS", "Top-5 → %.0f%% der identifizierten Anforderungen" % (_bud.coverage_ratio * 100)),
])
# ── Implementation Playbook — Berater-Renderer (wie komme ich dort hin?) ───
w("## Implementation Playbook — wie komme ich dort hin? (Berater-Renderer)")
w("")
w('_Nach „was fehlt?" (Delta) und „womit anfangen?" (Hebel) die nächste Ebene: **wie umsetzen?** Pro Maßnahme eine komplette Reise aus kuratiertem Wissen + Hebel + (injizierten) Execution-Links. Inhalt ist der Engpass, nicht die Software._')
w("")
_pb_dir = os.path.join(os.path.dirname(__file__), "..", "knowledge", "implementation_playbooks")
_pb_kb = {}
for _pf in sorted(os.listdir(_pb_dir)):
if _pf.endswith(".yaml"):
with open(os.path.join(_pb_dir, _pf), encoding="utf-8") as _h:
_pd = yaml.safe_load(_h)
_pb_kb[_pd["capability_id"]] = _pd
_pbs = playbooks_for_plan(_opt, _pb_kb) # chain Roadmap -> Playbook over the SAME delta
_have = [p for p in _pbs if p.status != "missing"]
_miss = [p for p in _pbs if p.status == "missing"]
w("**Reise pro Maßnahme (aus der Roadmap):** %d von %d Maßnahmen haben ein Playbook; %d brauchen noch Inhalt (Knowledge Acquisition)." % (len(_have), len(_pbs), len(_miss)))
w("")
_show = next((p for p in _pbs if p.capability_id == "sbom_creation"), None)
if _show:
w("**Beispielreise — `%s`** _(%s, schließt %s)_" % (_show.capability_id, _show.status, "+".join(_show.closes_regulations) or "—"))
w("> **Warum?** %s" % _show.why.strip())
w("- **Tools:** %s" % ", ".join(_show.tools))
w("- **Prozess:** %s" % " → ".join(s.title for s in _show.process_steps))
w("- **Nachweise:** %s" % ", ".join(_show.expected_evidence))
w("- **Wie andere es tun:** %s" % _show.how_others_do_it.strip())
w("")
w("**Roadmap → Implementation (Top-Maßnahmen nach Hebel):**")
w("")
w("| Maßnahme | Hebel | schließt | Playbook |")
w("|---|---|---|---|")
for _p in _pbs[:6]:
w("| `%s` | %d | %s | %s |" % (_p.capability_id, _p.leverage, "+".join(_p.closes_regulations) or "—",
("✓ " + _p.status) if _p.status != "missing" else "**fehlt (Inhalt)**"))
w("")
w("_Derselbe Capability-Strang, neuer Renderer: aus Diagnose wird Beratung. Die `fehlt`-Einträge sind der ehrliche Content-Backlog (höchster Hebel zuerst befüllen)._")
w("")
coverage_table([
("Implementation Playbook Renderer", "PASS", "Reise pro Capability (why/tools/process/evidence/controls)"),
("Roadmap → Playbook (Verkettung)", "PASS", "%d/%d Maßnahmen mit Playbook" % (len(_have), len(_pbs))),
("Playbook-Inhalt (Knowledge)", "TODO" if _miss else "PASS", "%d Capabilities brauchen noch Inhalt" % len(_miss)),
])
# ── Knowledge Production — Playbook Draft Generator (vorbereiten, dann kuratieren) ───
w("## Knowledge Production — Playbook-Entwürfe automatisch assemblieren")
w("")
w("_Der Engpass ist nicht Content, sondern Wissensproduktion. Der Corpus wird nicht von Hand geschrieben, sondern deterministisch aus vorhandenen Daten (Transition Pattern + Leverage + injizierte Controls) vorbereitet — dann fachlich kuratiert (wie Gesetz→Parser→Obligation→Review)._")
w("")
_kp = drafts_from_pattern(CP) if CP else [] # CP = convergence pattern (already loaded)
w("**Aus 1 Pattern → %d Playbook-Entwürfe** (`status: draft_generated`): eigene Felder (Warum/schließt/Nachweise) aus den Daten gefüllt, der Experte ergänzt nur Tools/Prozess/How-others." % len(_kp))
w("")
_kd = next((d for d in _kp if d.capability_id == "sbom_creation"), _kp[0] if _kp else None)
if _kd:
w("**Beispiel-Entwurf — `%s`** _(%s)_" % (_kd.capability_id, _kd.status.value))
w("- **Warum** (aus Pattern): %s" % _kd.why.strip())
w("- **schließt** %s · **Nachweise** %s" % ("+".join(_kd.closes_regulations) or "—", ", ".join(_kd.expected_evidence) or "—"))
w("- **Provenance:** %s" % ", ".join("%s%s" % (k, v) for k, v in _kd.provenance.items()))
w("- **TODO (Experte/Offline-Propose):** %s" % ", ".join(_kd.todo))
w("")
w("_So reviewt der Experte %d Entwürfe statt %d Playbooks zu schreiben. Derselbe Generator bereitet später ISO14001-/IATF-Entwürfe vor, sobald der Corpus da ist._" % (len(_kp), len(_kp)))
w("")
coverage_table([
("Playbook Draft Generator (deterministisch)", "PASS", "%d Entwürfe aus 1 Pattern, kein LLM im Kern" % len(_kp)),
("Provenance + TODO + Freigabestatus", "PASS", "draft_generated→reviewed→validated→proven"),
("Draft-Generatoren neue Domänen (Phase A)", "TODO", "Transition-/Reference-Scenario-Drafts"),
])
# ── Epics + roll-up ───────────────────────────────────────────────────────
w("## Gaps → Epics (Backlog — nur erfasst, NICHT implementiert)")
w("")
w("| Epic | Titel | schliesst Coverage-Luecke |")
w("|---|---|---|")
w("| RS-001 | Interpretation Pattern Library | Sz1 Interpretation PARTIAL -> PASS (CRA-Muster) |")
w("| RS-002 | Environmental Corpus (Pilotdomaene) | Sz2 Environmental UNSUPPORTED -> PASS |")
w("| RS-003 | Capability Linking (cap↔MCAP) + Company-Gap | Sz1/Sz3 Company Gap TODO -> PASS |")
w("| RS-004 | MaschinenVO/EMV Registry Linking | Sz1/Sz2 MaschinenVO/EMV PARTIAL -> PASS |")
w("")
total = len(ROLLUP)
npass = ROLLUP.count("PASS")
w("## Suite-Status (Roll-up)")
w("")
w("- Coverage-Zellen gesamt: **%d**" % total)
w("- PASS: **%d** · PARTIAL: %d · UNSUPPORTED: %d · TODO: %d · N/A: %d · NEEDS_FACTS: %d"
% (npass, ROLLUP.count("PARTIAL"), ROLLUP.count("UNSUPPORTED"), ROLLUP.count("TODO"),
ROLLUP.count("N/A"), ROLLUP.count("NEEDS_FACTS")))
w("- Fortschritt = PASS-Anteil steigt, wenn Epics RS-001…004 landen (objektiver Maßstab, kein LOC).")
print("\n".join(OUT))