From cf917ab73351918cd218e655476aba7775f3f554 Mon Sep 17 00:00:00 2001 From: Benjamin Admin Date: Sun, 14 Jun 2026 09:27:09 +0200 Subject: [PATCH] =?UTF-8?q?feat(cra):=20versioned=20assessment=20snapshots?= =?UTF-8?q?=20=E2=80=94=20CRA=20Art.=2013=20running=20system=20(step=203)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Persist each CRA assessment as a versioned, auditable snapshot over the product lifecycle. Reuses the existing compliance_cra_documents table (NO new schema, frozen DB respected): doc_type='doc_risk_assessment', full assessment in generation_context, requirements_coverage summary, auto-incrementing version, prior version superseded. New endpoints: POST /projects/{id}/assess-snapshot, GET /projects/{id}/assess-snapshots (history), GET /assess-snapshots/{id}. Additive (no contract baseline change). Co-Authored-By: Claude Opus 4.7 --- .../compliance/api/cra_assess_routes.py | 36 +++++- .../compliance/services/cra_snapshot_store.py | 104 ++++++++++++++++++ backend-compliance/tests/test_cra_snapshot.py | 25 +++++ 3 files changed, 160 insertions(+), 5 deletions(-) create mode 100644 backend-compliance/compliance/services/cra_snapshot_store.py create mode 100644 backend-compliance/tests/test_cra_snapshot.py diff --git a/backend-compliance/compliance/api/cra_assess_routes.py b/backend-compliance/compliance/api/cra_assess_routes.py index 0d14f17f..1b2a8ff8 100644 --- a/backend-compliance/compliance/api/cra_assess_routes.py +++ b/backend-compliance/compliance/api/cra_assess_routes.py @@ -11,10 +11,12 @@ tested mapper; no DB, no LLM, no RAG. Same logic the MCP server exposes. """ from typing import Dict, List, Optional -from fastapi import APIRouter +from fastapi import APIRouter, Depends, HTTPException from pydantic import BaseModel from compliance.services.cra_finding_mapper import assess_findings_payload +from compliance.services.cra_snapshot_store import save_snapshot, list_snapshots, get_snapshot +from .tenant_utils import get_tenant_id router = APIRouter(prefix="/v1/cra", tags=["cra"]) @@ -49,11 +51,35 @@ class AssessRequest(BaseModel): safety_functions: Optional[List[SafetyFunctionIn]] = None -@router.post("/assess") -async def assess(body: AssessRequest): - payload = { +def _payload(body: AssessRequest) -> dict: + return { "findings": [f.model_dump() for f in body.findings], "weights": body.weights, "safety_functions": [s.model_dump() for s in body.safety_functions] if body.safety_functions else None, } - return assess_findings_payload(payload) + + +@router.post("/assess") +async def assess(body: AssessRequest): + return assess_findings_payload(_payload(body)) + + +@router.post("/projects/{project_id}/assess-snapshot") +async def assess_snapshot(project_id: str, body: AssessRequest, tenant_id: str = Depends(get_tenant_id)): + """Run the assessment and persist it as a versioned snapshot (running system).""" + assessment = assess_findings_payload(_payload(body)) + snap = save_snapshot(project_id, tenant_id, assessment) + return {"snapshot": snap, "assessment": assessment} + + +@router.get("/projects/{project_id}/assess-snapshots") +async def list_assess_snapshots(project_id: str, tenant_id: str = Depends(get_tenant_id)): + return {"snapshots": list_snapshots(project_id, tenant_id)} + + +@router.get("/assess-snapshots/{snapshot_id}") +async def get_assess_snapshot(snapshot_id: str, tenant_id: str = Depends(get_tenant_id)): + snap = get_snapshot(snapshot_id, tenant_id) + if not snap: + raise HTTPException(status_code=404, detail="Snapshot not found") + return snap diff --git a/backend-compliance/compliance/services/cra_snapshot_store.py b/backend-compliance/compliance/services/cra_snapshot_store.py new file mode 100644 index 00000000..86ff19a5 --- /dev/null +++ b/backend-compliance/compliance/services/cra_snapshot_store.py @@ -0,0 +1,104 @@ +"""Persist CRA risk-assessment snapshots — the CRA Art. 13 'running system'. + +Reuses the existing compliance_cra_documents table (NO new schema, frozen DB): +a snapshot is a document of doc_type='doc_risk_assessment', versioned, with the +full assessment stored in generation_context. Each new snapshot supersedes the +prior one, giving an auditable history over the product lifecycle. +""" +import json + +from sqlalchemy import text + +from database import SessionLocal + +DOC_TYPE = "doc_risk_assessment" + + +def assessment_markdown(assessment: dict) -> str: + by = assessment.get("by_risk", {}) + lines = [ + "# CRA-Cyber-Risikobeurteilung", + "", + "- Befunde gesamt: {}".format(assessment.get("findings_total", 0)), + "- Abdeckung: {} %".format(assessment.get("coverage_pct", 0)), + "- Risiko: kritisch {} · hoch {} · mittel {} · niedrig {}".format( + by.get("CRITICAL", 0), by.get("HIGH", 0), by.get("MEDIUM", 0), by.get("LOW", 0)), + "- CRA-Anforderungen betroffen: {} / 40".format(len(assessment.get("requirements_touched", []))), + "", + "## Priorisierte Befunde", + ] + for m in assessment.get("mapped", []): + lines.append("- [{}] {} → {} — {}".format( + m.get("priority_tier", ""), m.get("finding_id", ""), + m.get("primary_requirement", ""), m.get("priority_reason", ""))) + if assessment.get("cross_links"): + lines += ["", "## Cyber trifft Safety"] + for cl in assessment["cross_links"]: + lines.append("- {} ← {}".format(cl.get("safety_ref", ""), ", ".join(cl.get("cyber_finding_ids", [])))) + return "\n".join(lines) + + +def save_snapshot(cra_project_id: str, tenant_id: str, assessment: dict, + title: str = "CRA-Cyber-Risikobeurteilung") -> dict: + db = SessionLocal() + try: + db.execute(text(""" + UPDATE compliance_cra_documents SET status='superseded', superseded_at=NOW() + WHERE cra_project_id = CAST(:pid AS uuid) AND tenant_id = :tid + AND doc_type = :dt AND status != 'superseded' + """), {"pid": cra_project_id, "tid": tenant_id, "dt": DOC_TYPE}) + ver = db.execute(text(""" + SELECT COALESCE(MAX(version), 0) + 1 FROM compliance_cra_documents + WHERE cra_project_id = CAST(:pid AS uuid) AND tenant_id = :tid AND doc_type = :dt + """), {"pid": cra_project_id, "tid": tenant_id, "dt": DOC_TYPE}).scalar() + coverage = {"covered": assessment.get("requirements_touched", []), "total": 40} + row = db.execute(text(""" + INSERT INTO compliance_cra_documents + (cra_project_id, tenant_id, doc_type, title, content_md, version, + requirements_coverage, generation_context, status) + VALUES (CAST(:pid AS uuid), :tid, :dt, :title, :md, :ver, + CAST(:cov AS jsonb), CAST(:ctx AS jsonb), 'draft') + RETURNING id, version, generated_at + """), {"pid": cra_project_id, "tid": tenant_id, "dt": DOC_TYPE, "title": title, + "md": assessment_markdown(assessment), "ver": ver, + "cov": json.dumps(coverage), "ctx": json.dumps(assessment)}).fetchone() + db.commit() + return {"id": str(row.id), "version": row.version, "generated_at": row.generated_at.isoformat()} + except Exception: + db.rollback() + raise + finally: + db.close() + + +def list_snapshots(cra_project_id: str, tenant_id: str) -> list: + db = SessionLocal() + try: + rows = db.execute(text(""" + SELECT id, version, status, generated_at, requirements_coverage + FROM compliance_cra_documents + WHERE cra_project_id = CAST(:pid AS uuid) AND tenant_id = :tid AND doc_type = :dt + ORDER BY version DESC + """), {"pid": cra_project_id, "tid": tenant_id, "dt": DOC_TYPE}).fetchall() + return [{"id": str(r.id), "version": r.version, "status": r.status, + "generated_at": r.generated_at.isoformat(), + "coverage": r.requirements_coverage} for r in rows] + finally: + db.close() + + +def get_snapshot(snapshot_id: str, tenant_id: str): + db = SessionLocal() + try: + r = db.execute(text(""" + SELECT id, version, status, generated_at, content_md, generation_context + FROM compliance_cra_documents + WHERE id = CAST(:sid AS uuid) AND tenant_id = :tid AND doc_type = :dt + """), {"sid": snapshot_id, "tid": tenant_id, "dt": DOC_TYPE}).fetchone() + if not r: + return None + return {"id": str(r.id), "version": r.version, "status": r.status, + "generated_at": r.generated_at.isoformat(), "content_md": r.content_md, + "assessment": r.generation_context} + finally: + db.close() diff --git a/backend-compliance/tests/test_cra_snapshot.py b/backend-compliance/tests/test_cra_snapshot.py new file mode 100644 index 00000000..872f03cc --- /dev/null +++ b/backend-compliance/tests/test_cra_snapshot.py @@ -0,0 +1,25 @@ +"""Tests for the CRA snapshot Markdown builder (pure; DB path verified live).""" +from compliance.services.cra_snapshot_store import assessment_markdown + + +def test_markdown_contains_summary_findings_and_cross_links(): + a = { + "findings_total": 2, "coverage_pct": 100.0, + "by_risk": {"CRITICAL": 1, "HIGH": 1, "MEDIUM": 0, "LOW": 0}, + "requirements_touched": ["CRA-AI-8", "CRA-AI-15"], + "mapped": [{"finding_id": "x", "priority_tier": "P0", + "primary_requirement": "CRA-AI-8", "priority_reason": "P0 — kritisch"}], + "cross_links": [{"safety_ref": "Zweihandschaltung", "cyber_finding_ids": ["x"]}], + } + md = assessment_markdown(a) + assert "CRA-Cyber-Risikobeurteilung" in md + assert "Befunde gesamt: 2" in md + assert "2 / 40" in md + assert "[P0] x → CRA-AI-8" in md + assert "Cyber trifft Safety" in md + assert "Zweihandschaltung" in md + + +def test_markdown_handles_empty_assessment(): + md = assessment_markdown({}) + assert "Befunde gesamt: 0" in md