Files
breakpilot-compliance/backend-compliance/compliance/services/notfallplan_service.py
Sharang Parnerkar 1a2ae896fb refactor(backend/api): extract Notfallplan schemas + services (Step 4)
Split notfallplan_routes.py (1018 LOC) into clean architecture layers:
- compliance/schemas/notfallplan.py (146 LOC): all Pydantic models
- compliance/services/notfallplan_service.py (500 LOC): contacts, scenarios, checklists, exercises, stats
- compliance/services/notfallplan_workflow_service.py (309 LOC): incidents, templates
- compliance/api/notfallplan_routes.py (361 LOC): thin handlers with domain error translation

All 250 tests pass. Schemas re-exported via __all__ for legacy test imports.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-09 20:10:43 +02:00

501 lines
18 KiB
Python

# mypy: disable-error-code="arg-type,assignment,union-attr,no-any-return"
"""
Notfallplan service -- contacts, scenarios, checklists, exercises, stats.
Phase 1 Step 4: extracted from ``compliance.api.notfallplan_routes``.
Incident and template operations live in
``compliance.services.notfallplan_workflow_service``.
"""
import json
import logging
from datetime import datetime
from typing import Any, Dict, List, Optional
from sqlalchemy import text
from sqlalchemy.orm import Session
from compliance.domain import NotFoundError, ValidationError
from compliance.schemas.notfallplan import (
ChecklistCreate,
ChecklistUpdate,
ContactCreate,
ContactUpdate,
ExerciseCreate,
ScenarioCreate,
ScenarioUpdate,
)
# Module-level logger, named after this module via ``__name__``.
logger = logging.getLogger(__name__)
def _contact_row(r: Any) -> Dict[str, Any]:
    """Map a contact result row onto a plain dict.

    ``id`` is stringified and ``created_at`` is rendered with
    ``isoformat()`` (or ``None`` when falsy); every other column is copied
    through unchanged.
    """
    passthrough = (
        "tenant_id",
        "name",
        "role",
        "email",
        "phone",
        "is_primary",
        "available_24h",
    )
    payload: Dict[str, Any] = {"id": str(r.id)}
    payload.update((column, getattr(r, column)) for column in passthrough)
    created = r.created_at
    payload["created_at"] = created.isoformat() if created else None
    return payload
def _scenario_row(r: Any) -> Dict[str, Any]:
    """Map a scenario result row onto a plain dict.

    ``id`` is stringified, a falsy ``response_steps`` becomes ``[]``, and
    the ``last_tested`` / ``created_at`` values are rendered with
    ``isoformat()`` (``None`` when falsy).
    """
    steps = r.response_steps or []
    tested = r.last_tested
    created = r.created_at
    tested_iso = tested.isoformat() if tested else None
    created_iso = created.isoformat() if created else None
    return {
        "id": str(r.id),
        "tenant_id": r.tenant_id,
        "title": r.title,
        "category": r.category,
        "severity": r.severity,
        "description": r.description,
        "response_steps": steps,
        "estimated_recovery_time": r.estimated_recovery_time,
        "last_tested": tested_iso,
        "is_active": r.is_active,
        "created_at": created_iso,
    }
def _checklist_row(r: Any) -> Dict[str, Any]:
    """Map a checklist-item result row onto a plain dict.

    ``id`` and a truthy ``scenario_id`` are stringified; ``created_at`` is
    rendered with ``isoformat()``. Falsy ``scenario_id`` / ``created_at``
    values are reported as ``None``.
    """
    scenario = r.scenario_id
    created = r.created_at
    payload: Dict[str, Any] = {
        "id": str(r.id),
        "tenant_id": r.tenant_id,
        "scenario_id": str(scenario) if scenario else None,
    }
    for column in ("title", "description", "order_index", "is_required"):
        payload[column] = getattr(r, column)
    payload["created_at"] = created.isoformat() if created else None
    return payload
def _exercise_row(r: Any) -> Dict[str, Any]:
    """Map an exercise result row onto a plain dict.

    ``id`` and a truthy ``scenario_id`` are stringified, a falsy
    ``participants`` becomes ``[]``, and ``exercise_date`` / ``created_at``
    are rendered with ``isoformat()`` (``None`` when falsy).
    """
    scenario = r.scenario_id
    held_on = r.exercise_date
    created = r.created_at
    attendees = r.participants or []
    return {
        "id": str(r.id),
        "tenant_id": r.tenant_id,
        "title": r.title,
        "scenario_id": None if not scenario else str(scenario),
        "exercise_type": r.exercise_type,
        "exercise_date": None if not held_on else held_on.isoformat(),
        "participants": attendees,
        "outcome": r.outcome,
        "notes": r.notes,
        "created_at": None if not created else created.isoformat(),
    }
class NotfallplanService:
    """Contacts, scenarios, checklists, exercises, stats.

    Every query is filtered by ``tenant_id``. Mutating methods call
    ``commit()`` on the session handed to the constructor. Lookups that find
    no row for the given id/tenant raise ``NotFoundError``; update requests
    without any non-``None`` field raise ``ValidationError``.
    """

    # Table names used by the raw SQL below.
    _CONTACTS = "compliance_notfallplan_contacts"
    _SCENARIOS = "compliance_notfallplan_scenarios"
    _CHECKLISTS = "compliance_notfallplan_checklists"
    _EXERCISES = "compliance_notfallplan_exercises"
    _INCIDENTS = "compliance_notfallplan_incidents"

    # Column lists shared by the SELECT and RETURNING clauses of each entity,
    # kept in one place so the row mappers always receive the same shape.
    _CONTACT_COLS = (
        "id, tenant_id, name, role, email, phone, "
        "is_primary, available_24h, created_at"
    )
    _SCENARIO_COLS = (
        "id, tenant_id, title, category, severity, description, "
        "response_steps, estimated_recovery_time, last_tested, "
        "is_active, created_at"
    )
    _CHECKLIST_COLS = (
        "id, tenant_id, scenario_id, title, description, "
        "order_index, is_required, created_at"
    )
    _EXERCISE_COLS = (
        "id, tenant_id, title, scenario_id, exercise_type, "
        "exercise_date, participants, outcome, notes, created_at"
    )

    def __init__(self, db: Session) -> None:
        """Store the SQLAlchemy session used for all queries."""
        self.db = db

    # ---------------------------------------------------------------- internals
    @staticmethod
    def _set_clause(fields: Dict[str, Any]) -> str:
        """Build the ``col = :col, ...`` fragment for an UPDATE statement.

        Column names cannot be bound as parameters, so they are interpolated
        into the SQL text. Each name must therefore be a plain identifier;
        anything else raises ``ValidationError`` instead of reaching the
        database.
        """
        for column in fields:
            if not column.isidentifier():
                raise ValidationError(f"Invalid field name: {column!r}")
        return ", ".join(f"{column} = :{column}" for column in fields)

    def _require_exists(
        self, table: str, entity_id: str, tenant_id: str, label: str,
    ) -> None:
        """Raise ``NotFoundError`` unless ``entity_id`` exists for the tenant."""
        found = self.db.execute(
            text(
                f"SELECT id FROM {table}"
                " WHERE id = :id AND tenant_id = :tenant_id"
            ),
            {"id": entity_id, "tenant_id": tenant_id},
        ).fetchone()
        if not found:
            raise NotFoundError(f"{label} {entity_id} not found")

    def _apply_update(
        self,
        table: str,
        returning: str,
        entity_id: str,
        tenant_id: str,
        label: str,
        updates: Dict[str, Any],
    ) -> Any:
        """Run a tenant-scoped UPDATE and return the updated row.

        Raises ``ValidationError`` when ``updates`` is empty and
        ``NotFoundError`` when the UPDATE matched no row (for example because
        the row was removed after the caller's existence check).
        """
        if not updates:
            raise ValidationError("No fields to update")
        set_clauses = self._set_clause(updates)
        params = {**updates, "id": entity_id, "tenant_id": tenant_id}
        row = self.db.execute(
            text(f"""
                UPDATE {table}
                SET {set_clauses}
                WHERE id = :id AND tenant_id = :tenant_id
                RETURNING {returning}
            """),
            params,
        ).fetchone()
        self.db.commit()
        if row is None:
            # Nothing was modified, so there is no row to hand to the mapper.
            raise NotFoundError(f"{label} {entity_id} not found")
        return row

    def _delete(
        self, table: str, entity_id: str, tenant_id: str, label: str,
    ) -> Dict[str, Any]:
        """Delete one tenant-scoped row and return the success payload."""
        self._require_exists(table, entity_id, tenant_id, label)
        self.db.execute(
            text(
                f"DELETE FROM {table}"
                " WHERE id = :id AND tenant_id = :tenant_id"
            ),
            {"id": entity_id, "tenant_id": tenant_id},
        )
        self.db.commit()
        return {"success": True, "message": f"{label} {entity_id} deleted"}

    def _count(self, table: str, tenant_id: str, extra_filter: str = "") -> int:
        """Count the tenant's rows in ``table``; ``extra_filter`` is appended
        verbatim to the WHERE clause and must be a trusted constant."""
        total = self.db.execute(
            text(
                f"SELECT COUNT(*) FROM {table}"
                f" WHERE tenant_id = :tenant_id{extra_filter}"
            ),
            {"tenant_id": tenant_id},
        ).scalar()
        return total or 0

    # ------------------------------------------------------------------ contacts
    def list_contacts(self, tenant_id: str) -> List[Dict[str, Any]]:
        """Return the tenant's contacts, primary contacts first, then by name."""
        rows = self.db.execute(
            text(f"""
                SELECT {self._CONTACT_COLS}
                FROM {self._CONTACTS}
                WHERE tenant_id = :tenant_id
                ORDER BY is_primary DESC, name
            """),
            {"tenant_id": tenant_id},
        ).fetchall()
        return [_contact_row(r) for r in rows]

    def create_contact(
        self, tenant_id: str, req: ContactCreate,
    ) -> Dict[str, Any]:
        """Insert a contact for the tenant, commit, and return the new row."""
        row = self.db.execute(
            text(f"""
                INSERT INTO {self._CONTACTS}
                    (tenant_id, name, role, email, phone,
                     is_primary, available_24h)
                VALUES (:tenant_id, :name, :role, :email, :phone,
                        :is_primary, :available_24h)
                RETURNING {self._CONTACT_COLS}
            """),
            {
                "tenant_id": tenant_id,
                "name": req.name,
                "role": req.role,
                "email": req.email,
                "phone": req.phone,
                "is_primary": req.is_primary,
                "available_24h": req.available_24h,
            },
        ).fetchone()
        self.db.commit()
        return _contact_row(row)

    def update_contact(
        self, tenant_id: str, contact_id: str, req: ContactUpdate,
    ) -> Dict[str, Any]:
        """Update the non-``None`` fields of ``req`` on one contact.

        Fields left as ``None`` are excluded, so this method cannot reset a
        column to NULL.

        Raises:
            NotFoundError: no such contact for this tenant.
            ValidationError: ``req`` carries no non-``None`` field.
        """
        self._require_exists(self._CONTACTS, contact_id, tenant_id, "Contact")
        updates = req.dict(exclude_none=True)
        row = self._apply_update(
            self._CONTACTS, self._CONTACT_COLS,
            contact_id, tenant_id, "Contact", updates,
        )
        return _contact_row(row)

    def delete_contact(self, tenant_id: str, contact_id: str) -> Dict[str, Any]:
        """Delete one contact; raises ``NotFoundError`` if it does not exist."""
        return self._delete(self._CONTACTS, contact_id, tenant_id, "Contact")

    # ---------------------------------------------------------------- scenarios
    def list_scenarios(self, tenant_id: str) -> List[Dict[str, Any]]:
        """Return the tenant's scenarios, newest first."""
        rows = self.db.execute(
            text(f"""
                SELECT {self._SCENARIO_COLS}
                FROM {self._SCENARIOS}
                WHERE tenant_id = :tenant_id
                ORDER BY created_at DESC
            """),
            {"tenant_id": tenant_id},
        ).fetchall()
        return [_scenario_row(r) for r in rows]

    def create_scenario(
        self, tenant_id: str, req: ScenarioCreate,
    ) -> Dict[str, Any]:
        """Insert a scenario for the tenant, commit, and return the new row.

        ``response_steps`` is passed to the database as a JSON string.
        """
        row = self.db.execute(
            text(f"""
                INSERT INTO {self._SCENARIOS}
                    (tenant_id, title, category, severity, description,
                     response_steps, estimated_recovery_time, is_active)
                VALUES (:tenant_id, :title, :category, :severity, :description,
                        :response_steps, :estimated_recovery_time, :is_active)
                RETURNING {self._SCENARIO_COLS}
            """),
            {
                "tenant_id": tenant_id,
                "title": req.title,
                "category": req.category,
                "severity": req.severity,
                "description": req.description,
                "response_steps": json.dumps(req.response_steps),
                "estimated_recovery_time": req.estimated_recovery_time,
                "is_active": req.is_active,
            },
        ).fetchone()
        self.db.commit()
        return _scenario_row(row)

    def update_scenario(
        self, tenant_id: str, scenario_id: str, req: ScenarioUpdate,
    ) -> Dict[str, Any]:
        """Update the non-``None`` fields of ``req`` on one scenario.

        A supplied ``response_steps`` value is JSON-encoded before binding,
        mirroring ``create_scenario``.

        Raises:
            NotFoundError: no such scenario for this tenant.
            ValidationError: ``req`` carries no non-``None`` field.
        """
        self._require_exists(self._SCENARIOS, scenario_id, tenant_id, "Scenario")
        updates = req.dict(exclude_none=True)
        if "response_steps" in updates:
            updates["response_steps"] = json.dumps(updates["response_steps"])
        row = self._apply_update(
            self._SCENARIOS, self._SCENARIO_COLS,
            scenario_id, tenant_id, "Scenario", updates,
        )
        return _scenario_row(row)

    def delete_scenario(self, tenant_id: str, scenario_id: str) -> Dict[str, Any]:
        """Delete one scenario; raises ``NotFoundError`` if it does not exist."""
        return self._delete(self._SCENARIOS, scenario_id, tenant_id, "Scenario")

    # -------------------------------------------------------------- checklists
    def list_checklists(
        self, tenant_id: str, scenario_id: Optional[str] = None,
    ) -> List[Dict[str, Any]]:
        """Return checklist items ordered by ``order_index`` then creation time.

        When ``scenario_id`` is truthy only that scenario's items are
        returned; otherwise all of the tenant's items are.
        """
        params: Dict[str, Any] = {"tenant_id": tenant_id}
        scenario_filter = ""
        if scenario_id:
            scenario_filter = " AND scenario_id = :scenario_id"
            params["scenario_id"] = scenario_id
        rows = self.db.execute(
            text(f"""
                SELECT {self._CHECKLIST_COLS}
                FROM {self._CHECKLISTS}
                WHERE tenant_id = :tenant_id{scenario_filter}
                ORDER BY order_index, created_at
            """),
            params,
        ).fetchall()
        return [_checklist_row(r) for r in rows]

    def create_checklist(
        self, tenant_id: str, req: ChecklistCreate,
    ) -> Dict[str, Any]:
        """Insert a checklist item for the tenant, commit, and return it."""
        row = self.db.execute(
            text(f"""
                INSERT INTO {self._CHECKLISTS}
                    (tenant_id, scenario_id, title, description,
                     order_index, is_required)
                VALUES (:tenant_id, :scenario_id, :title, :description,
                        :order_index, :is_required)
                RETURNING {self._CHECKLIST_COLS}
            """),
            {
                "tenant_id": tenant_id,
                "scenario_id": req.scenario_id,
                "title": req.title,
                "description": req.description,
                "order_index": req.order_index,
                "is_required": req.is_required,
            },
        ).fetchone()
        self.db.commit()
        return _checklist_row(row)

    def update_checklist(
        self, tenant_id: str, checklist_id: str, req: ChecklistUpdate,
    ) -> Dict[str, Any]:
        """Update the non-``None`` fields of ``req`` on one checklist item.

        Raises:
            NotFoundError: no such checklist item for this tenant.
            ValidationError: ``req`` carries no non-``None`` field.
        """
        self._require_exists(
            self._CHECKLISTS, checklist_id, tenant_id, "Checklist item",
        )
        updates = req.dict(exclude_none=True)
        row = self._apply_update(
            self._CHECKLISTS, self._CHECKLIST_COLS,
            checklist_id, tenant_id, "Checklist item", updates,
        )
        return _checklist_row(row)

    def delete_checklist(self, tenant_id: str, checklist_id: str) -> Dict[str, Any]:
        """Delete one checklist item; raises ``NotFoundError`` if missing."""
        return self._delete(
            self._CHECKLISTS, checklist_id, tenant_id, "Checklist item",
        )

    # --------------------------------------------------------------- exercises
    def list_exercises(self, tenant_id: str) -> List[Dict[str, Any]]:
        """Return the tenant's exercises, newest first."""
        rows = self.db.execute(
            text(f"""
                SELECT {self._EXERCISE_COLS}
                FROM {self._EXERCISES}
                WHERE tenant_id = :tenant_id
                ORDER BY created_at DESC
            """),
            {"tenant_id": tenant_id},
        ).fetchall()
        return [_exercise_row(r) for r in rows]

    def create_exercise(
        self, tenant_id: str, req: ExerciseCreate,
    ) -> Dict[str, Any]:
        """Insert an exercise for the tenant, commit, and return the new row.

        ``participants`` is passed to the database as a JSON string. An
        ``exercise_date`` that ``datetime.fromisoformat`` rejects is stored
        as NULL (best-effort, as before) but is now logged as a warning
        instead of being dropped silently.
        """
        exercise_date = None
        if req.exercise_date:
            try:
                exercise_date = datetime.fromisoformat(req.exercise_date)
            except ValueError:
                logger.warning(
                    "Ignoring unparseable exercise_date %r for tenant %s",
                    req.exercise_date,
                    tenant_id,
                )
        row = self.db.execute(
            text(f"""
                INSERT INTO {self._EXERCISES}
                    (tenant_id, title, scenario_id, exercise_type,
                     exercise_date, participants, outcome, notes)
                VALUES (:tenant_id, :title, :scenario_id, :exercise_type,
                        :exercise_date, :participants, :outcome, :notes)
                RETURNING {self._EXERCISE_COLS}
            """),
            {
                "tenant_id": tenant_id,
                "title": req.title,
                "scenario_id": req.scenario_id,
                "exercise_type": req.exercise_type,
                "exercise_date": exercise_date,
                "participants": json.dumps(req.participants),
                "outcome": req.outcome,
                "notes": req.notes,
            },
        ).fetchone()
        self.db.commit()
        return _exercise_row(row)

    # ------------------------------------------------------------------- stats
    def get_stats(self, tenant_id: str) -> Dict[str, int]:
        """Return per-tenant counts for the emergency-plan dashboard.

        Only active scenarios and incidents whose status is not ``'closed'``
        are counted; contacts, exercises and checklist items are counted in
        full. A ``None`` count from the database is reported as ``0``.
        """
        contacts = self._count(self._CONTACTS, tenant_id)
        active_scenarios = self._count(
            self._SCENARIOS, tenant_id, " AND is_active = TRUE",
        )
        exercises = self._count(self._EXERCISES, tenant_id)
        checklist_items = self._count(self._CHECKLISTS, tenant_id)
        open_incidents = self._count(
            self._INCIDENTS, tenant_id, " AND status != 'closed'",
        )
        return {
            "contacts": contacts,
            "active_scenarios": active_scenarios,
            "exercises": exercises,
            "checklist_items": checklist_items,
            "open_incidents": open_incidents,
        }