feat(cra): hardware path — derive cyber findings from networked components

For hardware CE projects (no repo) each networked component (controller/hmi/
gateway/drive/remote_access/sensor) yields typical ICS vulnerability CLASSES
(real CWE + "CISA-ICS — product-specific check" framing, NO fabricated CVEs);
they flow through the same CRA engine. /assess accepts components[]. MappedFinding
now echoes title/location/cwe so the response is self-contained for any finding
source. Live CISA-ICS/NVD per-product CVE lookup is the later enrichment.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
This commit is contained in:
Benjamin Admin
2026-06-14 12:37:22 +02:00
parent 398eaf3c36
commit 437c2c8fa1
4 changed files with 115 additions and 3 deletions
@@ -17,6 +17,7 @@ from pydantic import BaseModel
from compliance.services.cra_finding_mapper import assess_findings_payload
from compliance.services.cra_snapshot_store import save_snapshot, list_snapshots, get_snapshot
from compliance.services.cra_use_case_controls import enrich_findings_with_breadth
from compliance.services.cra_component_findings import findings_from_components
from database import SessionLocal
from .tenant_utils import get_tenant_id
@@ -44,18 +45,31 @@ class SafetyFunctionIn(BaseModel):
vulnerable_to: Optional[List[str]] = None
class ComponentIn(BaseModel):
name: str
component_class: Optional[str] = "" # controller | hmi | gateway | drive | remote_access | sensor
networked: Optional[bool] = False
vendor: Optional[str] = ""
product: Optional[str] = ""
class AssessRequest(BaseModel):
findings: List[FindingIn]
findings: List[FindingIn] = []
# customer priorities for the discretionary tier: {objective: high|medium|low}.
# objectives: access | data | network_api | supply_updates | monitoring.
weights: Optional[Dict[str, str]] = None
# CE-risk-assessment safety functions for the cyber-meets-safety bridge.
safety_functions: Optional[List[SafetyFunctionIn]] = None
# hardware path: networked components -> derived cyber findings (no repo).
components: Optional[List[ComponentIn]] = None
def _payload(body: AssessRequest) -> dict:
findings = [f.model_dump() for f in body.findings]
if body.components:
findings = findings + findings_from_components([c.model_dump() for c in body.components])
return {
"findings": [f.model_dump() for f in body.findings],
"findings": findings,
"weights": body.weights,
"safety_functions": [s.model_dump() for s in body.safety_functions] if body.safety_functions else None,
}
@@ -0,0 +1,63 @@
"""Derive cyber findings from a machine's networked components (hardware path).
For a hardware CE project there is no source repo to scan. Instead, each
*networked* component (PLC, HMI, gateway, drive, remote access) carries typical
ICS vulnerability CLASSES — well-established categories repeatedly flagged in
CISA ICS advisories. We emit one finding per class so they flow through the same
CRA engine.
Honesty: these are vulnerability CLASSES (with a real CWE), framed as
"product-specific check against CISA ICS advisories" — NOT fabricated CVE numbers.
The concrete CVEs per vendor/product are a later live CISA-ICS / NVD enrichment.
"""
# component_class -> typical ICS vulnerability classes (suffix, title, cwe, severity, category)
_CLASS_VULN_TEMPLATES = {
"controller": [ # PLC / safety controller / motion controller
("ctrl-noauth", "Steuerungsprotokoll ohne Authentifizierung erreichbar", "CWE-306", "high", "auth"),
("ctrl-fw", "Firmware-/Programm-Update ohne Signaturpruefung", "CWE-494", "high", "updates"),
("ctrl-debug", "Offener Programmier-/Debug-Port", "CWE-1188", "medium", "config"),
],
"hmi": [ # operator panel / SCADA HMI
("hmi-default", "Default-/Werks-Zugangsdaten", "CWE-1392", "critical", "auth"),
("hmi-cleartext", "Unverschluesselte Web-/Bedienoberflaeche", "CWE-319", "high", "crypto"),
],
"gateway": [ # IoT / fieldbus / telemetry gateway
("gw-telemetry", "Unverschluesselte Telemetrie / Feldbus", "CWE-319", "high", "crypto"),
("gw-bruteforce", "Kein Brute-Force-/Rate-Limit-Schutz am Zugang", "CWE-307", "medium", "auth"),
],
"drive": [ # VFD / servo drive
("drv-param", "Antriebsparameter ueber unauthentifizierten Kanal aenderbar", "CWE-306", "high", "auth"),
],
"remote_access": [ # remote maintenance / VPN box
("ra-default", "Fernzugang mit Default-Passwort", "CWE-259", "critical", "auth"),
("ra-nomfa", "Keine starke Authentifizierung (MFA) fuer Fernzugang", "CWE-287", "high", "auth"),
],
"sensor": [ # networked sensor
("sns-integrity", "Messwerte ohne Integritaetsschutz uebertragen", "CWE-345", "medium", "integrity"),
],
}
_SOURCE = "Komponenten-Klasse (CISA-ICS-typisch — produktspezifisch pruefen)"
def findings_from_components(components: list) -> list:
"""components: [{name, component_class, networked, vendor?, product?}]. Returns
finding dicts (ScannerFinding-shaped) for the NETWORKED components only."""
out = []
for comp in components or []:
if not comp.get("networked"):
continue
klass = (comp.get("component_class") or "").lower()
name = comp.get("name") or klass or "Komponente"
for suffix, title, cwe, severity, category in _CLASS_VULN_TEMPLATES.get(klass, []):
out.append({
"id": "{}-{}".format(name, suffix).replace(" ", "_"),
"title": "{} ({})".format(title, name),
"cwe": cwe,
"severity": severity,
"category": category,
"location": name,
"source": _SOURCE,
})
return out
@@ -90,6 +90,9 @@ class ScannerFinding:
@dataclass
class MappedFinding:
finding_id: str
title: str = ""
location: str = ""
cwe: str = ""
requirement_ids: list = field(default_factory=list)
primary_requirement: str = ""
annex_anchor: str = ""
@@ -172,7 +175,8 @@ def map_finding(f: ScannerFinding) -> MappedFinding:
finding_sev = (f.severity or _sev_from_cvss(f.cvss)).upper()
if not reqs:
return MappedFinding(
finding_id=f.id, risk_level=_SEV_BY_RANK.get(_rank(finding_sev), "LOW"),
finding_id=f.id, title=f.title, location=f.location, cwe=f.cwe,
risk_level=_SEV_BY_RANK.get(_rank(finding_sev), "LOW"),
rationale="Kein eindeutiger CRA-Anforderungsbezug erkannt — manuelle Pruefung.",
unmapped=True, safety_impact=f.safety_impact, exploited=f.exploited,
)
@@ -186,6 +190,9 @@ def map_finding(f: ScannerFinding) -> MappedFinding:
refs = security_refs_for(reqs)
return MappedFinding(
finding_id=f.id,
title=f.title,
location=f.location,
cwe=f.cwe,
requirement_ids=reqs,
primary_requirement=primary["req_id"],
annex_anchor=primary.get("annex_anchor", ""),