From 10c32d7f7c027ab3e865153a41fbbb31655ee802 Mon Sep 17 00:00:00 2001 From: Benjamin Admin Date: Sun, 14 Jun 2026 08:59:41 +0200 Subject: [PATCH] feat(cra): cyber-meets-safety bridge as real logic (step 2) Deterministic bridge (cra_safety_bridge.py): a cyber finding's attack capability (remote_actuation / code_tampering / integrity_loss / auth_bypass, derived from its CRA category) is matched against what each CE safety function is vulnerable to. A match re-opens the mitigated hazard, flags the finding safety_impact (which floors it to P0), and produces the cross-link. Endpoint accepts safety_functions; frontend passes the project's safety functions and renders the LIVE cross-links (no more hardcode). Safety functions are demo input now; come from the CE risk assessment in production. Co-Authored-By: Claude Opus 4.7 --- .../sdk/iace/[projectId]/cra/_hooks/useCRA.ts | 24 +++++- .../compliance/api/cra_assess_routes.py | 16 +++- .../compliance/services/cra_finding_mapper.py | 22 ++++-- .../compliance/services/cra_safety_bridge.py | 77 +++++++++++++++++++ .../tests/test_cra_safety_bridge.py | 46 +++++++++++ 5 files changed, 176 insertions(+), 9 deletions(-) create mode 100644 backend-compliance/compliance/services/cra_safety_bridge.py create mode 100644 backend-compliance/tests/test_cra_safety_bridge.py diff --git a/admin-compliance/app/sdk/iace/[projectId]/cra/_hooks/useCRA.ts b/admin-compliance/app/sdk/iace/[projectId]/cra/_hooks/useCRA.ts index b37fe996..255e7b36 100644 --- a/admin-compliance/app/sdk/iace/[projectId]/cra/_hooks/useCRA.ts +++ b/admin-compliance/app/sdk/iace/[projectId]/cra/_hooks/useCRA.ts @@ -11,6 +11,24 @@ import { CRADemo, CRAFinding, Measure, DEMO_SCENARIO } from './useCRADemo' export type Weights = Record // objective -> high|medium|low +// Demo: CE-risk-assessment safety functions of the Kistenhub (would come from the +// project's CE risk assessment in production). The backend bridge decides which +// cyber findings can defeat them (and flags those safety_impact -> P0). +const SAFETY_FUNCTIONS = [ + { + name: 'Zweihandschaltung + trennende Schutzeinrichtung am Hubwerk', + hazard: 'Unerwarteter Anlauf des Hubwerks → Quetschen zwischen Last und Rahmen', + original_measure: 'Zweihandschaltung + trennende Schutzeinrichtung (mechanisch, PL d)', + kind: 'prevent_unexpected_actuation', + }, + { + name: 'Überlastsicherung / Lastmomentbegrenzer', + hazard: 'Überlast / Lastabsturz durch manipulierte Lastgrenze', + original_measure: 'Überlastsicherung / Lastmomentbegrenzer', + kind: 'signal_integrity', + }, +] + function reqTitle(rationale: string): string { const i = rationale.indexOf(': ') return i >= 0 ? rationale.slice(i + 2) : rationale @@ -54,7 +72,7 @@ function merge(live: any): CRADemo { coverage_pct: live.coverage_pct ?? DEMO_SCENARIO.coverage_pct, requirements_touched: live.requirements_touched || DEMO_SCENARIO.requirements_touched, open_measures, - cross_links: DEMO_SCENARIO.cross_links, + cross_links: live.cross_links && live.cross_links.length ? live.cross_links : DEMO_SCENARIO.cross_links, deadlines: live.deadlines || DEMO_SCENARIO.deadlines, quick_wins: live.quick_wins || [], objectives: live.objectives || [], @@ -71,10 +89,10 @@ export function useCRA() { const payload = { findings: DEMO_SCENARIO.findings.map((f) => ({ id: f.id, title: f.title, cwe: f.cwe, severity: f.scanner_severity, location: f.location, - // demo: flag the two findings tied to a safety cross-link as safety_impact - safety_impact: f.id === 'KH-CY-1' || f.id === 'KH-CY-2' || f.id === 'KH-CY-3', })), weights, + // the bridge decides safety_impact from these (no frontend hardcode) + safety_functions: SAFETY_FUNCTIONS, } fetch('/api/v1/cra/assess', { method: 'POST', diff --git a/backend-compliance/compliance/api/cra_assess_routes.py b/backend-compliance/compliance/api/cra_assess_routes.py index 9b5faf2f..0d14f17f 100644 --- a/backend-compliance/compliance/api/cra_assess_routes.py +++ b/backend-compliance/compliance/api/cra_assess_routes.py @@ -32,14 +32,28 @@ class FindingIn(BaseModel): exploited: Optional[bool] = False +class SafetyFunctionIn(BaseModel): + name: str + hazard: Optional[str] = "" + original_measure: Optional[str] = "" + kind: Optional[str] = "" # prevent_unexpected_actuation | signal_integrity + vulnerable_to: Optional[List[str]] = None + + class AssessRequest(BaseModel): findings: List[FindingIn] # customer priorities for the discretionary tier: {objective: high|medium|low}. # objectives: access | data | network_api | supply_updates | monitoring. weights: Optional[Dict[str, str]] = None + # CE-risk-assessment safety functions for the cyber-meets-safety bridge. + safety_functions: Optional[List[SafetyFunctionIn]] = None @router.post("/assess") async def assess(body: AssessRequest): - payload = {"findings": [f.model_dump() for f in body.findings], "weights": body.weights} + payload = { + "findings": [f.model_dump() for f in body.findings], + "weights": body.weights, + "safety_functions": [s.model_dump() for s in body.safety_functions] if body.safety_functions else None, + } return assess_findings_payload(payload) diff --git a/backend-compliance/compliance/services/cra_finding_mapper.py b/backend-compliance/compliance/services/cra_finding_mapper.py index 18f3dd33..9b8d72f8 100644 --- a/backend-compliance/compliance/services/cra_finding_mapper.py +++ b/backend-compliance/compliance/services/cra_finding_mapper.py @@ -16,6 +16,7 @@ from typing import Optional from compliance.api.cra_annex_i_data import ANNEX_I_REQUIREMENTS, MEASURES, DEADLINES from compliance.services.cra_security_crosswalk import security_refs_for from compliance.services.cra_prioritizer import prioritize, OBJECTIVES +from compliance.services.cra_safety_bridge import build_cross_links _REQ_INDEX = {r["req_id"]: r for r in ANNEX_I_REQUIREMENTS} _SEV_ORDER = {"LOW": 1, "MEDIUM": 2, "HIGH": 3, "CRITICAL": 4} @@ -120,6 +121,7 @@ class CRAAssessment: coverage_pct: float = 0.0 quick_wins: list = field(default_factory=list) # finding_ids: high impact, low effort objectives: list = field(default_factory=lambda: list(OBJECTIVES)) + cross_links: list = field(default_factory=list) # cyber-meets-safety bridge deadlines: list = field(default_factory=lambda: list(DEADLINES)) @@ -197,13 +199,21 @@ def map_finding(f: ScannerFinding) -> MappedFinding: ) -def assess_findings(findings: list, weights=None) -> CRAAssessment: +def assess_findings(findings: list, weights=None, safety_functions=None) -> CRAAssessment: """Map findings to a deterministic CRA assessment, then prioritise them. - weights: {objective: 'high'|'medium'|'low'} — the customer's priorities for - the discretionary tier (the P0 floor ignores them). + weights: {objective: 'high'|'medium'|'low'} — customer priorities for the + discretionary tier (the P0 floor ignores them). + safety_functions: CE-risk-assessment safety functions for the cyber-meets- + safety bridge; a finding that can defeat one is flagged safety_impact (→ P0). """ - mapped = prioritize([map_finding(f) for f in findings], weights) + mapped = [map_finding(f) for f in findings] + cross_links = build_cross_links(mapped, safety_functions) + flagged = {fid for cl in cross_links for fid in cl["cyber_finding_ids"]} + for m in mapped: + if m.finding_id in flagged: + m.safety_impact = True + mapped = prioritize(mapped, weights) by_risk = {"CRITICAL": 0, "HIGH": 0, "MEDIUM": 0, "LOW": 0} reqs_touched, measure_ids, unmapped = set(), [], [] for m in mapped: @@ -226,6 +236,7 @@ def assess_findings(findings: list, weights=None) -> CRAAssessment: unmapped_findings=unmapped, coverage_pct=round(100.0 * covered / total, 1) if total else 0.0, quick_wins=[m.finding_id for m in mapped if m.quick_win], + cross_links=cross_links, ) @@ -236,6 +247,7 @@ def assess_findings_payload(payload: dict) -> dict: """ raw = payload.get("findings", []) if isinstance(payload, dict) else [] weights = payload.get("weights") if isinstance(payload, dict) else None + safety_functions = payload.get("safety_functions") if isinstance(payload, dict) else None findings = [ScannerFinding.from_dict(d) for d in raw] - assessment = assess_findings(findings, weights) + assessment = assess_findings(findings, weights, safety_functions) return asdict(assessment) # recurses into nested MappedFinding dataclasses diff --git a/backend-compliance/compliance/services/cra_safety_bridge.py b/backend-compliance/compliance/services/cra_safety_bridge.py new file mode 100644 index 00000000..f76aeaae --- /dev/null +++ b/backend-compliance/compliance/services/cra_safety_bridge.py @@ -0,0 +1,77 @@ +"""Cyber-meets-Safety bridge — deterministic, replaces the demo hardcode. + +Links a cyber finding to a CE-risk-assessment safety function it can DEFEAT, by +matching the finding's attack *capability* against what the safety function is +*vulnerable to*. A match means: the cyber finding re-opens a hazard that was +considered mitigated → it must surface in the CE risk assessment (Machinery Reg +2023/1230) and is flagged safety_impact (which feeds the P0 priority floor). + +Capabilities are derived from the finding's CRA Annex I category (deterministic, +no LLM). Safety functions come from the project's CE risk assessment (passed in); +for the demo they are supplied as scenario input. Output matches the frontend +CrossLink shape. +""" +from compliance.api.cra_annex_i_data import ANNEX_I_REQUIREMENTS + +_REQ = {r["req_id"]: r for r in ANNEX_I_REQUIREMENTS} + +# CRA Annex I category -> attack capabilities the finding grants. +_CATEGORY_CAPABILITIES = { + "Secure-by-Design": {"remote_actuation", "code_tampering"}, + "Authentifizierung": {"auth_bypass", "remote_actuation"}, + "Kryptografie": {"integrity_loss"}, + "Supply Chain": {"code_tampering"}, + "Updates": {"code_tampering", "integrity_loss"}, +} + +# Safety-function kind -> the capabilities that defeat it. +_SF_KIND_VULN = { + "prevent_unexpected_actuation": {"remote_actuation", "code_tampering", "auth_bypass"}, + "signal_integrity": {"integrity_loss", "code_tampering"}, +} + +_CAP_LABEL = { + "remote_actuation": "Fern-Auslösung", + "code_tampering": "manipulierte Firmware/Software", + "integrity_loss": "manipulierte Signale/Parameter", + "auth_bypass": "Authentifizierungs-Umgehung", +} + + +def capabilities_for(mapped) -> set: + req = _REQ.get(getattr(mapped, "primary_requirement", ""), {}) + return set(_CATEGORY_CAPABILITIES.get(req.get("category", ""), set())) + + +def build_cross_links(mapped_list: list, safety_functions: list) -> list: + """For each safety function, collect the cyber findings that can defeat it. + + safety_functions: [{name, hazard, original_measure, kind|vulnerable_to}]. + Returns CrossLink dicts; empty if nothing matches. + """ + out = [] + for sf in safety_functions or []: + vuln = set(sf.get("vulnerable_to") or _SF_KIND_VULN.get(sf.get("kind", ""), set())) + if not vuln: + continue + hits, caps = [], set() + for m in mapped_list: + inter = capabilities_for(m) & vuln + if inter: + hits.append(getattr(m, "finding_id", "")) + caps |= inter + if not hits: + continue + cap_txt = ", ".join(sorted(_CAP_LABEL.get(c, c) for c in caps)) + out.append({ + "cyber_finding_ids": hits, + "safety_hazard": sf.get("hazard", ""), + "safety_ref": sf.get("name", ""), + "original_measure": sf.get("original_measure", ""), + "cyber_breaks_it": ( + "Über {} können die Cyber-Befunde diese Schutzfunktion aushebeln — " + "die mechanisch gemilderte Gefährdung ist wieder offen.".format(cap_txt) + ), + "residual": "offen", + }) + return out diff --git a/backend-compliance/tests/test_cra_safety_bridge.py b/backend-compliance/tests/test_cra_safety_bridge.py new file mode 100644 index 00000000..4fc44c87 --- /dev/null +++ b/backend-compliance/tests/test_cra_safety_bridge.py @@ -0,0 +1,46 @@ +"""Tests for the deterministic cyber-meets-safety bridge.""" +from compliance.services.cra_finding_mapper import ScannerFinding, assess_findings + +SF_ACTUATION = {"name": "Zweihandschaltung Hubwerk", "hazard": "Quetschen", + "original_measure": "PL d", "kind": "prevent_unexpected_actuation"} +SF_INTEGRITY = {"name": "Ueberlastsicherung", "hazard": "Lastabsturz", + "original_measure": "Lastmomentbegrenzer", "kind": "signal_integrity"} + + +def test_default_password_defeats_actuation_safety_function(): + a = assess_findings( + [ScannerFinding(id="pw", title="default password", cwe="CWE-259", severity="critical")], + safety_functions=[SF_ACTUATION]) + assert len(a.cross_links) == 1 + assert "pw" in a.cross_links[0]["cyber_finding_ids"] + assert a.mapped[0].safety_impact is True + assert a.mapped[0].priority_tier == "P0" + + +def test_unencrypted_transit_defeats_signal_integrity_and_floors_low_severity(): + a = assess_findings( + [ScannerFinding(id="mqtt", title="cleartext MQTT", cwe="CWE-319", severity="low")], + safety_functions=[SF_INTEGRITY]) + assert len(a.cross_links) == 1 + assert a.mapped[0].priority_tier == "P0" # safety_impact floors a low-severity finding + + +def test_logging_finding_does_not_defeat_actuation(): + a = assess_findings( + [ScannerFinding(id="log", title="no security logging", cwe="CWE-778", severity="high")], + safety_functions=[SF_ACTUATION]) + assert a.cross_links == [] + assert a.mapped[0].safety_impact is False + + +def test_no_safety_functions_means_no_cross_links(): + a = assess_findings([ScannerFinding(id="pw", title="default password", cwe="CWE-259", severity="critical")]) + assert a.cross_links == [] + + +def test_explicit_vulnerable_to_overrides_kind(): + sf = {"name": "X", "hazard": "H", "vulnerable_to": ["integrity_loss"]} + a = assess_findings( + [ScannerFinding(id="mqtt", title="cleartext", cwe="CWE-319", severity="high")], + safety_functions=[sf]) + assert len(a.cross_links) == 1