10c32d7f7c
Deterministic bridge (cra_safety_bridge.py): a cyber finding's attack capability (remote_actuation / code_tampering / integrity_loss / auth_bypass, derived from its CRA category) is matched against what each CE safety function is vulnerable to. A match re-opens the mitigated hazard, flags the finding safety_impact (which floors it to P0), and produces the cross-link. Endpoint accepts safety_functions; frontend passes the project's safety functions and renders the LIVE cross-links (no more hardcode). Safety functions are demo input now; come from the CE risk assessment in production. Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
78 lines
3.1 KiB
Python
78 lines
3.1 KiB
Python
"""Cyber-meets-Safety bridge — deterministic, replaces the demo hardcode.
|
|
|
|
Links a cyber finding to a CE-risk-assessment safety function it can DEFEAT, by
|
|
matching the finding's attack *capability* against what the safety function is
|
|
*vulnerable to*. A match means: the cyber finding re-opens a hazard that was
|
|
considered mitigated → it must surface in the CE risk assessment (Machinery Reg
|
|
2023/1230) and is flagged safety_impact (which feeds the P0 priority floor).
|
|
|
|
Capabilities are derived from the finding's CRA Annex I category (deterministic,
|
|
no LLM). Safety functions come from the project's CE risk assessment (passed in);
|
|
for the demo they are supplied as scenario input. Output matches the frontend
|
|
CrossLink shape.
|
|
"""
|
|
from compliance.api.cra_annex_i_data import ANNEX_I_REQUIREMENTS
|
|
|
|
_REQ = {r["req_id"]: r for r in ANNEX_I_REQUIREMENTS}
|
|
|
|
# CRA Annex I category -> attack capabilities the finding grants.
|
|
_CATEGORY_CAPABILITIES = {
|
|
"Secure-by-Design": {"remote_actuation", "code_tampering"},
|
|
"Authentifizierung": {"auth_bypass", "remote_actuation"},
|
|
"Kryptografie": {"integrity_loss"},
|
|
"Supply Chain": {"code_tampering"},
|
|
"Updates": {"code_tampering", "integrity_loss"},
|
|
}
|
|
|
|
# Safety-function kind -> the capabilities that defeat it.
|
|
_SF_KIND_VULN = {
|
|
"prevent_unexpected_actuation": {"remote_actuation", "code_tampering", "auth_bypass"},
|
|
"signal_integrity": {"integrity_loss", "code_tampering"},
|
|
}
|
|
|
|
_CAP_LABEL = {
|
|
"remote_actuation": "Fern-Auslösung",
|
|
"code_tampering": "manipulierte Firmware/Software",
|
|
"integrity_loss": "manipulierte Signale/Parameter",
|
|
"auth_bypass": "Authentifizierungs-Umgehung",
|
|
}
|
|
|
|
|
|
def capabilities_for(mapped) -> set:
|
|
req = _REQ.get(getattr(mapped, "primary_requirement", ""), {})
|
|
return set(_CATEGORY_CAPABILITIES.get(req.get("category", ""), set()))
|
|
|
|
|
|
def build_cross_links(mapped_list: list, safety_functions: list) -> list:
|
|
"""For each safety function, collect the cyber findings that can defeat it.
|
|
|
|
safety_functions: [{name, hazard, original_measure, kind|vulnerable_to}].
|
|
Returns CrossLink dicts; empty if nothing matches.
|
|
"""
|
|
out = []
|
|
for sf in safety_functions or []:
|
|
vuln = set(sf.get("vulnerable_to") or _SF_KIND_VULN.get(sf.get("kind", ""), set()))
|
|
if not vuln:
|
|
continue
|
|
hits, caps = [], set()
|
|
for m in mapped_list:
|
|
inter = capabilities_for(m) & vuln
|
|
if inter:
|
|
hits.append(getattr(m, "finding_id", ""))
|
|
caps |= inter
|
|
if not hits:
|
|
continue
|
|
cap_txt = ", ".join(sorted(_CAP_LABEL.get(c, c) for c in caps))
|
|
out.append({
|
|
"cyber_finding_ids": hits,
|
|
"safety_hazard": sf.get("hazard", ""),
|
|
"safety_ref": sf.get("name", ""),
|
|
"original_measure": sf.get("original_measure", ""),
|
|
"cyber_breaks_it": (
|
|
"Über {} können die Cyber-Befunde diese Schutzfunktion aushebeln — "
|
|
"die mechanisch gemilderte Gefährdung ist wieder offen.".format(cap_txt)
|
|
),
|
|
"residual": "offen",
|
|
})
|
|
return out
|