feat(tom): audit document, compliance checks, 25 controls, canonical control mapping

Phase A: TOM document HTML generator (12 sections, inline CSS, A4 print)
Phase B: TOMDocumentTab component (org-header form, revisions, print/download)
Phase C: 11 compliance checks with severity-weighted scoring
Phase D: MkDocs documentation for TOM module
Phase E: 25 new controls (63 → 88) in 13 categories

Canonical Control Mapping (three-layer architecture):
- Migration 068: tom_control_mappings + tom_control_sync_state tables
- 6 API endpoints: sync, list, by-tom, stats, manual add, delete
- Category mapping: 13 TOM categories → 17 canonical categories
- Frontend: sync button + coverage card (Overview), drill-down (Editor),
  belegende Controls count (Document)
- 20 tests (unit + API with mocked DB)

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
Benjamin Admin
2026-03-19 11:56:53 +01:00
parent 2a70441eaa
commit 4b1eede45b
14 changed files with 3910 additions and 8 deletions

View File

@@ -56,6 +56,8 @@ _ROUTER_MODULES = [
"crosswalk_routes",
"process_task_routes",
"evidence_check_routes",
"vvt_library_routes",
"tom_mapping_routes",
]
_loaded_count = 0

View File

@@ -0,0 +1,537 @@
"""
TOM ↔ Canonical Control Mapping Routes.
Three-layer architecture:
TOM Measures (~88, audit-level) → Mapping Bridge → Canonical Controls (10,000+)
Endpoints:
POST /v1/tom-mappings/sync — Sync canonical controls for company profile
GET /v1/tom-mappings — List all mappings for tenant/project
GET /v1/tom-mappings/by-tom/{code} — Mappings for a specific TOM control
GET /v1/tom-mappings/stats — Coverage statistics
POST /v1/tom-mappings/manual — Manually add a mapping
DELETE /v1/tom-mappings/{id} — Remove a mapping
"""
from __future__ import annotations
import hashlib
import json
import logging
from typing import Any, Optional
from fastapi import APIRouter, HTTPException, Query, Header
from pydantic import BaseModel
from sqlalchemy import text
from database import SessionLocal
logger = logging.getLogger(__name__)
router = APIRouter(prefix="/tom-mappings", tags=["tom-control-mappings"])
# =============================================================================
# TOM CATEGORY → CANONICAL CATEGORY MAPPING
# =============================================================================
# Maps 13 TOM control categories to canonical_control_categories
# Each TOM category maps to 1-3 canonical categories for broad coverage
TOM_TO_CANONICAL_CATEGORIES: dict[str, list[str]] = {
"ACCESS_CONTROL": ["authentication", "identity", "physical"],
"ADMISSION_CONTROL": ["authentication", "identity", "system"],
"ACCESS_AUTHORIZATION": ["authentication", "identity"],
"TRANSFER_CONTROL": ["network", "data_protection", "encryption"],
"INPUT_CONTROL": ["application", "data_protection"],
"ORDER_CONTROL": ["supply_chain", "compliance"],
"AVAILABILITY": ["continuity", "system"],
"SEPARATION": ["network", "data_protection"],
"ENCRYPTION": ["encryption"],
"PSEUDONYMIZATION": ["data_protection", "encryption"],
"RESILIENCE": ["continuity", "system"],
"RECOVERY": ["continuity"],
"REVIEW": ["compliance", "governance", "risk"],
}
# =============================================================================
# REQUEST / RESPONSE MODELS
# =============================================================================
class SyncRequest(BaseModel):
"""Trigger a sync of canonical controls to TOM measures."""
industry: Optional[str] = None
company_size: Optional[str] = None
force: bool = False
class ManualMappingRequest(BaseModel):
"""Manually add a canonical control to a TOM measure."""
tom_control_code: str
tom_category: str
canonical_control_id: str
canonical_control_code: str
canonical_category: Optional[str] = None
relevance_score: float = 1.0
# =============================================================================
# HELPERS
# =============================================================================
def _get_tenant_id(x_tenant_id: Optional[str]) -> str:
"""Extract tenant ID from header."""
if not x_tenant_id:
raise HTTPException(status_code=400, detail="X-Tenant-ID header required")
return x_tenant_id
def _compute_profile_hash(industry: Optional[str], company_size: Optional[str]) -> str:
"""Compute a hash from profile parameters for change detection."""
data = json.dumps({"industry": industry, "company_size": company_size}, sort_keys=True)
return hashlib.sha256(data.encode()).hexdigest()[:16]
def _mapping_row_to_dict(r) -> dict[str, Any]:
"""Convert a mapping row to API response dict."""
return {
"id": str(r.id),
"tenant_id": str(r.tenant_id),
"project_id": str(r.project_id) if r.project_id else None,
"tom_control_code": r.tom_control_code,
"tom_category": r.tom_category,
"canonical_control_id": str(r.canonical_control_id),
"canonical_control_code": r.canonical_control_code,
"canonical_category": r.canonical_category,
"mapping_type": r.mapping_type,
"relevance_score": float(r.relevance_score) if r.relevance_score else 1.0,
"created_at": r.created_at.isoformat() if r.created_at else None,
}
# =============================================================================
# SYNC ENDPOINT
# =============================================================================
@router.post("/sync")
async def sync_mappings(
body: SyncRequest,
x_tenant_id: Optional[str] = Header(None, alias="X-Tenant-ID"),
project_id: Optional[str] = Query(None),
):
"""
Sync canonical controls to TOM measures based on company profile.
Algorithm:
1. Compute profile hash → skip if unchanged (unless force=True)
2. For each TOM category, find matching canonical controls by:
- Category mapping (TOM category → canonical categories)
- Industry filter (applicable_industries JSONB containment)
- Company size filter (applicable_company_size JSONB containment)
- Only approved + customer_visible controls
3. Delete old auto-mappings, insert new ones
4. Update sync state
"""
tenant_id = _get_tenant_id(x_tenant_id)
profile_hash = _compute_profile_hash(body.industry, body.company_size)
with SessionLocal() as db:
# Check if sync is needed (profile unchanged)
if not body.force:
existing = db.execute(
text("""
SELECT profile_hash FROM tom_control_sync_state
WHERE tenant_id = :tid AND (project_id = :pid OR (project_id IS NULL AND :pid IS NULL))
"""),
{"tid": tenant_id, "pid": project_id},
).fetchone()
if existing and existing.profile_hash == profile_hash:
return {
"status": "unchanged",
"message": "Profile unchanged since last sync",
"profile_hash": profile_hash,
}
# Delete old auto-mappings for this tenant+project
db.execute(
text("""
DELETE FROM tom_control_mappings
WHERE tenant_id = :tid
AND (project_id = :pid OR (project_id IS NULL AND :pid IS NULL))
AND mapping_type = 'auto'
"""),
{"tid": tenant_id, "pid": project_id},
)
total_mappings = 0
canonical_ids_matched = set()
tom_codes_covered = set()
# For each TOM category, find matching canonical controls
for tom_category, canonical_categories in TOM_TO_CANONICAL_CATEGORIES.items():
# Build JSONB containment query for categories
cat_conditions = " OR ".join(
f"category = :cat_{i}" for i in range(len(canonical_categories))
)
cat_params = {f"cat_{i}": c for i, c in enumerate(canonical_categories)}
# Build industry filter
industry_filter = ""
if body.industry:
industry_filter = """
AND (
applicable_industries IS NULL
OR applicable_industries @> '"all"'::jsonb
OR applicable_industries @> (:industry)::jsonb
)
"""
cat_params["industry"] = json.dumps([body.industry])
# Build company size filter
size_filter = ""
if body.company_size:
size_filter = """
AND (
applicable_company_size IS NULL
OR applicable_company_size @> '"all"'::jsonb
OR applicable_company_size @> (:csize)::jsonb
)
"""
cat_params["csize"] = json.dumps([body.company_size])
query = f"""
SELECT id, control_id, category
FROM canonical_controls
WHERE ({cat_conditions})
AND release_state = 'approved'
AND customer_visible = true
{industry_filter}
{size_filter}
ORDER BY control_id
"""
rows = db.execute(text(query), cat_params).fetchall()
# Find TOM control codes in this category (query the frontend library
# codes; we use the category prefix pattern from the loader)
# TOM codes follow pattern: TOM-XX-NN where XX is category abbreviation
# We insert one mapping per canonical control per TOM category
for row in rows:
db.execute(
text("""
INSERT INTO tom_control_mappings (
tenant_id, project_id, tom_control_code, tom_category,
canonical_control_id, canonical_control_code, canonical_category,
mapping_type, relevance_score
) VALUES (
:tid, :pid, :tom_cat, :tom_cat,
:cc_id, :cc_code, :cc_category,
'auto', 1.00
)
ON CONFLICT (tenant_id, project_id, tom_control_code, canonical_control_id)
DO NOTHING
"""),
{
"tid": tenant_id,
"pid": project_id,
"tom_cat": tom_category,
"cc_id": str(row.id),
"cc_code": row.control_id,
"cc_category": row.category,
},
)
total_mappings += 1
canonical_ids_matched.add(str(row.id))
tom_codes_covered.add(tom_category)
# Upsert sync state
db.execute(
text("""
INSERT INTO tom_control_sync_state (
tenant_id, project_id, profile_hash,
total_mappings, canonical_controls_matched, tom_controls_covered,
last_synced_at
) VALUES (
:tid, :pid, :hash,
:total, :matched, :covered,
NOW()
)
ON CONFLICT (tenant_id, project_id)
DO UPDATE SET
profile_hash = :hash,
total_mappings = :total,
canonical_controls_matched = :matched,
tom_controls_covered = :covered,
last_synced_at = NOW()
"""),
{
"tid": tenant_id,
"pid": project_id,
"hash": profile_hash,
"total": total_mappings,
"matched": len(canonical_ids_matched),
"covered": len(tom_codes_covered),
},
)
db.commit()
return {
"status": "synced",
"profile_hash": profile_hash,
"total_mappings": total_mappings,
"canonical_controls_matched": len(canonical_ids_matched),
"tom_categories_covered": len(tom_codes_covered),
}
# =============================================================================
# LIST MAPPINGS
# =============================================================================
@router.get("")
async def list_mappings(
x_tenant_id: Optional[str] = Header(None, alias="X-Tenant-ID"),
project_id: Optional[str] = Query(None),
tom_category: Optional[str] = Query(None),
mapping_type: Optional[str] = Query(None),
limit: int = Query(500, ge=1, le=5000),
offset: int = Query(0, ge=0),
):
"""List all TOM ↔ canonical control mappings for tenant/project."""
tenant_id = _get_tenant_id(x_tenant_id)
query = """
SELECT m.*, cc.title as canonical_title, cc.severity as canonical_severity
FROM tom_control_mappings m
LEFT JOIN canonical_controls cc ON cc.id = m.canonical_control_id
WHERE m.tenant_id = :tid
AND (m.project_id = :pid OR (m.project_id IS NULL AND :pid IS NULL))
"""
params: dict[str, Any] = {"tid": tenant_id, "pid": project_id}
if tom_category:
query += " AND m.tom_category = :tcat"
params["tcat"] = tom_category
if mapping_type:
query += " AND m.mapping_type = :mtype"
params["mtype"] = mapping_type
query += " ORDER BY m.tom_category, m.canonical_control_code"
query += " LIMIT :lim OFFSET :off"
params["lim"] = limit
params["off"] = offset
count_query = """
SELECT count(*) FROM tom_control_mappings
WHERE tenant_id = :tid
AND (project_id = :pid OR (project_id IS NULL AND :pid IS NULL))
"""
count_params: dict[str, Any] = {"tid": tenant_id, "pid": project_id}
if tom_category:
count_query += " AND tom_category = :tcat"
count_params["tcat"] = tom_category
with SessionLocal() as db:
rows = db.execute(text(query), params).fetchall()
total = db.execute(text(count_query), count_params).scalar()
mappings = []
for r in rows:
d = _mapping_row_to_dict(r)
d["canonical_title"] = getattr(r, "canonical_title", None)
d["canonical_severity"] = getattr(r, "canonical_severity", None)
mappings.append(d)
return {"mappings": mappings, "total": total}
# =============================================================================
# MAPPINGS BY TOM CONTROL
# =============================================================================
@router.get("/by-tom/{tom_code}")
async def get_mappings_by_tom(
tom_code: str,
x_tenant_id: Optional[str] = Header(None, alias="X-Tenant-ID"),
project_id: Optional[str] = Query(None),
):
"""Get all canonical controls mapped to a specific TOM control code or category."""
tenant_id = _get_tenant_id(x_tenant_id)
with SessionLocal() as db:
rows = db.execute(
text("""
SELECT m.*, cc.title as canonical_title, cc.severity as canonical_severity,
cc.objective as canonical_objective
FROM tom_control_mappings m
LEFT JOIN canonical_controls cc ON cc.id = m.canonical_control_id
WHERE m.tenant_id = :tid
AND (m.project_id = :pid OR (m.project_id IS NULL AND :pid IS NULL))
AND (m.tom_control_code = :code OR m.tom_category = :code)
ORDER BY m.canonical_control_code
"""),
{"tid": tenant_id, "pid": project_id, "code": tom_code},
).fetchall()
mappings = []
for r in rows:
d = _mapping_row_to_dict(r)
d["canonical_title"] = getattr(r, "canonical_title", None)
d["canonical_severity"] = getattr(r, "canonical_severity", None)
d["canonical_objective"] = getattr(r, "canonical_objective", None)
mappings.append(d)
return {"tom_code": tom_code, "mappings": mappings, "total": len(mappings)}
# =============================================================================
# STATS
# =============================================================================
@router.get("/stats")
async def get_mapping_stats(
x_tenant_id: Optional[str] = Header(None, alias="X-Tenant-ID"),
project_id: Optional[str] = Query(None),
):
"""Coverage statistics for TOM ↔ canonical control mappings."""
tenant_id = _get_tenant_id(x_tenant_id)
with SessionLocal() as db:
# Sync state
sync_state = db.execute(
text("""
SELECT * FROM tom_control_sync_state
WHERE tenant_id = :tid
AND (project_id = :pid OR (project_id IS NULL AND :pid IS NULL))
"""),
{"tid": tenant_id, "pid": project_id},
).fetchone()
# Per-category breakdown
category_stats = db.execute(
text("""
SELECT tom_category,
count(*) as total_mappings,
count(DISTINCT canonical_control_id) as unique_controls,
count(*) FILTER (WHERE mapping_type = 'auto') as auto_count,
count(*) FILTER (WHERE mapping_type = 'manual') as manual_count
FROM tom_control_mappings
WHERE tenant_id = :tid
AND (project_id = :pid OR (project_id IS NULL AND :pid IS NULL))
GROUP BY tom_category
ORDER BY tom_category
"""),
{"tid": tenant_id, "pid": project_id},
).fetchall()
# Total canonical controls in DB (approved + visible)
total_canonical = db.execute(
text("""
SELECT count(*) FROM canonical_controls
WHERE release_state = 'approved' AND customer_visible = true
""")
).scalar()
return {
"sync_state": {
"profile_hash": sync_state.profile_hash if sync_state else None,
"total_mappings": sync_state.total_mappings if sync_state else 0,
"canonical_controls_matched": sync_state.canonical_controls_matched if sync_state else 0,
"tom_controls_covered": sync_state.tom_controls_covered if sync_state else 0,
"last_synced_at": sync_state.last_synced_at.isoformat() if sync_state and sync_state.last_synced_at else None,
},
"category_breakdown": [
{
"tom_category": r.tom_category,
"total_mappings": r.total_mappings,
"unique_controls": r.unique_controls,
"auto_count": r.auto_count,
"manual_count": r.manual_count,
}
for r in category_stats
],
"total_canonical_controls_available": total_canonical or 0,
}
# =============================================================================
# MANUAL MAPPING
# =============================================================================
@router.post("/manual", status_code=201)
async def add_manual_mapping(
body: ManualMappingRequest,
x_tenant_id: Optional[str] = Header(None, alias="X-Tenant-ID"),
project_id: Optional[str] = Query(None),
):
"""Manually add a canonical control to a TOM measure."""
tenant_id = _get_tenant_id(x_tenant_id)
with SessionLocal() as db:
# Verify canonical control exists
cc = db.execute(
text("SELECT id, control_id, category FROM canonical_controls WHERE id = CAST(:cid AS uuid)"),
{"cid": body.canonical_control_id},
).fetchone()
if not cc:
raise HTTPException(status_code=404, detail="Canonical control not found")
try:
row = db.execute(
text("""
INSERT INTO tom_control_mappings (
tenant_id, project_id, tom_control_code, tom_category,
canonical_control_id, canonical_control_code, canonical_category,
mapping_type, relevance_score
) VALUES (
:tid, :pid, :tom_code, :tom_cat,
CAST(:cc_id AS uuid), :cc_code, :cc_category,
'manual', :score
)
RETURNING *
"""),
{
"tid": tenant_id,
"pid": project_id,
"tom_code": body.tom_control_code,
"tom_cat": body.tom_category,
"cc_id": body.canonical_control_id,
"cc_code": body.canonical_control_code,
"cc_category": body.canonical_category or cc.category,
"score": body.relevance_score,
},
).fetchone()
db.commit()
except Exception as e:
if "unique" in str(e).lower() or "duplicate" in str(e).lower():
raise HTTPException(status_code=409, detail="Mapping already exists")
raise
return _mapping_row_to_dict(row)
# =============================================================================
# DELETE MAPPING
# =============================================================================
@router.delete("/{mapping_id}", status_code=204)
async def delete_mapping(
mapping_id: str,
x_tenant_id: Optional[str] = Header(None, alias="X-Tenant-ID"),
):
"""Remove a mapping (manual or auto)."""
tenant_id = _get_tenant_id(x_tenant_id)
with SessionLocal() as db:
result = db.execute(
text("""
DELETE FROM tom_control_mappings
WHERE id = CAST(:mid AS uuid) AND tenant_id = :tid
"""),
{"mid": mapping_id, "tid": tenant_id},
)
if result.rowcount == 0:
raise HTTPException(status_code=404, detail="Mapping not found")
db.commit()
return None

View File

@@ -0,0 +1,65 @@
-- Migration 068: TOM ↔ Canonical Control Mappings
-- Bridge table connecting TOM measures (88) to Canonical Controls (10,000+)
-- Enables three-layer architecture: TOM → Mapping → Canonical Controls
-- ============================================================================
-- 1. Mapping table (TOM control code → Canonical control)
-- ============================================================================
CREATE TABLE IF NOT EXISTS tom_control_mappings (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
tenant_id UUID NOT NULL,
project_id UUID,
-- TOM side (references the embedded TOM control code, e.g. 'TOM-AC-01')
tom_control_code VARCHAR(20) NOT NULL,
tom_category VARCHAR(50) NOT NULL,
-- Canonical control side
canonical_control_id UUID NOT NULL,
canonical_control_code VARCHAR(20) NOT NULL,
canonical_category VARCHAR(50),
-- Mapping metadata
mapping_type VARCHAR(20) NOT NULL DEFAULT 'auto'
CHECK (mapping_type IN ('auto', 'manual')),
relevance_score NUMERIC(3,2) DEFAULT 1.00
CHECK (relevance_score >= 0 AND relevance_score <= 1),
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
-- No duplicate mappings per tenant+project+TOM+canonical
UNIQUE (tenant_id, project_id, tom_control_code, canonical_control_id)
);
CREATE INDEX IF NOT EXISTS idx_tcm_tenant_project
ON tom_control_mappings (tenant_id, project_id);
CREATE INDEX IF NOT EXISTS idx_tcm_tom_code
ON tom_control_mappings (tom_control_code);
CREATE INDEX IF NOT EXISTS idx_tcm_canonical_id
ON tom_control_mappings (canonical_control_id);
CREATE INDEX IF NOT EXISTS idx_tcm_tom_category
ON tom_control_mappings (tom_category);
-- ============================================================================
-- 2. Sync state (tracks when the last sync ran + profile hash)
-- ============================================================================
CREATE TABLE IF NOT EXISTS tom_control_sync_state (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
tenant_id UUID NOT NULL,
project_id UUID,
-- Profile hash to detect changes (SHA-256 of serialized company profile)
profile_hash VARCHAR(64),
-- Stats from last sync
total_mappings INTEGER DEFAULT 0,
canonical_controls_matched INTEGER DEFAULT 0,
tom_controls_covered INTEGER DEFAULT 0,
last_synced_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
-- One sync state per tenant+project
UNIQUE (tenant_id, project_id)
);

View File

@@ -0,0 +1,274 @@
"""
Tests for TOM ↔ Canonical Control Mapping Routes.
Tests the three-layer architecture:
TOM Measures → Mapping Bridge → Canonical Controls
"""
import uuid
import pytest
from unittest.mock import MagicMock, patch
from fastapi.testclient import TestClient
from compliance.api.tom_mapping_routes import (
router,
TOM_TO_CANONICAL_CATEGORIES,
_compute_profile_hash,
)
# =============================================================================
# FIXTURES
# =============================================================================
@pytest.fixture
def app():
"""Create a test FastAPI app with the TOM mapping router."""
from fastapi import FastAPI
app = FastAPI()
app.include_router(router)
return app
@pytest.fixture
def client(app):
return TestClient(app)
TENANT_ID = "9282a473-5c95-4b3a-bf78-0ecc0ec71d3e"
PROJECT_ID = "aaaaaaaa-bbbb-cccc-dddd-eeeeeeeeeeee"
HEADERS = {"X-Tenant-ID": TENANT_ID}
# =============================================================================
# UNIT TESTS
# =============================================================================
class TestCategoryMapping:
"""Test the TOM → Canonical category mapping dictionary."""
def test_all_13_tom_categories_mapped(self):
expected = {
"ACCESS_CONTROL", "ADMISSION_CONTROL", "ACCESS_AUTHORIZATION",
"TRANSFER_CONTROL", "INPUT_CONTROL", "ORDER_CONTROL",
"AVAILABILITY", "SEPARATION", "ENCRYPTION", "PSEUDONYMIZATION",
"RESILIENCE", "RECOVERY", "REVIEW",
}
assert set(TOM_TO_CANONICAL_CATEGORIES.keys()) == expected
def test_each_category_has_at_least_one_canonical(self):
for tom_cat, canonical_cats in TOM_TO_CANONICAL_CATEGORIES.items():
assert len(canonical_cats) >= 1, f"{tom_cat} has no canonical categories"
def test_canonical_categories_are_valid(self):
"""All referenced canonical categories must exist in the DB seed (migration 047)."""
valid_canonical = {
"encryption", "authentication", "network", "data_protection",
"logging", "incident", "continuity", "compliance", "supply_chain",
"physical", "personnel", "application", "system", "risk",
"governance", "hardware", "identity",
}
for tom_cat, canonical_cats in TOM_TO_CANONICAL_CATEGORIES.items():
for cc in canonical_cats:
assert cc in valid_canonical, f"Invalid canonical category '{cc}' in {tom_cat}"
class TestProfileHash:
"""Test profile hash computation."""
def test_same_input_same_hash(self):
h1 = _compute_profile_hash("Telekommunikation", "medium")
h2 = _compute_profile_hash("Telekommunikation", "medium")
assert h1 == h2
def test_different_input_different_hash(self):
h1 = _compute_profile_hash("Telekommunikation", "medium")
h2 = _compute_profile_hash("Gesundheitswesen", "large")
assert h1 != h2
def test_none_values_produce_hash(self):
h = _compute_profile_hash(None, None)
assert len(h) == 16
def test_hash_is_16_chars(self):
h = _compute_profile_hash("test", "small")
assert len(h) == 16
# =============================================================================
# API ENDPOINT TESTS (with mocked DB)
# =============================================================================
class TestSyncEndpoint:
"""Test POST /tom-mappings/sync."""
def test_sync_requires_tenant_header(self, client):
resp = client.post("/tom-mappings/sync", json={"industry": "IT"})
assert resp.status_code == 400
assert "X-Tenant-ID" in resp.json()["detail"]
@patch("compliance.api.tom_mapping_routes.SessionLocal")
def test_sync_unchanged_profile_skips(self, mock_session_cls, client):
"""When profile hash matches, sync should return 'unchanged'."""
mock_db = MagicMock()
mock_session_cls.return_value.__enter__ = MagicMock(return_value=mock_db)
mock_session_cls.return_value.__exit__ = MagicMock(return_value=False)
profile_hash = _compute_profile_hash("IT", "medium")
mock_row = MagicMock()
mock_row.profile_hash = profile_hash
mock_db.execute.return_value.fetchone.return_value = mock_row
resp = client.post(
"/tom-mappings/sync",
json={"industry": "IT", "company_size": "medium"},
headers=HEADERS,
)
assert resp.status_code == 200
data = resp.json()
assert data["status"] == "unchanged"
@patch("compliance.api.tom_mapping_routes.SessionLocal")
def test_sync_force_ignores_hash(self, mock_session_cls, client):
"""force=True should sync even if hash matches."""
mock_db = MagicMock()
mock_session_cls.return_value.__enter__ = MagicMock(return_value=mock_db)
mock_session_cls.return_value.__exit__ = MagicMock(return_value=False)
# Return empty results for canonical control queries
mock_db.execute.return_value.fetchall.return_value = []
mock_db.execute.return_value.fetchone.return_value = None
resp = client.post(
"/tom-mappings/sync",
json={"industry": "IT", "company_size": "medium", "force": True},
headers=HEADERS,
)
assert resp.status_code == 200
data = resp.json()
assert data["status"] == "synced"
class TestListEndpoint:
"""Test GET /tom-mappings."""
def test_list_requires_tenant_header(self, client):
resp = client.get("/tom-mappings")
assert resp.status_code == 400
@patch("compliance.api.tom_mapping_routes.SessionLocal")
def test_list_returns_mappings(self, mock_session_cls, client):
mock_db = MagicMock()
mock_session_cls.return_value.__enter__ = MagicMock(return_value=mock_db)
mock_session_cls.return_value.__exit__ = MagicMock(return_value=False)
mock_db.execute.return_value.fetchall.return_value = []
mock_db.execute.return_value.scalar.return_value = 0
resp = client.get("/tom-mappings", headers=HEADERS)
assert resp.status_code == 200
data = resp.json()
assert "mappings" in data
assert "total" in data
class TestByTomEndpoint:
"""Test GET /tom-mappings/by-tom/{code}."""
def test_by_tom_requires_tenant_header(self, client):
resp = client.get("/tom-mappings/by-tom/ENCRYPTION")
assert resp.status_code == 400
@patch("compliance.api.tom_mapping_routes.SessionLocal")
def test_by_tom_returns_mappings(self, mock_session_cls, client):
mock_db = MagicMock()
mock_session_cls.return_value.__enter__ = MagicMock(return_value=mock_db)
mock_session_cls.return_value.__exit__ = MagicMock(return_value=False)
mock_db.execute.return_value.fetchall.return_value = []
resp = client.get("/tom-mappings/by-tom/ENCRYPTION", headers=HEADERS)
assert resp.status_code == 200
data = resp.json()
assert data["tom_code"] == "ENCRYPTION"
assert "mappings" in data
class TestStatsEndpoint:
"""Test GET /tom-mappings/stats."""
def test_stats_requires_tenant_header(self, client):
resp = client.get("/tom-mappings/stats")
assert resp.status_code == 400
@patch("compliance.api.tom_mapping_routes.SessionLocal")
def test_stats_returns_structure(self, mock_session_cls, client):
mock_db = MagicMock()
mock_session_cls.return_value.__enter__ = MagicMock(return_value=mock_db)
mock_session_cls.return_value.__exit__ = MagicMock(return_value=False)
mock_db.execute.return_value.fetchone.return_value = None
mock_db.execute.return_value.fetchall.return_value = []
mock_db.execute.return_value.scalar.return_value = 0
resp = client.get("/tom-mappings/stats", headers=HEADERS)
assert resp.status_code == 200
data = resp.json()
assert "sync_state" in data
assert "category_breakdown" in data
assert "total_canonical_controls_available" in data
class TestManualMappingEndpoint:
"""Test POST /tom-mappings/manual."""
def test_manual_requires_tenant_header(self, client):
resp = client.post("/tom-mappings/manual", json={
"tom_control_code": "TOM-ENC-01",
"tom_category": "ENCRYPTION",
"canonical_control_id": str(uuid.uuid4()),
"canonical_control_code": "CRYP-001",
})
assert resp.status_code == 400
@patch("compliance.api.tom_mapping_routes.SessionLocal")
def test_manual_404_if_canonical_not_found(self, mock_session_cls, client):
mock_db = MagicMock()
mock_session_cls.return_value.__enter__ = MagicMock(return_value=mock_db)
mock_session_cls.return_value.__exit__ = MagicMock(return_value=False)
mock_db.execute.return_value.fetchone.return_value = None
resp = client.post(
"/tom-mappings/manual",
json={
"tom_control_code": "TOM-ENC-01",
"tom_category": "ENCRYPTION",
"canonical_control_id": str(uuid.uuid4()),
"canonical_control_code": "CRYP-001",
},
headers=HEADERS,
)
assert resp.status_code == 404
class TestDeleteMappingEndpoint:
"""Test DELETE /tom-mappings/{id}."""
def test_delete_requires_tenant_header(self, client):
resp = client.delete(f"/tom-mappings/{uuid.uuid4()}")
assert resp.status_code == 400
@patch("compliance.api.tom_mapping_routes.SessionLocal")
def test_delete_404_if_not_found(self, mock_session_cls, client):
mock_db = MagicMock()
mock_session_cls.return_value.__enter__ = MagicMock(return_value=mock_db)
mock_session_cls.return_value.__exit__ = MagicMock(return_value=False)
mock_db.execute.return_value.rowcount = 0
resp = client.delete(
f"/tom-mappings/{uuid.uuid4()}",
headers=HEADERS,
)
assert resp.status_code == 404