refactor(backend/api): extract Incident services (Step 4 — file 11 of 18)

compliance/api/incident_routes.py (916 LOC) -> 280 LOC thin routes +
two services + 95-line schemas file.

Two-service split for DSGVO Art. 33/34 Datenpannen-Management:

  incident_service.py (460 LOC):
    - CRUD (create, list, get, update, delete)
    - Stats, status update, timeline append, close
    - Module-level helpers: _calculate_risk_level, _is_notification_required,
      _calculate_72h_deadline, _incident_to_response, _measure_to_response,
      _parse_jsonb, _append_timeline, DEFAULT_TENANT_ID

  incident_workflow_service.py (329 LOC):
    - Risk assessment (likelihood x impact -> risk_level)
    - Art. 33 authority notification (with 72h deadline tracking)
    - Art. 34 data subject notification
    - Corrective measures CRUD

Both services use raw SQL via sqlalchemy.text() — no ORM models for
incident_incidents / incident_measures tables. Migrated from the Go
ai-compliance-sdk; Python backend is Source of Truth.

Legacy test compat: tests/test_incident_routes.py imports
_calculate_risk_level, _is_notification_required, _calculate_72h_deadline,
_incident_to_response, _measure_to_response, _parse_jsonb,
DEFAULT_TENANT_ID directly from compliance.api.incident_routes — all
re-exported via __all__.

Verified:
  - 223/223 pytest pass (173 core + 50 incident)
  - OpenAPI 360/484 unchanged
  - mypy compliance/ -> Success on 141 source files
  - incident_routes.py 916 -> 280 LOC
  - Hard-cap violations: 8 -> 7

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
Sharang Parnerkar
2026-04-09 08:35:57 +02:00
parent 0c2e03f294
commit cc1c61947d
6 changed files with 1292 additions and 982 deletions

File diff suppressed because it is too large Load Diff

View File

@@ -0,0 +1,95 @@
"""
Incident / Datenpannen schemas (DSGVO Art. 33/34).
Phase 1 Step 4: extracted from ``compliance.api.incident_routes``.
"""
from typing import List, Optional
from pydantic import BaseModel
class IncidentCreate(BaseModel):
title: str
description: Optional[str] = None
category: Optional[str] = "data_breach"
severity: Optional[str] = "medium"
detected_at: Optional[str] = None
affected_data_categories: Optional[List[str]] = None
affected_data_subject_count: Optional[int] = 0
affected_systems: Optional[List[str]] = None
class IncidentUpdate(BaseModel):
title: Optional[str] = None
description: Optional[str] = None
category: Optional[str] = None
status: Optional[str] = None
severity: Optional[str] = None
affected_data_categories: Optional[List[str]] = None
affected_data_subject_count: Optional[int] = None
affected_systems: Optional[List[str]] = None
class StatusUpdate(BaseModel):
status: str
class RiskAssessmentRequest(BaseModel):
likelihood: int
impact: int
notes: Optional[str] = None
class AuthorityNotificationRequest(BaseModel):
authority_name: str
reference_number: Optional[str] = None
contact_person: Optional[str] = None
notes: Optional[str] = None
class DataSubjectNotificationRequest(BaseModel):
notification_text: str
channel: str = "email"
affected_count: Optional[int] = 0
class MeasureCreate(BaseModel):
title: str
description: Optional[str] = None
measure_type: str = "corrective"
responsible: Optional[str] = None
due_date: Optional[str] = None
class MeasureUpdate(BaseModel):
title: Optional[str] = None
description: Optional[str] = None
measure_type: Optional[str] = None
status: Optional[str] = None
responsible: Optional[str] = None
due_date: Optional[str] = None
class TimelineEntryRequest(BaseModel):
action: str
details: Optional[str] = None
class CloseIncidentRequest(BaseModel):
root_cause: str
lessons_learned: Optional[str] = None
__all__ = [
"IncidentCreate",
"IncidentUpdate",
"StatusUpdate",
"RiskAssessmentRequest",
"AuthorityNotificationRequest",
"DataSubjectNotificationRequest",
"MeasureCreate",
"MeasureUpdate",
"TimelineEntryRequest",
"CloseIncidentRequest",
]

View File

@@ -0,0 +1,460 @@
# mypy: disable-error-code="arg-type,assignment,union-attr,no-any-return"
"""
Incident service — CRUD + stats + status + timeline + close.
Phase 1 Step 4: extracted from ``compliance.api.incident_routes``. The
workflow side (risk assessment, Art. 33/34 notifications, measures) lives
in ``compliance.services.incident_workflow_service``.
Module-level helpers (_calculate_risk_level, _is_notification_required,
_calculate_72h_deadline, _incident_to_response, _measure_to_response,
_parse_jsonb) are shared by both service modules and re-exported from
``compliance.api.incident_routes`` for legacy test imports.
"""
import json
from datetime import datetime, timedelta, timezone
from typing import Any, Optional
from uuid import uuid4
from sqlalchemy import text
from sqlalchemy.orm import Session
from compliance.domain import NotFoundError, ValidationError
from compliance.schemas.incident import (
CloseIncidentRequest,
IncidentCreate,
IncidentUpdate,
StatusUpdate,
TimelineEntryRequest,
)
DEFAULT_TENANT_ID = "9282a473-5c95-4b3a-bf78-0ecc0ec71d3e"
# ============================================================================
# Module-level helpers (re-exported by compliance.api.incident_routes)
# ============================================================================
def _calculate_risk_level(likelihood: int, impact: int) -> str:
"""Calculate risk level from likelihood * impact score."""
score = likelihood * impact
if score >= 20:
return "critical"
if score >= 12:
return "high"
if score >= 6:
return "medium"
return "low"
def _is_notification_required(risk_level: str) -> bool:
"""DSGVO Art. 33 — notification required for critical/high risk."""
return risk_level in ("critical", "high")
def _calculate_72h_deadline(detected_at: datetime) -> str:
"""Calculate 72-hour DSGVO Art. 33 deadline."""
return (detected_at + timedelta(hours=72)).isoformat()
def _parse_jsonb(val: Any) -> Any:
"""Parse a JSONB field — already dict/list from psycopg or a JSON string."""
if val is None:
return None
if isinstance(val, (dict, list)):
return val
if isinstance(val, str):
try:
return json.loads(val)
except (json.JSONDecodeError, TypeError):
return val
return val
def _incident_to_response(row: Any) -> dict[str, Any]:
"""Convert a DB row (RowMapping) to incident response dict."""
r = dict(row)
for field in (
"risk_assessment", "authority_notification",
"data_subject_notification", "timeline",
"affected_data_categories", "affected_systems",
):
if field in r:
r[field] = _parse_jsonb(r[field])
for field in ("detected_at", "created_at", "updated_at", "closed_at"):
if field in r and r[field] is not None and hasattr(r[field], "isoformat"):
r[field] = r[field].isoformat()
return r
def _measure_to_response(row: Any) -> dict[str, Any]:
"""Convert a DB measure row to response dict."""
r = dict(row)
for field in ("due_date", "completed_at", "created_at", "updated_at"):
if field in r and r[field] is not None and hasattr(r[field], "isoformat"):
r[field] = r[field].isoformat()
return r
def _append_timeline(db: Session, incident_id: str, entry: dict[str, Any]) -> None:
"""Append a timeline entry to the incident's timeline JSONB array."""
db.execute(
text(
"UPDATE incident_incidents "
"SET timeline = COALESCE(timeline, '[]'::jsonb) || :entry::jsonb, "
"updated_at = NOW() WHERE id = :id"
),
{"id": incident_id, "entry": json.dumps(entry)},
)
# ============================================================================
# Service
# ============================================================================
class IncidentService:
"""CRUD + stats + status + timeline + close."""
def __init__(self, db: Session) -> None:
self.db = db
def _require_exists(self, iid: str) -> None:
row = self.db.execute(
text("SELECT id FROM incident_incidents WHERE id = :id"),
{"id": iid},
).first()
if not row:
raise NotFoundError("incident not found")
def create(
self, tenant_id: str, user_id: str, body: IncidentCreate
) -> dict[str, Any]:
incident_id = str(uuid4())
now = datetime.now(timezone.utc)
detected_at = now
if body.detected_at:
try:
parsed = datetime.fromisoformat(body.detected_at.replace("Z", "+00:00"))
detected_at = parsed if parsed.tzinfo else parsed.replace(tzinfo=timezone.utc)
except (ValueError, AttributeError):
detected_at = now
deadline = detected_at + timedelta(hours=72)
authority_notification = {"status": "pending", "deadline": deadline.isoformat()}
data_subject_notification = {"required": False, "status": "not_required"}
timeline = [{
"timestamp": now.isoformat(),
"action": "incident_created",
"user_id": user_id,
"details": "Incident detected and reported",
}]
self.db.execute(text("""
INSERT INTO incident_incidents (
id, tenant_id, title, description, category, status, severity,
detected_at, reported_by,
affected_data_categories, affected_data_subject_count, affected_systems,
authority_notification, data_subject_notification, timeline,
created_at, updated_at
) VALUES (
:id, :tenant_id, :title, :description, :category, 'detected', :severity,
:detected_at, :reported_by,
CAST(:affected_data_categories AS jsonb),
:affected_data_subject_count,
CAST(:affected_systems AS jsonb),
CAST(:authority_notification AS jsonb),
CAST(:data_subject_notification AS jsonb),
CAST(:timeline AS jsonb),
:now, :now
)
"""), {
"id": incident_id,
"tenant_id": tenant_id,
"title": body.title,
"description": body.description or "",
"category": body.category,
"severity": body.severity,
"detected_at": detected_at.isoformat(),
"reported_by": user_id,
"affected_data_categories": json.dumps(body.affected_data_categories or []),
"affected_data_subject_count": body.affected_data_subject_count or 0,
"affected_systems": json.dumps(body.affected_systems or []),
"authority_notification": json.dumps(authority_notification),
"data_subject_notification": json.dumps(data_subject_notification),
"timeline": json.dumps(timeline),
"now": now.isoformat(),
})
self.db.commit()
row = self.db.execute(
text("SELECT * FROM incident_incidents WHERE id = :id"),
{"id": incident_id},
).mappings().first()
incident_resp = _incident_to_response(row) if row else {}
return {
"incident": incident_resp,
"authority_deadline": deadline.isoformat(),
"hours_until_deadline": (deadline - now).total_seconds() / 3600,
}
def list_incidents(
self,
tenant_id: str,
status: Optional[str],
severity: Optional[str],
category: Optional[str],
limit: int,
offset: int,
) -> dict[str, Any]:
where = ["tenant_id = :tenant_id"]
params: dict[str, Any] = {
"tenant_id": tenant_id, "limit": limit, "offset": offset,
}
if status:
where.append("status = :status")
params["status"] = status
if severity:
where.append("severity = :severity")
params["severity"] = severity
if category:
where.append("category = :category")
params["category"] = category
where_sql = " AND ".join(where)
total = (
self.db.execute(
text(f"SELECT COUNT(*) FROM incident_incidents WHERE {where_sql}"),
params,
).scalar() or 0
)
rows = (
self.db.execute(
text(
f"SELECT * FROM incident_incidents WHERE {where_sql} "
f"ORDER BY created_at DESC LIMIT :limit OFFSET :offset"
),
params,
)
.mappings()
.all()
)
return {
"incidents": [_incident_to_response(r) for r in rows],
"total": total,
}
def stats(self, tenant_id: str) -> dict[str, Any]:
row: Any = (
self.db.execute(
text("""
SELECT
COUNT(*) AS total,
SUM(CASE WHEN status != 'closed' THEN 1 ELSE 0 END) AS open,
SUM(CASE WHEN status = 'closed' THEN 1 ELSE 0 END) AS closed,
SUM(CASE WHEN severity = 'critical' THEN 1 ELSE 0 END) AS critical,
SUM(CASE WHEN severity = 'high' THEN 1 ELSE 0 END) AS high,
SUM(CASE WHEN severity = 'medium' THEN 1 ELSE 0 END) AS medium,
SUM(CASE WHEN severity = 'low' THEN 1 ELSE 0 END) AS low
FROM incident_incidents
WHERE tenant_id = :tenant_id
"""),
{"tenant_id": tenant_id},
)
.mappings()
.first()
) or {}
return {
"total": int(row["total"] or 0),
"open": int(row["open"] or 0),
"closed": int(row["closed"] or 0),
"by_severity": {
"critical": int(row["critical"] or 0),
"high": int(row["high"] or 0),
"medium": int(row["medium"] or 0),
"low": int(row["low"] or 0),
},
}
def get(self, incident_id: str) -> dict[str, Any]:
row = (
self.db.execute(
text("SELECT * FROM incident_incidents WHERE id = :id"),
{"id": incident_id},
)
.mappings()
.first()
)
if not row:
raise NotFoundError("incident not found")
incident = _incident_to_response(row)
measures = [
_measure_to_response(m)
for m in self.db.execute(
text("SELECT * FROM incident_measures WHERE incident_id = :id ORDER BY created_at"),
{"id": incident_id},
)
.mappings()
.all()
]
deadline_info = None
auth_notif = (
_parse_jsonb(row["authority_notification"])
if "authority_notification" in row.keys()
else None
)
if auth_notif and isinstance(auth_notif, dict) and "deadline" in auth_notif:
try:
deadline_dt = datetime.fromisoformat(
auth_notif["deadline"].replace("Z", "+00:00")
)
now = datetime.now(timezone.utc)
hours_remaining = (deadline_dt - now).total_seconds() / 3600
deadline_info = {
"deadline": auth_notif["deadline"],
"hours_remaining": hours_remaining,
"overdue": hours_remaining < 0,
}
except (ValueError, TypeError):
pass
return {"incident": incident, "measures": measures, "deadline_info": deadline_info}
def update(self, incident_id: str, body: IncidentUpdate) -> dict[str, Any]:
self._require_exists(incident_id)
updates: list[str] = []
params: dict[str, Any] = {"id": incident_id}
for field in ("title", "description", "category", "status", "severity"):
val = getattr(body, field, None)
if val is not None:
updates.append(f"{field} = :{field}")
params[field] = val
if body.affected_data_categories is not None:
updates.append("affected_data_categories = CAST(:adc AS jsonb)")
params["adc"] = json.dumps(body.affected_data_categories)
if body.affected_data_subject_count is not None:
updates.append("affected_data_subject_count = :adsc")
params["adsc"] = body.affected_data_subject_count
if body.affected_systems is not None:
updates.append("affected_systems = CAST(:asys AS jsonb)")
params["asys"] = json.dumps(body.affected_systems)
if not updates:
raise ValidationError("no fields to update")
updates.append("updated_at = NOW()")
self.db.execute(
text(f"UPDATE incident_incidents SET {', '.join(updates)} WHERE id = :id"),
params,
)
self.db.commit()
row = (
self.db.execute(
text("SELECT * FROM incident_incidents WHERE id = :id"),
{"id": incident_id},
)
.mappings()
.first()
)
return {"incident": _incident_to_response(row)}
def delete(self, incident_id: str) -> dict[str, Any]:
self._require_exists(incident_id)
self.db.execute(
text("DELETE FROM incident_measures WHERE incident_id = :id"),
{"id": incident_id},
)
self.db.execute(
text("DELETE FROM incident_incidents WHERE id = :id"),
{"id": incident_id},
)
self.db.commit()
return {"message": "incident deleted"}
def update_status(
self, incident_id: str, user_id: str, body: StatusUpdate
) -> dict[str, Any]:
self._require_exists(incident_id)
self.db.execute(
text(
"UPDATE incident_incidents SET status = :status, updated_at = NOW() "
"WHERE id = :id"
),
{"id": incident_id, "status": body.status},
)
_append_timeline(self.db, incident_id, {
"timestamp": datetime.now(timezone.utc).isoformat(),
"action": "status_changed",
"user_id": user_id,
"details": f"Status changed to {body.status}",
})
self.db.commit()
row = (
self.db.execute(
text("SELECT * FROM incident_incidents WHERE id = :id"),
{"id": incident_id},
)
.mappings()
.first()
)
return {"incident": _incident_to_response(row)}
def add_timeline(
self, incident_id: str, user_id: str, body: TimelineEntryRequest
) -> dict[str, Any]:
self._require_exists(incident_id)
now = datetime.now(timezone.utc)
entry = {
"timestamp": now.isoformat(),
"action": body.action,
"user_id": user_id,
"details": body.details or "",
}
_append_timeline(self.db, incident_id, entry)
self.db.commit()
return {"timeline_entry": entry}
def close(
self, incident_id: str, user_id: str, body: CloseIncidentRequest
) -> dict[str, Any]:
self._require_exists(incident_id)
now = datetime.now(timezone.utc)
self.db.execute(
text("""
UPDATE incident_incidents
SET status = 'closed',
root_cause = :root_cause,
lessons_learned = :lessons_learned,
closed_at = :now,
updated_at = :now
WHERE id = :id
"""),
{
"id": incident_id,
"root_cause": body.root_cause,
"lessons_learned": body.lessons_learned or "",
"now": now.isoformat(),
},
)
_append_timeline(self.db, incident_id, {
"timestamp": now.isoformat(),
"action": "incident_closed",
"user_id": user_id,
"details": f"Incident closed. Root cause: {body.root_cause}",
})
self.db.commit()
return {
"message": "incident closed",
"root_cause": body.root_cause,
"lessons_learned": body.lessons_learned or "",
}

View File

@@ -0,0 +1,329 @@
# mypy: disable-error-code="arg-type,assignment,union-attr,no-any-return"
"""
Incident workflow service — risk assessment + Art. 33/34 notifications + measures.
Phase 1 Step 4: extracted from ``compliance.api.incident_routes``. CRUD +
stats + status + timeline + close live in
``compliance.services.incident_service``.
"""
import json
from datetime import datetime, timedelta, timezone
from typing import Any
from uuid import uuid4
from sqlalchemy import text
from sqlalchemy.orm import Session
from compliance.domain import NotFoundError, ValidationError
from compliance.schemas.incident import (
AuthorityNotificationRequest,
DataSubjectNotificationRequest,
MeasureCreate,
MeasureUpdate,
RiskAssessmentRequest,
)
from compliance.services.incident_service import (
_append_timeline,
_calculate_72h_deadline,
_calculate_risk_level,
_is_notification_required,
_measure_to_response,
_parse_jsonb,
)
class IncidentWorkflowService:
"""Business logic for incident risk assessment, notifications, measures."""
def __init__(self, db: Session) -> None:
self.db = db
def _incident_row_or_raise(self, incident_id: str, columns: str) -> Any:
row = (
self.db.execute(
text(f"SELECT {columns} FROM incident_incidents WHERE id = :id"),
{"id": incident_id},
)
.mappings()
.first()
)
if not row:
raise NotFoundError("incident not found")
return row
# ------------------------------------------------------------------
# Risk assessment
# ------------------------------------------------------------------
def assess_risk(
self, incident_id: str, user_id: str, body: RiskAssessmentRequest
) -> dict[str, Any]:
row = self._incident_row_or_raise(
incident_id, "id, status, authority_notification"
)
risk_level = _calculate_risk_level(body.likelihood, body.impact)
notification_required = _is_notification_required(risk_level)
now = datetime.now(timezone.utc)
assessment = {
"likelihood": body.likelihood,
"impact": body.impact,
"risk_level": risk_level,
"assessed_at": now.isoformat(),
"assessed_by": user_id,
"notes": body.notes or "",
}
new_status = "assessment"
if notification_required:
new_status = "notification_required"
auth = _parse_jsonb(row["authority_notification"]) or {}
auth["status"] = "pending"
self.db.execute(
text(
"UPDATE incident_incidents SET authority_notification = CAST(:an AS jsonb) "
"WHERE id = :id"
),
{"id": incident_id, "an": json.dumps(auth)},
)
self.db.execute(
text("""
UPDATE incident_incidents
SET risk_assessment = CAST(:ra AS jsonb),
status = :status,
updated_at = NOW()
WHERE id = :id
"""),
{"id": incident_id, "ra": json.dumps(assessment), "status": new_status},
)
_append_timeline(self.db, incident_id, {
"timestamp": now.isoformat(),
"action": "risk_assessed",
"user_id": user_id,
"details": f"Risk level: {risk_level} (likelihood={body.likelihood}, impact={body.impact})",
})
self.db.commit()
return {
"risk_assessment": assessment,
"notification_required": notification_required,
"incident_status": new_status,
}
# ------------------------------------------------------------------
# Art. 33 authority notification
# ------------------------------------------------------------------
def notify_authority(
self, incident_id: str, user_id: str, body: AuthorityNotificationRequest
) -> dict[str, Any]:
row = self._incident_row_or_raise(
incident_id, "id, detected_at, authority_notification"
)
now = datetime.now(timezone.utc)
auth_existing = _parse_jsonb(row["authority_notification"]) or {}
deadline_str = auth_existing.get("deadline")
if not deadline_str and row["detected_at"]:
detected = row["detected_at"]
if hasattr(detected, "isoformat"):
deadline_str = (detected + timedelta(hours=72)).isoformat()
else:
deadline_str = _calculate_72h_deadline(
datetime.fromisoformat(str(detected).replace("Z", "+00:00"))
)
notification = {
"status": "sent",
"deadline": deadline_str,
"submitted_at": now.isoformat(),
"authority_name": body.authority_name,
"reference_number": body.reference_number or "",
"contact_person": body.contact_person or "",
"notes": body.notes or "",
}
self.db.execute(
text("""
UPDATE incident_incidents
SET authority_notification = CAST(:an AS jsonb),
status = 'notification_sent',
updated_at = NOW()
WHERE id = :id
"""),
{"id": incident_id, "an": json.dumps(notification)},
)
_append_timeline(self.db, incident_id, {
"timestamp": now.isoformat(),
"action": "authority_notified",
"user_id": user_id,
"details": f"Authority notification submitted to {body.authority_name}",
})
self.db.commit()
submitted_within_72h = True
if deadline_str:
try:
deadline_dt = datetime.fromisoformat(deadline_str.replace("Z", "+00:00"))
submitted_within_72h = now < deadline_dt
except (ValueError, TypeError):
pass
return {
"authority_notification": notification,
"submitted_within_72h": submitted_within_72h,
}
# ------------------------------------------------------------------
# Art. 34 data subject notification
# ------------------------------------------------------------------
def notify_subjects(
self, incident_id: str, user_id: str, body: DataSubjectNotificationRequest
) -> dict[str, Any]:
row = self._incident_row_or_raise(
incident_id, "id, affected_data_subject_count"
)
now = datetime.now(timezone.utc)
affected_count = body.affected_count or row["affected_data_subject_count"] or 0
notification = {
"required": True,
"status": "sent",
"sent_at": now.isoformat(),
"affected_count": affected_count,
"notification_text": body.notification_text,
"channel": body.channel,
}
self.db.execute(
text("""
UPDATE incident_incidents
SET data_subject_notification = CAST(:dsn AS jsonb),
updated_at = NOW()
WHERE id = :id
"""),
{"id": incident_id, "dsn": json.dumps(notification)},
)
_append_timeline(self.db, incident_id, {
"timestamp": now.isoformat(),
"action": "data_subjects_notified",
"user_id": user_id,
"details": f"Data subjects notified via {body.channel} ({affected_count} affected)",
})
self.db.commit()
return {"data_subject_notification": notification}
# ------------------------------------------------------------------
# Measures
# ------------------------------------------------------------------
def add_measure(
self, incident_id: str, user_id: str, body: MeasureCreate
) -> dict[str, Any]:
self._incident_row_or_raise(incident_id, "id")
measure_id = str(uuid4())
now = datetime.now(timezone.utc)
self.db.execute(
text("""
INSERT INTO incident_measures (
id, incident_id, title, description, measure_type, status,
responsible, due_date, created_at, updated_at
) VALUES (
:id, :incident_id, :title, :description, :measure_type, 'planned',
:responsible, :due_date, :now, :now
)
"""),
{
"id": measure_id,
"incident_id": incident_id,
"title": body.title,
"description": body.description or "",
"measure_type": body.measure_type,
"responsible": body.responsible or "",
"due_date": body.due_date,
"now": now.isoformat(),
},
)
_append_timeline(self.db, incident_id, {
"timestamp": now.isoformat(),
"action": "measure_added",
"user_id": user_id,
"details": f"Measure added: {body.title} ({body.measure_type})",
})
self.db.commit()
measure = (
self.db.execute(
text("SELECT * FROM incident_measures WHERE id = :id"),
{"id": measure_id},
)
.mappings()
.first()
)
return {"measure": _measure_to_response(measure)}
def update_measure(
self, measure_id: str, body: MeasureUpdate
) -> dict[str, Any]:
check = self.db.execute(
text("SELECT id FROM incident_measures WHERE id = :id"),
{"id": measure_id},
).first()
if not check:
raise NotFoundError("measure not found")
updates: list[str] = []
params: dict[str, Any] = {"id": measure_id}
for field in (
"title", "description", "measure_type", "status", "responsible", "due_date",
):
val = getattr(body, field, None)
if val is not None:
updates.append(f"{field} = :{field}")
params[field] = val
if not updates:
raise ValidationError("no fields to update")
updates.append("updated_at = NOW()")
self.db.execute(
text(f"UPDATE incident_measures SET {', '.join(updates)} WHERE id = :id"),
params,
)
self.db.commit()
measure = (
self.db.execute(
text("SELECT * FROM incident_measures WHERE id = :id"),
{"id": measure_id},
)
.mappings()
.first()
)
return {"measure": _measure_to_response(measure)}
def complete_measure(self, measure_id: str) -> dict[str, Any]:
check = self.db.execute(
text("SELECT id FROM incident_measures WHERE id = :id"),
{"id": measure_id},
).first()
if not check:
raise NotFoundError("measure not found")
self.db.execute(
text(
"UPDATE incident_measures "
"SET status = 'completed', completed_at = NOW(), updated_at = NOW() "
"WHERE id = :id"
),
{"id": measure_id},
)
self.db.commit()
return {"message": "measure completed"}

View File

@@ -91,5 +91,7 @@ ignore_errors = False
ignore_errors = False
[mypy-compliance.api.email_template_routes]
ignore_errors = False
[mypy-compliance.api.incident_routes]
ignore_errors = False
[mypy-compliance.api._http_errors]
ignore_errors = False

View File

@@ -19563,218 +19563,6 @@
"title": "ConsentCreate",
"type": "object"
},
"compliance__api__incident_routes__IncidentCreate": {
"properties": {
"affected_data_categories": {
"anyOf": [
{
"items": {
"type": "string"
},
"type": "array"
},
{
"type": "null"
}
],
"title": "Affected Data Categories"
},
"affected_data_subject_count": {
"anyOf": [
{
"type": "integer"
},
{
"type": "null"
}
],
"default": 0,
"title": "Affected Data Subject Count"
},
"affected_systems": {
"anyOf": [
{
"items": {
"type": "string"
},
"type": "array"
},
{
"type": "null"
}
],
"title": "Affected Systems"
},
"category": {
"anyOf": [
{
"type": "string"
},
{
"type": "null"
}
],
"default": "data_breach",
"title": "Category"
},
"description": {
"anyOf": [
{
"type": "string"
},
{
"type": "null"
}
],
"title": "Description"
},
"detected_at": {
"anyOf": [
{
"type": "string"
},
{
"type": "null"
}
],
"title": "Detected At"
},
"severity": {
"anyOf": [
{
"type": "string"
},
{
"type": "null"
}
],
"default": "medium",
"title": "Severity"
},
"title": {
"title": "Title",
"type": "string"
}
},
"required": [
"title"
],
"title": "IncidentCreate",
"type": "object"
},
"compliance__api__incident_routes__IncidentUpdate": {
"properties": {
"affected_data_categories": {
"anyOf": [
{
"items": {
"type": "string"
},
"type": "array"
},
{
"type": "null"
}
],
"title": "Affected Data Categories"
},
"affected_data_subject_count": {
"anyOf": [
{
"type": "integer"
},
{
"type": "null"
}
],
"title": "Affected Data Subject Count"
},
"affected_systems": {
"anyOf": [
{
"items": {
"type": "string"
},
"type": "array"
},
{
"type": "null"
}
],
"title": "Affected Systems"
},
"category": {
"anyOf": [
{
"type": "string"
},
{
"type": "null"
}
],
"title": "Category"
},
"description": {
"anyOf": [
{
"type": "string"
},
{
"type": "null"
}
],
"title": "Description"
},
"severity": {
"anyOf": [
{
"type": "string"
},
{
"type": "null"
}
],
"title": "Severity"
},
"status": {
"anyOf": [
{
"type": "string"
},
{
"type": "null"
}
],
"title": "Status"
},
"title": {
"anyOf": [
{
"type": "string"
},
{
"type": "null"
}
],
"title": "Title"
}
},
"title": "IncidentUpdate",
"type": "object"
},
"compliance__api__incident_routes__StatusUpdate": {
"properties": {
"status": {
"title": "Status",
"type": "string"
}
},
"required": [
"status"
],
"title": "StatusUpdate",
"type": "object"
},
"compliance__api__legal_document_routes__VersionCreate": {
"properties": {
"content": {
@@ -20361,6 +20149,218 @@
},
"title": "VersionUpdate",
"type": "object"
},
"compliance__schemas__incident__IncidentCreate": {
"properties": {
"affected_data_categories": {
"anyOf": [
{
"items": {
"type": "string"
},
"type": "array"
},
{
"type": "null"
}
],
"title": "Affected Data Categories"
},
"affected_data_subject_count": {
"anyOf": [
{
"type": "integer"
},
{
"type": "null"
}
],
"default": 0,
"title": "Affected Data Subject Count"
},
"affected_systems": {
"anyOf": [
{
"items": {
"type": "string"
},
"type": "array"
},
{
"type": "null"
}
],
"title": "Affected Systems"
},
"category": {
"anyOf": [
{
"type": "string"
},
{
"type": "null"
}
],
"default": "data_breach",
"title": "Category"
},
"description": {
"anyOf": [
{
"type": "string"
},
{
"type": "null"
}
],
"title": "Description"
},
"detected_at": {
"anyOf": [
{
"type": "string"
},
{
"type": "null"
}
],
"title": "Detected At"
},
"severity": {
"anyOf": [
{
"type": "string"
},
{
"type": "null"
}
],
"default": "medium",
"title": "Severity"
},
"title": {
"title": "Title",
"type": "string"
}
},
"required": [
"title"
],
"title": "IncidentCreate",
"type": "object"
},
"compliance__schemas__incident__IncidentUpdate": {
"properties": {
"affected_data_categories": {
"anyOf": [
{
"items": {
"type": "string"
},
"type": "array"
},
{
"type": "null"
}
],
"title": "Affected Data Categories"
},
"affected_data_subject_count": {
"anyOf": [
{
"type": "integer"
},
{
"type": "null"
}
],
"title": "Affected Data Subject Count"
},
"affected_systems": {
"anyOf": [
{
"items": {
"type": "string"
},
"type": "array"
},
{
"type": "null"
}
],
"title": "Affected Systems"
},
"category": {
"anyOf": [
{
"type": "string"
},
{
"type": "null"
}
],
"title": "Category"
},
"description": {
"anyOf": [
{
"type": "string"
},
{
"type": "null"
}
],
"title": "Description"
},
"severity": {
"anyOf": [
{
"type": "string"
},
{
"type": "null"
}
],
"title": "Severity"
},
"status": {
"anyOf": [
{
"type": "string"
},
{
"type": "null"
}
],
"title": "Status"
},
"title": {
"anyOf": [
{
"type": "string"
},
{
"type": "null"
}
],
"title": "Title"
}
},
"title": "IncidentUpdate",
"type": "object"
},
"compliance__schemas__incident__StatusUpdate": {
"properties": {
"status": {
"title": "Status",
"type": "string"
}
},
"required": [
"status"
],
"title": "StatusUpdate",
"type": "object"
}
}
},
@@ -30056,7 +30056,11 @@
"200": {
"content": {
"application/json": {
"schema": {}
"schema": {
"additionalProperties": true,
"title": "Response List Incidents Api Compliance Incidents Get",
"type": "object"
}
}
},
"description": "Successful Response"
@@ -30118,7 +30122,7 @@
"content": {
"application/json": {
"schema": {
"$ref": "#/components/schemas/compliance__api__incident_routes__IncidentCreate"
"$ref": "#/components/schemas/compliance__schemas__incident__IncidentCreate"
}
}
},
@@ -30128,7 +30132,11 @@
"200": {
"content": {
"application/json": {
"schema": {}
"schema": {
"additionalProperties": true,
"title": "Response Create Incident Api Compliance Incidents Post",
"type": "object"
}
}
},
"description": "Successful Response"
@@ -30176,7 +30184,11 @@
"200": {
"content": {
"application/json": {
"schema": {}
"schema": {
"additionalProperties": true,
"title": "Response Get Stats Api Compliance Incidents Stats Get",
"type": "object"
}
}
},
"description": "Successful Response"
@@ -30218,7 +30230,11 @@
"200": {
"content": {
"application/json": {
"schema": {}
"schema": {
"additionalProperties": true,
"title": "Response Delete Incident Api Compliance Incidents Incident Id Delete",
"type": "object"
}
}
},
"description": "Successful Response"
@@ -30258,7 +30274,11 @@
"200": {
"content": {
"application/json": {
"schema": {}
"schema": {
"additionalProperties": true,
"title": "Response Get Incident Api Compliance Incidents Incident Id Get",
"type": "object"
}
}
},
"description": "Successful Response"
@@ -30298,7 +30318,7 @@
"content": {
"application/json": {
"schema": {
"$ref": "#/components/schemas/compliance__api__incident_routes__IncidentUpdate"
"$ref": "#/components/schemas/compliance__schemas__incident__IncidentUpdate"
}
}
},
@@ -30308,7 +30328,11 @@
"200": {
"content": {
"application/json": {
"schema": {}
"schema": {
"additionalProperties": true,
"title": "Response Update Incident Api Compliance Incidents Incident Id Put",
"type": "object"
}
}
},
"description": "Successful Response"
@@ -30376,7 +30400,11 @@
"200": {
"content": {
"application/json": {
"schema": {}
"schema": {
"additionalProperties": true,
"title": "Response Assess Risk Api Compliance Incidents Incident Id Assess Risk Post",
"type": "object"
}
}
},
"description": "Successful Response"
@@ -30444,7 +30472,11 @@
"200": {
"content": {
"application/json": {
"schema": {}
"schema": {
"additionalProperties": true,
"title": "Response Close Incident Api Compliance Incidents Incident Id Close Post",
"type": "object"
}
}
},
"description": "Successful Response"
@@ -30512,7 +30544,11 @@
"200": {
"content": {
"application/json": {
"schema": {}
"schema": {
"additionalProperties": true,
"title": "Response Add Measure Api Compliance Incidents Incident Id Measures Post",
"type": "object"
}
}
},
"description": "Successful Response"
@@ -30574,7 +30610,11 @@
"200": {
"content": {
"application/json": {
"schema": {}
"schema": {
"additionalProperties": true,
"title": "Response Update Measure Api Compliance Incidents Incident Id Measures Measure Id Put",
"type": "object"
}
}
},
"description": "Successful Response"
@@ -30626,7 +30666,11 @@
"200": {
"content": {
"application/json": {
"schema": {}
"schema": {
"additionalProperties": true,
"title": "Response Complete Measure Api Compliance Incidents Incident Id Measures Measure Id Complete Post",
"type": "object"
}
}
},
"description": "Successful Response"
@@ -30694,7 +30738,11 @@
"200": {
"content": {
"application/json": {
"schema": {}
"schema": {
"additionalProperties": true,
"title": "Response Notify Authority Api Compliance Incidents Incident Id Notify Authority Post",
"type": "object"
}
}
},
"description": "Successful Response"
@@ -30762,7 +30810,11 @@
"200": {
"content": {
"application/json": {
"schema": {}
"schema": {
"additionalProperties": true,
"title": "Response Notify Subjects Api Compliance Incidents Incident Id Notify Subjects Post",
"type": "object"
}
}
},
"description": "Successful Response"
@@ -30820,7 +30872,7 @@
"content": {
"application/json": {
"schema": {
"$ref": "#/components/schemas/compliance__api__incident_routes__StatusUpdate"
"$ref": "#/components/schemas/compliance__schemas__incident__StatusUpdate"
}
}
},
@@ -30830,7 +30882,11 @@
"200": {
"content": {
"application/json": {
"schema": {}
"schema": {
"additionalProperties": true,
"title": "Response Update Status Api Compliance Incidents Incident Id Status Put",
"type": "object"
}
}
},
"description": "Successful Response"
@@ -30898,7 +30954,11 @@
"200": {
"content": {
"application/json": {
"schema": {}
"schema": {
"additionalProperties": true,
"title": "Response Add Timeline Entry Api Compliance Incidents Incident Id Timeline Post",
"type": "object"
}
}
},
"description": "Successful Response"
@@ -35673,7 +35733,7 @@
"content": {
"application/json": {
"schema": {
"$ref": "#/components/schemas/compliance__api__incident_routes__StatusUpdate"
"$ref": "#/components/schemas/compliance__schemas__incident__StatusUpdate"
}
}
},