"""Standalone CRA cyber risk-assessment endpoint.

POST /api/v1/cra/assess — takes the findings the external repo-scanner already
produced and returns the deterministic CRA assessment: each finding mapped to
the CRA Annex I requirement(s) it violates, a risk level, the curated CRA
measures, and the NIST 800-53 / OWASP Top 10 golden-set crosswalk.

Project-less by design: works standalone for ANY customer — including those
with no CE risk assessment and no FMEA yet (the mandatory baseline). Reuses the
fully tested mapper. The mapper itself needs no DB, no LLM and no RAG; this view
layer opens a DB session only to attach the regulatory-breadth enrichment.
Same logic the MCP server exposes.
"""

from typing import Dict, List, Optional

from fastapi import APIRouter, Depends, HTTPException
from fastapi.concurrency import run_in_threadpool
from pydantic import BaseModel

from compliance.services.cra_finding_mapper import assess_findings_payload
from compliance.services.cra_snapshot_store import save_snapshot, list_snapshots, get_snapshot
from compliance.services.cra_use_case_controls import enrich_findings_with_breadth
from database import SessionLocal
from .tenant_utils import get_tenant_id

# Routes below are declared relative to this prefix (e.g. POST /v1/cra/assess).
# NOTE(review): the module docstring advertises /api/v1/cra/assess, so the
# "/api" part is presumably added where this router is mounted — confirm.
router = APIRouter(prefix="/v1/cra", tags=["cra"])


# Request models are documented with `#` comments on purpose: a class docstring
# on a pydantic model is published as the schema description in OpenAPI.

# One scanner finding as submitted by the caller. Only `id` is required; every
# other field is optional and falls back to the default shown.
class FindingIn(BaseModel):
    id: str
    title: Optional[str] = ""
    description: Optional[str] = ""
    category: Optional[str] = ""
    cwe: Optional[str] = ""
    severity: Optional[str] = ""
    cvss: Optional[float] = None
    location: Optional[str] = ""
    safety_impact: Optional[bool] = False
    exploited: Optional[bool] = False


# One safety function taken from the CE risk assessment. Only `name` is
# required.
class SafetyFunctionIn(BaseModel):
    name: str
    hazard: Optional[str] = ""
    original_measure: Optional[str] = ""
    kind: Optional[str] = ""  # prevent_unexpected_actuation | signal_integrity
    vulnerable_to: Optional[List[str]] = None


# Request body shared by /assess and /projects/{project_id}/assess-snapshot.
class AssessRequest(BaseModel):
    findings: List[FindingIn]
    # customer priorities for the discretionary tier: {objective: high|medium|low}.
    # objectives: access | data | network_api | supply_updates | monitoring.
    weights: Optional[Dict[str, str]] = None
    # CE-risk-assessment safety functions for the cyber-meets-safety bridge.
    safety_functions: Optional[List[SafetyFunctionIn]] = None


def _payload(body: AssessRequest) -> dict:
    """Convert the validated request into the plain-dict payload for the mapper.

    Findings and safety functions are dumped to dicts with ``model_dump()``;
    ``weights`` is passed through untouched. A missing *or empty*
    ``safety_functions`` list is normalised to ``None``.
    """
    safety_functions = None
    if body.safety_functions:
        safety_functions = [s.model_dump() for s in body.safety_functions]
    return {
        "findings": [f.model_dump() for f in body.findings],
        "weights": body.weights,
        "safety_functions": safety_functions,
    }


def _assess_enriched(body: AssessRequest) -> dict:
    """Assessment + the network_security regulatory breadth (atom-grain).

    Breadth is attached at this view layer (db here), never in the pure mapper.

    This function is synchronous and performs DB work through ``SessionLocal``,
    so async callers should run it off the event loop.

    Returns:
        The dict produced by ``assess_findings_payload``, after its ``"mapped"``
        entries (if any) were handed to ``enrich_findings_with_breadth``.
    """
    result = assess_findings_payload(_payload(body))
    db = SessionLocal()
    try:
        # The enrichment's return value is not used; ``result`` is returned
        # as-is afterwards.
        enrich_findings_with_breadth(result.get("mapped", []), db)
    finally:
        # Always release the session, even when enrichment raises.
        db.close()
    return result


# Handlers below deliberately carry no return annotations and no new
# docstrings: FastAPI would treat the former as a response model and publish
# the latter in the OpenAPI description.

# Stateless assessment: nothing is persisted and no tenant is required.
@router.post("/assess")
async def assess(body: AssessRequest):
    # _assess_enriched does blocking DB work; running it inline in an
    # ``async def`` handler would stall the event loop for all other requests.
    return await run_in_threadpool(_assess_enriched, body)


@router.post("/projects/{project_id}/assess-snapshot")
async def assess_snapshot(project_id: str, body: AssessRequest, tenant_id: str = Depends(get_tenant_id)):
    """Run the assessment and persist it as a versioned snapshot (running system)."""
    # Same offloading as in ``assess``: keep the blocking DB work off the loop.
    assessment = await run_in_threadpool(_assess_enriched, body)
    # NOTE(review): save_snapshot is still called inline, as before. If the
    # snapshot store does blocking I/O it should be offloaded too — confirm.
    snap = save_snapshot(project_id, tenant_id, assessment)
    return {"snapshot": snap, "assessment": assessment}


# Lists the snapshots stored for one project of the calling tenant.
@router.get("/projects/{project_id}/assess-snapshots")
async def list_assess_snapshots(project_id: str, tenant_id: str = Depends(get_tenant_id)):
    return {"snapshots": list_snapshots(project_id, tenant_id)}


# Returns a single snapshot, or 404 when the store yields a falsy result for
# this snapshot id / tenant combination.
@router.get("/assess-snapshots/{snapshot_id}")
async def get_assess_snapshot(snapshot_id: str, tenant_id: str = Depends(get_tenant_id)):
    snap = get_snapshot(snapshot_id, tenant_id)
    if not snap:
        raise HTTPException(status_code=404, detail="Snapshot not found")
    return snap