"""Standalone CRA cyber risk-assessment endpoint. POST /api/v1/cra/assess — takes the findings the external repo-scanner already produced and returns the deterministic CRA assessment: each finding mapped to the CRA Annex I requirement(s) it violates, a risk level, the curated CRA measures, and the NIST 800-53 / OWASP Top 10 golden-set crosswalk. Project-less by design: works standalone for ANY customer — including those with no CE risk assessment and no FMEA yet (the mandatory baseline). Reuses the fully tested mapper; no DB, no LLM, no RAG. Same logic the MCP server exposes. """ from typing import Dict, List, Optional from fastapi import APIRouter, Depends, HTTPException from pydantic import BaseModel from compliance.services.cra_finding_mapper import assess_findings_payload from compliance.services.cra_snapshot_store import save_snapshot, list_snapshots, get_snapshot from compliance.services.cra_use_case_controls import enrich_findings_with_breadth from compliance.services.cra_component_findings import findings_from_components from compliance.api.cra_annex_i_data import ANNEX_I_REQUIREMENTS, MEASURES, DEADLINES from compliance.api.cra_routes import _classify # reuse the deterministic Annex III/IV classifier from compliance.services.use_case_controls import UseCaseControlsService from database import SessionLocal from .tenant_utils import get_tenant_id router = APIRouter(prefix="/v1/cra", tags=["cra"]) class FindingIn(BaseModel): id: str title: Optional[str] = "" description: Optional[str] = "" category: Optional[str] = "" cwe: Optional[str] = "" severity: Optional[str] = "" cvss: Optional[float] = None location: Optional[str] = "" safety_impact: Optional[bool] = False exploited: Optional[bool] = False class SafetyFunctionIn(BaseModel): name: str hazard: Optional[str] = "" original_measure: Optional[str] = "" kind: Optional[str] = "" # prevent_unexpected_actuation | signal_integrity vulnerable_to: Optional[List[str]] = None class ComponentIn(BaseModel): name: str component_class: Optional[str] = "" # controller | hmi | gateway | drive | remote_access | sensor networked: Optional[bool] = False vendor: Optional[str] = "" product: Optional[str] = "" class AssessRequest(BaseModel): findings: List[FindingIn] = [] # customer priorities for the discretionary tier: {objective: high|medium|low}. # objectives: access | data | network_api | supply_updates | monitoring. weights: Optional[Dict[str, str]] = None # CE-risk-assessment safety functions for the cyber-meets-safety bridge. safety_functions: Optional[List[SafetyFunctionIn]] = None # hardware path: networked components -> derived cyber findings (no repo). components: Optional[List[ComponentIn]] = None def _payload(body: AssessRequest) -> dict: findings = [f.model_dump() for f in body.findings] if body.components: findings = findings + findings_from_components([c.model_dump() for c in body.components]) return { "findings": findings, "weights": body.weights, "safety_functions": [s.model_dump() for s in body.safety_functions] if body.safety_functions else None, } def _assess_enriched(body: AssessRequest) -> dict: """Assessment + the network_security regulatory breadth (atom-grain). Breadth is attached at this view layer (db here), never in the pure mapper. """ result = assess_findings_payload(_payload(body)) db = SessionLocal() try: enrich_findings_with_breadth(result.get("mapped", []), db) finally: db.close() return result @router.post("/assess") async def assess(body: AssessRequest): return _assess_enriched(body) @router.post("/projects/{project_id}/assess-snapshot") async def assess_snapshot(project_id: str, body: AssessRequest, tenant_id: str = Depends(get_tenant_id)): """Run the assessment and persist it as a versioned snapshot (running system).""" assessment = _assess_enriched(body) snap = save_snapshot(project_id, tenant_id, assessment) return {"snapshot": snap, "assessment": assessment} @router.get("/projects/{project_id}/assess-snapshots") async def list_assess_snapshots(project_id: str, tenant_id: str = Depends(get_tenant_id)): return {"snapshots": list_snapshots(project_id, tenant_id)} @router.get("/assess-snapshots/{snapshot_id}") async def get_assess_snapshot(snapshot_id: str, tenant_id: str = Depends(get_tenant_id)): snap = get_snapshot(snapshot_id, tenant_id) if not snap: raise HTTPException(status_code=404, detail="Snapshot not found") return snap # --- Lead-magnet readiness check (stateless, no project, no DB) --- class ReadinessRequest(BaseModel): intended_use: Optional[str] = "" connected_to_internet: Optional[bool] = False has_software_updates: Optional[bool] = False processes_personal_data: Optional[bool] = False is_critical_infra_supplier: Optional[bool] = False has_firmware: Optional[bool] = False remote_maintenance: Optional[bool] = False # implies connectivity + updates user_parameter_app: Optional[bool] = False # implies connectivity + updates is_machinery: Optional[bool] = False # CE machinery -> also Machinery Reg 2023/1230 # CRA Annex I evidence_type -> guideline bucket (Code / Prozess / Dokumentation). _GUIDELINE_BUCKET = {"code": "code", "hybrid": "code", "process": "process", "document": "document"} _PATH_HINT = { "CRITICAL": "Konformitaet ueber benannte Stelle / EUCC (Modul H/C)", "IMPORTANT_II": "Modul B+C oder harmonisierte Norm", "IMPORTANT_I": "Self-Assessment bei harmonisierten Normen, sonst Modul B", "STANDARD": "Self-Assessment (Modul A)", "NOT_IN_SCOPE": "—", } # Machinery Regulation 2023/1230 cyber-with-safety obligations come from the shared # Controls-API (use_case=maschinen, atom-grain, license-clean) — NOT hardcoded. # Cyber-relevant sub-topics -> guideline bucket. _MACHINERY_SUBTOPICS = [ ("sicherheitsanforderungen", "code"), ("risikomanagement", "process"), ("konformitaetsbewertung", "document"), ] def _machinery_obligations(limit_per: int = 4) -> list: """(bucket, guideline_item) tuples from use_case=maschinen. Best-effort.""" out = [] db = SessionLocal() try: svc = UseCaseControlsService(db) for sub_topic, bucket in _MACHINERY_SUBTOPICS: try: res = svc.controls_for_use_case("maschinen", sub_topic=sub_topic, limit=limit_per) except Exception: continue for c in res.get("controls", []): out.append((bucket, { "req_id": c.get("control_id"), "title": c.get("title"), "category": sub_topic, "annex_anchor": c.get("source_regulation", "Maschinenverordnung (EU) 2023/1230"), "severity": (c.get("severity") or "").upper(), "effort_days": None, "measures": [], "source": "Maschinen-VO", })) finally: db.close() return out @router.post("/readiness") async def readiness(body: ReadinessRequest): """Low-friction CRA readiness check: business-scope answers -> Annex III/IV classification + a high-level guideline grouped Code / Prozess / Dokumentation. Reuses the deterministic classifier + Annex I spine. No project, no DB.""" intake = { "intended_use": body.intended_use, "connected_to_internet": bool(body.connected_to_internet or body.remote_maintenance or body.user_parameter_app), "has_software_updates": bool(body.has_software_updates or body.remote_maintenance or body.user_parameter_app), "processes_personal_data": bool(body.processes_personal_data), "is_critical_infra_supplier": bool(body.is_critical_infra_supplier), } classification, rationale = _classify(intake) in_scope = classification != "NOT_IN_SCOPE" groups = {"code": [], "process": [], "document": []} regulations = [] if in_scope: regulations.append("CRA") for req in ANNEX_I_REQUIREMENTS: bucket = _GUIDELINE_BUCKET.get(req.get("evidence_type", "process"), "process") groups[bucket].append({ "req_id": req["req_id"], "title": req["title"], "category": req["category"], "annex_anchor": req["annex_anchor"], "severity": req["severity"], "effort_days": req.get("effort_days"), "measures": [{"id": m, "name": MEASURES.get(m, m)} for m in req.get("mapped_measures", [])], "source": "CRA", }) # Machine/plant builders are ALSO hit by the new Machinery Regulation's # cyber-with-safety essential requirements (Annex III) — show the combination. if body.is_machinery: machinery = _machinery_obligations() if machinery: regulations.append("Maschinen-VO 2023/1230") for bucket, item in machinery: groups[bucket].append(item) total_effort = sum(r["effort_days"] for g in groups.values() for r in g if r.get("effort_days")) return { "in_scope": in_scope, "classification": classification, "rationale": rationale, "conformity_path_hint": _PATH_HINT.get(classification, ""), "regulations": regulations, "guideline": groups, "counts": {k: len(v) for k, v in groups.items()}, "total_effort_days": total_effort, "deadlines": list(DEADLINES), }