Files
breakpilot-compliance/backend-compliance/reference_scenarios/_helpers.py
T
Benjamin Admin ecae5bc7f1 feat(vocabulary): Domain Vocabulary — identity vs representation; regulation aliases fix the KPI normalization
Before the next Journey: the LANGUAGE. With 5 knowledge objects but no vocabulary, the same reise gets
named four different ways (ISO9001->MaschinenVO vs Quality Management->Product Safety vs ...). The spec
answers ONE question: which terms are IDENTITIES and which are REPRESENTATIONS of the same meaning?

- spec docs-src/architecture/domain-vocabulary-spec-v1.md (PROPOSAL): identity hierarchy
  (Requirement RQ / Capability MCAP [Registry 2C] / regulation-source-target / Journey Class MJRN
  [PROVISIONAL] / Journey instance / Playbook MPLB); canonical name + aliases; capability vocabulary =
  the Capability Registry (not rebuilt); reorder Vocabulary -> Transition #2 -> #3 -> Rule of Three.
- knowledge/vocabulary/regulations.yaml: regulation/standard IDENTITIES (id + canonical + aliases).
  SOLVES the regulation-ID normalization the KPIs flagged: CRA == "Cyber Resilience Act" == "Regulation
  (EU) 2024/2847" all resolve to `cra`; ISO9001/QMS -> iso9001; etc. Shared artifact (@Legal-KG/@Execution
  please adopt).
- knowledge/vocabulary/journey_classes.yaml (PROVISIONAL): clusters our transitions into classes
  (Information Security -> Product Cybersecurity; Quality Management -> Product Compliance/Safety).
  Finding: ISO9001->MaschinenVO is an INSTANCE of an existing class (like ISO9001->CRA, ISO13485->MDR),
  not a new kind -> avoids duplication. Journey Class is a new abstraction -> its own Rule of Three (no
  MJRN minting yet).
- reference suite: both KPIs now read aliases from regulations.yaml instead of hard-coded maps; the
  "Regelwerk-ID-Normalisierung" line flips TODO -> PASS. KPI numbers unchanged (vocab is a superset).
- Side effect = Requirements Intelligence: a Tender "Security Patch Procedure" resolves to MCAP-0017.

7 vocabulary tests (17 with domain programs), check-loc 0. Knowledge data + spec + reference harness =
non-runtime -> no deploy (ADR-001). No new module, no runtime change, no minting (Freeze).

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-06-28 08:11:30 +02:00

282 lines
14 KiB
Python

# ruff: noqa
# mypy: ignore-errors
"""Rendering helpers for the Reference Scenario Suite generator.
Holds the shared mutable output buffers (OUT, ROLLUP) and the small markdown helpers so the
generator script (`generate.py`) stays under the LOC budget. Not product code; not imported by
the app — only by the generator (run via `PYTHONPATH=. python3 reference_scenarios/generate.py`).
"""
from __future__ import annotations
from typing import List, Tuple
Row = Tuple[str, str, str]
OUT: List[str] = []
ROLLUP: List[str] = []
def w(s: str = "") -> None:
OUT.append(s)
def coverage_table(rows: List[Row]) -> None:
w("**Architecture Coverage**")
w("")
w("| Layer | Status | Hinweis |")
w("|---|---|---|")
for layer, status, note in rows:
w("| %s | **%s** | %s |" % (layer, status, note))
ROLLUP.append(status)
w("")
def reg_map_block(rmap) -> None:
w("**Expected Regulatory Map**")
w("")
w("> " + rmap.executive_summary)
w("")
for v in rmap.applicable_regulations:
obs = ", ".join(o.obligation_id for o in v.obligations) or v.obligations_note
w("- **%s** (%s) — Pflichten: %s" % (v.regulation_id, v.name, obs))
for u in rmap.uncertain_regulations:
w("- _unsicher_ %s — fehlt: %s" % (u.regulation_id, ", ".join(u.missing_facts) or "-"))
for ov in rmap.overlaps:
w("- Overlap %s: %s" % (ov.overlap_group_id, ", ".join(ov.shared_obligations)))
for ev, ids in rmap.shared_evidence.items():
w("- 1 Nachweis `%s` => %d Pflichten" % (ev, len(ids)))
w("")
def unsupported_block(rmap) -> None:
w("**Expected Unsupported Domains**")
w("")
if not rmap.unsupported_domains:
w("- keine — alle getriggerten Domaenen sind im Korpus")
for d in rmap.unsupported_domains:
w("- `%s` (Trigger: %s) -> %s" % (d.domain, d.trigger, d.note))
w("")
def interp_status(verdict_value: str) -> str:
return "PARTIAL" if verdict_value in ("uncertain", "unsupported") else "PASS"
def knowledge_intake_section(base_dir) -> None:
"""Render the Knowledge Intake section (kept here so generate.py stays under the LOC budget)."""
import os
import yaml
from compliance.knowledge_intake import (
DocumentDescriptor, assess_document_impact, build_knowledge_index,
)
def _load(sub):
d = os.path.join(base_dir, "..", "knowledge", sub)
return [yaml.safe_load(open(os.path.join(d, f), encoding="utf-8"))
for f in sorted(os.listdir(d)) if f.endswith(".yaml")]
idx = build_knowledge_index(
_load("transition_patterns"), _load("implementation_playbooks"),
_load("reference_transition_scenarios"), obligation_index={"CRA": ["cra_obl_1", "cra_obl_2"]})
docs = [
DocumentDescriptor(document_id="ENISA CRA SBOM-FAQ", regulations=["CRA"], keywords=["sbom", "vulnerability"], document_type="faq"),
DocumentDescriptor(document_id="EU Umwelt-Leitfaden", regulations=["UmweltVO"], keywords=["wastewater"], document_type="guidance"),
DocumentDescriptor(document_id="Marketing-Blog", keywords=["newsletter"], document_type="blog"),
]
w("## Knowledge Intake — Impact zuerst, Extraktion später")
w("")
w('_Vor dem Parser: ein neues Dokument NUR einordnen und seinen Impact auf den bestehenden Wissensbestand bestimmen. „Von N Dokumenten verändern wenige tatsächlich unser Wissen." Deterministisch, keine Extraktion, kein LLM._')
w("")
w("| Dokument | Impact | betrifft | Empfehlung |")
w("|---|---|---|---|")
for d in docs:
kp = assess_document_impact(d, idx)
touch = "neue Domäne" if kp.new_domain else "%d%dPB·%dRTS·%dObl" % (
len(kp.affected_capabilities), len(kp.affected_playbooks),
len(kp.affected_reference_scenarios), len(kp.affected_obligations))
w("| %s | **%s** | %s | %s |" % (d.document_id, kp.impact_level.value, touch, kp.recommendation.split(" —")[0]))
w("")
w("**Beispiel-Knowledge-Package** (`%s`): %s" % (docs[0].document_id, assess_document_impact(docs[0], idx).impact_summary))
w("")
w('_So entsteht bei jedem neuen Dokument eine Impact-Analyse statt „200 Seiten PDF" — Targeted Updating statt Schreiben._')
w("")
coverage_table([
("Knowledge Intake (Klassifikation+Impact)", "PASS", "%d Regelwerke / %d Capabilities im Index" % (len(idx.regulations), len(idx.capability_regulations))),
("Impact-Triage (HIGH/LOW/NONE/new_domain)", "PASS", "3 Beispiel-Dokumente korrekt eingeordnet"),
("Regelwerk-ID-Normalisierung", "TODO", "CRA vs Cyber Resilience Act vereinheitlichen"),
])
def completeness_section() -> None:
"""Render the Regulatory Completeness section (kept here so generate.py stays under the LOC budget)."""
from compliance.completeness import assess_completeness
rep = assess_completeness(
identified_regulations=["CRA", "MaschinenVO", "EMV", "Environmental", "DataAct"],
corpus_status={"CRA": "validated", "MaschinenVO": "validated", "EMV": "unsupported",
"Environmental": "unsupported", "DataAct": "validated"},
uncertain=[{"regulation": "DataAct", "deciding_question": "generates_usage_data", "reason": "generates_usage_data = unbekannt"}],
assumptions=[{"key": "Funkmodul", "value": "nein"}, {"key": "personenbezogene Nutzungsdaten", "value": "nein"}],
assessed_obligations=128)
w("## Regulatory Completeness — was wir bewerten konnten, und was bewusst nicht")
w("")
w('_Interne Qualitätsmaschine (KEIN Confidence-Score): trennt IDENTIFIZIERT von BEWERTET und begründet jede Lücke. Keine Prozentzahl — auditierbar und ehrlich: „Wir zeigen auch, was wir noch nicht wissen und warum."_')
w("")
w("**%s**" % rep.completeness_summary)
w("")
w("> %s" % rep.audit_statement)
w("")
w("- **Bewertet:** %s (%d Pflichten)" % (", ".join(rep.assessed_regulations), rep.assessed_obligations))
w("- **Offen (jeweils begründet):**")
for e in rep.exclusions:
dq = (" → Rückfrage: `%s`" % e.deciding_question) if e.deciding_question else ""
w(" - `%s` — %s `[%s]`%s" % (e.subject, e.reason, e.resolution, dq))
w("- **Annahmen:** %s" % ", ".join("%s=%s" % (a.key, a.value) for a in rep.assumptions))
w("")
w("_Sobald der Umwelt-Korpus (ISO 14001 etc.) landet, kippt `Environmental` automatisch von offen auf bewertet — die Completeness Engine dokumentiert den Fortschritt je Domäne._")
w("")
coverage_table([
("Regulatory Completeness (auditierbar)", "PASS", rep.completeness_summary),
("Begründete Ausschlüsse (Korpus/Anwendbarkeit)", "PASS", "%d Ausschlüsse, alle mit Grund" % len(rep.exclusions)),
("Fortschritts-Doku je Domäne", "PASS", "Environmental offen→validated bei Korpus-Landung"),
])
def _regulation_aliases(base_dir):
"""Build a normalized alias -> canonical-id map from the Domain Vocabulary (regulations.yaml)."""
import os
import yaml
path = os.path.join(base_dir, "..", "knowledge", "vocabulary", "regulations.yaml")
amap = {}
with open(path, encoding="utf-8") as h:
for r in (yaml.safe_load(h) or {}).get("regulations", []):
for name in [r["canonical"]] + list(r.get("aliases", [])):
amap["".join(c for c in str(name).lower() if c.isalnum())] = r["id"]
return amap
def _canon_reg(s, amap):
"""Canonicalize a regulation string via the vocabulary (replaces the old hard-coded alias maps)."""
return amap.get("".join(c for c in str(s).lower() if c.isalnum()),
"".join(c for c in str(s).lower() if c.isalnum()))
def domain_programs_section(base_dir) -> None:
"""Domain Knowledge Program v1 — per-domain maturity KPI DERIVED from the corpus (computed-not-stored)."""
import os
import yaml
from compliance.knowledge_intake import build_knowledge_index
def _load(sub):
d = os.path.join(base_dir, "..", "knowledge", sub)
return [yaml.safe_load(open(os.path.join(d, f), encoding="utf-8"))
for f in sorted(os.listdir(d)) if f.endswith(".yaml")]
idx = build_knowledge_index(_load("transition_patterns"), _load("implementation_playbooks"),
_load("reference_transition_scenarios"))
pdir = os.path.join(base_dir, "..", "knowledge", "programs")
_all = [yaml.safe_load(open(os.path.join(pdir, f), encoding="utf-8"))
for f in sorted(os.listdir(pdir)) if f.endswith(".yaml")]
progs = sorted((p for p in _all if "backlog_rank" in p), key=lambda p: p["backlog_rank"]) # domain programs only
_amap = _regulation_aliases(base_dir) # Domain Vocabulary (regulations.yaml)
def _canon(r):
return _canon_reg(r, _amap)
def _hits(reg_lists, src):
cs = {_canon(s) for s in src}
return [k for k, regs in reg_lists.items() if cs & {_canon(x) for x in regs}]
def _source_modeled(index, source, canon):
c = canon(source)
in_tp = any(c in {canon(x) for x in regs} for regs in index.transition_patterns.values())
in_rts = any(c in {canon(x) for x in regs} for regs in index.reference_scenarios.values())
in_pb = any(c in {canon(x) for x in index.capability_regulations.get(cap, [])} for cap in index.playbook_capabilities)
return in_tp or in_rts or in_pb
w("## Domain Knowledge Program v1 — Reifegrad je Domäne (reproduzierbarer KPI)")
w("")
w('_Engpass = Domänenmodellierung. Jede Domäne läuft durch DIESELBE 7-Stufen-Produktionsstraße (Domain Model → Requirement Sources → Capability Registry → Transition Patterns → Playbooks → Reference Scenarios → Completeness). Reifegrad aus dem ECHTEN Korpus abgeleitet (computed-not-stored), keine Marketingzahl. Einstieg über Industry, nicht Regelwerk._')
w("")
w("| Rank | Domäne | Reifegrad (Sources modelliert) | modelliert/total | Korpus TP·PB·RTS |")
w("|---|---|---|---|---|")
for p in progs:
src = p.get("typical_requirement_sources", [])
tp, rts = _hits(idx.transition_patterns, src), _hits(idx.reference_scenarios, src)
cs = {_canon(s) for s in src}
pb = [c for c in idx.playbook_capabilities if cs & {_canon(x) for x in idx.capability_regulations.get(c, [])}]
modeled = [s for s in src if _source_modeled(idx, s, _canon)] # sources with >=1 corpus artifact
breadth = (len(modeled) / len(src)) if src else 0.0 # honest differentiator (not CRA-shared depth)
filled = int(round(breadth * 10))
w("| %d | **%s** | `%s` %d%% | %d/%d | %d·%d·%d |" % (
p.get("backlog_rank", 99), p["name"], "█" * filled + "░" * (10 - filled),
int(round(breadth * 100)), len(modeled), len(src), len(tp), len(pb), len(rts)))
w("")
w('_Industry-Einstieg + ETO-Hypothese: jede Domäne kennt ihre typischen Sources + Zertifikate → vor dem Onboarding „diese Prozesswelt ist wahrscheinlich vorhanden" (Hypothese, nie Wahrheit; speist Company 2A als `inferred`). Backlog nach Kundennutzen, KPI nach echtem Korpusstand — beides bewusst getrennt._')
w("")
coverage_table([
("Domain Knowledge Program (7-Stufen-Produktionsstraße)", "PASS", "%d Domänen im Backlog, Industrial Automation #1" % len(progs)),
("Reifegrad-KPI (computed-not-stored)", "PASS", "aus echtem Korpus abgeleitet (TP/PB/RTS je Domäne)"),
("Regelwerk-ID-Normalisierung (Domain Vocabulary)", "PASS", "Aliase aus `vocabulary/regulations.yaml`, nicht mehr hartkodiert"),
])
def transition_coverage_section(base_dir) -> None:
"""Transition Coverage — the TRANSITION is the unit of knowledge; status DERIVED from the corpus."""
import os
import yaml
kdir = os.path.join(base_dir, "..", "knowledge")
with open(os.path.join(kdir, "programs", "transitions.yaml"), encoding="utf-8") as h:
backlog = yaml.safe_load(h)
tpdir = os.path.join(kdir, "transition_patterns")
pats = []
for f in sorted(os.listdir(tpdir)):
if not f.endswith(".yaml"):
continue
d = yaml.safe_load(open(os.path.join(tpdir, f), encoding="utf-8"))
goal = d.get("transition_goal", {})
frm = goal.get("from", {}).get("standard", "")
to = goal.get("to")
tos = [it.get("regulation") or it.get("framework") or it.get("target")
for it in (to if isinstance(to, list) else [to]) if isinstance(it, dict)]
pats.append((frm, [t for t in tos if t], str(d.get("status", "draft"))))
_amap = _regulation_aliases(base_dir) # Domain Vocabulary (regulations.yaml)
def _c(s):
return _canon_reg(s, _amap)
_RANK = {"draft": 1, "reviewed": 2, "validated": 3, "proven": 4}
_ICON = {0: "⚪ nicht begonnen", 1: "🟡 Draft", 2: "✅ reviewed", 3: "✅ validated", 4: "✅ Gold"}
def _status(frm, to):
best = 0
for pf, ptos, st in pats:
if _c(pf) == _c(frm) and _c(to) in {_c(x) for x in ptos}:
best = max(best, _RANK.get(st, 1))
return best
w("## Transition Coverage — die Transition ist die Wissenseinheit (Operational Knowledge)")
w("")
w('_Der Kunde kauft nicht „EMV-Domain", sondern „wir haben ISO 9001 — helfen Sie uns beim CRA". Die Wissenseinheit ist die TRANSITION (nicht das Gesetz). Status je Transition aus dem echten Pattern-Korpus abgeleitet (computed-not-stored). Drei Ebenen: Regulatory → **Operational (hier, größter Differenzierer)** → Verification (Vision V2)._')
w("")
w("| Prio | Transition | Status |")
w("|---|---|---|")
rows = sorted(backlog["transitions"], key=lambda t: -t["priority"])
done = 0
for t in rows:
s = _status(t["from"], t["to"])
done += 1 if s else 0
w("| %s | `%s%s` | %s |" % ("⭐" * t["priority"], t["from"], t["to"], _ICON[s]))
w("")
gaps = [t for t in rows if _status(t["from"], t["to"]) == 0]
if gaps:
w('**Größte Lücke (Track B als Nächstes):** `%s%s` (%s) — höchstnachgefragte Transition OHNE Pattern. Stärkerer Produktindikator als „EMV 30%% modelliert".' % (gaps[0]["from"], gaps[0]["to"], "⭐" * gaps[0]["priority"]))
w("")
coverage_table([
("Transition Coverage (Operational Knowledge)", "PASS", "%d von %d Top-Transitionen mit Pattern" % (done, len(rows))),
("Wissenseinheit = Transition (nicht Gesetz)", "PASS", "verkauft wird der Übergang, z. B. ISO9001→CRA"),
("3 Ebenen Regulatory→Operational→Verification", "PASS", "Operational = größter Differenzierer (ADR-010)"),
])