feat(cra): standalone POST /api/v1/cra/assess endpoint
Live HTTP entry for the deterministic CRA assessment — repo-scanner findings in, CRA Annex I mapping + risk + curated measures + NIST/OWASP golden-set crosswalk out. Project-less (works for any customer, no CE-RA/FMEA required); reuses the tested mapper, same logic the MCP server exposes. Additive endpoint (no contract baseline change); no DB. Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
This commit is contained in:
@@ -0,0 +1,40 @@
|
||||
"""Standalone CRA cyber risk-assessment endpoint.
|
||||
|
||||
POST /api/v1/cra/assess — takes the findings the external repo-scanner already
|
||||
produced and returns the deterministic CRA assessment: each finding mapped to
|
||||
the CRA Annex I requirement(s) it violates, a risk level, the curated CRA
|
||||
measures, and the NIST 800-53 / OWASP Top 10 golden-set crosswalk.
|
||||
|
||||
Project-less by design: works standalone for ANY customer — including those with
|
||||
no CE risk assessment and no FMEA yet (the mandatory baseline). Reuses the fully
|
||||
tested mapper; no DB, no LLM, no RAG. Same logic the MCP server exposes.
|
||||
"""
|
||||
from typing import List, Optional
|
||||
|
||||
from fastapi import APIRouter
|
||||
from pydantic import BaseModel
|
||||
|
||||
from compliance.services.cra_finding_mapper import assess_findings_payload
|
||||
|
||||
router = APIRouter(prefix="/v1/cra", tags=["cra"])
|
||||
|
||||
|
||||
class FindingIn(BaseModel):
|
||||
id: str
|
||||
title: Optional[str] = ""
|
||||
description: Optional[str] = ""
|
||||
category: Optional[str] = ""
|
||||
cwe: Optional[str] = ""
|
||||
severity: Optional[str] = ""
|
||||
cvss: Optional[float] = None
|
||||
location: Optional[str] = ""
|
||||
|
||||
|
||||
class AssessRequest(BaseModel):
|
||||
findings: List[FindingIn]
|
||||
|
||||
|
||||
@router.post("/assess")
|
||||
async def assess(body: AssessRequest):
|
||||
payload = {"findings": [f.model_dump() for f in body.findings]}
|
||||
return assess_findings_payload(payload)
|
||||
@@ -56,6 +56,7 @@ from compliance.api.saving_scan_routes import router as saving_scan_router
|
||||
from compliance.api.agent_migration_routes import router as agent_migration_router
|
||||
from compliance.api.vendor_assessment_routes import router as vendor_assessment_router
|
||||
from compliance.api.cra_routes import router as cra_router
|
||||
from compliance.api.cra_assess_routes import router as cra_assess_router
|
||||
from compliance.api.quaidal_routes import router as quaidal_router
|
||||
|
||||
# Middleware
|
||||
@@ -171,6 +172,7 @@ app.include_router(vendor_assessment_router, prefix="/api")
|
||||
|
||||
# CRA (Cyber Resilience Act) Compliance
|
||||
app.include_router(cra_router, prefix="/api")
|
||||
app.include_router(cra_assess_router, prefix="/api")
|
||||
app.include_router(quaidal_router, prefix="/api")
|
||||
|
||||
|
||||
|
||||
@@ -0,0 +1,40 @@
|
||||
"""Contract test for the standalone CRA assess endpoint.
|
||||
|
||||
Mounts only the cra_assess router on a minimal app — no full app/DB startup.
|
||||
"""
|
||||
from fastapi import FastAPI
|
||||
from fastapi.testclient import TestClient
|
||||
|
||||
from compliance.api.cra_assess_routes import router
|
||||
|
||||
app = FastAPI()
|
||||
app.include_router(router, prefix="/api")
|
||||
client = TestClient(app)
|
||||
|
||||
|
||||
def test_assess_maps_findings_with_crosswalk():
|
||||
r = client.post("/api/v1/cra/assess", json={"findings": [
|
||||
{"id": "x", "title": "default password", "cwe": "CWE-259", "severity": "critical"},
|
||||
{"id": "y", "category": "dependency", "title": "outdated lib", "severity": "high"},
|
||||
]})
|
||||
assert r.status_code == 200
|
||||
d = r.json()
|
||||
assert d["findings_total"] == 2
|
||||
by_id = {m["finding_id"]: m for m in d["mapped"]}
|
||||
assert by_id["x"]["primary_requirement"] == "CRA-AI-8"
|
||||
assert by_id["x"]["risk_level"] == "CRITICAL"
|
||||
assert "IA-5" in by_id["x"]["nist_refs"]
|
||||
assert by_id["y"]["primary_requirement"] == "CRA-AI-22"
|
||||
assert any(o["code"] == "A06:2021" for o in by_id["y"]["owasp_refs"])
|
||||
|
||||
|
||||
def test_assess_empty_is_ok():
|
||||
r = client.post("/api/v1/cra/assess", json={"findings": []})
|
||||
assert r.status_code == 200
|
||||
assert r.json()["findings_total"] == 0
|
||||
|
||||
|
||||
def test_assess_requires_finding_id():
|
||||
# id is required by the schema -> 422
|
||||
r = client.post("/api/v1/cra/assess", json={"findings": [{"title": "no id"}]})
|
||||
assert r.status_code == 422
|
||||
Reference in New Issue
Block a user