4bfd552da7
(1) Harden the first Transition Pattern to the gold-standard template per quality checklist: versioned transition_goal (ISO27001:2022 -> CRA, applies 2027-12-11), source_state_variants (certified/isms_introduced/expired/limited_scope), each likely_covered assumption with a typed relationship (supports|partially_supports, never equivalent) + verification + rationale (the Warum) + an auditor-checkable reviewable_claim, delta as missing-capability + needed-info, an explicit rejected_assumptions section, and a determinism_goal. README schema updated to match. (2) New Reference-Suite scenario 4 (Transition): the generator READS the pattern YAML and runs it through the RS-005 Planning Engine + Company 2A -> coverage + question requests. Proves the architecture fully carries the pattern (17 caps -> 17 coverage + 17 requests; 9 HIGH delta = the real CRA gaps, 8 probably-covered from the ISMS). Now a living regression test: every future pattern runs through the same engine. Non-runtime knowledge + reference harness -> no deploy (ADR-001). Next: ISMS->TISAX once approved. Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
313 lines
17 KiB
Python
313 lines
17 KiB
Python
# ruff: noqa
|
|
# mypy: ignore-errors
|
|
"""Reference Scenario Suite v1 — generator (living regression reference).
|
|
|
|
Runs the REAL deployed engines for three real customer scenarios and emits a
|
|
markdown reference artifact. Per scenario it derives an "Architecture Coverage"
|
|
table from the actual run, so when a new domain lands the cells flip automatically
|
|
(e.g. Sz2/Environmental UNSUPPORTED -> PASS). This is the objective measure of
|
|
"is BreakPilot better than six months ago" — not LOC, not features.
|
|
|
|
Run: cd backend-compliance && PYTHONPATH=. python3 reference_scenarios/generate.py
|
|
Not product code; not imported by the app.
|
|
"""
|
|
from __future__ import annotations
|
|
|
|
from typing import List, Tuple
|
|
|
|
from compliance.profile.canonical import (
|
|
CanonicalProductRegulatoryProfile as P, CanonicalProductType as PT,
|
|
EconomicOperatorRole as Role, CanonicalLifecyclePhase as LP,
|
|
ProductComponent as Comp, ComponentKind as CK, EnvironmentalImpact as Env,
|
|
)
|
|
from compliance.product_scope.orchestrator import resolve_product_scope
|
|
from compliance.product_scope.schemas import ScopeStatus
|
|
from compliance.regulatory_map.renderer import render_regulatory_map
|
|
from compliance.interpretation_map.adapter import interpret_in_map
|
|
from compliance.rci import create_baseline, assess_change, RegulatoryChange, ChangeType
|
|
from compliance.company import (
|
|
CompanyContext, Certification, Declaration, ExistingEvidence,
|
|
CapabilityMappingEntry, build_company_profile,
|
|
)
|
|
from compliance.capability import (
|
|
CapabilityRegistry, CapabilityCandidate, CapabilityRelation, RelationType,
|
|
EvidenceKind, mint_capability, evaluate_relation,
|
|
)
|
|
from compliance.reasoning.enums import Confidence
|
|
from compliance.transition_reasoning import (
|
|
TransitionContext, TransitionGoal, TargetType, TargetRequirement, assess_transition,
|
|
)
|
|
import os
|
|
import yaml
|
|
|
|
Row = Tuple[str, str, str]
|
|
OUT: List[str] = []
|
|
ROLLUP: List[str] = []
|
|
|
|
|
|
def w(s: str = "") -> None:
|
|
OUT.append(s)
|
|
|
|
|
|
def coverage_table(rows: List[Row]) -> None:
|
|
w("**Architecture Coverage**")
|
|
w("")
|
|
w("| Layer | Status | Hinweis |")
|
|
w("|---|---|---|")
|
|
for layer, status, note in rows:
|
|
w("| %s | **%s** | %s |" % (layer, status, note))
|
|
ROLLUP.append(status)
|
|
w("")
|
|
|
|
|
|
def reg_map_block(rmap) -> None:
|
|
w("**Expected Regulatory Map**")
|
|
w("")
|
|
w("> " + rmap.executive_summary)
|
|
w("")
|
|
for v in rmap.applicable_regulations:
|
|
obs = ", ".join(o.obligation_id for o in v.obligations) or v.obligations_note
|
|
w("- **%s** (%s) — Pflichten: %s" % (v.regulation_id, v.name, obs))
|
|
for u in rmap.uncertain_regulations:
|
|
w("- _unsicher_ %s — fehlt: %s" % (u.regulation_id, ", ".join(u.missing_facts) or "-"))
|
|
for ov in rmap.overlaps:
|
|
w("- Overlap %s: %s" % (ov.overlap_group_id, ", ".join(ov.shared_obligations)))
|
|
for ev, ids in rmap.shared_evidence.items():
|
|
w("- 1 Nachweis `%s` => %d Pflichten" % (ev, len(ids)))
|
|
w("")
|
|
|
|
|
|
def unsupported_block(rmap) -> None:
|
|
w("**Expected Unsupported Domains**")
|
|
w("")
|
|
if not rmap.unsupported_domains:
|
|
w("- keine — alle getriggerten Domaenen sind im Korpus")
|
|
for d in rmap.unsupported_domains:
|
|
w("- `%s` (Trigger: %s) -> %s" % (d.domain, d.trigger, d.note))
|
|
w("")
|
|
|
|
|
|
ISO_MAP = {"ISO27001": CapabilityMappingEntry(
|
|
capability_ids=["cap_incident_response", "cap_supplier_management", "cap_asset_management"],
|
|
confidence=Confidence.MEDIUM)}
|
|
|
|
|
|
def interp_status(verdict_value: str) -> str:
|
|
return "PARTIAL" if verdict_value in ("uncertain", "unsupported") else "PASS"
|
|
|
|
|
|
w("# Reference Scenario Suite v1")
|
|
w("")
|
|
w("> **Kein Doku-Artefakt — die erste Ground Truth / Living Reference Suite.** Erzeugt aus den "
|
|
"REALEN deployten Engines (aktueller deployter main) via `reference_scenarios/generate.py`. Jede "
|
|
"`Architecture Coverage`-Zelle ist aus dem echten Lauf ABGELEITET; sobald eine Domaene landet, "
|
|
"kippt die Zelle automatisch (z. B. Sz2/Environmental UNSUPPORTED -> PASS). Beantwortet dauerhaft: "
|
|
'„Ist BreakPilot besser als vor sechs Monaten?" — anhand echter Kundensituationen, nicht LOC.')
|
|
w("")
|
|
w("Synthetische `Cert->Capability`-Mappings sind als ILLUSTRATIV markiert; die echte Tabelle gehoert "
|
|
"Compliance Execution (siehe Master Capability Registry).")
|
|
w("")
|
|
|
|
# ── Scenario 1 ────────────────────────────────────────────────────────────
|
|
w("## Szenario 1 — Maschinenbauer mit ISMS + SBOM + Remote Access")
|
|
w("")
|
|
w('**Frage:** „Was gilt fuer uns, und reicht das?"')
|
|
w("")
|
|
p1 = P(name="Industrielle Verpackungsmaschine", product_type=PT.MACHINERY, markets=["EU", "DE"],
|
|
economic_operator_role=Role.MANUFACTURER, lifecycle_phase=LP.PLACING_ON_MARKET,
|
|
is_machine=True, is_component=False, has_software_updates=True, has_embedded_software=True,
|
|
has_remote_access=True, connected_to_internet=True, has_security_function=True,
|
|
has_sbom=True, has_incident_response=True, technologies=["cloud", "ota_updates"],
|
|
existing_certifications=["ISO27001"])
|
|
sc1 = resolve_product_scope(p1)
|
|
rmap1 = render_regulatory_map(p1)
|
|
w("**Input:** Maschine, vernetzt (Remote/Cloud), Firmware, Rolle Hersteller, Maerkte EU/DE; "
|
|
"Company: ISMS (ISO27001) + SBOM + Incident Response.")
|
|
w("")
|
|
reg_map_block(rmap1)
|
|
w("**Input — Company Context** _(ILLUSTRATIVES Mapping: ISO27001 -> incident_response, supplier_management, asset_management)_")
|
|
w("")
|
|
ctx1 = CompanyContext(company_id="maschinenbau",
|
|
certifications=[Certification(certification_id="ISO27001", name="ISO/IEC 27001")],
|
|
declarations=[Declaration(capability_id="cap_patch_management")],
|
|
evidence=[ExistingEvidence(evidence_id="sbom.json", evidence_type="sbom", proves_capability_id="cap_sbom_management")])
|
|
prof1 = build_company_profile(ctx1, ISO_MAP)
|
|
for c in prof1.candidate_capabilities:
|
|
w("- candidate: %s — %s (%s)" % (c.capability_id, c.verification_status.value, c.source))
|
|
for c in prof1.confirmed_capabilities:
|
|
w("- CONFIRMED: %s — %s (Nachweis: %s)" % (c.capability_id, c.verification_status.value, ", ".join(c.sources)))
|
|
w("")
|
|
ie1 = interpret_in_map(rmap1, "Wir liefern Sicherheitsupdates fuenf Jahre lang, damit ist der CRA erfuellt.")
|
|
w("**Expected Interpretation**")
|
|
w("")
|
|
w('> Auslegung „5 Jahre Updates -> CRA erfuellt" -> **%s**' % ie1.assessment.value)
|
|
w("> " + ie1.explanation)
|
|
w("")
|
|
base1 = create_baseline(p1, {"sbom_creation": ["sbom"], "provide_security_updates": ["policy"]}, baseline_id="s1")
|
|
a1 = assess_change(base1, RegulatoryChange(change_id="cra-2026-amd", affected_regulations=["CRA"],
|
|
affected_obligations=["cra_new_disclosure_duty", "sbom_creation", "vuln_handling_process"],
|
|
change_type=ChangeType.AMENDMENT))
|
|
w("**Expected RCI** _(CRA-Novelle gegen gespeicherte Baseline)_")
|
|
w("")
|
|
w("> affects_product = %s — %s" % (a1.affects_product, a1.summary.what_changed))
|
|
for d in a1.deltas:
|
|
w("- %s -> **%s** (fehlende Nachweise: %s)" % (d.obligation_id, d.delta_type.value, ", ".join(d.missing_evidence) or "-"))
|
|
w("")
|
|
unsupported_block(rmap1)
|
|
w("**Known Gaps:** Interpretation kennt kein CRA-Muster (RS-001) · MaschinenVO/EMV-Pflichten nicht "
|
|
"registry-verlinkt (RS-004) · cap/MCAP/Pflicht-Evidence nicht gejoint = Company-Gap (RS-003).")
|
|
w("")
|
|
coverage_table([
|
|
("Company Context", "PASS", "ISO27001 + SBOM + Declaration"),
|
|
("Product Profile", "PASS", "Maschine, vernetzt, Firmware"),
|
|
("Navigator", "PASS", "ready_for_scope"),
|
|
("Scope", "PASS" if sc1.status == ScopeStatus.RESOLVED else "NEEDS_FACTS", "CRA/MaschinenVO/EMV + 3 unsicher"),
|
|
("Regulatory Map", "PASS", "Overlaps + 1-Nachweis-N-Pflichten"),
|
|
("CRA Obligations", "PASS", "12 registry-verlinkt"),
|
|
("MaschinenVO/EMV Obligations", "PARTIAL", "Scope ja, Pflichten nicht verlinkt → RS-004"),
|
|
("Interpretation", interp_status(ie1.assessment.value), "kein CRA-Muster → RS-001"),
|
|
("RCI", "PASS", "1 neu, 2 geaendert"),
|
|
("Company Gap", "TODO", "cap↔MCAP↔Pflicht nicht gejoint → RS-003"),
|
|
("Environmental", "N/A", "keine Umwelt-Trigger"),
|
|
])
|
|
|
|
# ── Scenario 2 ────────────────────────────────────────────────────────────
|
|
w("## Szenario 2 — Industriespuelmaschine mit Abwasser/Chemikalien")
|
|
w("")
|
|
w('**Frage:** „Welche Umweltbereiche sind noch nicht abgedeckt?"')
|
|
w("")
|
|
p2 = P(name="Industriespuelmaschine", product_type=PT.MACHINERY, markets=["EU", "DE"],
|
|
economic_operator_role=Role.MANUFACTURER, lifecycle_phase=LP.PLACING_ON_MARKET,
|
|
is_machine=True, is_component=False, has_software_updates=True, has_embedded_software=True,
|
|
components=[Comp(name="Dosierpumpe", kind=CK.CHEMICAL_DOSING), Comp(name="Abwasserauslass", kind=CK.WASTEWATER_OUTLET)],
|
|
environmental=Env(discharges_to_wastewater=True, uses_cleaning_chemicals=True, consumes_energy_or_water=True))
|
|
sc2 = resolve_product_scope(p2)
|
|
rmap2 = render_regulatory_map(p2)
|
|
w("**Input:** Maschine mit Chemikalien-Dosierung + Abwasserauslass; Umwelt-Trigger gesetzt.")
|
|
w("")
|
|
reg_map_block(rmap2)
|
|
unsupported_block(rmap2)
|
|
ie2 = interpret_in_map(rmap2, "Unsere Abwassereinleitung mit Reinigungschemikalien ist sicher abgedeckt.")
|
|
w("**Expected Interpretation** _(Umwelt -> bewusst nicht bewertet)_")
|
|
w("")
|
|
w("> " + ie2.explanation)
|
|
w("")
|
|
w("**Known Gaps:** Abwasser/Chemikalien/Energie sind `unsupported_domain` — Environmental Corpus fehlt (RS-002).")
|
|
w("")
|
|
env_status = "UNSUPPORTED" if rmap2.unsupported_domains else "PASS"
|
|
coverage_table([
|
|
("Product Profile", "PASS", "Maschine + Umwelt-Komponenten"),
|
|
("Scope", "PASS" if sc2.status == ScopeStatus.RESOLVED else "NEEDS_FACTS", ""),
|
|
("Regulatory Map", "PASS", "CRA/MaschinenVO/EMV"),
|
|
("Environmental (Abwasser/Chemikalien/Energie)", env_status, 'ehrlich „noch nicht im Korpus" → RS-002'),
|
|
("Interpretation (Umwelt)", "PARTIAL", "future_corpus_needed statt Scheinsicherheit"),
|
|
])
|
|
|
|
# ── Scenario 3 ────────────────────────────────────────────────────────────
|
|
w("## Szenario 3 — ISO27001-zertifiziertes Unternehmen")
|
|
w("")
|
|
w('**Frage:** „Welche Capabilities sind inferred, declared oder confirmed?"')
|
|
w("")
|
|
w("_ILLUSTRATIVES Mapping: ISO27001 -> incident_response, supplier_management, asset_management_")
|
|
w("")
|
|
ctx3 = CompanyContext(company_id="iso-kunde",
|
|
certifications=[Certification(certification_id="ISO27001", name="ISO/IEC 27001")],
|
|
declarations=[Declaration(capability_id="cap_patch_management", statement="Wir machen Patch Management")],
|
|
evidence=[ExistingEvidence(evidence_id="patch-policy.pdf", evidence_type="policy", proves_capability_id="cap_patch_management")])
|
|
prof3 = build_company_profile(ctx3, ISO_MAP)
|
|
w("**Expected Company Capability Profile** _(4-Zustands-Trust-Model)_")
|
|
w("")
|
|
for c in prof3.candidate_capabilities:
|
|
w("- %s — **%s** (Quelle: %s)" % (c.capability_id, c.verification_status.value, c.source))
|
|
for c in prof3.confirmed_capabilities:
|
|
w("- %s — **%s** (Nachweis: %s)" % (c.capability_id, c.verification_status.value, ", ".join(c.sources)))
|
|
w("")
|
|
reg = CapabilityRegistry()
|
|
m = mint_capability(reg, CapabilityCandidate(raw_term="Incident Response"), category="Security")
|
|
ca = evaluate_relation(CapabilityRelation(relation_id="r1", source="certification:ISO27001", target_capability_id=m.capability_id, relationship_type=RelationType.SUPPORTS, evidence_kind=EvidenceKind.CERTIFICATION))
|
|
cb = evaluate_relation(CapabilityRelation(relation_id="r2", source="ir-runbook.pdf", target_capability_id=m.capability_id, relationship_type=RelationType.CONFIRMS, evidence_kind=EvidenceKind.ARTIFACT))
|
|
w("**Expected Master Capability Registry** _(computed confidence, policy-versioniert)_")
|
|
w("")
|
|
w("- ISO27001 *supports* %s -> %s/%s (policy %s) — computed, nicht gespeichert" % (m.capability_id, ca.status.value, ca.confidence.value, ca.policy_version))
|
|
w("- ir-runbook.pdf *confirms* %s -> %s/%s — nur echtes Artefakt erreicht confirmed" % (m.capability_id, cb.status.value, cb.confidence.value))
|
|
w("")
|
|
w("**Known Gaps:** `cap_*` (Company 2A) und `MCAP-*` (Registry) sind noch nicht verlinkt (RS-003).")
|
|
w("")
|
|
coverage_table([
|
|
("Company Context", "PASS", "ISO27001 + Declaration + Evidence"),
|
|
("Trust-State (declared/inferred/confirmed)", "PASS", "Zertifizierung nie confirmed"),
|
|
("Master Capability Registry", "PASS", "computed confidence, policy-versioniert"),
|
|
("cap ↔ MCAP Linking", "TODO", "zwei Vokabulare unverbunden → RS-003"),
|
|
])
|
|
|
|
# ── Scenario 4 — Transition (RS-005 Planning Engine + Knowledge Pattern) ───
|
|
w("## Szenario 4 — Transition ISO27001 → CRA (RS-005 + Pattern TP-ISO27001-CRA-v1)")
|
|
w("")
|
|
w('_Frage: „Ich bin ISO27001-zertifiziert — was fehlt mir für den CRA?"_')
|
|
w("")
|
|
_pat_path = os.path.join(os.path.dirname(__file__), "..", "knowledge", "transition_patterns",
|
|
"transition_pattern_iso27001_to_cra_v1.yaml")
|
|
with open(_pat_path, encoding="utf-8") as _f:
|
|
PAT = yaml.safe_load(_f)
|
|
# „habe": ISO27001 -> the pattern's likely_covered capabilities become INFERRED via Company 2A
|
|
_iso_caps = [a["capability"] for a in PAT["likely_covered"]]
|
|
_iso_map = {"ISO27001": CapabilityMappingEntry(capability_ids=_iso_caps, confidence=Confidence.MEDIUM)}
|
|
_profile = build_company_profile(
|
|
CompanyContext(company_id="iso-kunde", certifications=[Certification(certification_id="ISO27001")]), _iso_map)
|
|
# „required": likely_covered + delta -> TargetRequirements (here read from the pattern)
|
|
_reqs = [TargetRequirement(capability_id=a["capability"], question_intent="verify_existence",
|
|
expected_evidence=a.get("expected_evidence", [])) for a in PAT["likely_covered"]]
|
|
_reqs += [TargetRequirement(capability_id=d["capability"], question_intent=d.get("needed_information", "verify_existence"),
|
|
expected_evidence=d.get("expected_evidence", [])) for d in PAT["delta_requirements"]]
|
|
_tc = TransitionContext(company_id="iso-kunde", known_certifications=["ISO27001"],
|
|
target=TransitionGoal(target_id="CRA", target_type=TargetType.REGULATION,
|
|
label=PAT["transition_goal"]["to"]["regulation"]))
|
|
_a = assess_transition(_tc, _reqs, _profile)
|
|
w("**Input:** ISO27001-zertifiziert (Pattern TP-ISO27001-CRA-v1) → %d ISMS-Capabilities inferred; Ziel CRA." % len(_iso_caps))
|
|
w("")
|
|
w("**Expected Transition Assessment (RS-005 v0 gegen den Pattern):**")
|
|
w("> Ziel %s · %s" % (_a.target_id, _a.summary.headline))
|
|
w("")
|
|
w("**Delta zuerst (HIGH — fehlt einem ISO-27001-only-Hersteller):**")
|
|
for _r in _a.question_requests:
|
|
if _r.priority.value == "high":
|
|
w("- `%s` — intent=%s, Nachweis=%s" % (_r.capability_id, _r.question_intent, _r.expected_evidence))
|
|
w("")
|
|
w("**Aus ISO27001 vermutlich abgedeckt (Produkt-Nachweis bestätigen):** %s" % ", ".join(_a.summary.probably_covered))
|
|
w("")
|
|
_carried = len(_a.coverage) == len(_reqs) and len(_a.question_requests) > 0
|
|
_n_high = sum(1 for _r in _a.question_requests if _r.priority.value == "high")
|
|
w("**Architektur-Test — trägt RS-005 den Pattern vollständig?** %d Pattern-Capabilities → %d Coverage + %d Question-Requests → **%s**."
|
|
% (len(_reqs), len(_a.coverage), len(_a.question_requests), "ja, vollständig getragen" if _carried else "NICHT vollständig"))
|
|
w("")
|
|
coverage_table([
|
|
("Pattern-Load (YAML)", "PASS", "TP-ISO27001-CRA-v1 (draft, gold-standard)"),
|
|
("Company 2A (habe)", "PASS", "ISO27001 → %d inferred caps" % len(_a.summary.probably_covered)),
|
|
("RS-005 Planning Engine", "PASS" if _carried else "PARTIAL", "Pattern → TransitionQuestionRequests"),
|
|
("Transition ISO27001→CRA", "PASS" if _carried else "PARTIAL",
|
|
"%d Delta-Fragen (HIGH) + %d zu bestätigen" % (_n_high, len(_a.summary.probably_covered))),
|
|
("RS-005.1 Renderer (Fragetext)", "TODO", "verschoben — Engine liefert nur Requests"),
|
|
])
|
|
|
|
# ── Epics + roll-up ───────────────────────────────────────────────────────
|
|
w("## Gaps → Epics (Backlog — nur erfasst, NICHT implementiert)")
|
|
w("")
|
|
w("| Epic | Titel | schliesst Coverage-Luecke |")
|
|
w("|---|---|---|")
|
|
w("| RS-001 | Interpretation Pattern Library | Sz1 Interpretation PARTIAL -> PASS (CRA-Muster) |")
|
|
w("| RS-002 | Environmental Corpus (Pilotdomaene) | Sz2 Environmental UNSUPPORTED -> PASS |")
|
|
w("| RS-003 | Capability Linking (cap↔MCAP) + Company-Gap | Sz1/Sz3 Company Gap TODO -> PASS |")
|
|
w("| RS-004 | MaschinenVO/EMV Registry Linking | Sz1/Sz2 MaschinenVO/EMV PARTIAL -> PASS |")
|
|
w("")
|
|
total = len(ROLLUP)
|
|
npass = ROLLUP.count("PASS")
|
|
w("## Suite-Status (Roll-up)")
|
|
w("")
|
|
w("- Coverage-Zellen gesamt: **%d**" % total)
|
|
w("- PASS: **%d** · PARTIAL: %d · UNSUPPORTED: %d · TODO: %d · N/A: %d · NEEDS_FACTS: %d"
|
|
% (npass, ROLLUP.count("PARTIAL"), ROLLUP.count("UNSUPPORTED"), ROLLUP.count("TODO"),
|
|
ROLLUP.count("N/A"), ROLLUP.count("NEEDS_FACTS")))
|
|
w("- Fortschritt = PASS-Anteil steigt, wenn Epics RS-001…004 landen (objektiver Maßstab, kein LOC).")
|
|
|
|
print("\n".join(OUT))
|