# ruff: noqa # mypy: ignore-errors """Rendering helpers for the Reference Scenario Suite generator. Holds the shared mutable output buffers (OUT, ROLLUP) and the small markdown helpers so the generator script (`generate.py`) stays under the LOC budget. Not product code; not imported by the app — only by the generator (run via `PYTHONPATH=. python3 reference_scenarios/generate.py`). """ from __future__ import annotations from typing import List, Tuple Row = Tuple[str, str, str] OUT: List[str] = [] ROLLUP: List[str] = [] def w(s: str = "") -> None: OUT.append(s) def coverage_table(rows: List[Row]) -> None: w("**Architecture Coverage**") w("") w("| Layer | Status | Hinweis |") w("|---|---|---|") for layer, status, note in rows: w("| %s | **%s** | %s |" % (layer, status, note)) ROLLUP.append(status) w("") def reg_map_block(rmap) -> None: w("**Expected Regulatory Map**") w("") w("> " + rmap.executive_summary) w("") for v in rmap.applicable_regulations: obs = ", ".join(o.obligation_id for o in v.obligations) or v.obligations_note w("- **%s** (%s) — Pflichten: %s" % (v.regulation_id, v.name, obs)) for u in rmap.uncertain_regulations: w("- _unsicher_ %s — fehlt: %s" % (u.regulation_id, ", ".join(u.missing_facts) or "-")) for ov in rmap.overlaps: w("- Overlap %s: %s" % (ov.overlap_group_id, ", ".join(ov.shared_obligations))) for ev, ids in rmap.shared_evidence.items(): w("- 1 Nachweis `%s` => %d Pflichten" % (ev, len(ids))) w("") def unsupported_block(rmap) -> None: w("**Expected Unsupported Domains**") w("") if not rmap.unsupported_domains: w("- keine — alle getriggerten Domaenen sind im Korpus") for d in rmap.unsupported_domains: w("- `%s` (Trigger: %s) -> %s" % (d.domain, d.trigger, d.note)) w("") def interp_status(verdict_value: str) -> str: return "PARTIAL" if verdict_value in ("uncertain", "unsupported") else "PASS" def knowledge_intake_section(base_dir) -> None: """Render the Knowledge Intake section (kept here so generate.py stays under the LOC budget).""" import os import yaml from compliance.knowledge_intake import ( DocumentDescriptor, assess_document_impact, build_knowledge_index, ) def _load(sub): d = os.path.join(base_dir, "..", "knowledge", sub) return [yaml.safe_load(open(os.path.join(d, f), encoding="utf-8")) for f in sorted(os.listdir(d)) if f.endswith(".yaml")] idx = build_knowledge_index( _load("transition_patterns"), _load("implementation_playbooks"), _load("reference_transition_scenarios"), obligation_index={"CRA": ["cra_obl_1", "cra_obl_2"]}) docs = [ DocumentDescriptor(document_id="ENISA CRA SBOM-FAQ", regulations=["CRA"], keywords=["sbom", "vulnerability"], document_type="faq"), DocumentDescriptor(document_id="EU Umwelt-Leitfaden", regulations=["UmweltVO"], keywords=["wastewater"], document_type="guidance"), DocumentDescriptor(document_id="Marketing-Blog", keywords=["newsletter"], document_type="blog"), ] w("## Knowledge Intake — Impact zuerst, Extraktion später") w("") w('_Vor dem Parser: ein neues Dokument NUR einordnen und seinen Impact auf den bestehenden Wissensbestand bestimmen. „Von N Dokumenten verändern wenige tatsächlich unser Wissen." Deterministisch, keine Extraktion, kein LLM._') w("") w("| Dokument | Impact | betrifft | Empfehlung |") w("|---|---|---|---|") for d in docs: kp = assess_document_impact(d, idx) touch = "neue Domäne" if kp.new_domain else "%dC·%dPB·%dRTS·%dObl" % ( len(kp.affected_capabilities), len(kp.affected_playbooks), len(kp.affected_reference_scenarios), len(kp.affected_obligations)) w("| %s | **%s** | %s | %s |" % (d.document_id, kp.impact_level.value, touch, kp.recommendation.split(" —")[0])) w("") w("**Beispiel-Knowledge-Package** (`%s`): %s" % (docs[0].document_id, assess_document_impact(docs[0], idx).impact_summary)) w("") w('_So entsteht bei jedem neuen Dokument eine Impact-Analyse statt „200 Seiten PDF" — Targeted Updating statt Schreiben._') w("") coverage_table([ ("Knowledge Intake (Klassifikation+Impact)", "PASS", "%d Regelwerke / %d Capabilities im Index" % (len(idx.regulations), len(idx.capability_regulations))), ("Impact-Triage (HIGH/LOW/NONE/new_domain)", "PASS", "3 Beispiel-Dokumente korrekt eingeordnet"), ("Regelwerk-ID-Normalisierung", "TODO", "CRA vs Cyber Resilience Act vereinheitlichen"), ]) def completeness_section() -> None: """Render the Regulatory Completeness section (kept here so generate.py stays under the LOC budget).""" from compliance.completeness import assess_completeness rep = assess_completeness( identified_regulations=["CRA", "MaschinenVO", "EMV", "Environmental", "DataAct"], corpus_status={"CRA": "validated", "MaschinenVO": "validated", "EMV": "unsupported", "Environmental": "unsupported", "DataAct": "validated"}, uncertain=[{"regulation": "DataAct", "deciding_question": "generates_usage_data", "reason": "generates_usage_data = unbekannt"}], assumptions=[{"key": "Funkmodul", "value": "nein"}, {"key": "personenbezogene Nutzungsdaten", "value": "nein"}], assessed_obligations=128) w("## Regulatory Completeness — was wir bewerten konnten, und was bewusst nicht") w("") w('_Interne Qualitätsmaschine (KEIN Confidence-Score): trennt IDENTIFIZIERT von BEWERTET und begründet jede Lücke. Keine Prozentzahl — auditierbar und ehrlich: „Wir zeigen auch, was wir noch nicht wissen und warum."_') w("") w("**%s**" % rep.completeness_summary) w("") w("> %s" % rep.audit_statement) w("") w("- **Bewertet:** %s (%d Pflichten)" % (", ".join(rep.assessed_regulations), rep.assessed_obligations)) w("- **Offen (jeweils begründet):**") for e in rep.exclusions: dq = (" → Rückfrage: `%s`" % e.deciding_question) if e.deciding_question else "" w(" - `%s` — %s `[%s]`%s" % (e.subject, e.reason, e.resolution, dq)) w("- **Annahmen:** %s" % ", ".join("%s=%s" % (a.key, a.value) for a in rep.assumptions)) w("") w("_Sobald der Umwelt-Korpus (ISO 14001 etc.) landet, kippt `Environmental` automatisch von offen auf bewertet — die Completeness Engine dokumentiert den Fortschritt je Domäne._") w("") coverage_table([ ("Regulatory Completeness (auditierbar)", "PASS", rep.completeness_summary), ("Begründete Ausschlüsse (Korpus/Anwendbarkeit)", "PASS", "%d Ausschlüsse, alle mit Grund" % len(rep.exclusions)), ("Fortschritts-Doku je Domäne", "PASS", "Environmental offen→validated bei Korpus-Landung"), ]) def domain_programs_section(base_dir) -> None: """Domain Knowledge Program v1 — per-domain maturity KPI DERIVED from the corpus (computed-not-stored).""" import os import yaml from compliance.knowledge_intake import build_knowledge_index def _load(sub): d = os.path.join(base_dir, "..", "knowledge", sub) return [yaml.safe_load(open(os.path.join(d, f), encoding="utf-8")) for f in sorted(os.listdir(d)) if f.endswith(".yaml")] idx = build_knowledge_index(_load("transition_patterns"), _load("implementation_playbooks"), _load("reference_transition_scenarios")) pdir = os.path.join(base_dir, "..", "knowledge", "programs") progs = sorted((yaml.safe_load(open(os.path.join(pdir, f), encoding="utf-8")) for f in sorted(os.listdir(pdir)) if f.endswith(".yaml")), key=lambda p: p.get("backlog_rank", 99)) _ALIAS = {"cyber resilience act": "cra", "maschinenverordnung": "maschinenvo", "iatf": "iatf16949"} def _canon(r): k = str(r).strip().lower() return _ALIAS.get(k, k) def _hits(reg_lists, src): cs = {_canon(s) for s in src} return [k for k, regs in reg_lists.items() if cs & {_canon(x) for x in regs}] def _source_modeled(index, source, canon): c = canon(source) in_tp = any(c in {canon(x) for x in regs} for regs in index.transition_patterns.values()) in_rts = any(c in {canon(x) for x in regs} for regs in index.reference_scenarios.values()) in_pb = any(c in {canon(x) for x in index.capability_regulations.get(cap, [])} for cap in index.playbook_capabilities) return in_tp or in_rts or in_pb w("## Domain Knowledge Program v1 — Reifegrad je Domäne (reproduzierbarer KPI)") w("") w('_Engpass = Domänenmodellierung. Jede Domäne läuft durch DIESELBE 7-Stufen-Produktionsstraße (Domain Model → Requirement Sources → Capability Registry → Transition Patterns → Playbooks → Reference Scenarios → Completeness). Reifegrad aus dem ECHTEN Korpus abgeleitet (computed-not-stored), keine Marketingzahl. Einstieg über Industry, nicht Regelwerk._') w("") w("| Rank | Domäne | Reifegrad (Sources modelliert) | modelliert/total | Korpus TP·PB·RTS |") w("|---|---|---|---|---|") for p in progs: src = p.get("typical_requirement_sources", []) tp, rts = _hits(idx.transition_patterns, src), _hits(idx.reference_scenarios, src) cs = {_canon(s) for s in src} pb = [c for c in idx.playbook_capabilities if cs & {_canon(x) for x in idx.capability_regulations.get(c, [])}] modeled = [s for s in src if _source_modeled(idx, s, _canon)] # sources with >=1 corpus artifact breadth = (len(modeled) / len(src)) if src else 0.0 # honest differentiator (not CRA-shared depth) filled = int(round(breadth * 10)) w("| %d | **%s** | `%s` %d%% | %d/%d | %d·%d·%d |" % ( p.get("backlog_rank", 99), p["name"], "█" * filled + "░" * (10 - filled), int(round(breadth * 100)), len(modeled), len(src), len(tp), len(pb), len(rts))) w("") w('_Industry-Einstieg + ETO-Hypothese: jede Domäne kennt ihre typischen Sources + Zertifikate → vor dem Onboarding „diese Prozesswelt ist wahrscheinlich vorhanden" (Hypothese, nie Wahrheit; speist Company 2A als `inferred`). Backlog nach Kundennutzen, KPI nach echtem Korpusstand — beides bewusst getrennt._') w("") coverage_table([ ("Domain Knowledge Program (7-Stufen-Produktionsstraße)", "PASS", "%d Domänen im Backlog, Industrial Automation #1" % len(progs)), ("Reifegrad-KPI (computed-not-stored)", "PASS", "aus echtem Korpus abgeleitet (TP/PB/RTS je Domäne)"), ("Regelwerk-ID-Normalisierung", "TODO", "Alias CRA/MaschinenVO im KPI — kanonische IDs ausstehend"), ])