feat(cra): cyber-meets-safety bridge as real logic (step 2)

Deterministic bridge (cra_safety_bridge.py): a cyber finding's attack capability
(remote_actuation / code_tampering / integrity_loss / auth_bypass, derived from
its CRA category) is matched against what each CE safety function is vulnerable
to. A match re-opens the mitigated hazard, flags the finding safety_impact (which
floors it to P0), and produces the cross-link. Endpoint accepts safety_functions;
frontend passes the project's safety functions and renders the LIVE cross-links
(no more hardcode). Safety functions are demo input now; come from the CE risk
assessment in production.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
This commit is contained in:
Benjamin Admin
2026-06-14 08:59:41 +02:00
parent fb4d7641ab
commit 10c32d7f7c
5 changed files with 176 additions and 9 deletions
@@ -32,14 +32,28 @@ class FindingIn(BaseModel):
exploited: Optional[bool] = False
class SafetyFunctionIn(BaseModel):
name: str
hazard: Optional[str] = ""
original_measure: Optional[str] = ""
kind: Optional[str] = "" # prevent_unexpected_actuation | signal_integrity
vulnerable_to: Optional[List[str]] = None
class AssessRequest(BaseModel):
findings: List[FindingIn]
# customer priorities for the discretionary tier: {objective: high|medium|low}.
# objectives: access | data | network_api | supply_updates | monitoring.
weights: Optional[Dict[str, str]] = None
# CE-risk-assessment safety functions for the cyber-meets-safety bridge.
safety_functions: Optional[List[SafetyFunctionIn]] = None
@router.post("/assess")
async def assess(body: AssessRequest):
payload = {"findings": [f.model_dump() for f in body.findings], "weights": body.weights}
payload = {
"findings": [f.model_dump() for f in body.findings],
"weights": body.weights,
"safety_functions": [s.model_dump() for s in body.safety_functions] if body.safety_functions else None,
}
return assess_findings_payload(payload)
@@ -16,6 +16,7 @@ from typing import Optional
from compliance.api.cra_annex_i_data import ANNEX_I_REQUIREMENTS, MEASURES, DEADLINES
from compliance.services.cra_security_crosswalk import security_refs_for
from compliance.services.cra_prioritizer import prioritize, OBJECTIVES
from compliance.services.cra_safety_bridge import build_cross_links
_REQ_INDEX = {r["req_id"]: r for r in ANNEX_I_REQUIREMENTS}
_SEV_ORDER = {"LOW": 1, "MEDIUM": 2, "HIGH": 3, "CRITICAL": 4}
@@ -120,6 +121,7 @@ class CRAAssessment:
coverage_pct: float = 0.0
quick_wins: list = field(default_factory=list) # finding_ids: high impact, low effort
objectives: list = field(default_factory=lambda: list(OBJECTIVES))
cross_links: list = field(default_factory=list) # cyber-meets-safety bridge
deadlines: list = field(default_factory=lambda: list(DEADLINES))
@@ -197,13 +199,21 @@ def map_finding(f: ScannerFinding) -> MappedFinding:
)
def assess_findings(findings: list, weights=None) -> CRAAssessment:
def assess_findings(findings: list, weights=None, safety_functions=None) -> CRAAssessment:
"""Map findings to a deterministic CRA assessment, then prioritise them.
weights: {objective: 'high'|'medium'|'low'} — the customer's priorities for
the discretionary tier (the P0 floor ignores them).
weights: {objective: 'high'|'medium'|'low'} — customer priorities for the
discretionary tier (the P0 floor ignores them).
safety_functions: CE-risk-assessment safety functions for the cyber-meets-
safety bridge; a finding that can defeat one is flagged safety_impact (→ P0).
"""
mapped = prioritize([map_finding(f) for f in findings], weights)
mapped = [map_finding(f) for f in findings]
cross_links = build_cross_links(mapped, safety_functions)
flagged = {fid for cl in cross_links for fid in cl["cyber_finding_ids"]}
for m in mapped:
if m.finding_id in flagged:
m.safety_impact = True
mapped = prioritize(mapped, weights)
by_risk = {"CRITICAL": 0, "HIGH": 0, "MEDIUM": 0, "LOW": 0}
reqs_touched, measure_ids, unmapped = set(), [], []
for m in mapped:
@@ -226,6 +236,7 @@ def assess_findings(findings: list, weights=None) -> CRAAssessment:
unmapped_findings=unmapped,
coverage_pct=round(100.0 * covered / total, 1) if total else 0.0,
quick_wins=[m.finding_id for m in mapped if m.quick_win],
cross_links=cross_links,
)
@@ -236,6 +247,7 @@ def assess_findings_payload(payload: dict) -> dict:
"""
raw = payload.get("findings", []) if isinstance(payload, dict) else []
weights = payload.get("weights") if isinstance(payload, dict) else None
safety_functions = payload.get("safety_functions") if isinstance(payload, dict) else None
findings = [ScannerFinding.from_dict(d) for d in raw]
assessment = assess_findings(findings, weights)
assessment = assess_findings(findings, weights, safety_functions)
return asdict(assessment) # recurses into nested MappedFinding dataclasses
@@ -0,0 +1,77 @@
"""Cyber-meets-Safety bridge — deterministic, replaces the demo hardcode.
Links a cyber finding to a CE-risk-assessment safety function it can DEFEAT, by
matching the finding's attack *capability* against what the safety function is
*vulnerable to*. A match means: the cyber finding re-opens a hazard that was
considered mitigated → it must surface in the CE risk assessment (Machinery Reg
2023/1230) and is flagged safety_impact (which feeds the P0 priority floor).
Capabilities are derived from the finding's CRA Annex I category (deterministic,
no LLM). Safety functions come from the project's CE risk assessment (passed in);
for the demo they are supplied as scenario input. Output matches the frontend
CrossLink shape.
"""
from compliance.api.cra_annex_i_data import ANNEX_I_REQUIREMENTS
_REQ = {r["req_id"]: r for r in ANNEX_I_REQUIREMENTS}
# CRA Annex I category -> attack capabilities the finding grants.
_CATEGORY_CAPABILITIES = {
"Secure-by-Design": {"remote_actuation", "code_tampering"},
"Authentifizierung": {"auth_bypass", "remote_actuation"},
"Kryptografie": {"integrity_loss"},
"Supply Chain": {"code_tampering"},
"Updates": {"code_tampering", "integrity_loss"},
}
# Safety-function kind -> the capabilities that defeat it.
_SF_KIND_VULN = {
"prevent_unexpected_actuation": {"remote_actuation", "code_tampering", "auth_bypass"},
"signal_integrity": {"integrity_loss", "code_tampering"},
}
_CAP_LABEL = {
"remote_actuation": "Fern-Auslösung",
"code_tampering": "manipulierte Firmware/Software",
"integrity_loss": "manipulierte Signale/Parameter",
"auth_bypass": "Authentifizierungs-Umgehung",
}
def capabilities_for(mapped) -> set:
req = _REQ.get(getattr(mapped, "primary_requirement", ""), {})
return set(_CATEGORY_CAPABILITIES.get(req.get("category", ""), set()))
def build_cross_links(mapped_list: list, safety_functions: list) -> list:
"""For each safety function, collect the cyber findings that can defeat it.
safety_functions: [{name, hazard, original_measure, kind|vulnerable_to}].
Returns CrossLink dicts; empty if nothing matches.
"""
out = []
for sf in safety_functions or []:
vuln = set(sf.get("vulnerable_to") or _SF_KIND_VULN.get(sf.get("kind", ""), set()))
if not vuln:
continue
hits, caps = [], set()
for m in mapped_list:
inter = capabilities_for(m) & vuln
if inter:
hits.append(getattr(m, "finding_id", ""))
caps |= inter
if not hits:
continue
cap_txt = ", ".join(sorted(_CAP_LABEL.get(c, c) for c in caps))
out.append({
"cyber_finding_ids": hits,
"safety_hazard": sf.get("hazard", ""),
"safety_ref": sf.get("name", ""),
"original_measure": sf.get("original_measure", ""),
"cyber_breaks_it": (
"Über {} können die Cyber-Befunde diese Schutzfunktion aushebeln — "
"die mechanisch gemilderte Gefährdung ist wieder offen.".format(cap_txt)
),
"residual": "offen",
})
return out