# ruff: noqa # mypy: ignore-errors """Rendering helpers for the Reference Scenario Suite generator. Holds the shared mutable output buffers (OUT, ROLLUP) and the small markdown helpers so the generator script (`generate.py`) stays under the LOC budget. Not product code; not imported by the app — only by the generator (run via `PYTHONPATH=. python3 reference_scenarios/generate.py`). """ from __future__ import annotations from typing import List, Tuple Row = Tuple[str, str, str] OUT: List[str] = [] ROLLUP: List[str] = [] def w(s: str = "") -> None: OUT.append(s) def coverage_table(rows: List[Row]) -> None: w("**Architecture Coverage**") w("") w("| Layer | Status | Hinweis |") w("|---|---|---|") for layer, status, note in rows: w("| %s | **%s** | %s |" % (layer, status, note)) ROLLUP.append(status) w("") def reg_map_block(rmap) -> None: w("**Expected Regulatory Map**") w("") w("> " + rmap.executive_summary) w("") for v in rmap.applicable_regulations: obs = ", ".join(o.obligation_id for o in v.obligations) or v.obligations_note w("- **%s** (%s) — Pflichten: %s" % (v.regulation_id, v.name, obs)) for u in rmap.uncertain_regulations: w("- _unsicher_ %s — fehlt: %s" % (u.regulation_id, ", ".join(u.missing_facts) or "-")) for ov in rmap.overlaps: w("- Overlap %s: %s" % (ov.overlap_group_id, ", ".join(ov.shared_obligations))) for ev, ids in rmap.shared_evidence.items(): w("- 1 Nachweis `%s` => %d Pflichten" % (ev, len(ids))) w("") def unsupported_block(rmap) -> None: w("**Expected Unsupported Domains**") w("") if not rmap.unsupported_domains: w("- keine — alle getriggerten Domaenen sind im Korpus") for d in rmap.unsupported_domains: w("- `%s` (Trigger: %s) -> %s" % (d.domain, d.trigger, d.note)) w("") def interp_status(verdict_value: str) -> str: return "PARTIAL" if verdict_value in ("uncertain", "unsupported") else "PASS" def knowledge_intake_section(base_dir) -> None: """Render the Knowledge Intake section (kept here so generate.py stays under the LOC budget).""" import os import yaml from compliance.knowledge_intake import ( DocumentDescriptor, assess_document_impact, build_knowledge_index, ) def _load(sub): d = os.path.join(base_dir, "..", "knowledge", sub) return [yaml.safe_load(open(os.path.join(d, f), encoding="utf-8")) for f in sorted(os.listdir(d)) if f.endswith(".yaml")] idx = build_knowledge_index( _load("transition_patterns"), _load("implementation_playbooks"), _load("reference_transition_scenarios"), obligation_index={"CRA": ["cra_obl_1", "cra_obl_2"]}) docs = [ DocumentDescriptor(document_id="ENISA CRA SBOM-FAQ", regulations=["CRA"], keywords=["sbom", "vulnerability"], document_type="faq"), DocumentDescriptor(document_id="EU Umwelt-Leitfaden", regulations=["UmweltVO"], keywords=["wastewater"], document_type="guidance"), DocumentDescriptor(document_id="Marketing-Blog", keywords=["newsletter"], document_type="blog"), ] w("## Knowledge Intake — Impact zuerst, Extraktion später") w("") w('_Vor dem Parser: ein neues Dokument NUR einordnen und seinen Impact auf den bestehenden Wissensbestand bestimmen. „Von N Dokumenten verändern wenige tatsächlich unser Wissen." Deterministisch, keine Extraktion, kein LLM._') w("") w("| Dokument | Impact | betrifft | Empfehlung |") w("|---|---|---|---|") for d in docs: kp = assess_document_impact(d, idx) touch = "neue Domäne" if kp.new_domain else "%dC·%dPB·%dRTS·%dObl" % ( len(kp.affected_capabilities), len(kp.affected_playbooks), len(kp.affected_reference_scenarios), len(kp.affected_obligations)) w("| %s | **%s** | %s | %s |" % (d.document_id, kp.impact_level.value, touch, kp.recommendation.split(" —")[0])) w("") w("**Beispiel-Knowledge-Package** (`%s`): %s" % (docs[0].document_id, assess_document_impact(docs[0], idx).impact_summary)) w("") w('_So entsteht bei jedem neuen Dokument eine Impact-Analyse statt „200 Seiten PDF" — Targeted Updating statt Schreiben._') w("") coverage_table([ ("Knowledge Intake (Klassifikation+Impact)", "PASS", "%d Regelwerke / %d Capabilities im Index" % (len(idx.regulations), len(idx.capability_regulations))), ("Impact-Triage (HIGH/LOW/NONE/new_domain)", "PASS", "3 Beispiel-Dokumente korrekt eingeordnet"), ("Regelwerk-ID-Normalisierung", "TODO", "CRA vs Cyber Resilience Act vereinheitlichen"), ]) def completeness_section() -> None: """Render the Regulatory Completeness section (kept here so generate.py stays under the LOC budget).""" from compliance.completeness import assess_completeness rep = assess_completeness( identified_regulations=["CRA", "MaschinenVO", "EMV", "Environmental", "DataAct"], corpus_status={"CRA": "validated", "MaschinenVO": "validated", "EMV": "unsupported", "Environmental": "unsupported", "DataAct": "validated"}, uncertain=[{"regulation": "DataAct", "deciding_question": "generates_usage_data", "reason": "generates_usage_data = unbekannt"}], assumptions=[{"key": "Funkmodul", "value": "nein"}, {"key": "personenbezogene Nutzungsdaten", "value": "nein"}], assessed_obligations=128) w("## Regulatory Completeness — was wir bewerten konnten, und was bewusst nicht") w("") w('_Interne Qualitätsmaschine (KEIN Confidence-Score): trennt IDENTIFIZIERT von BEWERTET und begründet jede Lücke. Keine Prozentzahl — auditierbar und ehrlich: „Wir zeigen auch, was wir noch nicht wissen und warum."_') w("") w("**%s**" % rep.completeness_summary) w("") w("> %s" % rep.audit_statement) w("") w("- **Bewertet:** %s (%d Pflichten)" % (", ".join(rep.assessed_regulations), rep.assessed_obligations)) w("- **Offen (jeweils begründet):**") for e in rep.exclusions: dq = (" → Rückfrage: `%s`" % e.deciding_question) if e.deciding_question else "" w(" - `%s` — %s `[%s]`%s" % (e.subject, e.reason, e.resolution, dq)) w("- **Annahmen:** %s" % ", ".join("%s=%s" % (a.key, a.value) for a in rep.assumptions)) w("") w("_Sobald der Umwelt-Korpus (ISO 14001 etc.) landet, kippt `Environmental` automatisch von offen auf bewertet — die Completeness Engine dokumentiert den Fortschritt je Domäne._") w("") coverage_table([ ("Regulatory Completeness (auditierbar)", "PASS", rep.completeness_summary), ("Begründete Ausschlüsse (Korpus/Anwendbarkeit)", "PASS", "%d Ausschlüsse, alle mit Grund" % len(rep.exclusions)), ("Fortschritts-Doku je Domäne", "PASS", "Environmental offen→validated bei Korpus-Landung"), ]) def domain_programs_section(base_dir) -> None: """Domain Knowledge Program v1 — per-domain maturity KPI DERIVED from the corpus (computed-not-stored).""" import os import yaml from compliance.knowledge_intake import build_knowledge_index def _load(sub): d = os.path.join(base_dir, "..", "knowledge", sub) return [yaml.safe_load(open(os.path.join(d, f), encoding="utf-8")) for f in sorted(os.listdir(d)) if f.endswith(".yaml")] idx = build_knowledge_index(_load("transition_patterns"), _load("implementation_playbooks"), _load("reference_transition_scenarios")) pdir = os.path.join(base_dir, "..", "knowledge", "programs") _all = [yaml.safe_load(open(os.path.join(pdir, f), encoding="utf-8")) for f in sorted(os.listdir(pdir)) if f.endswith(".yaml")] progs = sorted((p for p in _all if "backlog_rank" in p), key=lambda p: p["backlog_rank"]) # domain programs only _ALIAS = {"cyber resilience act": "cra", "maschinenverordnung": "maschinenvo", "iatf": "iatf16949"} def _canon(r): k = str(r).strip().lower() return _ALIAS.get(k, k) def _hits(reg_lists, src): cs = {_canon(s) for s in src} return [k for k, regs in reg_lists.items() if cs & {_canon(x) for x in regs}] def _source_modeled(index, source, canon): c = canon(source) in_tp = any(c in {canon(x) for x in regs} for regs in index.transition_patterns.values()) in_rts = any(c in {canon(x) for x in regs} for regs in index.reference_scenarios.values()) in_pb = any(c in {canon(x) for x in index.capability_regulations.get(cap, [])} for cap in index.playbook_capabilities) return in_tp or in_rts or in_pb w("## Domain Knowledge Program v1 — Reifegrad je Domäne (reproduzierbarer KPI)") w("") w('_Engpass = Domänenmodellierung. Jede Domäne läuft durch DIESELBE 7-Stufen-Produktionsstraße (Domain Model → Requirement Sources → Capability Registry → Transition Patterns → Playbooks → Reference Scenarios → Completeness). Reifegrad aus dem ECHTEN Korpus abgeleitet (computed-not-stored), keine Marketingzahl. Einstieg über Industry, nicht Regelwerk._') w("") w("| Rank | Domäne | Reifegrad (Sources modelliert) | modelliert/total | Korpus TP·PB·RTS |") w("|---|---|---|---|---|") for p in progs: src = p.get("typical_requirement_sources", []) tp, rts = _hits(idx.transition_patterns, src), _hits(idx.reference_scenarios, src) cs = {_canon(s) for s in src} pb = [c for c in idx.playbook_capabilities if cs & {_canon(x) for x in idx.capability_regulations.get(c, [])}] modeled = [s for s in src if _source_modeled(idx, s, _canon)] # sources with >=1 corpus artifact breadth = (len(modeled) / len(src)) if src else 0.0 # honest differentiator (not CRA-shared depth) filled = int(round(breadth * 10)) w("| %d | **%s** | `%s` %d%% | %d/%d | %d·%d·%d |" % ( p.get("backlog_rank", 99), p["name"], "█" * filled + "░" * (10 - filled), int(round(breadth * 100)), len(modeled), len(src), len(tp), len(pb), len(rts))) w("") w('_Industry-Einstieg + ETO-Hypothese: jede Domäne kennt ihre typischen Sources + Zertifikate → vor dem Onboarding „diese Prozesswelt ist wahrscheinlich vorhanden" (Hypothese, nie Wahrheit; speist Company 2A als `inferred`). Backlog nach Kundennutzen, KPI nach echtem Korpusstand — beides bewusst getrennt._') w("") coverage_table([ ("Domain Knowledge Program (7-Stufen-Produktionsstraße)", "PASS", "%d Domänen im Backlog, Industrial Automation #1" % len(progs)), ("Reifegrad-KPI (computed-not-stored)", "PASS", "aus echtem Korpus abgeleitet (TP/PB/RTS je Domäne)"), ("Regelwerk-ID-Normalisierung", "TODO", "Alias CRA/MaschinenVO im KPI — kanonische IDs ausstehend"), ]) def transition_coverage_section(base_dir) -> None: """Transition Coverage — the TRANSITION is the unit of knowledge; status DERIVED from the corpus.""" import os import yaml kdir = os.path.join(base_dir, "..", "knowledge") with open(os.path.join(kdir, "programs", "transitions.yaml"), encoding="utf-8") as h: backlog = yaml.safe_load(h) tpdir = os.path.join(kdir, "transition_patterns") pats = [] for f in sorted(os.listdir(tpdir)): if not f.endswith(".yaml"): continue d = yaml.safe_load(open(os.path.join(tpdir, f), encoding="utf-8")) goal = d.get("transition_goal", {}) frm = goal.get("from", {}).get("standard", "") to = goal.get("to") tos = [it.get("regulation") or it.get("framework") or it.get("target") for it in (to if isinstance(to, list) else [to]) if isinstance(it, dict)] pats.append((frm, [t for t in tos if t], str(d.get("status", "draft")))) _ALIAS = {"isoiec27001": "iso27001", "isoiec62443": "iec62443", "cyberresilienceact": "cra", "maschinenverordnung": "maschinenvo"} def _c(s): k = "".join(ch for ch in str(s).lower() if ch.isalnum()) return _ALIAS.get(k, k) _RANK = {"draft": 1, "reviewed": 2, "validated": 3, "proven": 4} _ICON = {0: "⚪ nicht begonnen", 1: "🟡 Draft", 2: "✅ reviewed", 3: "✅ validated", 4: "✅ Gold"} def _status(frm, to): best = 0 for pf, ptos, st in pats: if _c(pf) == _c(frm) and _c(to) in {_c(x) for x in ptos}: best = max(best, _RANK.get(st, 1)) return best w("## Transition Coverage — die Transition ist die Wissenseinheit (Operational Knowledge)") w("") w('_Der Kunde kauft nicht „EMV-Domain", sondern „wir haben ISO 9001 — helfen Sie uns beim CRA". Die Wissenseinheit ist die TRANSITION (nicht das Gesetz). Status je Transition aus dem echten Pattern-Korpus abgeleitet (computed-not-stored). Drei Ebenen: Regulatory → **Operational (hier, größter Differenzierer)** → Verification (Vision V2)._') w("") w("| Prio | Transition | Status |") w("|---|---|---|") rows = sorted(backlog["transitions"], key=lambda t: -t["priority"]) done = 0 for t in rows: s = _status(t["from"], t["to"]) done += 1 if s else 0 w("| %s | `%s → %s` | %s |" % ("⭐" * t["priority"], t["from"], t["to"], _ICON[s])) w("") gaps = [t for t in rows if _status(t["from"], t["to"]) == 0] if gaps: w('**Größte Lücke (Track B als Nächstes):** `%s → %s` (%s) — höchstnachgefragte Transition OHNE Pattern. Stärkerer Produktindikator als „EMV 30%% modelliert".' % (gaps[0]["from"], gaps[0]["to"], "⭐" * gaps[0]["priority"])) w("") coverage_table([ ("Transition Coverage (Operational Knowledge)", "PASS", "%d von %d Top-Transitionen mit Pattern" % (done, len(rows))), ("Wissenseinheit = Transition (nicht Gesetz)", "PASS", "verkauft wird der Übergang, z. B. ISO9001→CRA"), ("3 Ebenen Regulatory→Operational→Verification", "PASS", "Operational = größter Differenzierer (ADR-010)"), ])