"""Standalone CRA cyber risk-assessment endpoint. POST /api/v1/cra/assess — takes the findings the external repo-scanner already produced and returns the deterministic CRA assessment: each finding mapped to the CRA Annex I requirement(s) it violates, a risk level, the curated CRA measures, and the NIST 800-53 / OWASP Top 10 golden-set crosswalk. Project-less by design: works standalone for ANY customer — including those with no CE risk assessment and no FMEA yet (the mandatory baseline). Reuses the fully tested mapper; no DB, no LLM, no RAG. Same logic the MCP server exposes. """ from typing import Dict, List, Optional from fastapi import APIRouter, Depends, HTTPException from pydantic import BaseModel from sqlalchemy import text from starlette.concurrency import run_in_threadpool from compliance.services.cra_finding_mapper import assess_findings_payload from compliance.services.cra_applicability import ( compute_verdict, compute_machinery_verdict, maturity as evidence_maturity, MACHINE_INTEGRATOR, ) from compliance.services.cra_datasheet_extractor import extract_grenzen from compliance.services.scanner_mcp_client import fetch_findings from compliance.services.cra_snapshot_store import save_snapshot, list_snapshots, get_snapshot from compliance.services.cra_use_case_controls import enrich_findings_with_breadth from compliance.services.cra_component_findings import findings_from_components from compliance.api.cra_annex_i_data import ANNEX_I_REQUIREMENTS, MEASURES, DEADLINES from compliance.api.cra_routes import _classify # reuse the deterministic Annex III/IV classifier from compliance.services.use_case_controls import UseCaseControlsService from database import SessionLocal from .tenant_utils import get_tenant_id router = APIRouter(prefix="/v1/cra", tags=["cra"]) class FindingIn(BaseModel): id: str title: Optional[str] = "" description: Optional[str] = "" category: Optional[str] = "" cwe: Optional[str] = "" severity: Optional[str] = "" cvss: Optional[float] = None location: Optional[str] = "" safety_impact: Optional[bool] = False exploited: Optional[bool] = False class SafetyFunctionIn(BaseModel): name: str hazard: Optional[str] = "" original_measure: Optional[str] = "" kind: Optional[str] = "" # prevent_unexpected_actuation | signal_integrity vulnerable_to: Optional[List[str]] = None class ComponentIn(BaseModel): name: str component_class: Optional[str] = "" # controller | hmi | gateway | drive | remote_access | sensor networked: Optional[bool] = False vendor: Optional[str] = "" product: Optional[str] = "" class AssessRequest(BaseModel): findings: List[FindingIn] = [] # customer priorities for the discretionary tier: {objective: high|medium|low}. # objectives: access | data | network_api | supply_updates | monitoring. weights: Optional[Dict[str, str]] = None # CE-risk-assessment safety functions for the cyber-meets-safety bridge. safety_functions: Optional[List[SafetyFunctionIn]] = None # hardware path: networked components -> derived cyber findings (no repo). components: Optional[List[ComponentIn]] = None def _payload(body: AssessRequest) -> dict: findings = [f.model_dump() for f in body.findings] if body.components: findings = findings + findings_from_components([c.model_dump() for c in body.components]) return { "findings": findings, "weights": body.weights, "safety_functions": [s.model_dump() for s in body.safety_functions] if body.safety_functions else None, } def _assess_enriched(body: AssessRequest) -> dict: """Assessment + the network_security regulatory breadth (atom-grain). Breadth is attached at this view layer (db here), never in the pure mapper. """ result = assess_findings_payload(_payload(body)) db = SessionLocal() try: enrich_findings_with_breadth(result.get("mapped", []), db) finally: db.close() return result @router.post("/assess") async def assess(body: AssessRequest): return _assess_enriched(body) class ScannerPullRequest(BaseModel): repo_id: Optional[str] = None severity: Optional[str] = None scanner_url: Optional[str] = None # override SCANNER_MCP_URL token: Optional[str] = None # override SCANNER_MCP_TOKEN weights: Optional[Dict[str, str]] = None safety_functions: Optional[List[SafetyFunctionIn]] = None @router.post("/assess-from-scanner") async def assess_from_scanner(body: ScannerPullRequest): """Pull-flow: fetch findings from the scanner's MCP, then assess. Raw scanner finding dicts go straight to the tolerant mapper (keeps scan_type/cvss_score/file_path). Returns empty assessment if no scanner is configured — the frontend then keeps its demo scenario. """ findings = await fetch_findings( repo_id=body.repo_id, severity=body.severity, base_url=body.scanner_url, token=body.token, ) payload = { "findings": findings, "weights": body.weights, "safety_functions": [s.model_dump() for s in body.safety_functions] if body.safety_functions else None, } result = assess_findings_payload(payload) db = SessionLocal() try: enrich_findings_with_breadth(result.get("mapped", []), db) finally: db.close() result["source"] = {"scanner": True, "pulled": len(findings)} return result class DatasheetRequest(BaseModel): text: str = "" @router.post("/extract-datasheet") async def extract_datasheet(body: DatasheetRequest): """Datasheet text -> IACE 'Grenzen' draft (limits + provenance) + the essential ISO-12100 fields still missing as targeted follow-up questions. Hybrid: deterministic interface/unit detector + local 35B (llm_cascade).""" return await extract_grenzen(body.text) @router.get("/scanner-repos") async def scanner_repos(): """Distinct repo_ids the scanner has findings for, so the UI can pick which repo to assess. Best-effort (one findings page); empty if no scanner config.""" findings = await fetch_findings(limit=200) counts: Dict[str, int] = {} for f in findings: rid = f.get("repo_id") if rid: counts[rid] = counts.get(rid, 0) + 1 repos = sorted( ({"repo_id": k, "count": v} for k, v in counts.items()), key=lambda r: -r["count"], ) return {"repos": repos, "sampled": len(findings) >= 200} @router.post("/projects/{project_id}/assess-snapshot") async def assess_snapshot(project_id: str, body: AssessRequest, tenant_id: str = Depends(get_tenant_id)): """Run the assessment and persist it as a versioned snapshot (running system).""" assessment = _assess_enriched(body) snap = save_snapshot(project_id, tenant_id, assessment) return {"snapshot": snap, "assessment": assessment} @router.get("/projects/{project_id}/assess-snapshots") async def list_assess_snapshots(project_id: str, tenant_id: str = Depends(get_tenant_id)): return {"snapshots": list_snapshots(project_id, tenant_id)} @router.get("/assess-snapshots/{snapshot_id}") async def get_assess_snapshot(snapshot_id: str, tenant_id: str = Depends(get_tenant_id)): snap = get_snapshot(snapshot_id, tenant_id) if not snap: raise HTTPException(status_code=404, detail="Snapshot not found") return snap # --- Lead-magnet readiness check (stateless, no project, no DB) --- class ReadinessRequest(BaseModel): intended_use: Optional[str] = "" connected_to_internet: Optional[bool] = False has_software_updates: Optional[bool] = False processes_personal_data: Optional[bool] = False is_critical_infra_supplier: Optional[bool] = False has_firmware: Optional[bool] = False remote_maintenance: Optional[bool] = False # implies connectivity + updates user_parameter_app: Optional[bool] = False # implies connectivity + updates is_machinery: Optional[bool] = False # CE machinery -> also Machinery Reg 2023/1230 # Eingangstür / verdict layer (all optional, additive) producer_type: Optional[str] = "" # component|end_device|machine_integrator|software_app placed_on_market_after_2027: Optional[bool] = None # None = unknown -> assumed yes (conservative) customers_request_cra_evidence: Optional[bool] = False provided_evidence: Optional[List[str]] = None # evidence keys already in place (sbom, vdp, …) digital_elements: Optional[List[str]] = None # detected/declared digital elements # Machinery-Regulation person-safety axis safety_relevant: Optional[bool] = False # function can endanger persons on fault/manipulation hazard_types: Optional[List[str]] = None # movement_crush, laser_radiation, force_energy, … is_safety_component: Optional[bool] = False # marketed as a safety device (Sicherheitsbauteil) # CRA Annex I evidence_type -> guideline bucket (Code / Prozess / Dokumentation). _GUIDELINE_BUCKET = {"code": "code", "hybrid": "code", "process": "process", "document": "document"} _PATH_HINT = { "CRITICAL": "Benannte Stelle (Modul B+C/H) oder EUCC — keine Selbstbewertung", "IMPORTANT_II": "Benannte Stelle (Modul B+C/H) oder EUCC — keine Selbstbewertung", "IMPORTANT_I": "Selbstbewertung nur mit harmonisierter Norm (noch nicht verfuegbar), sonst benannte Stelle/EUCC", "STANDARD": "Selbstbewertung (Modul A)", "NOT_IN_SCOPE": "—", } # Machinery Regulation 2023/1230 cyber-with-safety obligations come from the shared # Controls-API (use_case=maschinen, atom-grain, license-clean) — NOT hardcoded. # Cyber-relevant sub-topics -> guideline bucket. _MACHINERY_SUBTOPICS = [ ("sicherheitsanforderungen", "code"), ("risikomanagement", "process"), ("konformitaetsbewertung", "document"), ] def _machinery_obligations(limit_per: int = 4) -> list: """(bucket, guideline_item) tuples from use_case=maschinen. Best-effort.""" out = [] db = SessionLocal() try: # Bound the query: on a slow/unindexed prod DB this used to hang ~30s and # block the worker. Cap at 4s → on timeout the queries raise, we degrade # to "no machinery obligations" (best-effort enrichment, not core). db.execute(text("SET statement_timeout = '4000'")) svc = UseCaseControlsService(db) for sub_topic, bucket in _MACHINERY_SUBTOPICS: try: res = svc.controls_for_use_case("maschinen", sub_topic=sub_topic, limit=limit_per) except Exception: continue for c in res.get("controls", []): out.append((bucket, { "req_id": c.get("control_id"), "title": c.get("title"), "category": sub_topic, "annex_anchor": c.get("source_regulation", "Maschinenverordnung (EU) 2023/1230"), "severity": (c.get("severity") or "").upper(), "effort_days": None, "measures": [], "source": "Maschinen-VO", })) finally: db.close() return out @router.post("/readiness") async def readiness(body: ReadinessRequest): """Low-friction CRA readiness check: business-scope answers -> Annex III/IV classification + a high-level guideline grouped Code / Prozess / Dokumentation. Reuses the deterministic classifier + Annex I spine. No project, no DB.""" machine_integrator = body.producer_type == MACHINE_INTEGRATOR has_digital = bool(body.digital_elements) # Machine/plant builders: connectivity, remote maintenance and OTA are the norm. # Declared digital elements (e.g. from a datasheet upload) imply digital elements too. intake = { "intended_use": body.intended_use, "connected_to_internet": bool(body.connected_to_internet or body.remote_maintenance or body.user_parameter_app or machine_integrator or has_digital), "has_software_updates": bool(body.has_software_updates or body.remote_maintenance or body.user_parameter_app or machine_integrator or has_digital), "processes_personal_data": bool(body.processes_personal_data), "is_critical_infra_supplier": bool(body.is_critical_infra_supplier), } classification, rationale = _classify(intake) in_scope = classification != "NOT_IN_SCOPE" groups = {"code": [], "process": [], "document": []} machinery_guideline = [] regulations = [] if in_scope: regulations.append("CRA") for req in ANNEX_I_REQUIREMENTS: bucket = _GUIDELINE_BUCKET.get(req.get("evidence_type", "process"), "process") groups[bucket].append({ "req_id": req["req_id"], "title": req["title"], "category": req["category"], "annex_anchor": req["annex_anchor"], "severity": req["severity"], "effort_days": req.get("effort_days"), "measures": [{"id": m, "name": MEASURES.get(m, m)} for m in req.get("mapped_measures", [])], "source": "CRA", }) # Machinery-Regulation safety obligations are NOT CRA Annex-I cyber controls # — keep them in their OWN section, not mixed into the Code/Process/Document # cyber buckets (machine safety != cybersecurity). if body.is_machinery or machine_integrator: machinery = await run_in_threadpool(_machinery_obligations) if machinery: regulations.append("Maschinen-VO 2023/1230") machinery_guideline = [item for _bucket, item in machinery] total_effort = sum(r["effort_days"] for g in groups.values() for r in g if r.get("effort_days")) verdict = compute_verdict( classification, body.placed_on_market_after_2027, body.producer_type or "", bool(body.customers_request_cra_evidence), ) return { "in_scope": in_scope, "classification": classification, "rationale": rationale, "conformity_path_hint": _PATH_HINT.get(classification, ""), "regulations": regulations, "guideline": groups, "machinery_guideline": machinery_guideline, "counts": {k: len(v) for k, v in groups.items()}, "total_effort_days": total_effort, "deadlines": list(DEADLINES), # Eingangstür verdict layer "verdict": verdict, "machinery_verdict": compute_machinery_verdict( body.producer_type or "", bool(body.is_machinery), bool(body.safety_relevant), body.hazard_types, bool(body.is_safety_component), ), "maturity": evidence_maturity(body.provided_evidence), "digital_elements": body.digital_elements or [], "producer_type": body.producer_type or "", }