Files
breakpilot-compliance/backend-compliance/compliance/api/notfallplan_routes.py
Benjamin Admin b19fc11737
All checks were successful
CI / go-lint (push) Has been skipped
CI / python-lint (push) Has been skipped
CI / nodejs-lint (push) Has been skipped
CI / test-go-ai-compliance (push) Successful in 37s
CI / test-python-backend-compliance (push) Successful in 34s
CI / test-python-document-crawler (push) Successful in 22s
CI / test-python-dsms-gateway (push) Successful in 18s
feat: Betrieb-Module → 100% — Echte CRUD-Flows, kein Mock-Data
Alle 7 Betrieb-Module von 30–75% auf 100% gebracht:

**Gruppe 1 — UI-Ergänzungen (Backend bereits vorhanden):**
- incidents/page.tsx: IncidentCreateModal + IncidentDetailDrawer (Status-Transitions)
- whistleblower/page.tsx: WhistleblowerCreateModal + CaseDetailPanel (Kommentare, Zuweisung)
- dsr/page.tsx: DSRCreateModal + DSRDetailPanel (Workflow-Timeline, Status-Buttons)
- vendor-compliance/page.tsx: VendorCreateModal + "Neuer Vendor" Button

**Gruppe 2 — Escalations Full Stack:**
- Migration 011: compliance_escalations Tabelle
- Backend: escalation_routes.py (7 Endpoints: list/create/get/update/status/stats/delete)
- Proxy: /api/sdk/v1/escalations/[[...path]] → backend:8002
- Frontend: Mock-Array komplett ersetzt durch echte API + EscalationCreateModal + EscalationDetailDrawer

**Gruppe 2 — Consent Templates:**
- Migration 010: compliance_consent_email_templates + compliance_consent_gdpr_processes (7+7 Seed-Einträge)
- Backend: consent_template_routes.py (GET/POST/PUT/DELETE Templates + GET/PUT GDPR-Prozesse)
- Proxy: /api/sdk/v1/consent-templates/[[...path]]
- Frontend: consent-management/page.tsx lädt Templates + Prozesse aus DB (ApiTemplateEditor, ApiGdprProcessEditor)

**Gruppe 3 — Notfallplan:**
- Migration 012: 4 Tabellen (contacts, scenarios, checklists, exercises)
- Backend: notfallplan_routes.py (vollständiges CRUD + /stats)
- Proxy: /api/sdk/v1/notfallplan/[[...path]]
- Frontend: notfallplan/page.tsx — DB-backed Kontakte + Szenarien + Übungen, ContactCreateModal + ScenarioCreateModal

**Infrastruktur:**
- __init__.py: escalation_router + consent_template_router + notfallplan_router registriert
- Deploy-Skripte: apply_escalations_migration.sh, apply_consent_templates_migration.sh, apply_notfallplan_migration.sh
- Tests: 40 neue Tests (test_escalation_routes.py, test_consent_template_routes.py, test_notfallplan_routes.py)
- flow-data.ts: Completion aller 7 Module auf 100% gesetzt

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-03-03 12:48:43 +01:00

700 lines
24 KiB
Python

"""
FastAPI routes for Notfallplan (Emergency Plan) — Art. 33/34 DSGVO.
Endpoints:
GET /notfallplan/contacts — List emergency contacts
POST /notfallplan/contacts — Create contact
PUT /notfallplan/contacts/{id} — Update contact
DELETE /notfallplan/contacts/{id} — Delete contact
GET /notfallplan/scenarios — List scenarios
POST /notfallplan/scenarios — Create scenario
PUT /notfallplan/scenarios/{id} — Update scenario
DELETE /notfallplan/scenarios/{id} — Delete scenario
GET /notfallplan/checklists — List checklists (filter by scenario_id)
POST /notfallplan/checklists — Create checklist item
PUT /notfallplan/checklists/{id} — Update checklist item
DELETE /notfallplan/checklists/{id} — Delete checklist item
GET /notfallplan/exercises — List exercises
POST /notfallplan/exercises — Create exercise
GET /notfallplan/stats — Statistics overview
"""
import json
import logging
from datetime import datetime
from typing import Optional, List, Any
from fastapi import APIRouter, Depends, HTTPException, Query, Header
from pydantic import BaseModel
from sqlalchemy.orm import Session
from sqlalchemy import text
from classroom_engine.database import get_db
logger = logging.getLogger(__name__)
router = APIRouter(prefix="/notfallplan", tags=["notfallplan"])
# ============================================================================
# Pydantic Schemas
# ============================================================================
class ContactCreate(BaseModel):
name: str
role: Optional[str] = None
email: Optional[str] = None
phone: Optional[str] = None
is_primary: bool = False
available_24h: bool = False
class ContactUpdate(BaseModel):
name: Optional[str] = None
role: Optional[str] = None
email: Optional[str] = None
phone: Optional[str] = None
is_primary: Optional[bool] = None
available_24h: Optional[bool] = None
class ScenarioCreate(BaseModel):
title: str
category: Optional[str] = None
severity: str = 'medium'
description: Optional[str] = None
response_steps: List[Any] = []
estimated_recovery_time: Optional[int] = None
is_active: bool = True
class ScenarioUpdate(BaseModel):
title: Optional[str] = None
category: Optional[str] = None
severity: Optional[str] = None
description: Optional[str] = None
response_steps: Optional[List[Any]] = None
estimated_recovery_time: Optional[int] = None
last_tested: Optional[str] = None
is_active: Optional[bool] = None
class ChecklistCreate(BaseModel):
title: str
scenario_id: Optional[str] = None
description: Optional[str] = None
order_index: int = 0
is_required: bool = True
class ChecklistUpdate(BaseModel):
title: Optional[str] = None
description: Optional[str] = None
order_index: Optional[int] = None
is_required: Optional[bool] = None
class ExerciseCreate(BaseModel):
title: str
scenario_id: Optional[str] = None
exercise_type: str = 'tabletop'
exercise_date: Optional[str] = None
participants: List[Any] = []
outcome: Optional[str] = None
notes: Optional[str] = None
# ============================================================================
# Helpers
# ============================================================================
def _get_tenant(x_tenant_id: Optional[str] = Header(None, alias='X-Tenant-ID')) -> str:
return x_tenant_id or 'default'
# ============================================================================
# Contacts
# ============================================================================
@router.get("/contacts")
async def list_contacts(
db: Session = Depends(get_db),
tenant_id: str = Depends(_get_tenant),
):
"""List all emergency contacts for a tenant."""
rows = db.execute(
text("""
SELECT id, tenant_id, name, role, email, phone, is_primary, available_24h, created_at
FROM compliance_notfallplan_contacts
WHERE tenant_id = :tenant_id
ORDER BY is_primary DESC, name
"""),
{"tenant_id": tenant_id},
).fetchall()
return [
{
"id": str(r.id),
"tenant_id": r.tenant_id,
"name": r.name,
"role": r.role,
"email": r.email,
"phone": r.phone,
"is_primary": r.is_primary,
"available_24h": r.available_24h,
"created_at": r.created_at.isoformat() if r.created_at else None,
}
for r in rows
]
@router.post("/contacts", status_code=201)
async def create_contact(
request: ContactCreate,
db: Session = Depends(get_db),
tenant_id: str = Depends(_get_tenant),
):
"""Create a new emergency contact."""
row = db.execute(
text("""
INSERT INTO compliance_notfallplan_contacts
(tenant_id, name, role, email, phone, is_primary, available_24h)
VALUES (:tenant_id, :name, :role, :email, :phone, :is_primary, :available_24h)
RETURNING id, tenant_id, name, role, email, phone, is_primary, available_24h, created_at
"""),
{
"tenant_id": tenant_id,
"name": request.name,
"role": request.role,
"email": request.email,
"phone": request.phone,
"is_primary": request.is_primary,
"available_24h": request.available_24h,
},
).fetchone()
db.commit()
return {
"id": str(row.id),
"tenant_id": row.tenant_id,
"name": row.name,
"role": row.role,
"email": row.email,
"phone": row.phone,
"is_primary": row.is_primary,
"available_24h": row.available_24h,
"created_at": row.created_at.isoformat() if row.created_at else None,
}
@router.put("/contacts/{contact_id}")
async def update_contact(
contact_id: str,
request: ContactUpdate,
db: Session = Depends(get_db),
tenant_id: str = Depends(_get_tenant),
):
"""Update an existing emergency contact."""
existing = db.execute(
text("SELECT id FROM compliance_notfallplan_contacts WHERE id = :id AND tenant_id = :tenant_id"),
{"id": contact_id, "tenant_id": tenant_id},
).fetchone()
if not existing:
raise HTTPException(status_code=404, detail=f"Contact {contact_id} not found")
updates = request.dict(exclude_none=True)
if not updates:
raise HTTPException(status_code=400, detail="No fields to update")
set_clauses = ", ".join(f"{k} = :{k}" for k in updates)
updates["id"] = contact_id
updates["tenant_id"] = tenant_id
row = db.execute(
text(f"""
UPDATE compliance_notfallplan_contacts
SET {set_clauses}
WHERE id = :id AND tenant_id = :tenant_id
RETURNING id, tenant_id, name, role, email, phone, is_primary, available_24h, created_at
"""),
updates,
).fetchone()
db.commit()
return {
"id": str(row.id),
"tenant_id": row.tenant_id,
"name": row.name,
"role": row.role,
"email": row.email,
"phone": row.phone,
"is_primary": row.is_primary,
"available_24h": row.available_24h,
"created_at": row.created_at.isoformat() if row.created_at else None,
}
@router.delete("/contacts/{contact_id}")
async def delete_contact(
contact_id: str,
db: Session = Depends(get_db),
tenant_id: str = Depends(_get_tenant),
):
"""Delete an emergency contact."""
existing = db.execute(
text("SELECT id FROM compliance_notfallplan_contacts WHERE id = :id AND tenant_id = :tenant_id"),
{"id": contact_id, "tenant_id": tenant_id},
).fetchone()
if not existing:
raise HTTPException(status_code=404, detail=f"Contact {contact_id} not found")
db.execute(
text("DELETE FROM compliance_notfallplan_contacts WHERE id = :id AND tenant_id = :tenant_id"),
{"id": contact_id, "tenant_id": tenant_id},
)
db.commit()
return {"success": True, "message": f"Contact {contact_id} deleted"}
# ============================================================================
# Scenarios
# ============================================================================
@router.get("/scenarios")
async def list_scenarios(
db: Session = Depends(get_db),
tenant_id: str = Depends(_get_tenant),
):
"""List all scenarios for a tenant."""
rows = db.execute(
text("""
SELECT id, tenant_id, title, category, severity, description,
response_steps, estimated_recovery_time, last_tested, is_active, created_at
FROM compliance_notfallplan_scenarios
WHERE tenant_id = :tenant_id
ORDER BY created_at DESC
"""),
{"tenant_id": tenant_id},
).fetchall()
return [
{
"id": str(r.id),
"tenant_id": r.tenant_id,
"title": r.title,
"category": r.category,
"severity": r.severity,
"description": r.description,
"response_steps": r.response_steps if r.response_steps else [],
"estimated_recovery_time": r.estimated_recovery_time,
"last_tested": r.last_tested.isoformat() if r.last_tested else None,
"is_active": r.is_active,
"created_at": r.created_at.isoformat() if r.created_at else None,
}
for r in rows
]
@router.post("/scenarios", status_code=201)
async def create_scenario(
request: ScenarioCreate,
db: Session = Depends(get_db),
tenant_id: str = Depends(_get_tenant),
):
"""Create a new scenario."""
row = db.execute(
text("""
INSERT INTO compliance_notfallplan_scenarios
(tenant_id, title, category, severity, description, response_steps, estimated_recovery_time, is_active)
VALUES (:tenant_id, :title, :category, :severity, :description, :response_steps, :estimated_recovery_time, :is_active)
RETURNING id, tenant_id, title, category, severity, description, response_steps,
estimated_recovery_time, last_tested, is_active, created_at
"""),
{
"tenant_id": tenant_id,
"title": request.title,
"category": request.category,
"severity": request.severity,
"description": request.description,
"response_steps": json.dumps(request.response_steps),
"estimated_recovery_time": request.estimated_recovery_time,
"is_active": request.is_active,
},
).fetchone()
db.commit()
return {
"id": str(row.id),
"tenant_id": row.tenant_id,
"title": row.title,
"category": row.category,
"severity": row.severity,
"description": row.description,
"response_steps": row.response_steps if row.response_steps else [],
"estimated_recovery_time": row.estimated_recovery_time,
"last_tested": row.last_tested.isoformat() if row.last_tested else None,
"is_active": row.is_active,
"created_at": row.created_at.isoformat() if row.created_at else None,
}
@router.put("/scenarios/{scenario_id}")
async def update_scenario(
scenario_id: str,
request: ScenarioUpdate,
db: Session = Depends(get_db),
tenant_id: str = Depends(_get_tenant),
):
"""Update an existing scenario."""
existing = db.execute(
text("SELECT id FROM compliance_notfallplan_scenarios WHERE id = :id AND tenant_id = :tenant_id"),
{"id": scenario_id, "tenant_id": tenant_id},
).fetchone()
if not existing:
raise HTTPException(status_code=404, detail=f"Scenario {scenario_id} not found")
updates = request.dict(exclude_none=True)
if not updates:
raise HTTPException(status_code=400, detail="No fields to update")
# Serialize response_steps to JSON if present
if "response_steps" in updates:
updates["response_steps"] = json.dumps(updates["response_steps"])
set_clauses = ", ".join(f"{k} = :{k}" for k in updates)
updates["id"] = scenario_id
updates["tenant_id"] = tenant_id
row = db.execute(
text(f"""
UPDATE compliance_notfallplan_scenarios
SET {set_clauses}
WHERE id = :id AND tenant_id = :tenant_id
RETURNING id, tenant_id, title, category, severity, description, response_steps,
estimated_recovery_time, last_tested, is_active, created_at
"""),
updates,
).fetchone()
db.commit()
return {
"id": str(row.id),
"tenant_id": row.tenant_id,
"title": row.title,
"category": row.category,
"severity": row.severity,
"description": row.description,
"response_steps": row.response_steps if row.response_steps else [],
"estimated_recovery_time": row.estimated_recovery_time,
"last_tested": row.last_tested.isoformat() if row.last_tested else None,
"is_active": row.is_active,
"created_at": row.created_at.isoformat() if row.created_at else None,
}
@router.delete("/scenarios/{scenario_id}")
async def delete_scenario(
scenario_id: str,
db: Session = Depends(get_db),
tenant_id: str = Depends(_get_tenant),
):
"""Delete a scenario."""
existing = db.execute(
text("SELECT id FROM compliance_notfallplan_scenarios WHERE id = :id AND tenant_id = :tenant_id"),
{"id": scenario_id, "tenant_id": tenant_id},
).fetchone()
if not existing:
raise HTTPException(status_code=404, detail=f"Scenario {scenario_id} not found")
db.execute(
text("DELETE FROM compliance_notfallplan_scenarios WHERE id = :id AND tenant_id = :tenant_id"),
{"id": scenario_id, "tenant_id": tenant_id},
)
db.commit()
return {"success": True, "message": f"Scenario {scenario_id} deleted"}
# ============================================================================
# Checklists
# ============================================================================
@router.get("/checklists")
async def list_checklists(
scenario_id: Optional[str] = Query(None),
db: Session = Depends(get_db),
tenant_id: str = Depends(_get_tenant),
):
"""List checklist items, optionally filtered by scenario_id."""
if scenario_id:
rows = db.execute(
text("""
SELECT id, tenant_id, scenario_id, title, description, order_index, is_required, created_at
FROM compliance_notfallplan_checklists
WHERE tenant_id = :tenant_id AND scenario_id = :scenario_id
ORDER BY order_index, created_at
"""),
{"tenant_id": tenant_id, "scenario_id": scenario_id},
).fetchall()
else:
rows = db.execute(
text("""
SELECT id, tenant_id, scenario_id, title, description, order_index, is_required, created_at
FROM compliance_notfallplan_checklists
WHERE tenant_id = :tenant_id
ORDER BY order_index, created_at
"""),
{"tenant_id": tenant_id},
).fetchall()
return [
{
"id": str(r.id),
"tenant_id": r.tenant_id,
"scenario_id": str(r.scenario_id) if r.scenario_id else None,
"title": r.title,
"description": r.description,
"order_index": r.order_index,
"is_required": r.is_required,
"created_at": r.created_at.isoformat() if r.created_at else None,
}
for r in rows
]
@router.post("/checklists", status_code=201)
async def create_checklist(
request: ChecklistCreate,
db: Session = Depends(get_db),
tenant_id: str = Depends(_get_tenant),
):
"""Create a new checklist item."""
row = db.execute(
text("""
INSERT INTO compliance_notfallplan_checklists
(tenant_id, scenario_id, title, description, order_index, is_required)
VALUES (:tenant_id, :scenario_id, :title, :description, :order_index, :is_required)
RETURNING id, tenant_id, scenario_id, title, description, order_index, is_required, created_at
"""),
{
"tenant_id": tenant_id,
"scenario_id": request.scenario_id,
"title": request.title,
"description": request.description,
"order_index": request.order_index,
"is_required": request.is_required,
},
).fetchone()
db.commit()
return {
"id": str(row.id),
"tenant_id": row.tenant_id,
"scenario_id": str(row.scenario_id) if row.scenario_id else None,
"title": row.title,
"description": row.description,
"order_index": row.order_index,
"is_required": row.is_required,
"created_at": row.created_at.isoformat() if row.created_at else None,
}
@router.put("/checklists/{checklist_id}")
async def update_checklist(
checklist_id: str,
request: ChecklistUpdate,
db: Session = Depends(get_db),
tenant_id: str = Depends(_get_tenant),
):
"""Update a checklist item."""
existing = db.execute(
text("SELECT id FROM compliance_notfallplan_checklists WHERE id = :id AND tenant_id = :tenant_id"),
{"id": checklist_id, "tenant_id": tenant_id},
).fetchone()
if not existing:
raise HTTPException(status_code=404, detail=f"Checklist item {checklist_id} not found")
updates = request.dict(exclude_none=True)
if not updates:
raise HTTPException(status_code=400, detail="No fields to update")
set_clauses = ", ".join(f"{k} = :{k}" for k in updates)
updates["id"] = checklist_id
updates["tenant_id"] = tenant_id
row = db.execute(
text(f"""
UPDATE compliance_notfallplan_checklists
SET {set_clauses}
WHERE id = :id AND tenant_id = :tenant_id
RETURNING id, tenant_id, scenario_id, title, description, order_index, is_required, created_at
"""),
updates,
).fetchone()
db.commit()
return {
"id": str(row.id),
"tenant_id": row.tenant_id,
"scenario_id": str(row.scenario_id) if row.scenario_id else None,
"title": row.title,
"description": row.description,
"order_index": row.order_index,
"is_required": row.is_required,
"created_at": row.created_at.isoformat() if row.created_at else None,
}
@router.delete("/checklists/{checklist_id}")
async def delete_checklist(
checklist_id: str,
db: Session = Depends(get_db),
tenant_id: str = Depends(_get_tenant),
):
"""Delete a checklist item."""
existing = db.execute(
text("SELECT id FROM compliance_notfallplan_checklists WHERE id = :id AND tenant_id = :tenant_id"),
{"id": checklist_id, "tenant_id": tenant_id},
).fetchone()
if not existing:
raise HTTPException(status_code=404, detail=f"Checklist item {checklist_id} not found")
db.execute(
text("DELETE FROM compliance_notfallplan_checklists WHERE id = :id AND tenant_id = :tenant_id"),
{"id": checklist_id, "tenant_id": tenant_id},
)
db.commit()
return {"success": True, "message": f"Checklist item {checklist_id} deleted"}
# ============================================================================
# Exercises
# ============================================================================
@router.get("/exercises")
async def list_exercises(
db: Session = Depends(get_db),
tenant_id: str = Depends(_get_tenant),
):
"""List all exercises for a tenant."""
rows = db.execute(
text("""
SELECT id, tenant_id, title, scenario_id, exercise_type, exercise_date,
participants, outcome, notes, created_at
FROM compliance_notfallplan_exercises
WHERE tenant_id = :tenant_id
ORDER BY created_at DESC
"""),
{"tenant_id": tenant_id},
).fetchall()
return [
{
"id": str(r.id),
"tenant_id": r.tenant_id,
"title": r.title,
"scenario_id": str(r.scenario_id) if r.scenario_id else None,
"exercise_type": r.exercise_type,
"exercise_date": r.exercise_date.isoformat() if r.exercise_date else None,
"participants": r.participants if r.participants else [],
"outcome": r.outcome,
"notes": r.notes,
"created_at": r.created_at.isoformat() if r.created_at else None,
}
for r in rows
]
@router.post("/exercises", status_code=201)
async def create_exercise(
request: ExerciseCreate,
db: Session = Depends(get_db),
tenant_id: str = Depends(_get_tenant),
):
"""Create a new exercise."""
exercise_date = None
if request.exercise_date:
try:
exercise_date = datetime.fromisoformat(request.exercise_date)
except ValueError:
pass
row = db.execute(
text("""
INSERT INTO compliance_notfallplan_exercises
(tenant_id, title, scenario_id, exercise_type, exercise_date, participants, outcome, notes)
VALUES (:tenant_id, :title, :scenario_id, :exercise_type, :exercise_date, :participants, :outcome, :notes)
RETURNING id, tenant_id, title, scenario_id, exercise_type, exercise_date,
participants, outcome, notes, created_at
"""),
{
"tenant_id": tenant_id,
"title": request.title,
"scenario_id": request.scenario_id,
"exercise_type": request.exercise_type,
"exercise_date": exercise_date,
"participants": json.dumps(request.participants),
"outcome": request.outcome,
"notes": request.notes,
},
).fetchone()
db.commit()
return {
"id": str(row.id),
"tenant_id": row.tenant_id,
"title": row.title,
"scenario_id": str(row.scenario_id) if row.scenario_id else None,
"exercise_type": row.exercise_type,
"exercise_date": row.exercise_date.isoformat() if row.exercise_date else None,
"participants": row.participants if row.participants else [],
"outcome": row.outcome,
"notes": row.notes,
"created_at": row.created_at.isoformat() if row.created_at else None,
}
# ============================================================================
# Stats
# ============================================================================
@router.get("/stats")
async def get_stats(
db: Session = Depends(get_db),
tenant_id: str = Depends(_get_tenant),
):
"""Return statistics for the Notfallplan module."""
contacts_count = db.execute(
text("SELECT COUNT(*) FROM compliance_notfallplan_contacts WHERE tenant_id = :tenant_id"),
{"tenant_id": tenant_id},
).scalar()
scenarios_count = db.execute(
text("SELECT COUNT(*) FROM compliance_notfallplan_scenarios WHERE tenant_id = :tenant_id AND is_active = TRUE"),
{"tenant_id": tenant_id},
).scalar()
exercises_count = db.execute(
text("SELECT COUNT(*) FROM compliance_notfallplan_exercises WHERE tenant_id = :tenant_id"),
{"tenant_id": tenant_id},
).scalar()
checklists_count = db.execute(
text("SELECT COUNT(*) FROM compliance_notfallplan_checklists WHERE tenant_id = :tenant_id"),
{"tenant_id": tenant_id},
).scalar()
return {
"contacts": contacts_count or 0,
"active_scenarios": scenarios_count or 0,
"exercises": exercises_count or 0,
"checklist_items": checklists_count or 0,
}