8f21650d74
CI / detect-changes (push) Successful in 16s
CI / branch-name (push) Has been skipped
CI / guardrail-integrity (push) Has been skipped
CI / secret-scan (push) Has been skipped
CI / dep-audit (push) Has been skipped
CI / sbom-scan (push) Has been skipped
CI / build-sha-integrity (push) Successful in 15s
CI / validate-canonical-controls (push) Successful in 13s
CI / loc-budget (push) Successful in 25s
CI / go-lint (push) Has been skipped
CI / python-lint (push) Has been skipped
CI / nodejs-lint (push) Has been skipped
CI / nodejs-build (push) Successful in 3m9s
CI / test-go (push) Has been skipped
CI / iace-gt-coverage (push) Has been skipped
CI / test-python-backend (push) Successful in 31s
CI / test-python-document-crawler (push) Has been skipped
CI / test-python-dsms-gateway (push) Has been skipped
- /sdk/dokumente: Kundensicht nur auf veroeffentlichte Rechtsdokumente (Ansehen + Download); Proxy mit Allow-List nur /public — Templates/Drafts/ Generator bleiben unerreichbar. - /sdk/cra-meldewesen: CRA Art. 14 Meldewesen (24h/72h/14d-Kaskade) mit Fristen-Tracking + ENISA-SRP-Export-Entwurf (kein Live-API). Backend: cra_meldewesen (pure, getestet) + cra_incident_store (schema-neutral ueber compliance_cra_documents) + /api/v1/cra/incidents (additiv, contract-safe). - Screening (Self-Scan) aus dem Frontend genommen: Flow-Stepper-Eintrag ausgeblendet (visibleWhen), Dashboard-Kachel + Import-Button entfernt. Repo-Scanning laeuft extern im Compliance-Scanner; Backend-Router bleibt vorerst gemountet (Contract-Stabilitaet). Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
151 lines
6.2 KiB
Python
151 lines
6.2 KiB
Python
"""CRA Article 14 incident-reporting cascade — pure, deterministic core.
|
|
|
|
The CRA obliges manufacturers, once they become AWARE of an actively exploited
|
|
vulnerability or a severe security incident, to report to ENISA's Single
|
|
Reporting Platform (SRP) / the CSIRT in a three-stage cascade:
|
|
|
|
* early warning — within 24 h (Art. 14(2)(a))
|
|
* notification — within 72 h (Art. 14(2)(b))
|
|
* final report — within 14 days (Art. 14(2)(c) / 14(4))
|
|
|
|
This module is pure (no DB, no network, no clock unless you pass `now`): deadline
|
|
computation + the ENISA-SRP export draft. There is NO live ENISA API yet, so the
|
|
"submission" is a structured export the user downloads / hands over — never a
|
|
live HTTP POST. Persistence + routes live in cra_incident_store / cra_incident_routes.
|
|
"""
|
|
from __future__ import annotations
|
|
|
|
from datetime import datetime, timedelta, timezone
|
|
from typing import Optional
|
|
|
|
# Reporting deadline obligation becomes active 2026-09-11 (CRA Art. 14).
|
|
REPORTING_ACTIVE_FROM = "2026-09-11"
|
|
|
|
STAGES = [
|
|
{"key": "early_warning", "label": "Frühwarnung", "hours": 24, "article": "Art. 14 Abs. 2 a)"},
|
|
{"key": "notification", "label": "Meldung", "hours": 72, "article": "Art. 14 Abs. 2 b)"},
|
|
{"key": "final", "label": "Abschlussbericht", "hours": 24 * 14, "article": "Art. 14 Abs. 2 c)"},
|
|
]
|
|
_STAGE_KEYS = [s["key"] for s in STAGES]
|
|
|
|
SEVERITIES = ["low", "medium", "high", "critical"]
|
|
KINDS = ["exploited_vulnerability", "severe_incident"]
|
|
|
|
# How long before a deadline we flag it "due soon" (amber).
|
|
_DUE_SOON_HOURS = 6
|
|
|
|
|
|
def _parse(iso: Optional[str]) -> Optional[datetime]:
|
|
if not iso:
|
|
return None
|
|
try:
|
|
dt = datetime.fromisoformat(str(iso).replace("Z", "+00:00"))
|
|
except (ValueError, TypeError):
|
|
return None
|
|
return dt if dt.tzinfo else dt.replace(tzinfo=timezone.utc)
|
|
|
|
|
|
def _now(now_iso: Optional[str]) -> datetime:
|
|
return _parse(now_iso) or datetime.now(timezone.utc)
|
|
|
|
|
|
def compute_deadlines(
|
|
aware_at_iso: str,
|
|
submissions: Optional[dict] = None,
|
|
now_iso: Optional[str] = None,
|
|
) -> list:
|
|
"""Per-stage deadline + status from the moment of awareness.
|
|
|
|
submissions: {stage_key: submitted_at_iso} — a submitted stage is 'submitted'
|
|
regardless of timing. Status ∈ submitted | overdue | due_soon | pending.
|
|
"""
|
|
aware = _parse(aware_at_iso)
|
|
submissions = submissions or {}
|
|
now = _now(now_iso)
|
|
out = []
|
|
for s in STAGES:
|
|
due = aware + timedelta(hours=s["hours"]) if aware else None
|
|
submitted_at = submissions.get(s["key"])
|
|
if submitted_at:
|
|
status = "submitted"
|
|
elif due is None:
|
|
status = "pending"
|
|
elif now > due:
|
|
status = "overdue"
|
|
elif now > due - timedelta(hours=_DUE_SOON_HOURS):
|
|
status = "due_soon"
|
|
else:
|
|
status = "pending"
|
|
remaining = (due - now).total_seconds() if due else None
|
|
out.append({
|
|
"key": s["key"], "label": s["label"], "article": s["article"],
|
|
"due_at": due.isoformat() if due else None,
|
|
"submitted_at": submitted_at,
|
|
"status": status,
|
|
"remaining_seconds": int(remaining) if remaining is not None else None,
|
|
})
|
|
return out
|
|
|
|
|
|
def next_open_stage(deadlines: list) -> Optional[str]:
|
|
"""The earliest stage not yet submitted — what the user should act on next."""
|
|
for d in deadlines:
|
|
if d["status"] != "submitted":
|
|
return d["key"]
|
|
return None
|
|
|
|
|
|
def build_enisa_report(incident: dict, stage: str) -> dict:
|
|
"""Structured ENISA-SRP export draft for one cascade stage.
|
|
|
|
Mirrors the CRA Art. 14 content requirements. This is a DRAFT for manual
|
|
submission to the ENISA Single Reporting Platform — not a live API call.
|
|
Later stages include everything the earlier ones do, plus their extras.
|
|
"""
|
|
if stage not in _STAGE_KEYS:
|
|
raise ValueError(f"unknown stage '{stage}'")
|
|
base = {
|
|
"report_stage": stage,
|
|
"report_stage_article": next(s["article"] for s in STAGES if s["key"] == stage),
|
|
"manufacturer": incident.get("manufacturer", ""),
|
|
"product_name": incident.get("product_name", ""),
|
|
"product_version": incident.get("product_version", ""),
|
|
"incident_kind": incident.get("kind", ""),
|
|
"severity": incident.get("severity", ""),
|
|
"aware_at": incident.get("aware_at", ""),
|
|
"summary": incident.get("summary", ""),
|
|
"contact": incident.get("contact", ""),
|
|
"submission_target": "ENISA Single Reporting Platform (SRP)",
|
|
"draft_note": "Entwurf zur manuellen Übermittlung — keine Live-API-Übertragung.",
|
|
}
|
|
if stage in ("notification", "final"):
|
|
base.update({
|
|
"affected_components": incident.get("affected_components", ""),
|
|
"impact": incident.get("impact", ""),
|
|
"exploitation_status": incident.get("exploitation_status", ""),
|
|
"indicators_of_compromise": incident.get("iocs", ""),
|
|
"mitigations_in_place": incident.get("mitigations", ""),
|
|
"personal_data_affected": bool(incident.get("personal_data_affected", False)),
|
|
})
|
|
if stage == "final":
|
|
base.update({
|
|
"root_cause": incident.get("root_cause", ""),
|
|
"corrective_measures": incident.get("corrective_measures", ""),
|
|
"patch_available": bool(incident.get("patch_available", False)),
|
|
"patch_reference": incident.get("patch_reference", ""),
|
|
"lessons_learned": incident.get("lessons_learned", ""),
|
|
})
|
|
return base
|
|
|
|
|
|
def report_completeness(incident: dict, stage: str) -> dict:
|
|
"""Which fields the ENISA draft for `stage` still lacks — drives follow-up
|
|
prompts in the UI without blocking submission."""
|
|
draft = build_enisa_report(incident, stage)
|
|
skip = {"submission_target", "draft_note", "report_stage", "report_stage_article",
|
|
"personal_data_affected", "patch_available"}
|
|
missing = [k for k, v in draft.items() if k not in skip and (v is None or str(v).strip() == "")]
|
|
filled = [k for k, v in draft.items() if k not in skip and str(v).strip() != ""]
|
|
return {"filled": filled, "missing": missing,
|
|
"complete": not missing, "stage": stage}
|