"""Cyber-meets-Safety bridge — deterministic, replaces the demo hardcode. Links a cyber finding to a CE-risk-assessment safety function it can DEFEAT, by matching the finding's attack *capability* against what the safety function is *vulnerable to*. A match means: the cyber finding re-opens a hazard that was considered mitigated → it must surface in the CE risk assessment (Machinery Reg 2023/1230) and is flagged safety_impact (which feeds the P0 priority floor). Capabilities are derived from the finding's CRA Annex I category (deterministic, no LLM). Safety functions come from the project's CE risk assessment (passed in); for the demo they are supplied as scenario input. Output matches the frontend CrossLink shape. """ from compliance.api.cra_annex_i_data import ANNEX_I_REQUIREMENTS _REQ = {r["req_id"]: r for r in ANNEX_I_REQUIREMENTS} # CRA Annex I category -> attack capabilities the finding grants. _CATEGORY_CAPABILITIES = { "Secure-by-Design": {"remote_actuation", "code_tampering"}, "Authentifizierung": {"auth_bypass", "remote_actuation"}, "Kryptografie": {"integrity_loss"}, "Supply Chain": {"code_tampering"}, "Updates": {"code_tampering", "integrity_loss"}, } # Safety-function kind -> the capabilities that defeat it. _SF_KIND_VULN = { "prevent_unexpected_actuation": {"remote_actuation", "code_tampering", "auth_bypass"}, "signal_integrity": {"integrity_loss", "code_tampering"}, } _CAP_LABEL = { "remote_actuation": "Fern-Auslösung", "code_tampering": "manipulierte Firmware/Software", "integrity_loss": "manipulierte Signale/Parameter", "auth_bypass": "Authentifizierungs-Umgehung", } def capabilities_for(mapped) -> set: req = _REQ.get(getattr(mapped, "primary_requirement", ""), {}) return set(_CATEGORY_CAPABILITIES.get(req.get("category", ""), set())) def build_cross_links(mapped_list: list, safety_functions: list) -> list: """For each safety function, collect the cyber findings that can defeat it. safety_functions: [{name, hazard, original_measure, kind|vulnerable_to}]. Returns CrossLink dicts; empty if nothing matches. """ out = [] for sf in safety_functions or []: vuln = set(sf.get("vulnerable_to") or _SF_KIND_VULN.get(sf.get("kind", ""), set())) if not vuln: continue hits, caps = [], set() for m in mapped_list: inter = capabilities_for(m) & vuln if inter: hits.append(getattr(m, "finding_id", "")) caps |= inter if not hits: continue cap_txt = ", ".join(sorted(_CAP_LABEL.get(c, c) for c in caps)) out.append({ "cyber_finding_ids": hits, "safety_hazard": sf.get("hazard", ""), "safety_ref": sf.get("name", ""), "original_measure": sf.get("original_measure", ""), "cyber_breaks_it": ( "Über {} können die Cyber-Befunde diese Schutzfunktion aushebeln — " "die mechanisch gemilderte Gefährdung ist wieder offen.".format(cap_txt) ), "residual": "offen", }) return out