From 3593a4ff7810259de5c9e4adc8938ca391a38a9e Mon Sep 17 00:00:00 2001 From: Benjamin Admin Date: Fri, 6 Mar 2026 17:35:44 +0100 Subject: [PATCH] feat(tom): TOM-Backend in Python erstellen, Frontend von In-Memory auf DB migrieren - Migration 034: compliance_tom_state + compliance_tom_measures Tabellen - Python Routes: State CRUD, Measures CRUD, Bulk-Upsert, Stats, CSV/JSON-Export - Frontend-Proxy: In-Memory Storage durch Proxy zu backend-compliance ersetzt - Go TOM-Handler als DEPRECATED markiert (Source of Truth ist jetzt Python) - 44 Tests (alle bestanden) Co-Authored-By: Claude Opus 4.6 --- .../api/sdk/v1/tom-generator/state/route.ts | 193 +---- ai-compliance-sdk/cmd/server/main.go | 4 +- .../internal/api/handlers/dsgvo_handlers.go | 5 + backend-compliance/compliance/api/__init__.py | 3 + backend-compliance/compliance/api/schemas.py | 54 ++ .../compliance/api/tom_routes.py | 575 ++++++++++++++ .../compliance/db/tom_models.py | 79 ++ backend-compliance/migrations/034_tom.sql | 61 ++ backend-compliance/tests/test_tom_routes.py | 735 ++++++++++++++++++ 9 files changed, 1545 insertions(+), 164 deletions(-) create mode 100644 backend-compliance/compliance/api/tom_routes.py create mode 100644 backend-compliance/compliance/db/tom_models.py create mode 100644 backend-compliance/migrations/034_tom.sql create mode 100644 backend-compliance/tests/test_tom_routes.py diff --git a/admin-compliance/app/api/sdk/v1/tom-generator/state/route.ts b/admin-compliance/app/api/sdk/v1/tom-generator/state/route.ts index 90c22e0..b64b290 100644 --- a/admin-compliance/app/api/sdk/v1/tom-generator/state/route.ts +++ b/admin-compliance/app/api/sdk/v1/tom-generator/state/route.ts @@ -1,119 +1,30 @@ import { NextRequest, NextResponse } from 'next/server' -import { - TOMGeneratorState, - createEmptyTOMGeneratorState, -} from '@/lib/sdk/tom-generator/types' /** - * TOM Generator State API + * TOM Generator State API — Proxy to backend-compliance (Python/FastAPI) * * GET /api/sdk/v1/tom-generator/state?tenantId=xxx - Load TOM generator state * POST /api/sdk/v1/tom-generator/state - Save TOM generator state * DELETE /api/sdk/v1/tom-generator/state?tenantId=xxx - Clear state */ -// ============================================================================= -// STORAGE (In-Memory for development) -// ============================================================================= - -interface StoredTOMState { - state: TOMGeneratorState - version: number - createdAt: string - updatedAt: string -} - -class InMemoryTOMStateStore { - private store: Map = new Map() - - async get(tenantId: string): Promise { - return this.store.get(tenantId) || null - } - - async save(tenantId: string, state: TOMGeneratorState, expectedVersion?: number): Promise { - const existing = this.store.get(tenantId) - - if (expectedVersion !== undefined && existing && existing.version !== expectedVersion) { - const error = new Error('Version conflict') as Error & { status: number } - error.status = 409 - throw error - } - - const now = new Date().toISOString() - const newVersion = existing ? existing.version + 1 : 1 - - const stored: StoredTOMState = { - state: { - ...state, - updatedAt: new Date(now), - }, - version: newVersion, - createdAt: existing?.createdAt || now, - updatedAt: now, - } - - this.store.set(tenantId, stored) - return stored - } - - async delete(tenantId: string): Promise { - return this.store.delete(tenantId) - } - - async list(): Promise<{ tenantId: string; updatedAt: string }[]> { - const result: { tenantId: string; updatedAt: string }[] = [] - this.store.forEach((value, key) => { - result.push({ tenantId: key, updatedAt: value.updatedAt }) - }) - return result - } -} - -const stateStore = new InMemoryTOMStateStore() - -// ============================================================================= -// HANDLERS -// ============================================================================= +const BACKEND_URL = process.env.COMPLIANCE_BACKEND_URL || 'http://backend-compliance:8002' export async function GET(request: NextRequest) { try { const { searchParams } = new URL(request.url) const tenantId = searchParams.get('tenantId') - // List all states if no tenantId provided - if (!tenantId) { - const states = await stateStore.list() - return NextResponse.json({ - success: true, - data: states, - }) - } + const url = tenantId + ? `${BACKEND_URL}/api/compliance/tom/state?tenant_id=${encodeURIComponent(tenantId)}` + : `${BACKEND_URL}/api/compliance/tom/state` - const stored = await stateStore.get(tenantId) - - if (!stored) { - // Return empty state for new tenants - const emptyState = createEmptyTOMGeneratorState(tenantId) - return NextResponse.json({ - success: true, - data: { - tenantId, - state: emptyState, - version: 0, - isNew: true, - }, - }) - } - - return NextResponse.json({ - success: true, - data: { - tenantId, - state: stored.state, - version: stored.version, - lastModified: stored.updatedAt, - }, + const res = await fetch(url, { + headers: { 'Content-Type': 'application/json' }, }) + + const data = await res.json() + return NextResponse.json(data, { status: res.status }) } catch (error) { console.error('Failed to load TOM generator state:', error) return NextResponse.json( @@ -142,65 +53,19 @@ export async function POST(request: NextRequest) { ) } - // Deserialize dates - const parsedState: TOMGeneratorState = { - ...state, - createdAt: new Date(state.createdAt), - updatedAt: new Date(state.updatedAt), - steps: state.steps.map((step: { id: string; completed: boolean; data: unknown; validatedAt: string | null }) => ({ - ...step, - validatedAt: step.validatedAt ? new Date(step.validatedAt) : null, - })), - documents: state.documents?.map((doc: { uploadedAt: string; validFrom?: string; validUntil?: string; aiAnalysis?: { analyzedAt: string } }) => ({ - ...doc, - uploadedAt: new Date(doc.uploadedAt), - validFrom: doc.validFrom ? new Date(doc.validFrom) : null, - validUntil: doc.validUntil ? new Date(doc.validUntil) : null, - aiAnalysis: doc.aiAnalysis ? { - ...doc.aiAnalysis, - analyzedAt: new Date(doc.aiAnalysis.analyzedAt), - } : null, - })) || [], - derivedTOMs: state.derivedTOMs?.map((tom: { implementationDate?: string; reviewDate?: string }) => ({ - ...tom, - implementationDate: tom.implementationDate ? new Date(tom.implementationDate) : null, - reviewDate: tom.reviewDate ? new Date(tom.reviewDate) : null, - })) || [], - gapAnalysis: state.gapAnalysis ? { - ...state.gapAnalysis, - generatedAt: new Date(state.gapAnalysis.generatedAt), - } : null, - exports: state.exports?.map((exp: { generatedAt: string }) => ({ - ...exp, - generatedAt: new Date(exp.generatedAt), - })) || [], - } - - const stored = await stateStore.save(tenantId, parsedState, version) - - return NextResponse.json({ - success: true, - data: { - tenantId, - state: stored.state, - version: stored.version, - lastModified: stored.updatedAt, - }, + const res = await fetch(`${BACKEND_URL}/api/compliance/tom/state`, { + method: 'POST', + headers: { 'Content-Type': 'application/json' }, + body: JSON.stringify({ + tenant_id: tenantId, + state, + version, + }), }) + + const data = await res.json() + return NextResponse.json(data, { status: res.status }) } catch (error) { - const err = error as Error & { status?: number } - - if (err.status === 409 || err.message === 'Version conflict') { - return NextResponse.json( - { - success: false, - error: 'Version conflict. State was modified by another request.', - code: 'VERSION_CONFLICT', - }, - { status: 409 } - ) - } - console.error('Failed to save TOM generator state:', error) return NextResponse.json( { success: false, error: 'Failed to save state' }, @@ -221,14 +86,16 @@ export async function DELETE(request: NextRequest) { ) } - const deleted = await stateStore.delete(tenantId) + const res = await fetch( + `${BACKEND_URL}/api/compliance/tom/state?tenant_id=${encodeURIComponent(tenantId)}`, + { + method: 'DELETE', + headers: { 'Content-Type': 'application/json' }, + } + ) - return NextResponse.json({ - success: true, - tenantId, - deleted, - deletedAt: new Date().toISOString(), - }) + const data = await res.json() + return NextResponse.json(data, { status: res.status }) } catch (error) { console.error('Failed to delete TOM generator state:', error) return NextResponse.json( diff --git a/ai-compliance-sdk/cmd/server/main.go b/ai-compliance-sdk/cmd/server/main.go index ea11206..a1e55d4 100644 --- a/ai-compliance-sdk/cmd/server/main.go +++ b/ai-compliance-sdk/cmd/server/main.go @@ -263,6 +263,8 @@ func main() { } // TOM - Technische und Organisatorische Maßnahmen (Art. 32) + // DEPRECATED: TOM is now managed by backend-compliance (Python). + // Use: GET/POST /api/compliance/tom/state, /tom/measures, /tom/stats, /tom/export tom := dsgvoRoutes.Group("/tom") { tom.GET("", dsgvoHandlers.ListTOMs) @@ -301,7 +303,7 @@ func main() { exports := dsgvoRoutes.Group("/export") { exports.GET("/vvt", dsgvoHandlers.ExportVVT) // DEPRECATED: use backend-compliance /vvt/export?format=csv - exports.GET("/tom", dsgvoHandlers.ExportTOM) + exports.GET("/tom", dsgvoHandlers.ExportTOM) // DEPRECATED: use backend-compliance /tom/export?format=csv exports.GET("/dsr", dsgvoHandlers.ExportDSR) exports.GET("/retention", dsgvoHandlers.ExportRetentionPolicies) } diff --git a/ai-compliance-sdk/internal/api/handlers/dsgvo_handlers.go b/ai-compliance-sdk/internal/api/handlers/dsgvo_handlers.go index f3c085a..f44c069 100644 --- a/ai-compliance-sdk/internal/api/handlers/dsgvo_handlers.go +++ b/ai-compliance-sdk/internal/api/handlers/dsgvo_handlers.go @@ -144,8 +144,12 @@ func (h *DSGVOHandlers) DeleteProcessingActivity(c *gin.Context) { // ============================================================================ // TOM - Technische und Organisatorische Maßnahmen // ============================================================================ +// DEPRECATED: TOM is now managed by backend-compliance (Python). +// These handlers remain for backwards compatibility but should not be used. +// Use backend-compliance endpoints: GET/POST /api/compliance/tom/... // ListTOMs returns all TOMs for a tenant +// DEPRECATED: Use backend-compliance GET /api/compliance/tom/measures func (h *DSGVOHandlers) ListTOMs(c *gin.Context) { tenantID := rbac.GetTenantID(c) if tenantID == uuid.Nil { @@ -553,6 +557,7 @@ func (h *DSGVOHandlers) ExportVVT(c *gin.Context) { } // ExportTOM exports the TOM catalog as CSV/JSON +// DEPRECATED: Use backend-compliance GET /api/compliance/tom/export func (h *DSGVOHandlers) ExportTOM(c *gin.Context) { tenantID := rbac.GetTenantID(c) if tenantID == uuid.Nil { diff --git a/backend-compliance/compliance/api/__init__.py b/backend-compliance/compliance/api/__init__.py index 8eeb2d3..b2e593c 100644 --- a/backend-compliance/compliance/api/__init__.py +++ b/backend-compliance/compliance/api/__init__.py @@ -26,6 +26,7 @@ from .dsr_routes import router as dsr_router from .email_template_routes import router as email_template_router from .banner_routes import router as banner_router from .extraction_routes import router as extraction_router +from .tom_routes import router as tom_router # Include sub-routers router.include_router(audit_router) @@ -53,6 +54,7 @@ router.include_router(dsr_router) router.include_router(email_template_router) router.include_router(banner_router) router.include_router(extraction_router) +router.include_router(tom_router) __all__ = [ "router", @@ -80,4 +82,5 @@ __all__ = [ "dsr_router", "email_template_router", "banner_router", + "tom_router", ] diff --git a/backend-compliance/compliance/api/schemas.py b/backend-compliance/compliance/api/schemas.py index 95d08c1..059ccfa 100644 --- a/backend-compliance/compliance/api/schemas.py +++ b/backend-compliance/compliance/api/schemas.py @@ -2002,3 +2002,57 @@ class VVTAuditLogEntry(BaseModel): class Config: from_attributes = True + + +# ============================================================================ +# TOM — Technisch-Organisatorische Massnahmen (Art. 32 DSGVO) +# ============================================================================ + +class TOMStateResponse(BaseModel): + tenant_id: str + state: Dict[str, Any] = {} + version: int = 0 + last_modified: Optional[datetime] = None + is_new: bool = False + + +class TOMMeasureResponse(BaseModel): + id: str + tenant_id: str + control_id: str + name: str + description: Optional[str] = None + category: str + type: str + applicability: str = "REQUIRED" + applicability_reason: Optional[str] = None + implementation_status: str = "NOT_IMPLEMENTED" + responsible_person: Optional[str] = None + responsible_department: Optional[str] = None + implementation_date: Optional[datetime] = None + review_date: Optional[datetime] = None + review_frequency: Optional[str] = None + priority: Optional[str] = None + complexity: Optional[str] = None + linked_evidence: List[Any] = [] + evidence_gaps: List[Any] = [] + related_controls: Dict[str, Any] = {} + verified_at: Optional[datetime] = None + verified_by: Optional[str] = None + effectiveness_rating: Optional[str] = None + created_by: Optional[str] = None + created_at: Optional[datetime] = None + updated_at: Optional[datetime] = None + + class Config: + from_attributes = True + + +class TOMStatsResponse(BaseModel): + total: int = 0 + by_status: Dict[str, int] = {} + by_category: Dict[str, int] = {} + overdue_review_count: int = 0 + implemented: int = 0 + partial: int = 0 + not_implemented: int = 0 diff --git a/backend-compliance/compliance/api/tom_routes.py b/backend-compliance/compliance/api/tom_routes.py new file mode 100644 index 0000000..0869189 --- /dev/null +++ b/backend-compliance/compliance/api/tom_routes.py @@ -0,0 +1,575 @@ +""" +FastAPI routes for TOM — Technisch-Organisatorische Massnahmen (Art. 32 DSGVO). + +Endpoints: + GET /tom/state — Load TOM generator state for tenant + POST /tom/state — Save state (with version check) + DELETE /tom/state — Reset/clear state for tenant + GET /tom/measures — List measures (filter: category, status, tenant_id) + POST /tom/measures — Create single measure + PUT /tom/measures/{id} — Update measure + POST /tom/measures/bulk — Bulk upsert (for deriveTOMs sync) + GET /tom/stats — Statistics + GET /tom/export — Export as CSV or JSON +""" + +import csv +import io +import json +import logging +from datetime import datetime, timezone +from typing import Optional, List, Any, Dict +from uuid import UUID, uuid4 + +from fastapi import APIRouter, Depends, HTTPException, Query +from fastapi.responses import StreamingResponse +from pydantic import BaseModel, Field +from sqlalchemy import func +from sqlalchemy.orm import Session + +from classroom_engine.database import get_db + +from ..db.tom_models import TOMStateDB, TOMMeasureDB + +logger = logging.getLogger(__name__) +router = APIRouter(prefix="/tom", tags=["tom"]) + +DEFAULT_TENANT_ID = "9282a473-5c95-4b3a-bf78-0ecc0ec71d3e" + + +# ============================================================================= +# Pydantic Schemas (kept close to routes like loeschfristen pattern) +# ============================================================================= + +class TOMStateBody(BaseModel): + tenant_id: Optional[str] = None + tenantId: Optional[str] = None # Accept camelCase from frontend + state: Dict[str, Any] + version: Optional[int] = None + + def get_tenant_id(self) -> str: + return self.tenant_id or self.tenantId or DEFAULT_TENANT_ID + + +class TOMMeasureCreate(BaseModel): + control_id: str + name: str + description: Optional[str] = None + category: str + type: str + applicability: str = "REQUIRED" + applicability_reason: Optional[str] = None + implementation_status: str = "NOT_IMPLEMENTED" + responsible_person: Optional[str] = None + responsible_department: Optional[str] = None + implementation_date: Optional[str] = None + review_date: Optional[str] = None + review_frequency: Optional[str] = None + priority: Optional[str] = None + complexity: Optional[str] = None + linked_evidence: Optional[List[Any]] = None + evidence_gaps: Optional[List[Any]] = None + related_controls: Optional[Dict[str, Any]] = None + verified_at: Optional[str] = None + verified_by: Optional[str] = None + effectiveness_rating: Optional[str] = None + + +class TOMMeasureUpdate(BaseModel): + name: Optional[str] = None + description: Optional[str] = None + category: Optional[str] = None + type: Optional[str] = None + applicability: Optional[str] = None + applicability_reason: Optional[str] = None + implementation_status: Optional[str] = None + responsible_person: Optional[str] = None + responsible_department: Optional[str] = None + implementation_date: Optional[str] = None + review_date: Optional[str] = None + review_frequency: Optional[str] = None + priority: Optional[str] = None + complexity: Optional[str] = None + linked_evidence: Optional[List[Any]] = None + evidence_gaps: Optional[List[Any]] = None + related_controls: Optional[Dict[str, Any]] = None + verified_at: Optional[str] = None + verified_by: Optional[str] = None + effectiveness_rating: Optional[str] = None + + +class TOMMeasureBulkItem(BaseModel): + control_id: str + name: str + description: Optional[str] = None + category: str + type: str + applicability: str = "REQUIRED" + applicability_reason: Optional[str] = None + implementation_status: str = "NOT_IMPLEMENTED" + responsible_person: Optional[str] = None + responsible_department: Optional[str] = None + implementation_date: Optional[str] = None + review_date: Optional[str] = None + review_frequency: Optional[str] = None + priority: Optional[str] = None + complexity: Optional[str] = None + linked_evidence: Optional[List[Any]] = None + evidence_gaps: Optional[List[Any]] = None + related_controls: Optional[Dict[str, Any]] = None + + +class TOMMeasureBulkBody(BaseModel): + tenant_id: Optional[str] = None + measures: List[TOMMeasureBulkItem] + + +# ============================================================================= +# Helper: parse optional datetime strings +# ============================================================================= + +def _parse_dt(val: Optional[str]) -> Optional[datetime]: + if not val: + return None + try: + return datetime.fromisoformat(val.replace("Z", "+00:00")) + except (ValueError, AttributeError): + return None + + +def _measure_to_dict(m: TOMMeasureDB) -> dict: + return { + "id": str(m.id), + "tenant_id": m.tenant_id, + "control_id": m.control_id, + "name": m.name, + "description": m.description, + "category": m.category, + "type": m.type, + "applicability": m.applicability, + "applicability_reason": m.applicability_reason, + "implementation_status": m.implementation_status, + "responsible_person": m.responsible_person, + "responsible_department": m.responsible_department, + "implementation_date": m.implementation_date.isoformat() if m.implementation_date else None, + "review_date": m.review_date.isoformat() if m.review_date else None, + "review_frequency": m.review_frequency, + "priority": m.priority, + "complexity": m.complexity, + "linked_evidence": m.linked_evidence or [], + "evidence_gaps": m.evidence_gaps or [], + "related_controls": m.related_controls or {}, + "verified_at": m.verified_at.isoformat() if m.verified_at else None, + "verified_by": m.verified_by, + "effectiveness_rating": m.effectiveness_rating, + "created_by": m.created_by, + "created_at": m.created_at.isoformat() if m.created_at else None, + "updated_at": m.updated_at.isoformat() if m.updated_at else None, + } + + +# ============================================================================= +# STATE ENDPOINTS +# ============================================================================= + +@router.get("/state") +async def get_tom_state( + tenant_id: Optional[str] = Query(None, alias="tenant_id"), + tenantId: Optional[str] = Query(None), + db: Session = Depends(get_db), +): + """Load TOM generator state for a tenant.""" + tid = tenant_id or tenantId or DEFAULT_TENANT_ID + + row = db.query(TOMStateDB).filter(TOMStateDB.tenant_id == tid).first() + + if not row: + return { + "success": True, + "data": { + "tenantId": tid, + "state": {}, + "version": 0, + "isNew": True, + }, + } + + return { + "success": True, + "data": { + "tenantId": tid, + "state": row.state, + "version": row.version, + "lastModified": row.updated_at.isoformat() if row.updated_at else None, + }, + } + + +@router.post("/state") +async def save_tom_state(body: TOMStateBody, db: Session = Depends(get_db)): + """Save TOM generator state with optimistic locking (version check).""" + tid = body.get_tenant_id() + + existing = db.query(TOMStateDB).filter(TOMStateDB.tenant_id == tid).first() + + # Version conflict check + if body.version is not None and existing and existing.version != body.version: + raise HTTPException( + status_code=409, + detail={ + "success": False, + "error": "Version conflict. State was modified by another request.", + "code": "VERSION_CONFLICT", + }, + ) + + now = datetime.now(timezone.utc) + + if existing: + existing.state = body.state + existing.version = existing.version + 1 + existing.updated_at = now + else: + existing = TOMStateDB( + tenant_id=tid, + state=body.state, + version=1, + created_at=now, + updated_at=now, + ) + db.add(existing) + + db.commit() + db.refresh(existing) + + return { + "success": True, + "data": { + "tenantId": tid, + "state": existing.state, + "version": existing.version, + "lastModified": existing.updated_at.isoformat() if existing.updated_at else None, + }, + } + + +@router.delete("/state") +async def delete_tom_state( + tenant_id: Optional[str] = Query(None, alias="tenant_id"), + tenantId: Optional[str] = Query(None), + db: Session = Depends(get_db), +): + """Clear TOM generator state for a tenant.""" + tid = tenant_id or tenantId + if not tid: + raise HTTPException(status_code=400, detail="tenant_id is required") + + row = db.query(TOMStateDB).filter(TOMStateDB.tenant_id == tid).first() + deleted = False + if row: + db.delete(row) + db.commit() + deleted = True + + return { + "success": True, + "tenantId": tid, + "deleted": deleted, + "deletedAt": datetime.now(timezone.utc).isoformat(), + } + + +# ============================================================================= +# MEASURES ENDPOINTS +# ============================================================================= + +@router.get("/measures") +async def list_measures( + tenant_id: Optional[str] = Query(None), + category: Optional[str] = Query(None), + implementation_status: Optional[str] = Query(None), + priority: Optional[str] = Query(None), + search: Optional[str] = Query(None), + limit: int = Query(100, ge=1, le=500), + offset: int = Query(0, ge=0), + db: Session = Depends(get_db), +): + """List TOM measures with optional filters.""" + tid = tenant_id or DEFAULT_TENANT_ID + q = db.query(TOMMeasureDB).filter(TOMMeasureDB.tenant_id == tid) + + if category: + q = q.filter(TOMMeasureDB.category == category) + if implementation_status: + q = q.filter(TOMMeasureDB.implementation_status == implementation_status) + if priority: + q = q.filter(TOMMeasureDB.priority == priority) + if search: + pattern = f"%{search}%" + q = q.filter( + (TOMMeasureDB.name.ilike(pattern)) + | (TOMMeasureDB.description.ilike(pattern)) + | (TOMMeasureDB.control_id.ilike(pattern)) + ) + + total = q.count() + rows = q.order_by(TOMMeasureDB.control_id).offset(offset).limit(limit).all() + + return { + "measures": [_measure_to_dict(r) for r in rows], + "total": total, + "limit": limit, + "offset": offset, + } + + +@router.post("/measures", status_code=201) +async def create_measure( + body: TOMMeasureCreate, + tenant_id: Optional[str] = Query(None), + db: Session = Depends(get_db), +): + """Create a single TOM measure.""" + tid = tenant_id or DEFAULT_TENANT_ID + + # Check for duplicate control_id + existing = ( + db.query(TOMMeasureDB) + .filter(TOMMeasureDB.tenant_id == tid, TOMMeasureDB.control_id == body.control_id) + .first() + ) + if existing: + raise HTTPException(status_code=409, detail=f"Measure with control_id '{body.control_id}' already exists") + + now = datetime.now(timezone.utc) + measure = TOMMeasureDB( + tenant_id=tid, + control_id=body.control_id, + name=body.name, + description=body.description, + category=body.category, + type=body.type, + applicability=body.applicability, + applicability_reason=body.applicability_reason, + implementation_status=body.implementation_status, + responsible_person=body.responsible_person, + responsible_department=body.responsible_department, + implementation_date=_parse_dt(body.implementation_date), + review_date=_parse_dt(body.review_date), + review_frequency=body.review_frequency, + priority=body.priority, + complexity=body.complexity, + linked_evidence=body.linked_evidence or [], + evidence_gaps=body.evidence_gaps or [], + related_controls=body.related_controls or {}, + verified_at=_parse_dt(body.verified_at), + verified_by=body.verified_by, + effectiveness_rating=body.effectiveness_rating, + created_at=now, + updated_at=now, + ) + db.add(measure) + db.commit() + db.refresh(measure) + + return _measure_to_dict(measure) + + +@router.put("/measures/{measure_id}") +async def update_measure( + measure_id: UUID, + body: TOMMeasureUpdate, + db: Session = Depends(get_db), +): + """Update a TOM measure.""" + row = db.query(TOMMeasureDB).filter(TOMMeasureDB.id == measure_id).first() + if not row: + raise HTTPException(status_code=404, detail="Measure not found") + + update_data = body.model_dump(exclude_unset=True) + for key, val in update_data.items(): + if key in ("implementation_date", "review_date", "verified_at"): + val = _parse_dt(val) + setattr(row, key, val) + + row.updated_at = datetime.now(timezone.utc) + db.commit() + db.refresh(row) + + return _measure_to_dict(row) + + +@router.post("/measures/bulk") +async def bulk_upsert_measures( + body: TOMMeasureBulkBody, + db: Session = Depends(get_db), +): + """Bulk upsert measures — used by deriveTOMs sync from frontend.""" + tid = body.tenant_id or DEFAULT_TENANT_ID + now = datetime.now(timezone.utc) + + created = 0 + updated = 0 + + for item in body.measures: + existing = ( + db.query(TOMMeasureDB) + .filter(TOMMeasureDB.tenant_id == tid, TOMMeasureDB.control_id == item.control_id) + .first() + ) + + if existing: + existing.name = item.name + existing.description = item.description + existing.category = item.category + existing.type = item.type + existing.applicability = item.applicability + existing.applicability_reason = item.applicability_reason + existing.implementation_status = item.implementation_status + existing.responsible_person = item.responsible_person + existing.responsible_department = item.responsible_department + existing.implementation_date = _parse_dt(item.implementation_date) + existing.review_date = _parse_dt(item.review_date) + existing.review_frequency = item.review_frequency + existing.priority = item.priority + existing.complexity = item.complexity + existing.linked_evidence = item.linked_evidence or [] + existing.evidence_gaps = item.evidence_gaps or [] + existing.related_controls = item.related_controls or {} + existing.updated_at = now + updated += 1 + else: + measure = TOMMeasureDB( + tenant_id=tid, + control_id=item.control_id, + name=item.name, + description=item.description, + category=item.category, + type=item.type, + applicability=item.applicability, + applicability_reason=item.applicability_reason, + implementation_status=item.implementation_status, + responsible_person=item.responsible_person, + responsible_department=item.responsible_department, + implementation_date=_parse_dt(item.implementation_date), + review_date=_parse_dt(item.review_date), + review_frequency=item.review_frequency, + priority=item.priority, + complexity=item.complexity, + linked_evidence=item.linked_evidence or [], + evidence_gaps=item.evidence_gaps or [], + related_controls=item.related_controls or {}, + created_at=now, + updated_at=now, + ) + db.add(measure) + created += 1 + + db.commit() + + return { + "success": True, + "tenant_id": tid, + "created": created, + "updated": updated, + "total": created + updated, + } + + +# ============================================================================= +# STATS & EXPORT +# ============================================================================= + +@router.get("/stats") +async def get_tom_stats( + tenant_id: Optional[str] = Query(None), + db: Session = Depends(get_db), +): + """Return TOM statistics for a tenant.""" + tid = tenant_id or DEFAULT_TENANT_ID + + base_q = db.query(TOMMeasureDB).filter(TOMMeasureDB.tenant_id == tid) + total = base_q.count() + + # By status + status_rows = ( + db.query(TOMMeasureDB.implementation_status, func.count(TOMMeasureDB.id)) + .filter(TOMMeasureDB.tenant_id == tid) + .group_by(TOMMeasureDB.implementation_status) + .all() + ) + by_status = {row[0]: row[1] for row in status_rows} + + # By category + cat_rows = ( + db.query(TOMMeasureDB.category, func.count(TOMMeasureDB.id)) + .filter(TOMMeasureDB.tenant_id == tid) + .group_by(TOMMeasureDB.category) + .all() + ) + by_category = {row[0]: row[1] for row in cat_rows} + + # Overdue reviews + now = datetime.now(timezone.utc) + overdue = ( + base_q.filter( + TOMMeasureDB.review_date.isnot(None), + TOMMeasureDB.review_date < now, + ) + .count() + ) + + return { + "total": total, + "by_status": by_status, + "by_category": by_category, + "overdue_review_count": overdue, + "implemented": by_status.get("IMPLEMENTED", 0), + "partial": by_status.get("PARTIAL", 0), + "not_implemented": by_status.get("NOT_IMPLEMENTED", 0), + } + + +@router.get("/export") +async def export_measures( + tenant_id: Optional[str] = Query(None), + format: str = Query("csv"), + db: Session = Depends(get_db), +): + """Export TOM measures as CSV (semicolon-separated) or JSON.""" + tid = tenant_id or DEFAULT_TENANT_ID + + rows = ( + db.query(TOMMeasureDB) + .filter(TOMMeasureDB.tenant_id == tid) + .order_by(TOMMeasureDB.control_id) + .all() + ) + measures = [_measure_to_dict(r) for r in rows] + + if format == "json": + return StreamingResponse( + io.BytesIO(json.dumps(measures, ensure_ascii=False, indent=2).encode("utf-8")), + media_type="application/json", + headers={"Content-Disposition": "attachment; filename=tom_export.json"}, + ) + + # CSV (semicolon, like VVT) + output = io.StringIO() + fieldnames = [ + "control_id", "name", "description", "category", "type", + "applicability", "implementation_status", "responsible_person", + "responsible_department", "implementation_date", "review_date", + "review_frequency", "priority", "complexity", "effectiveness_rating", + ] + writer = csv.DictWriter(output, fieldnames=fieldnames, delimiter=";", extrasaction="ignore") + writer.writeheader() + for m in measures: + writer.writerow(m) + + output.seek(0) + return StreamingResponse( + io.BytesIO(output.getvalue().encode("utf-8")), + media_type="text/csv; charset=utf-8", + headers={"Content-Disposition": "attachment; filename=tom_export.csv"}, + ) diff --git a/backend-compliance/compliance/db/tom_models.py b/backend-compliance/compliance/db/tom_models.py new file mode 100644 index 0000000..4409eee --- /dev/null +++ b/backend-compliance/compliance/db/tom_models.py @@ -0,0 +1,79 @@ +""" +SQLAlchemy models for TOM — Technisch-Organisatorische Massnahmen (Art. 32 DSGVO). + +Tables: +- compliance_tom_state: Full TOM-Generator state per tenant (JSONB blob) +- compliance_tom_measures: Individual TOM measures (flat, queryable) +""" + +import uuid +from datetime import datetime + +from sqlalchemy import ( + Column, String, Text, Integer, DateTime, JSON, Index +) +from sqlalchemy.dialects.postgresql import UUID + +from classroom_engine.database import Base + + +class TOMStateDB(Base): + """Persists the entire TOM-Generator state per tenant.""" + + __tablename__ = 'compliance_tom_state' + + id = Column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4) + tenant_id = Column(String(100), nullable=False, unique=True) + state = Column(JSON, nullable=False, default=dict) + version = Column(Integer, nullable=False, default=1) + created_at = Column(DateTime(timezone=True), default=datetime.utcnow, nullable=False) + updated_at = Column(DateTime(timezone=True), default=datetime.utcnow, onupdate=datetime.utcnow) + + __table_args__ = ( + Index('idx_tom_state_tenant', 'tenant_id'), + ) + + def __repr__(self): + return f"" + + +class TOMMeasureDB(Base): + """Individual TOM measure — flat, queryable, for reports and export.""" + + __tablename__ = 'compliance_tom_measures' + + id = Column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4) + tenant_id = Column(String(100), nullable=False) + control_id = Column(String(50), nullable=False) + name = Column(String(300), nullable=False) + description = Column(Text) + category = Column(String(50), nullable=False) + type = Column(String(20), nullable=False) + applicability = Column(String(20), default='REQUIRED') + applicability_reason = Column(Text) + implementation_status = Column(String(20), default='NOT_IMPLEMENTED') + responsible_person = Column(String(255)) + responsible_department = Column(String(255)) + implementation_date = Column(DateTime(timezone=True)) + review_date = Column(DateTime(timezone=True)) + review_frequency = Column(String(20)) + priority = Column(String(20)) + complexity = Column(String(20)) + linked_evidence = Column(JSON, default=list) + evidence_gaps = Column(JSON, default=list) + related_controls = Column(JSON, default=dict) + verified_at = Column(DateTime(timezone=True)) + verified_by = Column(String(200)) + effectiveness_rating = Column(String(20)) + created_by = Column(String(200), default='system') + created_at = Column(DateTime(timezone=True), default=datetime.utcnow, nullable=False) + updated_at = Column(DateTime(timezone=True), default=datetime.utcnow, onupdate=datetime.utcnow) + + __table_args__ = ( + Index('idx_tom_measures_tenant', 'tenant_id'), + Index('idx_tom_measures_category', 'tenant_id', 'category'), + Index('idx_tom_measures_status', 'tenant_id', 'implementation_status'), + ) + + def __repr__(self): + return f"" diff --git a/backend-compliance/migrations/034_tom.sql b/backend-compliance/migrations/034_tom.sql new file mode 100644 index 0000000..385bccc --- /dev/null +++ b/backend-compliance/migrations/034_tom.sql @@ -0,0 +1,61 @@ +-- Migration 034: TOM (Technisch-Organisatorische Massnahmen, Art. 32 DSGVO) +-- +-- Two tables: +-- 1. compliance_tom_state: Persists the full TOM-Generator state per tenant (replaces In-Memory) +-- 2. compliance_tom_measures: Individual TOM measures (flat, queryable, for reports/export) + +BEGIN; + +-- --------------------------------------------------------------------------- +-- 1. TOM Generator State (one JSONB blob per tenant) +-- --------------------------------------------------------------------------- +CREATE TABLE IF NOT EXISTS compliance_tom_state ( + id UUID PRIMARY KEY DEFAULT gen_random_uuid(), + tenant_id VARCHAR(100) NOT NULL, + state JSONB NOT NULL DEFAULT '{}', + version INT NOT NULL DEFAULT 1, + created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(), + updated_at TIMESTAMPTZ DEFAULT NOW(), + UNIQUE(tenant_id) +); + +CREATE INDEX IF NOT EXISTS idx_tom_state_tenant ON compliance_tom_state(tenant_id); + +-- --------------------------------------------------------------------------- +-- 2. Individual TOM Measures (flat, queryable) +-- --------------------------------------------------------------------------- +CREATE TABLE IF NOT EXISTS compliance_tom_measures ( + id UUID PRIMARY KEY DEFAULT gen_random_uuid(), + tenant_id VARCHAR(100) NOT NULL, + control_id VARCHAR(50) NOT NULL, + name VARCHAR(300) NOT NULL, + description TEXT, + category VARCHAR(50) NOT NULL, + type VARCHAR(20) NOT NULL, + applicability VARCHAR(20) DEFAULT 'REQUIRED', + applicability_reason TEXT, + implementation_status VARCHAR(20) DEFAULT 'NOT_IMPLEMENTED', + responsible_person VARCHAR(255), + responsible_department VARCHAR(255), + implementation_date TIMESTAMPTZ, + review_date TIMESTAMPTZ, + review_frequency VARCHAR(20), + priority VARCHAR(20), + complexity VARCHAR(20), + linked_evidence JSONB DEFAULT '[]', + evidence_gaps JSONB DEFAULT '[]', + related_controls JSONB DEFAULT '{}', + verified_at TIMESTAMPTZ, + verified_by VARCHAR(200), + effectiveness_rating VARCHAR(20), + created_by VARCHAR(200) DEFAULT 'system', + created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(), + updated_at TIMESTAMPTZ DEFAULT NOW(), + UNIQUE(tenant_id, control_id) +); + +CREATE INDEX IF NOT EXISTS idx_tom_measures_tenant ON compliance_tom_measures(tenant_id); +CREATE INDEX IF NOT EXISTS idx_tom_measures_category ON compliance_tom_measures(tenant_id, category); +CREATE INDEX IF NOT EXISTS idx_tom_measures_status ON compliance_tom_measures(tenant_id, implementation_status); + +COMMIT; diff --git a/backend-compliance/tests/test_tom_routes.py b/backend-compliance/tests/test_tom_routes.py new file mode 100644 index 0000000..bbb8017 --- /dev/null +++ b/backend-compliance/tests/test_tom_routes.py @@ -0,0 +1,735 @@ +"""Tests for TOM routes (tom_routes.py, tom_models.py).""" + +import pytest +import uuid +from unittest.mock import MagicMock, patch, PropertyMock +from datetime import datetime, timezone +from fastapi.testclient import TestClient +from fastapi import FastAPI + +from compliance.api.tom_routes import ( + router, + TOMStateBody, + TOMMeasureCreate, + TOMMeasureUpdate, + TOMMeasureBulkBody, + TOMMeasureBulkItem, + _parse_dt, + _measure_to_dict, + DEFAULT_TENANT_ID, +) +from compliance.db.tom_models import TOMStateDB, TOMMeasureDB +from compliance.api.schemas import TOMStatsResponse, TOMMeasureResponse + + +# ============================================================================= +# Test App Setup +# ============================================================================= + +app = FastAPI() +app.include_router(router) + +DEFAULT_TENANT = DEFAULT_TENANT_ID +MEASURE_ID = "ffffffff-0001-0001-0001-000000000001" +UNKNOWN_ID = "aaaaaaaa-9999-9999-9999-999999999999" + + +# ============================================================================= +# Helper: create mock DB session +# ============================================================================= + +def _make_mock_db(): + db = MagicMock() + db.query.return_value = db + db.filter.return_value = db + db.first.return_value = None + db.count.return_value = 0 + db.all.return_value = [] + db.offset.return_value = db + db.limit.return_value = db + db.order_by.return_value = db + db.group_by.return_value = db + return db + + +def _make_state_row(tenant_id=DEFAULT_TENANT, version=1, state=None): + row = TOMStateDB() + row.id = uuid.uuid4() + row.tenant_id = tenant_id + row.state = state or {"steps": [], "derivedTOMs": []} + row.version = version + row.created_at = datetime(2024, 1, 1, tzinfo=timezone.utc) + row.updated_at = datetime(2024, 1, 2, tzinfo=timezone.utc) + return row + + +def _make_measure_row(control_id="TOM.GOV.01", **kwargs): + m = TOMMeasureDB() + m.id = uuid.UUID(kwargs.get("id", MEASURE_ID)) + m.tenant_id = kwargs.get("tenant_id", DEFAULT_TENANT) + m.control_id = control_id + m.name = kwargs.get("name", "Datenschutzrichtlinie") + m.description = kwargs.get("description", "Beschreibung") + m.category = kwargs.get("category", "GOVERNANCE") + m.type = kwargs.get("type", "ORGANIZATIONAL") + m.applicability = kwargs.get("applicability", "REQUIRED") + m.applicability_reason = kwargs.get("applicability_reason", None) + m.implementation_status = kwargs.get("implementation_status", "NOT_IMPLEMENTED") + m.responsible_person = kwargs.get("responsible_person", None) + m.responsible_department = kwargs.get("responsible_department", None) + m.implementation_date = kwargs.get("implementation_date", None) + m.review_date = kwargs.get("review_date", None) + m.review_frequency = kwargs.get("review_frequency", "ANNUAL") + m.priority = kwargs.get("priority", "HIGH") + m.complexity = kwargs.get("complexity", "MEDIUM") + m.linked_evidence = kwargs.get("linked_evidence", []) + m.evidence_gaps = kwargs.get("evidence_gaps", []) + m.related_controls = kwargs.get("related_controls", {}) + m.verified_at = kwargs.get("verified_at", None) + m.verified_by = kwargs.get("verified_by", None) + m.effectiveness_rating = kwargs.get("effectiveness_rating", None) + m.created_by = kwargs.get("created_by", "system") + m.created_at = kwargs.get("created_at", datetime(2024, 1, 1, tzinfo=timezone.utc)) + m.updated_at = kwargs.get("updated_at", datetime(2024, 1, 2, tzinfo=timezone.utc)) + return m + + +# ============================================================================= +# Schema Tests +# ============================================================================= + +class TestTOMStateBody: + def test_get_tenant_id_from_tenant_id(self): + body = TOMStateBody(tenant_id="abc", state={}) + assert body.get_tenant_id() == "abc" + + def test_get_tenant_id_from_camelcase(self): + body = TOMStateBody(tenantId="def", state={}) + assert body.get_tenant_id() == "def" + + def test_get_tenant_id_default(self): + body = TOMStateBody(state={}) + assert body.get_tenant_id() == DEFAULT_TENANT + + def test_version_optional(self): + body = TOMStateBody(tenant_id="x", state={"foo": "bar"}) + assert body.version is None + + +class TestTOMMeasureCreate: + def test_defaults(self): + mc = TOMMeasureCreate( + control_id="TOM.GOV.01", + name="Test", + category="GOVERNANCE", + type="ORGANIZATIONAL", + ) + assert mc.applicability == "REQUIRED" + assert mc.implementation_status == "NOT_IMPLEMENTED" + assert mc.priority is None + assert mc.linked_evidence is None + + def test_full_values(self): + mc = TOMMeasureCreate( + control_id="TOM.ACC.02", + name="Zugriffskontrolle", + description="RBAC implementieren", + category="ACCESS_CONTROL", + type="TECHNICAL", + applicability="REQUIRED", + implementation_status="IMPLEMENTED", + priority="CRITICAL", + complexity="HIGH", + ) + assert mc.control_id == "TOM.ACC.02" + assert mc.priority == "CRITICAL" + + +class TestTOMMeasureUpdate: + def test_partial(self): + mu = TOMMeasureUpdate(implementation_status="IMPLEMENTED") + data = mu.model_dump(exclude_unset=True) + assert data == {"implementation_status": "IMPLEMENTED"} + + def test_empty(self): + mu = TOMMeasureUpdate() + data = mu.model_dump(exclude_unset=True) + assert data == {} + + +class TestTOMStatsResponse: + def test_defaults(self): + stats = TOMStatsResponse() + assert stats.total == 0 + assert stats.by_status == {} + assert stats.overdue_review_count == 0 + + def test_full(self): + stats = TOMStatsResponse( + total=10, + by_status={"IMPLEMENTED": 5, "NOT_IMPLEMENTED": 3, "PARTIAL": 2}, + by_category={"GOVERNANCE": 4, "ACCESS_CONTROL": 6}, + overdue_review_count=2, + implemented=5, + partial=2, + not_implemented=3, + ) + assert stats.total == 10 + assert stats.implemented == 5 + + +class TestTOMMeasureResponse: + def test_from_dict(self): + resp = TOMMeasureResponse( + id="abc", + tenant_id=DEFAULT_TENANT, + control_id="TOM.GOV.01", + name="Test", + category="GOVERNANCE", + type="ORGANIZATIONAL", + ) + assert resp.id == "abc" + assert resp.linked_evidence == [] + + +# ============================================================================= +# DB Model Tests +# ============================================================================= + +class TestTOMModels: + def test_state_repr(self): + s = TOMStateDB() + s.tenant_id = "test" + s.version = 3 + assert "test" in repr(s) + assert "v3" in repr(s) + + def test_measure_repr(self): + m = TOMMeasureDB() + m.control_id = "TOM.ACC.01" + m.name = "Zugriffskontrolle" + assert "TOM.ACC.01" in repr(m) + + +# ============================================================================= +# Helper Function Tests +# ============================================================================= + +class TestParseDt: + def test_none(self): + assert _parse_dt(None) is None + + def test_empty_string(self): + assert _parse_dt("") is None + + def test_iso_format(self): + dt = _parse_dt("2024-01-15T10:30:00+00:00") + assert dt is not None + assert dt.year == 2024 + assert dt.month == 1 + + def test_iso_with_z(self): + dt = _parse_dt("2024-06-15T12:00:00Z") + assert dt is not None + assert dt.year == 2024 + + def test_invalid_string(self): + assert _parse_dt("not-a-date") is None + + +class TestMeasureToDict: + def test_full_conversion(self): + m = _make_measure_row() + d = _measure_to_dict(m) + assert d["id"] == MEASURE_ID + assert d["control_id"] == "TOM.GOV.01" + assert d["name"] == "Datenschutzrichtlinie" + assert d["category"] == "GOVERNANCE" + assert d["type"] == "ORGANIZATIONAL" + assert d["linked_evidence"] == [] + assert d["related_controls"] == {} + assert d["created_at"] is not None + + def test_with_dates(self): + m = _make_measure_row( + implementation_date=datetime(2024, 3, 1, tzinfo=timezone.utc), + review_date=datetime(2025, 3, 1, tzinfo=timezone.utc), + ) + d = _measure_to_dict(m) + assert "2024-03-01" in d["implementation_date"] + assert "2025-03-01" in d["review_date"] + + def test_null_dates(self): + m = _make_measure_row() + d = _measure_to_dict(m) + assert d["implementation_date"] is None + assert d["review_date"] is None + assert d["verified_at"] is None + + +# ============================================================================= +# Route Tests (with mocked DB) +# ============================================================================= + +from classroom_engine.database import get_db + +def override_get_db(mock_db): + def _override(): + return mock_db + return _override + + +class TestStateRoutes: + def test_get_state_new_tenant(self): + db = _make_mock_db() + db.filter.return_value.first.return_value = None + app.dependency_overrides[get_db] = override_get_db(db) + client = TestClient(app) + + resp = client.get("/tom/state?tenant_id=new-tenant") + assert resp.status_code == 200 + data = resp.json() + assert data["success"] is True + assert data["data"]["isNew"] is True + assert data["data"]["version"] == 0 + + app.dependency_overrides.clear() + + def test_get_state_existing(self): + db = _make_mock_db() + row = _make_state_row(state={"steps": [1, 2, 3]}) + db.filter.return_value.first.return_value = row + app.dependency_overrides[get_db] = override_get_db(db) + client = TestClient(app) + + resp = client.get(f"/tom/state?tenant_id={DEFAULT_TENANT}") + assert resp.status_code == 200 + data = resp.json() + assert data["success"] is True + assert data["data"]["version"] == 1 + assert data["data"]["state"]["steps"] == [1, 2, 3] + + app.dependency_overrides.clear() + + def test_post_state_new(self): + db = _make_mock_db() + db.filter.return_value.first.return_value = None + + def mock_refresh(obj): + obj.version = 1 + obj.updated_at = datetime(2024, 1, 1, tzinfo=timezone.utc) + obj.state = {"test": True} + + db.refresh = mock_refresh + app.dependency_overrides[get_db] = override_get_db(db) + client = TestClient(app) + + resp = client.post("/tom/state", json={ + "tenant_id": DEFAULT_TENANT, + "state": {"test": True}, + }) + assert resp.status_code == 200 + data = resp.json() + assert data["success"] is True + db.add.assert_called_once() + + app.dependency_overrides.clear() + + def test_post_state_version_conflict(self): + db = _make_mock_db() + row = _make_state_row(version=5) + db.filter.return_value.first.return_value = row + app.dependency_overrides[get_db] = override_get_db(db) + client = TestClient(app) + + resp = client.post("/tom/state", json={ + "tenant_id": DEFAULT_TENANT, + "state": {"test": True}, + "version": 3, # Expected 3, actual 5 + }) + assert resp.status_code == 409 + + app.dependency_overrides.clear() + + def test_post_state_update_existing(self): + db = _make_mock_db() + row = _make_state_row(version=2) + db.filter.return_value.first.return_value = row + + def mock_refresh(obj): + pass # row already has attributes + + db.refresh = mock_refresh + app.dependency_overrides[get_db] = override_get_db(db) + client = TestClient(app) + + resp = client.post("/tom/state", json={ + "tenant_id": DEFAULT_TENANT, + "state": {"new": "data"}, + "version": 2, + }) + assert resp.status_code == 200 + assert row.version == 3 + assert row.state == {"new": "data"} + + app.dependency_overrides.clear() + + def test_delete_state(self): + db = _make_mock_db() + row = _make_state_row() + db.filter.return_value.first.return_value = row + app.dependency_overrides[get_db] = override_get_db(db) + client = TestClient(app) + + resp = client.delete(f"/tom/state?tenant_id={DEFAULT_TENANT}") + assert resp.status_code == 200 + data = resp.json() + assert data["deleted"] is True + db.delete.assert_called_once() + + app.dependency_overrides.clear() + + def test_delete_state_not_found(self): + db = _make_mock_db() + db.filter.return_value.first.return_value = None + app.dependency_overrides[get_db] = override_get_db(db) + client = TestClient(app) + + resp = client.delete(f"/tom/state?tenant_id={DEFAULT_TENANT}") + assert resp.status_code == 200 + data = resp.json() + assert data["deleted"] is False + + app.dependency_overrides.clear() + + def test_delete_state_missing_tenant(self): + db = _make_mock_db() + app.dependency_overrides[get_db] = override_get_db(db) + client = TestClient(app) + + resp = client.delete("/tom/state") + assert resp.status_code == 400 + + app.dependency_overrides.clear() + + +class TestMeasureRoutes: + def test_list_measures_empty(self): + db = _make_mock_db() + db.filter.return_value.order_by.return_value.offset.return_value.limit.return_value.all.return_value = [] + db.filter.return_value.count.return_value = 0 + app.dependency_overrides[get_db] = override_get_db(db) + client = TestClient(app) + + resp = client.get("/tom/measures") + assert resp.status_code == 200 + data = resp.json() + assert data["measures"] == [] + assert data["total"] == 0 + + app.dependency_overrides.clear() + + def test_list_measures_with_data(self): + db = _make_mock_db() + measures = [_make_measure_row("TOM.GOV.01"), _make_measure_row("TOM.ACC.01", name="Zugriff")] + db.filter.return_value.order_by.return_value.offset.return_value.limit.return_value.all.return_value = measures + db.filter.return_value.count.return_value = 2 + app.dependency_overrides[get_db] = override_get_db(db) + client = TestClient(app) + + resp = client.get("/tom/measures") + assert resp.status_code == 200 + data = resp.json() + assert len(data["measures"]) == 2 + assert data["total"] == 2 + + app.dependency_overrides.clear() + + def test_create_measure(self): + db = _make_mock_db() + # No existing measure with same control_id + db.filter.return_value.first.return_value = None + + def mock_refresh(obj): + obj.id = uuid.UUID(MEASURE_ID) + obj.created_at = datetime(2024, 1, 1, tzinfo=timezone.utc) + obj.updated_at = datetime(2024, 1, 1, tzinfo=timezone.utc) + + db.refresh = mock_refresh + app.dependency_overrides[get_db] = override_get_db(db) + client = TestClient(app) + + resp = client.post("/tom/measures", json={ + "control_id": "TOM.GOV.01", + "name": "Datenschutzrichtlinie", + "category": "GOVERNANCE", + "type": "ORGANIZATIONAL", + }) + assert resp.status_code == 201 + data = resp.json() + assert data["control_id"] == "TOM.GOV.01" + db.add.assert_called_once() + + app.dependency_overrides.clear() + + def test_create_measure_duplicate(self): + db = _make_mock_db() + db.filter.return_value.first.return_value = _make_measure_row() + app.dependency_overrides[get_db] = override_get_db(db) + client = TestClient(app) + + resp = client.post("/tom/measures", json={ + "control_id": "TOM.GOV.01", + "name": "Duplicate", + "category": "GOVERNANCE", + "type": "ORGANIZATIONAL", + }) + assert resp.status_code == 409 + + app.dependency_overrides.clear() + + def test_update_measure(self): + db = _make_mock_db() + row = _make_measure_row() + db.filter.return_value.first.return_value = row + + def mock_refresh(obj): + pass + + db.refresh = mock_refresh + app.dependency_overrides[get_db] = override_get_db(db) + client = TestClient(app) + + resp = client.put(f"/tom/measures/{MEASURE_ID}", json={ + "implementation_status": "IMPLEMENTED", + "responsible_person": "Max Mustermann", + }) + assert resp.status_code == 200 + assert row.implementation_status == "IMPLEMENTED" + assert row.responsible_person == "Max Mustermann" + + app.dependency_overrides.clear() + + def test_update_measure_not_found(self): + db = _make_mock_db() + db.filter.return_value.first.return_value = None + app.dependency_overrides[get_db] = override_get_db(db) + client = TestClient(app) + + resp = client.put(f"/tom/measures/{UNKNOWN_ID}", json={ + "implementation_status": "IMPLEMENTED", + }) + assert resp.status_code == 404 + + app.dependency_overrides.clear() + + def test_bulk_upsert_create(self): + db = _make_mock_db() + # No existing measures + db.filter.return_value.first.return_value = None + app.dependency_overrides[get_db] = override_get_db(db) + client = TestClient(app) + + resp = client.post("/tom/measures/bulk", json={ + "tenant_id": DEFAULT_TENANT, + "measures": [ + { + "control_id": "TOM.GOV.01", + "name": "Datenschutzrichtlinie", + "category": "GOVERNANCE", + "type": "ORGANIZATIONAL", + }, + { + "control_id": "TOM.ACC.01", + "name": "Zugriffskontrolle", + "category": "ACCESS_CONTROL", + "type": "TECHNICAL", + }, + ], + }) + assert resp.status_code == 200 + data = resp.json() + assert data["success"] is True + assert data["created"] == 2 + assert data["updated"] == 0 + assert data["total"] == 2 + + app.dependency_overrides.clear() + + def test_bulk_upsert_update(self): + db = _make_mock_db() + existing = _make_measure_row("TOM.GOV.01") + db.filter.return_value.first.return_value = existing + app.dependency_overrides[get_db] = override_get_db(db) + client = TestClient(app) + + resp = client.post("/tom/measures/bulk", json={ + "measures": [ + { + "control_id": "TOM.GOV.01", + "name": "Updated Name", + "category": "GOVERNANCE", + "type": "ORGANIZATIONAL", + }, + ], + }) + assert resp.status_code == 200 + data = resp.json() + assert data["updated"] == 1 + assert data["created"] == 0 + assert existing.name == "Updated Name" + + app.dependency_overrides.clear() + + +class TestStatsRoute: + def test_stats_empty(self): + db = _make_mock_db() + db.filter.return_value.count.return_value = 0 + db.filter.return_value.group_by.return_value.all.return_value = [] + app.dependency_overrides[get_db] = override_get_db(db) + client = TestClient(app) + + resp = client.get("/tom/stats") + assert resp.status_code == 200 + data = resp.json() + assert data["total"] == 0 + assert data["by_status"] == {} + assert data["by_category"] == {} + + app.dependency_overrides.clear() + + def test_stats_with_data(self): + db = _make_mock_db() + + # Total count + base_q = MagicMock() + base_q.count.return_value = 10 + base_q.filter.return_value.count.return_value = 2 # overdue + + # Status group_by + status_q = MagicMock() + status_q.all.return_value = [("IMPLEMENTED", 5), ("NOT_IMPLEMENTED", 3), ("PARTIAL", 2)] + + # Category group_by + cat_q = MagicMock() + cat_q.all.return_value = [("GOVERNANCE", 4), ("ACCESS_CONTROL", 6)] + + call_count = [0] + original_filter = db.query.return_value.filter + + def mock_filter(*args): + call_count[0] += 1 + if call_count[0] == 1: + return base_q + elif call_count[0] == 2: + mock_gby = MagicMock() + mock_gby.all.return_value = [("IMPLEMENTED", 5), ("NOT_IMPLEMENTED", 3), ("PARTIAL", 2)] + result = MagicMock() + result.group_by.return_value = mock_gby + return result + elif call_count[0] == 3: + mock_gby = MagicMock() + mock_gby.all.return_value = [("GOVERNANCE", 4), ("ACCESS_CONTROL", 6)] + result = MagicMock() + result.group_by.return_value = mock_gby + return result + return MagicMock() + + db.query.return_value.filter = mock_filter + app.dependency_overrides[get_db] = override_get_db(db) + client = TestClient(app) + + resp = client.get("/tom/stats") + assert resp.status_code == 200 + data = resp.json() + assert data["total"] == 10 + + app.dependency_overrides.clear() + + +class TestExportRoute: + def test_export_json(self): + db = _make_mock_db() + measures = [_make_measure_row("TOM.GOV.01")] + db.filter.return_value.order_by.return_value.all.return_value = measures + app.dependency_overrides[get_db] = override_get_db(db) + client = TestClient(app) + + resp = client.get("/tom/export?format=json") + assert resp.status_code == 200 + assert "application/json" in resp.headers.get("content-type", "") + data = resp.json() + assert len(data) == 1 + assert data[0]["control_id"] == "TOM.GOV.01" + + app.dependency_overrides.clear() + + def test_export_csv(self): + db = _make_mock_db() + measures = [_make_measure_row("TOM.GOV.01")] + db.filter.return_value.order_by.return_value.all.return_value = measures + app.dependency_overrides[get_db] = override_get_db(db) + client = TestClient(app) + + resp = client.get("/tom/export?format=csv") + assert resp.status_code == 200 + assert "text/csv" in resp.headers.get("content-type", "") + content = resp.text + assert "control_id" in content # Header + assert "TOM.GOV.01" in content + + app.dependency_overrides.clear() + + def test_export_csv_empty(self): + db = _make_mock_db() + db.filter.return_value.order_by.return_value.all.return_value = [] + app.dependency_overrides[get_db] = override_get_db(db) + client = TestClient(app) + + resp = client.get("/tom/export?format=csv") + assert resp.status_code == 200 + content = resp.text + assert "control_id" in content # Header still present + + app.dependency_overrides.clear() + + +# ============================================================================= +# camelCase tenantId alias tests +# ============================================================================= + +class TestTenantIdAlias: + def test_get_state_camelcase(self): + db = _make_mock_db() + db.filter.return_value.first.return_value = None + app.dependency_overrides[get_db] = override_get_db(db) + client = TestClient(app) + + resp = client.get(f"/tom/state?tenantId={DEFAULT_TENANT}") + assert resp.status_code == 200 + data = resp.json() + assert data["data"]["tenantId"] == DEFAULT_TENANT + + app.dependency_overrides.clear() + + def test_post_state_camelcase(self): + db = _make_mock_db() + db.filter.return_value.first.return_value = None + + def mock_refresh(obj): + obj.version = 1 + obj.updated_at = datetime(2024, 1, 1, tzinfo=timezone.utc) + obj.state = {} + + db.refresh = mock_refresh + app.dependency_overrides[get_db] = override_get_db(db) + client = TestClient(app) + + resp = client.post("/tom/state", json={ + "tenantId": DEFAULT_TENANT, + "state": {}, + }) + assert resp.status_code == 200 + + app.dependency_overrides.clear()