feat(cra): neutrale Eingangstür-Verdict-Engine (zwingend/ratsam/nicht betroffen)
CI / detect-changes (push) Successful in 20s
CI / branch-name (push) Has been skipped
CI / guardrail-integrity (push) Has been skipped
CI / secret-scan (push) Has been skipped
CI / dep-audit (push) Has been skipped
CI / sbom-scan (push) Has been skipped
CI / build-sha-integrity (push) Successful in 10s
CI / validate-canonical-controls (push) Successful in 12s
CI / loc-budget (push) Successful in 24s
CI / go-lint (push) Has been skipped
CI / python-lint (push) Has been skipped
CI / nodejs-lint (push) Has been skipped
CI / nodejs-build (push) Successful in 3m11s
CI / test-go (push) Has been skipped
CI / iace-gt-coverage (push) Has been skipped
CI / test-python-backend (push) Successful in 33s
CI / test-python-document-crawler (push) Has been skipped
CI / test-python-dsms-gateway (push) Has been skipped

Reine, deterministische Verdict-Schicht ueber der bestehenden Annex-III/IV-
Klassifikation (kein vierter Klassifizierer): trennt Rechtspflicht von Markt-
Druck. Kern: das Inverkehrbringen (ab 11.12.2027), nicht der Entwicklungs-
zeitpunkt, entscheidet — Bestandsprodukte, die nach der Frist weiter verkauft
werden, fallen unter CRA. Producer-Typen (component/end_device/machine_
integrator/software_app) steuern Default-Annahmen (Anlagenbauer: Vernetzung/OTA
vorausgesetzt) + Verdict-Betonung (Komponente => Markt-Druck). Plus Evidence-
Checkliste (SBOM/VDP/Patch/Lifecycle/Threat-Model/Logging/Auth/Incident) +
Reifegrad. /readiness additiv erweitert (verdict/maturity/digital_elements/
producer_type). 15 Tests gruen. Beispiele: OWIS PS90+, ZwickRoell roboTest.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
This commit is contained in:
Benjamin Admin
2026-06-16 17:17:55 +02:00
parent 8086b8be03
commit 3afb0e7f4d
3 changed files with 212 additions and 3 deletions
@@ -15,6 +15,7 @@ from fastapi import APIRouter, Depends, HTTPException
from pydantic import BaseModel from pydantic import BaseModel
from compliance.services.cra_finding_mapper import assess_findings_payload from compliance.services.cra_finding_mapper import assess_findings_payload
from compliance.services.cra_applicability import compute_verdict, maturity as evidence_maturity, MACHINE_INTEGRATOR
from compliance.services.scanner_mcp_client import fetch_findings from compliance.services.scanner_mcp_client import fetch_findings
from compliance.services.cra_snapshot_store import save_snapshot, list_snapshots, get_snapshot from compliance.services.cra_snapshot_store import save_snapshot, list_snapshots, get_snapshot
from compliance.services.cra_use_case_controls import enrich_findings_with_breadth from compliance.services.cra_use_case_controls import enrich_findings_with_breadth
@@ -184,6 +185,12 @@ class ReadinessRequest(BaseModel):
remote_maintenance: Optional[bool] = False # implies connectivity + updates remote_maintenance: Optional[bool] = False # implies connectivity + updates
user_parameter_app: Optional[bool] = False # implies connectivity + updates user_parameter_app: Optional[bool] = False # implies connectivity + updates
is_machinery: Optional[bool] = False # CE machinery -> also Machinery Reg 2023/1230 is_machinery: Optional[bool] = False # CE machinery -> also Machinery Reg 2023/1230
# Eingangstür / verdict layer (all optional, additive)
producer_type: Optional[str] = "" # component|end_device|machine_integrator|software_app
placed_on_market_after_2027: Optional[bool] = None # None = unknown -> assumed yes (conservative)
customers_request_cra_evidence: Optional[bool] = False
provided_evidence: Optional[List[str]] = None # evidence keys already in place (sbom, vdp, …)
digital_elements: Optional[List[str]] = None # detected/declared digital elements
# CRA Annex I evidence_type -> guideline bucket (Code / Prozess / Dokumentation). # CRA Annex I evidence_type -> guideline bucket (Code / Prozess / Dokumentation).
@@ -234,10 +241,14 @@ async def readiness(body: ReadinessRequest):
"""Low-friction CRA readiness check: business-scope answers -> Annex III/IV """Low-friction CRA readiness check: business-scope answers -> Annex III/IV
classification + a high-level guideline grouped Code / Prozess / Dokumentation. classification + a high-level guideline grouped Code / Prozess / Dokumentation.
Reuses the deterministic classifier + Annex I spine. No project, no DB.""" Reuses the deterministic classifier + Annex I spine. No project, no DB."""
machine_integrator = body.producer_type == MACHINE_INTEGRATOR
has_digital = bool(body.digital_elements)
# Machine/plant builders: connectivity, remote maintenance and OTA are the norm.
# Declared digital elements (e.g. from a datasheet upload) imply digital elements too.
intake = { intake = {
"intended_use": body.intended_use, "intended_use": body.intended_use,
"connected_to_internet": bool(body.connected_to_internet or body.remote_maintenance or body.user_parameter_app), "connected_to_internet": bool(body.connected_to_internet or body.remote_maintenance or body.user_parameter_app or machine_integrator or has_digital),
"has_software_updates": bool(body.has_software_updates or body.remote_maintenance or body.user_parameter_app), "has_software_updates": bool(body.has_software_updates or body.remote_maintenance or body.user_parameter_app or machine_integrator or has_digital),
"processes_personal_data": bool(body.processes_personal_data), "processes_personal_data": bool(body.processes_personal_data),
"is_critical_infra_supplier": bool(body.is_critical_infra_supplier), "is_critical_infra_supplier": bool(body.is_critical_infra_supplier),
} }
@@ -258,13 +269,17 @@ async def readiness(body: ReadinessRequest):
}) })
# Machine/plant builders are ALSO hit by the new Machinery Regulation's # Machine/plant builders are ALSO hit by the new Machinery Regulation's
# cyber-with-safety essential requirements (Annex III) — show the combination. # cyber-with-safety essential requirements (Annex III) — show the combination.
if body.is_machinery: if body.is_machinery or machine_integrator:
machinery = _machinery_obligations() machinery = _machinery_obligations()
if machinery: if machinery:
regulations.append("Maschinen-VO 2023/1230") regulations.append("Maschinen-VO 2023/1230")
for bucket, item in machinery: for bucket, item in machinery:
groups[bucket].append(item) groups[bucket].append(item)
total_effort = sum(r["effort_days"] for g in groups.values() for r in g if r.get("effort_days")) total_effort = sum(r["effort_days"] for g in groups.values() for r in g if r.get("effort_days"))
verdict = compute_verdict(
classification, body.placed_on_market_after_2027,
body.producer_type or "", bool(body.customers_request_cra_evidence),
)
return { return {
"in_scope": in_scope, "in_scope": in_scope,
"classification": classification, "classification": classification,
@@ -275,4 +290,9 @@ async def readiness(body: ReadinessRequest):
"counts": {k: len(v) for k, v in groups.items()}, "counts": {k: len(v) for k, v in groups.items()},
"total_effort_days": total_effort, "total_effort_days": total_effort,
"deadlines": list(DEADLINES), "deadlines": list(DEADLINES),
# Eingangstür verdict layer
"verdict": verdict,
"maturity": evidence_maturity(body.provided_evidence),
"digital_elements": body.digital_elements or [],
"producer_type": body.producer_type or "",
} }
@@ -0,0 +1,107 @@
"""Neutral CRA applicability verdict for the 'Eingangstür' readiness check.
Separates LEGAL obligation from MARKET pull — the distinction SME manufacturers
actually care about: a product may not yet be legally in CRA scope, but its B2B
customers (machine/plant builders) will demand CRA evidence anyway.
Key rule: not the development date but the PLACING ON THE MARKET (Inverkehrbringen)
decides. A legacy product (e.g. designed 2019) still placed on the market after
the CRA cutoff must be conformant.
Pure + deterministic: no DB, no LLM, no network. Reuses the existing Annex III/IV
classifier (passed in as `cra_class`) — this is the verdict layer on top, not a
fourth classifier.
"""
CRA_CUTOFF = "2027-12-11" # CRA main obligations apply to products placed on the market from here
# Verdict tiers
ZWINGEND = "zwingend"
RATSAM = "ratsam"
NICHT_BETROFFEN = "nicht_betroffen"
_TIER_LABEL = {
ZWINGEND: "CRA zwingend (Rechtspflicht)",
RATSAM: "CRA nicht zwingend, aber dringend ratsam",
NICHT_BETROFFEN: "CRA nicht betroffen",
}
# Producer archetypes — drive default assumptions + verdict emphasis.
COMPONENT = "component" # Zulieferteil/Komponente (B2B), z. B. OWIS PS90+
END_DEVICE = "end_device" # Endgerät
MACHINE_INTEGRATOR = "machine_integrator" # Anlagen-/Maschinenbauer — Internet/OTA meist gegeben
SOFTWARE_APP = "software_app" # App/Frontend/Cloud, keine Hardware
PRODUCER_TYPES = (COMPONENT, END_DEVICE, MACHINE_INTEGRATOR, SOFTWARE_APP)
# Standard CRA evidence the cyber technical file needs (Annex I/II + Art. 13/14).
EVIDENCE_ITEMS = [
{"key": "sbom", "label": "Software-Stückliste (SBOM)"},
{"key": "vdp", "label": "Vulnerability-Disclosure-Policy / Security-Kontakt"},
{"key": "patch_process", "label": "Patch-/Update-Prozess (signiert)"},
{"key": "support_lifecycle", "label": "Support-Zeitraum / Lifecycle-Zusage"},
{"key": "threat_model", "label": "Threat Model / Cyber-Risikobeurteilung"},
{"key": "security_logging", "label": "Security-Logging"},
{"key": "auth_concept", "label": "Authentisierung / Passwortkonzept (Netzwerk)"},
{"key": "incident_process", "label": "Incident-/Meldeprozess (Art. 14, 24/72h)"},
]
_EVIDENCE_KEYS = {e["key"] for e in EVIDENCE_ITEMS}
def in_scope(cra_class: str) -> bool:
"""A product is in CRA scope once it has any digital element / Annex match."""
return (cra_class or "").upper() not in ("", "NOT_IN_SCOPE")
def compute_verdict(
cra_class: str,
placed_on_market_after_cutoff,
producer_type: str = "",
customers_request: bool = False,
) -> dict:
"""Neutral 3-tier verdict. `placed_on_market_after_cutoff`: True/False/None
(None = unknown → assumed True, conservative: most products keep being sold)."""
scope = in_scope(cra_class)
after_cutoff = True if placed_on_market_after_cutoff is None else bool(placed_on_market_after_cutoff)
# Components are inherently subject to downstream demand even absent an explicit signal.
market_pull = bool(customers_request) or producer_type == COMPONENT
reasons: list = []
if not scope:
tier = NICHT_BETROFFEN
reasons.append("Keine digitalen Elemente / keine CRA-Kategorie erkannt.")
if market_pull:
reasons.append("Hinweis: Kunden könnten dennoch CRA-Nachweise anfragen.")
elif after_cutoff:
tier = ZWINGEND
reasons.append(
f"Produkt mit digitalen Elementen und Inverkehrbringen ab {CRA_CUTOFF} → Rechtspflicht."
)
reasons.append("Maßgeblich ist das Inverkehrbringen, nicht der Entwicklungszeitpunkt.")
else:
tier = RATSAM
reasons.append("Produkt mit digitalen Elementen, aber kein Inverkehrbringen im CRA-Geltungszeitraum.")
reasons.append(f"Sobald ab dem {CRA_CUTOFF} (weiter) in Verkehr gebracht wird, wird CRA zwingend.")
if market_pull and tier != NICHT_BETROFFEN:
reasons.append("Markt-Druck: B2B-Kunden (Maschinen-/Anlagenbauer) fordern CRA-Nachweise zur Komponente.")
return {
"tier": tier,
"label": _TIER_LABEL[tier],
"in_scope": scope,
"market_pull": market_pull,
"cra_class": (cra_class or "").upper(),
"cutoff": CRA_CUTOFF,
"placed_on_market_after_cutoff": after_cutoff,
"reasons": reasons,
}
def maturity(provided_evidence_keys) -> dict:
"""Reifegrad over the standard CRA evidence checklist."""
have = {k for k in (provided_evidence_keys or []) if k in _EVIDENCE_KEYS}
present = [e for e in EVIDENCE_ITEMS if e["key"] in have]
missing = [e for e in EVIDENCE_ITEMS if e["key"] not in have]
total = len(EVIDENCE_ITEMS)
pct = round(100.0 * len(present) / total) if total else 0
return {"pct": pct, "present": present, "missing": missing, "total": total}
@@ -0,0 +1,82 @@
"""Neutral CRA applicability verdict (Eingangstür): legal duty vs market pull."""
from compliance.services.cra_applicability import (
ZWINGEND, RATSAM, NICHT_BETROFFEN, COMPONENT, MACHINE_INTEGRATOR,
compute_verdict, maturity, in_scope, EVIDENCE_ITEMS,
)
class TestInScope:
def test_digital_element_classes_are_in_scope(self):
for c in ("STANDARD", "IMPORTANT_I", "IMPORTANT_II", "CRITICAL"):
assert in_scope(c) is True
def test_not_in_scope(self):
assert in_scope("NOT_IN_SCOPE") is False
assert in_scope("") is False
class TestVerdict:
def test_zwingend_when_in_scope_and_after_cutoff(self):
v = compute_verdict("STANDARD", True)
assert v["tier"] == ZWINGEND
assert v["in_scope"] is True
def test_ratsam_when_in_scope_but_not_after_cutoff(self):
v = compute_verdict("STANDARD", False)
assert v["tier"] == RATSAM
assert any("Geltungszeitraum" in r for r in v["reasons"])
def test_nicht_betroffen_when_no_digital_elements(self):
v = compute_verdict("NOT_IN_SCOPE", True)
assert v["tier"] == NICHT_BETROFFEN
assert v["in_scope"] is False
def test_unknown_market_date_assumed_after_cutoff(self):
# None = unknown -> conservative -> zwingend
v = compute_verdict("STANDARD", None)
assert v["tier"] == ZWINGEND
assert v["placed_on_market_after_cutoff"] is True
def test_component_has_market_pull_without_explicit_signal(self):
v = compute_verdict("STANDARD", True, producer_type=COMPONENT)
assert v["market_pull"] is True
assert any("Markt-Druck" in r for r in v["reasons"])
def test_customers_request_sets_market_pull(self):
v = compute_verdict("STANDARD", True, producer_type="end_device", customers_request=True)
assert v["market_pull"] is True
def test_end_device_no_signal_no_market_pull(self):
v = compute_verdict("STANDARD", True, producer_type="end_device")
assert v["market_pull"] is False
def test_not_betroffen_with_market_pull_adds_hint_not_pull_reason(self):
v = compute_verdict("NOT_IN_SCOPE", True, producer_type=COMPONENT)
assert v["tier"] == NICHT_BETROFFEN
assert any("anfragen" in r for r in v["reasons"])
def test_class_passthrough_uppercased(self):
v = compute_verdict("important_ii", True)
assert v["cra_class"] == "IMPORTANT_II"
class TestMaturity:
def test_empty_is_zero(self):
m = maturity([])
assert m["pct"] == 0
assert len(m["missing"]) == len(EVIDENCE_ITEMS)
assert m["present"] == []
def test_partial(self):
m = maturity(["sbom", "vdp"])
assert m["pct"] == round(100 * 2 / len(EVIDENCE_ITEMS))
assert {e["key"] for e in m["present"]} == {"sbom", "vdp"}
def test_unknown_keys_ignored(self):
m = maturity(["sbom", "nonsense"])
assert {e["key"] for e in m["present"]} == {"sbom"}
def test_full(self):
m = maturity([e["key"] for e in EVIDENCE_ITEMS])
assert m["pct"] == 100
assert m["missing"] == []