Files
breakpilot-compliance/backend-compliance/compliance/api/cra_assess_routes.py
T
Benjamin Admin b0f78ae9a3 feat(cra): readiness derives obligations from Machinery Reg 2023/1230 too
Machine/plant builders are hit by BOTH the CRA and the new Machinery Regulation.
New machinery_reg_cyber.py models its two well-corroborated Annex III cyber-with-
safety essential requirements (1.1.9 protection against corruption, 1.2.1 control-
system safety incl. foreseeable manipulation) in our own words; EU legal text is
freely reusable (Commission Decision 2011/833/EU, source acknowledged), harmonised
standards referenced by identifier only. The readiness check asks "is it
machinery?" and, if so, adds these obligations tagged "Maschinen-VO" alongside the
CRA ones — the combination is visible (regulations list + per-item source badge).

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-06-14 14:26:08 +02:00

196 lines
8.4 KiB
Python

"""Standalone CRA cyber risk-assessment endpoint.
POST /api/v1/cra/assess — takes the findings the external repo-scanner already
produced and returns the deterministic CRA assessment: each finding mapped to
the CRA Annex I requirement(s) it violates, a risk level, the curated CRA
measures, and the NIST 800-53 / OWASP Top 10 golden-set crosswalk.
Project-less by design: works standalone for ANY customer — including those with
no CE risk assessment and no FMEA yet (the mandatory baseline). Reuses the fully
tested mapper; no DB, no LLM, no RAG. Same logic the MCP server exposes.
"""
from typing import Dict, List, Optional
from fastapi import APIRouter, Depends, HTTPException
from pydantic import BaseModel
from compliance.services.cra_finding_mapper import assess_findings_payload
from compliance.services.cra_snapshot_store import save_snapshot, list_snapshots, get_snapshot
from compliance.services.cra_use_case_controls import enrich_findings_with_breadth
from compliance.services.cra_component_findings import findings_from_components
from compliance.api.cra_annex_i_data import ANNEX_I_REQUIREMENTS, MEASURES, DEADLINES
from compliance.api.cra_routes import _classify # reuse the deterministic Annex III/IV classifier
from compliance.api.machinery_reg_cyber import MACHINERY_REG_CYBER
from database import SessionLocal
from .tenant_utils import get_tenant_id
router = APIRouter(prefix="/v1/cra", tags=["cra"])
class FindingIn(BaseModel):
id: str
title: Optional[str] = ""
description: Optional[str] = ""
category: Optional[str] = ""
cwe: Optional[str] = ""
severity: Optional[str] = ""
cvss: Optional[float] = None
location: Optional[str] = ""
safety_impact: Optional[bool] = False
exploited: Optional[bool] = False
class SafetyFunctionIn(BaseModel):
name: str
hazard: Optional[str] = ""
original_measure: Optional[str] = ""
kind: Optional[str] = "" # prevent_unexpected_actuation | signal_integrity
vulnerable_to: Optional[List[str]] = None
class ComponentIn(BaseModel):
name: str
component_class: Optional[str] = "" # controller | hmi | gateway | drive | remote_access | sensor
networked: Optional[bool] = False
vendor: Optional[str] = ""
product: Optional[str] = ""
class AssessRequest(BaseModel):
findings: List[FindingIn] = []
# customer priorities for the discretionary tier: {objective: high|medium|low}.
# objectives: access | data | network_api | supply_updates | monitoring.
weights: Optional[Dict[str, str]] = None
# CE-risk-assessment safety functions for the cyber-meets-safety bridge.
safety_functions: Optional[List[SafetyFunctionIn]] = None
# hardware path: networked components -> derived cyber findings (no repo).
components: Optional[List[ComponentIn]] = None
def _payload(body: AssessRequest) -> dict:
findings = [f.model_dump() for f in body.findings]
if body.components:
findings = findings + findings_from_components([c.model_dump() for c in body.components])
return {
"findings": findings,
"weights": body.weights,
"safety_functions": [s.model_dump() for s in body.safety_functions] if body.safety_functions else None,
}
def _assess_enriched(body: AssessRequest) -> dict:
"""Assessment + the network_security regulatory breadth (atom-grain).
Breadth is attached at this view layer (db here), never in the pure mapper.
"""
result = assess_findings_payload(_payload(body))
db = SessionLocal()
try:
enrich_findings_with_breadth(result.get("mapped", []), db)
finally:
db.close()
return result
@router.post("/assess")
async def assess(body: AssessRequest):
return _assess_enriched(body)
@router.post("/projects/{project_id}/assess-snapshot")
async def assess_snapshot(project_id: str, body: AssessRequest, tenant_id: str = Depends(get_tenant_id)):
"""Run the assessment and persist it as a versioned snapshot (running system)."""
assessment = _assess_enriched(body)
snap = save_snapshot(project_id, tenant_id, assessment)
return {"snapshot": snap, "assessment": assessment}
@router.get("/projects/{project_id}/assess-snapshots")
async def list_assess_snapshots(project_id: str, tenant_id: str = Depends(get_tenant_id)):
return {"snapshots": list_snapshots(project_id, tenant_id)}
@router.get("/assess-snapshots/{snapshot_id}")
async def get_assess_snapshot(snapshot_id: str, tenant_id: str = Depends(get_tenant_id)):
snap = get_snapshot(snapshot_id, tenant_id)
if not snap:
raise HTTPException(status_code=404, detail="Snapshot not found")
return snap
# --- Lead-magnet readiness check (stateless, no project, no DB) ---
class ReadinessRequest(BaseModel):
intended_use: Optional[str] = ""
connected_to_internet: Optional[bool] = False
has_software_updates: Optional[bool] = False
processes_personal_data: Optional[bool] = False
is_critical_infra_supplier: Optional[bool] = False
has_firmware: Optional[bool] = False
remote_maintenance: Optional[bool] = False # implies connectivity + updates
user_parameter_app: Optional[bool] = False # implies connectivity + updates
is_machinery: Optional[bool] = False # CE machinery -> also Machinery Reg 2023/1230
# CRA Annex I evidence_type -> guideline bucket (Code / Prozess / Dokumentation).
_GUIDELINE_BUCKET = {"code": "code", "hybrid": "code", "process": "process", "document": "document"}
_PATH_HINT = {
"CRITICAL": "Konformitaet ueber benannte Stelle / EUCC (Modul H/C)",
"IMPORTANT_II": "Modul B+C oder harmonisierte Norm",
"IMPORTANT_I": "Self-Assessment bei harmonisierten Normen, sonst Modul B",
"STANDARD": "Self-Assessment (Modul A)",
"NOT_IN_SCOPE": "",
}
@router.post("/readiness")
async def readiness(body: ReadinessRequest):
"""Low-friction CRA readiness check: business-scope answers -> Annex III/IV
classification + a high-level guideline grouped Code / Prozess / Dokumentation.
Reuses the deterministic classifier + Annex I spine. No project, no DB."""
intake = {
"intended_use": body.intended_use,
"connected_to_internet": bool(body.connected_to_internet or body.remote_maintenance or body.user_parameter_app),
"has_software_updates": bool(body.has_software_updates or body.remote_maintenance or body.user_parameter_app),
"processes_personal_data": bool(body.processes_personal_data),
"is_critical_infra_supplier": bool(body.is_critical_infra_supplier),
}
classification, rationale = _classify(intake)
in_scope = classification != "NOT_IN_SCOPE"
groups = {"code": [], "process": [], "document": []}
regulations = []
if in_scope:
regulations.append("CRA")
for req in ANNEX_I_REQUIREMENTS:
bucket = _GUIDELINE_BUCKET.get(req.get("evidence_type", "process"), "process")
groups[bucket].append({
"req_id": req["req_id"], "title": req["title"], "category": req["category"],
"annex_anchor": req["annex_anchor"], "severity": req["severity"],
"effort_days": req.get("effort_days"),
"measures": [{"id": m, "name": MEASURES.get(m, m)} for m in req.get("mapped_measures", [])],
"source": "CRA",
})
# Machine/plant builders are ALSO hit by the new Machinery Regulation's
# cyber-with-safety essential requirements (Annex III) — show the combination.
if body.is_machinery:
regulations.append("Maschinen-VO 2023/1230")
for req in MACHINERY_REG_CYBER:
bucket = _GUIDELINE_BUCKET.get(req.get("evidence_type", "process"), "process")
groups[bucket].append({
"req_id": req["req_id"], "title": req["title"], "category": req["category"],
"annex_anchor": req["annex_anchor"], "severity": req["severity"],
"effort_days": None, "measures": [], "source": "Maschinen-VO",
})
total_effort = sum(r["effort_days"] for g in groups.values() for r in g if r.get("effort_days"))
return {
"in_scope": in_scope,
"classification": classification,
"rationale": rationale,
"conformity_path_hint": _PATH_HINT.get(classification, ""),
"regulations": regulations,
"guideline": groups,
"counts": {k: len(v) for k, v in groups.items()},
"total_effort_days": total_effort,
"deadlines": list(DEADLINES),
}