From 3afb0e7f4d3a14ce1b798a640f3f5710c985bbb1 Mon Sep 17 00:00:00 2001 From: Benjamin Admin Date: Tue, 16 Jun 2026 17:17:55 +0200 Subject: [PATCH] =?UTF-8?q?feat(cra):=20neutrale=20Eingangst=C3=BCr-Verdic?= =?UTF-8?q?t-Engine=20(zwingend/ratsam/nicht=20betroffen)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Reine, deterministische Verdict-Schicht ueber der bestehenden Annex-III/IV- Klassifikation (kein vierter Klassifizierer): trennt Rechtspflicht von Markt- Druck. Kern: das Inverkehrbringen (ab 11.12.2027), nicht der Entwicklungs- zeitpunkt, entscheidet — Bestandsprodukte, die nach der Frist weiter verkauft werden, fallen unter CRA. Producer-Typen (component/end_device/machine_ integrator/software_app) steuern Default-Annahmen (Anlagenbauer: Vernetzung/OTA vorausgesetzt) + Verdict-Betonung (Komponente => Markt-Druck). Plus Evidence- Checkliste (SBOM/VDP/Patch/Lifecycle/Threat-Model/Logging/Auth/Incident) + Reifegrad. /readiness additiv erweitert (verdict/maturity/digital_elements/ producer_type). 15 Tests gruen. Beispiele: OWIS PS90+, ZwickRoell roboTest. Co-Authored-By: Claude Opus 4.7 --- .../compliance/api/cra_assess_routes.py | 26 ++++- .../compliance/services/cra_applicability.py | 107 ++++++++++++++++++ .../tests/test_cra_applicability.py | 82 ++++++++++++++ 3 files changed, 212 insertions(+), 3 deletions(-) create mode 100644 backend-compliance/compliance/services/cra_applicability.py create mode 100644 backend-compliance/tests/test_cra_applicability.py diff --git a/backend-compliance/compliance/api/cra_assess_routes.py b/backend-compliance/compliance/api/cra_assess_routes.py index dcefaf8f..6a4c3b14 100644 --- a/backend-compliance/compliance/api/cra_assess_routes.py +++ b/backend-compliance/compliance/api/cra_assess_routes.py 
@@ -15,6 +15,7 @@ from fastapi import APIRouter, Depends, HTTPException from pydantic import BaseModel from compliance.services.cra_finding_mapper import assess_findings_payload +from compliance.services.cra_applicability import compute_verdict, maturity as evidence_maturity, MACHINE_INTEGRATOR from compliance.services.scanner_mcp_client import fetch_findings from compliance.services.cra_snapshot_store import save_snapshot, list_snapshots, get_snapshot from compliance.services.cra_use_case_controls import enrich_findings_with_breadth @@ -184,6 +185,12 @@ class ReadinessRequest(BaseModel): remote_maintenance: Optional[bool] = False # implies connectivity + updates user_parameter_app: Optional[bool] = False # implies connectivity + updates is_machinery: Optional[bool] = False # CE machinery -> also Machinery Reg 2023/1230 + # Eingangstür / verdict layer (all optional, additive) + producer_type: Optional[str] = "" # component|end_device|machine_integrator|software_app + placed_on_market_after_2027: Optional[bool] = None # None = unknown -> assumed yes (conservative) + customers_request_cra_evidence: Optional[bool] = False + provided_evidence: Optional[List[str]] = None # evidence keys already in place (sbom, vdp, …) + digital_elements: Optional[List[str]] = None # detected/declared digital elements # CRA Annex I evidence_type -> guideline bucket (Code / Prozess / Dokumentation). 
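Review bot: `producer_type` is typed `Optional[str]` with the four allowed values only in a trailing comment, so a misspelt value (say `machine-integrator`) passes the request model and then silently matches neither `MACHINE_INTEGRATOR` in the route nor `COMPONENT` in `compute_verdict`, quietly dropping the integrator defaults and the market-pull emphasis. The new module already exports `PRODUCER_TYPES`; a pydantic validator rejecting anything outside `PRODUCER_TYPES` plus `""`, or a `Literal["", "component", "end_device", "machine_integrator", "software_app"]` annotation, would surface the error at the API boundary instead. The same unvalidated value is forwarded later in this file as `body.producer_type or ""` in the `compute_verdict` call, so one check here covers both. The `ReadinessRequest` class header lies outside the hunk.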
@@ -234,10 +241,14 @@ async def readiness(body: ReadinessRequest): """Low-friction CRA readiness check: business-scope answers -> Annex III/IV classification + a high-level guideline grouped Code / Prozess / Dokumentation. Reuses the deterministic classifier + Annex I spine. No project, no DB.""" + machine_integrator = body.producer_type == MACHINE_INTEGRATOR + has_digital = bool(body.digital_elements) + # Machine/plant builders: connectivity, remote maintenance and OTA are the norm. + # Declared digital elements (e.g. from a datasheet upload) imply digital elements too. intake = { "intended_use": body.intended_use, - "connected_to_internet": bool(body.connected_to_internet or body.remote_maintenance or body.user_parameter_app), - "has_software_updates": bool(body.has_software_updates or body.remote_maintenance or body.user_parameter_app), + "connected_to_internet": bool(body.connected_to_internet or body.remote_maintenance or body.user_parameter_app or machine_integrator or has_digital), + "has_software_updates": bool(body.has_software_updates or body.remote_maintenance or body.user_parameter_app or machine_integrator or has_digital), "processes_personal_data": bool(body.processes_personal_data), "is_critical_infra_supplier": bool(body.is_critical_infra_supplier), } 
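Review bot: The comment `# Declared digital elements ... imply digital elements too` is tautological and hides what the code actually does: `has_digital` forces both `connected_to_internet` and `has_software_updates` to True, so any entry in `digital_elements` (a standalone, non-networked firmware, for instance) is classified as an internet-connected, updatable product. If that over-approximation is the intended conservative default, the comment should say so, e.g. `# Any declared digital element is treated as connected + updatable (conservative default).`; otherwise connectivity should be derived only from elements that actually indicate it. The same applies to `machine_integrator`, which is presumably deliberate per the preceding comment but is not separately visible to the caller. The body of `readiness` continues outside the hunk.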
@@ -258,13 +269,17 @@ async def readiness(body: ReadinessRequest): }) # Machine/plant builders are ALSO hit by the new Machinery Regulation's # cyber-with-safety essential requirements (Annex III) — show the combination. - if body.is_machinery: + if body.is_machinery or machine_integrator: machinery = _machinery_obligations() if machinery: regulations.append("Maschinen-VO 2023/1230") for bucket, item in machinery: groups[bucket].append(item) total_effort = sum(r["effort_days"] for g in groups.values() for r in g if r.get("effort_days")) + verdict = compute_verdict( + classification, body.placed_on_market_after_2027, + body.producer_type or "", bool(body.customers_request_cra_evidence), + ) return { "in_scope": in_scope, "classification": classification, @@ -275,4 +290,9 @@ async def readiness(body: ReadinessRequest): "counts": {k: len(v) for k, v in groups.items()}, "total_effort_days": total_effort, "deadlines": list(DEADLINES), + # Eingangstür verdict layer + "verdict": verdict, + "maturity": evidence_maturity(body.provided_evidence), + "digital_elements": body.digital_elements or [], + "producer_type": body.producer_type or "", } diff --git a/backend-compliance/compliance/services/cra_applicability.py b/backend-compliance/compliance/services/cra_applicability.py new file mode 100644 index 00000000..e6666432 --- /dev/null +++ b/backend-compliance/compliance/services/cra_applicability.py 
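Review bot: The response surfaces `verdict` but not the inputs it was derived from: `placed_on_market_after_2027=None` is folded to True inside `compute_verdict`, and the integrator/digital-element defaults applied to `intake` are already baked into `classification`, so a consumer of `/readiness` cannot tell a declared "in scope, placed on the market after the cutoff" from an assumed one — which matters when the label reads `CRA zwingend (Rechtspflicht)`. Echoing the effective inputs next to the verdict, e.g. `"intake_effective": intake,` and `"placed_on_market_assumed": body.placed_on_market_after_2027 is None,`, keeps the endpoint additive while making the assumptions auditable. `intake` and `classification` are assigned earlier in `readiness`, outside this hunk.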
@@ -0,0 +1,107 @@ +"""Neutral CRA applicability verdict for the 'Eingangstür' readiness check. + +Separates LEGAL obligation from MARKET pull — the distinction SME manufacturers +actually care about: a product may not yet be legally in CRA scope, but its B2B +customers (machine/plant builders) will demand CRA evidence anyway. + +Key rule: not the development date but the PLACING ON THE MARKET (Inverkehrbringen) +decides. A legacy product (e.g. designed 2019) still placed on the market after +the CRA cutoff must be conformant. + +Pure + deterministic: no DB, no LLM, no network. Reuses the existing Annex III/IV +classifier (passed in as `cra_class`) — this is the verdict layer on top, not a +fourth classifier. +""" + +CRA_CUTOFF = "2027-12-11" # CRA main obligations apply to products placed on the market from here + +# Verdict tiers +ZWINGEND = "zwingend" +RATSAM = "ratsam" +NICHT_BETROFFEN = "nicht_betroffen" + +_TIER_LABEL = { + ZWINGEND: "CRA zwingend (Rechtspflicht)", + RATSAM: "CRA nicht zwingend, aber dringend ratsam", + NICHT_BETROFFEN: "CRA nicht betroffen", +} + +# Producer archetypes — drive default assumptions + verdict emphasis. +COMPONENT = "component" # Zulieferteil/Komponente (B2B), z. B. OWIS PS90+ +END_DEVICE = "end_device" # Endgerät +MACHINE_INTEGRATOR = "machine_integrator" # Anlagen-/Maschinenbauer — Internet/OTA meist gegeben +SOFTWARE_APP = "software_app" # App/Frontend/Cloud, keine Hardware +PRODUCER_TYPES = (COMPONENT, END_DEVICE, MACHINE_INTEGRATOR, SOFTWARE_APP) + +# Standard CRA evidence the cyber technical file needs (Annex I/II + Art. 13/14). 
+EVIDENCE_ITEMS = [ + {"key": "sbom", "label": "Software-Stückliste (SBOM)"}, + {"key": "vdp", "label": "Vulnerability-Disclosure-Policy / Security-Kontakt"}, + {"key": "patch_process", "label": "Patch-/Update-Prozess (signiert)"}, + {"key": "support_lifecycle", "label": "Support-Zeitraum / Lifecycle-Zusage"}, + {"key": "threat_model", "label": "Threat Model / Cyber-Risikobeurteilung"}, + {"key": "security_logging", "label": "Security-Logging"}, + {"key": "auth_concept", "label": "Authentisierung / Passwortkonzept (Netzwerk)"}, + {"key": "incident_process", "label": "Incident-/Meldeprozess (Art. 14, 24/72h)"}, +] +_EVIDENCE_KEYS = {e["key"] for e in EVIDENCE_ITEMS} + + +def in_scope(cra_class: str) -> bool: + """A product is in CRA scope once it has any digital element / Annex match.""" + return (cra_class or "").upper() not in ("", "NOT_IN_SCOPE") + + +def compute_verdict( + cra_class: str, + placed_on_market_after_cutoff, + producer_type: str = "", + customers_request: bool = False, +) -> dict: + """Neutral 3-tier verdict. `placed_on_market_after_cutoff`: True/False/None + (None = unknown → assumed True, conservative: most products keep being sold).""" + scope = in_scope(cra_class) + after_cutoff = True if placed_on_market_after_cutoff is None else bool(placed_on_market_after_cutoff) + # Components are inherently subject to downstream demand even absent an explicit signal. + market_pull = bool(customers_request) or producer_type == COMPONENT + + reasons: list = [] + if not scope: + tier = NICHT_BETROFFEN + reasons.append("Keine digitalen Elemente / keine CRA-Kategorie erkannt.") + if market_pull: + reasons.append("Hinweis: Kunden könnten dennoch CRA-Nachweise anfragen.") + elif after_cutoff: + tier = ZWINGEND + reasons.append( + f"Produkt mit digitalen Elementen und Inverkehrbringen ab {CRA_CUTOFF} → Rechtspflicht." 
+ ) + reasons.append("Maßgeblich ist das Inverkehrbringen, nicht der Entwicklungszeitpunkt.") + else: + tier = RATSAM + reasons.append("Produkt mit digitalen Elementen, aber kein Inverkehrbringen im CRA-Geltungszeitraum.") + reasons.append(f"Sobald ab dem {CRA_CUTOFF} (weiter) in Verkehr gebracht wird, wird CRA zwingend.") + + if market_pull and tier != NICHT_BETROFFEN: + reasons.append("Markt-Druck: B2B-Kunden (Maschinen-/Anlagenbauer) fordern CRA-Nachweise zur Komponente.") + + return { + "tier": tier, + "label": _TIER_LABEL[tier], + "in_scope": scope, + "market_pull": market_pull, + "cra_class": (cra_class or "").upper(), + "cutoff": CRA_CUTOFF, + "placed_on_market_after_cutoff": after_cutoff, + "reasons": reasons, + } + + +def maturity(provided_evidence_keys) -> dict: + """Reifegrad over the standard CRA evidence checklist.""" + have = {k for k in (provided_evidence_keys or []) if k in _EVIDENCE_KEYS} + present = [e for e in EVIDENCE_ITEMS if e["key"] in have] + missing = [e for e in EVIDENCE_ITEMS if e["key"] not in have] + total = len(EVIDENCE_ITEMS) + pct = round(100.0 * len(present) / total) if total else 0 + return {"pct": pct, "present": present, "missing": missing, "total": total} diff --git a/backend-compliance/tests/test_cra_applicability.py b/backend-compliance/tests/test_cra_applicability.py new file mode 100644 index 00000000..fb9e9011 --- /dev/null +++ b/backend-compliance/tests/test_cra_applicability.py 
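Review bot: `in_scope` treats every string except `""` and `NOT_IN_SCOPE` as in scope, so an unrecognised or misspelt class from the upstream classifier (`"unknown"`, `"n/a"`) yields the `CRA zwingend (Rechtspflicht)` tier — the strongest legal statement this layer makes — from garbage input. The accompanying tests only enumerate `STANDARD`, `IMPORTANT_I`, `IMPORTANT_II` and `CRITICAL`; if those are the classifier's complete output set, an allowlist is safer: `_SCOPED_CLASSES = {"STANDARD", "IMPORTANT_I", "IMPORTANT_II", "CRITICAL"}` and `return (cra_class or "").upper() in _SCOPED_CLASSES`. If unknown classes are deliberately treated as in scope, the docstring should state that, since the current one-liner reads as if only known classes reach this branch. Separately, `placed_on_market_after_cutoff` is the only parameter without an annotation and `reasons: list` drops the element type; `placed_on_market_after_cutoff: bool | None` and `reasons: list[str] = []` match the rest of the signature. Also note the checklist item `incident_process` refers to Art. 14 reporting, whose obligations I believe start before `CRA_CUTOFF`; a comment on `CRA_CUTOFF` clarifying that it is the main-obligations date, not the earliest applicable one, would avoid readers inferring a single deadline.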
@@ -0,0 +1,82 @@ +"""Neutral CRA applicability verdict (Eingangstür): legal duty vs market pull.""" +from compliance.services.cra_applicability import ( + ZWINGEND, RATSAM, NICHT_BETROFFEN, COMPONENT, MACHINE_INTEGRATOR, + compute_verdict, maturity, in_scope, EVIDENCE_ITEMS, +) + + +class TestInScope: + def test_digital_element_classes_are_in_scope(self): + for c in ("STANDARD", "IMPORTANT_I", "IMPORTANT_II", "CRITICAL"): + assert in_scope(c) is True + + def test_not_in_scope(self): + assert in_scope("NOT_IN_SCOPE") is False + assert in_scope("") is False + + +class TestVerdict: + def test_zwingend_when_in_scope_and_after_cutoff(self): + v = compute_verdict("STANDARD", True) + assert v["tier"] == ZWINGEND + assert v["in_scope"] is True + + def test_ratsam_when_in_scope_but_not_after_cutoff(self): + v = compute_verdict("STANDARD", False) + assert v["tier"] == RATSAM + assert any("Geltungszeitraum" in r for r in v["reasons"]) + + def test_nicht_betroffen_when_no_digital_elements(self): + v = compute_verdict("NOT_IN_SCOPE", True) + assert v["tier"] == NICHT_BETROFFEN + assert v["in_scope"] is False + + def test_unknown_market_date_assumed_after_cutoff(self): + # None = unknown -> conservative -> zwingend + v = compute_verdict("STANDARD", None) + assert v["tier"] == ZWINGEND + assert v["placed_on_market_after_cutoff"] is True + + def test_component_has_market_pull_without_explicit_signal(self): + v = compute_verdict("STANDARD", True, producer_type=COMPONENT) + assert v["market_pull"] is True + assert any("Markt-Druck" in r for r in v["reasons"]) + + def test_customers_request_sets_market_pull(self): + v = compute_verdict("STANDARD", True, producer_type="end_device", customers_request=True) + assert v["market_pull"] is True + + def test_end_device_no_signal_no_market_pull(self): + v = compute_verdict("STANDARD", True, producer_type="end_device") + assert v["market_pull"] is False + + def test_not_betroffen_with_market_pull_adds_hint_not_pull_reason(self): + v = 
compute_verdict("NOT_IN_SCOPE", True, producer_type=COMPONENT) + assert v["tier"] == NICHT_BETROFFEN + assert any("anfragen" in r for r in v["reasons"]) + + def test_class_passthrough_uppercased(self): + v = compute_verdict("important_ii", True) + assert v["cra_class"] == "IMPORTANT_II" + + +class TestMaturity: + def test_empty_is_zero(self): + m = maturity([]) + assert m["pct"] == 0 + assert len(m["missing"]) == len(EVIDENCE_ITEMS) + assert m["present"] == [] + + def test_partial(self): + m = maturity(["sbom", "vdp"]) + assert m["pct"] == round(100 * 2 / len(EVIDENCE_ITEMS)) + assert {e["key"] for e in m["present"]} == {"sbom", "vdp"} + + def test_unknown_keys_ignored(self): + m = maturity(["sbom", "nonsense"]) + assert {e["key"] for e in m["present"]} == {"sbom"} + + def test_full(self): + m = maturity([e["key"] for e in EVIDENCE_ITEMS]) + assert m["pct"] == 100 + assert m["missing"] == []
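Review bot: No test pins the behaviour for a class value outside the four enumerated in `TestInScope`, which is exactly the path where a classifier typo currently produces `ZWINGEND`; whichever policy is intended (reject or treat as in scope), a test such as `def test_unknown_class_policy(self): assert in_scope("unknown") is True` with the expected value set to the chosen policy would lock it in. Likewise `compute_verdict("STANDARD", True, producer_type="bogus")` is untested, so the silent fall-through to `market_pull is False` for an unknown producer type is unpinned. Both gaps are cheap to close and document the module's contract better than the docstrings currently do.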