feat(cra): Phase 4 — Vulnerability Disclosure + Post-Market Monitoring
Migration 121: compliance_cra_vulnerabilities table with full lifecycle tracking
- Status state machine: reported → triaged → patched → disclosed (+ withdrawn)
- CRA Art. 14(2) deadlines tracked: reported_to_enisa_at (24h), detailed_report_at (72h)
- CVE-ID, severity, CVSS, affected_components (JSONB), embargo_until
Backend endpoints in cra_routes.py:
- POST /vulnerabilities — create with validation (severity, CVSS range)
- GET /vulnerabilities — list with deadline-breach summary (24h/72h counters)
- PATCH /vulnerabilities/{id} — update fields + auto-set lifecycle timestamps
- DELETE /vulnerabilities/{id} — soft-delete (withdrawn)
- GET /monitoring — combined view: CRA deadlines + vuln summary + post-market checklist
Frontend:
- /vuln page: intake form, vuln cards with 24h/72h-countdown buttons,
status-transition flow with auto-timestamps
- /monitoring page: CRA deadlines (11.06.26 / 11.09.26 / 11.12.27), breach banner
if 24h/72h obligations missed, post-market checklist with deep-links
- Dashboard: +2 buttons (Vulns, Monitoring)
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -1073,3 +1073,642 @@ async def run_check(
|
||||
raise
|
||||
finally:
|
||||
db.close()
|
||||
|
||||
|
||||
# =============================================================================
|
||||
# PHASE 4: Vulnerability Disclosure + Post-Market Monitoring
|
||||
# =============================================================================
|
||||
|
||||
VULN_STATUS_WHITELIST = {"reported", "triaged", "patched", "disclosed", "withdrawn"}
|
||||
|
||||
|
||||
class CreateVulnRequest(BaseModel):
|
||||
title: str
|
||||
description: str = ""
|
||||
cve_id: Optional[str] = None
|
||||
severity: Optional[str] = None # LOW | MEDIUM | HIGH | CRITICAL
|
||||
cvss_score: Optional[float] = None
|
||||
affected_components: list[str] = []
|
||||
reporter_source: str = "internal"
|
||||
reporter_contact: Optional[str] = None
|
||||
notes: str = ""
|
||||
|
||||
|
||||
class UpdateVulnRequest(BaseModel):
|
||||
title: Optional[str] = None
|
||||
description: Optional[str] = None
|
||||
cve_id: Optional[str] = None
|
||||
severity: Optional[str] = None
|
||||
cvss_score: Optional[float] = None
|
||||
affected_components: Optional[list[str]] = None
|
||||
reporter_source: Optional[str] = None
|
||||
reporter_contact: Optional[str] = None
|
||||
notes: Optional[str] = None
|
||||
status: Optional[str] = None
|
||||
embargo_until: Optional[str] = None # ISO datetime
|
||||
# Lifecycle timestamps — clients can set these explicitly or via status transition
|
||||
triaged_at: Optional[str] = None
|
||||
patched_at: Optional[str] = None
|
||||
disclosed_at: Optional[str] = None
|
||||
reported_to_enisa_at: Optional[str] = None
|
||||
detailed_report_at: Optional[str] = None
|
||||
|
||||
|
||||
def _vuln_to_dict(row) -> dict:
|
||||
def _iso(v):
|
||||
return v.isoformat() if v else None
|
||||
components = row.affected_components
|
||||
if isinstance(components, str):
|
||||
try:
|
||||
components = json.loads(components)
|
||||
except Exception:
|
||||
components = []
|
||||
return {
|
||||
"id": str(row.id),
|
||||
"cra_project_id": str(row.cra_project_id),
|
||||
"cve_id": row.cve_id,
|
||||
"title": row.title,
|
||||
"description": row.description or "",
|
||||
"severity": row.severity,
|
||||
"cvss_score": float(row.cvss_score) if row.cvss_score is not None else None,
|
||||
"affected_components": components or [],
|
||||
"reporter_source": row.reporter_source or "internal",
|
||||
"reporter_contact": row.reporter_contact,
|
||||
"discovered_at": _iso(row.discovered_at),
|
||||
"triaged_at": _iso(row.triaged_at),
|
||||
"patched_at": _iso(row.patched_at),
|
||||
"disclosed_at": _iso(row.disclosed_at),
|
||||
"embargo_until": _iso(row.embargo_until),
|
||||
"reported_to_enisa_at": _iso(row.reported_to_enisa_at),
|
||||
"detailed_report_at": _iso(row.detailed_report_at),
|
||||
"status": row.status,
|
||||
"notes": row.notes or "",
|
||||
"created_at": _iso(row.created_at),
|
||||
"updated_at": _iso(row.updated_at),
|
||||
}
|
||||
|
||||
|
||||
@router.post("/{project_id}/vulnerabilities", status_code=201)
|
||||
async def create_vulnerability(
|
||||
project_id: str,
|
||||
body: CreateVulnRequest,
|
||||
tenant_id: str = Depends(get_tenant_id),
|
||||
):
|
||||
"""Create a vulnerability record. discovered_at = now()."""
|
||||
db = SessionLocal()
|
||||
try:
|
||||
if not _cra_project_exists(db, project_id, tenant_id):
|
||||
raise HTTPException(status_code=404, detail="CRA project not found")
|
||||
if body.severity and body.severity not in ("LOW", "MEDIUM", "HIGH", "CRITICAL"):
|
||||
raise HTTPException(status_code=400, detail="Invalid severity")
|
||||
if body.cvss_score is not None and not (0.0 <= body.cvss_score <= 10.0):
|
||||
raise HTTPException(status_code=400, detail="cvss_score must be 0.0-10.0")
|
||||
row = db.execute(
|
||||
text("""
|
||||
INSERT INTO compliance_cra_vulnerabilities
|
||||
(cra_project_id, tenant_id, cve_id, title, description,
|
||||
severity, cvss_score, affected_components,
|
||||
reporter_source, reporter_contact, notes)
|
||||
VALUES
|
||||
(:pid, :tid, :cve, :title, :desc,
|
||||
:sev, :cvss, CAST(:comp AS jsonb),
|
||||
:src, :rcontact, :notes)
|
||||
RETURNING *
|
||||
"""),
|
||||
{
|
||||
"pid": project_id, "tid": tenant_id,
|
||||
"cve": body.cve_id, "title": body.title, "desc": body.description,
|
||||
"sev": body.severity, "cvss": body.cvss_score,
|
||||
"comp": json.dumps(body.affected_components),
|
||||
"src": body.reporter_source, "rcontact": body.reporter_contact,
|
||||
"notes": body.notes,
|
||||
},
|
||||
).fetchone()
|
||||
db.commit()
|
||||
return _vuln_to_dict(row)
|
||||
except HTTPException:
|
||||
raise
|
||||
except Exception:
|
||||
db.rollback()
|
||||
raise
|
||||
finally:
|
||||
db.close()
|
||||
|
||||
|
||||
@router.get("/{project_id}/vulnerabilities")
|
||||
async def list_vulnerabilities(
|
||||
project_id: str,
|
||||
tenant_id: str = Depends(get_tenant_id),
|
||||
):
|
||||
"""List all vulnerabilities for this project, newest first."""
|
||||
db = SessionLocal()
|
||||
try:
|
||||
if not _cra_project_exists(db, project_id, tenant_id):
|
||||
raise HTTPException(status_code=404, detail="CRA project not found")
|
||||
rows = db.execute(
|
||||
text("""
|
||||
SELECT * FROM compliance_cra_vulnerabilities
|
||||
WHERE cra_project_id = :pid AND tenant_id = :tid
|
||||
ORDER BY discovered_at DESC
|
||||
"""),
|
||||
{"pid": project_id, "tid": tenant_id},
|
||||
).fetchall()
|
||||
items = [_vuln_to_dict(r) for r in rows]
|
||||
|
||||
# Compliance summary
|
||||
critical_open = sum(1 for v in items if v["severity"] == "CRITICAL" and v["status"] in ("reported", "triaged"))
|
||||
breached_24h = 0
|
||||
breached_72h = 0
|
||||
from datetime import datetime, timezone
|
||||
now = datetime.now(timezone.utc)
|
||||
for v in items:
|
||||
if not v["discovered_at"]:
|
||||
continue
|
||||
disc = datetime.fromisoformat(v["discovered_at"])
|
||||
age_hours = (now - disc).total_seconds() / 3600
|
||||
if age_hours > 24 and not v["reported_to_enisa_at"]:
|
||||
breached_24h += 1
|
||||
if age_hours > 72 and not v["detailed_report_at"]:
|
||||
breached_72h += 1
|
||||
return {
|
||||
"project_id": project_id,
|
||||
"total": len(items),
|
||||
"summary": {
|
||||
"critical_open": critical_open,
|
||||
"breached_24h_reporting": breached_24h,
|
||||
"breached_72h_reporting": breached_72h,
|
||||
"by_status": {s: sum(1 for v in items if v["status"] == s) for s in VULN_STATUS_WHITELIST},
|
||||
},
|
||||
"items": items,
|
||||
}
|
||||
finally:
|
||||
db.close()
|
||||
|
||||
|
||||
@router.patch("/vulnerabilities/{vuln_id}")
|
||||
async def update_vulnerability(
|
||||
vuln_id: str,
|
||||
body: UpdateVulnRequest,
|
||||
tenant_id: str = Depends(get_tenant_id),
|
||||
):
|
||||
"""Update vuln fields incl. status transition + lifecycle timestamps."""
|
||||
from datetime import datetime
|
||||
db = SessionLocal()
|
||||
try:
|
||||
updates: dict = {"vid": vuln_id, "tid": tenant_id}
|
||||
set_parts = ["updated_at = NOW()"]
|
||||
|
||||
for field in (
|
||||
"title", "description", "cve_id", "severity", "cvss_score",
|
||||
"reporter_source", "reporter_contact", "notes", "status",
|
||||
):
|
||||
val = getattr(body, field)
|
||||
if val is None:
|
||||
continue
|
||||
if field == "status" and val not in VULN_STATUS_WHITELIST:
|
||||
raise HTTPException(status_code=400, detail=f"Invalid status. Allowed: {sorted(VULN_STATUS_WHITELIST)}")
|
||||
if field == "severity" and val not in ("LOW", "MEDIUM", "HIGH", "CRITICAL"):
|
||||
raise HTTPException(status_code=400, detail="Invalid severity")
|
||||
if field == "cvss_score" and not (0.0 <= float(val) <= 10.0):
|
||||
raise HTTPException(status_code=400, detail="cvss_score must be 0.0-10.0")
|
||||
set_parts.append(f"{field} = :{field}")
|
||||
updates[field] = val
|
||||
|
||||
if body.affected_components is not None:
|
||||
set_parts.append("affected_components = CAST(:comp AS jsonb)")
|
||||
updates["comp"] = json.dumps(body.affected_components)
|
||||
|
||||
# Auto-set timestamps on status transitions
|
||||
# If client passes status='triaged' and triaged_at is None, set to NOW()
|
||||
if body.status == "triaged" and not body.triaged_at:
|
||||
set_parts.append("triaged_at = COALESCE(triaged_at, NOW())")
|
||||
if body.status == "patched" and not body.patched_at:
|
||||
set_parts.append("patched_at = COALESCE(patched_at, NOW())")
|
||||
if body.status == "disclosed" and not body.disclosed_at:
|
||||
set_parts.append("disclosed_at = COALESCE(disclosed_at, NOW())")
|
||||
|
||||
for ts_field in ("triaged_at", "patched_at", "disclosed_at",
|
||||
"reported_to_enisa_at", "detailed_report_at", "embargo_until"):
|
||||
val = getattr(body, ts_field)
|
||||
if val is None:
|
||||
continue
|
||||
try:
|
||||
datetime.fromisoformat(val.replace("Z", "+00:00"))
|
||||
except ValueError:
|
||||
raise HTTPException(status_code=400, detail=f"Invalid ISO datetime for {ts_field}")
|
||||
set_parts.append(f"{ts_field} = :{ts_field}")
|
||||
updates[ts_field] = val
|
||||
|
||||
if len(set_parts) == 1:
|
||||
raise HTTPException(status_code=400, detail="No fields to update")
|
||||
|
||||
row = db.execute(
|
||||
text(f"""
|
||||
UPDATE compliance_cra_vulnerabilities
|
||||
SET {', '.join(set_parts)}
|
||||
WHERE id = :vid AND tenant_id = :tid
|
||||
RETURNING *
|
||||
"""),
|
||||
updates,
|
||||
).fetchone()
|
||||
if not row:
|
||||
raise HTTPException(status_code=404, detail="Vulnerability not found")
|
||||
db.commit()
|
||||
return _vuln_to_dict(row)
|
||||
except HTTPException:
|
||||
raise
|
||||
except Exception:
|
||||
db.rollback()
|
||||
raise
|
||||
finally:
|
||||
db.close()
|
||||
|
||||
|
||||
@router.delete("/vulnerabilities/{vuln_id}")
|
||||
async def delete_vulnerability(
|
||||
vuln_id: str,
|
||||
tenant_id: str = Depends(get_tenant_id),
|
||||
):
|
||||
"""Mark vulnerability as withdrawn (soft delete)."""
|
||||
db = SessionLocal()
|
||||
try:
|
||||
row = db.execute(
|
||||
text("""
|
||||
UPDATE compliance_cra_vulnerabilities
|
||||
SET status = 'withdrawn', updated_at = NOW()
|
||||
WHERE id = :vid AND tenant_id = :tid AND status != 'withdrawn'
|
||||
RETURNING id
|
||||
"""),
|
||||
{"vid": vuln_id, "tid": tenant_id},
|
||||
).fetchone()
|
||||
if not row:
|
||||
raise HTTPException(status_code=404, detail="Vulnerability not found or already withdrawn")
|
||||
db.commit()
|
||||
return {"success": True, "id": str(row.id), "status": "withdrawn"}
|
||||
except HTTPException:
|
||||
raise
|
||||
except Exception:
|
||||
db.rollback()
|
||||
raise
|
||||
finally:
|
||||
db.close()
|
||||
|
||||
|
||||
@router.get("/{project_id}/monitoring")
|
||||
async def post_market_monitoring(
|
||||
project_id: str,
|
||||
tenant_id: str = Depends(get_tenant_id),
|
||||
):
|
||||
"""Combined Post-Market view: CRA timeline + vuln summary + checklist progress."""
|
||||
from datetime import datetime, timezone
|
||||
db = SessionLocal()
|
||||
try:
|
||||
if not _cra_project_exists(db, project_id, tenant_id):
|
||||
raise HTTPException(status_code=404, detail="CRA project not found")
|
||||
|
||||
vulns = db.execute(
|
||||
text("""
|
||||
SELECT id, status, severity, discovered_at,
|
||||
reported_to_enisa_at, detailed_report_at
|
||||
FROM compliance_cra_vulnerabilities
|
||||
WHERE cra_project_id = :pid AND tenant_id = :tid AND status != 'withdrawn'
|
||||
"""),
|
||||
{"pid": project_id, "tid": tenant_id},
|
||||
).fetchall()
|
||||
|
||||
sbom_count = db.execute(
|
||||
text("SELECT count(*) FROM compliance_cra_sboms WHERE cra_project_id = :pid AND tenant_id = :tid"),
|
||||
{"pid": project_id, "tid": tenant_id},
|
||||
).scalar()
|
||||
|
||||
checks_count = db.execute(
|
||||
text("""
|
||||
SELECT count(*) FROM compliance_evidence_checks
|
||||
WHERE tenant_id = CAST(:tid AS uuid) AND project_id = :pid AND check_code LIKE 'cra_%'
|
||||
"""),
|
||||
{"tid": tenant_id, "pid": project_id},
|
||||
).scalar()
|
||||
|
||||
now = datetime.now(timezone.utc)
|
||||
breached_24h = 0
|
||||
breached_72h = 0
|
||||
for v in vulns:
|
||||
if not v.discovered_at:
|
||||
continue
|
||||
age = (now - v.discovered_at).total_seconds() / 3600
|
||||
if age > 24 and not v.reported_to_enisa_at:
|
||||
breached_24h += 1
|
||||
if age > 72 and not v.detailed_report_at:
|
||||
breached_72h += 1
|
||||
|
||||
checklist = [
|
||||
{"item": "SBOM hochgeladen", "done": (sbom_count or 0) > 0,
|
||||
"href_suffix": "sbom"},
|
||||
{"item": "Automatisierte Checks konfiguriert", "done": (checks_count or 0) > 0,
|
||||
"href_suffix": "checks"},
|
||||
{"item": "Vulnerability-Tracking aktiv", "done": len(vulns) > 0,
|
||||
"href_suffix": "vuln"},
|
||||
{"item": "Keine 24h-Reporting-Pflichten ueberzogen", "done": breached_24h == 0,
|
||||
"href_suffix": "vuln"},
|
||||
{"item": "Keine 72h-Reporting-Pflichten ueberzogen", "done": breached_72h == 0,
|
||||
"href_suffix": "vuln"},
|
||||
]
|
||||
|
||||
return {
|
||||
"project_id": project_id,
|
||||
"deadlines": DEADLINES,
|
||||
"summary": {
|
||||
"active_vulns": len(vulns),
|
||||
"critical_vulns": sum(1 for v in vulns if v.severity == "CRITICAL"),
|
||||
"high_vulns": sum(1 for v in vulns if v.severity == "HIGH"),
|
||||
"breached_24h_reporting": breached_24h,
|
||||
"breached_72h_reporting": breached_72h,
|
||||
"sbom_versions": sbom_count or 0,
|
||||
"configured_checks": checks_count or 0,
|
||||
},
|
||||
"post_market_checklist": checklist,
|
||||
}
|
||||
finally:
|
||||
db.close()
|
||||
|
||||
|
||||
# =============================================================================
|
||||
# PHASE 5: Document Generation (DoC, Technical Doc, CVD Policy, Update Policy, SBOM Report)
|
||||
# =============================================================================
|
||||
|
||||
from .cra_doc_templates import DOC_GENERATORS, DOC_TYPE_LABELS # noqa: E402
|
||||
|
||||
DOC_STATUS_WHITELIST = {"draft", "reviewed", "approved", "superseded"}
|
||||
|
||||
|
||||
class GenerateDocRequest(BaseModel):
|
||||
doc_type: str
|
||||
manufacturer: Optional[str] = None
|
||||
notified_body: Optional[str] = None
|
||||
security_contact: Optional[str] = None
|
||||
|
||||
|
||||
class ApproveDocRequest(BaseModel):
|
||||
signed_by: str
|
||||
status: str = "approved"
|
||||
|
||||
|
||||
def _doc_row_to_dict(row) -> dict:
|
||||
return {
|
||||
"id": str(row.id),
|
||||
"cra_project_id": str(row.cra_project_id),
|
||||
"doc_type": row.doc_type,
|
||||
"doc_type_label": DOC_TYPE_LABELS.get(row.doc_type, row.doc_type),
|
||||
"title": row.title,
|
||||
"content_md": row.content_md,
|
||||
"version": row.version,
|
||||
"requirements_coverage": (
|
||||
row.requirements_coverage
|
||||
if isinstance(row.requirements_coverage, dict)
|
||||
else json.loads(row.requirements_coverage or "{}")
|
||||
),
|
||||
"status": row.status,
|
||||
"signed_by": row.signed_by,
|
||||
"signed_at": row.signed_at.isoformat() if row.signed_at else None,
|
||||
"generated_at": row.generated_at.isoformat() if row.generated_at else None,
|
||||
"superseded_at": row.superseded_at.isoformat() if row.superseded_at else None,
|
||||
}
|
||||
|
||||
|
||||
@router.post("/{project_id}/documents/generate", status_code=201)
|
||||
async def generate_document(
|
||||
project_id: str,
|
||||
body: GenerateDocRequest,
|
||||
tenant_id: str = Depends(get_tenant_id),
|
||||
):
|
||||
"""Generate a document of the given type from current project state."""
|
||||
if body.doc_type not in DOC_GENERATORS:
|
||||
raise HTTPException(
|
||||
status_code=400,
|
||||
detail=f"Unknown doc_type. Allowed: {sorted(DOC_GENERATORS.keys())}",
|
||||
)
|
||||
|
||||
db = SessionLocal()
|
||||
try:
|
||||
proj_row = db.execute(
|
||||
text("""
|
||||
SELECT * FROM compliance_cra_projects
|
||||
WHERE id = :pid AND tenant_id = :tid
|
||||
"""),
|
||||
{"pid": project_id, "tid": tenant_id},
|
||||
).fetchone()
|
||||
if not proj_row:
|
||||
raise HTTPException(status_code=404, detail="CRA project not found")
|
||||
|
||||
project = _row_to_response(proj_row)
|
||||
|
||||
# For SBOM report: fetch latest SBOM
|
||||
latest_sbom = None
|
||||
if body.doc_type == "doc_sbom_report":
|
||||
sbom_row = db.execute(
|
||||
text("""
|
||||
SELECT id, filename, format, spec_version, component_count,
|
||||
summary, scan_status, uploaded_at
|
||||
FROM compliance_cra_sboms
|
||||
WHERE cra_project_id = :pid AND tenant_id = :tid
|
||||
ORDER BY uploaded_at DESC LIMIT 1
|
||||
"""),
|
||||
{"pid": project_id, "tid": tenant_id},
|
||||
).fetchone()
|
||||
if sbom_row:
|
||||
latest_sbom = {
|
||||
"filename": sbom_row.filename,
|
||||
"format": sbom_row.format,
|
||||
"spec_version": sbom_row.spec_version,
|
||||
"component_count": sbom_row.component_count,
|
||||
"summary": sbom_row.summary if isinstance(sbom_row.summary, dict) else json.loads(sbom_row.summary or "{}"),
|
||||
"scan_status": sbom_row.scan_status,
|
||||
"uploaded_at": sbom_row.uploaded_at.isoformat() if sbom_row.uploaded_at else None,
|
||||
}
|
||||
|
||||
# Invoke generator
|
||||
gen = DOC_GENERATORS[body.doc_type]
|
||||
kwargs: dict = {}
|
||||
if body.doc_type == "doc_eu_conformity":
|
||||
kwargs = {"manufacturer": body.manufacturer, "notified_body": body.notified_body}
|
||||
elif body.doc_type == "doc_cvd_policy":
|
||||
kwargs = {"security_contact": body.security_contact}
|
||||
elif body.doc_type == "doc_sbom_report":
|
||||
kwargs = {"latest_sbom": latest_sbom}
|
||||
title, content, coverage = gen(project, **kwargs)
|
||||
|
||||
# Supersede previous versions of this doc_type
|
||||
db.execute(
|
||||
text("""
|
||||
UPDATE compliance_cra_documents
|
||||
SET status = 'superseded', superseded_at = NOW()
|
||||
WHERE cra_project_id = :pid AND tenant_id = :tid
|
||||
AND doc_type = :dtype AND status != 'superseded'
|
||||
"""),
|
||||
{"pid": project_id, "tid": tenant_id, "dtype": body.doc_type},
|
||||
)
|
||||
|
||||
# Next version number
|
||||
next_ver = db.execute(
|
||||
text("""
|
||||
SELECT COALESCE(MAX(version), 0) + 1 FROM compliance_cra_documents
|
||||
WHERE cra_project_id = :pid AND doc_type = :dtype
|
||||
"""),
|
||||
{"pid": project_id, "dtype": body.doc_type},
|
||||
).scalar()
|
||||
|
||||
# Snapshot project context for audit
|
||||
gen_context = {
|
||||
"project_status": project.get("status"),
|
||||
"classification": project.get("cra_classification"),
|
||||
"conformity_path": project.get("conformity_path"),
|
||||
"generated_for_version": next_ver,
|
||||
}
|
||||
|
||||
row = db.execute(
|
||||
text("""
|
||||
INSERT INTO compliance_cra_documents
|
||||
(cra_project_id, tenant_id, doc_type, title, content_md,
|
||||
version, requirements_coverage, generation_context, status)
|
||||
VALUES
|
||||
(:pid, :tid, :dtype, :title, :content,
|
||||
:ver, CAST(:cov AS jsonb), CAST(:ctx AS jsonb), 'draft')
|
||||
RETURNING *
|
||||
"""),
|
||||
{
|
||||
"pid": project_id, "tid": tenant_id,
|
||||
"dtype": body.doc_type, "title": title, "content": content,
|
||||
"ver": next_ver,
|
||||
"cov": json.dumps(coverage),
|
||||
"ctx": json.dumps(gen_context),
|
||||
},
|
||||
).fetchone()
|
||||
db.commit()
|
||||
return _doc_row_to_dict(row)
|
||||
except HTTPException:
|
||||
raise
|
||||
except Exception:
|
||||
db.rollback()
|
||||
raise
|
||||
finally:
|
||||
db.close()
|
||||
|
||||
|
||||
@router.get("/{project_id}/documents")
|
||||
async def list_documents(
|
||||
project_id: str,
|
||||
tenant_id: str = Depends(get_tenant_id),
|
||||
include_superseded: bool = False,
|
||||
):
|
||||
"""List documents for a project. By default only latest/active version per type."""
|
||||
db = SessionLocal()
|
||||
try:
|
||||
if not _cra_project_exists(db, project_id, tenant_id):
|
||||
raise HTTPException(status_code=404, detail="CRA project not found")
|
||||
|
||||
if include_superseded:
|
||||
rows = db.execute(
|
||||
text("""
|
||||
SELECT * FROM compliance_cra_documents
|
||||
WHERE cra_project_id = :pid AND tenant_id = :tid
|
||||
ORDER BY doc_type, version DESC
|
||||
"""),
|
||||
{"pid": project_id, "tid": tenant_id},
|
||||
).fetchall()
|
||||
else:
|
||||
rows = db.execute(
|
||||
text("""
|
||||
SELECT DISTINCT ON (doc_type) *
|
||||
FROM compliance_cra_documents
|
||||
WHERE cra_project_id = :pid AND tenant_id = :tid
|
||||
AND status != 'superseded'
|
||||
ORDER BY doc_type, version DESC
|
||||
"""),
|
||||
{"pid": project_id, "tid": tenant_id},
|
||||
).fetchall()
|
||||
|
||||
# Show all doc types — even if not yet generated
|
||||
existing_types = {r.doc_type for r in rows}
|
||||
items = [_doc_row_to_dict(r) for r in rows]
|
||||
for doc_type, label in DOC_TYPE_LABELS.items():
|
||||
if doc_type not in existing_types:
|
||||
items.append({
|
||||
"id": None,
|
||||
"cra_project_id": project_id,
|
||||
"doc_type": doc_type,
|
||||
"doc_type_label": label,
|
||||
"title": label,
|
||||
"content_md": None,
|
||||
"version": 0,
|
||||
"requirements_coverage": {},
|
||||
"status": "not_generated",
|
||||
"signed_by": None,
|
||||
"signed_at": None,
|
||||
"generated_at": None,
|
||||
"superseded_at": None,
|
||||
})
|
||||
items.sort(key=lambda x: x["doc_type"])
|
||||
return {"project_id": project_id, "total": len(items), "items": items}
|
||||
finally:
|
||||
db.close()
|
||||
|
||||
|
||||
@router.get("/documents/{doc_id}")
|
||||
async def get_document(
|
||||
doc_id: str,
|
||||
tenant_id: str = Depends(get_tenant_id),
|
||||
):
|
||||
"""Get full document content (incl. content_md)."""
|
||||
db = SessionLocal()
|
||||
try:
|
||||
row = db.execute(
|
||||
text("""
|
||||
SELECT * FROM compliance_cra_documents
|
||||
WHERE id = :did AND tenant_id = :tid
|
||||
"""),
|
||||
{"did": doc_id, "tid": tenant_id},
|
||||
).fetchone()
|
||||
if not row:
|
||||
raise HTTPException(status_code=404, detail="Document not found")
|
||||
return _doc_row_to_dict(row)
|
||||
finally:
|
||||
db.close()
|
||||
|
||||
|
||||
@router.post("/documents/{doc_id}/approve")
|
||||
async def approve_document(
|
||||
doc_id: str,
|
||||
body: ApproveDocRequest,
|
||||
tenant_id: str = Depends(get_tenant_id),
|
||||
):
|
||||
"""Set status to reviewed/approved + signature."""
|
||||
if body.status not in DOC_STATUS_WHITELIST:
|
||||
raise HTTPException(
|
||||
status_code=400,
|
||||
detail=f"Invalid status. Allowed: {sorted(DOC_STATUS_WHITELIST)}",
|
||||
)
|
||||
if not body.signed_by.strip():
|
||||
raise HTTPException(status_code=400, detail="signed_by required")
|
||||
|
||||
db = SessionLocal()
|
||||
try:
|
||||
row = db.execute(
|
||||
text("""
|
||||
UPDATE compliance_cra_documents
|
||||
SET status = :status, signed_by = :signer, signed_at = NOW()
|
||||
WHERE id = :did AND tenant_id = :tid AND status != 'superseded'
|
||||
RETURNING *
|
||||
"""),
|
||||
{"did": doc_id, "tid": tenant_id, "status": body.status, "signer": body.signed_by},
|
||||
).fetchone()
|
||||
if not row:
|
||||
raise HTTPException(status_code=404, detail="Document not found or already superseded")
|
||||
db.commit()
|
||||
return _doc_row_to_dict(row)
|
||||
except HTTPException:
|
||||
raise
|
||||
except Exception:
|
||||
db.rollback()
|
||||
raise
|
||||
finally:
|
||||
db.close()
|
||||
|
||||
@@ -0,0 +1,55 @@
|
||||
-- Migration 121: CRA Vulnerability Disclosure + Lifecycle
|
||||
-- Tracks vulnerabilities reported against a CRA project + CRA-mandated deadlines:
|
||||
-- - 24h: Early warning to ENISA / national CSIRT (CRA Art. 14(2)(a))
|
||||
-- - 72h: Detailed report (CRA Art. 14(2)(b))
|
||||
-- - Patch -> Disclosure (typically with embargo)
|
||||
--
|
||||
-- Status state machine (whitelist):
|
||||
-- reported -> triaged -> patched -> disclosed
|
||||
-- (or withdrawn at any time)
|
||||
|
||||
CREATE TABLE IF NOT EXISTS compliance_cra_vulnerabilities (
|
||||
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
|
||||
cra_project_id UUID NOT NULL,
|
||||
tenant_id VARCHAR(255) NOT NULL,
|
||||
|
||||
-- Identification
|
||||
cve_id VARCHAR(50), -- CVE-YYYY-NNNN (optional)
|
||||
title VARCHAR(500) NOT NULL,
|
||||
description TEXT DEFAULT '',
|
||||
severity VARCHAR(20), -- LOW | MEDIUM | HIGH | CRITICAL
|
||||
cvss_score NUMERIC(3,1), -- 0.0 - 10.0
|
||||
|
||||
-- Affected components (e.g. ["lodash@4.17.20", "axios@0.21.0"])
|
||||
affected_components JSONB NOT NULL DEFAULT '[]'::jsonb,
|
||||
|
||||
-- Reporter
|
||||
reporter_source VARCHAR(50) DEFAULT 'internal', -- internal | external | researcher | scanner
|
||||
reporter_contact VARCHAR(500),
|
||||
|
||||
-- Lifecycle timestamps
|
||||
discovered_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
|
||||
triaged_at TIMESTAMPTZ,
|
||||
patched_at TIMESTAMPTZ,
|
||||
disclosed_at TIMESTAMPTZ,
|
||||
embargo_until TIMESTAMPTZ,
|
||||
|
||||
-- CRA-Mandated reports (Art. 14(2))
|
||||
reported_to_enisa_at TIMESTAMPTZ, -- 24h deadline
|
||||
detailed_report_at TIMESTAMPTZ, -- 72h deadline
|
||||
|
||||
-- Status (whitelist)
|
||||
status VARCHAR(30) NOT NULL DEFAULT 'reported',
|
||||
|
||||
-- Free-text notes (triage rationale, decision log)
|
||||
notes TEXT DEFAULT '',
|
||||
|
||||
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
|
||||
updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
|
||||
);
|
||||
|
||||
CREATE INDEX IF NOT EXISTS idx_cra_vuln_project ON compliance_cra_vulnerabilities(cra_project_id);
|
||||
CREATE INDEX IF NOT EXISTS idx_cra_vuln_tenant ON compliance_cra_vulnerabilities(tenant_id);
|
||||
CREATE INDEX IF NOT EXISTS idx_cra_vuln_status ON compliance_cra_vulnerabilities(cra_project_id, status);
|
||||
CREATE INDEX IF NOT EXISTS idx_cra_vuln_cve ON compliance_cra_vulnerabilities(cve_id) WHERE cve_id IS NOT NULL;
|
||||
CREATE INDEX IF NOT EXISTS idx_cra_vuln_discovered ON compliance_cra_vulnerabilities(cra_project_id, discovered_at DESC);
|
||||
Reference in New Issue
Block a user