All checks were successful
CI / go-lint (push) Has been skipped
CI / python-lint (push) Has been skipped
CI / nodejs-lint (push) Has been skipped
CI / test-go-ai-compliance (push) Successful in 32s
CI / test-python-backend-compliance (push) Successful in 30s
CI / test-python-document-crawler (push) Successful in 19s
CI / test-python-dsms-gateway (push) Successful in 16s
- escalations/route.ts: X-Tenant-Id + X-User-Id Default-Header ergaenzt, X-User-Id aus Request weitergeleitet
- escalation_routes.py: DEFAULT_TENANT_ID Konstante (9282a473-...) statt 'default'
- test_escalation_routes.py: vollstaendige Test-Suite ergaenzt (+337 Zeilen)
- main.go + escalation_handlers.go: DEPRECATED-Kommentare — UCCA-Escalations bleiben fuer Assessment-Review, Haupt-Escalation-System ist Python-Backend

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
465 lines
18 KiB
Python
465 lines
18 KiB
Python
"""Tests for escalation routes and schemas (escalation_routes.py)."""
|
|
|
|
import pytest
|
|
from unittest.mock import MagicMock, patch
|
|
from datetime import datetime
|
|
from fastapi.testclient import TestClient
|
|
from fastapi import FastAPI
|
|
|
|
from compliance.api.escalation_routes import (
|
|
router,
|
|
EscalationCreate,
|
|
EscalationUpdate,
|
|
EscalationStatusUpdate,
|
|
_row_to_dict,
|
|
DEFAULT_TENANT_ID,
|
|
)
|
|
|
|
from classroom_engine.database import get_db
|
|
|
|
# =============================================================================
|
|
# App + Client setup
|
|
# =============================================================================
|
|
|
|
# Fixed identifiers shared by the tests below: one used for the mocked
# escalation row, one used for lookups that are mocked to find nothing.
ESCALATION_ID = "eeeeeeee-0001-0001-0001-000000000001"
UNKNOWN_ID = "aaaaaaaa-9999-9999-9999-999999999999"

# Minimal FastAPI application that mounts only the escalation router.
app = FastAPI()
app.include_router(router)
|
|
|
|
|
|
# =============================================================================
|
|
# Mock helpers
|
|
# =============================================================================
|
|
|
|
class _MockRow:
|
|
"""Simulates a SQLAlchemy row with _mapping attribute."""
|
|
def __init__(self, data: dict):
|
|
self._mapping = data
|
|
|
|
def __getitem__(self, idx):
|
|
vals = list(self._mapping.values())
|
|
return vals[idx]
|
|
|
|
|
|
def _make_escalation_row(overrides=None):
    """Build a ``_MockRow`` for one escalation.

    Args:
        overrides: Optional mapping whose entries replace (or extend) the
            default column values.

    Returns:
        A ``_MockRow`` wrapping the resulting column dict.
    """
    timestamp = datetime(2026, 3, 6, 12, 0, 0)
    # Column order is kept stable because _MockRow indexes positionally.
    columns = dict(
        id=ESCALATION_ID,
        tenant_id=DEFAULT_TENANT_ID,
        title="Test Eskalation",
        description="Beschreibung",
        priority="medium",
        status="open",
        category="dsgvo_breach",
        assignee="admin@example.com",
        reporter="user@example.com",
        source_module=None,
        source_id=None,
        due_date=None,
        resolved_at=None,
        created_at=timestamp,
        updated_at=timestamp,
    )
    # A falsy ``overrides`` (None or empty) leaves the defaults untouched.
    columns.update(overrides or {})
    return _MockRow(columns)
|
|
|
|
|
|
def _count_row(val):
|
|
"""Simulates a COUNT(*) row — fetchone()[0] returns the value."""
|
|
row = MagicMock()
|
|
row.__getitem__ = lambda self, idx: val
|
|
return row
|
|
|
|
|
|
@pytest.fixture
def mock_db():
    """Yield a ``MagicMock`` installed as the app's ``get_db`` override."""
    session = MagicMock()

    def _provide_session():
        return session

    app.dependency_overrides[get_db] = _provide_session
    yield session
    # Drop the override again; the default avoids a KeyError if it is gone.
    app.dependency_overrides.pop(get_db, None)
|
|
|
|
|
|
@pytest.fixture
def client(mock_db):
    """Return a ``TestClient`` for the app.

    Requesting ``mock_db`` ensures the ``get_db`` override is installed
    before the client is used.
    """
    test_client = TestClient(app)
    return test_client
|
|
|
|
|
|
# =============================================================================
|
|
# Schema Tests — EscalationCreate
|
|
# =============================================================================
|
|
|
|
class TestEscalationCreate:
    """Schema tests for ``EscalationCreate``."""

    def test_minimal_valid(self):
        """With only a title, priority is "medium" and optional fields are None."""
        model = EscalationCreate(title="Test Eskalation")
        assert model.title == "Test Eskalation"
        assert model.priority == "medium"
        for optional_field in ("description", "category", "assignee"):
            assert getattr(model, optional_field) is None

    def test_full_values(self):
        """Explicitly supplied values are kept on the model."""
        payload = {
            "title": "DSGVO-Verstoß",
            "description": "Datenleck entdeckt",
            "priority": "critical",
            "category": "dsgvo_breach",
            "assignee": "admin@example.com",
            "reporter": "user@example.com",
            "source_module": "incidents",
        }
        model = EscalationCreate(**payload)
        assert model.title == "DSGVO-Verstoß"
        assert model.priority == "critical"
        assert model.category == "dsgvo_breach"
        assert model.source_module == "incidents"

    def test_serialization(self):
        """``model_dump(exclude_none=True)`` omits the unset description."""
        dumped = EscalationCreate(title="Test", priority="high").model_dump(
            exclude_none=True
        )
        assert dumped["title"] == "Test"
        assert dumped["priority"] == "high"
        assert "description" not in dumped
|
|
|
|
|
|
# =============================================================================
|
|
# Schema Tests — EscalationUpdate
|
|
# =============================================================================
|
|
|
|
class TestEscalationUpdate:
    """Schema tests for ``EscalationUpdate``."""

    def test_empty_update(self):
        """An update without arguments dumps to an empty dict."""
        dumped = EscalationUpdate().model_dump(exclude_none=True)
        assert dumped == {}

    def test_partial_update(self):
        """Only the fields that were set appear in the dump."""
        model = EscalationUpdate(assignee="new@example.com", priority="low")
        dumped = model.model_dump(exclude_none=True)
        assert dumped == {"assignee": "new@example.com", "priority": "low"}

    def test_title_update(self):
        """Setting the title does not introduce a priority entry."""
        model = EscalationUpdate(title="Aktualisierter Titel")
        dumped = model.model_dump(exclude_none=True)
        assert dumped["title"] == "Aktualisierter Titel"
        assert "priority" not in dumped
|
|
|
|
|
|
# =============================================================================
|
|
# Schema Tests — EscalationStatusUpdate
|
|
# =============================================================================
|
|
|
|
class TestEscalationStatusUpdate:
    """Schema tests for ``EscalationStatusUpdate``."""

    def test_status_only(self):
        """Without a timestamp, ``resolved_at`` is None."""
        model = EscalationStatusUpdate(status="in_progress")
        assert model.status == "in_progress"
        assert model.resolved_at is None

    def test_with_resolved_at(self):
        """A supplied ``resolved_at`` is kept unchanged."""
        resolved = datetime(2026, 3, 1, 12, 0, 0)
        model = EscalationStatusUpdate(status="resolved", resolved_at=resolved)
        assert model.status == "resolved"
        assert model.resolved_at == resolved

    def test_closed_status(self):
        """The "closed" status is accepted."""
        model = EscalationStatusUpdate(status="closed")
        assert model.status == "closed"
|
|
|
|
|
|
# =============================================================================
|
|
# Helper Tests — _row_to_dict
|
|
# =============================================================================
|
|
|
|
class TestRowToDict:
    """Tests for the ``_row_to_dict`` helper."""

    @staticmethod
    def _row_with(mapping):
        """Return a mock row exposing ``mapping`` as its ``_mapping``."""
        fake_row = MagicMock()
        fake_row._mapping = mapping
        return fake_row

    def test_basic_conversion(self):
        """Plain values are carried over under their keys."""
        converted = _row_to_dict(
            self._row_with({"id": "abc-123", "title": "Test", "priority": "medium"})
        )
        assert converted["id"] == "abc-123"
        assert converted["title"] == "Test"
        assert converted["priority"] == "medium"

    def test_datetime_serialized(self):
        """Datetime values come back as their ISO-format string."""
        created = datetime(2026, 3, 1, 10, 0, 0)
        converted = _row_to_dict(self._row_with({"id": "abc", "created_at": created}))
        assert converted["created_at"] == created.isoformat()

    def test_none_values_preserved(self):
        """None values stay None."""
        converted = _row_to_dict(
            self._row_with({"id": "abc", "description": None, "resolved_at": None})
        )
        assert converted["description"] is None
        assert converted["resolved_at"] is None
|
|
|
|
|
|
# =============================================================================
|
|
# Route Tests — List Escalations
|
|
# =============================================================================
|
|
|
|
class TestListEscalations:
    """Route tests for ``GET /escalations``."""

    @staticmethod
    def _list_results(rows, total):
        """Return the two ``execute()`` results configured for a list request.

        The first result serves ``fetchall()`` for the items, the second
        serves ``fetchone()`` for the count row.
        """
        items_result = MagicMock(fetchall=MagicMock(return_value=rows))
        count_result = MagicMock(
            fetchone=MagicMock(return_value=_count_row(total))
        )
        return [items_result, count_result]

    def test_list_empty(self, client, mock_db):
        """No rows and a zero count give an empty item list."""
        # Only the side_effect sequence is configured: while it is set, a
        # return_value on the same mock would never be used.
        mock_db.execute.side_effect = self._list_results([], 0)
        resp = client.get("/escalations")
        assert resp.status_code == 200
        data = resp.json()
        assert data["items"] == []
        assert data["total"] == 0

    def test_list_with_items(self, client, mock_db):
        """A single mocked row is returned together with total 1."""
        mock_db.execute.side_effect = self._list_results(
            [_make_escalation_row()], 1
        )
        resp = client.get("/escalations")
        assert resp.status_code == 200
        data = resp.json()
        assert len(data["items"]) == 1
        assert data["items"][0]["id"] == ESCALATION_ID
        assert data["total"] == 1

    def test_list_filter_status(self, client, mock_db):
        """The ``status`` query parameter is accepted and the row is returned."""
        row = _make_escalation_row({"status": "in_progress"})
        mock_db.execute.side_effect = self._list_results([row], 1)
        resp = client.get("/escalations?status=in_progress")
        assert resp.status_code == 200
        data = resp.json()
        assert len(data["items"]) == 1
        # Mirrors the check made by the priority filter test.
        assert data["items"][0]["status"] == "in_progress"

    def test_list_filter_priority(self, client, mock_db):
        """The ``priority`` query parameter is accepted and the row is returned."""
        row = _make_escalation_row({"priority": "critical"})
        mock_db.execute.side_effect = self._list_results([row], 1)
        resp = client.get("/escalations?priority=critical")
        assert resp.status_code == 200
        assert resp.json()["items"][0]["priority"] == "critical"
|
|
|
|
|
|
# =============================================================================
|
|
# Route Tests — Create Escalation
|
|
# =============================================================================
|
|
|
|
class TestCreateEscalation:
    """Route tests for ``POST /escalations``."""

    @staticmethod
    def _returning(row):
        """Build an ``execute()`` result whose ``fetchone()`` yields ``row``."""
        result = MagicMock()
        result.fetchone.return_value = row
        return result

    def test_create_basic(self, client, mock_db):
        """A title-only payload gives 201 and commits once."""
        mock_db.execute.return_value = self._returning(_make_escalation_row())
        response = client.post("/escalations", json={"title": "Test Eskalation"})
        assert response.status_code == 201
        body = response.json()
        assert body["id"] == ESCALATION_ID
        assert body["title"] == "Test Eskalation"
        mock_db.commit.assert_called_once()

    def test_create_full_fields(self, client, mock_db):
        """A payload with several optional fields gives 201."""
        stored = _make_escalation_row(
            {
                "priority": "critical",
                "category": "ai_act",
                "assignee": "dsb@example.com",
            }
        )
        mock_db.execute.return_value = self._returning(stored)
        payload = {
            "title": "AI Act Verstoß",
            "description": "Hochrisiko-KI ohne Dokumentation",
            "priority": "critical",
            "category": "ai_act",
            "assignee": "dsb@example.com",
        }
        response = client.post("/escalations", json=payload)
        assert response.status_code == 201
        assert response.json()["priority"] == "critical"

    def test_create_missing_title_422(self, client, mock_db):
        """A payload without a title is rejected with 422."""
        response = client.post("/escalations", json={"description": "Ohne Titel"})
        assert response.status_code == 422
|
|
|
|
|
|
# =============================================================================
|
|
# Route Tests — Get Escalation
|
|
# =============================================================================
|
|
|
|
class TestGetEscalation:
    """Route tests for ``GET /escalations/{id}``."""

    def test_get_existing(self, client, mock_db):
        """A found row gives 200 and the row's id."""
        lookup = MagicMock()
        lookup.fetchone.return_value = _make_escalation_row()
        mock_db.execute.return_value = lookup
        response = client.get(f"/escalations/{ESCALATION_ID}")
        assert response.status_code == 200
        assert response.json()["id"] == ESCALATION_ID

    def test_get_not_found(self, client, mock_db):
        """A lookup that yields no row gives 404."""
        lookup = MagicMock()
        lookup.fetchone.return_value = None
        mock_db.execute.return_value = lookup
        response = client.get(f"/escalations/{UNKNOWN_ID}")
        assert response.status_code == 404
|
|
|
|
|
|
# =============================================================================
|
|
# Route Tests — Update Escalation
|
|
# =============================================================================
|
|
|
|
class TestUpdateEscalation:
    """Route tests for ``PUT /escalations/{id}``."""

    @staticmethod
    def _fetchone_result(row):
        """Build an ``execute()`` result whose ``fetchone()`` yields ``row``."""
        result = MagicMock()
        result.fetchone.return_value = row
        return result

    def test_update_partial(self, client, mock_db):
        """Changing one field gives 200, the new value, and one commit."""
        before = _make_escalation_row()
        after = _make_escalation_row({"priority": "high"})
        mock_db.execute.side_effect = [
            self._fetchone_result(before),  # existence check
            self._fetchone_result(after),  # update RETURNING
        ]
        response = client.put(
            f"/escalations/{ESCALATION_ID}", json={"priority": "high"}
        )
        assert response.status_code == 200
        assert response.json()["priority"] == "high"
        mock_db.commit.assert_called_once()

    def test_update_not_found(self, client, mock_db):
        """A lookup that yields no row gives 404."""
        mock_db.execute.return_value = self._fetchone_result(None)
        response = client.put(f"/escalations/{UNKNOWN_ID}", json={"title": "New"})
        assert response.status_code == 404

    def test_update_empty_no_op(self, client, mock_db):
        """An empty payload still gives 200 with the existing row."""
        unchanged = _make_escalation_row()
        mock_db.execute.side_effect = [
            self._fetchone_result(unchanged),  # existence check
            self._fetchone_result(unchanged),  # re-fetch unchanged
        ]
        response = client.put(f"/escalations/{ESCALATION_ID}", json={})
        assert response.status_code == 200
        assert response.json()["id"] == ESCALATION_ID
|
|
|
|
|
|
# =============================================================================
|
|
# Route Tests — Status Update
|
|
# =============================================================================
|
|
|
|
class TestUpdateStatus:
    """Route tests for ``PUT /escalations/{id}/status``."""

    @staticmethod
    def _fetchone_sequence(*rows):
        """Return one ``execute()`` result per row, each serving it via ``fetchone()``."""
        return [MagicMock(fetchone=MagicMock(return_value=row)) for row in rows]

    def test_to_in_progress(self, client, mock_db):
        """Switching to "in_progress" gives 200 and commits once."""
        mock_db.execute.side_effect = self._fetchone_sequence(
            _make_escalation_row(),
            _make_escalation_row({"status": "in_progress"}),
        )
        response = client.put(
            f"/escalations/{ESCALATION_ID}/status", json={"status": "in_progress"}
        )
        assert response.status_code == 200
        assert response.json()["status"] == "in_progress"
        mock_db.commit.assert_called_once()

    def test_to_resolved_auto_resolved_at(self, client, mock_db):
        """A resolved row comes back with a non-null ``resolved_at``."""
        current = _make_escalation_row()
        resolved_ts = datetime(2026, 3, 6, 14, 0, 0)
        resolved = _make_escalation_row(
            {"status": "resolved", "resolved_at": resolved_ts}
        )
        mock_db.execute.side_effect = self._fetchone_sequence(current, resolved)
        response = client.put(
            f"/escalations/{ESCALATION_ID}/status", json={"status": "resolved"}
        )
        assert response.status_code == 200
        body = response.json()
        assert body["status"] == "resolved"
        assert body["resolved_at"] is not None

    def test_to_closed(self, client, mock_db):
        """Switching to "closed" gives 200 and the new status."""
        mock_db.execute.side_effect = self._fetchone_sequence(
            _make_escalation_row(),
            _make_escalation_row({"status": "closed"}),
        )
        response = client.put(
            f"/escalations/{ESCALATION_ID}/status", json={"status": "closed"}
        )
        assert response.status_code == 200
        assert response.json()["status"] == "closed"

    def test_status_not_found(self, client, mock_db):
        """A lookup that yields no row gives 404."""
        missing = MagicMock()
        missing.fetchone.return_value = None
        mock_db.execute.return_value = missing
        response = client.put(
            f"/escalations/{UNKNOWN_ID}/status", json={"status": "closed"}
        )
        assert response.status_code == 404
|
|
|
|
|
|
# =============================================================================
|
|
# Route Tests — Delete Escalation
|
|
# =============================================================================
|
|
|
|
class TestDeleteEscalation:
    """Route tests for ``DELETE /escalations/{id}``."""

    def test_delete_existing(self, client, mock_db):
        """Deleting a found row gives 200, ``success`` True, and one commit."""
        found = MagicMock()
        found.fetchone.return_value = _make_escalation_row()
        mock_db.execute.side_effect = [
            found,  # existence check
            None,  # DELETE
        ]
        response = client.delete(f"/escalations/{ESCALATION_ID}")
        assert response.status_code == 200
        assert response.json()["success"] is True
        mock_db.commit.assert_called_once()

    def test_delete_not_found(self, client, mock_db):
        """A lookup that yields no row gives 404."""
        missing = MagicMock()
        missing.fetchone.return_value = None
        mock_db.execute.return_value = missing
        response = client.delete(f"/escalations/{UNKNOWN_ID}")
        assert response.status_code == 404
|
|
|
|
|
|
# =============================================================================
|
|
# Route Tests — Stats
|
|
# =============================================================================
|
|
|
|
class TestGetStats:
    """Route tests for ``GET /escalations/stats``."""

    @staticmethod
    def _grouped_row(label, count):
        """Return a mock row where ``row[0]`` is ``label`` and ``row[1]`` is ``count``."""
        row = MagicMock()
        row.__getitem__ = lambda _self, idx: (label, count)[idx]
        return row

    @staticmethod
    def _stats_results(status_rows, priority_rows, total, active):
        """Return the four ``execute()`` results configured for a stats request.

        Order: by_status rows, by_priority rows, total count, active count.
        """
        return [
            MagicMock(fetchall=MagicMock(return_value=status_rows)),  # by_status
            MagicMock(fetchall=MagicMock(return_value=priority_rows)),  # by_priority
            MagicMock(fetchone=MagicMock(return_value=_count_row(total))),  # total
            MagicMock(fetchone=MagicMock(return_value=_count_row(active))),  # active
        ]

    def test_stats_empty(self, client, mock_db):
        """Without any rows, all asserted counters are zero."""
        mock_db.execute.side_effect = self._stats_results([], [], 0, 0)
        resp = client.get("/escalations/stats")
        assert resp.status_code == 200
        data = resp.json()
        assert data["total"] == 0
        assert data["active"] == 0
        assert data["by_status"]["open"] == 0

    def test_stats_with_data(self, client, mock_db):
        """Grouped rows are reflected in the per-status counts and totals."""
        # These rows need [0] and [1] indexing.
        status_rows = [
            self._grouped_row("open", 3),
            self._grouped_row("resolved", 1),
        ]
        priority_rows = [
            self._grouped_row("high", 2),
            self._grouped_row("medium", 2),
        ]
        mock_db.execute.side_effect = self._stats_results(
            status_rows, priority_rows, 4, 3
        )
        resp = client.get("/escalations/stats")
        assert resp.status_code == 200
        data = resp.json()
        assert data["total"] == 4
        assert data["active"] == 3
        assert data["by_status"]["open"] == 3
        assert data["by_status"]["resolved"] == 1

    def test_stats_structure(self, client, mock_db):
        """The response exposes the expected status and priority keys."""
        mock_db.execute.side_effect = self._stats_results([], [], 0, 0)
        resp = client.get("/escalations/stats")
        # Check the status first so a failed request is not reported as a
        # confusing key error on the body.
        assert resp.status_code == 200
        data = resp.json()
        assert set(data["by_status"].keys()) == {"open", "in_progress", "escalated", "resolved", "closed"}
        assert set(data["by_priority"].keys()) == {"low", "medium", "high", "critical"}
        assert "total" in data
        assert "active" in data
|