Files
breakpilot-compliance/backend-compliance/tests/test_tom_mapping_routes.py
Benjamin Admin 4b1eede45b feat(tom): audit document, compliance checks, 25 controls, canonical control mapping
Phase A: TOM document HTML generator (12 sections, inline CSS, A4 print)
Phase B: TOMDocumentTab component (org-header form, revisions, print/download)
Phase C: 11 compliance checks with severity-weighted scoring
Phase D: MkDocs documentation for TOM module
Phase E: 25 new controls (63 → 88) in 13 categories

Canonical Control Mapping (three-layer architecture):
- Migration 068: tom_control_mappings + tom_control_sync_state tables
- 6 API endpoints: sync, list, by-tom, stats, manual add, delete
- Category mapping: 13 TOM categories → 17 canonical categories
- Frontend: sync button + coverage card (Overview), drill-down (Editor),
  belegende Controls count (Document)
- 20 tests (unit + API with mocked DB)

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-19 11:56:53 +01:00

275 lines
9.9 KiB
Python

"""
Tests for TOM ↔ Canonical Control Mapping Routes.
Tests the three-layer architecture:
TOM Measures → Mapping Bridge → Canonical Controls
"""
import uuid
import pytest
from unittest.mock import MagicMock, patch
from fastapi.testclient import TestClient
from compliance.api.tom_mapping_routes import (
router,
TOM_TO_CANONICAL_CATEGORIES,
_compute_profile_hash,
)
# =============================================================================
# FIXTURES
# =============================================================================
@pytest.fixture
def app():
    """Build a fresh FastAPI application that mounts only the TOM mapping router."""
    # Imported at function scope, as in the original fixture.
    from fastapi import FastAPI

    application = FastAPI()
    application.include_router(router)
    return application
@pytest.fixture
def client(app):
    """Provide a ``TestClient`` bound to the application from the ``app`` fixture."""
    http_client = TestClient(app)
    return http_client
# Tenant identifier used as the value of the X-Tenant-ID header in HEADERS below.
TENANT_ID = "9282a473-5c95-4b3a-bf78-0ecc0ec71d3e"
# NOTE(review): not referenced by any test visible in this module — confirm it is still needed.
PROJECT_ID = "aaaaaaaa-bbbb-cccc-dddd-eeeeeeeeeeee"
# Headers attached to requests that should get past the tenant check; the
# "requires_tenant_header" tests omit them and expect HTTP 400 instead.
HEADERS = {"X-Tenant-ID": TENANT_ID}
# =============================================================================
# UNIT TESTS
# =============================================================================
class TestCategoryMapping:
    """Sanity checks on the ``TOM_TO_CANONICAL_CATEGORIES`` lookup table."""

    def test_all_13_tom_categories_mapped(self):
        """The table's keys must be exactly the 13 TOM categories listed here."""
        expected_keys = {
            "ACCESS_CONTROL",
            "ADMISSION_CONTROL",
            "ACCESS_AUTHORIZATION",
            "TRANSFER_CONTROL",
            "INPUT_CONTROL",
            "ORDER_CONTROL",
            "AVAILABILITY",
            "SEPARATION",
            "ENCRYPTION",
            "PSEUDONYMIZATION",
            "RESILIENCE",
            "RECOVERY",
            "REVIEW",
        }
        actual_keys = set(TOM_TO_CANONICAL_CATEGORIES.keys())
        assert actual_keys == expected_keys

    def test_each_category_has_at_least_one_canonical(self):
        """No TOM category may map to an empty collection of canonical categories."""
        for tom_cat in TOM_TO_CANONICAL_CATEGORIES:
            mapped = TOM_TO_CANONICAL_CATEGORIES[tom_cat]
            assert len(mapped) >= 1, f"{tom_cat} has no canonical categories"

    def test_canonical_categories_are_valid(self):
        """Every mapped canonical category must be one of the 17 known names.

        Per the original test's note, this list mirrors the DB seed from
        migration 047.
        """
        valid_canonical = {
            "encryption",
            "authentication",
            "network",
            "data_protection",
            "logging",
            "incident",
            "continuity",
            "compliance",
            "supply_chain",
            "physical",
            "personnel",
            "application",
            "system",
            "risk",
            "governance",
            "hardware",
            "identity",
        }
        # Flatten the mapping into (TOM category, canonical category) pairs so
        # a single loop can check each reference.
        pairs = (
            (tom_cat, cc)
            for tom_cat, canonical_cats in TOM_TO_CANONICAL_CATEGORIES.items()
            for cc in canonical_cats
        )
        for tom_cat, cc in pairs:
            assert cc in valid_canonical, f"Invalid canonical category '{cc}' in {tom_cat}"
class TestProfileHash:
    """Checks on ``_compute_profile_hash``."""

    def test_same_input_same_hash(self):
        """Hashing the same (industry, size) pair twice gives equal results."""
        profile = ("Telekommunikation", "medium")
        first = _compute_profile_hash(*profile)
        second = _compute_profile_hash(*profile)
        assert first == second

    def test_different_input_different_hash(self):
        """Two different profiles are expected to hash to different values."""
        telecom = _compute_profile_hash("Telekommunikation", "medium")
        health = _compute_profile_hash("Gesundheitswesen", "large")
        assert telecom != health

    def test_none_values_produce_hash(self):
        """``None`` for both arguments still produces a 16-character hash."""
        digest = _compute_profile_hash(None, None)
        assert len(digest) == 16

    def test_hash_is_16_chars(self):
        """A regular profile produces a hash of length 16."""
        digest = _compute_profile_hash("test", "small")
        assert len(digest) == 16
# =============================================================================
# API ENDPOINT TESTS (with mocked DB)
# =============================================================================
class TestSyncEndpoint:
    """Exercises POST /tom-mappings/sync."""

    @staticmethod
    def _wire_session(mock_session_cls):
        """Make the patched ``SessionLocal()`` context manager yield a fresh mock DB.

        Returns the mock that ``with SessionLocal() as db`` will bind.
        """
        db = MagicMock()
        session = mock_session_cls.return_value
        session.__enter__.return_value = db
        # A falsy __exit__ result means exceptions are not swallowed.
        session.__exit__.return_value = False
        return db

    def test_sync_requires_tenant_header(self, client):
        """Without X-Tenant-ID the endpoint is expected to answer 400 and name the header."""
        response = client.post("/tom-mappings/sync", json={"industry": "IT"})
        assert response.status_code == 400
        assert "X-Tenant-ID" in response.json()["detail"]

    @patch("compliance.api.tom_mapping_routes.SessionLocal")
    def test_sync_unchanged_profile_skips(self, mock_session_cls, client):
        """A stored hash equal to the request's profile hash should yield 'unchanged'."""
        db = self._wire_session(mock_session_cls)
        # The row returned by fetchone carries the same hash the request will produce.
        stored_state = MagicMock(profile_hash=_compute_profile_hash("IT", "medium"))
        db.execute.return_value.fetchone.return_value = stored_state

        payload = {"industry": "IT", "company_size": "medium"}
        response = client.post("/tom-mappings/sync", json=payload, headers=HEADERS)

        assert response.status_code == 200
        assert response.json()["status"] == "unchanged"

    @patch("compliance.api.tom_mapping_routes.SessionLocal")
    def test_sync_force_ignores_hash(self, mock_session_cls, client):
        """With force=True the endpoint is expected to report 'synced'.

        NOTE(review): fetchone is mocked to return None here, so no stored
        hash is presented to the route; the "hash matches" situation is not
        actually reproduced — confirm whether that is intended.
        """
        db = self._wire_session(mock_session_cls)
        # Every query yields empty results (no rows, no single row).
        result = db.execute.return_value
        result.fetchall.return_value = []
        result.fetchone.return_value = None

        payload = {"industry": "IT", "company_size": "medium", "force": True}
        response = client.post("/tom-mappings/sync", json=payload, headers=HEADERS)

        assert response.status_code == 200
        assert response.json()["status"] == "synced"
class TestListEndpoint:
    """Exercises GET /tom-mappings."""

    def test_list_requires_tenant_header(self, client):
        """A request without headers is expected to yield HTTP 400."""
        response = client.get("/tom-mappings")
        assert response.status_code == 400

    @patch("compliance.api.tom_mapping_routes.SessionLocal")
    def test_list_returns_mappings(self, mock_session_cls, client):
        """With an empty mocked DB the response still exposes 'mappings' and 'total'."""
        db = MagicMock()
        session = mock_session_cls.return_value
        session.__enter__.return_value = db
        session.__exit__.return_value = False
        # No rows and a zero scalar for whatever queries the route issues.
        result = db.execute.return_value
        result.fetchall.return_value = []
        result.scalar.return_value = 0

        response = client.get("/tom-mappings", headers=HEADERS)

        assert response.status_code == 200
        body = response.json()
        for key in ("mappings", "total"):
            assert key in body
class TestByTomEndpoint:
    """Exercises GET /tom-mappings/by-tom/{code}."""

    def test_by_tom_requires_tenant_header(self, client):
        """A request without headers is expected to yield HTTP 400."""
        response = client.get("/tom-mappings/by-tom/ENCRYPTION")
        assert response.status_code == 400

    @patch("compliance.api.tom_mapping_routes.SessionLocal")
    def test_by_tom_returns_mappings(self, mock_session_cls, client):
        """The response echoes the requested code and contains a 'mappings' entry."""
        db = MagicMock()
        session = mock_session_cls.return_value
        session.__enter__.return_value = db
        session.__exit__.return_value = False
        # The mocked query returns no rows.
        db.execute.return_value.fetchall.return_value = []

        response = client.get("/tom-mappings/by-tom/ENCRYPTION", headers=HEADERS)

        assert response.status_code == 200
        body = response.json()
        assert body["tom_code"] == "ENCRYPTION"
        assert "mappings" in body
class TestStatsEndpoint:
    """Exercises GET /tom-mappings/stats."""

    def test_stats_requires_tenant_header(self, client):
        """A request without headers is expected to yield HTTP 400."""
        response = client.get("/tom-mappings/stats")
        assert response.status_code == 400

    @patch("compliance.api.tom_mapping_routes.SessionLocal")
    def test_stats_returns_structure(self, mock_session_cls, client):
        """With an empty mocked DB the response still carries the three top-level keys."""
        db = MagicMock()
        session = mock_session_cls.return_value
        session.__enter__.return_value = db
        session.__exit__.return_value = False
        # No single row, no rows, and a zero scalar for every query.
        result = db.execute.return_value
        result.fetchone.return_value = None
        result.fetchall.return_value = []
        result.scalar.return_value = 0

        response = client.get("/tom-mappings/stats", headers=HEADERS)

        assert response.status_code == 200
        body = response.json()
        for key in (
            "sync_state",
            "category_breakdown",
            "total_canonical_controls_available",
        ):
            assert key in body
class TestManualMappingEndpoint:
    """Exercises POST /tom-mappings/manual."""

    @staticmethod
    def _payload():
        """Return a manual-mapping request body with a freshly generated canonical control id."""
        return {
            "tom_control_code": "TOM-ENC-01",
            "tom_category": "ENCRYPTION",
            "canonical_control_id": str(uuid.uuid4()),
            "canonical_control_code": "CRYP-001",
        }

    def test_manual_requires_tenant_header(self, client):
        """A well-formed body sent without headers is expected to yield HTTP 400."""
        response = client.post("/tom-mappings/manual", json=self._payload())
        assert response.status_code == 400

    @patch("compliance.api.tom_mapping_routes.SessionLocal")
    def test_manual_404_if_canonical_not_found(self, mock_session_cls, client):
        """When the mocked lookup returns no row, the endpoint is expected to answer 404."""
        db = MagicMock()
        session = mock_session_cls.return_value
        session.__enter__.return_value = db
        session.__exit__.return_value = False
        # fetchone -> None: the mocked query finds nothing.
        db.execute.return_value.fetchone.return_value = None

        response = client.post(
            "/tom-mappings/manual",
            json=self._payload(),
            headers=HEADERS,
        )

        assert response.status_code == 404
class TestDeleteMappingEndpoint:
    """Exercises DELETE /tom-mappings/{id}."""

    def test_delete_requires_tenant_header(self, client):
        """A delete sent without headers is expected to yield HTTP 400."""
        url = f"/tom-mappings/{uuid.uuid4()}"
        response = client.delete(url)
        assert response.status_code == 400

    @patch("compliance.api.tom_mapping_routes.SessionLocal")
    def test_delete_404_if_not_found(self, mock_session_cls, client):
        """When the mocked statement reports zero affected rows, 404 is expected."""
        db = MagicMock()
        session = mock_session_cls.return_value
        session.__enter__.return_value = db
        session.__exit__.return_value = False
        # rowcount == 0: the mocked execute result reports no affected rows.
        db.execute.return_value.rowcount = 0

        url = f"/tom-mappings/{uuid.uuid4()}"
        response = client.delete(url, headers=HEADERS)

        assert response.status_code == 404