# ruff: noqa # mypy: ignore-errors """Reference Scenario Suite v1 — generator (living regression reference). Runs the REAL deployed engines for three real customer scenarios and emits a markdown reference artifact. Per scenario it derives an "Architecture Coverage" table from the actual run, so when a new domain lands the cells flip automatically (e.g. Sz2/Environmental UNSUPPORTED -> PASS). This is the objective measure of "is BreakPilot better than six months ago" — not LOC, not features. Run: cd backend-compliance && PYTHONPATH=. python3 reference_scenarios/generate.py Not product code; not imported by the app. """ from __future__ import annotations from typing import List, Tuple from compliance.profile.canonical import ( CanonicalProductRegulatoryProfile as P, CanonicalProductType as PT, EconomicOperatorRole as Role, CanonicalLifecyclePhase as LP, ProductComponent as Comp, ComponentKind as CK, EnvironmentalImpact as Env, ) from compliance.product_scope.orchestrator import resolve_product_scope from compliance.product_scope.schemas import ScopeStatus from compliance.regulatory_map.renderer import render_regulatory_map from compliance.interpretation_map.adapter import interpret_in_map from compliance.rci import create_baseline, assess_change, RegulatoryChange, ChangeType from compliance.company import ( CompanyContext, Certification, Declaration, ExistingEvidence, CapabilityMappingEntry, build_company_profile, ) from compliance.capability import ( CapabilityRegistry, CapabilityCandidate, CapabilityRelation, RelationType, EvidenceKind, mint_capability, evaluate_relation, ) from compliance.reasoning.enums import Confidence Row = Tuple[str, str, str] OUT: List[str] = [] ROLLUP: List[str] = [] def w(s: str = "") -> None: OUT.append(s) def coverage_table(rows: List[Row]) -> None: w("**Architecture Coverage**") w("") w("| Layer | Status | Hinweis |") w("|---|---|---|") for layer, status, note in rows: w("| %s | **%s** | %s |" % (layer, status, note)) ROLLUP.append(status) w("") def reg_map_block(rmap) -> None: w("**Expected Regulatory Map**") w("") w("> " + rmap.executive_summary) w("") for v in rmap.applicable_regulations: obs = ", ".join(o.obligation_id for o in v.obligations) or v.obligations_note w("- **%s** (%s) — Pflichten: %s" % (v.regulation_id, v.name, obs)) for u in rmap.uncertain_regulations: w("- _unsicher_ %s — fehlt: %s" % (u.regulation_id, ", ".join(u.missing_facts) or "-")) for ov in rmap.overlaps: w("- Overlap %s: %s" % (ov.overlap_group_id, ", ".join(ov.shared_obligations))) for ev, ids in rmap.shared_evidence.items(): w("- 1 Nachweis `%s` => %d Pflichten" % (ev, len(ids))) w("") def unsupported_block(rmap) -> None: w("**Expected Unsupported Domains**") w("") if not rmap.unsupported_domains: w("- keine — alle getriggerten Domaenen sind im Korpus") for d in rmap.unsupported_domains: w("- `%s` (Trigger: %s) -> %s" % (d.domain, d.trigger, d.note)) w("") ISO_MAP = {"ISO27001": CapabilityMappingEntry( capability_ids=["cap_incident_response", "cap_supplier_management", "cap_asset_management"], confidence=Confidence.MEDIUM)} def interp_status(verdict_value: str) -> str: return "PARTIAL" if verdict_value in ("uncertain", "unsupported") else "PASS" w("# Reference Scenario Suite v1") w("") w("> **Kein Doku-Artefakt — die erste Ground Truth / Living Reference Suite.** Erzeugt aus den " "REALEN deployten Engines (aktueller deployter main) via `reference_scenarios/generate.py`. Jede " "`Architecture Coverage`-Zelle ist aus dem echten Lauf ABGELEITET; sobald eine Domaene landet, " "kippt die Zelle automatisch (z. B. Sz2/Environmental UNSUPPORTED -> PASS). Beantwortet dauerhaft: " '„Ist BreakPilot besser als vor sechs Monaten?" — anhand echter Kundensituationen, nicht LOC.') w("") w("Synthetische `Cert->Capability`-Mappings sind als ILLUSTRATIV markiert; die echte Tabelle gehoert " "Compliance Execution (siehe Master Capability Registry).") w("") # ── Scenario 1 ──────────────────────────────────────────────────────────── w("## Szenario 1 — Maschinenbauer mit ISMS + SBOM + Remote Access") w("") w('**Frage:** „Was gilt fuer uns, und reicht das?"') w("") p1 = P(name="Industrielle Verpackungsmaschine", product_type=PT.MACHINERY, markets=["EU", "DE"], economic_operator_role=Role.MANUFACTURER, lifecycle_phase=LP.PLACING_ON_MARKET, is_machine=True, is_component=False, has_software_updates=True, has_embedded_software=True, has_remote_access=True, connected_to_internet=True, has_security_function=True, has_sbom=True, has_incident_response=True, technologies=["cloud", "ota_updates"], existing_certifications=["ISO27001"]) sc1 = resolve_product_scope(p1) rmap1 = render_regulatory_map(p1) w("**Input:** Maschine, vernetzt (Remote/Cloud), Firmware, Rolle Hersteller, Maerkte EU/DE; " "Company: ISMS (ISO27001) + SBOM + Incident Response.") w("") reg_map_block(rmap1) w("**Input — Company Context** _(ILLUSTRATIVES Mapping: ISO27001 -> incident_response, supplier_management, asset_management)_") w("") ctx1 = CompanyContext(company_id="maschinenbau", certifications=[Certification(certification_id="ISO27001", name="ISO/IEC 27001")], declarations=[Declaration(capability_id="cap_patch_management")], evidence=[ExistingEvidence(evidence_id="sbom.json", evidence_type="sbom", proves_capability_id="cap_sbom_management")]) prof1 = build_company_profile(ctx1, ISO_MAP) for c in prof1.candidate_capabilities: w("- candidate: %s — %s (%s)" % (c.capability_id, c.verification_status.value, c.source)) for c in prof1.confirmed_capabilities: w("- CONFIRMED: %s — %s (Nachweis: %s)" % (c.capability_id, c.verification_status.value, ", ".join(c.sources))) w("") ie1 = interpret_in_map(rmap1, "Wir liefern Sicherheitsupdates fuenf Jahre lang, damit ist der CRA erfuellt.") w("**Expected Interpretation**") w("") w('> Auslegung „5 Jahre Updates -> CRA erfuellt" -> **%s**' % ie1.assessment.value) w("> " + ie1.explanation) w("") base1 = create_baseline(p1, {"sbom_creation": ["sbom"], "provide_security_updates": ["policy"]}, baseline_id="s1") a1 = assess_change(base1, RegulatoryChange(change_id="cra-2026-amd", affected_regulations=["CRA"], affected_obligations=["cra_new_disclosure_duty", "sbom_creation", "vuln_handling_process"], change_type=ChangeType.AMENDMENT)) w("**Expected RCI** _(CRA-Novelle gegen gespeicherte Baseline)_") w("") w("> affects_product = %s — %s" % (a1.affects_product, a1.summary.what_changed)) for d in a1.deltas: w("- %s -> **%s** (fehlende Nachweise: %s)" % (d.obligation_id, d.delta_type.value, ", ".join(d.missing_evidence) or "-")) w("") unsupported_block(rmap1) w("**Known Gaps:** Interpretation kennt kein CRA-Muster (RS-001) · MaschinenVO/EMV-Pflichten nicht " "registry-verlinkt (RS-004) · cap/MCAP/Pflicht-Evidence nicht gejoint = Company-Gap (RS-003).") w("") coverage_table([ ("Company Context", "PASS", "ISO27001 + SBOM + Declaration"), ("Product Profile", "PASS", "Maschine, vernetzt, Firmware"), ("Navigator", "PASS", "ready_for_scope"), ("Scope", "PASS" if sc1.status == ScopeStatus.RESOLVED else "NEEDS_FACTS", "CRA/MaschinenVO/EMV + 3 unsicher"), ("Regulatory Map", "PASS", "Overlaps + 1-Nachweis-N-Pflichten"), ("CRA Obligations", "PASS", "12 registry-verlinkt"), ("MaschinenVO/EMV Obligations", "PARTIAL", "Scope ja, Pflichten nicht verlinkt → RS-004"), ("Interpretation", interp_status(ie1.assessment.value), "kein CRA-Muster → RS-001"), ("RCI", "PASS", "1 neu, 2 geaendert"), ("Company Gap", "TODO", "cap↔MCAP↔Pflicht nicht gejoint → RS-003"), ("Environmental", "N/A", "keine Umwelt-Trigger"), ]) # ── Scenario 2 ──────────────────────────────────────────────────────────── w("## Szenario 2 — Industriespuelmaschine mit Abwasser/Chemikalien") w("") w('**Frage:** „Welche Umweltbereiche sind noch nicht abgedeckt?"') w("") p2 = P(name="Industriespuelmaschine", product_type=PT.MACHINERY, markets=["EU", "DE"], economic_operator_role=Role.MANUFACTURER, lifecycle_phase=LP.PLACING_ON_MARKET, is_machine=True, is_component=False, has_software_updates=True, has_embedded_software=True, components=[Comp(name="Dosierpumpe", kind=CK.CHEMICAL_DOSING), Comp(name="Abwasserauslass", kind=CK.WASTEWATER_OUTLET)], environmental=Env(discharges_to_wastewater=True, uses_cleaning_chemicals=True, consumes_energy_or_water=True)) sc2 = resolve_product_scope(p2) rmap2 = render_regulatory_map(p2) w("**Input:** Maschine mit Chemikalien-Dosierung + Abwasserauslass; Umwelt-Trigger gesetzt.") w("") reg_map_block(rmap2) unsupported_block(rmap2) ie2 = interpret_in_map(rmap2, "Unsere Abwassereinleitung mit Reinigungschemikalien ist sicher abgedeckt.") w("**Expected Interpretation** _(Umwelt -> bewusst nicht bewertet)_") w("") w("> " + ie2.explanation) w("") w("**Known Gaps:** Abwasser/Chemikalien/Energie sind `unsupported_domain` — Environmental Corpus fehlt (RS-002).") w("") env_status = "UNSUPPORTED" if rmap2.unsupported_domains else "PASS" coverage_table([ ("Product Profile", "PASS", "Maschine + Umwelt-Komponenten"), ("Scope", "PASS" if sc2.status == ScopeStatus.RESOLVED else "NEEDS_FACTS", ""), ("Regulatory Map", "PASS", "CRA/MaschinenVO/EMV"), ("Environmental (Abwasser/Chemikalien/Energie)", env_status, 'ehrlich „noch nicht im Korpus" → RS-002'), ("Interpretation (Umwelt)", "PARTIAL", "future_corpus_needed statt Scheinsicherheit"), ]) # ── Scenario 3 ──────────────────────────────────────────────────────────── w("## Szenario 3 — ISO27001-zertifiziertes Unternehmen") w("") w('**Frage:** „Welche Capabilities sind inferred, declared oder confirmed?"') w("") w("_ILLUSTRATIVES Mapping: ISO27001 -> incident_response, supplier_management, asset_management_") w("") ctx3 = CompanyContext(company_id="iso-kunde", certifications=[Certification(certification_id="ISO27001", name="ISO/IEC 27001")], declarations=[Declaration(capability_id="cap_patch_management", statement="Wir machen Patch Management")], evidence=[ExistingEvidence(evidence_id="patch-policy.pdf", evidence_type="policy", proves_capability_id="cap_patch_management")]) prof3 = build_company_profile(ctx3, ISO_MAP) w("**Expected Company Capability Profile** _(4-Zustands-Trust-Model)_") w("") for c in prof3.candidate_capabilities: w("- %s — **%s** (Quelle: %s)" % (c.capability_id, c.verification_status.value, c.source)) for c in prof3.confirmed_capabilities: w("- %s — **%s** (Nachweis: %s)" % (c.capability_id, c.verification_status.value, ", ".join(c.sources))) w("") reg = CapabilityRegistry() m = mint_capability(reg, CapabilityCandidate(raw_term="Incident Response"), category="Security") ca = evaluate_relation(CapabilityRelation(relation_id="r1", source="certification:ISO27001", target_capability_id=m.capability_id, relationship_type=RelationType.SUPPORTS, evidence_kind=EvidenceKind.CERTIFICATION)) cb = evaluate_relation(CapabilityRelation(relation_id="r2", source="ir-runbook.pdf", target_capability_id=m.capability_id, relationship_type=RelationType.CONFIRMS, evidence_kind=EvidenceKind.ARTIFACT)) w("**Expected Master Capability Registry** _(computed confidence, policy-versioniert)_") w("") w("- ISO27001 *supports* %s -> %s/%s (policy %s) — computed, nicht gespeichert" % (m.capability_id, ca.status.value, ca.confidence.value, ca.policy_version)) w("- ir-runbook.pdf *confirms* %s -> %s/%s — nur echtes Artefakt erreicht confirmed" % (m.capability_id, cb.status.value, cb.confidence.value)) w("") w("**Known Gaps:** `cap_*` (Company 2A) und `MCAP-*` (Registry) sind noch nicht verlinkt (RS-003).") w("") coverage_table([ ("Company Context", "PASS", "ISO27001 + Declaration + Evidence"), ("Trust-State (declared/inferred/confirmed)", "PASS", "Zertifizierung nie confirmed"), ("Master Capability Registry", "PASS", "computed confidence, policy-versioniert"), ("cap ↔ MCAP Linking", "TODO", "zwei Vokabulare unverbunden → RS-003"), ]) # ── Epics + roll-up ─────────────────────────────────────────────────────── w("## Gaps → Epics (Backlog — nur erfasst, NICHT implementiert)") w("") w("| Epic | Titel | schliesst Coverage-Luecke |") w("|---|---|---|") w("| RS-001 | Interpretation Pattern Library | Sz1 Interpretation PARTIAL -> PASS (CRA-Muster) |") w("| RS-002 | Environmental Corpus (Pilotdomaene) | Sz2 Environmental UNSUPPORTED -> PASS |") w("| RS-003 | Capability Linking (cap↔MCAP) + Company-Gap | Sz1/Sz3 Company Gap TODO -> PASS |") w("| RS-004 | MaschinenVO/EMV Registry Linking | Sz1/Sz2 MaschinenVO/EMV PARTIAL -> PASS |") w("") total = len(ROLLUP) npass = ROLLUP.count("PASS") w("## Suite-Status (Roll-up)") w("") w("- Coverage-Zellen gesamt: **%d**" % total) w("- PASS: **%d** · PARTIAL: %d · UNSUPPORTED: %d · TODO: %d · N/A: %d · NEEDS_FACTS: %d" % (npass, ROLLUP.count("PARTIAL"), ROLLUP.count("UNSUPPORTED"), ROLLUP.count("TODO"), ROLLUP.count("N/A"), ROLLUP.count("NEEDS_FACTS"))) w("- Fortschritt = PASS-Anteil steigt, wenn Epics RS-001…004 landen (objektiver Maßstab, kein LOC).") print("\n".join(OUT))