Phase A: TOM document HTML generator (12 sections, inline CSS, A4 print) Phase B: TOMDocumentTab component (org-header form, revisions, print/download) Phase C: 11 compliance checks with severity-weighted scoring Phase D: MkDocs documentation for TOM module Phase E: 25 new controls (63 → 88) in 13 categories Canonical Control Mapping (three-layer architecture): - Migration 068: tom_control_mappings + tom_control_sync_state tables - 6 API endpoints: sync, list, by-tom, stats, manual add, delete - Category mapping: 13 TOM categories → 17 canonical categories - Frontend: sync button + coverage card (Overview), drill-down (Editor), belegende Controls count (Document) - 20 tests (unit + API with mocked DB) Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
275 lines
9.9 KiB
Python
275 lines
9.9 KiB
Python
"""
|
|
Tests for TOM ↔ Canonical Control Mapping Routes.

Tests the three-layer architecture:
    TOM Measures → Mapping Bridge → Canonical Controls
|
|
"""
|
|
|
|
import uuid
|
|
import pytest
|
|
from unittest.mock import MagicMock, patch
|
|
from fastapi.testclient import TestClient
|
|
|
|
from compliance.api.tom_mapping_routes import (
|
|
router,
|
|
TOM_TO_CANONICAL_CATEGORIES,
|
|
_compute_profile_hash,
|
|
)
|
|
|
|
|
|
# =============================================================================
|
|
# FIXTURES
|
|
# =============================================================================
|
|
|
|
@pytest.fixture
def app():
    """Build a bare FastAPI application that mounts only the TOM mapping router."""
    # FastAPI is imported inside the fixture, exactly as the original did.
    from fastapi import FastAPI

    test_app = FastAPI()
    test_app.include_router(router)
    return test_app
|
|
|
|
|
|
@pytest.fixture
def client(app):
    """Return a ``TestClient`` wrapping the application from the ``app`` fixture."""
    test_client = TestClient(app)
    return test_client
|
|
|
|
|
|
# Fixed identifiers shared by the endpoint tests in this module.
TENANT_ID = "9282a473-5c95-4b3a-bf78-0ecc0ec71d3e"
# NOTE(review): PROJECT_ID is not referenced by any test visible in this file.
PROJECT_ID = "aaaaaaaa-bbbb-cccc-dddd-eeeeeeeeeeee"

# Request headers carrying the tenant ID; tests that omit them expect HTTP 400.
HEADERS = {"X-Tenant-ID": TENANT_ID}
|
|
|
|
|
|
# =============================================================================
|
|
# UNIT TESTS
|
|
# =============================================================================
|
|
|
|
class TestCategoryMapping:
    """Checks on the TOM → canonical category mapping dictionary."""

    def test_all_13_tom_categories_mapped(self):
        """The mapping's keys must be exactly the 13 expected TOM categories."""
        expected = {
            "ACCESS_CONTROL",
            "ADMISSION_CONTROL",
            "ACCESS_AUTHORIZATION",
            "TRANSFER_CONTROL",
            "INPUT_CONTROL",
            "ORDER_CONTROL",
            "AVAILABILITY",
            "SEPARATION",
            "ENCRYPTION",
            "PSEUDONYMIZATION",
            "RESILIENCE",
            "RECOVERY",
            "REVIEW",
        }
        mapped = set(TOM_TO_CANONICAL_CATEGORIES.keys())
        assert mapped == expected

    def test_each_category_has_at_least_one_canonical(self):
        """No TOM category may map to an empty collection of canonical categories."""
        for tom_cat, canonical_cats in TOM_TO_CANONICAL_CATEGORIES.items():
            assert len(canonical_cats) >= 1, f"{tom_cat} has no canonical categories"

    def test_canonical_categories_are_valid(self):
        """Every referenced canonical category must be one of the 17 known names.

        Per the original author's note, this set mirrors the DB seed of
        migration 047.
        """
        valid_canonical = {
            "encryption",
            "authentication",
            "network",
            "data_protection",
            "logging",
            "incident",
            "continuity",
            "compliance",
            "supply_chain",
            "physical",
            "personnel",
            "application",
            "system",
            "risk",
            "governance",
            "hardware",
            "identity",
        }
        # Flatten the mapping into (TOM category, canonical category) pairs.
        pairs = (
            (tom_cat, cc)
            for tom_cat, canonical_cats in TOM_TO_CANONICAL_CATEGORIES.items()
            for cc in canonical_cats
        )
        for tom_cat, cc in pairs:
            assert cc in valid_canonical, f"Invalid canonical category '{cc}' in {tom_cat}"
|
|
|
|
|
|
class TestProfileHash:
    """Checks on ``_compute_profile_hash``."""

    def test_same_input_same_hash(self):
        """Hashing the same (industry, size) pair twice yields equal results."""
        first = _compute_profile_hash("Telekommunikation", "medium")
        second = _compute_profile_hash("Telekommunikation", "medium")
        assert first == second

    def test_different_input_different_hash(self):
        """Two different (industry, size) pairs yield different hashes."""
        telco_hash = _compute_profile_hash("Telekommunikation", "medium")
        health_hash = _compute_profile_hash("Gesundheitswesen", "large")
        assert telco_hash != health_hash

    def test_none_values_produce_hash(self):
        """``None`` for both arguments still produces a 16-character hash."""
        digest = _compute_profile_hash(None, None)
        assert len(digest) == 16

    def test_hash_is_16_chars(self):
        """A regular input produces a hash of length 16."""
        digest = _compute_profile_hash("test", "small")
        assert len(digest) == 16
|
|
|
|
|
|
# =============================================================================
|
|
# API ENDPOINT TESTS (with mocked DB)
|
|
# =============================================================================
|
|
|
|
class TestSyncEndpoint:
    """Tests for POST /tom-mappings/sync."""

    def test_sync_requires_tenant_header(self, client):
        """Without the X-Tenant-ID header the endpoint answers 400 and names the header."""
        response = client.post("/tom-mappings/sync", json={"industry": "IT"})
        assert response.status_code == 400
        assert "X-Tenant-ID" in response.json()["detail"]

    @patch("compliance.api.tom_mapping_routes.SessionLocal")
    def test_sync_unchanged_profile_skips(self, mock_session_cls, client):
        """When the stored profile hash matches the request, sync reports 'unchanged'."""
        session = MagicMock()
        # Make ``with SessionLocal() as db`` hand back the mocked session.
        session_ctx = mock_session_cls.return_value
        session_ctx.__enter__ = MagicMock(return_value=session)
        session_ctx.__exit__ = MagicMock(return_value=False)

        # The row returned by fetchone carries the hash of the profile sent below.
        stored_row = MagicMock()
        stored_row.profile_hash = _compute_profile_hash("IT", "medium")
        session.execute.return_value.fetchone.return_value = stored_row

        payload = {"industry": "IT", "company_size": "medium"}
        response = client.post("/tom-mappings/sync", json=payload, headers=HEADERS)

        assert response.status_code == 200
        body = response.json()
        assert body["status"] == "unchanged"

    @patch("compliance.api.tom_mapping_routes.SessionLocal")
    def test_sync_force_ignores_hash(self, mock_session_cls, client):
        """With force=True the endpoint should report 'synced'.

        NOTE(review): fetchone is mocked to return None here, so no stored
        hash is ever presented to the route; this test may pass even if
        ``force`` were ignored — confirm against the route implementation.
        """
        session = MagicMock()
        session_ctx = mock_session_cls.return_value
        session_ctx.__enter__ = MagicMock(return_value=session)
        session_ctx.__exit__ = MagicMock(return_value=False)

        # Every query comes back empty: no rows and no single row.
        query_result = session.execute.return_value
        query_result.fetchall.return_value = []
        query_result.fetchone.return_value = None

        payload = {"industry": "IT", "company_size": "medium", "force": True}
        response = client.post("/tom-mappings/sync", json=payload, headers=HEADERS)

        assert response.status_code == 200
        body = response.json()
        assert body["status"] == "synced"
|
|
|
|
|
|
class TestListEndpoint:
    """Tests for GET /tom-mappings."""

    def test_list_requires_tenant_header(self, client):
        """Without the X-Tenant-ID header the endpoint answers 400."""
        response = client.get("/tom-mappings")
        assert response.status_code == 400

    @patch("compliance.api.tom_mapping_routes.SessionLocal")
    def test_list_returns_mappings(self, mock_session_cls, client):
        """With an empty mocked DB the response still has 'mappings' and 'total' keys."""
        session = MagicMock()
        # Make ``with SessionLocal() as db`` hand back the mocked session.
        session_ctx = mock_session_cls.return_value
        session_ctx.__enter__ = MagicMock(return_value=session)
        session_ctx.__exit__ = MagicMock(return_value=False)

        # No rows, and a scalar result of zero.
        query_result = session.execute.return_value
        query_result.fetchall.return_value = []
        query_result.scalar.return_value = 0

        response = client.get("/tom-mappings", headers=HEADERS)

        assert response.status_code == 200
        body = response.json()
        assert "mappings" in body
        assert "total" in body
|
|
|
|
|
|
class TestByTomEndpoint:
    """Tests for GET /tom-mappings/by-tom/{code}."""

    def test_by_tom_requires_tenant_header(self, client):
        """Without the X-Tenant-ID header the endpoint answers 400."""
        response = client.get("/tom-mappings/by-tom/ENCRYPTION")
        assert response.status_code == 400

    @patch("compliance.api.tom_mapping_routes.SessionLocal")
    def test_by_tom_returns_mappings(self, mock_session_cls, client):
        """The response echoes the requested code and contains a 'mappings' key."""
        session = MagicMock()
        # Make ``with SessionLocal() as db`` hand back the mocked session.
        session_ctx = mock_session_cls.return_value
        session_ctx.__enter__ = MagicMock(return_value=session)
        session_ctx.__exit__ = MagicMock(return_value=False)

        # The mocked query yields no rows.
        session.execute.return_value.fetchall.return_value = []

        response = client.get("/tom-mappings/by-tom/ENCRYPTION", headers=HEADERS)

        assert response.status_code == 200
        body = response.json()
        assert body["tom_code"] == "ENCRYPTION"
        assert "mappings" in body
|
|
|
|
|
|
class TestStatsEndpoint:
    """Tests for GET /tom-mappings/stats."""

    def test_stats_requires_tenant_header(self, client):
        """Without the X-Tenant-ID header the endpoint answers 400."""
        response = client.get("/tom-mappings/stats")
        assert response.status_code == 400

    @patch("compliance.api.tom_mapping_routes.SessionLocal")
    def test_stats_returns_structure(self, mock_session_cls, client):
        """With an empty mocked DB the response still exposes the three top-level keys."""
        session = MagicMock()
        # Make ``with SessionLocal() as db`` hand back the mocked session.
        session_ctx = mock_session_cls.return_value
        session_ctx.__enter__ = MagicMock(return_value=session)
        session_ctx.__exit__ = MagicMock(return_value=False)

        # Every query shape comes back empty: no row, no rows, scalar zero.
        query_result = session.execute.return_value
        query_result.fetchone.return_value = None
        query_result.fetchall.return_value = []
        query_result.scalar.return_value = 0

        response = client.get("/tom-mappings/stats", headers=HEADERS)

        assert response.status_code == 200
        body = response.json()
        assert "sync_state" in body
        assert "category_breakdown" in body
        assert "total_canonical_controls_available" in body
|
|
|
|
|
|
class TestManualMappingEndpoint:
    """Tests for POST /tom-mappings/manual."""

    @staticmethod
    def _manual_payload():
        """Build the request body used by both tests, with a fresh random control ID."""
        return {
            "tom_control_code": "TOM-ENC-01",
            "tom_category": "ENCRYPTION",
            "canonical_control_id": str(uuid.uuid4()),
            "canonical_control_code": "CRYP-001",
        }

    def test_manual_requires_tenant_header(self, client):
        """Without the X-Tenant-ID header the endpoint answers 400."""
        response = client.post("/tom-mappings/manual", json=self._manual_payload())
        assert response.status_code == 400

    @patch("compliance.api.tom_mapping_routes.SessionLocal")
    def test_manual_404_if_canonical_not_found(self, mock_session_cls, client):
        """When the mocked lookup returns no row, the endpoint answers 404."""
        session = MagicMock()
        # Make ``with SessionLocal() as db`` hand back the mocked session.
        session_ctx = mock_session_cls.return_value
        session_ctx.__enter__ = MagicMock(return_value=session)
        session_ctx.__exit__ = MagicMock(return_value=False)

        # fetchone returning None simulates a missing row.
        session.execute.return_value.fetchone.return_value = None

        response = client.post(
            "/tom-mappings/manual",
            json=self._manual_payload(),
            headers=HEADERS,
        )
        assert response.status_code == 404
|
|
|
|
|
|
class TestDeleteMappingEndpoint:
    """Tests for DELETE /tom-mappings/{id}."""

    def test_delete_requires_tenant_header(self, client):
        """Without the X-Tenant-ID header the endpoint answers 400."""
        mapping_id = uuid.uuid4()
        response = client.delete(f"/tom-mappings/{mapping_id}")
        assert response.status_code == 400

    @patch("compliance.api.tom_mapping_routes.SessionLocal")
    def test_delete_404_if_not_found(self, mock_session_cls, client):
        """When the mocked statement reports zero affected rows, the endpoint answers 404."""
        session = MagicMock()
        # Make ``with SessionLocal() as db`` hand back the mocked session.
        session_ctx = mock_session_cls.return_value
        session_ctx.__enter__ = MagicMock(return_value=session)
        session_ctx.__exit__ = MagicMock(return_value=False)

        # rowcount == 0 simulates a statement that matched nothing.
        session.execute.return_value.rowcount = 0

        mapping_id = uuid.uuid4()
        response = client.delete(f"/tom-mappings/{mapping_id}", headers=HEADERS)
        assert response.status_code == 404
|