"""CRA Article 14 incident-reporting cascade — pure, deterministic core. The CRA obliges manufacturers, once they become AWARE of an actively exploited vulnerability or a severe security incident, to report to ENISA's Single Reporting Platform (SRP) / the CSIRT in a three-stage cascade: * early warning — within 24 h (Art. 14(2)(a)) * notification — within 72 h (Art. 14(2)(b)) * final report — within 14 days (Art. 14(2)(c) / 14(4)) This module is pure (no DB, no network, no clock unless you pass `now`): deadline computation + the ENISA-SRP export draft. There is NO live ENISA API yet, so the "submission" is a structured export the user downloads / hands over — never a live HTTP POST. Persistence + routes live in cra_incident_store / cra_incident_routes. """ from __future__ import annotations from datetime import datetime, timedelta, timezone from typing import Optional # Reporting deadline obligation becomes active 2026-09-11 (CRA Art. 14). REPORTING_ACTIVE_FROM = "2026-09-11" STAGES = [ {"key": "early_warning", "label": "Frühwarnung", "hours": 24, "article": "Art. 14 Abs. 2 a)"}, {"key": "notification", "label": "Meldung", "hours": 72, "article": "Art. 14 Abs. 2 b)"}, {"key": "final", "label": "Abschlussbericht", "hours": 24 * 14, "article": "Art. 14 Abs. 2 c)"}, ] _STAGE_KEYS = [s["key"] for s in STAGES] SEVERITIES = ["low", "medium", "high", "critical"] KINDS = ["exploited_vulnerability", "severe_incident"] # How long before a deadline we flag it "due soon" (amber). _DUE_SOON_HOURS = 6 def _parse(iso: Optional[str]) -> Optional[datetime]: if not iso: return None try: dt = datetime.fromisoformat(str(iso).replace("Z", "+00:00")) except (ValueError, TypeError): return None return dt if dt.tzinfo else dt.replace(tzinfo=timezone.utc) def _now(now_iso: Optional[str]) -> datetime: return _parse(now_iso) or datetime.now(timezone.utc) def compute_deadlines( aware_at_iso: str, submissions: Optional[dict] = None, now_iso: Optional[str] = None, ) -> list: """Per-stage deadline + status from the moment of awareness. submissions: {stage_key: submitted_at_iso} — a submitted stage is 'submitted' regardless of timing. Status ∈ submitted | overdue | due_soon | pending. """ aware = _parse(aware_at_iso) submissions = submissions or {} now = _now(now_iso) out = [] for s in STAGES: due = aware + timedelta(hours=s["hours"]) if aware else None submitted_at = submissions.get(s["key"]) if submitted_at: status = "submitted" elif due is None: status = "pending" elif now > due: status = "overdue" elif now > due - timedelta(hours=_DUE_SOON_HOURS): status = "due_soon" else: status = "pending" remaining = (due - now).total_seconds() if due else None out.append({ "key": s["key"], "label": s["label"], "article": s["article"], "due_at": due.isoformat() if due else None, "submitted_at": submitted_at, "status": status, "remaining_seconds": int(remaining) if remaining is not None else None, }) return out def next_open_stage(deadlines: list) -> Optional[str]: """The earliest stage not yet submitted — what the user should act on next.""" for d in deadlines: if d["status"] != "submitted": return d["key"] return None def build_enisa_report(incident: dict, stage: str) -> dict: """Structured ENISA-SRP export draft for one cascade stage. Mirrors the CRA Art. 14 content requirements. This is a DRAFT for manual submission to the ENISA Single Reporting Platform — not a live API call. Later stages include everything the earlier ones do, plus their extras. """ if stage not in _STAGE_KEYS: raise ValueError(f"unknown stage '{stage}'") base = { "report_stage": stage, "report_stage_article": next(s["article"] for s in STAGES if s["key"] == stage), "manufacturer": incident.get("manufacturer", ""), "product_name": incident.get("product_name", ""), "product_version": incident.get("product_version", ""), "incident_kind": incident.get("kind", ""), "severity": incident.get("severity", ""), "aware_at": incident.get("aware_at", ""), "summary": incident.get("summary", ""), "contact": incident.get("contact", ""), "submission_target": "ENISA Single Reporting Platform (SRP)", "draft_note": "Entwurf zur manuellen Übermittlung — keine Live-API-Übertragung.", } if stage in ("notification", "final"): base.update({ "affected_components": incident.get("affected_components", ""), "impact": incident.get("impact", ""), "exploitation_status": incident.get("exploitation_status", ""), "indicators_of_compromise": incident.get("iocs", ""), "mitigations_in_place": incident.get("mitigations", ""), "personal_data_affected": bool(incident.get("personal_data_affected", False)), }) if stage == "final": base.update({ "root_cause": incident.get("root_cause", ""), "corrective_measures": incident.get("corrective_measures", ""), "patch_available": bool(incident.get("patch_available", False)), "patch_reference": incident.get("patch_reference", ""), "lessons_learned": incident.get("lessons_learned", ""), }) return base def report_completeness(incident: dict, stage: str) -> dict: """Which fields the ENISA draft for `stage` still lacks — drives follow-up prompts in the UI without blocking submission.""" draft = build_enisa_report(incident, stage) skip = {"submission_target", "draft_note", "report_stage", "report_stage_article", "personal_data_affected", "patch_available"} missing = [k for k, v in draft.items() if k not in skip and (v is None or str(v).strip() == "")] filled = [k for k, v in draft.items() if k not in skip and str(v).strip() != ""] return {"filled": filled, "missing": missing, "complete": not missing, "stage": stage}