diff --git a/admin-compliance/app/api/sdk/v1/incidents/[[...path]]/route.ts b/admin-compliance/app/api/sdk/v1/incidents/[[...path]]/route.ts index 5c364f5..79bb2f8 100644 --- a/admin-compliance/app/api/sdk/v1/incidents/[[...path]]/route.ts +++ b/admin-compliance/app/api/sdk/v1/incidents/[[...path]]/route.ts @@ -1,12 +1,15 @@ /** * Incidents/Breach Management API Proxy - Catch-all route - * Proxies all /api/sdk/v1/incidents/* requests to ai-compliance-sdk backend + * Proxies all /api/sdk/v1/incidents/* requests to backend-compliance (Python) + * Python backend is Source of Truth (migrated from Go ai-compliance-sdk) * Supports PDF generation for authority notification forms */ import { NextRequest, NextResponse } from 'next/server' -const SDK_BACKEND_URL = process.env.SDK_API_URL || 'http://ai-compliance-sdk:8090' +const BACKEND_URL = process.env.COMPLIANCE_BACKEND_URL || 'http://backend-compliance:8002' +const DEFAULT_TENANT_ID = '9282a473-5c95-4b3a-bf78-0ecc0ec71d3e' +const DEFAULT_USER_ID = 'admin' async function proxyRequest( request: NextRequest, @@ -15,7 +18,7 @@ async function proxyRequest( ) { const pathStr = pathSegments?.join('/') || '' const searchParams = request.nextUrl.searchParams.toString() - const basePath = `${SDK_BACKEND_URL}/sdk/v1/incidents` + const basePath = `${BACKEND_URL}/api/compliance/incidents` const url = pathStr ? `${basePath}/${pathStr}${searchParams ? `?${searchParams}` : ''}` : `${basePath}${searchParams ? `?${searchParams}` : ''}` @@ -30,10 +33,8 @@ async function proxyRequest( headers['Authorization'] = authHeader } - const tenantHeader = request.headers.get('x-tenant-id') - if (tenantHeader) { - headers['X-Tenant-Id'] = tenantHeader - } + headers['X-Tenant-Id'] = request.headers.get('x-tenant-id') || DEFAULT_TENANT_ID + headers['X-User-Id'] = request.headers.get('x-user-id') || DEFAULT_USER_ID const fetchOptions: RequestInit = { method, diff --git a/ai-compliance-sdk/cmd/server/main.go b/ai-compliance-sdk/cmd/server/main.go index 2910b61..c32344b 100644 --- a/ai-compliance-sdk/cmd/server/main.go +++ b/ai-compliance-sdk/cmd/server/main.go @@ -614,7 +614,9 @@ func main() { whistleblowerRoutes.GET("/stats", whistleblowerHandlers.GetStatistics) } - // Incidents routes - Datenpannen-Management (DSGVO Art. 33/34) + // DEPRECATED: Incidents routes — Python backend is now Source of Truth. + // Frontend proxies to backend-compliance:8002/api/compliance/incidents/* + // These Go routes remain registered but should not be extended. incidentRoutes := v1.Group("/incidents") { // Incident CRUD diff --git a/ai-compliance-sdk/internal/api/handlers/incidents_handlers.go b/ai-compliance-sdk/internal/api/handlers/incidents_handlers.go index 3d7a5e4..a8586a3 100644 --- a/ai-compliance-sdk/internal/api/handlers/incidents_handlers.go +++ b/ai-compliance-sdk/internal/api/handlers/incidents_handlers.go @@ -1,3 +1,6 @@ +// DEPRECATED: Python backend (backend-compliance) is now Source of Truth for Incidents. +// Frontend proxies to backend-compliance:8002/api/compliance/incidents/* +// These Go handlers remain for backward compatibility but should not be extended. package handlers import ( @@ -12,6 +15,7 @@ import ( ) // IncidentHandlers handles incident/breach management HTTP requests +// DEPRECATED: Use Python backend-compliance incident_routes.py instead. type IncidentHandlers struct { store *incidents.Store } diff --git a/backend-compliance/compliance/api/__init__.py b/backend-compliance/compliance/api/__init__.py index 90c52ee..5563158 100644 --- a/backend-compliance/compliance/api/__init__.py +++ b/backend-compliance/compliance/api/__init__.py @@ -28,6 +28,7 @@ from .banner_routes import router as banner_router from .extraction_routes import router as extraction_router from .tom_routes import router as tom_router from .vendor_compliance_routes import router as vendor_compliance_router +from .incident_routes import router as incident_router # Include sub-routers router.include_router(audit_router) @@ -57,6 +58,7 @@ router.include_router(banner_router) router.include_router(extraction_router) router.include_router(tom_router) router.include_router(vendor_compliance_router) +router.include_router(incident_router) __all__ = [ "router", @@ -86,4 +88,5 @@ __all__ = [ "banner_router", "tom_router", "vendor_compliance_router", + "incident_router", ] diff --git a/backend-compliance/compliance/api/incident_routes.py b/backend-compliance/compliance/api/incident_routes.py new file mode 100644 index 0000000..c5dbfe0 --- /dev/null +++ b/backend-compliance/compliance/api/incident_routes.py @@ -0,0 +1,916 @@ +""" +FastAPI routes for Incidents / Datenpannen-Management (DSGVO Art. 33/34). + +Migrated from Go ai-compliance-sdk — Python backend is now Source of Truth. + +Endpoints: + POST /incidents — create incident + GET /incidents — list (filter: status, severity, category) + GET /incidents/stats — statistics + GET /incidents/{id} — detail + measures + deadline_info + PUT /incidents/{id} — update + DELETE /incidents/{id} — delete + PUT /incidents/{id}/status — quick status change (NEW) + POST /incidents/{id}/assess-risk — risk assessment + POST /incidents/{id}/notify-authority — Art. 33 authority notification + POST /incidents/{id}/notify-subjects — Art. 34 data subject notification + POST /incidents/{id}/measures — add measure + PUT /incidents/{id}/measures/{mid} — update measure + POST /incidents/{id}/measures/{mid}/complete — complete measure + POST /incidents/{id}/timeline — add timeline entry + POST /incidents/{id}/close — close incident +""" + +import json +import logging +from datetime import datetime, timedelta, timezone +from typing import Optional, List, Any +from uuid import UUID, uuid4 + +from fastapi import APIRouter, Depends, HTTPException, Query, Header +from pydantic import BaseModel +from sqlalchemy import text +from sqlalchemy.orm import Session + +from classroom_engine.database import get_db + +logger = logging.getLogger(__name__) +router = APIRouter(prefix="/incidents", tags=["incidents"]) + +DEFAULT_TENANT_ID = "9282a473-5c95-4b3a-bf78-0ecc0ec71d3e" + + +# ============================================================================= +# Helpers +# ============================================================================= + +def _calculate_risk_level(likelihood: int, impact: int) -> str: + """Calculate risk level from likelihood * impact score.""" + score = likelihood * impact + if score >= 20: + return "critical" + elif score >= 12: + return "high" + elif score >= 6: + return "medium" + return "low" + + +def _is_notification_required(risk_level: str) -> bool: + """DSGVO Art. 33 — notification required for critical/high risk.""" + return risk_level in ("critical", "high") + + +def _calculate_72h_deadline(detected_at: datetime) -> str: + """Calculate 72-hour DSGVO Art. 33 deadline.""" + deadline = detected_at + timedelta(hours=72) + return deadline.isoformat() + + +def _parse_jsonb(val): + """Parse a JSONB field — already dict/list from psycopg or a JSON string.""" + if val is None: + return None + if isinstance(val, (dict, list)): + return val + if isinstance(val, str): + try: + return json.loads(val) + except (json.JSONDecodeError, TypeError): + return val + return val + + +def _incident_to_response(row) -> dict: + """Convert a DB row (RowMapping) to incident response dict.""" + r = dict(row) + # Parse JSONB fields + for field in ( + "risk_assessment", "authority_notification", + "data_subject_notification", "timeline", + "affected_data_categories", "affected_systems", + ): + if field in r: + r[field] = _parse_jsonb(r[field]) + # Ensure ISO strings for datetime fields + for field in ("detected_at", "created_at", "updated_at", "closed_at"): + if field in r and r[field] is not None and hasattr(r[field], "isoformat"): + r[field] = r[field].isoformat() + return r + + +def _measure_to_response(row) -> dict: + """Convert a DB measure row to response dict.""" + r = dict(row) + for field in ("due_date", "completed_at", "created_at", "updated_at"): + if field in r and r[field] is not None and hasattr(r[field], "isoformat"): + r[field] = r[field].isoformat() + return r + + +def _append_timeline(db: Session, incident_id: str, entry: dict): + """Append a timeline entry to the incident's timeline JSONB array.""" + db.execute(text(""" + UPDATE incident_incidents + SET timeline = COALESCE(timeline, '[]'::jsonb) || :entry::jsonb, + updated_at = NOW() + WHERE id = :id + """), {"id": incident_id, "entry": json.dumps(entry)}) + + +# ============================================================================= +# Pydantic Schemas +# ============================================================================= + +class IncidentCreate(BaseModel): + title: str + description: Optional[str] = None + category: Optional[str] = "data_breach" + severity: Optional[str] = "medium" + detected_at: Optional[str] = None + affected_data_categories: Optional[List[str]] = None + affected_data_subject_count: Optional[int] = 0 + affected_systems: Optional[List[str]] = None + + +class IncidentUpdate(BaseModel): + title: Optional[str] = None + description: Optional[str] = None + category: Optional[str] = None + status: Optional[str] = None + severity: Optional[str] = None + affected_data_categories: Optional[List[str]] = None + affected_data_subject_count: Optional[int] = None + affected_systems: Optional[List[str]] = None + + +class StatusUpdate(BaseModel): + status: str + + +class RiskAssessmentRequest(BaseModel): + likelihood: int + impact: int + notes: Optional[str] = None + + +class AuthorityNotificationRequest(BaseModel): + authority_name: str + reference_number: Optional[str] = None + contact_person: Optional[str] = None + notes: Optional[str] = None + + +class DataSubjectNotificationRequest(BaseModel): + notification_text: str + channel: str = "email" + affected_count: Optional[int] = 0 + + +class MeasureCreate(BaseModel): + title: str + description: Optional[str] = None + measure_type: str = "corrective" + responsible: Optional[str] = None + due_date: Optional[str] = None + + +class MeasureUpdate(BaseModel): + title: Optional[str] = None + description: Optional[str] = None + measure_type: Optional[str] = None + status: Optional[str] = None + responsible: Optional[str] = None + due_date: Optional[str] = None + + +class TimelineEntryRequest(BaseModel): + action: str + details: Optional[str] = None + + +class CloseIncidentRequest(BaseModel): + root_cause: str + lessons_learned: Optional[str] = None + + +# ============================================================================= +# CRUD Endpoints +# ============================================================================= + +@router.post("") +def create_incident( + body: IncidentCreate, + db: Session = Depends(get_db), + x_tenant_id: Optional[str] = Header(None), + x_user_id: Optional[str] = Header(None), +): + tenant_id = x_tenant_id or DEFAULT_TENANT_ID + user_id = x_user_id or "system" + + incident_id = str(uuid4()) + now = datetime.now(timezone.utc) + + detected_at = now + if body.detected_at: + try: + parsed = datetime.fromisoformat(body.detected_at.replace("Z", "+00:00")) + # Ensure timezone-aware + detected_at = parsed if parsed.tzinfo else parsed.replace(tzinfo=timezone.utc) + except (ValueError, AttributeError): + detected_at = now + + deadline = detected_at + timedelta(hours=72) + + authority_notification = { + "status": "pending", + "deadline": deadline.isoformat(), + } + data_subject_notification = { + "required": False, + "status": "not_required", + } + timeline = [{ + "timestamp": now.isoformat(), + "action": "incident_created", + "user_id": user_id, + "details": "Incident detected and reported", + }] + + db.execute(text(""" + INSERT INTO incident_incidents ( + id, tenant_id, title, description, category, status, severity, + detected_at, reported_by, + affected_data_categories, affected_data_subject_count, affected_systems, + authority_notification, data_subject_notification, timeline, + created_at, updated_at + ) VALUES ( + :id, :tenant_id, :title, :description, :category, 'detected', :severity, + :detected_at, :reported_by, + CAST(:affected_data_categories AS jsonb), + :affected_data_subject_count, + CAST(:affected_systems AS jsonb), + CAST(:authority_notification AS jsonb), + CAST(:data_subject_notification AS jsonb), + CAST(:timeline AS jsonb), + :now, :now + ) + """), { + "id": incident_id, + "tenant_id": tenant_id, + "title": body.title, + "description": body.description or "", + "category": body.category, + "severity": body.severity, + "detected_at": detected_at.isoformat(), + "reported_by": user_id, + "affected_data_categories": json.dumps(body.affected_data_categories or []), + "affected_data_subject_count": body.affected_data_subject_count or 0, + "affected_systems": json.dumps(body.affected_systems or []), + "authority_notification": json.dumps(authority_notification), + "data_subject_notification": json.dumps(data_subject_notification), + "timeline": json.dumps(timeline), + "now": now.isoformat(), + }) + db.commit() + + # Fetch back for response + result = db.execute(text( + "SELECT * FROM incident_incidents WHERE id = :id" + ), {"id": incident_id}) + row = result.mappings().first() + incident_resp = _incident_to_response(row) if row else {} + + return { + "incident": incident_resp, + "authority_deadline": deadline.isoformat(), + "hours_until_deadline": (deadline - now).total_seconds() / 3600, + } + + +@router.get("") +def list_incidents( + db: Session = Depends(get_db), + x_tenant_id: Optional[str] = Header(None), + status: Optional[str] = Query(None), + severity: Optional[str] = Query(None), + category: Optional[str] = Query(None), + limit: int = Query(50), + offset: int = Query(0), +): + tenant_id = x_tenant_id or DEFAULT_TENANT_ID + + where_clauses = ["tenant_id = :tenant_id"] + params: dict = {"tenant_id": tenant_id, "limit": limit, "offset": offset} + + if status: + where_clauses.append("status = :status") + params["status"] = status + if severity: + where_clauses.append("severity = :severity") + params["severity"] = severity + if category: + where_clauses.append("category = :category") + params["category"] = category + + where_sql = " AND ".join(where_clauses) + + count_result = db.execute( + text(f"SELECT COUNT(*) FROM incident_incidents WHERE {where_sql}"), + params, + ) + total = count_result.scalar() or 0 + + result = db.execute(text(f""" + SELECT * FROM incident_incidents + WHERE {where_sql} + ORDER BY created_at DESC + LIMIT :limit OFFSET :offset + """), params) + + incidents = [_incident_to_response(r) for r in result.mappings().all()] + return {"incidents": incidents, "total": total} + + +@router.get("/stats") +def get_stats( + db: Session = Depends(get_db), + x_tenant_id: Optional[str] = Header(None), +): + tenant_id = x_tenant_id or DEFAULT_TENANT_ID + + result = db.execute(text(""" + SELECT + COUNT(*) AS total, + SUM(CASE WHEN status != 'closed' THEN 1 ELSE 0 END) AS open, + SUM(CASE WHEN status = 'closed' THEN 1 ELSE 0 END) AS closed, + SUM(CASE WHEN severity = 'critical' THEN 1 ELSE 0 END) AS critical, + SUM(CASE WHEN severity = 'high' THEN 1 ELSE 0 END) AS high, + SUM(CASE WHEN severity = 'medium' THEN 1 ELSE 0 END) AS medium, + SUM(CASE WHEN severity = 'low' THEN 1 ELSE 0 END) AS low + FROM incident_incidents + WHERE tenant_id = :tenant_id + """), {"tenant_id": tenant_id}) + row = result.mappings().first() + + return { + "total": int(row["total"] or 0), + "open": int(row["open"] or 0), + "closed": int(row["closed"] or 0), + "by_severity": { + "critical": int(row["critical"] or 0), + "high": int(row["high"] or 0), + "medium": int(row["medium"] or 0), + "low": int(row["low"] or 0), + }, + } + + +@router.get("/{incident_id}") +def get_incident( + incident_id: UUID, + db: Session = Depends(get_db), +): + result = db.execute(text( + "SELECT * FROM incident_incidents WHERE id = :id" + ), {"id": str(incident_id)}) + row = result.mappings().first() + if not row: + raise HTTPException(status_code=404, detail="incident not found") + + incident = _incident_to_response(row) + + # Get measures + measures_result = db.execute(text( + "SELECT * FROM incident_measures WHERE incident_id = :id ORDER BY created_at" + ), {"id": str(incident_id)}) + measures = [_measure_to_response(m) for m in measures_result.mappings().all()] + + # Calculate deadline info + deadline_info = None + auth_notif = _parse_jsonb(row["authority_notification"]) if "authority_notification" in row.keys() else None + if auth_notif and isinstance(auth_notif, dict) and "deadline" in auth_notif: + try: + deadline_dt = datetime.fromisoformat(auth_notif["deadline"].replace("Z", "+00:00")) + now = datetime.now(timezone.utc) + hours_remaining = (deadline_dt - now).total_seconds() / 3600 + deadline_info = { + "deadline": auth_notif["deadline"], + "hours_remaining": hours_remaining, + "overdue": hours_remaining < 0, + } + except (ValueError, TypeError): + pass + + return { + "incident": incident, + "measures": measures, + "deadline_info": deadline_info, + } + + +@router.put("/{incident_id}") +def update_incident( + incident_id: UUID, + body: IncidentUpdate, + db: Session = Depends(get_db), +): + iid = str(incident_id) + + # Check exists + check = db.execute(text( + "SELECT id FROM incident_incidents WHERE id = :id" + ), {"id": iid}) + if not check.first(): + raise HTTPException(status_code=404, detail="incident not found") + + updates = [] + params: dict = {"id": iid} + for field in ("title", "description", "category", "status", "severity"): + val = getattr(body, field, None) + if val is not None: + updates.append(f"{field} = :{field}") + params[field] = val + if body.affected_data_categories is not None: + updates.append("affected_data_categories = CAST(:adc AS jsonb)") + params["adc"] = json.dumps(body.affected_data_categories) + if body.affected_data_subject_count is not None: + updates.append("affected_data_subject_count = :adsc") + params["adsc"] = body.affected_data_subject_count + if body.affected_systems is not None: + updates.append("affected_systems = CAST(:asys AS jsonb)") + params["asys"] = json.dumps(body.affected_systems) + + if not updates: + raise HTTPException(status_code=400, detail="no fields to update") + + updates.append("updated_at = NOW()") + sql = f"UPDATE incident_incidents SET {', '.join(updates)} WHERE id = :id" + db.execute(text(sql), params) + db.commit() + + result = db.execute(text( + "SELECT * FROM incident_incidents WHERE id = :id" + ), {"id": iid}) + row = result.mappings().first() + return {"incident": _incident_to_response(row)} + + +@router.delete("/{incident_id}") +def delete_incident( + incident_id: UUID, + db: Session = Depends(get_db), +): + iid = str(incident_id) + check = db.execute(text( + "SELECT id FROM incident_incidents WHERE id = :id" + ), {"id": iid}) + if not check.first(): + raise HTTPException(status_code=404, detail="incident not found") + + db.execute(text("DELETE FROM incident_measures WHERE incident_id = :id"), {"id": iid}) + db.execute(text("DELETE FROM incident_incidents WHERE id = :id"), {"id": iid}) + db.commit() + return {"message": "incident deleted"} + + +# ============================================================================= +# Status Update (NEW — not in Go) +# ============================================================================= + +@router.put("/{incident_id}/status") +def update_status( + incident_id: UUID, + body: StatusUpdate, + db: Session = Depends(get_db), + x_user_id: Optional[str] = Header(None), +): + iid = str(incident_id) + user_id = x_user_id or "system" + + check = db.execute(text( + "SELECT id FROM incident_incidents WHERE id = :id" + ), {"id": iid}) + if not check.first(): + raise HTTPException(status_code=404, detail="incident not found") + + db.execute(text(""" + UPDATE incident_incidents + SET status = :status, updated_at = NOW() + WHERE id = :id + """), {"id": iid, "status": body.status}) + + _append_timeline(db, iid, { + "timestamp": datetime.now(timezone.utc).isoformat(), + "action": "status_changed", + "user_id": user_id, + "details": f"Status changed to {body.status}", + }) + db.commit() + + result = db.execute(text( + "SELECT * FROM incident_incidents WHERE id = :id" + ), {"id": iid}) + row = result.mappings().first() + return {"incident": _incident_to_response(row)} + + +# ============================================================================= +# Risk Assessment +# ============================================================================= + +@router.post("/{incident_id}/assess-risk") +def assess_risk( + incident_id: UUID, + body: RiskAssessmentRequest, + db: Session = Depends(get_db), + x_user_id: Optional[str] = Header(None), +): + iid = str(incident_id) + user_id = x_user_id or "system" + + check = db.execute(text( + "SELECT id, status, authority_notification FROM incident_incidents WHERE id = :id" + ), {"id": iid}) + row = check.mappings().first() + if not row: + raise HTTPException(status_code=404, detail="incident not found") + + risk_level = _calculate_risk_level(body.likelihood, body.impact) + notification_required = _is_notification_required(risk_level) + now = datetime.now(timezone.utc) + + assessment = { + "likelihood": body.likelihood, + "impact": body.impact, + "risk_level": risk_level, + "assessed_at": now.isoformat(), + "assessed_by": user_id, + "notes": body.notes or "", + } + + new_status = "assessment" + if notification_required: + new_status = "notification_required" + # Update authority notification status to pending + auth = _parse_jsonb(row["authority_notification"]) or {} + auth["status"] = "pending" + db.execute(text(""" + UPDATE incident_incidents + SET authority_notification = CAST(:an AS jsonb) + WHERE id = :id + """), {"id": iid, "an": json.dumps(auth)}) + + db.execute(text(""" + UPDATE incident_incidents + SET risk_assessment = CAST(:ra AS jsonb), + status = :status, + updated_at = NOW() + WHERE id = :id + """), {"id": iid, "ra": json.dumps(assessment), "status": new_status}) + + _append_timeline(db, iid, { + "timestamp": now.isoformat(), + "action": "risk_assessed", + "user_id": user_id, + "details": f"Risk level: {risk_level} (likelihood={body.likelihood}, impact={body.impact})", + }) + db.commit() + + return { + "risk_assessment": assessment, + "notification_required": notification_required, + "incident_status": new_status, + } + + +# ============================================================================= +# Authority Notification (Art. 33) +# ============================================================================= + +@router.post("/{incident_id}/notify-authority") +def notify_authority( + incident_id: UUID, + body: AuthorityNotificationRequest, + db: Session = Depends(get_db), + x_user_id: Optional[str] = Header(None), +): + iid = str(incident_id) + user_id = x_user_id or "system" + + check = db.execute(text( + "SELECT id, detected_at, authority_notification FROM incident_incidents WHERE id = :id" + ), {"id": iid}) + row = check.mappings().first() + if not row: + raise HTTPException(status_code=404, detail="incident not found") + + now = datetime.now(timezone.utc) + + # Preserve existing deadline + auth_existing = _parse_jsonb(row["authority_notification"]) or {} + deadline_str = auth_existing.get("deadline") + if not deadline_str and row["detected_at"]: + detected = row["detected_at"] + if hasattr(detected, "isoformat"): + deadline_str = (detected + timedelta(hours=72)).isoformat() + else: + deadline_str = _calculate_72h_deadline( + datetime.fromisoformat(str(detected).replace("Z", "+00:00")) + ) + + notification = { + "status": "sent", + "deadline": deadline_str, + "submitted_at": now.isoformat(), + "authority_name": body.authority_name, + "reference_number": body.reference_number or "", + "contact_person": body.contact_person or "", + "notes": body.notes or "", + } + + db.execute(text(""" + UPDATE incident_incidents + SET authority_notification = CAST(:an AS jsonb), + status = 'notification_sent', + updated_at = NOW() + WHERE id = :id + """), {"id": iid, "an": json.dumps(notification)}) + + _append_timeline(db, iid, { + "timestamp": now.isoformat(), + "action": "authority_notified", + "user_id": user_id, + "details": f"Authority notification submitted to {body.authority_name}", + }) + db.commit() + + # Check if submitted within 72h + submitted_within_72h = True + if deadline_str: + try: + deadline_dt = datetime.fromisoformat(deadline_str.replace("Z", "+00:00")) + submitted_within_72h = now < deadline_dt + except (ValueError, TypeError): + pass + + return { + "authority_notification": notification, + "submitted_within_72h": submitted_within_72h, + } + + +# ============================================================================= +# Data Subject Notification (Art. 34) +# ============================================================================= + +@router.post("/{incident_id}/notify-subjects") +def notify_subjects( + incident_id: UUID, + body: DataSubjectNotificationRequest, + db: Session = Depends(get_db), + x_user_id: Optional[str] = Header(None), +): + iid = str(incident_id) + user_id = x_user_id or "system" + + check = db.execute(text( + "SELECT id, affected_data_subject_count FROM incident_incidents WHERE id = :id" + ), {"id": iid}) + row = check.mappings().first() + if not row: + raise HTTPException(status_code=404, detail="incident not found") + + now = datetime.now(timezone.utc) + affected_count = body.affected_count or row["affected_data_subject_count"] or 0 + + notification = { + "required": True, + "status": "sent", + "sent_at": now.isoformat(), + "affected_count": affected_count, + "notification_text": body.notification_text, + "channel": body.channel, + } + + db.execute(text(""" + UPDATE incident_incidents + SET data_subject_notification = CAST(:dsn AS jsonb), + updated_at = NOW() + WHERE id = :id + """), {"id": iid, "dsn": json.dumps(notification)}) + + _append_timeline(db, iid, { + "timestamp": now.isoformat(), + "action": "data_subjects_notified", + "user_id": user_id, + "details": f"Data subjects notified via {body.channel} ({affected_count} affected)", + }) + db.commit() + + return {"data_subject_notification": notification} + + +# ============================================================================= +# Measures +# ============================================================================= + +@router.post("/{incident_id}/measures") +def add_measure( + incident_id: UUID, + body: MeasureCreate, + db: Session = Depends(get_db), + x_user_id: Optional[str] = Header(None), +): + iid = str(incident_id) + user_id = x_user_id or "system" + + check = db.execute(text( + "SELECT id FROM incident_incidents WHERE id = :id" + ), {"id": iid}) + if not check.first(): + raise HTTPException(status_code=404, detail="incident not found") + + measure_id = str(uuid4()) + now = datetime.now(timezone.utc) + + db.execute(text(""" + INSERT INTO incident_measures ( + id, incident_id, title, description, measure_type, status, + responsible, due_date, created_at, updated_at + ) VALUES ( + :id, :incident_id, :title, :description, :measure_type, 'planned', + :responsible, :due_date, :now, :now + ) + """), { + "id": measure_id, + "incident_id": iid, + "title": body.title, + "description": body.description or "", + "measure_type": body.measure_type, + "responsible": body.responsible or "", + "due_date": body.due_date, + "now": now.isoformat(), + }) + + _append_timeline(db, iid, { + "timestamp": now.isoformat(), + "action": "measure_added", + "user_id": user_id, + "details": f"Measure added: {body.title} ({body.measure_type})", + }) + db.commit() + + result = db.execute(text( + "SELECT * FROM incident_measures WHERE id = :id" + ), {"id": measure_id}) + measure = _measure_to_response(result.mappings().first()) + return {"measure": measure} + + +@router.put("/{incident_id}/measures/{measure_id}") +def update_measure( + incident_id: UUID, + measure_id: UUID, + body: MeasureUpdate, + db: Session = Depends(get_db), +): + mid = str(measure_id) + + check = db.execute(text( + "SELECT id FROM incident_measures WHERE id = :id" + ), {"id": mid}) + if not check.first(): + raise HTTPException(status_code=404, detail="measure not found") + + updates = [] + params: dict = {"id": mid} + for field in ("title", "description", "measure_type", "status", "responsible", "due_date"): + val = getattr(body, field, None) + if val is not None: + updates.append(f"{field} = :{field}") + params[field] = val + + if not updates: + raise HTTPException(status_code=400, detail="no fields to update") + + updates.append("updated_at = NOW()") + sql = f"UPDATE incident_measures SET {', '.join(updates)} WHERE id = :id" + db.execute(text(sql), params) + db.commit() + + result = db.execute(text( + "SELECT * FROM incident_measures WHERE id = :id" + ), {"id": mid}) + measure = _measure_to_response(result.mappings().first()) + return {"measure": measure} + + +@router.post("/{incident_id}/measures/{measure_id}/complete") +def complete_measure( + incident_id: UUID, + measure_id: UUID, + db: Session = Depends(get_db), +): + mid = str(measure_id) + + check = db.execute(text( + "SELECT id FROM incident_measures WHERE id = :id" + ), {"id": mid}) + if not check.first(): + raise HTTPException(status_code=404, detail="measure not found") + + db.execute(text(""" + UPDATE incident_measures + SET status = 'completed', completed_at = NOW(), updated_at = NOW() + WHERE id = :id + """), {"id": mid}) + db.commit() + + return {"message": "measure completed"} + + +# ============================================================================= +# Timeline +# ============================================================================= + +@router.post("/{incident_id}/timeline") +def add_timeline_entry( + incident_id: UUID, + body: TimelineEntryRequest, + db: Session = Depends(get_db), + x_user_id: Optional[str] = Header(None), +): + iid = str(incident_id) + user_id = x_user_id or "system" + + check = db.execute(text( + "SELECT id FROM incident_incidents WHERE id = :id" + ), {"id": iid}) + if not check.first(): + raise HTTPException(status_code=404, detail="incident not found") + + now = datetime.now(timezone.utc) + entry = { + "timestamp": now.isoformat(), + "action": body.action, + "user_id": user_id, + "details": body.details or "", + } + + _append_timeline(db, iid, entry) + db.commit() + + return {"timeline_entry": entry} + + +# ============================================================================= +# Close Incident +# ============================================================================= + +@router.post("/{incident_id}/close") +def close_incident( + incident_id: UUID, + body: CloseIncidentRequest, + db: Session = Depends(get_db), + x_user_id: Optional[str] = Header(None), +): + iid = str(incident_id) + user_id = x_user_id or "system" + + check = db.execute(text( + "SELECT id FROM incident_incidents WHERE id = :id" + ), {"id": iid}) + if not check.first(): + raise HTTPException(status_code=404, detail="incident not found") + + now = datetime.now(timezone.utc) + + db.execute(text(""" + UPDATE incident_incidents + SET status = 'closed', + root_cause = :root_cause, + lessons_learned = :lessons_learned, + closed_at = :now, + updated_at = :now + WHERE id = :id + """), { + "id": iid, + "root_cause": body.root_cause, + "lessons_learned": body.lessons_learned or "", + "now": now.isoformat(), + }) + + _append_timeline(db, iid, { + "timestamp": now.isoformat(), + "action": "incident_closed", + "user_id": user_id, + "details": f"Incident closed. Root cause: {body.root_cause}", + }) + db.commit() + + return { + "message": "incident closed", + "root_cause": body.root_cause, + "lessons_learned": body.lessons_learned or "", + } diff --git a/backend-compliance/tests/test_incident_routes.py b/backend-compliance/tests/test_incident_routes.py new file mode 100644 index 0000000..0c2b2d7 --- /dev/null +++ b/backend-compliance/tests/test_incident_routes.py @@ -0,0 +1,671 @@ +"""Tests for Incident routes (incident_routes.py) — Datenpannen-Management DSGVO Art. 33/34.""" + +import json +import pytest +from unittest.mock import MagicMock +from fastapi.testclient import TestClient +from fastapi import FastAPI +from datetime import datetime, timedelta, timezone + +from compliance.api.incident_routes import ( + router, + _calculate_risk_level, + _is_notification_required, + _calculate_72h_deadline, + _incident_to_response, + _measure_to_response, + _parse_jsonb, + DEFAULT_TENANT_ID, +) + +app = FastAPI() +app.include_router(router) +client = TestClient(app) + +DEFAULT_TENANT = DEFAULT_TENANT_ID +INCIDENT_ID = "ffffffff-0001-0001-0001-000000000001" +MEASURE_ID = "ffffffff-0002-0002-0002-000000000002" +UNKNOWN_ID = "aaaaaaaa-9999-9999-9999-999999999999" + + +# ============================================================================= +# Helpers — _DictResult / _DictSession pattern for SQLite-free DB mocks +# ============================================================================= + +class _DictRow: + """Simulates a SQLAlchemy RowMapping (dict-like).""" + def __init__(self, data: dict): + self._data = data + + def __getitem__(self, key): + return self._data[key] + + def __contains__(self, key): + return key in self._data + + def keys(self): + return self._data.keys() + + def items(self): + return self._data.items() + + def values(self): + return self._data.values() + + +class _DictResult: + """Simulates a SQLAlchemy CursorResult.""" + def __init__(self, rows=None, scalar_val=None): + self._rows = rows or [] + self._scalar_val = scalar_val + + def mappings(self): + return self + + def first(self): + return _DictRow(self._rows[0]) if self._rows else None + + def all(self): + return [_DictRow(r) for r in self._rows] + + def scalar(self): + return self._scalar_val + + +def make_incident_row(overrides=None): + now = datetime(2024, 6, 1, 12, 0, 0) + deadline = now + timedelta(hours=72) + data = { + "id": INCIDENT_ID, + "tenant_id": DEFAULT_TENANT, + "title": "Datenpanne Test", + "description": "Test Beschreibung", + "category": "data_breach", + "status": "detected", + "severity": "medium", + "detected_at": now, + "reported_by": "admin", + "affected_data_categories": json.dumps(["personal"]), + "affected_data_subject_count": 100, + "affected_systems": json.dumps(["CRM"]), + "risk_assessment": None, + "authority_notification": json.dumps({ + "status": "pending", + "deadline": deadline.isoformat(), + }), + "data_subject_notification": json.dumps({ + "required": False, + "status": "not_required", + }), + "timeline": json.dumps([{ + "timestamp": now.isoformat(), + "action": "incident_created", + "user_id": "admin", + "details": "Incident detected and reported", + }]), + "root_cause": None, + "lessons_learned": None, + "closed_at": None, + "created_at": now, + "updated_at": now, + } + if overrides: + data.update(overrides) + return data + + +def make_measure_row(overrides=None): + now = datetime(2024, 6, 1, 12, 0, 0) + data = { + "id": MEASURE_ID, + "incident_id": INCIDENT_ID, + "title": "Passwort Reset", + "description": "Alle betroffenen Passwoerter zuruecksetzen", + "measure_type": "corrective", + "status": "planned", + "responsible": "IT-Admin", + "due_date": None, + "completed_at": None, + "created_at": now, + "updated_at": now, + } + if overrides: + data.update(overrides) + return data + + +@pytest.fixture +def mock_db(): + from classroom_engine.database import get_db + db = MagicMock() + app.dependency_overrides[get_db] = lambda: db + yield db + app.dependency_overrides.clear() + + +# ============================================================================= +# Helper / Utility Tests +# ============================================================================= + +class TestCalculateRiskLevel: + def test_low(self): + assert _calculate_risk_level(1, 1) == "low" + assert _calculate_risk_level(2, 2) == "low" + + def test_medium(self): + assert _calculate_risk_level(2, 3) == "medium" + assert _calculate_risk_level(3, 3) == "medium" + + def test_high(self): + assert _calculate_risk_level(3, 4) == "high" + assert _calculate_risk_level(4, 4) == "high" + + def test_critical(self): + assert _calculate_risk_level(4, 5) == "critical" + assert _calculate_risk_level(5, 5) == "critical" + + +class TestIsNotificationRequired: + def test_critical_requires(self): + assert _is_notification_required("critical") is True + + def test_high_requires(self): + assert _is_notification_required("high") is True + + def test_medium_not_required(self): + assert _is_notification_required("medium") is False + + def test_low_not_required(self): + assert _is_notification_required("low") is False + + +class TestCalculate72hDeadline: + def test_returns_iso_string(self): + dt = datetime(2024, 6, 1, 12, 0, 0) + result = _calculate_72h_deadline(dt) + assert "2024-06-04T12:00:00" in result + + +class TestParseJsonb: + def test_dict_passthrough(self): + assert _parse_jsonb({"a": 1}) == {"a": 1} + + def test_list_passthrough(self): + assert _parse_jsonb([1, 2]) == [1, 2] + + def test_json_string(self): + assert _parse_jsonb('{"a": 1}') == {"a": 1} + + def test_none(self): + assert _parse_jsonb(None) is None + + +class TestIncidentToResponse: + def test_parses_jsonb_fields(self): + row = _DictRow(make_incident_row()) + result = _incident_to_response(row) + assert isinstance(result["timeline"], list) + assert isinstance(result["authority_notification"], dict) + + def test_converts_datetime_to_iso(self): + row = _DictRow(make_incident_row()) + result = _incident_to_response(row) + assert "2024-06-01" in result["detected_at"] + + +# ============================================================================= +# Create Incident Tests +# ============================================================================= + +class TestCreateIncident: + def test_create_basic(self, mock_db): + created_row = make_incident_row() + mock_db.execute.return_value = _DictResult([created_row]) + + resp = client.post("/incidents", json={ + "title": "Datenpanne Test", + "description": "Test", + }, headers={"x-tenant-id": DEFAULT_TENANT, "x-user-id": "admin"}) + + assert resp.status_code == 200 + data = resp.json() + assert "incident" in data + assert "authority_deadline" in data + assert "hours_until_deadline" in data + + def test_create_with_detected_at(self, mock_db): + created_row = make_incident_row() + mock_db.execute.side_effect = [ + None, # INSERT + _DictResult([created_row]), # SELECT back + ] + + resp = client.post("/incidents", json={ + "title": "Panne 2", + "detected_at": "2024-06-01T10:00:00", + }) + assert resp.status_code == 200 + assert "authority_deadline" in resp.json() + + def test_create_missing_title(self, mock_db): + resp = client.post("/incidents", json={}) + assert resp.status_code == 422 + + +# ============================================================================= +# List Incidents Tests +# ============================================================================= + +class TestListIncidents: + def test_list_empty(self, mock_db): + mock_db.execute.side_effect = [ + _DictResult(scalar_val=0), + _DictResult([]), + ] + resp = client.get("/incidents", headers={"x-tenant-id": DEFAULT_TENANT}) + assert resp.status_code == 200 + data = resp.json() + assert data["incidents"] == [] + assert data["total"] == 0 + + def test_list_with_items(self, mock_db): + row = make_incident_row() + mock_db.execute.side_effect = [ + _DictResult(scalar_val=1), + _DictResult([row]), + ] + resp = client.get("/incidents") + assert resp.status_code == 200 + data = resp.json() + assert len(data["incidents"]) == 1 + assert data["total"] == 1 + + def test_list_filter_by_status(self, mock_db): + mock_db.execute.side_effect = [ + _DictResult(scalar_val=0), + _DictResult([]), + ] + resp = client.get("/incidents?status=closed") + assert resp.status_code == 200 + + +# ============================================================================= +# Get Incident Tests +# ============================================================================= + +class TestGetIncident: + def test_get_existing(self, mock_db): + row = make_incident_row() + mock_db.execute.side_effect = [ + _DictResult([row]), # incident + _DictResult([]), # measures + ] + resp = client.get(f"/incidents/{INCIDENT_ID}") + assert resp.status_code == 200 + data = resp.json() + assert "incident" in data + assert "measures" in data + assert "deadline_info" in data + + def test_get_not_found(self, mock_db): + mock_db.execute.return_value = _DictResult([]) + resp = client.get(f"/incidents/{UNKNOWN_ID}") + assert resp.status_code == 404 + + +# ============================================================================= +# Update Incident Tests +# ============================================================================= + +class TestUpdateIncident: + def test_update_title(self, mock_db): + row = make_incident_row({"title": "Updated Title"}) + mock_db.execute.side_effect = [ + _DictResult([{"id": INCIDENT_ID}]), # check exists + None, # update + _DictResult([row]), # fetch back + ] + resp = client.put(f"/incidents/{INCIDENT_ID}", json={"title": "Updated Title"}) + assert resp.status_code == 200 + assert resp.json()["incident"]["title"] == "Updated Title" + + def test_update_not_found(self, mock_db): + mock_db.execute.return_value = _DictResult([]) + resp = client.put(f"/incidents/{UNKNOWN_ID}", json={"title": "X"}) + assert resp.status_code == 404 + + +# ============================================================================= +# Delete Incident Tests +# ============================================================================= + +class TestDeleteIncident: + def test_delete_existing(self, mock_db): + mock_db.execute.side_effect = [ + _DictResult([{"id": INCIDENT_ID}]), # check exists + None, # delete measures + None, # delete incident + ] + resp = client.delete(f"/incidents/{INCIDENT_ID}") + assert resp.status_code == 200 + assert resp.json()["message"] == "incident deleted" + + def test_delete_not_found(self, mock_db): + mock_db.execute.return_value = _DictResult([]) + resp = client.delete(f"/incidents/{UNKNOWN_ID}") + assert resp.status_code == 404 + + +# ============================================================================= +# Status Update Tests (NEW endpoint) +# ============================================================================= + +class TestStatusUpdate: + def test_update_status(self, mock_db): + row = make_incident_row({"status": "assessment"}) + mock_db.execute.side_effect = [ + _DictResult([{"id": INCIDENT_ID}]), # check exists + None, # update status + None, # append timeline + _DictResult([row]), # fetch back + ] + resp = client.put(f"/incidents/{INCIDENT_ID}/status", json={"status": "assessment"}) + assert resp.status_code == 200 + assert resp.json()["incident"]["status"] == "assessment" + + def test_status_not_found(self, mock_db): + mock_db.execute.return_value = _DictResult([]) + resp = client.put(f"/incidents/{UNKNOWN_ID}/status", json={"status": "closed"}) + assert resp.status_code == 404 + + +# ============================================================================= +# Risk Assessment Tests +# ============================================================================= + +class TestAssessRisk: + def test_low_risk(self, mock_db): + row = make_incident_row() + mock_db.execute.side_effect = [ + _DictResult([row]), # check exists + None, # update risk_assessment + status + None, # append timeline + ] + resp = client.post(f"/incidents/{INCIDENT_ID}/assess-risk", json={ + "likelihood": 1, + "impact": 2, + "notes": "Low risk", + }) + assert resp.status_code == 200 + data = resp.json() + assert data["risk_assessment"]["risk_level"] == "low" + assert data["notification_required"] is False + + def test_high_risk_requires_notification(self, mock_db): + row = make_incident_row() + mock_db.execute.side_effect = [ + _DictResult([row]), # check exists + None, # update authority_notification + None, # update risk_assessment + status + None, # append timeline + ] + resp = client.post(f"/incidents/{INCIDENT_ID}/assess-risk", json={ + "likelihood": 4, + "impact": 4, + }) + assert resp.status_code == 200 + data = resp.json() + assert data["risk_assessment"]["risk_level"] == "high" + assert data["notification_required"] is True + assert data["incident_status"] == "notification_required" + + def test_critical_risk(self, mock_db): + row = make_incident_row() + mock_db.execute.side_effect = [ + _DictResult([row]), + None, None, None, + ] + resp = client.post(f"/incidents/{INCIDENT_ID}/assess-risk", json={ + "likelihood": 5, + "impact": 5, + }) + assert resp.status_code == 200 + assert resp.json()["risk_assessment"]["risk_level"] == "critical" + + def test_assess_not_found(self, mock_db): + mock_db.execute.return_value = _DictResult([]) + resp = client.post(f"/incidents/{UNKNOWN_ID}/assess-risk", json={ + "likelihood": 3, "impact": 3, + }) + assert resp.status_code == 404 + + +# ============================================================================= +# Authority Notification Tests (Art. 33) +# ============================================================================= + +class TestNotifyAuthority: + def test_notify_authority(self, mock_db): + row = make_incident_row() + mock_db.execute.side_effect = [ + _DictResult([row]), # check exists + None, # update authority_notification + status + None, # append timeline + ] + resp = client.post(f"/incidents/{INCIDENT_ID}/notify-authority", json={ + "authority_name": "LfD Bayern", + "reference_number": "REF-2024-001", + }) + assert resp.status_code == 200 + data = resp.json() + assert data["authority_notification"]["authority_name"] == "LfD Bayern" + assert "submitted_within_72h" in data + + def test_notify_authority_not_found(self, mock_db): + mock_db.execute.return_value = _DictResult([]) + resp = client.post(f"/incidents/{UNKNOWN_ID}/notify-authority", json={ + "authority_name": "Test", + }) + assert resp.status_code == 404 + + +# ============================================================================= +# Data Subject Notification Tests (Art. 34) +# ============================================================================= + +class TestNotifySubjects: + def test_notify_subjects(self, mock_db): + row = make_incident_row() + mock_db.execute.side_effect = [ + _DictResult([row]), + None, None, + ] + resp = client.post(f"/incidents/{INCIDENT_ID}/notify-subjects", json={ + "notification_text": "Ihre Daten waren betroffen", + "channel": "email", + }) + assert resp.status_code == 200 + data = resp.json() + assert data["data_subject_notification"]["status"] == "sent" + assert data["data_subject_notification"]["channel"] == "email" + + +# ============================================================================= +# Measures Tests +# ============================================================================= + +class TestMeasures: + def test_add_measure(self, mock_db): + measure_row = make_measure_row() + mock_db.execute.side_effect = [ + _DictResult([{"id": INCIDENT_ID}]), # check exists + None, # insert measure + None, # append timeline + _DictResult([measure_row]), # fetch measure + ] + resp = client.post(f"/incidents/{INCIDENT_ID}/measures", json={ + "title": "Passwort Reset", + "measure_type": "corrective", + }) + assert resp.status_code == 200 + assert resp.json()["measure"]["title"] == "Passwort Reset" + + def test_add_measure_not_found(self, mock_db): + mock_db.execute.return_value = _DictResult([]) + resp = client.post(f"/incidents/{UNKNOWN_ID}/measures", json={ + "title": "Test", + }) + assert resp.status_code == 404 + + def test_update_measure(self, mock_db): + updated_row = make_measure_row({"title": "Neuer Titel"}) + mock_db.execute.side_effect = [ + _DictResult([{"id": MEASURE_ID}]), # check exists + None, # update + _DictResult([updated_row]), # fetch back + ] + resp = client.put( + f"/incidents/{INCIDENT_ID}/measures/{MEASURE_ID}", + json={"title": "Neuer Titel"}, + ) + assert resp.status_code == 200 + assert resp.json()["measure"]["title"] == "Neuer Titel" + + def test_complete_measure(self, mock_db): + mock_db.execute.side_effect = [ + _DictResult([{"id": MEASURE_ID}]), + None, + ] + resp = client.post(f"/incidents/{INCIDENT_ID}/measures/{MEASURE_ID}/complete") + assert resp.status_code == 200 + assert resp.json()["message"] == "measure completed" + + def test_measures_in_get_response(self, mock_db): + inc_row = make_incident_row() + meas_row = make_measure_row() + mock_db.execute.side_effect = [ + _DictResult([inc_row]), + _DictResult([meas_row]), + ] + resp = client.get(f"/incidents/{INCIDENT_ID}") + assert resp.status_code == 200 + assert len(resp.json()["measures"]) == 1 + + +# ============================================================================= +# Timeline Tests +# ============================================================================= + +class TestTimeline: + def test_add_timeline_entry(self, mock_db): + mock_db.execute.side_effect = [ + _DictResult([{"id": INCIDENT_ID}]), + None, + ] + resp = client.post(f"/incidents/{INCIDENT_ID}/timeline", json={ + "action": "investigation_started", + "details": "Forensic analysis begun", + }) + assert resp.status_code == 200 + data = resp.json() + assert data["timeline_entry"]["action"] == "investigation_started" + + def test_timeline_not_found(self, mock_db): + mock_db.execute.return_value = _DictResult([]) + resp = client.post(f"/incidents/{UNKNOWN_ID}/timeline", json={ + "action": "test", + }) + assert resp.status_code == 404 + + def test_auto_timeline_on_create(self, mock_db): + """Create endpoint auto-adds incident_created timeline entry.""" + row = make_incident_row() + mock_db.execute.return_value = _DictResult([row]) + + resp = client.post("/incidents", json={"title": "Test"}) + assert resp.status_code == 200 + # The insert SQL includes a timeline with incident_created entry + insert_call = mock_db.execute.call_args_list[0] + sql_text = str(insert_call[0][0]) + assert "incident_incidents" in sql_text + + +# ============================================================================= +# Close Incident Tests +# ============================================================================= + +class TestCloseIncident: + def test_close_with_root_cause(self, mock_db): + mock_db.execute.side_effect = [ + _DictResult([{"id": INCIDENT_ID}]), + None, # update + None, # timeline + ] + resp = client.post(f"/incidents/{INCIDENT_ID}/close", json={ + "root_cause": "Schwache Passwoerter", + "lessons_learned": "2FA einfuehren", + }) + assert resp.status_code == 200 + data = resp.json() + assert data["message"] == "incident closed" + assert data["root_cause"] == "Schwache Passwoerter" + + def test_close_minimal(self, mock_db): + mock_db.execute.side_effect = [ + _DictResult([{"id": INCIDENT_ID}]), + None, None, + ] + resp = client.post(f"/incidents/{INCIDENT_ID}/close", json={ + "root_cause": "Unbekannt", + }) + assert resp.status_code == 200 + + def test_close_not_found(self, mock_db): + mock_db.execute.return_value = _DictResult([]) + resp = client.post(f"/incidents/{UNKNOWN_ID}/close", json={ + "root_cause": "Test", + }) + assert resp.status_code == 404 + + +# ============================================================================= +# Statistics Tests +# ============================================================================= + +class TestStatistics: + def test_empty_stats(self, mock_db): + stats_row = { + "total": 0, "open": 0, "closed": 0, + "critical": 0, "high": 0, "medium": 0, "low": 0, + } + mock_db.execute.return_value = _DictResult([stats_row]) + resp = client.get("/incidents/stats") + assert resp.status_code == 200 + data = resp.json() + assert data["total"] == 0 + assert data["by_severity"]["critical"] == 0 + + def test_stats_with_data(self, mock_db): + stats_row = { + "total": 5, "open": 3, "closed": 2, + "critical": 1, "high": 2, "medium": 1, "low": 1, + } + mock_db.execute.return_value = _DictResult([stats_row]) + resp = client.get("/incidents/stats") + assert resp.status_code == 200 + data = resp.json() + assert data["total"] == 5 + assert data["open"] == 3 + assert data["closed"] == 2 + assert data["by_severity"]["critical"] == 1 + + def test_stats_structure(self, mock_db): + stats_row = { + "total": 1, "open": 1, "closed": 0, + "critical": 0, "high": 0, "medium": 1, "low": 0, + } + mock_db.execute.return_value = _DictResult([stats_row]) + resp = client.get("/incidents/stats") + data = resp.json() + assert set(data.keys()) == {"total", "open", "closed", "by_severity"} + assert set(data["by_severity"].keys()) == {"critical", "high", "medium", "low"}