From 437c2c8fa1f6edf51b53e2a84565cb6c27e7545b Mon Sep 17 00:00:00 2001 From: Benjamin Admin Date: Sun, 14 Jun 2026 12:37:22 +0200 Subject: [PATCH] =?UTF-8?q?feat(cra):=20hardware=20path=20=E2=80=94=20deri?= =?UTF-8?q?ve=20cyber=20findings=20from=20networked=20components?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit For hardware CE projects (no repo) each networked component (controller/hmi/ gateway/drive/remote_access/sensor) yields typical ICS vulnerability CLASSES (real CWE + "CISA-ICS — product-specific check" framing, NO fabricated CVEs); they flow through the same CRA engine. /assess accepts components[]. MappedFinding now echoes title/location/cwe so the response is self-contained for any finding source. Live CISA-ICS/NVD per-product CVE lookup is the later enrichment. Co-Authored-By: Claude Opus 4.7 --- .../compliance/api/cra_assess_routes.py | 18 +++++- .../services/cra_component_findings.py | 63 +++++++++++++++++++ .../compliance/services/cra_finding_mapper.py | 9 ++- .../tests/test_cra_component_findings.py | 28 +++++++++ 4 files changed, 115 insertions(+), 3 deletions(-) create mode 100644 backend-compliance/compliance/services/cra_component_findings.py create mode 100644 backend-compliance/tests/test_cra_component_findings.py diff --git a/backend-compliance/compliance/api/cra_assess_routes.py b/backend-compliance/compliance/api/cra_assess_routes.py index 53f83c35..9506cbd3 100644 --- a/backend-compliance/compliance/api/cra_assess_routes.py +++ b/backend-compliance/compliance/api/cra_assess_routes.py @@ -17,6 +17,7 @@ from pydantic import BaseModel from compliance.services.cra_finding_mapper import assess_findings_payload from compliance.services.cra_snapshot_store import save_snapshot, list_snapshots, get_snapshot from compliance.services.cra_use_case_controls import enrich_findings_with_breadth +from compliance.services.cra_component_findings import findings_from_components from database import SessionLocal from .tenant_utils import get_tenant_id @@ -44,18 +45,31 @@ class SafetyFunctionIn(BaseModel): vulnerable_to: Optional[List[str]] = None +class ComponentIn(BaseModel): + name: str + component_class: Optional[str] = "" # controller | hmi | gateway | drive | remote_access | sensor + networked: Optional[bool] = False + vendor: Optional[str] = "" + product: Optional[str] = "" + + class AssessRequest(BaseModel): - findings: List[FindingIn] + findings: List[FindingIn] = [] # customer priorities for the discretionary tier: {objective: high|medium|low}. # objectives: access | data | network_api | supply_updates | monitoring. weights: Optional[Dict[str, str]] = None # CE-risk-assessment safety functions for the cyber-meets-safety bridge. safety_functions: Optional[List[SafetyFunctionIn]] = None + # hardware path: networked components -> derived cyber findings (no repo). + components: Optional[List[ComponentIn]] = None def _payload(body: AssessRequest) -> dict: + findings = [f.model_dump() for f in body.findings] + if body.components: + findings = findings + findings_from_components([c.model_dump() for c in body.components]) return { - "findings": [f.model_dump() for f in body.findings], + "findings": findings, "weights": body.weights, "safety_functions": [s.model_dump() for s in body.safety_functions] if body.safety_functions else None, } diff --git a/backend-compliance/compliance/services/cra_component_findings.py b/backend-compliance/compliance/services/cra_component_findings.py new file mode 100644 index 00000000..f5b8d4c8 --- /dev/null +++ b/backend-compliance/compliance/services/cra_component_findings.py @@ -0,0 +1,63 @@ +"""Derive cyber findings from a machine's networked components (hardware path). + +For a hardware CE project there is no source repo to scan. Instead, each +*networked* component (PLC, HMI, gateway, drive, remote access) carries typical +ICS vulnerability CLASSES — well-established categories repeatedly flagged in +CISA ICS advisories. We emit one finding per class so they flow through the same +CRA engine. + +Honesty: these are vulnerability CLASSES (with a real CWE), framed as +"product-specific check against CISA ICS advisories" — NOT fabricated CVE numbers. +The concrete CVEs per vendor/product are a later live CISA-ICS / NVD enrichment. +""" + +# component_class -> typical ICS vulnerability classes (suffix, title, cwe, severity, category) +_CLASS_VULN_TEMPLATES = { + "controller": [ # PLC / safety controller / motion controller + ("ctrl-noauth", "Steuerungsprotokoll ohne Authentifizierung erreichbar", "CWE-306", "high", "auth"), + ("ctrl-fw", "Firmware-/Programm-Update ohne Signaturpruefung", "CWE-494", "high", "updates"), + ("ctrl-debug", "Offener Programmier-/Debug-Port", "CWE-1188", "medium", "config"), + ], + "hmi": [ # operator panel / SCADA HMI + ("hmi-default", "Default-/Werks-Zugangsdaten", "CWE-1392", "critical", "auth"), + ("hmi-cleartext", "Unverschluesselte Web-/Bedienoberflaeche", "CWE-319", "high", "crypto"), + ], + "gateway": [ # IoT / fieldbus / telemetry gateway + ("gw-telemetry", "Unverschluesselte Telemetrie / Feldbus", "CWE-319", "high", "crypto"), + ("gw-bruteforce", "Kein Brute-Force-/Rate-Limit-Schutz am Zugang", "CWE-307", "medium", "auth"), + ], + "drive": [ # VFD / servo drive + ("drv-param", "Antriebsparameter ueber unauthentifizierten Kanal aenderbar", "CWE-306", "high", "auth"), + ], + "remote_access": [ # remote maintenance / VPN box + ("ra-default", "Fernzugang mit Default-Passwort", "CWE-259", "critical", "auth"), + ("ra-nomfa", "Keine starke Authentifizierung (MFA) fuer Fernzugang", "CWE-287", "high", "auth"), + ], + "sensor": [ # networked sensor + ("sns-integrity", "Messwerte ohne Integritaetsschutz uebertragen", "CWE-345", "medium", "integrity"), + ], +} + +_SOURCE = "Komponenten-Klasse (CISA-ICS-typisch — produktspezifisch pruefen)" + + +def findings_from_components(components: list) -> list: + """components: [{name, component_class, networked, vendor?, product?}]. Returns + finding dicts (ScannerFinding-shaped) for the NETWORKED components only.""" + out = [] + for comp in components or []: + if not comp.get("networked"): + continue + klass = (comp.get("component_class") or "").lower() + name = comp.get("name") or klass or "Komponente" + for suffix, title, cwe, severity, category in _CLASS_VULN_TEMPLATES.get(klass, []): + out.append({ + "id": "{}-{}".format(name, suffix).replace(" ", "_"), + "title": "{} ({})".format(title, name), + "cwe": cwe, + "severity": severity, + "category": category, + "location": name, + "source": _SOURCE, + }) + return out diff --git a/backend-compliance/compliance/services/cra_finding_mapper.py b/backend-compliance/compliance/services/cra_finding_mapper.py index b135575c..ad109191 100644 --- a/backend-compliance/compliance/services/cra_finding_mapper.py +++ b/backend-compliance/compliance/services/cra_finding_mapper.py @@ -90,6 +90,9 @@ class ScannerFinding: @dataclass class MappedFinding: finding_id: str + title: str = "" + location: str = "" + cwe: str = "" requirement_ids: list = field(default_factory=list) primary_requirement: str = "" annex_anchor: str = "" @@ -172,7 +175,8 @@ def map_finding(f: ScannerFinding) -> MappedFinding: finding_sev = (f.severity or _sev_from_cvss(f.cvss)).upper() if not reqs: return MappedFinding( - finding_id=f.id, risk_level=_SEV_BY_RANK.get(_rank(finding_sev), "LOW"), + finding_id=f.id, title=f.title, location=f.location, cwe=f.cwe, + risk_level=_SEV_BY_RANK.get(_rank(finding_sev), "LOW"), rationale="Kein eindeutiger CRA-Anforderungsbezug erkannt — manuelle Pruefung.", unmapped=True, safety_impact=f.safety_impact, exploited=f.exploited, ) @@ -186,6 +190,9 @@ def map_finding(f: ScannerFinding) -> MappedFinding: refs = security_refs_for(reqs) return MappedFinding( finding_id=f.id, + title=f.title, + location=f.location, + cwe=f.cwe, requirement_ids=reqs, primary_requirement=primary["req_id"], annex_anchor=primary.get("annex_anchor", ""), diff --git a/backend-compliance/tests/test_cra_component_findings.py b/backend-compliance/tests/test_cra_component_findings.py new file mode 100644 index 00000000..665cf019 --- /dev/null +++ b/backend-compliance/tests/test_cra_component_findings.py @@ -0,0 +1,28 @@ +"""Tests for the hardware path: components -> cyber findings -> assessment.""" +from compliance.services.cra_component_findings import findings_from_components +from compliance.services.cra_finding_mapper import ScannerFinding, assess_findings + + +def test_networked_controller_yields_cwe_findings(): + fs = findings_from_components([{"name": "SPS-1", "component_class": "controller", "networked": True}]) + assert len(fs) >= 1 + assert all(f["cwe"] for f in fs) + assert all("SPS-1" in f["location"] for f in fs) + assert all("CISA-ICS" in f["source"] for f in fs) # honest framing, no fabricated CVE + + +def test_non_networked_component_yields_nothing(): + assert findings_from_components([{"name": "X", "component_class": "controller", "networked": False}]) == [] + + +def test_unknown_class_yields_nothing(): + assert findings_from_components([{"name": "Y", "component_class": "unknown", "networked": True}]) == [] + + +def test_component_findings_flow_through_assessment_with_echoed_title(): + fs = findings_from_components([{"name": "HMI", "component_class": "hmi", "networked": True}]) + a = assess_findings([ScannerFinding.from_dict(f) for f in fs]) + assert a.findings_total == len(fs) + assert all(m.title for m in a.mapped) # title echoed -> self-contained response + assert all(m.location == "HMI" for m in a.mapped) + assert any(m.primary_requirement == "CRA-AI-8" for m in a.mapped) # default creds -> auth