feat: add compliance modules 2-5 (dashboard, security templates, process manager, evidence collector)
All checks were successful
CI/CD / go-lint (push) Has been skipped
CI/CD / python-lint (push) Has been skipped
CI/CD / nodejs-lint (push) Has been skipped
CI/CD / test-go-ai-compliance (push) Successful in 32s
CI/CD / test-python-backend-compliance (push) Successful in 34s
CI/CD / test-python-document-crawler (push) Successful in 23s
CI/CD / test-python-dsms-gateway (push) Successful in 21s
CI/CD / validate-canonical-controls (push) Successful in 11s
CI/CD / Deploy (push) Successful in 2s

Module 2: Extended Compliance Dashboard with roadmap, module-status, next-actions, snapshots, score-history
Module 3: 7 German security document templates (IT-Sicherheitskonzept, Datenschutz, Backup, Logging, Incident-Response, Zugriff, Risikomanagement)
Module 4: Compliance Process Manager with CRUD, complete/skip/seed, ~50 seed tasks, 3-tab UI
Module 5: Evidence Collector Extended with automated checks, control-mapping, coverage report, 4-tab UI

Also includes: canonical control library enhancements (verification method, categories, dedup), control generator improvements, RAG client extensions

52 tests pass, frontend builds clean.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
Benjamin Admin
2026-03-14 21:03:04 +01:00
parent 13d13c8226
commit 49ce417428
35 changed files with 8741 additions and 422 deletions

View File

@@ -0,0 +1,209 @@
"""Tests for extended dashboard routes (roadmap, module-status, next-actions, snapshots)."""
import pytest
from unittest.mock import MagicMock, patch
from fastapi.testclient import TestClient
from fastapi import FastAPI
from datetime import datetime, date, timedelta
from decimal import Decimal
from compliance.api.dashboard_routes import router
from classroom_engine.database import get_db
from compliance.api.tenant_utils import get_tenant_id
DEFAULT_TENANT_ID = "9282a473-5c95-4b3a-bf78-0ecc0ec71d3e"
# =============================================================================
# Test App Setup
# =============================================================================
app = FastAPI()
app.include_router(router)
mock_db = MagicMock()
def override_get_db():
yield mock_db
def override_tenant():
return DEFAULT_TENANT_ID
app.dependency_overrides[get_db] = override_get_db
app.dependency_overrides[get_tenant_id] = override_tenant
client = TestClient(app)
# =============================================================================
# Helpers
# =============================================================================
class MockControl:
def __init__(self, id="ctrl-001", control_id="CTRL-001", title="Test Control",
status_val="planned", domain_val="gov", owner="ISB",
next_review_at=None, category="security"):
self.id = id
self.control_id = control_id
self.title = title
self.status = MagicMock(value=status_val)
self.domain = MagicMock(value=domain_val)
self.owner = owner
self.next_review_at = next_review_at
self.category = category
class MockRisk:
def __init__(self, inherent_risk_val="high", status="open"):
self.inherent_risk = MagicMock(value=inherent_risk_val)
self.status = status
def make_snapshot_row(overrides=None):
data = {
"id": "snap-001",
"tenant_id": DEFAULT_TENANT_ID,
"project_id": None,
"score": Decimal("72.50"),
"controls_total": 20,
"controls_pass": 12,
"controls_partial": 5,
"evidence_total": 10,
"evidence_valid": 8,
"risks_total": 5,
"risks_high": 2,
"snapshot_date": date(2026, 3, 14),
"created_at": datetime(2026, 3, 14),
}
if overrides:
data.update(overrides)
row = MagicMock()
row._mapping = data
return row
# =============================================================================
# Tests
# =============================================================================
class TestDashboardRoadmap:
def test_roadmap_returns_buckets(self):
"""Roadmap returns 4 buckets with controls."""
overdue = datetime.utcnow() - timedelta(days=10)
future = datetime.utcnow() + timedelta(days=30)
with patch("compliance.api.dashboard_routes.ControlRepository") as MockCtrlRepo:
instance = MockCtrlRepo.return_value
instance.get_all.return_value = [
MockControl(id="c1", status_val="planned", category="legal", next_review_at=overdue),
MockControl(id="c2", status_val="partial", category="security"),
MockControl(id="c3", status_val="planned", category="best_practice"),
MockControl(id="c4", status_val="pass"), # should be excluded
]
resp = client.get("/dashboard/roadmap")
assert resp.status_code == 200
data = resp.json()
assert "buckets" in data
assert "counts" in data
# c4 is pass, so excluded; c1 is legal+overdue → quick_wins
total_in_buckets = sum(data["counts"].values())
assert total_in_buckets == 3
def test_roadmap_empty_controls(self):
"""Roadmap with no controls returns empty buckets."""
with patch("compliance.api.dashboard_routes.ControlRepository") as MockCtrlRepo:
MockCtrlRepo.return_value.get_all.return_value = []
resp = client.get("/dashboard/roadmap")
assert resp.status_code == 200
assert all(v == 0 for v in resp.json()["counts"].values())
class TestModuleStatus:
def test_module_status_returns_modules(self):
"""Module status returns list of modules with counts."""
# Mock db.execute for each module's COUNT query
count_result = MagicMock()
count_result.fetchone.return_value = (5,)
mock_db.execute.return_value = count_result
resp = client.get("/dashboard/module-status")
assert resp.status_code == 200
data = resp.json()
assert "modules" in data
assert data["total"] > 0
assert all(m["count"] == 5 for m in data["modules"])
def test_module_status_handles_missing_tables(self):
"""Module status handles missing tables gracefully."""
mock_db.execute.side_effect = Exception("relation does not exist")
resp = client.get("/dashboard/module-status")
assert resp.status_code == 200
data = resp.json()
# All modules should have count=0 and status=not_started
assert all(m["count"] == 0 for m in data["modules"])
assert all(m["status"] == "not_started" for m in data["modules"])
mock_db.execute.side_effect = None # reset
class TestNextActions:
def test_next_actions_returns_sorted(self):
"""Next actions returns controls sorted by urgency."""
overdue = datetime.utcnow() - timedelta(days=30)
with patch("compliance.api.dashboard_routes.ControlRepository") as MockCtrlRepo:
instance = MockCtrlRepo.return_value
instance.get_all.return_value = [
MockControl(id="c1", status_val="planned", category="legal", next_review_at=overdue),
MockControl(id="c2", status_val="partial", category="best_practice"),
MockControl(id="c3", status_val="pass"), # excluded
]
resp = client.get("/dashboard/next-actions?limit=5")
assert resp.status_code == 200
data = resp.json()
assert len(data["actions"]) == 2
# c1 should be first (higher urgency due to legal + overdue)
assert data["actions"][0]["control_id"] == "CTRL-001"
class TestScoreSnapshot:
def test_create_snapshot(self):
"""Creating a snapshot saves current score."""
with patch("compliance.api.dashboard_routes.ControlRepository") as MockCtrlRepo, \
patch("compliance.api.dashboard_routes.EvidenceRepository") as MockEvRepo, \
patch("compliance.api.dashboard_routes.RiskRepository") as MockRiskRepo:
MockCtrlRepo.return_value.get_statistics.return_value = {
"total": 20, "pass": 12, "partial": 5, "by_status": {}
}
MockEvRepo.return_value.get_statistics.return_value = {
"total": 10, "by_status": {"valid": 8}
}
MockRiskRepo.return_value.get_all.return_value = [
MockRisk("high"), MockRisk("critical"), MockRisk("low")
]
snap_row = make_snapshot_row()
mock_db.execute.return_value.fetchone.return_value = snap_row
resp = client.post("/dashboard/snapshot")
assert resp.status_code == 200
data = resp.json()
assert "score" in data
def test_score_history(self):
"""Score history returns snapshots."""
rows = [make_snapshot_row({"snapshot_date": date(2026, 3, i)}) for i in range(1, 4)]
mock_db.execute.return_value.fetchall.return_value = rows
resp = client.get("/dashboard/score-history?months=3")
assert resp.status_code == 200
data = resp.json()
assert data["total"] == 3
assert len(data["snapshots"]) == 3

View File

@@ -0,0 +1,374 @@
"""Tests for Evidence Check routes (evidence_check_routes.py)."""
import json
import pytest
from unittest.mock import MagicMock, patch, AsyncMock
from datetime import datetime
from fastapi import FastAPI
from fastapi.testclient import TestClient
from compliance.api.evidence_check_routes import router, VALID_CHECK_TYPES
from classroom_engine.database import get_db
from compliance.api.tenant_utils import get_tenant_id
# ---------------------------------------------------------------------------
# App setup with mocked DB dependency
# ---------------------------------------------------------------------------
app = FastAPI()
app.include_router(router)
DEFAULT_TENANT_ID = "9282a473-5c95-4b3a-bf78-0ecc0ec71d3e"
CHECK_ID = "ffffffff-0001-0001-0001-000000000001"
RESULT_ID = "eeeeeeee-0001-0001-0001-000000000001"
MAPPING_ID = "dddddddd-0001-0001-0001-000000000001"
EVIDENCE_ID = "cccccccc-0001-0001-0001-000000000001"
NOW = datetime(2026, 3, 14, 12, 0, 0)
def override_get_tenant_id():
return DEFAULT_TENANT_ID
app.dependency_overrides[get_tenant_id] = override_get_tenant_id
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
def _make_check_row(overrides=None):
"""Create a mock DB row for a check."""
data = {
"id": CHECK_ID,
"tenant_id": DEFAULT_TENANT_ID,
"project_id": None,
"check_code": "TLS-SCAN-001",
"title": "TLS-Scan Hauptwebseite",
"description": "Prueft TLS",
"check_type": "tls_scan",
"target_url": "https://example.com",
"target_config": {},
"linked_control_ids": [],
"frequency": "monthly",
"last_run_at": None,
"next_run_at": None,
"is_active": True,
"created_at": NOW,
"updated_at": NOW,
}
if overrides:
data.update(overrides)
row = MagicMock()
row._mapping = data
row.__getitem__ = lambda self, i: list(data.values())[i]
return row
def _make_result_row(overrides=None):
"""Create a mock DB row for a result."""
data = {
"id": RESULT_ID,
"check_id": CHECK_ID,
"tenant_id": DEFAULT_TENANT_ID,
"run_status": "passed",
"result_data": {"tls_version": "TLSv1.3"},
"summary": "TLS TLSv1.3",
"findings_count": 0,
"critical_findings": 0,
"evidence_id": None,
"duration_ms": 150,
"run_at": NOW,
}
if overrides:
data.update(overrides)
row = MagicMock()
row._mapping = data
row.__getitem__ = lambda self, i: list(data.values())[i]
return row
def _make_mapping_row(overrides=None):
data = {
"id": MAPPING_ID,
"tenant_id": DEFAULT_TENANT_ID,
"evidence_id": EVIDENCE_ID,
"control_code": "TOM-001",
"mapping_type": "supports",
"verified_at": None,
"verified_by": None,
"notes": "Test mapping",
"created_at": NOW,
}
if overrides:
data.update(overrides)
row = MagicMock()
row._mapping = data
row.__getitem__ = lambda self, i: list(data.values())[i]
return row
# ---------------------------------------------------------------------------
# Tests
# ---------------------------------------------------------------------------
class TestListChecks:
def test_list_checks(self):
mock_db = MagicMock()
# COUNT query
count_row = MagicMock()
count_row.__getitem__ = lambda self, i: 2
# Data rows
rows = [_make_check_row(), _make_check_row({"check_code": "TLS-SCAN-002"})]
mock_db.execute.side_effect = [
MagicMock(fetchone=MagicMock(return_value=count_row)),
MagicMock(fetchall=MagicMock(return_value=rows)),
]
app.dependency_overrides[get_db] = lambda: (yield mock_db).__next__() or mock_db
def override_db():
yield mock_db
app.dependency_overrides[get_db] = override_db
client = TestClient(app)
resp = client.get("/evidence-checks")
assert resp.status_code == 200
data = resp.json()
assert "checks" in data
assert len(data["checks"]) == 2
class TestCreateCheck:
def test_create_check(self):
mock_db = MagicMock()
mock_db.execute.return_value.fetchone.return_value = _make_check_row()
def override_db():
yield mock_db
app.dependency_overrides[get_db] = override_db
client = TestClient(app)
resp = client.post("/evidence-checks", json={
"check_code": "TLS-SCAN-001",
"title": "TLS-Scan Hauptwebseite",
"check_type": "tls_scan",
"frequency": "monthly",
})
assert resp.status_code == 201
data = resp.json()
assert data["check_code"] == "TLS-SCAN-001"
def test_create_check_invalid_type(self):
mock_db = MagicMock()
def override_db():
yield mock_db
app.dependency_overrides[get_db] = override_db
client = TestClient(app)
resp = client.post("/evidence-checks", json={
"check_code": "INVALID-001",
"title": "Invalid Check",
"check_type": "invalid_type",
})
assert resp.status_code == 400
assert "Ungueltiger check_type" in resp.json()["detail"]
class TestGetSingleCheck:
def test_get_single_check(self):
mock_db = MagicMock()
check_row = _make_check_row()
result_rows = [_make_result_row()]
mock_db.execute.side_effect = [
MagicMock(fetchone=MagicMock(return_value=check_row)),
MagicMock(fetchall=MagicMock(return_value=result_rows)),
]
def override_db():
yield mock_db
app.dependency_overrides[get_db] = override_db
client = TestClient(app)
resp = client.get(f"/evidence-checks/{CHECK_ID}")
assert resp.status_code == 200
data = resp.json()
assert data["check_code"] == "TLS-SCAN-001"
assert "recent_results" in data
assert len(data["recent_results"]) == 1
class TestUpdateCheck:
def test_update_check(self):
mock_db = MagicMock()
updated_row = _make_check_row({"title": "Updated Title"})
mock_db.execute.return_value.fetchone.return_value = updated_row
def override_db():
yield mock_db
app.dependency_overrides[get_db] = override_db
client = TestClient(app)
resp = client.put(f"/evidence-checks/{CHECK_ID}", json={
"title": "Updated Title",
})
assert resp.status_code == 200
assert resp.json()["title"] == "Updated Title"
class TestDeleteCheck:
def test_delete_check(self):
mock_db = MagicMock()
mock_db.execute.return_value.rowcount = 1
def override_db():
yield mock_db
app.dependency_overrides[get_db] = override_db
client = TestClient(app)
resp = client.delete(f"/evidence-checks/{CHECK_ID}")
assert resp.status_code == 204
class TestRunCheckTLS:
def test_run_check_tls(self):
mock_db = MagicMock()
check_row = _make_check_row()
result_insert_row = _make_result_row({"run_status": "running"})
result_update_row = _make_result_row({"run_status": "passed"})
mock_db.execute.side_effect = [
# Load check
MagicMock(fetchone=MagicMock(return_value=check_row)),
# Insert running result
MagicMock(fetchone=MagicMock(return_value=result_insert_row)),
# Update result
MagicMock(fetchone=MagicMock(return_value=result_update_row)),
# Update check timestamps
MagicMock(),
]
mock_db.commit = MagicMock()
tls_result = {
"run_status": "passed",
"result_data": {"tls_version": "TLSv1.3", "findings": []},
"summary": "TLS TLSv1.3, Zertifikat gueltig",
"findings_count": 0,
"critical_findings": 0,
"duration_ms": 100,
}
def override_db():
yield mock_db
app.dependency_overrides[get_db] = override_db
client = TestClient(app)
with patch("compliance.api.evidence_check_routes._run_tls_scan", new_callable=AsyncMock, return_value=tls_result):
resp = client.post(f"/evidence-checks/{CHECK_ID}/run")
assert resp.status_code == 200
data = resp.json()
assert data["run_status"] == "passed"
class TestRunCheckHeader:
def test_run_check_header(self):
mock_db = MagicMock()
check_row = _make_check_row({"check_type": "header_check"})
result_insert_row = _make_result_row({"run_status": "running"})
result_update_row = _make_result_row({"run_status": "warning"})
mock_db.execute.side_effect = [
MagicMock(fetchone=MagicMock(return_value=check_row)),
MagicMock(fetchone=MagicMock(return_value=result_insert_row)),
MagicMock(fetchone=MagicMock(return_value=result_update_row)),
MagicMock(),
]
mock_db.commit = MagicMock()
header_result = {
"run_status": "warning",
"result_data": {"missing_headers": ["Permissions-Policy"], "findings": []},
"summary": "5/6 Security-Header vorhanden",
"findings_count": 1,
"critical_findings": 0,
"duration_ms": 200,
}
def override_db():
yield mock_db
app.dependency_overrides[get_db] = override_db
client = TestClient(app)
with patch("compliance.api.evidence_check_routes._run_header_check", new_callable=AsyncMock, return_value=header_result):
resp = client.post(f"/evidence-checks/{CHECK_ID}/run")
assert resp.status_code == 200
data = resp.json()
assert data["run_status"] == "warning"
class TestSeedChecks:
def test_seed_checks(self):
mock_db = MagicMock()
# Each seed INSERT returns rowcount=1
mock_result = MagicMock()
mock_result.rowcount = 1
mock_db.execute.return_value = mock_result
def override_db():
yield mock_db
app.dependency_overrides[get_db] = override_db
client = TestClient(app)
resp = client.post("/evidence-checks/seed")
assert resp.status_code == 200
data = resp.json()
assert data["total_definitions"] == 15
assert data["seeded"] == 15
class TestMappingsCRUD:
def test_mappings_crud(self):
mock_db = MagicMock()
def override_db():
yield mock_db
app.dependency_overrides[get_db] = override_db
client = TestClient(app)
# Create mapping
mapping_row = _make_mapping_row()
mock_db.execute.return_value.fetchone.return_value = mapping_row
resp = client.post("/evidence-checks/mappings", json={
"evidence_id": EVIDENCE_ID,
"control_code": "TOM-001",
"mapping_type": "supports",
"notes": "Test mapping",
})
assert resp.status_code == 201
data = resp.json()
assert data["control_code"] == "TOM-001"
# List mappings
mock_db.execute.return_value.fetchall.return_value = [mapping_row]
resp = client.get("/evidence-checks/mappings")
assert resp.status_code == 200
assert "mappings" in resp.json()
# Delete mapping
mock_db.execute.return_value.rowcount = 1
resp = client.delete(f"/evidence-checks/mappings/{MAPPING_ID}")
assert resp.status_code == 204

View File

@@ -0,0 +1,525 @@
"""Tests for compliance process task routes (process_task_routes.py)."""
import pytest
from unittest.mock import MagicMock, patch, call
from datetime import datetime, date, timedelta
from fastapi import FastAPI
from fastapi.testclient import TestClient
from compliance.api.process_task_routes import (
router,
ProcessTaskCreate,
ProcessTaskUpdate,
ProcessTaskComplete,
ProcessTaskSkip,
VALID_CATEGORIES,
VALID_FREQUENCIES,
VALID_PRIORITIES,
VALID_STATUSES,
FREQUENCY_DAYS,
)
from classroom_engine.database import get_db
from compliance.api.tenant_utils import get_tenant_id
DEFAULT_TENANT_ID = "9282a473-5c95-4b3a-bf78-0ecc0ec71d3e"
TASK_ID = "ffffffff-0001-0001-0001-000000000001"
app = FastAPI()
app.include_router(router)
# ---------------------------------------------------------------------------
# Mock helpers
# ---------------------------------------------------------------------------
class _MockRow:
"""Simulates a SQLAlchemy row with _mapping attribute."""
def __init__(self, data: dict):
self._mapping = data
def __getitem__(self, idx):
vals = list(self._mapping.values())
return vals[idx]
def _make_task_row(overrides=None):
now = datetime(2026, 3, 14, 12, 0, 0)
data = {
"id": TASK_ID,
"tenant_id": DEFAULT_TENANT_ID,
"project_id": None,
"task_code": "DSGVO-VVT-REVIEW",
"title": "VVT-Review und Aktualisierung",
"description": "Jaehrliche Ueberpruefung des VVT.",
"category": "dsgvo",
"priority": "high",
"frequency": "yearly",
"assigned_to": None,
"responsible_team": None,
"linked_control_ids": [],
"linked_module": "vvt",
"last_completed_at": None,
"next_due_date": date(2027, 3, 14),
"due_reminder_days": 14,
"status": "pending",
"completion_date": None,
"completion_result": None,
"completion_evidence_id": None,
"follow_up_actions": [],
"is_seed": False,
"notes": None,
"tags": [],
"created_at": now,
"updated_at": now,
}
if overrides:
data.update(overrides)
return _MockRow(data)
def _make_history_row(overrides=None):
now = datetime(2026, 3, 14, 12, 0, 0)
data = {
"id": "eeeeeeee-0001-0001-0001-000000000001",
"task_id": TASK_ID,
"completed_by": "admin",
"completed_at": now,
"result": "Alles in Ordnung",
"evidence_id": None,
"notes": "Keine Auffaelligkeiten",
"status": "completed",
}
if overrides:
data.update(overrides)
return _MockRow(data)
def _count_row(val):
"""Simulates a COUNT(*) row — fetchone()[0] returns the value."""
row = MagicMock()
row.__getitem__ = lambda self, idx: val
return row
@pytest.fixture
def mock_db():
db = MagicMock()
app.dependency_overrides[get_db] = lambda: db
yield db
app.dependency_overrides.pop(get_db, None)
@pytest.fixture
def client(mock_db):
return TestClient(app)
# =============================================================================
# Test 1: List Tasks
# =============================================================================
class TestListTasks:
def test_list_tasks(self, client, mock_db):
"""List tasks returns items and total."""
row = _make_task_row()
mock_db.execute.side_effect = [
MagicMock(fetchone=MagicMock(return_value=_count_row(1))),
MagicMock(fetchall=MagicMock(return_value=[row])),
]
resp = client.get("/process-tasks")
assert resp.status_code == 200
data = resp.json()
assert data["total"] == 1
assert len(data["tasks"]) == 1
assert data["tasks"][0]["id"] == TASK_ID
assert data["tasks"][0]["task_code"] == "DSGVO-VVT-REVIEW"
def test_list_tasks_empty(self, client, mock_db):
"""List tasks returns empty when no tasks."""
mock_db.execute.side_effect = [
MagicMock(fetchone=MagicMock(return_value=_count_row(0))),
MagicMock(fetchall=MagicMock(return_value=[])),
]
resp = client.get("/process-tasks")
assert resp.status_code == 200
data = resp.json()
assert data["total"] == 0
assert data["tasks"] == []
# =============================================================================
# Test 2: List Tasks with Filters
# =============================================================================
class TestListTasksWithFilters:
def test_list_tasks_with_filters(self, client, mock_db):
"""Filter by status and category."""
row = _make_task_row({"status": "overdue", "category": "nis2"})
mock_db.execute.side_effect = [
MagicMock(fetchone=MagicMock(return_value=_count_row(1))),
MagicMock(fetchall=MagicMock(return_value=[row])),
]
resp = client.get("/process-tasks?status=overdue&category=nis2")
assert resp.status_code == 200
data = resp.json()
assert data["tasks"][0]["status"] == "overdue"
assert data["tasks"][0]["category"] == "nis2"
def test_list_tasks_overdue_filter(self, client, mock_db):
"""Filter overdue=true adds date condition."""
mock_db.execute.side_effect = [
MagicMock(fetchone=MagicMock(return_value=_count_row(0))),
MagicMock(fetchall=MagicMock(return_value=[])),
]
resp = client.get("/process-tasks?overdue=true")
assert resp.status_code == 200
# Verify the SQL was called (mock_db.execute called twice: count + select)
assert mock_db.execute.call_count == 2
# =============================================================================
# Test 3: Get Stats
# =============================================================================
class TestGetStats:
def test_get_stats(self, client, mock_db):
"""Verify stat counts structure."""
stats_row = MagicMock()
stats_row._mapping = {
"total": 50,
"pending": 20,
"in_progress": 5,
"completed": 15,
"overdue": 8,
"skipped": 2,
"overdue_count": 8,
"due_7_days": 3,
"due_14_days": 7,
"due_30_days": 12,
}
cat_row1 = MagicMock()
cat_row1._mapping = {"category": "dsgvo", "cnt": 15}
cat_row2 = MagicMock()
cat_row2._mapping = {"category": "nis2", "cnt": 10}
mock_db.execute.side_effect = [
MagicMock(fetchone=MagicMock(return_value=stats_row)),
MagicMock(fetchall=MagicMock(return_value=[cat_row1, cat_row2])),
]
resp = client.get("/process-tasks/stats")
assert resp.status_code == 200
data = resp.json()
assert data["total"] == 50
assert data["by_status"]["pending"] == 20
assert data["by_status"]["completed"] == 15
assert data["overdue_count"] == 8
assert data["due_7_days"] == 3
assert data["due_30_days"] == 12
assert data["by_category"]["dsgvo"] == 15
assert data["by_category"]["nis2"] == 10
def test_get_stats_empty(self, client, mock_db):
"""Stats with no tasks returns zeros."""
mock_db.execute.side_effect = [
MagicMock(fetchone=MagicMock(return_value=None)),
MagicMock(fetchall=MagicMock(return_value=[])),
]
resp = client.get("/process-tasks/stats")
assert resp.status_code == 200
data = resp.json()
assert data["total"] == 0
assert data["by_category"] == {}
# =============================================================================
# Test 4: Create Task
# =============================================================================
class TestCreateTask:
def test_create_task(self, client, mock_db):
"""Create a valid task returns 201."""
row = _make_task_row()
mock_db.execute.return_value = MagicMock(fetchone=MagicMock(return_value=row))
resp = client.post("/process-tasks", json={
"task_code": "DSGVO-VVT-REVIEW",
"title": "VVT-Review und Aktualisierung",
"category": "dsgvo",
"priority": "high",
"frequency": "yearly",
})
assert resp.status_code == 201
data = resp.json()
assert data["id"] == TASK_ID
assert data["task_code"] == "DSGVO-VVT-REVIEW"
mock_db.commit.assert_called_once()
# =============================================================================
# Test 5: Create Task Invalid Category
# =============================================================================
class TestCreateTaskInvalidCategory:
def test_create_task_invalid_category(self, client, mock_db):
"""Invalid category returns 400."""
resp = client.post("/process-tasks", json={
"task_code": "TEST-001",
"title": "Test",
"category": "invalid_category",
})
assert resp.status_code == 400
assert "Invalid category" in resp.json()["detail"]
def test_create_task_invalid_priority(self, client, mock_db):
"""Invalid priority returns 400."""
resp = client.post("/process-tasks", json={
"task_code": "TEST-001",
"title": "Test",
"category": "dsgvo",
"priority": "super_high",
})
assert resp.status_code == 400
assert "Invalid priority" in resp.json()["detail"]
def test_create_task_invalid_frequency(self, client, mock_db):
"""Invalid frequency returns 400."""
resp = client.post("/process-tasks", json={
"task_code": "TEST-001",
"title": "Test",
"category": "dsgvo",
"frequency": "biweekly",
})
assert resp.status_code == 400
assert "Invalid frequency" in resp.json()["detail"]
# =============================================================================
# Test 6: Get Single Task
# =============================================================================
class TestGetSingleTask:
def test_get_single_task(self, client, mock_db):
"""Get existing task by ID."""
row = _make_task_row()
mock_db.execute.return_value = MagicMock(fetchone=MagicMock(return_value=row))
resp = client.get(f"/process-tasks/{TASK_ID}")
assert resp.status_code == 200
assert resp.json()["id"] == TASK_ID
def test_get_task_not_found(self, client, mock_db):
"""Get non-existent task returns 404."""
mock_db.execute.return_value = MagicMock(fetchone=MagicMock(return_value=None))
resp = client.get("/process-tasks/nonexistent-id")
assert resp.status_code == 404
# =============================================================================
# Test 7: Complete Task
# =============================================================================
class TestCompleteTask:
def test_complete_task(self, client, mock_db):
"""Complete a task: verify history insert and next_due recalculation."""
task_row = _make_task_row({"frequency": "quarterly"})
updated_row = _make_task_row({
"frequency": "quarterly",
"status": "pending",
"last_completed_at": datetime(2026, 3, 14, 12, 0, 0),
"next_due_date": date(2026, 6, 12),
})
# First call: SELECT task, Second: INSERT history, Third: UPDATE task
mock_db.execute.side_effect = [
MagicMock(fetchone=MagicMock(return_value=task_row)),
MagicMock(), # history INSERT
MagicMock(fetchone=MagicMock(return_value=updated_row)),
]
resp = client.post(f"/process-tasks/{TASK_ID}/complete", json={
"completed_by": "admin",
"result": "Alles geprueft",
"notes": "Keine Auffaelligkeiten",
})
assert resp.status_code == 200
data = resp.json()
assert data["status"] == "pending" # Reset for recurring
mock_db.commit.assert_called_once()
def test_complete_once_task(self, client, mock_db):
"""Complete a one-time task stays completed."""
task_row = _make_task_row({"frequency": "once"})
updated_row = _make_task_row({
"frequency": "once",
"status": "completed",
"next_due_date": None,
})
mock_db.execute.side_effect = [
MagicMock(fetchone=MagicMock(return_value=task_row)),
MagicMock(),
MagicMock(fetchone=MagicMock(return_value=updated_row)),
]
resp = client.post(f"/process-tasks/{TASK_ID}/complete", json={
"completed_by": "admin",
})
assert resp.status_code == 200
assert resp.json()["status"] == "completed"
def test_complete_task_not_found(self, client, mock_db):
"""Complete non-existent task returns 404."""
mock_db.execute.return_value = MagicMock(fetchone=MagicMock(return_value=None))
resp = client.post("/process-tasks/nonexistent-id/complete", json={
"completed_by": "admin",
})
assert resp.status_code == 404
# =============================================================================
# Test 8: Skip Task
# =============================================================================
class TestSkipTask:
def test_skip_task(self, client, mock_db):
"""Skip task with reason, verify next_due recalculation."""
task_row = _make_task_row({"frequency": "monthly"})
updated_row = _make_task_row({
"frequency": "monthly",
"status": "pending",
"next_due_date": date(2026, 4, 13),
})
mock_db.execute.side_effect = [
MagicMock(fetchone=MagicMock(return_value=task_row)),
MagicMock(), # history INSERT
MagicMock(fetchone=MagicMock(return_value=updated_row)),
]
resp = client.post(f"/process-tasks/{TASK_ID}/skip", json={
"reason": "Kein Handlungsbedarf diesen Monat",
})
assert resp.status_code == 200
assert resp.json()["status"] == "pending"
mock_db.commit.assert_called_once()
def test_skip_task_not_found(self, client, mock_db):
"""Skip non-existent task returns 404."""
mock_db.execute.return_value = MagicMock(fetchone=MagicMock(return_value=None))
resp = client.post("/process-tasks/nonexistent-id/skip", json={
"reason": "Test",
})
assert resp.status_code == 404
# =============================================================================
# Test 9: Seed Idempotent
# =============================================================================
class TestSeedIdempotent:
def test_seed_idempotent(self, client, mock_db):
"""Seed twice — ON CONFLICT ensures idempotency."""
# First seed: all inserted (rowcount=1 for each)
mock_result = MagicMock()
mock_result.rowcount = 1
mock_db.execute.return_value = mock_result
resp = client.post("/process-tasks/seed")
assert resp.status_code == 200
data = resp.json()
assert data["seeded"] == data["total_available"]
assert data["total_available"] == 50
mock_db.commit.assert_called_once()
def test_seed_second_time_no_inserts(self, client, mock_db):
"""Second seed inserts nothing (ON CONFLICT DO NOTHING)."""
mock_result = MagicMock()
mock_result.rowcount = 0
mock_db.execute.return_value = mock_result
resp = client.post("/process-tasks/seed")
assert resp.status_code == 200
data = resp.json()
assert data["seeded"] == 0
assert data["total_available"] == 50
# =============================================================================
# Test 10: Get History
# =============================================================================
class TestGetHistory:
def test_get_history(self, client, mock_db):
"""Return history entries for a task."""
task_id_row = _MockRow({"id": TASK_ID})
history_row = _make_history_row()
mock_db.execute.side_effect = [
MagicMock(fetchone=MagicMock(return_value=task_id_row)),
MagicMock(fetchall=MagicMock(return_value=[history_row])),
]
resp = client.get(f"/process-tasks/{TASK_ID}/history")
assert resp.status_code == 200
data = resp.json()
assert len(data["history"]) == 1
assert data["history"][0]["task_id"] == TASK_ID
assert data["history"][0]["status"] == "completed"
assert data["history"][0]["completed_by"] == "admin"
def test_get_history_task_not_found(self, client, mock_db):
"""History for non-existent task returns 404."""
mock_db.execute.return_value = MagicMock(fetchone=MagicMock(return_value=None))
resp = client.get("/process-tasks/nonexistent-id/history")
assert resp.status_code == 404
def test_get_history_empty(self, client, mock_db):
"""Task with no history returns empty list."""
task_id_row = _MockRow({"id": TASK_ID})
mock_db.execute.side_effect = [
MagicMock(fetchone=MagicMock(return_value=task_id_row)),
MagicMock(fetchall=MagicMock(return_value=[])),
]
resp = client.get(f"/process-tasks/{TASK_ID}/history")
assert resp.status_code == 200
assert resp.json()["history"] == []
# =============================================================================
# Constant / Schema Validation Tests
# =============================================================================
class TestConstants:
def test_valid_categories(self):
assert VALID_CATEGORIES == {"dsgvo", "nis2", "bsi", "iso27001", "ai_act", "internal"}
def test_valid_frequencies(self):
assert VALID_FREQUENCIES == {"weekly", "monthly", "quarterly", "semi_annual", "yearly", "once"}
def test_valid_priorities(self):
assert VALID_PRIORITIES == {"critical", "high", "medium", "low"}
def test_valid_statuses(self):
assert VALID_STATUSES == {"pending", "in_progress", "completed", "overdue", "skipped"}
def test_frequency_days_mapping(self):
assert FREQUENCY_DAYS["weekly"] == 7
assert FREQUENCY_DAYS["monthly"] == 30
assert FREQUENCY_DAYS["quarterly"] == 90
assert FREQUENCY_DAYS["semi_annual"] == 182
assert FREQUENCY_DAYS["yearly"] == 365
assert FREQUENCY_DAYS["once"] is None
class TestDeleteTask:
def test_delete_existing(self, client, mock_db):
mock_db.execute.return_value = MagicMock(rowcount=1)
resp = client.delete(f"/process-tasks/{TASK_ID}")
assert resp.status_code == 204
mock_db.commit.assert_called_once()
def test_delete_not_found(self, client, mock_db):
mock_db.execute.return_value = MagicMock(rowcount=0)
resp = client.delete(f"/process-tasks/{TASK_ID}")
assert resp.status_code == 404

View File

@@ -0,0 +1,175 @@
"""Tests for security document templates (Module 3)."""
import pytest
from unittest.mock import MagicMock, patch
from fastapi.testclient import TestClient
from fastapi import FastAPI
from datetime import datetime
from compliance.api.legal_template_routes import router
from classroom_engine.database import get_db
from compliance.api.tenant_utils import get_tenant_id
DEFAULT_TENANT_ID = "9282a473-5c95-4b3a-bf78-0ecc0ec71d3e"
# =============================================================================
# Test App Setup
# =============================================================================
app = FastAPI()
app.include_router(router)
mock_db = MagicMock()
def override_get_db():
yield mock_db
def override_tenant():
return DEFAULT_TENANT_ID
app.dependency_overrides[get_db] = override_get_db
app.dependency_overrides[get_tenant_id] = override_tenant
client = TestClient(app)
SECURITY_TEMPLATE_TYPES = [
"it_security_concept",
"data_protection_concept",
"backup_recovery_concept",
"logging_concept",
"incident_response_plan",
"access_control_concept",
"risk_management_concept",
]
# =============================================================================
# Helpers
# =============================================================================
def make_template_row(doc_type, title="Test Template", content="# Test"):
row = MagicMock()
row._mapping = {
"id": "tmpl-001",
"tenant_id": DEFAULT_TENANT_ID,
"document_type": doc_type,
"title": title,
"description": f"Test {doc_type}",
"content": content,
"placeholders": ["COMPANY_NAME", "ISB_NAME"],
"language": "de",
"jurisdiction": "DE",
"status": "published",
"license_id": None,
"license_name": None,
"source_name": None,
"inspiration_sources": [],
"created_at": datetime(2026, 3, 14),
"updated_at": datetime(2026, 3, 14),
}
return row
# =============================================================================
# Tests
# =============================================================================
class TestSecurityTemplateTypes:
"""Verify the 7 security template types are accepted by the API."""
def test_all_security_types_in_valid_set(self):
"""All 7 security template types are in VALID_DOCUMENT_TYPES."""
from compliance.api.legal_template_routes import VALID_DOCUMENT_TYPES
for doc_type in SECURITY_TEMPLATE_TYPES:
assert doc_type in VALID_DOCUMENT_TYPES, (
f"{doc_type} not in VALID_DOCUMENT_TYPES"
)
def test_security_template_count(self):
"""There are exactly 7 security template types."""
assert len(SECURITY_TEMPLATE_TYPES) == 7
def test_create_security_template_accepted(self):
"""Creating a template with a security type is accepted (not 400)."""
insert_row = MagicMock()
insert_row._mapping = {
"id": "new-tmpl",
"tenant_id": DEFAULT_TENANT_ID,
"document_type": "it_security_concept",
"title": "IT-Sicherheitskonzept",
"description": "Test",
"content": "# IT-Sicherheitskonzept",
"placeholders": [],
"language": "de",
"jurisdiction": "DE",
"status": "draft",
"license_id": None,
"license_name": None,
"source_name": None,
"inspiration_sources": [],
"created_at": datetime(2026, 3, 14),
"updated_at": datetime(2026, 3, 14),
}
mock_db.execute.return_value.fetchone.return_value = insert_row
mock_db.commit = MagicMock()
resp = client.post("/legal-templates", json={
"document_type": "it_security_concept",
"title": "IT-Sicherheitskonzept",
"content": "# IT-Sicherheitskonzept\n\n## 1. Managementzusammenfassung",
"language": "de",
"jurisdiction": "DE",
})
# Should NOT be 400 (invalid type)
assert resp.status_code != 400 or "Invalid document_type" not in resp.text
def test_invalid_type_rejected(self):
"""A non-existent template type is rejected with 400."""
resp = client.post("/legal-templates", json={
"document_type": "nonexistent_type",
"title": "Test",
"content": "# Test",
})
assert resp.status_code == 400
assert "Invalid document_type" in resp.json()["detail"]
class TestSecurityTemplateFilter:
"""Verify filtering templates by security document types."""
def test_filter_by_security_type(self):
"""GET /legal-templates?document_type=it_security_concept returns matching templates."""
row = make_template_row("it_security_concept", "IT-Sicherheitskonzept")
mock_db.execute.return_value.fetchall.return_value = [row]
resp = client.get("/legal-templates?document_type=it_security_concept")
assert resp.status_code == 200
data = resp.json()
assert "templates" in data or isinstance(data, list)
class TestSecurityTemplatePlaceholders:
"""Verify placeholder structure for security templates."""
def test_common_placeholders_present(self):
"""Security templates should use standard placeholders."""
common_placeholders = [
"COMPANY_NAME", "GF_NAME", "ISB_NAME",
"DOCUMENT_VERSION", "VERSION_DATE", "NEXT_REVIEW_DATE",
]
row = make_template_row(
"it_security_concept",
content="# IT-Sicherheitskonzept\n{{COMPANY_NAME}} {{ISB_NAME}}"
)
row._mapping["placeholders"] = common_placeholders
mock_db.execute.return_value.fetchone.return_value = row
# Verify the mock has all expected placeholders
assert all(
p in row._mapping["placeholders"]
for p in ["COMPANY_NAME", "GF_NAME", "ISB_NAME"]
)