Files
breakpilot-compliance/backend-compliance/compliance/services/cra_snapshot_store.py
T
Benjamin Admin cf917ab733 feat(cra): versioned assessment snapshots — CRA Art. 13 running system (step 3)
Persist each CRA assessment as a versioned, auditable snapshot over the product
lifecycle. Reuses the existing compliance_cra_documents table (NO new schema,
frozen DB respected): doc_type='doc_risk_assessment', full assessment in
generation_context, requirements_coverage summary, auto-incrementing version,
prior version superseded. New endpoints: POST /projects/{id}/assess-snapshot,
GET /projects/{id}/assess-snapshots (history), GET /assess-snapshots/{id}.
Additive (no contract baseline change).

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-06-14 09:27:09 +02:00

105 lines
4.7 KiB
Python

"""Persist CRA risk-assessment snapshots — the CRA Art. 13 'running system'.
Reuses the existing compliance_cra_documents table (NO new schema, frozen DB):
a snapshot is a document of doc_type='doc_risk_assessment', versioned, with the
full assessment stored in generation_context. Each new snapshot supersedes the
prior one, giving an auditable history over the product lifecycle.
"""
import json
from sqlalchemy import text
from database import SessionLocal
# doc_type value under which risk-assessment snapshots are stored in (and
# filtered from) the compliance_cra_documents table by every query below.
DOC_TYPE = "doc_risk_assessment"
def assessment_markdown(assessment: dict) -> str:
    """Render a CRA assessment dict as a short Markdown report.

    The report consists of a summary bullet list (finding count, coverage
    percentage, per-risk-level counts, number of CRA requirements touched out
    of 40), a "Priorisierte Befunde" section with one bullet per entry of
    ``mapped`` and, only when ``cross_links`` is non-empty, a
    "Cyber trifft Safety" section with one bullet per cross link.

    Args:
        assessment: Assessment data. Keys read: ``findings_total``,
            ``coverage_pct``, ``by_risk``, ``requirements_touched``,
            ``mapped`` and ``cross_links``. Missing keys fall back to zero or
            empty; the container-valued keys may also be ``None``.

    Returns:
        The Markdown text, lines joined with ``"\\n"`` and no trailing newline.
    """
    # `or` instead of a .get() default: a key that is present but None
    # (JSON null) must behave like a missing key rather than crash.
    by_risk = assessment.get("by_risk") or {}
    touched = assessment.get("requirements_touched") or []

    lines = [
        "# CRA-Cyber-Risikobeurteilung",
        "",
        "- Befunde gesamt: {}".format(assessment.get("findings_total", 0)),
        "- Abdeckung: {} %".format(assessment.get("coverage_pct", 0)),
        "- Risiko: kritisch {} · hoch {} · mittel {} · niedrig {}".format(
            by_risk.get("CRITICAL", 0),
            by_risk.get("HIGH", 0),
            by_risk.get("MEDIUM", 0),
            by_risk.get("LOW", 0),
        ),
        "- CRA-Anforderungen betroffen: {} / 40".format(len(touched)),
        "",
        "## Priorisierte Befunde",
    ]

    for finding in assessment.get("mapped") or []:
        # Join only the non-empty parts so finding id, requirement and reason
        # stay visually separated and no dangling separator is emitted.
        parts = (
            finding.get("finding_id"),
            finding.get("primary_requirement"),
            finding.get("priority_reason"),
        )
        detail = " — ".join(str(part) for part in parts if part)
        lines.append("- [{}] {}".format(finding.get("priority_tier") or "", detail))

    cross_links = assessment.get("cross_links") or []
    if cross_links:
        lines += ["", "## Cyber trifft Safety"]
        for link in cross_links:
            # str() so that non-string finding ids do not break the join.
            finding_ids = ", ".join(
                str(fid) for fid in link.get("cyber_finding_ids") or []
            )
            lines.append("- {}: {}".format(link.get("safety_ref") or "", finding_ids))

    return "\n".join(lines)
def save_snapshot(cra_project_id: str, tenant_id: str, assessment: dict,
                  title: str = "CRA-Cyber-Risikobeurteilung") -> dict:
    """Store *assessment* as the next versioned snapshot of a CRA project.

    In a single transaction this
      1. marks every snapshot row of the project/tenant that is not yet
         'superseded' as superseded,
      2. determines the next version number (highest existing version + 1,
         or 1 when there is none), and
      3. inserts the new snapshot with status 'draft', the rendered Markdown
         as content, a coverage summary and the full assessment as JSON.

    Args:
        cra_project_id: Project id; cast to ``uuid`` inside the SQL.
        tenant_id: Tenant the snapshot belongs to.
        assessment: Assessment data; must be JSON-serialisable.
        title: Title stored with the document.

    Returns:
        ``{"id": <str>, "version": <version>, "generated_at": <ISO string>}``
        taken from the inserted row.

    Raises:
        Any exception raised while executing or serialising is re-raised after
        the transaction has been rolled back.
    """
    session = SessionLocal()
    try:
        # Bind parameters shared by all three statements.
        scope = {"pid": cra_project_id, "tid": tenant_id, "dt": DOC_TYPE}

        supersede_sql = text(
            "\n"
            "UPDATE compliance_cra_documents SET status='superseded', superseded_at=NOW()\n"
            "WHERE cra_project_id = CAST(:pid AS uuid) AND tenant_id = :tid\n"
            "AND doc_type = :dt AND status != 'superseded'\n"
        )
        session.execute(supersede_sql, scope)

        next_version_sql = text(
            "\n"
            "SELECT COALESCE(MAX(version), 0) + 1 FROM compliance_cra_documents\n"
            "WHERE cra_project_id = CAST(:pid AS uuid) AND tenant_id = :tid AND doc_type = :dt\n"
        )
        next_version = session.execute(next_version_sql, scope).scalar()

        coverage_summary = {
            "covered": assessment.get("requirements_touched", []),
            "total": 40,
        }

        insert_sql = text(
            "\n"
            "INSERT INTO compliance_cra_documents\n"
            "(cra_project_id, tenant_id, doc_type, title, content_md, version,\n"
            "requirements_coverage, generation_context, status)\n"
            "VALUES (CAST(:pid AS uuid), :tid, :dt, :title, :md, :ver,\n"
            "CAST(:cov AS jsonb), CAST(:ctx AS jsonb), 'draft')\n"
            "RETURNING id, version, generated_at\n"
        )
        insert_params = dict(scope)
        insert_params["title"] = title
        insert_params["md"] = assessment_markdown(assessment)
        insert_params["ver"] = next_version
        insert_params["cov"] = json.dumps(coverage_summary)
        insert_params["ctx"] = json.dumps(assessment)
        inserted = session.execute(insert_sql, insert_params).fetchone()

        session.commit()
        return {
            "id": str(inserted.id),
            "version": inserted.version,
            "generated_at": inserted.generated_at.isoformat(),
        }
    except Exception:
        # Undo the supersede/insert pair as a unit, then let the caller see
        # the original error.
        session.rollback()
        raise
    finally:
        session.close()
def list_snapshots(cra_project_id: str, tenant_id: str) -> list:
    """Return the snapshot history of a CRA project, highest version first.

    Args:
        cra_project_id: Project id; cast to ``uuid`` inside the SQL.
        tenant_id: Tenant whose snapshots are listed.

    Returns:
        One dict per snapshot row with the keys ``id`` (str), ``version``,
        ``status``, ``generated_at`` (ISO string) and ``coverage`` (the row's
        ``requirements_coverage`` value as delivered by the database driver).
    """
    session = SessionLocal()
    try:
        history_sql = text(
            "\n"
            "SELECT id, version, status, generated_at, requirements_coverage\n"
            "FROM compliance_cra_documents\n"
            "WHERE cra_project_id = CAST(:pid AS uuid) AND tenant_id = :tid AND doc_type = :dt\n"
            "ORDER BY version DESC\n"
        )
        params = {"pid": cra_project_id, "tid": tenant_id, "dt": DOC_TYPE}
        records = session.execute(history_sql, params).fetchall()

        history = []
        for record in records:
            history.append({
                "id": str(record.id),
                "version": record.version,
                "status": record.status,
                "generated_at": record.generated_at.isoformat(),
                "coverage": record.requirements_coverage,
            })
        return history
    finally:
        session.close()
def get_snapshot(snapshot_id: str, tenant_id: str):
    """Load one risk-assessment snapshot by id, restricted to *tenant_id*.

    Args:
        snapshot_id: Document id; cast to ``uuid`` inside the SQL.
        tenant_id: Tenant the snapshot must belong to.

    Returns:
        ``None`` when no matching row exists, otherwise a dict with the keys
        ``id`` (str), ``version``, ``status``, ``generated_at`` (ISO string),
        ``content_md`` and ``assessment`` (the row's ``generation_context``).
    """
    session = SessionLocal()
    try:
        lookup_sql = text(
            "\n"
            "SELECT id, version, status, generated_at, content_md, generation_context\n"
            "FROM compliance_cra_documents\n"
            "WHERE id = CAST(:sid AS uuid) AND tenant_id = :tid AND doc_type = :dt\n"
        )
        params = {"sid": snapshot_id, "tid": tenant_id, "dt": DOC_TYPE}
        record = session.execute(lookup_sql, params).fetchone()

        # Unknown id, foreign tenant or a different doc_type: nothing to return.
        if not record:
            return None

        snapshot = {
            "id": str(record.id),
            "version": record.version,
            "status": record.status,
            "generated_at": record.generated_at.isoformat(),
            "content_md": record.content_md,
            "assessment": record.generation_context,
        }
        return snapshot
    finally:
        session.close()