18f5d0cb05
Customers don't buy "EMV domain"; they buy "we have ISO 9001, help us with the CRA". The sellable unit of knowledge is the TRANSITION (from -> to), not the law and not the capability. This reframes the backlog from "model EMV next" to "the top demanded transitions". No new runtime framework (ADR-010). - knowledge/programs/transitions.yaml: the Operational Knowledge backlog — the ~20-30 actually demanded transitions (of ~N*(N-1) possible) with priority. ISO27001->CRA, ISO9001->CRA, ISO9001->MaschinenVO (all 5-star), IEC62443->CRA, TISAX->CRA, ISO27001/IEC62443->NIS2, ISO14001->Umweltrecht. - Transition Coverage KPI (reference suite, computed-not-stored): per transition a status DERIVED from the transition-pattern corpus (reviewed/validated/proven -> Gold, draft -> 🟡, none -> ⚪). Honest current state: ISO27001->CRA ✅ reviewed, ISO9001->CRA 🟡 draft, rest ⚪. Highest-priority gap = ISO9001->MaschinenVO (the next Track-B work) — a far stronger product indicator than "EMV 30% modelled". - Three knowledge layers documented: Regulatory -> Operational (transitions/playbooks/deltas, the biggest differentiator) -> Verification (Vision V2). A domain is a TRANSITION PROGRAM with two tracks: Track A breadth (model sources, @Legal-KG/@Execution) + Track B product (transitions/playbooks/RTS per source, @Reasoning). - ADR-010: the transition is the unit of knowledge; Transition Coverage KPI; three layers; two tracks. 10 program/transition-contract tests, check-loc 0. Knowledge data + ADR + reference harness = non-runtime -> no deploy (ADR-001). No new module, no runtime change. Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
266 lines
14 KiB
Python
266 lines
14 KiB
Python
# ruff: noqa
|
|
# mypy: ignore-errors
|
|
"""Rendering helpers for the Reference Scenario Suite generator.
|
|
|
|
Holds the shared mutable output buffers (OUT, ROLLUP) and the small markdown helpers so the
|
|
generator script (`generate.py`) stays under the LOC budget. Not product code; not imported by
|
|
the app — only by the generator (run via `PYTHONPATH=. python3 reference_scenarios/generate.py`).
|
|
"""
|
|
from __future__ import annotations
|
|
|
|
from typing import List, Tuple
|
|
|
|
# One coverage-table row, unpacked by coverage_table() as (layer, status, note).
Row = Tuple[str, str, str]
# Shared output buffer; w() appends one string per call.
OUT: List[str] = []
# Shared status roll-up; coverage_table() appends the status of every row it renders.
ROLLUP: List[str] = []
|
|
|
|
|
|
def w(s: str = "") -> None:
    """Append ``s`` to the shared ``OUT`` buffer (an empty string when called without argument)."""
    OUT.extend((s,))
|
|
|
|
|
|
def coverage_table(rows: List[Row]) -> None:
    """Write an "Architecture Coverage" markdown table and record each row's status.

    Each ``(layer, status, note)`` tuple in ``rows`` becomes one table line written via
    ``w``; its status is additionally appended to the shared ``ROLLUP`` list. An empty
    line is written after the table.
    """
    preamble = (
        "**Architecture Coverage**",
        "",
        "| Layer | Status | Hinweis |",
        "|---|---|---|",
    )
    for line in preamble:
        w(line)

    row_template = "| %s | **%s** | %s |"
    for entry in rows:
        layer, status, note = entry
        w(row_template % (layer, status, note))
        ROLLUP.append(status)

    w("")
|
|
|
|
|
|
def reg_map_block(rmap) -> None:
    """Write the "Expected Regulatory Map" block for ``rmap``.

    Emits, in order: the executive summary as a block quote, one bullet per applicable
    regulation (obligation ids, or ``obligations_note`` when there are none), one bullet
    per uncertain regulation (missing facts, or "-"), one bullet per overlap group, and
    one bullet per shared-evidence entry with the number of ids it maps to.
    """
    w("**Expected Regulatory Map**")
    w("")
    w("> " + rmap.executive_summary)
    w("")

    for regulation in rmap.applicable_regulations:
        duties = ", ".join(ob.obligation_id for ob in regulation.obligations)
        if not duties:
            # No obligation ids available: fall back to the free-text note.
            duties = regulation.obligations_note
        w("- **%s** (%s) — Pflichten: %s" % (regulation.regulation_id, regulation.name, duties))

    for candidate in rmap.uncertain_regulations:
        missing = ", ".join(candidate.missing_facts) or "-"
        w("- _unsicher_ %s — fehlt: %s" % (candidate.regulation_id, missing))

    for overlap in rmap.overlaps:
        shared = ", ".join(overlap.shared_obligations)
        w("- Overlap %s: %s" % (overlap.overlap_group_id, shared))

    for evidence, obligation_ids in rmap.shared_evidence.items():
        w("- 1 Nachweis `%s` => %d Pflichten" % (evidence, len(obligation_ids)))

    w("")
|
|
|
|
|
|
def unsupported_block(rmap) -> None:
    """Write the "Expected Unsupported Domains" block for ``rmap``.

    When ``rmap.unsupported_domains`` is empty a single "none" bullet is written;
    otherwise one bullet per domain with its trigger and note. An empty line follows.
    """
    w("**Expected Unsupported Domains**")
    w("")

    domains = rmap.unsupported_domains
    if not domains:
        w("- keine — alle getriggerten Domaenen sind im Korpus")
    for entry in domains:
        w("- `%s` (Trigger: %s) -> %s" % (entry.domain, entry.trigger, entry.note))

    w("")
|
|
|
|
|
|
def interp_status(verdict_value: str) -> str:
    """Translate a verdict value into a coverage status.

    Returns ``"PARTIAL"`` for ``"uncertain"`` and ``"unsupported"``; every other value
    yields ``"PASS"``.
    """
    if verdict_value == "uncertain" or verdict_value == "unsupported":
        return "PARTIAL"
    return "PASS"
|
|
|
|
|
|
def knowledge_intake_section(base_dir) -> None:
    """Render the Knowledge Intake section (kept here so generate.py stays under the LOC budget).

    Builds a knowledge index from the YAML files under ``<base_dir>/../knowledge``
    (transition patterns, implementation playbooks, reference transition scenarios),
    runs ``assess_document_impact`` on three hard-coded example documents and writes a
    markdown table of the results via ``w``, followed by a coverage table.

    Args:
        base_dir: Directory from which ``../knowledge`` is resolved.
    """
    import os
    import yaml
    from compliance.knowledge_intake import (
        DocumentDescriptor, assess_document_impact, build_knowledge_index,
    )

    def _load(sub):
        # Parse every *.yaml file in knowledge/<sub> in sorted filename order.
        # Each file is opened in a `with` block so the handle is closed right after parsing.
        d = os.path.join(base_dir, "..", "knowledge", sub)
        loaded = []
        for f in sorted(os.listdir(d)):
            if not f.endswith(".yaml"):
                continue
            with open(os.path.join(d, f), encoding="utf-8") as fh:
                loaded.append(yaml.safe_load(fh))
        return loaded

    idx = build_knowledge_index(
        _load("transition_patterns"), _load("implementation_playbooks"),
        _load("reference_transition_scenarios"), obligation_index={"CRA": ["cra_obl_1", "cra_obl_2"]})
    # Three fixed example documents used to demonstrate the triage.
    docs = [
        DocumentDescriptor(document_id="ENISA CRA SBOM-FAQ", regulations=["CRA"], keywords=["sbom", "vulnerability"], document_type="faq"),
        DocumentDescriptor(document_id="EU Umwelt-Leitfaden", regulations=["UmweltVO"], keywords=["wastewater"], document_type="guidance"),
        DocumentDescriptor(document_id="Marketing-Blog", keywords=["newsletter"], document_type="blog"),
    ]
    w("## Knowledge Intake — Impact zuerst, Extraktion später")
    w("")
    w('_Vor dem Parser: ein neues Dokument NUR einordnen und seinen Impact auf den bestehenden Wissensbestand bestimmen. „Von N Dokumenten verändern wenige tatsächlich unser Wissen." Deterministisch, keine Extraktion, kein LLM._')
    w("")
    w("| Dokument | Impact | betrifft | Empfehlung |")
    w("|---|---|---|---|")
    for d in docs:
        kp = assess_document_impact(d, idx)
        # Either flag a new domain or show counts of affected capabilities/playbooks/scenarios/obligations.
        touch = "neue Domäne" if kp.new_domain else "%dC·%dPB·%dRTS·%dObl" % (
            len(kp.affected_capabilities), len(kp.affected_playbooks),
            len(kp.affected_reference_scenarios), len(kp.affected_obligations))
        # Only the part of the recommendation before the first " —" goes into the table.
        w("| %s | **%s** | %s | %s |" % (d.document_id, kp.impact_level.value, touch, kp.recommendation.split(" —")[0]))
    w("")
    w("**Beispiel-Knowledge-Package** (`%s`): %s" % (docs[0].document_id, assess_document_impact(docs[0], idx).impact_summary))
    w("")
    w('_So entsteht bei jedem neuen Dokument eine Impact-Analyse statt „200 Seiten PDF" — Targeted Updating statt Schreiben._')
    w("")
    coverage_table([
        ("Knowledge Intake (Klassifikation+Impact)", "PASS", "%d Regelwerke / %d Capabilities im Index" % (len(idx.regulations), len(idx.capability_regulations))),
        ("Impact-Triage (HIGH/LOW/NONE/new_domain)", "PASS", "3 Beispiel-Dokumente korrekt eingeordnet"),
        ("Regelwerk-ID-Normalisierung", "TODO", "CRA vs Cyber Resilience Act vereinheitlichen"),
    ])
|
|
|
|
|
|
def completeness_section() -> None:
    """Render the Regulatory Completeness section (kept here so generate.py stays under the LOC budget).

    Calls ``assess_completeness`` with a fixed example input (five identified
    regulations, their corpus status, one uncertain regulation, two assumptions and an
    obligation count) and writes the resulting report as markdown via ``w``, followed
    by a coverage table.
    """
    from compliance.completeness import assess_completeness

    # Fixed example input for the reference suite.
    identified = ["CRA", "MaschinenVO", "EMV", "Environmental", "DataAct"]
    status_by_regulation = {"CRA": "validated", "MaschinenVO": "validated", "EMV": "unsupported",
                            "Environmental": "unsupported", "DataAct": "validated"}
    open_questions = [{"regulation": "DataAct", "deciding_question": "generates_usage_data", "reason": "generates_usage_data = unbekannt"}]
    assumed = [{"key": "Funkmodul", "value": "nein"}, {"key": "personenbezogene Nutzungsdaten", "value": "nein"}]

    rep = assess_completeness(
        identified_regulations=identified,
        corpus_status=status_by_regulation,
        uncertain=open_questions,
        assumptions=assumed,
        assessed_obligations=128,
    )

    w("## Regulatory Completeness — was wir bewerten konnten, und was bewusst nicht")
    w("")
    w('_Interne Qualitätsmaschine (KEIN Confidence-Score): trennt IDENTIFIZIERT von BEWERTET und begründet jede Lücke. Keine Prozentzahl — auditierbar und ehrlich: „Wir zeigen auch, was wir noch nicht wissen und warum."_')
    w("")
    w("**%s**" % rep.completeness_summary)
    w("")
    w("> %s" % rep.audit_statement)
    w("")

    assessed_names = ", ".join(rep.assessed_regulations)
    w("- **Bewertet:** %s (%d Pflichten)" % (assessed_names, rep.assessed_obligations))
    w("- **Offen (jeweils begründet):**")
    for exclusion in rep.exclusions:
        # The follow-up question is appended only when the exclusion carries one.
        follow_up = ""
        if exclusion.deciding_question:
            follow_up = " → Rückfrage: `%s`" % exclusion.deciding_question
        w(" - `%s` — %s `[%s]`%s" % (exclusion.subject, exclusion.reason, exclusion.resolution, follow_up))

    assumption_pairs = ", ".join("%s=%s" % (item.key, item.value) for item in rep.assumptions)
    w("- **Annahmen:** %s" % assumption_pairs)
    w("")
    w("_Sobald der Umwelt-Korpus (ISO 14001 etc.) landet, kippt `Environmental` automatisch von offen auf bewertet — die Completeness Engine dokumentiert den Fortschritt je Domäne._")
    w("")

    coverage_rows = [
        ("Regulatory Completeness (auditierbar)", "PASS", rep.completeness_summary),
        ("Begründete Ausschlüsse (Korpus/Anwendbarkeit)", "PASS", "%d Ausschlüsse, alle mit Grund" % len(rep.exclusions)),
        ("Fortschritts-Doku je Domäne", "PASS", "Environmental offen→validated bei Korpus-Landung"),
    ]
    coverage_table(coverage_rows)
|
|
|
|
|
|
def domain_programs_section(base_dir) -> None:
    """Domain Knowledge Program v1 — per-domain maturity KPI DERIVED from the corpus (computed-not-stored).

    Loads the knowledge corpus and the program files under ``<base_dir>/../knowledge``,
    keeps the program documents that carry a ``backlog_rank`` and writes, per program,
    the share of its ``typical_requirement_sources`` that appear in at least one corpus
    artifact plus the number of matching transition patterns, playbooks and reference
    scenarios. Output goes through ``w``, followed by a coverage table.

    Args:
        base_dir: Directory from which ``../knowledge`` is resolved.
    """
    import os
    import yaml
    from compliance.knowledge_intake import build_knowledge_index

    def _load_dir(path):
        # Parse every *.yaml file in `path` in sorted filename order; each handle is
        # closed as soon as its document has been parsed.
        loaded = []
        for f in sorted(os.listdir(path)):
            if not f.endswith(".yaml"):
                continue
            with open(os.path.join(path, f), encoding="utf-8") as fh:
                loaded.append(yaml.safe_load(fh))
        return loaded

    def _load(sub):
        return _load_dir(os.path.join(base_dir, "..", "knowledge", sub))

    idx = build_knowledge_index(_load("transition_patterns"), _load("implementation_playbooks"),
                                _load("reference_transition_scenarios"))
    _all = _load("programs")
    # Domain programs only: other files in programs/ (and empty or non-mapping
    # documents) carry no backlog_rank and are skipped instead of raising.
    progs = sorted((p for p in _all if isinstance(p, dict) and "backlog_rank" in p),
                   key=lambda p: p["backlog_rank"])

    _ALIAS = {"cyber resilience act": "cra", "maschinenverordnung": "maschinenvo", "iatf": "iatf16949"}

    def _canon(r):
        # Trimmed, lower-cased identifier, mapped through the alias table.
        k = str(r).strip().lower()
        return _ALIAS.get(k, k)

    def _hits(reg_lists, src):
        # Keys of `reg_lists` whose regulation list shares at least one canonical id with `src`.
        cs = {_canon(s) for s in src}
        return [k for k, regs in reg_lists.items() if cs & {_canon(x) for x in regs}]

    def _source_modeled(index, source, canon):
        # True when `source` occurs in any transition pattern, reference scenario or
        # playbook capability of the index.
        c = canon(source)
        in_tp = any(c in {canon(x) for x in regs} for regs in index.transition_patterns.values())
        in_rts = any(c in {canon(x) for x in regs} for regs in index.reference_scenarios.values())
        in_pb = any(c in {canon(x) for x in index.capability_regulations.get(cap, [])} for cap in index.playbook_capabilities)
        return in_tp or in_rts or in_pb

    w("## Domain Knowledge Program v1 — Reifegrad je Domäne (reproduzierbarer KPI)")
    w("")
    w('_Engpass = Domänenmodellierung. Jede Domäne läuft durch DIESELBE 7-Stufen-Produktionsstraße (Domain Model → Requirement Sources → Capability Registry → Transition Patterns → Playbooks → Reference Scenarios → Completeness). Reifegrad aus dem ECHTEN Korpus abgeleitet (computed-not-stored), keine Marketingzahl. Einstieg über Industry, nicht Regelwerk._')
    w("")
    w("| Rank | Domäne | Reifegrad (Sources modelliert) | modelliert/total | Korpus TP·PB·RTS |")
    w("|---|---|---|---|---|")
    for p in progs:
        src = p.get("typical_requirement_sources", [])
        tp, rts = _hits(idx.transition_patterns, src), _hits(idx.reference_scenarios, src)
        cs = {_canon(s) for s in src}
        pb = [c for c in idx.playbook_capabilities if cs & {_canon(x) for x in idx.capability_regulations.get(c, [])}]
        modeled = [s for s in src if _source_modeled(idx, s, _canon)]  # sources with >=1 corpus artifact
        breadth = (len(modeled) / len(src)) if src else 0.0  # honest differentiator (not CRA-shared depth)
        filled = int(round(breadth * 10))  # number of filled cells in the 10-cell bar
        w("| %d | **%s** | `%s` %d%% | %d/%d | %d·%d·%d |" % (
            p.get("backlog_rank", 99), p["name"], "█" * filled + "░" * (10 - filled),
            int(round(breadth * 100)), len(modeled), len(src), len(tp), len(pb), len(rts)))
    w("")
    w('_Industry-Einstieg + ETO-Hypothese: jede Domäne kennt ihre typischen Sources + Zertifikate → vor dem Onboarding „diese Prozesswelt ist wahrscheinlich vorhanden" (Hypothese, nie Wahrheit; speist Company 2A als `inferred`). Backlog nach Kundennutzen, KPI nach echtem Korpusstand — beides bewusst getrennt._')
    w("")
    coverage_table([
        ("Domain Knowledge Program (7-Stufen-Produktionsstraße)", "PASS", "%d Domänen im Backlog, Industrial Automation #1" % len(progs)),
        ("Reifegrad-KPI (computed-not-stored)", "PASS", "aus echtem Korpus abgeleitet (TP/PB/RTS je Domäne)"),
        ("Regelwerk-ID-Normalisierung", "TODO", "Alias CRA/MaschinenVO im KPI — kanonische IDs ausstehend"),
    ])
|
|
|
|
|
|
def transition_coverage_section(base_dir) -> None:
    """Transition Coverage — the TRANSITION is the unit of knowledge; status DERIVED from the corpus.

    Reads the transition backlog from ``knowledge/programs/transitions.yaml`` and every
    ``*.yaml`` file in ``knowledge/transition_patterns`` (both below ``<base_dir>/..``).
    For each backlog transition, sorted by descending priority, the best status among
    the patterns with a matching ``from``/``to`` pair is written as a markdown table
    row via ``w``. The highest-priority transition without any pattern is called out,
    and a coverage table closes the section.

    Args:
        base_dir: Directory from which ``../knowledge`` is resolved.
    """
    import os
    import yaml

    kdir = os.path.join(base_dir, "..", "knowledge")
    with open(os.path.join(kdir, "programs", "transitions.yaml"), encoding="utf-8") as h:
        backlog = yaml.safe_load(h)
    tpdir = os.path.join(kdir, "transition_patterns")
    pats = []
    for f in sorted(os.listdir(tpdir)):
        if not f.endswith(".yaml"):
            continue
        # Close each pattern file right after parsing, like the backlog file above.
        with open(os.path.join(tpdir, f), encoding="utf-8") as fh:
            d = yaml.safe_load(fh)
        if not isinstance(d, dict):
            # Empty or non-mapping document: it cannot describe a transition.
            continue
        goal = d.get("transition_goal", {})
        frm = goal.get("from", {}).get("standard", "")
        to = goal.get("to")
        # `to` may be a single mapping or a list of mappings; the target name is taken
        # from the first non-empty of regulation / framework / target.
        tos = [it.get("regulation") or it.get("framework") or it.get("target")
               for it in (to if isinstance(to, list) else [to]) if isinstance(it, dict)]
        pats.append((frm, [t for t in tos if t], str(d.get("status", "draft"))))

    _ALIAS = {"isoiec27001": "iso27001", "isoiec62443": "iec62443",
              "cyberresilienceact": "cra", "maschinenverordnung": "maschinenvo"}

    def _c(s):
        # Canonical id: lower-case, alphanumeric characters only, then alias lookup.
        k = "".join(ch for ch in str(s).lower() if ch.isalnum())
        return _ALIAS.get(k, k)

    _RANK = {"draft": 1, "reviewed": 2, "validated": 3, "proven": 4}
    _ICON = {0: "⚪ nicht begonnen", 1: "🟡 Draft", 2: "✅ reviewed", 3: "✅ validated", 4: "✅ Gold"}

    # Canonicalise every pattern once instead of on each status lookup.
    # A status string missing from _RANK counts as rank 1 (draft).
    canon_pats = [(_c(pf), {_c(x) for x in ptos}, _RANK.get(st, 1)) for pf, ptos, st in pats]

    def _status(frm, to):
        # Highest rank among patterns matching this from/to pair; 0 when none matches.
        cf, ct = _c(frm), _c(to)
        best = 0
        for pf, ptos, rank in canon_pats:
            if pf == cf and ct in ptos:
                best = max(best, rank)
        return best

    w("## Transition Coverage — die Transition ist die Wissenseinheit (Operational Knowledge)")
    w("")
    w('_Der Kunde kauft nicht „EMV-Domain", sondern „wir haben ISO 9001 — helfen Sie uns beim CRA". Die Wissenseinheit ist die TRANSITION (nicht das Gesetz). Status je Transition aus dem echten Pattern-Korpus abgeleitet (computed-not-stored). Drei Ebenen: Regulatory → **Operational (hier, größter Differenzierer)** → Verification (Vision V2)._')
    w("")
    w("| Prio | Transition | Status |")
    w("|---|---|---|")
    rows = sorted(backlog["transitions"], key=lambda t: -t["priority"])
    done = 0
    gaps = []
    for t in rows:
        # Status is computed once per transition and reused for the counter and the gap list.
        s = _status(t["from"], t["to"])
        if s:
            done += 1
        else:
            gaps.append(t)
        w("| %s | `%s → %s` | %s |" % ("⭐" * t["priority"], t["from"], t["to"], _ICON[s]))
    w("")
    if gaps:
        # rows are sorted by descending priority, so gaps[0] is the highest-priority gap.
        w('**Größte Lücke (Track B als Nächstes):** `%s → %s` (%s) — höchstnachgefragte Transition OHNE Pattern. Stärkerer Produktindikator als „EMV 30%% modelliert".' % (gaps[0]["from"], gaps[0]["to"], "⭐" * gaps[0]["priority"]))
        w("")
    coverage_table([
        ("Transition Coverage (Operational Knowledge)", "PASS", "%d von %d Top-Transitionen mit Pattern" % (done, len(rows))),
        ("Wissenseinheit = Transition (nicht Gesetz)", "PASS", "verkauft wird der Übergang, z. B. ISO9001→CRA"),
        ("3 Ebenen Regulatory→Operational→Verification", "PASS", "Operational = größter Differenzierer (ADR-010)"),
    ])
|