refactor(backend/api): extract Notfallplan schemas + services (Step 4)

Split notfallplan_routes.py (1018 LOC) into clean architecture layers:
- compliance/schemas/notfallplan.py (146 LOC): all Pydantic models
- compliance/services/notfallplan_service.py (500 LOC): contacts, scenarios, checklists, exercises, stats
- compliance/services/notfallplan_workflow_service.py (309 LOC): incidents, templates
- compliance/api/notfallplan_routes.py (361 LOC): thin handlers with domain error translation

All 250 tests pass. Schemas re-exported via __all__ for legacy test imports.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
Sharang Parnerkar
2026-04-09 20:10:43 +02:00
parent d35b0bc78c
commit 1a2ae896fb
4 changed files with 1102 additions and 803 deletions

File diff suppressed because it is too large Load Diff

View File

@@ -0,0 +1,146 @@
"""
Notfallplan (Emergency Plan) schemas -- Art. 33/34 DSGVO.
Phase 1 Step 4: extracted from ``compliance.api.notfallplan_routes``.
"""
from typing import Any, List, Optional
from pydantic import BaseModel
# ============================================================================
# Contacts
# ============================================================================
class ContactCreate(BaseModel):
name: str
role: Optional[str] = None
email: Optional[str] = None
phone: Optional[str] = None
is_primary: bool = False
available_24h: bool = False
class ContactUpdate(BaseModel):
name: Optional[str] = None
role: Optional[str] = None
email: Optional[str] = None
phone: Optional[str] = None
is_primary: Optional[bool] = None
available_24h: Optional[bool] = None
# ============================================================================
# Scenarios
# ============================================================================
class ScenarioCreate(BaseModel):
title: str
category: Optional[str] = None
severity: str = "medium"
description: Optional[str] = None
response_steps: List[Any] = []
estimated_recovery_time: Optional[int] = None
is_active: bool = True
class ScenarioUpdate(BaseModel):
title: Optional[str] = None
category: Optional[str] = None
severity: Optional[str] = None
description: Optional[str] = None
response_steps: Optional[List[Any]] = None
estimated_recovery_time: Optional[int] = None
last_tested: Optional[str] = None
is_active: Optional[bool] = None
# ============================================================================
# Checklists
# ============================================================================
class ChecklistCreate(BaseModel):
title: str
scenario_id: Optional[str] = None
description: Optional[str] = None
order_index: int = 0
is_required: bool = True
class ChecklistUpdate(BaseModel):
title: Optional[str] = None
description: Optional[str] = None
order_index: Optional[int] = None
is_required: Optional[bool] = None
# ============================================================================
# Exercises
# ============================================================================
class ExerciseCreate(BaseModel):
title: str
scenario_id: Optional[str] = None
exercise_type: str = "tabletop"
exercise_date: Optional[str] = None
participants: List[Any] = []
outcome: Optional[str] = None
notes: Optional[str] = None
# ============================================================================
# Incidents
# ============================================================================
class IncidentCreate(BaseModel):
title: str
description: Optional[str] = None
detected_by: Optional[str] = None
status: str = "detected"
severity: str = "medium"
affected_data_categories: List[Any] = []
estimated_affected_persons: int = 0
measures: List[Any] = []
art34_required: bool = False
art34_justification: Optional[str] = None
class IncidentUpdate(BaseModel):
title: Optional[str] = None
description: Optional[str] = None
detected_by: Optional[str] = None
status: Optional[str] = None
severity: Optional[str] = None
affected_data_categories: Optional[List[Any]] = None
estimated_affected_persons: Optional[int] = None
measures: Optional[List[Any]] = None
art34_required: Optional[bool] = None
art34_justification: Optional[str] = None
reported_to_authority_at: Optional[str] = None
notified_affected_at: Optional[str] = None
closed_at: Optional[str] = None
closed_by: Optional[str] = None
lessons_learned: Optional[str] = None
# ============================================================================
# Templates
# ============================================================================
class TemplateCreate(BaseModel):
type: str = "art33"
title: str
content: str
class TemplateUpdate(BaseModel):
type: Optional[str] = None
title: Optional[str] = None
content: Optional[str] = None

View File

@@ -0,0 +1,501 @@
# mypy: disable-error-code="arg-type,assignment,union-attr,no-any-return"
"""
Notfallplan service -- contacts, scenarios, checklists, exercises, stats.
Phase 1 Step 4: extracted from ``compliance.api.notfallplan_routes``.
Incident and template operations live in
``compliance.services.notfallplan_workflow_service``.
"""
import json
import logging
from datetime import datetime
from typing import Any, Dict, List, Optional
from sqlalchemy import text
from sqlalchemy.orm import Session
from compliance.domain import NotFoundError, ValidationError
from compliance.schemas.notfallplan import (
ChecklistCreate,
ChecklistUpdate,
ContactCreate,
ContactUpdate,
ExerciseCreate,
ScenarioCreate,
ScenarioUpdate,
)
logger = logging.getLogger(__name__)
def _contact_row(r: Any) -> Dict[str, Any]:
return {
"id": str(r.id),
"tenant_id": r.tenant_id,
"name": r.name,
"role": r.role,
"email": r.email,
"phone": r.phone,
"is_primary": r.is_primary,
"available_24h": r.available_24h,
"created_at": r.created_at.isoformat() if r.created_at else None,
}
def _scenario_row(r: Any) -> Dict[str, Any]:
return {
"id": str(r.id),
"tenant_id": r.tenant_id,
"title": r.title,
"category": r.category,
"severity": r.severity,
"description": r.description,
"response_steps": r.response_steps if r.response_steps else [],
"estimated_recovery_time": r.estimated_recovery_time,
"last_tested": r.last_tested.isoformat() if r.last_tested else None,
"is_active": r.is_active,
"created_at": r.created_at.isoformat() if r.created_at else None,
}
def _checklist_row(r: Any) -> Dict[str, Any]:
return {
"id": str(r.id),
"tenant_id": r.tenant_id,
"scenario_id": str(r.scenario_id) if r.scenario_id else None,
"title": r.title,
"description": r.description,
"order_index": r.order_index,
"is_required": r.is_required,
"created_at": r.created_at.isoformat() if r.created_at else None,
}
def _exercise_row(r: Any) -> Dict[str, Any]:
return {
"id": str(r.id),
"tenant_id": r.tenant_id,
"title": r.title,
"scenario_id": str(r.scenario_id) if r.scenario_id else None,
"exercise_type": r.exercise_type,
"exercise_date": r.exercise_date.isoformat() if r.exercise_date else None,
"participants": r.participants if r.participants else [],
"outcome": r.outcome,
"notes": r.notes,
"created_at": r.created_at.isoformat() if r.created_at else None,
}
class NotfallplanService:
"""Contacts, scenarios, checklists, exercises, stats."""
def __init__(self, db: Session) -> None:
self.db = db
# ------------------------------------------------------------------ contacts
def list_contacts(self, tenant_id: str) -> List[Dict[str, Any]]:
rows = self.db.execute(
text("""
SELECT id, tenant_id, name, role, email, phone,
is_primary, available_24h, created_at
FROM compliance_notfallplan_contacts
WHERE tenant_id = :tenant_id
ORDER BY is_primary DESC, name
"""),
{"tenant_id": tenant_id},
).fetchall()
return [_contact_row(r) for r in rows]
def create_contact(
self, tenant_id: str, req: ContactCreate,
) -> Dict[str, Any]:
row = self.db.execute(
text("""
INSERT INTO compliance_notfallplan_contacts
(tenant_id, name, role, email, phone, is_primary, available_24h)
VALUES (:tenant_id, :name, :role, :email, :phone,
:is_primary, :available_24h)
RETURNING id, tenant_id, name, role, email, phone,
is_primary, available_24h, created_at
"""),
{
"tenant_id": tenant_id,
"name": req.name,
"role": req.role,
"email": req.email,
"phone": req.phone,
"is_primary": req.is_primary,
"available_24h": req.available_24h,
},
).fetchone()
self.db.commit()
return _contact_row(row)
def update_contact(
self, tenant_id: str, contact_id: str, req: ContactUpdate,
) -> Dict[str, Any]:
existing = self.db.execute(
text(
"SELECT id FROM compliance_notfallplan_contacts"
" WHERE id = :id AND tenant_id = :tenant_id"
),
{"id": contact_id, "tenant_id": tenant_id},
).fetchone()
if not existing:
raise NotFoundError(f"Contact {contact_id} not found")
updates = req.dict(exclude_none=True)
if not updates:
raise ValidationError("No fields to update")
set_clauses = ", ".join(f"{k} = :{k}" for k in updates)
updates["id"] = contact_id
updates["tenant_id"] = tenant_id
row = self.db.execute(
text(f"""
UPDATE compliance_notfallplan_contacts
SET {set_clauses}
WHERE id = :id AND tenant_id = :tenant_id
RETURNING id, tenant_id, name, role, email, phone,
is_primary, available_24h, created_at
"""),
updates,
).fetchone()
self.db.commit()
return _contact_row(row)
def delete_contact(self, tenant_id: str, contact_id: str) -> Dict[str, Any]:
existing = self.db.execute(
text(
"SELECT id FROM compliance_notfallplan_contacts"
" WHERE id = :id AND tenant_id = :tenant_id"
),
{"id": contact_id, "tenant_id": tenant_id},
).fetchone()
if not existing:
raise NotFoundError(f"Contact {contact_id} not found")
self.db.execute(
text(
"DELETE FROM compliance_notfallplan_contacts"
" WHERE id = :id AND tenant_id = :tenant_id"
),
{"id": contact_id, "tenant_id": tenant_id},
)
self.db.commit()
return {"success": True, "message": f"Contact {contact_id} deleted"}
# ---------------------------------------------------------------- scenarios
def list_scenarios(self, tenant_id: str) -> List[Dict[str, Any]]:
rows = self.db.execute(
text("""
SELECT id, tenant_id, title, category, severity, description,
response_steps, estimated_recovery_time, last_tested,
is_active, created_at
FROM compliance_notfallplan_scenarios
WHERE tenant_id = :tenant_id
ORDER BY created_at DESC
"""),
{"tenant_id": tenant_id},
).fetchall()
return [_scenario_row(r) for r in rows]
def create_scenario(
self, tenant_id: str, req: ScenarioCreate,
) -> Dict[str, Any]:
row = self.db.execute(
text("""
INSERT INTO compliance_notfallplan_scenarios
(tenant_id, title, category, severity, description,
response_steps, estimated_recovery_time, is_active)
VALUES (:tenant_id, :title, :category, :severity, :description,
:response_steps, :estimated_recovery_time, :is_active)
RETURNING id, tenant_id, title, category, severity, description,
response_steps, estimated_recovery_time, last_tested,
is_active, created_at
"""),
{
"tenant_id": tenant_id,
"title": req.title,
"category": req.category,
"severity": req.severity,
"description": req.description,
"response_steps": json.dumps(req.response_steps),
"estimated_recovery_time": req.estimated_recovery_time,
"is_active": req.is_active,
},
).fetchone()
self.db.commit()
return _scenario_row(row)
def update_scenario(
self, tenant_id: str, scenario_id: str, req: ScenarioUpdate,
) -> Dict[str, Any]:
existing = self.db.execute(
text(
"SELECT id FROM compliance_notfallplan_scenarios"
" WHERE id = :id AND tenant_id = :tenant_id"
),
{"id": scenario_id, "tenant_id": tenant_id},
).fetchone()
if not existing:
raise NotFoundError(f"Scenario {scenario_id} not found")
updates = req.dict(exclude_none=True)
if not updates:
raise ValidationError("No fields to update")
if "response_steps" in updates:
updates["response_steps"] = json.dumps(updates["response_steps"])
set_clauses = ", ".join(f"{k} = :{k}" for k in updates)
updates["id"] = scenario_id
updates["tenant_id"] = tenant_id
row = self.db.execute(
text(f"""
UPDATE compliance_notfallplan_scenarios
SET {set_clauses}
WHERE id = :id AND tenant_id = :tenant_id
RETURNING id, tenant_id, title, category, severity, description,
response_steps, estimated_recovery_time, last_tested,
is_active, created_at
"""),
updates,
).fetchone()
self.db.commit()
return _scenario_row(row)
def delete_scenario(self, tenant_id: str, scenario_id: str) -> Dict[str, Any]:
existing = self.db.execute(
text(
"SELECT id FROM compliance_notfallplan_scenarios"
" WHERE id = :id AND tenant_id = :tenant_id"
),
{"id": scenario_id, "tenant_id": tenant_id},
).fetchone()
if not existing:
raise NotFoundError(f"Scenario {scenario_id} not found")
self.db.execute(
text(
"DELETE FROM compliance_notfallplan_scenarios"
" WHERE id = :id AND tenant_id = :tenant_id"
),
{"id": scenario_id, "tenant_id": tenant_id},
)
self.db.commit()
return {"success": True, "message": f"Scenario {scenario_id} deleted"}
# -------------------------------------------------------------- checklists
def list_checklists(
self, tenant_id: str, scenario_id: Optional[str] = None,
) -> List[Dict[str, Any]]:
if scenario_id:
rows = self.db.execute(
text("""
SELECT id, tenant_id, scenario_id, title, description,
order_index, is_required, created_at
FROM compliance_notfallplan_checklists
WHERE tenant_id = :tenant_id AND scenario_id = :scenario_id
ORDER BY order_index, created_at
"""),
{"tenant_id": tenant_id, "scenario_id": scenario_id},
).fetchall()
else:
rows = self.db.execute(
text("""
SELECT id, tenant_id, scenario_id, title, description,
order_index, is_required, created_at
FROM compliance_notfallplan_checklists
WHERE tenant_id = :tenant_id
ORDER BY order_index, created_at
"""),
{"tenant_id": tenant_id},
).fetchall()
return [_checklist_row(r) for r in rows]
def create_checklist(
self, tenant_id: str, req: ChecklistCreate,
) -> Dict[str, Any]:
row = self.db.execute(
text("""
INSERT INTO compliance_notfallplan_checklists
(tenant_id, scenario_id, title, description,
order_index, is_required)
VALUES (:tenant_id, :scenario_id, :title, :description,
:order_index, :is_required)
RETURNING id, tenant_id, scenario_id, title, description,
order_index, is_required, created_at
"""),
{
"tenant_id": tenant_id,
"scenario_id": req.scenario_id,
"title": req.title,
"description": req.description,
"order_index": req.order_index,
"is_required": req.is_required,
},
).fetchone()
self.db.commit()
return _checklist_row(row)
def update_checklist(
self, tenant_id: str, checklist_id: str, req: ChecklistUpdate,
) -> Dict[str, Any]:
existing = self.db.execute(
text(
"SELECT id FROM compliance_notfallplan_checklists"
" WHERE id = :id AND tenant_id = :tenant_id"
),
{"id": checklist_id, "tenant_id": tenant_id},
).fetchone()
if not existing:
raise NotFoundError(f"Checklist item {checklist_id} not found")
updates = req.dict(exclude_none=True)
if not updates:
raise ValidationError("No fields to update")
set_clauses = ", ".join(f"{k} = :{k}" for k in updates)
updates["id"] = checklist_id
updates["tenant_id"] = tenant_id
row = self.db.execute(
text(f"""
UPDATE compliance_notfallplan_checklists
SET {set_clauses}
WHERE id = :id AND tenant_id = :tenant_id
RETURNING id, tenant_id, scenario_id, title, description,
order_index, is_required, created_at
"""),
updates,
).fetchone()
self.db.commit()
return _checklist_row(row)
def delete_checklist(self, tenant_id: str, checklist_id: str) -> Dict[str, Any]:
existing = self.db.execute(
text(
"SELECT id FROM compliance_notfallplan_checklists"
" WHERE id = :id AND tenant_id = :tenant_id"
),
{"id": checklist_id, "tenant_id": tenant_id},
).fetchone()
if not existing:
raise NotFoundError(f"Checklist item {checklist_id} not found")
self.db.execute(
text(
"DELETE FROM compliance_notfallplan_checklists"
" WHERE id = :id AND tenant_id = :tenant_id"
),
{"id": checklist_id, "tenant_id": tenant_id},
)
self.db.commit()
return {"success": True, "message": f"Checklist item {checklist_id} deleted"}
# --------------------------------------------------------------- exercises
def list_exercises(self, tenant_id: str) -> List[Dict[str, Any]]:
rows = self.db.execute(
text("""
SELECT id, tenant_id, title, scenario_id, exercise_type,
exercise_date, participants, outcome, notes, created_at
FROM compliance_notfallplan_exercises
WHERE tenant_id = :tenant_id
ORDER BY created_at DESC
"""),
{"tenant_id": tenant_id},
).fetchall()
return [_exercise_row(r) for r in rows]
def create_exercise(
self, tenant_id: str, req: ExerciseCreate,
) -> Dict[str, Any]:
exercise_date = None
if req.exercise_date:
try:
exercise_date = datetime.fromisoformat(req.exercise_date)
except ValueError:
pass
row = self.db.execute(
text("""
INSERT INTO compliance_notfallplan_exercises
(tenant_id, title, scenario_id, exercise_type,
exercise_date, participants, outcome, notes)
VALUES (:tenant_id, :title, :scenario_id, :exercise_type,
:exercise_date, :participants, :outcome, :notes)
RETURNING id, tenant_id, title, scenario_id, exercise_type,
exercise_date, participants, outcome, notes, created_at
"""),
{
"tenant_id": tenant_id,
"title": req.title,
"scenario_id": req.scenario_id,
"exercise_type": req.exercise_type,
"exercise_date": exercise_date,
"participants": json.dumps(req.participants),
"outcome": req.outcome,
"notes": req.notes,
},
).fetchone()
self.db.commit()
return _exercise_row(row)
# ------------------------------------------------------------------- stats
def get_stats(self, tenant_id: str) -> Dict[str, int]:
contacts_count = self.db.execute(
text(
"SELECT COUNT(*) FROM compliance_notfallplan_contacts"
" WHERE tenant_id = :tenant_id"
),
{"tenant_id": tenant_id},
).scalar()
scenarios_count = self.db.execute(
text(
"SELECT COUNT(*) FROM compliance_notfallplan_scenarios"
" WHERE tenant_id = :tenant_id AND is_active = TRUE"
),
{"tenant_id": tenant_id},
).scalar()
exercises_count = self.db.execute(
text(
"SELECT COUNT(*) FROM compliance_notfallplan_exercises"
" WHERE tenant_id = :tenant_id"
),
{"tenant_id": tenant_id},
).scalar()
checklists_count = self.db.execute(
text(
"SELECT COUNT(*) FROM compliance_notfallplan_checklists"
" WHERE tenant_id = :tenant_id"
),
{"tenant_id": tenant_id},
).scalar()
incidents_count = self.db.execute(
text(
"SELECT COUNT(*) FROM compliance_notfallplan_incidents"
" WHERE tenant_id = :tenant_id AND status != 'closed'"
),
{"tenant_id": tenant_id},
).scalar()
return {
"contacts": contacts_count or 0,
"active_scenarios": scenarios_count or 0,
"exercises": exercises_count or 0,
"checklist_items": checklists_count or 0,
"open_incidents": incidents_count or 0,
}

View File

@@ -0,0 +1,309 @@
# mypy: disable-error-code="arg-type,assignment,union-attr,no-any-return"
"""
Notfallplan workflow service -- incidents and templates.
Phase 1 Step 4: extracted from ``compliance.api.notfallplan_routes``.
Core CRUD for contacts/scenarios/checklists/exercises/stats lives in
``compliance.services.notfallplan_service``.
"""
import json
import logging
from datetime import datetime, timezone
from typing import Any, Dict, List, Optional
from sqlalchemy import text
from sqlalchemy.orm import Session
from compliance.domain import NotFoundError, ValidationError
from compliance.schemas.notfallplan import (
IncidentCreate,
IncidentUpdate,
TemplateCreate,
TemplateUpdate,
)
logger = logging.getLogger(__name__)
# ============================================================================
# Row serializers
# ============================================================================
def _incident_row(r: Any) -> Dict[str, Any]:
return {
"id": str(r.id),
"tenant_id": r.tenant_id,
"title": r.title,
"description": r.description,
"detected_at": r.detected_at.isoformat() if r.detected_at else None,
"detected_by": r.detected_by,
"status": r.status,
"severity": r.severity,
"affected_data_categories": (
r.affected_data_categories if r.affected_data_categories else []
),
"estimated_affected_persons": r.estimated_affected_persons,
"measures": r.measures if r.measures else [],
"art34_required": r.art34_required,
"art34_justification": r.art34_justification,
"reported_to_authority_at": (
r.reported_to_authority_at.isoformat()
if r.reported_to_authority_at
else None
),
"notified_affected_at": (
r.notified_affected_at.isoformat()
if r.notified_affected_at
else None
),
"closed_at": r.closed_at.isoformat() if r.closed_at else None,
"closed_by": r.closed_by,
"lessons_learned": r.lessons_learned,
"created_at": r.created_at.isoformat() if r.created_at else None,
"updated_at": r.updated_at.isoformat() if r.updated_at else None,
}
def _template_row(r: Any) -> Dict[str, Any]:
return {
"id": str(r.id),
"tenant_id": r.tenant_id,
"type": r.type,
"title": r.title,
"content": r.content,
"created_at": r.created_at.isoformat() if r.created_at else None,
"updated_at": r.updated_at.isoformat() if r.updated_at else None,
}
class NotfallplanWorkflowService:
"""Incident and template operations."""
def __init__(self, db: Session) -> None:
self.db = db
# --------------------------------------------------------------- incidents
def list_incidents(
self,
tenant_id: str,
status: Optional[str] = None,
severity: Optional[str] = None,
) -> List[Dict[str, Any]]:
where = "WHERE tenant_id = :tenant_id"
params: Dict[str, Any] = {"tenant_id": tenant_id}
if status:
where += " AND status = :status"
params["status"] = status
if severity:
where += " AND severity = :severity"
params["severity"] = severity
rows = self.db.execute(
text(f"""
SELECT * FROM compliance_notfallplan_incidents
{where}
ORDER BY created_at DESC
"""),
params,
).fetchall()
return [_incident_row(r) for r in rows]
def create_incident(
self, tenant_id: str, req: IncidentCreate,
) -> Dict[str, Any]:
row = self.db.execute(
text("""
INSERT INTO compliance_notfallplan_incidents
(tenant_id, title, description, detected_by, status,
severity, affected_data_categories,
estimated_affected_persons, measures,
art34_required, art34_justification)
VALUES
(:tenant_id, :title, :description, :detected_by,
:status, :severity,
CAST(:affected_data_categories AS jsonb),
:estimated_affected_persons,
CAST(:measures AS jsonb),
:art34_required, :art34_justification)
RETURNING *
"""),
{
"tenant_id": tenant_id,
"title": req.title,
"description": req.description,
"detected_by": req.detected_by,
"status": req.status,
"severity": req.severity,
"affected_data_categories": json.dumps(
req.affected_data_categories
),
"estimated_affected_persons": req.estimated_affected_persons,
"measures": json.dumps(req.measures),
"art34_required": req.art34_required,
"art34_justification": req.art34_justification,
},
).fetchone()
self.db.commit()
return _incident_row(row)
def update_incident(
self, tenant_id: str, incident_id: str, req: IncidentUpdate,
) -> Dict[str, Any]:
existing = self.db.execute(
text(
"SELECT id FROM compliance_notfallplan_incidents"
" WHERE id = :id AND tenant_id = :tenant_id"
),
{"id": incident_id, "tenant_id": tenant_id},
).fetchone()
if not existing:
raise NotFoundError(f"Incident {incident_id} not found")
updates = req.dict(exclude_none=True)
if not updates:
raise ValidationError("No fields to update")
# Auto-set timestamps based on status transitions
if (
updates.get("status") == "reported"
and not updates.get("reported_to_authority_at")
):
updates["reported_to_authority_at"] = (
datetime.now(timezone.utc).isoformat()
)
if (
updates.get("status") == "closed"
and not updates.get("closed_at")
):
updates["closed_at"] = datetime.now(timezone.utc).isoformat()
updates["updated_at"] = datetime.now(timezone.utc).isoformat()
set_parts = []
for k in updates:
if k in ("affected_data_categories", "measures"):
set_parts.append(f"{k} = CAST(:{k} AS jsonb)")
updates[k] = (
json.dumps(updates[k])
if isinstance(updates[k], list)
else updates[k]
)
else:
set_parts.append(f"{k} = :{k}")
updates["id"] = incident_id
updates["tenant_id"] = tenant_id
row = self.db.execute(
text(f"""
UPDATE compliance_notfallplan_incidents
SET {', '.join(set_parts)}
WHERE id = :id AND tenant_id = :tenant_id
RETURNING *
"""),
updates,
).fetchone()
self.db.commit()
return _incident_row(row)
def delete_incident(self, tenant_id: str, incident_id: str) -> None:
result = self.db.execute(
text(
"DELETE FROM compliance_notfallplan_incidents"
" WHERE id = :id AND tenant_id = :tenant_id"
),
{"id": incident_id, "tenant_id": tenant_id},
)
self.db.commit()
if result.rowcount == 0:
raise NotFoundError(f"Incident {incident_id} not found")
# -------------------------------------------------------------- templates
def list_templates(
self, tenant_id: str, type: Optional[str] = None,
) -> List[Dict[str, Any]]:
where = "WHERE tenant_id = :tenant_id"
params: Dict[str, Any] = {"tenant_id": tenant_id}
if type:
where += " AND type = :type"
params["type"] = type
rows = self.db.execute(
text(
f"SELECT * FROM compliance_notfallplan_templates"
f" {where} ORDER BY type, created_at"
),
params,
).fetchall()
return [_template_row(r) for r in rows]
def create_template(
self, tenant_id: str, req: TemplateCreate,
) -> Dict[str, Any]:
row = self.db.execute(
text("""
INSERT INTO compliance_notfallplan_templates
(tenant_id, type, title, content)
VALUES (:tenant_id, :type, :title, :content)
RETURNING *
"""),
{
"tenant_id": tenant_id,
"type": req.type,
"title": req.title,
"content": req.content,
},
).fetchone()
self.db.commit()
return _template_row(row)
def update_template(
self, tenant_id: str, template_id: str, req: TemplateUpdate,
) -> Dict[str, Any]:
existing = self.db.execute(
text(
"SELECT id FROM compliance_notfallplan_templates"
" WHERE id = :id AND tenant_id = :tenant_id"
),
{"id": template_id, "tenant_id": tenant_id},
).fetchone()
if not existing:
raise NotFoundError(f"Template {template_id} not found")
updates = req.dict(exclude_none=True)
if not updates:
raise ValidationError("No fields to update")
updates["updated_at"] = datetime.now(timezone.utc).isoformat()
set_clauses = ", ".join(f"{k} = :{k}" for k in updates)
updates["id"] = template_id
updates["tenant_id"] = tenant_id
row = self.db.execute(
text(f"""
UPDATE compliance_notfallplan_templates
SET {set_clauses}
WHERE id = :id AND tenant_id = :tenant_id
RETURNING *
"""),
updates,
).fetchone()
self.db.commit()
return _template_row(row)
def delete_template(self, tenant_id: str, template_id: str) -> None:
result = self.db.execute(
text(
"DELETE FROM compliance_notfallplan_templates"
" WHERE id = :id AND tenant_id = :tenant_id"
),
{"id": template_id, "tenant_id": tenant_id},
)
self.db.commit()
if result.rowcount == 0:
raise NotFoundError(f"Template {template_id} not found")