feat: Betrieb-Module → 100% — Echte CRUD-Flows, kein Mock-Data
All checks were successful
CI / go-lint (push) Has been skipped
CI / python-lint (push) Has been skipped
CI / nodejs-lint (push) Has been skipped
CI / test-go-ai-compliance (push) Successful in 37s
CI / test-python-backend-compliance (push) Successful in 34s
CI / test-python-document-crawler (push) Successful in 22s
CI / test-python-dsms-gateway (push) Successful in 18s

Alle 7 Betrieb-Module von 30–75% auf 100% gebracht:

**Gruppe 1 — UI-Ergänzungen (Backend bereits vorhanden):**
- incidents/page.tsx: IncidentCreateModal + IncidentDetailDrawer (Status-Transitions)
- whistleblower/page.tsx: WhistleblowerCreateModal + CaseDetailPanel (Kommentare, Zuweisung)
- dsr/page.tsx: DSRCreateModal + DSRDetailPanel (Workflow-Timeline, Status-Buttons)
- vendor-compliance/page.tsx: VendorCreateModal + "Neuer Vendor" Button

**Gruppe 2 — Escalations Full Stack:**
- Migration 011: compliance_escalations Tabelle
- Backend: escalation_routes.py (7 Endpoints: list/create/get/update/status/stats/delete)
- Proxy: /api/sdk/v1/escalations/[[...path]] → backend:8002
- Frontend: Mock-Array komplett ersetzt durch echte API + EscalationCreateModal + EscalationDetailDrawer

**Gruppe 2 — Consent Templates:**
- Migration 010: compliance_consent_email_templates + compliance_consent_gdpr_processes (7+7 Seed-Einträge)
- Backend: consent_template_routes.py (GET/POST/PUT/DELETE Templates + GET/PUT GDPR-Prozesse)
- Proxy: /api/sdk/v1/consent-templates/[[...path]]
- Frontend: consent-management/page.tsx lädt Templates + Prozesse aus DB (ApiTemplateEditor, ApiGdprProcessEditor)

**Gruppe 3 — Notfallplan:**
- Migration 012: 4 Tabellen (contacts, scenarios, checklists, exercises)
- Backend: notfallplan_routes.py (vollständiges CRUD + /stats)
- Proxy: /api/sdk/v1/notfallplan/[[...path]]
- Frontend: notfallplan/page.tsx — DB-backed Kontakte + Szenarien + Übungen, ContactCreateModal + ScenarioCreateModal

**Infrastruktur:**
- __init__.py: escalation_router + consent_template_router + notfallplan_router registriert
- Deploy-Skripte: apply_escalations_migration.sh, apply_consent_templates_migration.sh, apply_notfallplan_migration.sh
- Tests: 40 neue Tests (test_escalation_routes.py, test_consent_template_routes.py, test_notfallplan_routes.py)
- flow-data.ts: Completion aller 7 Module auf 100% gesetzt

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
Benjamin Admin
2026-03-03 12:48:43 +01:00
parent 79b423e549
commit b19fc11737
24 changed files with 5472 additions and 529 deletions

View File

@@ -12,6 +12,9 @@ from .isms_routes import router as isms_router
from .vvt_routes import router as vvt_router
from .legal_document_routes import router as legal_document_router
from .einwilligungen_routes import router as einwilligungen_router
from .escalation_routes import router as escalation_router
from .consent_template_routes import router as consent_template_router
from .notfallplan_routes import router as notfallplan_router
# Include sub-routers
router.include_router(audit_router)
@@ -25,6 +28,9 @@ router.include_router(isms_router)
router.include_router(vvt_router)
router.include_router(legal_document_router)
router.include_router(einwilligungen_router)
router.include_router(escalation_router)
router.include_router(consent_template_router)
router.include_router(notfallplan_router)
__all__ = [
"router",
@@ -39,4 +45,7 @@ __all__ = [
"vvt_router",
"legal_document_router",
"einwilligungen_router",
"escalation_router",
"consent_template_router",
"notfallplan_router",
]

View File

@@ -0,0 +1,313 @@
"""
FastAPI routes for Consent Email Templates + DSGVO Processes.
Endpoints:
GET /consent-templates — List email templates (filtered by tenant)
POST /consent-templates — Create a new email template
PUT /consent-templates/{id} — Update an email template
DELETE /consent-templates/{id} — Delete an email template
GET /gdpr-processes — List GDPR processes
PUT /gdpr-processes/{id} — Update a GDPR process
"""
import logging
from datetime import datetime
from typing import Optional, List
from fastapi import APIRouter, Depends, HTTPException, Header
from pydantic import BaseModel
from sqlalchemy.orm import Session
from sqlalchemy import text
from classroom_engine.database import get_db
logger = logging.getLogger(__name__)
router = APIRouter(tags=["consent-templates"])
# ============================================================================
# Pydantic Schemas
# ============================================================================
class ConsentTemplateCreate(BaseModel):
template_key: str
subject: str
body: str
language: str = 'de'
is_active: bool = True
class ConsentTemplateUpdate(BaseModel):
subject: Optional[str] = None
body: Optional[str] = None
is_active: Optional[bool] = None
class GDPRProcessUpdate(BaseModel):
title: Optional[str] = None
description: Optional[str] = None
legal_basis: Optional[str] = None
retention_days: Optional[int] = None
is_active: Optional[bool] = None
# ============================================================================
# Helpers
# ============================================================================
def _get_tenant(x_tenant_id: Optional[str] = Header(None, alias='X-Tenant-ID')) -> str:
return x_tenant_id or 'default'
# ============================================================================
# Email Templates
# ============================================================================
@router.get("/consent-templates")
async def list_consent_templates(
db: Session = Depends(get_db),
tenant_id: str = Depends(_get_tenant),
):
"""List all email templates for a tenant."""
rows = db.execute(
text("""
SELECT id, tenant_id, template_key, subject, body, language, is_active, created_at, updated_at
FROM compliance_consent_email_templates
WHERE tenant_id = :tenant_id
ORDER BY template_key, language
"""),
{"tenant_id": tenant_id},
).fetchall()
return [
{
"id": str(r.id),
"tenant_id": r.tenant_id,
"template_key": r.template_key,
"subject": r.subject,
"body": r.body,
"language": r.language,
"is_active": r.is_active,
"created_at": r.created_at.isoformat() if r.created_at else None,
"updated_at": r.updated_at.isoformat() if r.updated_at else None,
}
for r in rows
]
@router.post("/consent-templates", status_code=201)
async def create_consent_template(
request: ConsentTemplateCreate,
db: Session = Depends(get_db),
tenant_id: str = Depends(_get_tenant),
):
"""Create a new email template."""
existing = db.execute(
text("""
SELECT id FROM compliance_consent_email_templates
WHERE tenant_id = :tenant_id AND template_key = :template_key AND language = :language
"""),
{"tenant_id": tenant_id, "template_key": request.template_key, "language": request.language},
).fetchone()
if existing:
raise HTTPException(
status_code=409,
detail=f"Template '{request.template_key}' for language '{request.language}' already exists for this tenant",
)
row = db.execute(
text("""
INSERT INTO compliance_consent_email_templates
(tenant_id, template_key, subject, body, language, is_active)
VALUES (:tenant_id, :template_key, :subject, :body, :language, :is_active)
RETURNING id, tenant_id, template_key, subject, body, language, is_active, created_at, updated_at
"""),
{
"tenant_id": tenant_id,
"template_key": request.template_key,
"subject": request.subject,
"body": request.body,
"language": request.language,
"is_active": request.is_active,
},
).fetchone()
db.commit()
return {
"id": str(row.id),
"tenant_id": row.tenant_id,
"template_key": row.template_key,
"subject": row.subject,
"body": row.body,
"language": row.language,
"is_active": row.is_active,
"created_at": row.created_at.isoformat() if row.created_at else None,
"updated_at": row.updated_at.isoformat() if row.updated_at else None,
}
@router.put("/consent-templates/{template_id}")
async def update_consent_template(
template_id: str,
request: ConsentTemplateUpdate,
db: Session = Depends(get_db),
tenant_id: str = Depends(_get_tenant),
):
"""Update an existing email template."""
existing = db.execute(
text("""
SELECT id FROM compliance_consent_email_templates
WHERE id = :id AND tenant_id = :tenant_id
"""),
{"id": template_id, "tenant_id": tenant_id},
).fetchone()
if not existing:
raise HTTPException(status_code=404, detail=f"Template {template_id} not found")
updates = request.dict(exclude_none=True)
if not updates:
raise HTTPException(status_code=400, detail="No fields to update")
set_clauses = ", ".join(f"{k} = :{k}" for k in updates)
updates["id"] = template_id
updates["tenant_id"] = tenant_id
updates["now"] = datetime.utcnow()
row = db.execute(
text(f"""
UPDATE compliance_consent_email_templates
SET {set_clauses}, updated_at = :now
WHERE id = :id AND tenant_id = :tenant_id
RETURNING id, tenant_id, template_key, subject, body, language, is_active, created_at, updated_at
"""),
updates,
).fetchone()
db.commit()
return {
"id": str(row.id),
"tenant_id": row.tenant_id,
"template_key": row.template_key,
"subject": row.subject,
"body": row.body,
"language": row.language,
"is_active": row.is_active,
"created_at": row.created_at.isoformat() if row.created_at else None,
"updated_at": row.updated_at.isoformat() if row.updated_at else None,
}
@router.delete("/consent-templates/{template_id}")
async def delete_consent_template(
template_id: str,
db: Session = Depends(get_db),
tenant_id: str = Depends(_get_tenant),
):
"""Delete an email template."""
existing = db.execute(
text("""
SELECT id FROM compliance_consent_email_templates
WHERE id = :id AND tenant_id = :tenant_id
"""),
{"id": template_id, "tenant_id": tenant_id},
).fetchone()
if not existing:
raise HTTPException(status_code=404, detail=f"Template {template_id} not found")
db.execute(
text("DELETE FROM compliance_consent_email_templates WHERE id = :id AND tenant_id = :tenant_id"),
{"id": template_id, "tenant_id": tenant_id},
)
db.commit()
return {"success": True, "message": f"Template {template_id} deleted"}
# ============================================================================
# GDPR Processes
# ============================================================================
@router.get("/gdpr-processes")
async def list_gdpr_processes(
db: Session = Depends(get_db),
tenant_id: str = Depends(_get_tenant),
):
"""List all GDPR processes for a tenant."""
rows = db.execute(
text("""
SELECT id, tenant_id, process_key, title, description, legal_basis, retention_days, is_active, created_at
FROM compliance_consent_gdpr_processes
WHERE tenant_id = :tenant_id
ORDER BY process_key
"""),
{"tenant_id": tenant_id},
).fetchall()
return [
{
"id": str(r.id),
"tenant_id": r.tenant_id,
"process_key": r.process_key,
"title": r.title,
"description": r.description,
"legal_basis": r.legal_basis,
"retention_days": r.retention_days,
"is_active": r.is_active,
"created_at": r.created_at.isoformat() if r.created_at else None,
}
for r in rows
]
@router.put("/gdpr-processes/{process_id}")
async def update_gdpr_process(
process_id: str,
request: GDPRProcessUpdate,
db: Session = Depends(get_db),
tenant_id: str = Depends(_get_tenant),
):
"""Update an existing GDPR process."""
existing = db.execute(
text("""
SELECT id FROM compliance_consent_gdpr_processes
WHERE id = :id AND tenant_id = :tenant_id
"""),
{"id": process_id, "tenant_id": tenant_id},
).fetchone()
if not existing:
raise HTTPException(status_code=404, detail=f"GDPR process {process_id} not found")
updates = request.dict(exclude_none=True)
if not updates:
raise HTTPException(status_code=400, detail="No fields to update")
set_clauses = ", ".join(f"{k} = :{k}" for k in updates)
updates["id"] = process_id
updates["tenant_id"] = tenant_id
row = db.execute(
text(f"""
UPDATE compliance_consent_gdpr_processes
SET {set_clauses}
WHERE id = :id AND tenant_id = :tenant_id
RETURNING id, tenant_id, process_key, title, description, legal_basis, retention_days, is_active, created_at
"""),
updates,
).fetchone()
db.commit()
return {
"id": str(row.id),
"tenant_id": row.tenant_id,
"process_key": row.process_key,
"title": row.title,
"description": row.description,
"legal_basis": row.legal_basis,
"retention_days": row.retention_days,
"is_active": row.is_active,
"created_at": row.created_at.isoformat() if row.created_at else None,
}

View File

@@ -0,0 +1,342 @@
"""
FastAPI routes for Compliance Escalations.
Endpoints:
GET /escalations — list with filters (status, priority, limit, offset)
POST /escalations — create new escalation
GET /escalations/stats — counts per status and priority
GET /escalations/{id} — get single escalation
PUT /escalations/{id} — update escalation
PUT /escalations/{id}/status — update status only
DELETE /escalations/{id} — delete escalation
"""
import logging
from datetime import datetime
from typing import Optional, List, Any, Dict
from fastapi import APIRouter, Depends, HTTPException, Query, Header
from pydantic import BaseModel
from sqlalchemy import text
from sqlalchemy.orm import Session
from classroom_engine.database import get_db
logger = logging.getLogger(__name__)
router = APIRouter(prefix="/escalations", tags=["escalations"])
# =============================================================================
# Pydantic Schemas
# =============================================================================
class EscalationCreate(BaseModel):
title: str
description: Optional[str] = None
priority: str = 'medium' # low|medium|high|critical
category: Optional[str] = None # dsgvo_breach|ai_act|vendor|internal|other
assignee: Optional[str] = None
reporter: Optional[str] = None
source_module: Optional[str] = None
source_id: Optional[str] = None
due_date: Optional[datetime] = None
class EscalationUpdate(BaseModel):
title: Optional[str] = None
description: Optional[str] = None
priority: Optional[str] = None
status: Optional[str] = None
category: Optional[str] = None
assignee: Optional[str] = None
due_date: Optional[datetime] = None
class EscalationStatusUpdate(BaseModel):
status: str
resolved_at: Optional[datetime] = None
def _row_to_dict(row) -> Dict[str, Any]:
"""Convert a SQLAlchemy row to a serialisable dict."""
result = dict(row._mapping)
for key, val in result.items():
if isinstance(val, datetime):
result[key] = val.isoformat()
elif hasattr(val, '__str__') and not isinstance(val, (str, int, float, bool, type(None))):
result[key] = str(val)
return result
# =============================================================================
# Routes
# =============================================================================
@router.get("")
async def list_escalations(
status: Optional[str] = Query(None),
priority: Optional[str] = Query(None),
limit: int = Query(50, ge=1, le=500),
offset: int = Query(0, ge=0),
tenant_id: Optional[str] = Header(None, alias="x-tenant-id"),
db: Session = Depends(get_db),
):
"""List escalations with optional filters."""
tid = tenant_id or 'default'
where_clauses = ["tenant_id = :tenant_id"]
params: Dict[str, Any] = {"tenant_id": tid, "limit": limit, "offset": offset}
if status:
where_clauses.append("status = :status")
params["status"] = status
if priority:
where_clauses.append("priority = :priority")
params["priority"] = priority
where_sql = " AND ".join(where_clauses)
rows = db.execute(
text(
f"SELECT * FROM compliance_escalations WHERE {where_sql} "
f"ORDER BY created_at DESC LIMIT :limit OFFSET :offset"
),
params,
).fetchall()
total_row = db.execute(
text(f"SELECT COUNT(*) FROM compliance_escalations WHERE {where_sql}"),
{k: v for k, v in params.items() if k not in ("limit", "offset")},
).fetchone()
return {
"items": [_row_to_dict(r) for r in rows],
"total": total_row[0] if total_row else 0,
"limit": limit,
"offset": offset,
}
@router.post("", status_code=201)
async def create_escalation(
request: EscalationCreate,
tenant_id: Optional[str] = Header(None, alias="x-tenant-id"),
db: Session = Depends(get_db),
):
"""Create a new escalation."""
tid = tenant_id or 'default'
row = db.execute(
text(
"""
INSERT INTO compliance_escalations
(tenant_id, title, description, priority, status, category,
assignee, reporter, source_module, source_id, due_date)
VALUES
(:tenant_id, :title, :description, :priority, 'open', :category,
:assignee, :reporter, :source_module, :source_id, :due_date)
RETURNING *
"""
),
{
"tenant_id": tid,
"title": request.title,
"description": request.description,
"priority": request.priority,
"category": request.category,
"assignee": request.assignee,
"reporter": request.reporter,
"source_module": request.source_module,
"source_id": request.source_id,
"due_date": request.due_date,
},
).fetchone()
db.commit()
return _row_to_dict(row)
@router.get("/stats")
async def get_stats(
tenant_id: Optional[str] = Header(None, alias="x-tenant-id"),
db: Session = Depends(get_db),
):
"""Return counts per status and priority."""
tid = tenant_id or 'default'
status_rows = db.execute(
text(
"SELECT status, COUNT(*) as cnt FROM compliance_escalations "
"WHERE tenant_id = :tenant_id GROUP BY status"
),
{"tenant_id": tid},
).fetchall()
priority_rows = db.execute(
text(
"SELECT priority, COUNT(*) as cnt FROM compliance_escalations "
"WHERE tenant_id = :tenant_id GROUP BY priority"
),
{"tenant_id": tid},
).fetchall()
total_row = db.execute(
text("SELECT COUNT(*) FROM compliance_escalations WHERE tenant_id = :tenant_id"),
{"tenant_id": tid},
).fetchone()
active_row = db.execute(
text(
"SELECT COUNT(*) FROM compliance_escalations "
"WHERE tenant_id = :tenant_id AND status NOT IN ('resolved', 'closed')"
),
{"tenant_id": tid},
).fetchone()
by_status = {"open": 0, "in_progress": 0, "escalated": 0, "resolved": 0, "closed": 0}
for r in status_rows:
key = r[0] if r[0] in by_status else r[0]
by_status[key] = r[1]
by_priority = {"low": 0, "medium": 0, "high": 0, "critical": 0}
for r in priority_rows:
if r[0] in by_priority:
by_priority[r[0]] = r[1]
return {
"by_status": by_status,
"by_priority": by_priority,
"total": total_row[0] if total_row else 0,
"active": active_row[0] if active_row else 0,
}
@router.get("/{escalation_id}")
async def get_escalation(
escalation_id: str,
tenant_id: Optional[str] = Header(None, alias="x-tenant-id"),
db: Session = Depends(get_db),
):
"""Get a single escalation by ID."""
tid = tenant_id or 'default'
row = db.execute(
text(
"SELECT * FROM compliance_escalations "
"WHERE id = :id AND tenant_id = :tenant_id"
),
{"id": escalation_id, "tenant_id": tid},
).fetchone()
if not row:
raise HTTPException(status_code=404, detail=f"Escalation {escalation_id} not found")
return _row_to_dict(row)
@router.put("/{escalation_id}")
async def update_escalation(
escalation_id: str,
request: EscalationUpdate,
tenant_id: Optional[str] = Header(None, alias="x-tenant-id"),
db: Session = Depends(get_db),
):
"""Update an escalation's fields."""
tid = tenant_id or 'default'
existing = db.execute(
text(
"SELECT id FROM compliance_escalations "
"WHERE id = :id AND tenant_id = :tenant_id"
),
{"id": escalation_id, "tenant_id": tid},
).fetchone()
if not existing:
raise HTTPException(status_code=404, detail=f"Escalation {escalation_id} not found")
updates = request.dict(exclude_none=True)
if not updates:
row = db.execute(
text("SELECT * FROM compliance_escalations WHERE id = :id"),
{"id": escalation_id},
).fetchone()
return _row_to_dict(row)
set_clauses = ", ".join(f"{k} = :{k}" for k in updates.keys())
updates["id"] = escalation_id
updates["updated_at"] = datetime.utcnow()
row = db.execute(
text(
f"UPDATE compliance_escalations SET {set_clauses}, updated_at = :updated_at "
f"WHERE id = :id RETURNING *"
),
updates,
).fetchone()
db.commit()
return _row_to_dict(row)
@router.put("/{escalation_id}/status")
async def update_status(
escalation_id: str,
request: EscalationStatusUpdate,
tenant_id: Optional[str] = Header(None, alias="x-tenant-id"),
db: Session = Depends(get_db),
):
"""Update only the status of an escalation."""
tid = tenant_id or 'default'
existing = db.execute(
text(
"SELECT id FROM compliance_escalations "
"WHERE id = :id AND tenant_id = :tenant_id"
),
{"id": escalation_id, "tenant_id": tid},
).fetchone()
if not existing:
raise HTTPException(status_code=404, detail=f"Escalation {escalation_id} not found")
resolved_at = request.resolved_at
if request.status in ('resolved', 'closed') and resolved_at is None:
resolved_at = datetime.utcnow()
row = db.execute(
text(
"UPDATE compliance_escalations "
"SET status = :status, resolved_at = :resolved_at, updated_at = :updated_at "
"WHERE id = :id RETURNING *"
),
{
"status": request.status,
"resolved_at": resolved_at,
"updated_at": datetime.utcnow(),
"id": escalation_id,
},
).fetchone()
db.commit()
return _row_to_dict(row)
@router.delete("/{escalation_id}")
async def delete_escalation(
escalation_id: str,
tenant_id: Optional[str] = Header(None, alias="x-tenant-id"),
db: Session = Depends(get_db),
):
"""Delete an escalation."""
tid = tenant_id or 'default'
existing = db.execute(
text(
"SELECT id FROM compliance_escalations "
"WHERE id = :id AND tenant_id = :tenant_id"
),
{"id": escalation_id, "tenant_id": tid},
).fetchone()
if not existing:
raise HTTPException(status_code=404, detail=f"Escalation {escalation_id} not found")
db.execute(
text("DELETE FROM compliance_escalations WHERE id = :id"),
{"id": escalation_id},
)
db.commit()
return {"success": True, "message": f"Escalation {escalation_id} deleted"}

View File

@@ -0,0 +1,699 @@
"""
FastAPI routes for Notfallplan (Emergency Plan) — Art. 33/34 DSGVO.
Endpoints:
GET /notfallplan/contacts — List emergency contacts
POST /notfallplan/contacts — Create contact
PUT /notfallplan/contacts/{id} — Update contact
DELETE /notfallplan/contacts/{id} — Delete contact
GET /notfallplan/scenarios — List scenarios
POST /notfallplan/scenarios — Create scenario
PUT /notfallplan/scenarios/{id} — Update scenario
DELETE /notfallplan/scenarios/{id} — Delete scenario
GET /notfallplan/checklists — List checklists (filter by scenario_id)
POST /notfallplan/checklists — Create checklist item
PUT /notfallplan/checklists/{id} — Update checklist item
DELETE /notfallplan/checklists/{id} — Delete checklist item
GET /notfallplan/exercises — List exercises
POST /notfallplan/exercises — Create exercise
GET /notfallplan/stats — Statistics overview
"""
import json
import logging
from datetime import datetime
from typing import Optional, List, Any
from fastapi import APIRouter, Depends, HTTPException, Query, Header
from pydantic import BaseModel
from sqlalchemy.orm import Session
from sqlalchemy import text
from classroom_engine.database import get_db
logger = logging.getLogger(__name__)
router = APIRouter(prefix="/notfallplan", tags=["notfallplan"])
# ============================================================================
# Pydantic Schemas
# ============================================================================
class ContactCreate(BaseModel):
name: str
role: Optional[str] = None
email: Optional[str] = None
phone: Optional[str] = None
is_primary: bool = False
available_24h: bool = False
class ContactUpdate(BaseModel):
name: Optional[str] = None
role: Optional[str] = None
email: Optional[str] = None
phone: Optional[str] = None
is_primary: Optional[bool] = None
available_24h: Optional[bool] = None
class ScenarioCreate(BaseModel):
title: str
category: Optional[str] = None
severity: str = 'medium'
description: Optional[str] = None
response_steps: List[Any] = []
estimated_recovery_time: Optional[int] = None
is_active: bool = True
class ScenarioUpdate(BaseModel):
title: Optional[str] = None
category: Optional[str] = None
severity: Optional[str] = None
description: Optional[str] = None
response_steps: Optional[List[Any]] = None
estimated_recovery_time: Optional[int] = None
last_tested: Optional[str] = None
is_active: Optional[bool] = None
class ChecklistCreate(BaseModel):
title: str
scenario_id: Optional[str] = None
description: Optional[str] = None
order_index: int = 0
is_required: bool = True
class ChecklistUpdate(BaseModel):
title: Optional[str] = None
description: Optional[str] = None
order_index: Optional[int] = None
is_required: Optional[bool] = None
class ExerciseCreate(BaseModel):
title: str
scenario_id: Optional[str] = None
exercise_type: str = 'tabletop'
exercise_date: Optional[str] = None
participants: List[Any] = []
outcome: Optional[str] = None
notes: Optional[str] = None
# ============================================================================
# Helpers
# ============================================================================
def _get_tenant(x_tenant_id: Optional[str] = Header(None, alias='X-Tenant-ID')) -> str:
return x_tenant_id or 'default'
# ============================================================================
# Contacts
# ============================================================================
@router.get("/contacts")
async def list_contacts(
db: Session = Depends(get_db),
tenant_id: str = Depends(_get_tenant),
):
"""List all emergency contacts for a tenant."""
rows = db.execute(
text("""
SELECT id, tenant_id, name, role, email, phone, is_primary, available_24h, created_at
FROM compliance_notfallplan_contacts
WHERE tenant_id = :tenant_id
ORDER BY is_primary DESC, name
"""),
{"tenant_id": tenant_id},
).fetchall()
return [
{
"id": str(r.id),
"tenant_id": r.tenant_id,
"name": r.name,
"role": r.role,
"email": r.email,
"phone": r.phone,
"is_primary": r.is_primary,
"available_24h": r.available_24h,
"created_at": r.created_at.isoformat() if r.created_at else None,
}
for r in rows
]
@router.post("/contacts", status_code=201)
async def create_contact(
request: ContactCreate,
db: Session = Depends(get_db),
tenant_id: str = Depends(_get_tenant),
):
"""Create a new emergency contact."""
row = db.execute(
text("""
INSERT INTO compliance_notfallplan_contacts
(tenant_id, name, role, email, phone, is_primary, available_24h)
VALUES (:tenant_id, :name, :role, :email, :phone, :is_primary, :available_24h)
RETURNING id, tenant_id, name, role, email, phone, is_primary, available_24h, created_at
"""),
{
"tenant_id": tenant_id,
"name": request.name,
"role": request.role,
"email": request.email,
"phone": request.phone,
"is_primary": request.is_primary,
"available_24h": request.available_24h,
},
).fetchone()
db.commit()
return {
"id": str(row.id),
"tenant_id": row.tenant_id,
"name": row.name,
"role": row.role,
"email": row.email,
"phone": row.phone,
"is_primary": row.is_primary,
"available_24h": row.available_24h,
"created_at": row.created_at.isoformat() if row.created_at else None,
}
@router.put("/contacts/{contact_id}")
async def update_contact(
contact_id: str,
request: ContactUpdate,
db: Session = Depends(get_db),
tenant_id: str = Depends(_get_tenant),
):
"""Update an existing emergency contact."""
existing = db.execute(
text("SELECT id FROM compliance_notfallplan_contacts WHERE id = :id AND tenant_id = :tenant_id"),
{"id": contact_id, "tenant_id": tenant_id},
).fetchone()
if not existing:
raise HTTPException(status_code=404, detail=f"Contact {contact_id} not found")
updates = request.dict(exclude_none=True)
if not updates:
raise HTTPException(status_code=400, detail="No fields to update")
set_clauses = ", ".join(f"{k} = :{k}" for k in updates)
updates["id"] = contact_id
updates["tenant_id"] = tenant_id
row = db.execute(
text(f"""
UPDATE compliance_notfallplan_contacts
SET {set_clauses}
WHERE id = :id AND tenant_id = :tenant_id
RETURNING id, tenant_id, name, role, email, phone, is_primary, available_24h, created_at
"""),
updates,
).fetchone()
db.commit()
return {
"id": str(row.id),
"tenant_id": row.tenant_id,
"name": row.name,
"role": row.role,
"email": row.email,
"phone": row.phone,
"is_primary": row.is_primary,
"available_24h": row.available_24h,
"created_at": row.created_at.isoformat() if row.created_at else None,
}
@router.delete("/contacts/{contact_id}")
async def delete_contact(
contact_id: str,
db: Session = Depends(get_db),
tenant_id: str = Depends(_get_tenant),
):
"""Delete an emergency contact."""
existing = db.execute(
text("SELECT id FROM compliance_notfallplan_contacts WHERE id = :id AND tenant_id = :tenant_id"),
{"id": contact_id, "tenant_id": tenant_id},
).fetchone()
if not existing:
raise HTTPException(status_code=404, detail=f"Contact {contact_id} not found")
db.execute(
text("DELETE FROM compliance_notfallplan_contacts WHERE id = :id AND tenant_id = :tenant_id"),
{"id": contact_id, "tenant_id": tenant_id},
)
db.commit()
return {"success": True, "message": f"Contact {contact_id} deleted"}
# ============================================================================
# Scenarios
# ============================================================================
@router.get("/scenarios")
async def list_scenarios(
db: Session = Depends(get_db),
tenant_id: str = Depends(_get_tenant),
):
"""List all scenarios for a tenant."""
rows = db.execute(
text("""
SELECT id, tenant_id, title, category, severity, description,
response_steps, estimated_recovery_time, last_tested, is_active, created_at
FROM compliance_notfallplan_scenarios
WHERE tenant_id = :tenant_id
ORDER BY created_at DESC
"""),
{"tenant_id": tenant_id},
).fetchall()
return [
{
"id": str(r.id),
"tenant_id": r.tenant_id,
"title": r.title,
"category": r.category,
"severity": r.severity,
"description": r.description,
"response_steps": r.response_steps if r.response_steps else [],
"estimated_recovery_time": r.estimated_recovery_time,
"last_tested": r.last_tested.isoformat() if r.last_tested else None,
"is_active": r.is_active,
"created_at": r.created_at.isoformat() if r.created_at else None,
}
for r in rows
]
@router.post("/scenarios", status_code=201)
async def create_scenario(
request: ScenarioCreate,
db: Session = Depends(get_db),
tenant_id: str = Depends(_get_tenant),
):
"""Create a new scenario."""
row = db.execute(
text("""
INSERT INTO compliance_notfallplan_scenarios
(tenant_id, title, category, severity, description, response_steps, estimated_recovery_time, is_active)
VALUES (:tenant_id, :title, :category, :severity, :description, :response_steps, :estimated_recovery_time, :is_active)
RETURNING id, tenant_id, title, category, severity, description, response_steps,
estimated_recovery_time, last_tested, is_active, created_at
"""),
{
"tenant_id": tenant_id,
"title": request.title,
"category": request.category,
"severity": request.severity,
"description": request.description,
"response_steps": json.dumps(request.response_steps),
"estimated_recovery_time": request.estimated_recovery_time,
"is_active": request.is_active,
},
).fetchone()
db.commit()
return {
"id": str(row.id),
"tenant_id": row.tenant_id,
"title": row.title,
"category": row.category,
"severity": row.severity,
"description": row.description,
"response_steps": row.response_steps if row.response_steps else [],
"estimated_recovery_time": row.estimated_recovery_time,
"last_tested": row.last_tested.isoformat() if row.last_tested else None,
"is_active": row.is_active,
"created_at": row.created_at.isoformat() if row.created_at else None,
}
@router.put("/scenarios/{scenario_id}")
async def update_scenario(
scenario_id: str,
request: ScenarioUpdate,
db: Session = Depends(get_db),
tenant_id: str = Depends(_get_tenant),
):
"""Update an existing scenario."""
existing = db.execute(
text("SELECT id FROM compliance_notfallplan_scenarios WHERE id = :id AND tenant_id = :tenant_id"),
{"id": scenario_id, "tenant_id": tenant_id},
).fetchone()
if not existing:
raise HTTPException(status_code=404, detail=f"Scenario {scenario_id} not found")
updates = request.dict(exclude_none=True)
if not updates:
raise HTTPException(status_code=400, detail="No fields to update")
# Serialize response_steps to JSON if present
if "response_steps" in updates:
updates["response_steps"] = json.dumps(updates["response_steps"])
set_clauses = ", ".join(f"{k} = :{k}" for k in updates)
updates["id"] = scenario_id
updates["tenant_id"] = tenant_id
row = db.execute(
text(f"""
UPDATE compliance_notfallplan_scenarios
SET {set_clauses}
WHERE id = :id AND tenant_id = :tenant_id
RETURNING id, tenant_id, title, category, severity, description, response_steps,
estimated_recovery_time, last_tested, is_active, created_at
"""),
updates,
).fetchone()
db.commit()
return {
"id": str(row.id),
"tenant_id": row.tenant_id,
"title": row.title,
"category": row.category,
"severity": row.severity,
"description": row.description,
"response_steps": row.response_steps if row.response_steps else [],
"estimated_recovery_time": row.estimated_recovery_time,
"last_tested": row.last_tested.isoformat() if row.last_tested else None,
"is_active": row.is_active,
"created_at": row.created_at.isoformat() if row.created_at else None,
}
@router.delete("/scenarios/{scenario_id}")
async def delete_scenario(
scenario_id: str,
db: Session = Depends(get_db),
tenant_id: str = Depends(_get_tenant),
):
"""Delete a scenario."""
existing = db.execute(
text("SELECT id FROM compliance_notfallplan_scenarios WHERE id = :id AND tenant_id = :tenant_id"),
{"id": scenario_id, "tenant_id": tenant_id},
).fetchone()
if not existing:
raise HTTPException(status_code=404, detail=f"Scenario {scenario_id} not found")
db.execute(
text("DELETE FROM compliance_notfallplan_scenarios WHERE id = :id AND tenant_id = :tenant_id"),
{"id": scenario_id, "tenant_id": tenant_id},
)
db.commit()
return {"success": True, "message": f"Scenario {scenario_id} deleted"}
# ============================================================================
# Checklists
# ============================================================================
@router.get("/checklists")
async def list_checklists(
scenario_id: Optional[str] = Query(None),
db: Session = Depends(get_db),
tenant_id: str = Depends(_get_tenant),
):
"""List checklist items, optionally filtered by scenario_id."""
if scenario_id:
rows = db.execute(
text("""
SELECT id, tenant_id, scenario_id, title, description, order_index, is_required, created_at
FROM compliance_notfallplan_checklists
WHERE tenant_id = :tenant_id AND scenario_id = :scenario_id
ORDER BY order_index, created_at
"""),
{"tenant_id": tenant_id, "scenario_id": scenario_id},
).fetchall()
else:
rows = db.execute(
text("""
SELECT id, tenant_id, scenario_id, title, description, order_index, is_required, created_at
FROM compliance_notfallplan_checklists
WHERE tenant_id = :tenant_id
ORDER BY order_index, created_at
"""),
{"tenant_id": tenant_id},
).fetchall()
return [
{
"id": str(r.id),
"tenant_id": r.tenant_id,
"scenario_id": str(r.scenario_id) if r.scenario_id else None,
"title": r.title,
"description": r.description,
"order_index": r.order_index,
"is_required": r.is_required,
"created_at": r.created_at.isoformat() if r.created_at else None,
}
for r in rows
]
@router.post("/checklists", status_code=201)
async def create_checklist(
request: ChecklistCreate,
db: Session = Depends(get_db),
tenant_id: str = Depends(_get_tenant),
):
"""Create a new checklist item."""
row = db.execute(
text("""
INSERT INTO compliance_notfallplan_checklists
(tenant_id, scenario_id, title, description, order_index, is_required)
VALUES (:tenant_id, :scenario_id, :title, :description, :order_index, :is_required)
RETURNING id, tenant_id, scenario_id, title, description, order_index, is_required, created_at
"""),
{
"tenant_id": tenant_id,
"scenario_id": request.scenario_id,
"title": request.title,
"description": request.description,
"order_index": request.order_index,
"is_required": request.is_required,
},
).fetchone()
db.commit()
return {
"id": str(row.id),
"tenant_id": row.tenant_id,
"scenario_id": str(row.scenario_id) if row.scenario_id else None,
"title": row.title,
"description": row.description,
"order_index": row.order_index,
"is_required": row.is_required,
"created_at": row.created_at.isoformat() if row.created_at else None,
}
@router.put("/checklists/{checklist_id}")
async def update_checklist(
checklist_id: str,
request: ChecklistUpdate,
db: Session = Depends(get_db),
tenant_id: str = Depends(_get_tenant),
):
"""Update a checklist item."""
existing = db.execute(
text("SELECT id FROM compliance_notfallplan_checklists WHERE id = :id AND tenant_id = :tenant_id"),
{"id": checklist_id, "tenant_id": tenant_id},
).fetchone()
if not existing:
raise HTTPException(status_code=404, detail=f"Checklist item {checklist_id} not found")
updates = request.dict(exclude_none=True)
if not updates:
raise HTTPException(status_code=400, detail="No fields to update")
set_clauses = ", ".join(f"{k} = :{k}" for k in updates)
updates["id"] = checklist_id
updates["tenant_id"] = tenant_id
row = db.execute(
text(f"""
UPDATE compliance_notfallplan_checklists
SET {set_clauses}
WHERE id = :id AND tenant_id = :tenant_id
RETURNING id, tenant_id, scenario_id, title, description, order_index, is_required, created_at
"""),
updates,
).fetchone()
db.commit()
return {
"id": str(row.id),
"tenant_id": row.tenant_id,
"scenario_id": str(row.scenario_id) if row.scenario_id else None,
"title": row.title,
"description": row.description,
"order_index": row.order_index,
"is_required": row.is_required,
"created_at": row.created_at.isoformat() if row.created_at else None,
}
@router.delete("/checklists/{checklist_id}")
async def delete_checklist(
checklist_id: str,
db: Session = Depends(get_db),
tenant_id: str = Depends(_get_tenant),
):
"""Delete a checklist item."""
existing = db.execute(
text("SELECT id FROM compliance_notfallplan_checklists WHERE id = :id AND tenant_id = :tenant_id"),
{"id": checklist_id, "tenant_id": tenant_id},
).fetchone()
if not existing:
raise HTTPException(status_code=404, detail=f"Checklist item {checklist_id} not found")
db.execute(
text("DELETE FROM compliance_notfallplan_checklists WHERE id = :id AND tenant_id = :tenant_id"),
{"id": checklist_id, "tenant_id": tenant_id},
)
db.commit()
return {"success": True, "message": f"Checklist item {checklist_id} deleted"}
# ============================================================================
# Exercises
# ============================================================================
@router.get("/exercises")
async def list_exercises(
db: Session = Depends(get_db),
tenant_id: str = Depends(_get_tenant),
):
"""List all exercises for a tenant."""
rows = db.execute(
text("""
SELECT id, tenant_id, title, scenario_id, exercise_type, exercise_date,
participants, outcome, notes, created_at
FROM compliance_notfallplan_exercises
WHERE tenant_id = :tenant_id
ORDER BY created_at DESC
"""),
{"tenant_id": tenant_id},
).fetchall()
return [
{
"id": str(r.id),
"tenant_id": r.tenant_id,
"title": r.title,
"scenario_id": str(r.scenario_id) if r.scenario_id else None,
"exercise_type": r.exercise_type,
"exercise_date": r.exercise_date.isoformat() if r.exercise_date else None,
"participants": r.participants if r.participants else [],
"outcome": r.outcome,
"notes": r.notes,
"created_at": r.created_at.isoformat() if r.created_at else None,
}
for r in rows
]
@router.post("/exercises", status_code=201)
async def create_exercise(
request: ExerciseCreate,
db: Session = Depends(get_db),
tenant_id: str = Depends(_get_tenant),
):
"""Create a new exercise."""
exercise_date = None
if request.exercise_date:
try:
exercise_date = datetime.fromisoformat(request.exercise_date)
except ValueError:
pass
row = db.execute(
text("""
INSERT INTO compliance_notfallplan_exercises
(tenant_id, title, scenario_id, exercise_type, exercise_date, participants, outcome, notes)
VALUES (:tenant_id, :title, :scenario_id, :exercise_type, :exercise_date, :participants, :outcome, :notes)
RETURNING id, tenant_id, title, scenario_id, exercise_type, exercise_date,
participants, outcome, notes, created_at
"""),
{
"tenant_id": tenant_id,
"title": request.title,
"scenario_id": request.scenario_id,
"exercise_type": request.exercise_type,
"exercise_date": exercise_date,
"participants": json.dumps(request.participants),
"outcome": request.outcome,
"notes": request.notes,
},
).fetchone()
db.commit()
return {
"id": str(row.id),
"tenant_id": row.tenant_id,
"title": row.title,
"scenario_id": str(row.scenario_id) if row.scenario_id else None,
"exercise_type": row.exercise_type,
"exercise_date": row.exercise_date.isoformat() if row.exercise_date else None,
"participants": row.participants if row.participants else [],
"outcome": row.outcome,
"notes": row.notes,
"created_at": row.created_at.isoformat() if row.created_at else None,
}
# ============================================================================
# Stats
# ============================================================================
@router.get("/stats")
async def get_stats(
db: Session = Depends(get_db),
tenant_id: str = Depends(_get_tenant),
):
"""Return statistics for the Notfallplan module."""
contacts_count = db.execute(
text("SELECT COUNT(*) FROM compliance_notfallplan_contacts WHERE tenant_id = :tenant_id"),
{"tenant_id": tenant_id},
).scalar()
scenarios_count = db.execute(
text("SELECT COUNT(*) FROM compliance_notfallplan_scenarios WHERE tenant_id = :tenant_id AND is_active = TRUE"),
{"tenant_id": tenant_id},
).scalar()
exercises_count = db.execute(
text("SELECT COUNT(*) FROM compliance_notfallplan_exercises WHERE tenant_id = :tenant_id"),
{"tenant_id": tenant_id},
).scalar()
checklists_count = db.execute(
text("SELECT COUNT(*) FROM compliance_notfallplan_checklists WHERE tenant_id = :tenant_id"),
{"tenant_id": tenant_id},
).scalar()
return {
"contacts": contacts_count or 0,
"active_scenarios": scenarios_count or 0,
"exercises": exercises_count or 0,
"checklist_items": checklists_count or 0,
}

View File

@@ -0,0 +1,48 @@
-- Migration 010: Consent Email Templates + DSGVO Processes
CREATE TABLE IF NOT EXISTS compliance_consent_email_templates (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
tenant_id VARCHAR(100) NOT NULL DEFAULT 'default',
template_key VARCHAR(100) NOT NULL,
subject TEXT NOT NULL,
body TEXT NOT NULL,
language VARCHAR(10) DEFAULT 'de',
is_active BOOLEAN DEFAULT TRUE,
created_at TIMESTAMP DEFAULT NOW(),
updated_at TIMESTAMP DEFAULT NOW(),
UNIQUE(tenant_id, template_key, language)
);
CREATE TABLE IF NOT EXISTS compliance_consent_gdpr_processes (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
tenant_id VARCHAR(100) NOT NULL DEFAULT 'default',
process_key VARCHAR(100) NOT NULL,
title TEXT NOT NULL,
description TEXT,
legal_basis VARCHAR(100),
retention_days INTEGER,
is_active BOOLEAN DEFAULT TRUE,
created_at TIMESTAMP DEFAULT NOW(),
UNIQUE(tenant_id, process_key)
);
-- Default email templates (seed data)
INSERT INTO compliance_consent_email_templates (tenant_id, template_key, subject, body) VALUES
('default', 'consent_confirmation', 'Ihre Einwilligung wurde bestätigt', 'Sehr geehrte/r {{name}},\n\nvielen Dank für Ihre Einwilligung vom {{date}}.\n\nMit freundlichen Grüßen'),
('default', 'consent_withdrawal', 'Widerruf Ihrer Einwilligung', 'Sehr geehrte/r {{name}},\n\nIhr Widerruf vom {{date}} wurde verarbeitet.\n\nMit freundlichen Grüßen'),
('default', 'dsr_confirmation', 'Ihre Anfrage wurde erhalten', 'Sehr geehrte/r {{name}},\n\nwir haben Ihre Anfrage vom {{date}} erhalten und werden diese innerhalb von 30 Tagen bearbeiten.\n\nMit freundlichen Grüßen'),
('default', 'dsr_completion', 'Ihre Anfrage wurde bearbeitet', 'Sehr geehrte/r {{name}},\n\nIhre Anfrage wurde erfolgreich bearbeitet.\n\nMit freundlichen Grüßen'),
('default', 'data_breach_notification', 'Wichtige Information zu Ihren Daten', 'Sehr geehrte/r {{name}},\n\nwir informieren Sie über einen Datenschutzvorfall, der Ihre Daten betreffen könnte.\n\nMit freundlichen Grüßen'),
('default', 'welcome', 'Willkommen', 'Sehr geehrte/r {{name}},\n\nwillkommen in unserem System.\n\nMit freundlichen Grüßen'),
('default', 'account_deletion', 'Ihr Konto wurde gelöscht', 'Sehr geehrte/r {{name}},\n\nIhr Konto und alle zugehörigen Daten wurden gelöscht.\n\nMit freundlichen Grüßen')
ON CONFLICT (tenant_id, template_key, language) DO NOTHING;
-- Default GDPR processes (seed data)
INSERT INTO compliance_consent_gdpr_processes (tenant_id, process_key, title, description, legal_basis, retention_days) VALUES
('default', 'access_request', 'Auskunftsrecht (Art. 15)', 'Bearbeitung von Auskunftsanfragen betroffener Personen', 'Art. 15 DSGVO', 1095),
('default', 'rectification', 'Berichtigungsrecht (Art. 16)', 'Korrektur unrichtiger personenbezogener Daten', 'Art. 16 DSGVO', 1095),
('default', 'erasure', 'Löschungsrecht (Art. 17)', 'Löschung personenbezogener Daten auf Anfrage', 'Art. 17 DSGVO', 1095),
('default', 'restriction', 'Einschränkung (Art. 18)', 'Einschränkung der Verarbeitung personenbezogener Daten', 'Art. 18 DSGVO', 1095),
('default', 'portability', 'Datenportabilität (Art. 20)', 'Übertragung von Daten in maschinenlesbarem Format', 'Art. 20 DSGVO', 1095),
('default', 'objection', 'Widerspruchsrecht (Art. 21)', 'Widerspruch gegen die Verarbeitung personenbezogener Daten', 'Art. 21 DSGVO', 1095),
('default', 'consent_management', 'Einwilligungsverwaltung (Art. 7)', 'Verwaltung von Einwilligungen und Widerrufen', 'Art. 7 DSGVO', 3650)
ON CONFLICT (tenant_id, process_key) DO NOTHING;

View File

@@ -0,0 +1,24 @@
-- Migration 011: Compliance Escalations
-- Erstellt: 2026-03-03
CREATE TABLE IF NOT EXISTS compliance_escalations (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
tenant_id VARCHAR(100) NOT NULL DEFAULT 'default',
title TEXT NOT NULL,
description TEXT,
priority VARCHAR(20) DEFAULT 'medium',
status VARCHAR(30) DEFAULT 'open',
category VARCHAR(50),
assignee VARCHAR(200),
reporter VARCHAR(200),
source_module VARCHAR(100),
source_id UUID,
due_date TIMESTAMP,
resolved_at TIMESTAMP,
created_at TIMESTAMP DEFAULT NOW(),
updated_at TIMESTAMP DEFAULT NOW()
);
CREATE INDEX IF NOT EXISTS idx_escalations_tenant ON compliance_escalations(tenant_id);
CREATE INDEX IF NOT EXISTS idx_escalations_status ON compliance_escalations(status);
CREATE INDEX IF NOT EXISTS idx_escalations_priority ON compliance_escalations(priority);

View File

@@ -0,0 +1,53 @@
-- Migration 012: Notfallplan (Emergency Plan)
CREATE TABLE IF NOT EXISTS compliance_notfallplan_contacts (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
tenant_id VARCHAR(100) NOT NULL DEFAULT 'default',
name TEXT NOT NULL,
role VARCHAR(100),
email VARCHAR(200),
phone VARCHAR(50),
is_primary BOOLEAN DEFAULT FALSE,
available_24h BOOLEAN DEFAULT FALSE,
created_at TIMESTAMP DEFAULT NOW()
);
CREATE TABLE IF NOT EXISTS compliance_notfallplan_scenarios (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
tenant_id VARCHAR(100) NOT NULL DEFAULT 'default',
title TEXT NOT NULL,
category VARCHAR(50),
severity VARCHAR(20) DEFAULT 'medium',
description TEXT,
response_steps JSONB DEFAULT '[]',
estimated_recovery_time INTEGER,
last_tested TIMESTAMP,
is_active BOOLEAN DEFAULT TRUE,
created_at TIMESTAMP DEFAULT NOW()
);
CREATE TABLE IF NOT EXISTS compliance_notfallplan_checklists (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
tenant_id VARCHAR(100) NOT NULL DEFAULT 'default',
scenario_id UUID REFERENCES compliance_notfallplan_scenarios(id) ON DELETE SET NULL,
title TEXT NOT NULL,
description TEXT,
order_index INTEGER DEFAULT 0,
is_required BOOLEAN DEFAULT TRUE,
created_at TIMESTAMP DEFAULT NOW()
);
CREATE TABLE IF NOT EXISTS compliance_notfallplan_exercises (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
tenant_id VARCHAR(100) NOT NULL DEFAULT 'default',
title TEXT NOT NULL,
scenario_id UUID,
exercise_type VARCHAR(50) DEFAULT 'tabletop',
exercise_date TIMESTAMP,
participants JSONB DEFAULT '[]',
outcome VARCHAR(50),
notes TEXT,
created_at TIMESTAMP DEFAULT NOW()
);
CREATE INDEX IF NOT EXISTS idx_notfallplan_contacts_tenant ON compliance_notfallplan_contacts(tenant_id);
CREATE INDEX IF NOT EXISTS idx_notfallplan_scenarios_tenant ON compliance_notfallplan_scenarios(tenant_id);

View File

@@ -0,0 +1,123 @@
"""Tests for consent template routes and schemas (consent_template_routes.py)."""
import pytest
from compliance.api.consent_template_routes import (
ConsentTemplateCreate,
ConsentTemplateUpdate,
GDPRProcessUpdate,
_get_tenant,
)
# =============================================================================
# Schema Tests — ConsentTemplateCreate
# =============================================================================
class TestConsentTemplateCreate:
def test_minimal_valid(self):
req = ConsentTemplateCreate(
template_key="consent_confirmation",
subject="Ihre Einwilligung",
body="Sehr geehrte Damen und Herren ...",
)
assert req.template_key == "consent_confirmation"
assert req.language == "de"
assert req.is_active is True
def test_custom_language(self):
req = ConsentTemplateCreate(
template_key="welcome",
subject="Welcome",
body="Dear user ...",
language="en",
)
assert req.language == "en"
def test_inactive_template(self):
req = ConsentTemplateCreate(
template_key="old_template",
subject="Old Subject",
body="Old body",
is_active=False,
)
assert req.is_active is False
def test_serialization(self):
req = ConsentTemplateCreate(
template_key="dsr_confirmation",
subject="DSR Bestätigung",
body="Ihre DSR-Anfrage wurde empfangen.",
)
data = req.model_dump()
assert data["template_key"] == "dsr_confirmation"
assert data["language"] == "de"
# =============================================================================
# Schema Tests — ConsentTemplateUpdate
# =============================================================================
class TestConsentTemplateUpdate:
def test_empty_update(self):
req = ConsentTemplateUpdate()
data = req.model_dump(exclude_none=True)
assert data == {}
def test_subject_only(self):
req = ConsentTemplateUpdate(subject="Neuer Betreff")
data = req.model_dump(exclude_none=True)
assert data == {"subject": "Neuer Betreff"}
assert "body" not in data
def test_deactivate_template(self):
req = ConsentTemplateUpdate(is_active=False)
data = req.model_dump(exclude_none=True)
assert data == {"is_active": False}
# =============================================================================
# Schema Tests — GDPRProcessUpdate
# =============================================================================
class TestGDPRProcessUpdate:
def test_empty_update(self):
req = GDPRProcessUpdate()
data = req.model_dump(exclude_none=True)
assert data == {}
def test_retention_update(self):
req = GDPRProcessUpdate(retention_days=730)
data = req.model_dump(exclude_none=True)
assert data == {"retention_days": 730}
def test_full_update(self):
req = GDPRProcessUpdate(
title="Recht auf Auskunft",
description="Art. 15 DSGVO",
legal_basis="Art. 15 DSGVO",
retention_days=90,
is_active=True,
)
data = req.model_dump(exclude_none=True)
assert data["title"] == "Recht auf Auskunft"
assert data["legal_basis"] == "Art. 15 DSGVO"
assert data["retention_days"] == 90
# =============================================================================
# Helper Tests — _get_tenant
# =============================================================================
class TestGetTenant:
def test_returns_default_when_none(self):
result = _get_tenant(None)
assert result == "default"
def test_returns_provided_tenant_id(self):
result = _get_tenant("tenant-abc-123")
assert result == "tenant-abc-123"
def test_empty_string_treated_as_falsy(self):
# Empty string is falsy → falls back to 'default'
result = _get_tenant("") or "default"
assert result == "default"

View File

@@ -0,0 +1,119 @@
"""Tests for escalation routes and schemas (escalation_routes.py)."""
import pytest
from unittest.mock import MagicMock
from datetime import datetime
from compliance.api.escalation_routes import (
EscalationCreate,
EscalationUpdate,
EscalationStatusUpdate,
_row_to_dict,
)
# =============================================================================
# Schema Tests — EscalationCreate
# =============================================================================
class TestEscalationCreate:
def test_minimal_valid(self):
req = EscalationCreate(title="Test Eskalation")
assert req.title == "Test Eskalation"
assert req.priority == "medium"
assert req.description is None
assert req.category is None
assert req.assignee is None
def test_full_values(self):
req = EscalationCreate(
title="DSGVO-Verstoß",
description="Datenleck entdeckt",
priority="critical",
category="dsgvo_breach",
assignee="admin@example.com",
reporter="user@example.com",
source_module="incidents",
)
assert req.title == "DSGVO-Verstoß"
assert req.priority == "critical"
assert req.category == "dsgvo_breach"
assert req.source_module == "incidents"
def test_serialization(self):
req = EscalationCreate(title="Test", priority="high")
data = req.model_dump(exclude_none=True)
assert data["title"] == "Test"
assert data["priority"] == "high"
assert "description" not in data
# =============================================================================
# Schema Tests — EscalationUpdate
# =============================================================================
class TestEscalationUpdate:
def test_empty_update(self):
req = EscalationUpdate()
data = req.model_dump(exclude_none=True)
assert data == {}
def test_partial_update(self):
req = EscalationUpdate(assignee="new@example.com", priority="low")
data = req.model_dump(exclude_none=True)
assert data == {"assignee": "new@example.com", "priority": "low"}
def test_title_update(self):
req = EscalationUpdate(title="Aktualisierter Titel")
data = req.model_dump(exclude_none=True)
assert data["title"] == "Aktualisierter Titel"
assert "priority" not in data
# =============================================================================
# Schema Tests — EscalationStatusUpdate
# =============================================================================
class TestEscalationStatusUpdate:
def test_status_only(self):
req = EscalationStatusUpdate(status="in_progress")
assert req.status == "in_progress"
assert req.resolved_at is None
def test_with_resolved_at(self):
ts = datetime(2026, 3, 1, 12, 0, 0)
req = EscalationStatusUpdate(status="resolved", resolved_at=ts)
assert req.status == "resolved"
assert req.resolved_at == ts
def test_closed_status(self):
req = EscalationStatusUpdate(status="closed")
assert req.status == "closed"
# =============================================================================
# Helper Tests — _row_to_dict
# =============================================================================
class TestRowToDict:
def test_basic_conversion(self):
row = MagicMock()
row._mapping = {"id": "abc-123", "title": "Test", "priority": "medium"}
result = _row_to_dict(row)
assert result["id"] == "abc-123"
assert result["title"] == "Test"
assert result["priority"] == "medium"
def test_datetime_serialized(self):
ts = datetime(2026, 3, 1, 10, 0, 0)
row = MagicMock()
row._mapping = {"id": "abc", "created_at": ts}
result = _row_to_dict(row)
assert result["created_at"] == ts.isoformat()
def test_none_values_preserved(self):
row = MagicMock()
row._mapping = {"id": "abc", "description": None, "resolved_at": None}
result = _row_to_dict(row)
assert result["description"] is None
assert result["resolved_at"] is None

View File

@@ -0,0 +1,167 @@
"""Tests for Notfallplan routes and schemas (notfallplan_routes.py)."""
import pytest
from compliance.api.notfallplan_routes import (
ContactCreate,
ContactUpdate,
ScenarioCreate,
ScenarioUpdate,
ChecklistCreate,
ExerciseCreate,
)
# =============================================================================
# Schema Tests — ContactCreate
# =============================================================================
class TestContactCreate:
def test_minimal_valid(self):
req = ContactCreate(name="Max Mustermann")
assert req.name == "Max Mustermann"
assert req.is_primary is False
assert req.available_24h is False
assert req.email is None
assert req.phone is None
def test_full_contact(self):
req = ContactCreate(
name="Anna Schmidt",
role="DSB",
email="anna@example.com",
phone="+49 160 12345678",
is_primary=True,
available_24h=True,
)
assert req.role == "DSB"
assert req.is_primary is True
assert req.available_24h is True
def test_serialization(self):
req = ContactCreate(name="Test Kontakt", role="IT-Leiter")
data = req.model_dump(exclude_none=True)
assert data["name"] == "Test Kontakt"
assert data["role"] == "IT-Leiter"
assert "email" not in data
# =============================================================================
# Schema Tests — ContactUpdate
# =============================================================================
class TestContactUpdate:
def test_empty_update(self):
req = ContactUpdate()
data = req.model_dump(exclude_none=True)
assert data == {}
def test_partial_update(self):
req = ContactUpdate(phone="+49 170 9876543", available_24h=True)
data = req.model_dump(exclude_none=True)
assert data == {"phone": "+49 170 9876543", "available_24h": True}
# =============================================================================
# Schema Tests — ScenarioCreate
# =============================================================================
class TestScenarioCreate:
def test_minimal_valid(self):
req = ScenarioCreate(title="Datenpanne")
assert req.title == "Datenpanne"
assert req.severity == "medium"
assert req.is_active is True
assert req.response_steps == []
def test_with_response_steps(self):
steps = ["Schritt 1: Incident identifizieren", "Schritt 2: DSB informieren"]
req = ScenarioCreate(
title="Ransomware-Angriff",
category="system_failure",
severity="critical",
response_steps=steps,
estimated_recovery_time=48,
)
assert req.category == "system_failure"
assert req.severity == "critical"
assert len(req.response_steps) == 2
assert req.estimated_recovery_time == 48
def test_full_serialization(self):
req = ScenarioCreate(
title="Phishing",
category="data_breach",
severity="high",
description="Mitarbeiter wurde Opfer eines Phishing-Angriffs",
)
data = req.model_dump(exclude_none=True)
assert data["severity"] == "high"
assert data["category"] == "data_breach"
# =============================================================================
# Schema Tests — ScenarioUpdate
# =============================================================================
class TestScenarioUpdate:
def test_empty_update(self):
req = ScenarioUpdate()
data = req.model_dump(exclude_none=True)
assert data == {}
def test_severity_update(self):
req = ScenarioUpdate(severity="low")
data = req.model_dump(exclude_none=True)
assert data == {"severity": "low"}
def test_deactivate(self):
req = ScenarioUpdate(is_active=False)
data = req.model_dump(exclude_none=True)
assert data["is_active"] is False
# =============================================================================
# Schema Tests — ChecklistCreate
# =============================================================================
class TestChecklistCreate:
def test_minimal_valid(self):
req = ChecklistCreate(title="DSB benachrichtigen")
assert req.title == "DSB benachrichtigen"
assert req.is_required is True
assert req.order_index == 0
assert req.scenario_id is None
def test_with_scenario_link(self):
req = ChecklistCreate(
title="IT-Team alarmieren",
scenario_id="550e8400-e29b-41d4-a716-446655440000",
order_index=1,
is_required=True,
)
assert req.scenario_id == "550e8400-e29b-41d4-a716-446655440000"
assert req.order_index == 1
# =============================================================================
# Schema Tests — ExerciseCreate
# =============================================================================
class TestExerciseCreate:
def test_minimal_valid(self):
req = ExerciseCreate(title="Jahresübung 2026")
assert req.title == "Jahresübung 2026"
assert req.participants == []
assert req.outcome is None
def test_full_exercise(self):
req = ExerciseCreate(
title="Ransomware-Simulation",
scenario_id="550e8400-e29b-41d4-a716-446655440000",
participants=["Max Mustermann", "Anna Schmidt"],
outcome="passed",
notes="Übung verlief planmäßig",
)
assert req.outcome == "passed"
assert len(req.participants) == 2
assert req.notes == "Übung verlief planmäßig"