feat(tom): TOM-Backend in Python erstellen, Frontend von In-Memory auf DB migrieren
All checks were successful
CI / go-lint (push) Has been skipped
CI / python-lint (push) Has been skipped
CI / nodejs-lint (push) Has been skipped
CI / test-go-ai-compliance (push) Successful in 33s
CI / test-python-backend-compliance (push) Successful in 31s
CI / test-python-document-crawler (push) Successful in 22s
CI / test-python-dsms-gateway (push) Successful in 15s

- Migration 034: compliance_tom_state + compliance_tom_measures Tabellen
- Python Routes: State CRUD, Measures CRUD, Bulk-Upsert, Stats, CSV/JSON-Export
- Frontend-Proxy: In-Memory Storage durch Proxy zu backend-compliance ersetzt
- Go TOM-Handler als DEPRECATED markiert (Source of Truth ist jetzt Python)
- 44 Tests (alle bestanden)

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
Benjamin Admin
2026-03-06 17:35:44 +01:00
parent 4cbfea5c1d
commit 3593a4ff78
9 changed files with 1545 additions and 164 deletions

View File

@@ -1,119 +1,30 @@
import { NextRequest, NextResponse } from 'next/server'
import {
TOMGeneratorState,
createEmptyTOMGeneratorState,
} from '@/lib/sdk/tom-generator/types'
/**
* TOM Generator State API
* TOM Generator State API — Proxy to backend-compliance (Python/FastAPI)
*
* GET /api/sdk/v1/tom-generator/state?tenantId=xxx - Load TOM generator state
* POST /api/sdk/v1/tom-generator/state - Save TOM generator state
* DELETE /api/sdk/v1/tom-generator/state?tenantId=xxx - Clear state
*/
// =============================================================================
// STORAGE (In-Memory for development)
// =============================================================================
interface StoredTOMState {
state: TOMGeneratorState
version: number
createdAt: string
updatedAt: string
}
class InMemoryTOMStateStore {
private store: Map<string, StoredTOMState> = new Map()
async get(tenantId: string): Promise<StoredTOMState | null> {
return this.store.get(tenantId) || null
}
async save(tenantId: string, state: TOMGeneratorState, expectedVersion?: number): Promise<StoredTOMState> {
const existing = this.store.get(tenantId)
if (expectedVersion !== undefined && existing && existing.version !== expectedVersion) {
const error = new Error('Version conflict') as Error & { status: number }
error.status = 409
throw error
}
const now = new Date().toISOString()
const newVersion = existing ? existing.version + 1 : 1
const stored: StoredTOMState = {
state: {
...state,
updatedAt: new Date(now),
},
version: newVersion,
createdAt: existing?.createdAt || now,
updatedAt: now,
}
this.store.set(tenantId, stored)
return stored
}
async delete(tenantId: string): Promise<boolean> {
return this.store.delete(tenantId)
}
async list(): Promise<{ tenantId: string; updatedAt: string }[]> {
const result: { tenantId: string; updatedAt: string }[] = []
this.store.forEach((value, key) => {
result.push({ tenantId: key, updatedAt: value.updatedAt })
})
return result
}
}
const stateStore = new InMemoryTOMStateStore()
// =============================================================================
// HANDLERS
// =============================================================================
const BACKEND_URL = process.env.COMPLIANCE_BACKEND_URL || 'http://backend-compliance:8002'
export async function GET(request: NextRequest) {
try {
const { searchParams } = new URL(request.url)
const tenantId = searchParams.get('tenantId')
// List all states if no tenantId provided
if (!tenantId) {
const states = await stateStore.list()
return NextResponse.json({
success: true,
data: states,
})
}
const url = tenantId
? `${BACKEND_URL}/api/compliance/tom/state?tenant_id=${encodeURIComponent(tenantId)}`
: `${BACKEND_URL}/api/compliance/tom/state`
const stored = await stateStore.get(tenantId)
if (!stored) {
// Return empty state for new tenants
const emptyState = createEmptyTOMGeneratorState(tenantId)
return NextResponse.json({
success: true,
data: {
tenantId,
state: emptyState,
version: 0,
isNew: true,
},
})
}
return NextResponse.json({
success: true,
data: {
tenantId,
state: stored.state,
version: stored.version,
lastModified: stored.updatedAt,
},
const res = await fetch(url, {
headers: { 'Content-Type': 'application/json' },
})
const data = await res.json()
return NextResponse.json(data, { status: res.status })
} catch (error) {
console.error('Failed to load TOM generator state:', error)
return NextResponse.json(
@@ -142,65 +53,19 @@ export async function POST(request: NextRequest) {
)
}
// Deserialize dates
const parsedState: TOMGeneratorState = {
...state,
createdAt: new Date(state.createdAt),
updatedAt: new Date(state.updatedAt),
steps: state.steps.map((step: { id: string; completed: boolean; data: unknown; validatedAt: string | null }) => ({
...step,
validatedAt: step.validatedAt ? new Date(step.validatedAt) : null,
})),
documents: state.documents?.map((doc: { uploadedAt: string; validFrom?: string; validUntil?: string; aiAnalysis?: { analyzedAt: string } }) => ({
...doc,
uploadedAt: new Date(doc.uploadedAt),
validFrom: doc.validFrom ? new Date(doc.validFrom) : null,
validUntil: doc.validUntil ? new Date(doc.validUntil) : null,
aiAnalysis: doc.aiAnalysis ? {
...doc.aiAnalysis,
analyzedAt: new Date(doc.aiAnalysis.analyzedAt),
} : null,
})) || [],
derivedTOMs: state.derivedTOMs?.map((tom: { implementationDate?: string; reviewDate?: string }) => ({
...tom,
implementationDate: tom.implementationDate ? new Date(tom.implementationDate) : null,
reviewDate: tom.reviewDate ? new Date(tom.reviewDate) : null,
})) || [],
gapAnalysis: state.gapAnalysis ? {
...state.gapAnalysis,
generatedAt: new Date(state.gapAnalysis.generatedAt),
} : null,
exports: state.exports?.map((exp: { generatedAt: string }) => ({
...exp,
generatedAt: new Date(exp.generatedAt),
})) || [],
}
const stored = await stateStore.save(tenantId, parsedState, version)
return NextResponse.json({
success: true,
data: {
tenantId,
state: stored.state,
version: stored.version,
lastModified: stored.updatedAt,
},
const res = await fetch(`${BACKEND_URL}/api/compliance/tom/state`, {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({
tenant_id: tenantId,
state,
version,
}),
})
const data = await res.json()
return NextResponse.json(data, { status: res.status })
} catch (error) {
const err = error as Error & { status?: number }
if (err.status === 409 || err.message === 'Version conflict') {
return NextResponse.json(
{
success: false,
error: 'Version conflict. State was modified by another request.',
code: 'VERSION_CONFLICT',
},
{ status: 409 }
)
}
console.error('Failed to save TOM generator state:', error)
return NextResponse.json(
{ success: false, error: 'Failed to save state' },
@@ -221,14 +86,16 @@ export async function DELETE(request: NextRequest) {
)
}
const deleted = await stateStore.delete(tenantId)
const res = await fetch(
`${BACKEND_URL}/api/compliance/tom/state?tenant_id=${encodeURIComponent(tenantId)}`,
{
method: 'DELETE',
headers: { 'Content-Type': 'application/json' },
}
)
return NextResponse.json({
success: true,
tenantId,
deleted,
deletedAt: new Date().toISOString(),
})
const data = await res.json()
return NextResponse.json(data, { status: res.status })
} catch (error) {
console.error('Failed to delete TOM generator state:', error)
return NextResponse.json(

View File

@@ -263,6 +263,8 @@ func main() {
}
// TOM - Technische und Organisatorische Maßnahmen (Art. 32)
// DEPRECATED: TOM is now managed by backend-compliance (Python).
// Use: GET/POST /api/compliance/tom/state, /tom/measures, /tom/stats, /tom/export
tom := dsgvoRoutes.Group("/tom")
{
tom.GET("", dsgvoHandlers.ListTOMs)
@@ -301,7 +303,7 @@ func main() {
exports := dsgvoRoutes.Group("/export")
{
exports.GET("/vvt", dsgvoHandlers.ExportVVT) // DEPRECATED: use backend-compliance /vvt/export?format=csv
exports.GET("/tom", dsgvoHandlers.ExportTOM)
exports.GET("/tom", dsgvoHandlers.ExportTOM) // DEPRECATED: use backend-compliance /tom/export?format=csv
exports.GET("/dsr", dsgvoHandlers.ExportDSR)
exports.GET("/retention", dsgvoHandlers.ExportRetentionPolicies)
}

View File

@@ -144,8 +144,12 @@ func (h *DSGVOHandlers) DeleteProcessingActivity(c *gin.Context) {
// ============================================================================
// TOM - Technische und Organisatorische Maßnahmen
// ============================================================================
// DEPRECATED: TOM is now managed by backend-compliance (Python).
// These handlers remain for backwards compatibility but should not be used.
// Use backend-compliance endpoints: GET/POST /api/compliance/tom/...
// ListTOMs returns all TOMs for a tenant
// DEPRECATED: Use backend-compliance GET /api/compliance/tom/measures
func (h *DSGVOHandlers) ListTOMs(c *gin.Context) {
tenantID := rbac.GetTenantID(c)
if tenantID == uuid.Nil {
@@ -553,6 +557,7 @@ func (h *DSGVOHandlers) ExportVVT(c *gin.Context) {
}
// ExportTOM exports the TOM catalog as CSV/JSON
// DEPRECATED: Use backend-compliance GET /api/compliance/tom/export
func (h *DSGVOHandlers) ExportTOM(c *gin.Context) {
tenantID := rbac.GetTenantID(c)
if tenantID == uuid.Nil {

View File

@@ -26,6 +26,7 @@ from .dsr_routes import router as dsr_router
from .email_template_routes import router as email_template_router
from .banner_routes import router as banner_router
from .extraction_routes import router as extraction_router
from .tom_routes import router as tom_router
# Include sub-routers
router.include_router(audit_router)
@@ -53,6 +54,7 @@ router.include_router(dsr_router)
router.include_router(email_template_router)
router.include_router(banner_router)
router.include_router(extraction_router)
router.include_router(tom_router)
__all__ = [
"router",
@@ -80,4 +82,5 @@ __all__ = [
"dsr_router",
"email_template_router",
"banner_router",
"tom_router",
]

View File

@@ -2002,3 +2002,57 @@ class VVTAuditLogEntry(BaseModel):
class Config:
from_attributes = True
# ============================================================================
# TOM — Technisch-Organisatorische Massnahmen (Art. 32 DSGVO)
# ============================================================================
class TOMStateResponse(BaseModel):
tenant_id: str
state: Dict[str, Any] = {}
version: int = 0
last_modified: Optional[datetime] = None
is_new: bool = False
class TOMMeasureResponse(BaseModel):
id: str
tenant_id: str
control_id: str
name: str
description: Optional[str] = None
category: str
type: str
applicability: str = "REQUIRED"
applicability_reason: Optional[str] = None
implementation_status: str = "NOT_IMPLEMENTED"
responsible_person: Optional[str] = None
responsible_department: Optional[str] = None
implementation_date: Optional[datetime] = None
review_date: Optional[datetime] = None
review_frequency: Optional[str] = None
priority: Optional[str] = None
complexity: Optional[str] = None
linked_evidence: List[Any] = []
evidence_gaps: List[Any] = []
related_controls: Dict[str, Any] = {}
verified_at: Optional[datetime] = None
verified_by: Optional[str] = None
effectiveness_rating: Optional[str] = None
created_by: Optional[str] = None
created_at: Optional[datetime] = None
updated_at: Optional[datetime] = None
class Config:
from_attributes = True
class TOMStatsResponse(BaseModel):
total: int = 0
by_status: Dict[str, int] = {}
by_category: Dict[str, int] = {}
overdue_review_count: int = 0
implemented: int = 0
partial: int = 0
not_implemented: int = 0

View File

@@ -0,0 +1,575 @@
"""
FastAPI routes for TOM — Technisch-Organisatorische Massnahmen (Art. 32 DSGVO).
Endpoints:
GET /tom/state — Load TOM generator state for tenant
POST /tom/state — Save state (with version check)
DELETE /tom/state — Reset/clear state for tenant
GET /tom/measures — List measures (filter: category, status, tenant_id)
POST /tom/measures — Create single measure
PUT /tom/measures/{id} — Update measure
POST /tom/measures/bulk — Bulk upsert (for deriveTOMs sync)
GET /tom/stats — Statistics
GET /tom/export — Export as CSV or JSON
"""
import csv
import io
import json
import logging
from datetime import datetime, timezone
from typing import Optional, List, Any, Dict
from uuid import UUID, uuid4
from fastapi import APIRouter, Depends, HTTPException, Query
from fastapi.responses import StreamingResponse
from pydantic import BaseModel, Field
from sqlalchemy import func
from sqlalchemy.orm import Session
from classroom_engine.database import get_db
from ..db.tom_models import TOMStateDB, TOMMeasureDB
logger = logging.getLogger(__name__)
router = APIRouter(prefix="/tom", tags=["tom"])
DEFAULT_TENANT_ID = "9282a473-5c95-4b3a-bf78-0ecc0ec71d3e"
# =============================================================================
# Pydantic Schemas (kept close to routes like loeschfristen pattern)
# =============================================================================
class TOMStateBody(BaseModel):
tenant_id: Optional[str] = None
tenantId: Optional[str] = None # Accept camelCase from frontend
state: Dict[str, Any]
version: Optional[int] = None
def get_tenant_id(self) -> str:
return self.tenant_id or self.tenantId or DEFAULT_TENANT_ID
class TOMMeasureCreate(BaseModel):
control_id: str
name: str
description: Optional[str] = None
category: str
type: str
applicability: str = "REQUIRED"
applicability_reason: Optional[str] = None
implementation_status: str = "NOT_IMPLEMENTED"
responsible_person: Optional[str] = None
responsible_department: Optional[str] = None
implementation_date: Optional[str] = None
review_date: Optional[str] = None
review_frequency: Optional[str] = None
priority: Optional[str] = None
complexity: Optional[str] = None
linked_evidence: Optional[List[Any]] = None
evidence_gaps: Optional[List[Any]] = None
related_controls: Optional[Dict[str, Any]] = None
verified_at: Optional[str] = None
verified_by: Optional[str] = None
effectiveness_rating: Optional[str] = None
class TOMMeasureUpdate(BaseModel):
name: Optional[str] = None
description: Optional[str] = None
category: Optional[str] = None
type: Optional[str] = None
applicability: Optional[str] = None
applicability_reason: Optional[str] = None
implementation_status: Optional[str] = None
responsible_person: Optional[str] = None
responsible_department: Optional[str] = None
implementation_date: Optional[str] = None
review_date: Optional[str] = None
review_frequency: Optional[str] = None
priority: Optional[str] = None
complexity: Optional[str] = None
linked_evidence: Optional[List[Any]] = None
evidence_gaps: Optional[List[Any]] = None
related_controls: Optional[Dict[str, Any]] = None
verified_at: Optional[str] = None
verified_by: Optional[str] = None
effectiveness_rating: Optional[str] = None
class TOMMeasureBulkItem(BaseModel):
control_id: str
name: str
description: Optional[str] = None
category: str
type: str
applicability: str = "REQUIRED"
applicability_reason: Optional[str] = None
implementation_status: str = "NOT_IMPLEMENTED"
responsible_person: Optional[str] = None
responsible_department: Optional[str] = None
implementation_date: Optional[str] = None
review_date: Optional[str] = None
review_frequency: Optional[str] = None
priority: Optional[str] = None
complexity: Optional[str] = None
linked_evidence: Optional[List[Any]] = None
evidence_gaps: Optional[List[Any]] = None
related_controls: Optional[Dict[str, Any]] = None
class TOMMeasureBulkBody(BaseModel):
tenant_id: Optional[str] = None
measures: List[TOMMeasureBulkItem]
# =============================================================================
# Helper: parse optional datetime strings
# =============================================================================
def _parse_dt(val: Optional[str]) -> Optional[datetime]:
if not val:
return None
try:
return datetime.fromisoformat(val.replace("Z", "+00:00"))
except (ValueError, AttributeError):
return None
def _measure_to_dict(m: TOMMeasureDB) -> dict:
return {
"id": str(m.id),
"tenant_id": m.tenant_id,
"control_id": m.control_id,
"name": m.name,
"description": m.description,
"category": m.category,
"type": m.type,
"applicability": m.applicability,
"applicability_reason": m.applicability_reason,
"implementation_status": m.implementation_status,
"responsible_person": m.responsible_person,
"responsible_department": m.responsible_department,
"implementation_date": m.implementation_date.isoformat() if m.implementation_date else None,
"review_date": m.review_date.isoformat() if m.review_date else None,
"review_frequency": m.review_frequency,
"priority": m.priority,
"complexity": m.complexity,
"linked_evidence": m.linked_evidence or [],
"evidence_gaps": m.evidence_gaps or [],
"related_controls": m.related_controls or {},
"verified_at": m.verified_at.isoformat() if m.verified_at else None,
"verified_by": m.verified_by,
"effectiveness_rating": m.effectiveness_rating,
"created_by": m.created_by,
"created_at": m.created_at.isoformat() if m.created_at else None,
"updated_at": m.updated_at.isoformat() if m.updated_at else None,
}
# =============================================================================
# STATE ENDPOINTS
# =============================================================================
@router.get("/state")
async def get_tom_state(
tenant_id: Optional[str] = Query(None, alias="tenant_id"),
tenantId: Optional[str] = Query(None),
db: Session = Depends(get_db),
):
"""Load TOM generator state for a tenant."""
tid = tenant_id or tenantId or DEFAULT_TENANT_ID
row = db.query(TOMStateDB).filter(TOMStateDB.tenant_id == tid).first()
if not row:
return {
"success": True,
"data": {
"tenantId": tid,
"state": {},
"version": 0,
"isNew": True,
},
}
return {
"success": True,
"data": {
"tenantId": tid,
"state": row.state,
"version": row.version,
"lastModified": row.updated_at.isoformat() if row.updated_at else None,
},
}
@router.post("/state")
async def save_tom_state(body: TOMStateBody, db: Session = Depends(get_db)):
"""Save TOM generator state with optimistic locking (version check)."""
tid = body.get_tenant_id()
existing = db.query(TOMStateDB).filter(TOMStateDB.tenant_id == tid).first()
# Version conflict check
if body.version is not None and existing and existing.version != body.version:
raise HTTPException(
status_code=409,
detail={
"success": False,
"error": "Version conflict. State was modified by another request.",
"code": "VERSION_CONFLICT",
},
)
now = datetime.now(timezone.utc)
if existing:
existing.state = body.state
existing.version = existing.version + 1
existing.updated_at = now
else:
existing = TOMStateDB(
tenant_id=tid,
state=body.state,
version=1,
created_at=now,
updated_at=now,
)
db.add(existing)
db.commit()
db.refresh(existing)
return {
"success": True,
"data": {
"tenantId": tid,
"state": existing.state,
"version": existing.version,
"lastModified": existing.updated_at.isoformat() if existing.updated_at else None,
},
}
@router.delete("/state")
async def delete_tom_state(
tenant_id: Optional[str] = Query(None, alias="tenant_id"),
tenantId: Optional[str] = Query(None),
db: Session = Depends(get_db),
):
"""Clear TOM generator state for a tenant."""
tid = tenant_id or tenantId
if not tid:
raise HTTPException(status_code=400, detail="tenant_id is required")
row = db.query(TOMStateDB).filter(TOMStateDB.tenant_id == tid).first()
deleted = False
if row:
db.delete(row)
db.commit()
deleted = True
return {
"success": True,
"tenantId": tid,
"deleted": deleted,
"deletedAt": datetime.now(timezone.utc).isoformat(),
}
# =============================================================================
# MEASURES ENDPOINTS
# =============================================================================
@router.get("/measures")
async def list_measures(
tenant_id: Optional[str] = Query(None),
category: Optional[str] = Query(None),
implementation_status: Optional[str] = Query(None),
priority: Optional[str] = Query(None),
search: Optional[str] = Query(None),
limit: int = Query(100, ge=1, le=500),
offset: int = Query(0, ge=0),
db: Session = Depends(get_db),
):
"""List TOM measures with optional filters."""
tid = tenant_id or DEFAULT_TENANT_ID
q = db.query(TOMMeasureDB).filter(TOMMeasureDB.tenant_id == tid)
if category:
q = q.filter(TOMMeasureDB.category == category)
if implementation_status:
q = q.filter(TOMMeasureDB.implementation_status == implementation_status)
if priority:
q = q.filter(TOMMeasureDB.priority == priority)
if search:
pattern = f"%{search}%"
q = q.filter(
(TOMMeasureDB.name.ilike(pattern))
| (TOMMeasureDB.description.ilike(pattern))
| (TOMMeasureDB.control_id.ilike(pattern))
)
total = q.count()
rows = q.order_by(TOMMeasureDB.control_id).offset(offset).limit(limit).all()
return {
"measures": [_measure_to_dict(r) for r in rows],
"total": total,
"limit": limit,
"offset": offset,
}
@router.post("/measures", status_code=201)
async def create_measure(
body: TOMMeasureCreate,
tenant_id: Optional[str] = Query(None),
db: Session = Depends(get_db),
):
"""Create a single TOM measure."""
tid = tenant_id or DEFAULT_TENANT_ID
# Check for duplicate control_id
existing = (
db.query(TOMMeasureDB)
.filter(TOMMeasureDB.tenant_id == tid, TOMMeasureDB.control_id == body.control_id)
.first()
)
if existing:
raise HTTPException(status_code=409, detail=f"Measure with control_id '{body.control_id}' already exists")
now = datetime.now(timezone.utc)
measure = TOMMeasureDB(
tenant_id=tid,
control_id=body.control_id,
name=body.name,
description=body.description,
category=body.category,
type=body.type,
applicability=body.applicability,
applicability_reason=body.applicability_reason,
implementation_status=body.implementation_status,
responsible_person=body.responsible_person,
responsible_department=body.responsible_department,
implementation_date=_parse_dt(body.implementation_date),
review_date=_parse_dt(body.review_date),
review_frequency=body.review_frequency,
priority=body.priority,
complexity=body.complexity,
linked_evidence=body.linked_evidence or [],
evidence_gaps=body.evidence_gaps or [],
related_controls=body.related_controls or {},
verified_at=_parse_dt(body.verified_at),
verified_by=body.verified_by,
effectiveness_rating=body.effectiveness_rating,
created_at=now,
updated_at=now,
)
db.add(measure)
db.commit()
db.refresh(measure)
return _measure_to_dict(measure)
@router.put("/measures/{measure_id}")
async def update_measure(
measure_id: UUID,
body: TOMMeasureUpdate,
db: Session = Depends(get_db),
):
"""Update a TOM measure."""
row = db.query(TOMMeasureDB).filter(TOMMeasureDB.id == measure_id).first()
if not row:
raise HTTPException(status_code=404, detail="Measure not found")
update_data = body.model_dump(exclude_unset=True)
for key, val in update_data.items():
if key in ("implementation_date", "review_date", "verified_at"):
val = _parse_dt(val)
setattr(row, key, val)
row.updated_at = datetime.now(timezone.utc)
db.commit()
db.refresh(row)
return _measure_to_dict(row)
@router.post("/measures/bulk")
async def bulk_upsert_measures(
body: TOMMeasureBulkBody,
db: Session = Depends(get_db),
):
"""Bulk upsert measures — used by deriveTOMs sync from frontend."""
tid = body.tenant_id or DEFAULT_TENANT_ID
now = datetime.now(timezone.utc)
created = 0
updated = 0
for item in body.measures:
existing = (
db.query(TOMMeasureDB)
.filter(TOMMeasureDB.tenant_id == tid, TOMMeasureDB.control_id == item.control_id)
.first()
)
if existing:
existing.name = item.name
existing.description = item.description
existing.category = item.category
existing.type = item.type
existing.applicability = item.applicability
existing.applicability_reason = item.applicability_reason
existing.implementation_status = item.implementation_status
existing.responsible_person = item.responsible_person
existing.responsible_department = item.responsible_department
existing.implementation_date = _parse_dt(item.implementation_date)
existing.review_date = _parse_dt(item.review_date)
existing.review_frequency = item.review_frequency
existing.priority = item.priority
existing.complexity = item.complexity
existing.linked_evidence = item.linked_evidence or []
existing.evidence_gaps = item.evidence_gaps or []
existing.related_controls = item.related_controls or {}
existing.updated_at = now
updated += 1
else:
measure = TOMMeasureDB(
tenant_id=tid,
control_id=item.control_id,
name=item.name,
description=item.description,
category=item.category,
type=item.type,
applicability=item.applicability,
applicability_reason=item.applicability_reason,
implementation_status=item.implementation_status,
responsible_person=item.responsible_person,
responsible_department=item.responsible_department,
implementation_date=_parse_dt(item.implementation_date),
review_date=_parse_dt(item.review_date),
review_frequency=item.review_frequency,
priority=item.priority,
complexity=item.complexity,
linked_evidence=item.linked_evidence or [],
evidence_gaps=item.evidence_gaps or [],
related_controls=item.related_controls or {},
created_at=now,
updated_at=now,
)
db.add(measure)
created += 1
db.commit()
return {
"success": True,
"tenant_id": tid,
"created": created,
"updated": updated,
"total": created + updated,
}
# =============================================================================
# STATS & EXPORT
# =============================================================================
@router.get("/stats")
async def get_tom_stats(
tenant_id: Optional[str] = Query(None),
db: Session = Depends(get_db),
):
"""Return TOM statistics for a tenant."""
tid = tenant_id or DEFAULT_TENANT_ID
base_q = db.query(TOMMeasureDB).filter(TOMMeasureDB.tenant_id == tid)
total = base_q.count()
# By status
status_rows = (
db.query(TOMMeasureDB.implementation_status, func.count(TOMMeasureDB.id))
.filter(TOMMeasureDB.tenant_id == tid)
.group_by(TOMMeasureDB.implementation_status)
.all()
)
by_status = {row[0]: row[1] for row in status_rows}
# By category
cat_rows = (
db.query(TOMMeasureDB.category, func.count(TOMMeasureDB.id))
.filter(TOMMeasureDB.tenant_id == tid)
.group_by(TOMMeasureDB.category)
.all()
)
by_category = {row[0]: row[1] for row in cat_rows}
# Overdue reviews
now = datetime.now(timezone.utc)
overdue = (
base_q.filter(
TOMMeasureDB.review_date.isnot(None),
TOMMeasureDB.review_date < now,
)
.count()
)
return {
"total": total,
"by_status": by_status,
"by_category": by_category,
"overdue_review_count": overdue,
"implemented": by_status.get("IMPLEMENTED", 0),
"partial": by_status.get("PARTIAL", 0),
"not_implemented": by_status.get("NOT_IMPLEMENTED", 0),
}
@router.get("/export")
async def export_measures(
tenant_id: Optional[str] = Query(None),
format: str = Query("csv"),
db: Session = Depends(get_db),
):
"""Export TOM measures as CSV (semicolon-separated) or JSON."""
tid = tenant_id or DEFAULT_TENANT_ID
rows = (
db.query(TOMMeasureDB)
.filter(TOMMeasureDB.tenant_id == tid)
.order_by(TOMMeasureDB.control_id)
.all()
)
measures = [_measure_to_dict(r) for r in rows]
if format == "json":
return StreamingResponse(
io.BytesIO(json.dumps(measures, ensure_ascii=False, indent=2).encode("utf-8")),
media_type="application/json",
headers={"Content-Disposition": "attachment; filename=tom_export.json"},
)
# CSV (semicolon, like VVT)
output = io.StringIO()
fieldnames = [
"control_id", "name", "description", "category", "type",
"applicability", "implementation_status", "responsible_person",
"responsible_department", "implementation_date", "review_date",
"review_frequency", "priority", "complexity", "effectiveness_rating",
]
writer = csv.DictWriter(output, fieldnames=fieldnames, delimiter=";", extrasaction="ignore")
writer.writeheader()
for m in measures:
writer.writerow(m)
output.seek(0)
return StreamingResponse(
io.BytesIO(output.getvalue().encode("utf-8")),
media_type="text/csv; charset=utf-8",
headers={"Content-Disposition": "attachment; filename=tom_export.csv"},
)

View File

@@ -0,0 +1,79 @@
"""
SQLAlchemy models for TOM — Technisch-Organisatorische Massnahmen (Art. 32 DSGVO).
Tables:
- compliance_tom_state: Full TOM-Generator state per tenant (JSONB blob)
- compliance_tom_measures: Individual TOM measures (flat, queryable)
"""
import uuid
from datetime import datetime
from sqlalchemy import (
Column, String, Text, Integer, DateTime, JSON, Index
)
from sqlalchemy.dialects.postgresql import UUID
from classroom_engine.database import Base
class TOMStateDB(Base):
"""Persists the entire TOM-Generator state per tenant."""
__tablename__ = 'compliance_tom_state'
id = Column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4)
tenant_id = Column(String(100), nullable=False, unique=True)
state = Column(JSON, nullable=False, default=dict)
version = Column(Integer, nullable=False, default=1)
created_at = Column(DateTime(timezone=True), default=datetime.utcnow, nullable=False)
updated_at = Column(DateTime(timezone=True), default=datetime.utcnow, onupdate=datetime.utcnow)
__table_args__ = (
Index('idx_tom_state_tenant', 'tenant_id'),
)
def __repr__(self):
return f"<TOMState tenant={self.tenant_id} v{self.version}>"
class TOMMeasureDB(Base):
"""Individual TOM measure — flat, queryable, for reports and export."""
__tablename__ = 'compliance_tom_measures'
id = Column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4)
tenant_id = Column(String(100), nullable=False)
control_id = Column(String(50), nullable=False)
name = Column(String(300), nullable=False)
description = Column(Text)
category = Column(String(50), nullable=False)
type = Column(String(20), nullable=False)
applicability = Column(String(20), default='REQUIRED')
applicability_reason = Column(Text)
implementation_status = Column(String(20), default='NOT_IMPLEMENTED')
responsible_person = Column(String(255))
responsible_department = Column(String(255))
implementation_date = Column(DateTime(timezone=True))
review_date = Column(DateTime(timezone=True))
review_frequency = Column(String(20))
priority = Column(String(20))
complexity = Column(String(20))
linked_evidence = Column(JSON, default=list)
evidence_gaps = Column(JSON, default=list)
related_controls = Column(JSON, default=dict)
verified_at = Column(DateTime(timezone=True))
verified_by = Column(String(200))
effectiveness_rating = Column(String(20))
created_by = Column(String(200), default='system')
created_at = Column(DateTime(timezone=True), default=datetime.utcnow, nullable=False)
updated_at = Column(DateTime(timezone=True), default=datetime.utcnow, onupdate=datetime.utcnow)
__table_args__ = (
Index('idx_tom_measures_tenant', 'tenant_id'),
Index('idx_tom_measures_category', 'tenant_id', 'category'),
Index('idx_tom_measures_status', 'tenant_id', 'implementation_status'),
)
def __repr__(self):
return f"<TOMMeasure {self.control_id}: {self.name}>"

View File

@@ -0,0 +1,61 @@
-- Migration 034: TOM (Technisch-Organisatorische Massnahmen, Art. 32 DSGVO)
--
-- Two tables:
-- 1. compliance_tom_state: Persists the full TOM-Generator state per tenant (replaces In-Memory)
-- 2. compliance_tom_measures: Individual TOM measures (flat, queryable, for reports/export)
BEGIN;
-- ---------------------------------------------------------------------------
-- 1. TOM Generator State (one JSONB blob per tenant)
-- ---------------------------------------------------------------------------
CREATE TABLE IF NOT EXISTS compliance_tom_state (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
tenant_id VARCHAR(100) NOT NULL,
state JSONB NOT NULL DEFAULT '{}',
version INT NOT NULL DEFAULT 1,
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
updated_at TIMESTAMPTZ DEFAULT NOW(),
UNIQUE(tenant_id)
);
CREATE INDEX IF NOT EXISTS idx_tom_state_tenant ON compliance_tom_state(tenant_id);
-- ---------------------------------------------------------------------------
-- 2. Individual TOM Measures (flat, queryable)
-- ---------------------------------------------------------------------------
CREATE TABLE IF NOT EXISTS compliance_tom_measures (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
tenant_id VARCHAR(100) NOT NULL,
control_id VARCHAR(50) NOT NULL,
name VARCHAR(300) NOT NULL,
description TEXT,
category VARCHAR(50) NOT NULL,
type VARCHAR(20) NOT NULL,
applicability VARCHAR(20) DEFAULT 'REQUIRED',
applicability_reason TEXT,
implementation_status VARCHAR(20) DEFAULT 'NOT_IMPLEMENTED',
responsible_person VARCHAR(255),
responsible_department VARCHAR(255),
implementation_date TIMESTAMPTZ,
review_date TIMESTAMPTZ,
review_frequency VARCHAR(20),
priority VARCHAR(20),
complexity VARCHAR(20),
linked_evidence JSONB DEFAULT '[]',
evidence_gaps JSONB DEFAULT '[]',
related_controls JSONB DEFAULT '{}',
verified_at TIMESTAMPTZ,
verified_by VARCHAR(200),
effectiveness_rating VARCHAR(20),
created_by VARCHAR(200) DEFAULT 'system',
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
updated_at TIMESTAMPTZ DEFAULT NOW(),
UNIQUE(tenant_id, control_id)
);
CREATE INDEX IF NOT EXISTS idx_tom_measures_tenant ON compliance_tom_measures(tenant_id);
CREATE INDEX IF NOT EXISTS idx_tom_measures_category ON compliance_tom_measures(tenant_id, category);
CREATE INDEX IF NOT EXISTS idx_tom_measures_status ON compliance_tom_measures(tenant_id, implementation_status);
COMMIT;

View File

@@ -0,0 +1,735 @@
"""Tests for TOM routes (tom_routes.py, tom_models.py)."""
import pytest
import uuid
from unittest.mock import MagicMock, patch, PropertyMock
from datetime import datetime, timezone
from fastapi.testclient import TestClient
from fastapi import FastAPI
from compliance.api.tom_routes import (
router,
TOMStateBody,
TOMMeasureCreate,
TOMMeasureUpdate,
TOMMeasureBulkBody,
TOMMeasureBulkItem,
_parse_dt,
_measure_to_dict,
DEFAULT_TENANT_ID,
)
from compliance.db.tom_models import TOMStateDB, TOMMeasureDB
from compliance.api.schemas import TOMStatsResponse, TOMMeasureResponse
# =============================================================================
# Test App Setup
# =============================================================================
app = FastAPI()
app.include_router(router)
DEFAULT_TENANT = DEFAULT_TENANT_ID
MEASURE_ID = "ffffffff-0001-0001-0001-000000000001"
UNKNOWN_ID = "aaaaaaaa-9999-9999-9999-999999999999"
# =============================================================================
# Helper: create mock DB session
# =============================================================================
def _make_mock_db():
db = MagicMock()
db.query.return_value = db
db.filter.return_value = db
db.first.return_value = None
db.count.return_value = 0
db.all.return_value = []
db.offset.return_value = db
db.limit.return_value = db
db.order_by.return_value = db
db.group_by.return_value = db
return db
def _make_state_row(tenant_id=DEFAULT_TENANT, version=1, state=None):
row = TOMStateDB()
row.id = uuid.uuid4()
row.tenant_id = tenant_id
row.state = state or {"steps": [], "derivedTOMs": []}
row.version = version
row.created_at = datetime(2024, 1, 1, tzinfo=timezone.utc)
row.updated_at = datetime(2024, 1, 2, tzinfo=timezone.utc)
return row
def _make_measure_row(control_id="TOM.GOV.01", **kwargs):
m = TOMMeasureDB()
m.id = uuid.UUID(kwargs.get("id", MEASURE_ID))
m.tenant_id = kwargs.get("tenant_id", DEFAULT_TENANT)
m.control_id = control_id
m.name = kwargs.get("name", "Datenschutzrichtlinie")
m.description = kwargs.get("description", "Beschreibung")
m.category = kwargs.get("category", "GOVERNANCE")
m.type = kwargs.get("type", "ORGANIZATIONAL")
m.applicability = kwargs.get("applicability", "REQUIRED")
m.applicability_reason = kwargs.get("applicability_reason", None)
m.implementation_status = kwargs.get("implementation_status", "NOT_IMPLEMENTED")
m.responsible_person = kwargs.get("responsible_person", None)
m.responsible_department = kwargs.get("responsible_department", None)
m.implementation_date = kwargs.get("implementation_date", None)
m.review_date = kwargs.get("review_date", None)
m.review_frequency = kwargs.get("review_frequency", "ANNUAL")
m.priority = kwargs.get("priority", "HIGH")
m.complexity = kwargs.get("complexity", "MEDIUM")
m.linked_evidence = kwargs.get("linked_evidence", [])
m.evidence_gaps = kwargs.get("evidence_gaps", [])
m.related_controls = kwargs.get("related_controls", {})
m.verified_at = kwargs.get("verified_at", None)
m.verified_by = kwargs.get("verified_by", None)
m.effectiveness_rating = kwargs.get("effectiveness_rating", None)
m.created_by = kwargs.get("created_by", "system")
m.created_at = kwargs.get("created_at", datetime(2024, 1, 1, tzinfo=timezone.utc))
m.updated_at = kwargs.get("updated_at", datetime(2024, 1, 2, tzinfo=timezone.utc))
return m
# =============================================================================
# Schema Tests
# =============================================================================
class TestTOMStateBody:
def test_get_tenant_id_from_tenant_id(self):
body = TOMStateBody(tenant_id="abc", state={})
assert body.get_tenant_id() == "abc"
def test_get_tenant_id_from_camelcase(self):
body = TOMStateBody(tenantId="def", state={})
assert body.get_tenant_id() == "def"
def test_get_tenant_id_default(self):
body = TOMStateBody(state={})
assert body.get_tenant_id() == DEFAULT_TENANT
def test_version_optional(self):
body = TOMStateBody(tenant_id="x", state={"foo": "bar"})
assert body.version is None
class TestTOMMeasureCreate:
def test_defaults(self):
mc = TOMMeasureCreate(
control_id="TOM.GOV.01",
name="Test",
category="GOVERNANCE",
type="ORGANIZATIONAL",
)
assert mc.applicability == "REQUIRED"
assert mc.implementation_status == "NOT_IMPLEMENTED"
assert mc.priority is None
assert mc.linked_evidence is None
def test_full_values(self):
mc = TOMMeasureCreate(
control_id="TOM.ACC.02",
name="Zugriffskontrolle",
description="RBAC implementieren",
category="ACCESS_CONTROL",
type="TECHNICAL",
applicability="REQUIRED",
implementation_status="IMPLEMENTED",
priority="CRITICAL",
complexity="HIGH",
)
assert mc.control_id == "TOM.ACC.02"
assert mc.priority == "CRITICAL"
class TestTOMMeasureUpdate:
def test_partial(self):
mu = TOMMeasureUpdate(implementation_status="IMPLEMENTED")
data = mu.model_dump(exclude_unset=True)
assert data == {"implementation_status": "IMPLEMENTED"}
def test_empty(self):
mu = TOMMeasureUpdate()
data = mu.model_dump(exclude_unset=True)
assert data == {}
class TestTOMStatsResponse:
def test_defaults(self):
stats = TOMStatsResponse()
assert stats.total == 0
assert stats.by_status == {}
assert stats.overdue_review_count == 0
def test_full(self):
stats = TOMStatsResponse(
total=10,
by_status={"IMPLEMENTED": 5, "NOT_IMPLEMENTED": 3, "PARTIAL": 2},
by_category={"GOVERNANCE": 4, "ACCESS_CONTROL": 6},
overdue_review_count=2,
implemented=5,
partial=2,
not_implemented=3,
)
assert stats.total == 10
assert stats.implemented == 5
class TestTOMMeasureResponse:
def test_from_dict(self):
resp = TOMMeasureResponse(
id="abc",
tenant_id=DEFAULT_TENANT,
control_id="TOM.GOV.01",
name="Test",
category="GOVERNANCE",
type="ORGANIZATIONAL",
)
assert resp.id == "abc"
assert resp.linked_evidence == []
# =============================================================================
# DB Model Tests
# =============================================================================
class TestTOMModels:
def test_state_repr(self):
s = TOMStateDB()
s.tenant_id = "test"
s.version = 3
assert "test" in repr(s)
assert "v3" in repr(s)
def test_measure_repr(self):
m = TOMMeasureDB()
m.control_id = "TOM.ACC.01"
m.name = "Zugriffskontrolle"
assert "TOM.ACC.01" in repr(m)
# =============================================================================
# Helper Function Tests
# =============================================================================
class TestParseDt:
def test_none(self):
assert _parse_dt(None) is None
def test_empty_string(self):
assert _parse_dt("") is None
def test_iso_format(self):
dt = _parse_dt("2024-01-15T10:30:00+00:00")
assert dt is not None
assert dt.year == 2024
assert dt.month == 1
def test_iso_with_z(self):
dt = _parse_dt("2024-06-15T12:00:00Z")
assert dt is not None
assert dt.year == 2024
def test_invalid_string(self):
assert _parse_dt("not-a-date") is None
class TestMeasureToDict:
def test_full_conversion(self):
m = _make_measure_row()
d = _measure_to_dict(m)
assert d["id"] == MEASURE_ID
assert d["control_id"] == "TOM.GOV.01"
assert d["name"] == "Datenschutzrichtlinie"
assert d["category"] == "GOVERNANCE"
assert d["type"] == "ORGANIZATIONAL"
assert d["linked_evidence"] == []
assert d["related_controls"] == {}
assert d["created_at"] is not None
def test_with_dates(self):
m = _make_measure_row(
implementation_date=datetime(2024, 3, 1, tzinfo=timezone.utc),
review_date=datetime(2025, 3, 1, tzinfo=timezone.utc),
)
d = _measure_to_dict(m)
assert "2024-03-01" in d["implementation_date"]
assert "2025-03-01" in d["review_date"]
def test_null_dates(self):
m = _make_measure_row()
d = _measure_to_dict(m)
assert d["implementation_date"] is None
assert d["review_date"] is None
assert d["verified_at"] is None
# =============================================================================
# Route Tests (with mocked DB)
# =============================================================================
from classroom_engine.database import get_db
def override_get_db(mock_db):
def _override():
return mock_db
return _override
class TestStateRoutes:
def test_get_state_new_tenant(self):
db = _make_mock_db()
db.filter.return_value.first.return_value = None
app.dependency_overrides[get_db] = override_get_db(db)
client = TestClient(app)
resp = client.get("/tom/state?tenant_id=new-tenant")
assert resp.status_code == 200
data = resp.json()
assert data["success"] is True
assert data["data"]["isNew"] is True
assert data["data"]["version"] == 0
app.dependency_overrides.clear()
def test_get_state_existing(self):
db = _make_mock_db()
row = _make_state_row(state={"steps": [1, 2, 3]})
db.filter.return_value.first.return_value = row
app.dependency_overrides[get_db] = override_get_db(db)
client = TestClient(app)
resp = client.get(f"/tom/state?tenant_id={DEFAULT_TENANT}")
assert resp.status_code == 200
data = resp.json()
assert data["success"] is True
assert data["data"]["version"] == 1
assert data["data"]["state"]["steps"] == [1, 2, 3]
app.dependency_overrides.clear()
def test_post_state_new(self):
db = _make_mock_db()
db.filter.return_value.first.return_value = None
def mock_refresh(obj):
obj.version = 1
obj.updated_at = datetime(2024, 1, 1, tzinfo=timezone.utc)
obj.state = {"test": True}
db.refresh = mock_refresh
app.dependency_overrides[get_db] = override_get_db(db)
client = TestClient(app)
resp = client.post("/tom/state", json={
"tenant_id": DEFAULT_TENANT,
"state": {"test": True},
})
assert resp.status_code == 200
data = resp.json()
assert data["success"] is True
db.add.assert_called_once()
app.dependency_overrides.clear()
def test_post_state_version_conflict(self):
db = _make_mock_db()
row = _make_state_row(version=5)
db.filter.return_value.first.return_value = row
app.dependency_overrides[get_db] = override_get_db(db)
client = TestClient(app)
resp = client.post("/tom/state", json={
"tenant_id": DEFAULT_TENANT,
"state": {"test": True},
"version": 3, # Expected 3, actual 5
})
assert resp.status_code == 409
app.dependency_overrides.clear()
def test_post_state_update_existing(self):
db = _make_mock_db()
row = _make_state_row(version=2)
db.filter.return_value.first.return_value = row
def mock_refresh(obj):
pass # row already has attributes
db.refresh = mock_refresh
app.dependency_overrides[get_db] = override_get_db(db)
client = TestClient(app)
resp = client.post("/tom/state", json={
"tenant_id": DEFAULT_TENANT,
"state": {"new": "data"},
"version": 2,
})
assert resp.status_code == 200
assert row.version == 3
assert row.state == {"new": "data"}
app.dependency_overrides.clear()
def test_delete_state(self):
db = _make_mock_db()
row = _make_state_row()
db.filter.return_value.first.return_value = row
app.dependency_overrides[get_db] = override_get_db(db)
client = TestClient(app)
resp = client.delete(f"/tom/state?tenant_id={DEFAULT_TENANT}")
assert resp.status_code == 200
data = resp.json()
assert data["deleted"] is True
db.delete.assert_called_once()
app.dependency_overrides.clear()
def test_delete_state_not_found(self):
db = _make_mock_db()
db.filter.return_value.first.return_value = None
app.dependency_overrides[get_db] = override_get_db(db)
client = TestClient(app)
resp = client.delete(f"/tom/state?tenant_id={DEFAULT_TENANT}")
assert resp.status_code == 200
data = resp.json()
assert data["deleted"] is False
app.dependency_overrides.clear()
def test_delete_state_missing_tenant(self):
db = _make_mock_db()
app.dependency_overrides[get_db] = override_get_db(db)
client = TestClient(app)
resp = client.delete("/tom/state")
assert resp.status_code == 400
app.dependency_overrides.clear()
class TestMeasureRoutes:
def test_list_measures_empty(self):
db = _make_mock_db()
db.filter.return_value.order_by.return_value.offset.return_value.limit.return_value.all.return_value = []
db.filter.return_value.count.return_value = 0
app.dependency_overrides[get_db] = override_get_db(db)
client = TestClient(app)
resp = client.get("/tom/measures")
assert resp.status_code == 200
data = resp.json()
assert data["measures"] == []
assert data["total"] == 0
app.dependency_overrides.clear()
def test_list_measures_with_data(self):
db = _make_mock_db()
measures = [_make_measure_row("TOM.GOV.01"), _make_measure_row("TOM.ACC.01", name="Zugriff")]
db.filter.return_value.order_by.return_value.offset.return_value.limit.return_value.all.return_value = measures
db.filter.return_value.count.return_value = 2
app.dependency_overrides[get_db] = override_get_db(db)
client = TestClient(app)
resp = client.get("/tom/measures")
assert resp.status_code == 200
data = resp.json()
assert len(data["measures"]) == 2
assert data["total"] == 2
app.dependency_overrides.clear()
def test_create_measure(self):
db = _make_mock_db()
# No existing measure with same control_id
db.filter.return_value.first.return_value = None
def mock_refresh(obj):
obj.id = uuid.UUID(MEASURE_ID)
obj.created_at = datetime(2024, 1, 1, tzinfo=timezone.utc)
obj.updated_at = datetime(2024, 1, 1, tzinfo=timezone.utc)
db.refresh = mock_refresh
app.dependency_overrides[get_db] = override_get_db(db)
client = TestClient(app)
resp = client.post("/tom/measures", json={
"control_id": "TOM.GOV.01",
"name": "Datenschutzrichtlinie",
"category": "GOVERNANCE",
"type": "ORGANIZATIONAL",
})
assert resp.status_code == 201
data = resp.json()
assert data["control_id"] == "TOM.GOV.01"
db.add.assert_called_once()
app.dependency_overrides.clear()
def test_create_measure_duplicate(self):
db = _make_mock_db()
db.filter.return_value.first.return_value = _make_measure_row()
app.dependency_overrides[get_db] = override_get_db(db)
client = TestClient(app)
resp = client.post("/tom/measures", json={
"control_id": "TOM.GOV.01",
"name": "Duplicate",
"category": "GOVERNANCE",
"type": "ORGANIZATIONAL",
})
assert resp.status_code == 409
app.dependency_overrides.clear()
def test_update_measure(self):
db = _make_mock_db()
row = _make_measure_row()
db.filter.return_value.first.return_value = row
def mock_refresh(obj):
pass
db.refresh = mock_refresh
app.dependency_overrides[get_db] = override_get_db(db)
client = TestClient(app)
resp = client.put(f"/tom/measures/{MEASURE_ID}", json={
"implementation_status": "IMPLEMENTED",
"responsible_person": "Max Mustermann",
})
assert resp.status_code == 200
assert row.implementation_status == "IMPLEMENTED"
assert row.responsible_person == "Max Mustermann"
app.dependency_overrides.clear()
def test_update_measure_not_found(self):
db = _make_mock_db()
db.filter.return_value.first.return_value = None
app.dependency_overrides[get_db] = override_get_db(db)
client = TestClient(app)
resp = client.put(f"/tom/measures/{UNKNOWN_ID}", json={
"implementation_status": "IMPLEMENTED",
})
assert resp.status_code == 404
app.dependency_overrides.clear()
def test_bulk_upsert_create(self):
db = _make_mock_db()
# No existing measures
db.filter.return_value.first.return_value = None
app.dependency_overrides[get_db] = override_get_db(db)
client = TestClient(app)
resp = client.post("/tom/measures/bulk", json={
"tenant_id": DEFAULT_TENANT,
"measures": [
{
"control_id": "TOM.GOV.01",
"name": "Datenschutzrichtlinie",
"category": "GOVERNANCE",
"type": "ORGANIZATIONAL",
},
{
"control_id": "TOM.ACC.01",
"name": "Zugriffskontrolle",
"category": "ACCESS_CONTROL",
"type": "TECHNICAL",
},
],
})
assert resp.status_code == 200
data = resp.json()
assert data["success"] is True
assert data["created"] == 2
assert data["updated"] == 0
assert data["total"] == 2
app.dependency_overrides.clear()
def test_bulk_upsert_update(self):
db = _make_mock_db()
existing = _make_measure_row("TOM.GOV.01")
db.filter.return_value.first.return_value = existing
app.dependency_overrides[get_db] = override_get_db(db)
client = TestClient(app)
resp = client.post("/tom/measures/bulk", json={
"measures": [
{
"control_id": "TOM.GOV.01",
"name": "Updated Name",
"category": "GOVERNANCE",
"type": "ORGANIZATIONAL",
},
],
})
assert resp.status_code == 200
data = resp.json()
assert data["updated"] == 1
assert data["created"] == 0
assert existing.name == "Updated Name"
app.dependency_overrides.clear()
class TestStatsRoute:
def test_stats_empty(self):
db = _make_mock_db()
db.filter.return_value.count.return_value = 0
db.filter.return_value.group_by.return_value.all.return_value = []
app.dependency_overrides[get_db] = override_get_db(db)
client = TestClient(app)
resp = client.get("/tom/stats")
assert resp.status_code == 200
data = resp.json()
assert data["total"] == 0
assert data["by_status"] == {}
assert data["by_category"] == {}
app.dependency_overrides.clear()
def test_stats_with_data(self):
db = _make_mock_db()
# Total count
base_q = MagicMock()
base_q.count.return_value = 10
base_q.filter.return_value.count.return_value = 2 # overdue
# Status group_by
status_q = MagicMock()
status_q.all.return_value = [("IMPLEMENTED", 5), ("NOT_IMPLEMENTED", 3), ("PARTIAL", 2)]
# Category group_by
cat_q = MagicMock()
cat_q.all.return_value = [("GOVERNANCE", 4), ("ACCESS_CONTROL", 6)]
call_count = [0]
original_filter = db.query.return_value.filter
def mock_filter(*args):
call_count[0] += 1
if call_count[0] == 1:
return base_q
elif call_count[0] == 2:
mock_gby = MagicMock()
mock_gby.all.return_value = [("IMPLEMENTED", 5), ("NOT_IMPLEMENTED", 3), ("PARTIAL", 2)]
result = MagicMock()
result.group_by.return_value = mock_gby
return result
elif call_count[0] == 3:
mock_gby = MagicMock()
mock_gby.all.return_value = [("GOVERNANCE", 4), ("ACCESS_CONTROL", 6)]
result = MagicMock()
result.group_by.return_value = mock_gby
return result
return MagicMock()
db.query.return_value.filter = mock_filter
app.dependency_overrides[get_db] = override_get_db(db)
client = TestClient(app)
resp = client.get("/tom/stats")
assert resp.status_code == 200
data = resp.json()
assert data["total"] == 10
app.dependency_overrides.clear()
class TestExportRoute:
def test_export_json(self):
db = _make_mock_db()
measures = [_make_measure_row("TOM.GOV.01")]
db.filter.return_value.order_by.return_value.all.return_value = measures
app.dependency_overrides[get_db] = override_get_db(db)
client = TestClient(app)
resp = client.get("/tom/export?format=json")
assert resp.status_code == 200
assert "application/json" in resp.headers.get("content-type", "")
data = resp.json()
assert len(data) == 1
assert data[0]["control_id"] == "TOM.GOV.01"
app.dependency_overrides.clear()
def test_export_csv(self):
db = _make_mock_db()
measures = [_make_measure_row("TOM.GOV.01")]
db.filter.return_value.order_by.return_value.all.return_value = measures
app.dependency_overrides[get_db] = override_get_db(db)
client = TestClient(app)
resp = client.get("/tom/export?format=csv")
assert resp.status_code == 200
assert "text/csv" in resp.headers.get("content-type", "")
content = resp.text
assert "control_id" in content # Header
assert "TOM.GOV.01" in content
app.dependency_overrides.clear()
def test_export_csv_empty(self):
db = _make_mock_db()
db.filter.return_value.order_by.return_value.all.return_value = []
app.dependency_overrides[get_db] = override_get_db(db)
client = TestClient(app)
resp = client.get("/tom/export?format=csv")
assert resp.status_code == 200
content = resp.text
assert "control_id" in content # Header still present
app.dependency_overrides.clear()
# =============================================================================
# camelCase tenantId alias tests
# =============================================================================
class TestTenantIdAlias:
def test_get_state_camelcase(self):
db = _make_mock_db()
db.filter.return_value.first.return_value = None
app.dependency_overrides[get_db] = override_get_db(db)
client = TestClient(app)
resp = client.get(f"/tom/state?tenantId={DEFAULT_TENANT}")
assert resp.status_code == 200
data = resp.json()
assert data["data"]["tenantId"] == DEFAULT_TENANT
app.dependency_overrides.clear()
def test_post_state_camelcase(self):
db = _make_mock_db()
db.filter.return_value.first.return_value = None
def mock_refresh(obj):
obj.version = 1
obj.updated_at = datetime(2024, 1, 1, tzinfo=timezone.utc)
obj.state = {}
db.refresh = mock_refresh
app.dependency_overrides[get_db] = override_get_db(db)
client = TestClient(app)
resp = client.post("/tom/state", json={
"tenantId": DEFAULT_TENANT,
"state": {},
})
assert resp.status_code == 200
app.dependency_overrides.clear()