2 Commits

Author SHA1 Message Date
Benjamin Admin
3593a4ff78 feat(tom): TOM-Backend in Python erstellen, Frontend von In-Memory auf DB migrieren
All checks were successful
CI / go-lint (push) Has been skipped
CI / python-lint (push) Has been skipped
CI / nodejs-lint (push) Has been skipped
CI / test-go-ai-compliance (push) Successful in 33s
CI / test-python-backend-compliance (push) Successful in 31s
CI / test-python-document-crawler (push) Successful in 22s
CI / test-python-dsms-gateway (push) Successful in 15s
- Migration 034: compliance_tom_state + compliance_tom_measures Tabellen
- Python Routes: State CRUD, Measures CRUD, Bulk-Upsert, Stats, CSV/JSON-Export
- Frontend-Proxy: In-Memory Storage durch Proxy zu backend-compliance ersetzt
- Go TOM-Handler als DEPRECATED markiert (Source of Truth ist jetzt Python)
- 44 Tests (alle bestanden)

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-06 17:35:44 +01:00
Benjamin Admin
4cbfea5c1d feat(vvt): Go-Features nach Python portieren (Source of Truth)
Review-Daten (last_reviewed_at, next_review_at), created_by, DSFA-Link,
CSV-Export mit Semikolon-Trennung, overdue_review_count in Stats.
Go-VVT-Handler als DEPRECATED markiert. 32 Tests bestanden.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-06 17:14:38 +01:00
13 changed files with 1875 additions and 172 deletions

View File

@@ -1,119 +1,30 @@
import { NextRequest, NextResponse } from 'next/server'
import {
TOMGeneratorState,
createEmptyTOMGeneratorState,
} from '@/lib/sdk/tom-generator/types'
/**
* TOM Generator State API
* TOM Generator State API — Proxy to backend-compliance (Python/FastAPI)
*
* GET /api/sdk/v1/tom-generator/state?tenantId=xxx - Load TOM generator state
* POST /api/sdk/v1/tom-generator/state - Save TOM generator state
* DELETE /api/sdk/v1/tom-generator/state?tenantId=xxx - Clear state
*/
// =============================================================================
// STORAGE (In-Memory for development)
// =============================================================================
interface StoredTOMState {
state: TOMGeneratorState
version: number
createdAt: string
updatedAt: string
}
class InMemoryTOMStateStore {
private store: Map<string, StoredTOMState> = new Map()
async get(tenantId: string): Promise<StoredTOMState | null> {
return this.store.get(tenantId) || null
}
async save(tenantId: string, state: TOMGeneratorState, expectedVersion?: number): Promise<StoredTOMState> {
const existing = this.store.get(tenantId)
if (expectedVersion !== undefined && existing && existing.version !== expectedVersion) {
const error = new Error('Version conflict') as Error & { status: number }
error.status = 409
throw error
}
const now = new Date().toISOString()
const newVersion = existing ? existing.version + 1 : 1
const stored: StoredTOMState = {
state: {
...state,
updatedAt: new Date(now),
},
version: newVersion,
createdAt: existing?.createdAt || now,
updatedAt: now,
}
this.store.set(tenantId, stored)
return stored
}
async delete(tenantId: string): Promise<boolean> {
return this.store.delete(tenantId)
}
async list(): Promise<{ tenantId: string; updatedAt: string }[]> {
const result: { tenantId: string; updatedAt: string }[] = []
this.store.forEach((value, key) => {
result.push({ tenantId: key, updatedAt: value.updatedAt })
})
return result
}
}
const stateStore = new InMemoryTOMStateStore()
// =============================================================================
// HANDLERS
// =============================================================================
const BACKEND_URL = process.env.COMPLIANCE_BACKEND_URL || 'http://backend-compliance:8002'
export async function GET(request: NextRequest) {
try {
const { searchParams } = new URL(request.url)
const tenantId = searchParams.get('tenantId')
// List all states if no tenantId provided
if (!tenantId) {
const states = await stateStore.list()
return NextResponse.json({
success: true,
data: states,
})
}
const url = tenantId
? `${BACKEND_URL}/api/compliance/tom/state?tenant_id=${encodeURIComponent(tenantId)}`
: `${BACKEND_URL}/api/compliance/tom/state`
const stored = await stateStore.get(tenantId)
if (!stored) {
// Return empty state for new tenants
const emptyState = createEmptyTOMGeneratorState(tenantId)
return NextResponse.json({
success: true,
data: {
tenantId,
state: emptyState,
version: 0,
isNew: true,
},
})
}
return NextResponse.json({
success: true,
data: {
tenantId,
state: stored.state,
version: stored.version,
lastModified: stored.updatedAt,
},
const res = await fetch(url, {
headers: { 'Content-Type': 'application/json' },
})
const data = await res.json()
return NextResponse.json(data, { status: res.status })
} catch (error) {
console.error('Failed to load TOM generator state:', error)
return NextResponse.json(
@@ -142,65 +53,19 @@ export async function POST(request: NextRequest) {
)
}
// Deserialize dates
const parsedState: TOMGeneratorState = {
...state,
createdAt: new Date(state.createdAt),
updatedAt: new Date(state.updatedAt),
steps: state.steps.map((step: { id: string; completed: boolean; data: unknown; validatedAt: string | null }) => ({
...step,
validatedAt: step.validatedAt ? new Date(step.validatedAt) : null,
})),
documents: state.documents?.map((doc: { uploadedAt: string; validFrom?: string; validUntil?: string; aiAnalysis?: { analyzedAt: string } }) => ({
...doc,
uploadedAt: new Date(doc.uploadedAt),
validFrom: doc.validFrom ? new Date(doc.validFrom) : null,
validUntil: doc.validUntil ? new Date(doc.validUntil) : null,
aiAnalysis: doc.aiAnalysis ? {
...doc.aiAnalysis,
analyzedAt: new Date(doc.aiAnalysis.analyzedAt),
} : null,
})) || [],
derivedTOMs: state.derivedTOMs?.map((tom: { implementationDate?: string; reviewDate?: string }) => ({
...tom,
implementationDate: tom.implementationDate ? new Date(tom.implementationDate) : null,
reviewDate: tom.reviewDate ? new Date(tom.reviewDate) : null,
})) || [],
gapAnalysis: state.gapAnalysis ? {
...state.gapAnalysis,
generatedAt: new Date(state.gapAnalysis.generatedAt),
} : null,
exports: state.exports?.map((exp: { generatedAt: string }) => ({
...exp,
generatedAt: new Date(exp.generatedAt),
})) || [],
}
const stored = await stateStore.save(tenantId, parsedState, version)
return NextResponse.json({
success: true,
data: {
tenantId,
state: stored.state,
version: stored.version,
lastModified: stored.updatedAt,
},
const res = await fetch(`${BACKEND_URL}/api/compliance/tom/state`, {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({
tenant_id: tenantId,
state,
version,
}),
})
const data = await res.json()
return NextResponse.json(data, { status: res.status })
} catch (error) {
const err = error as Error & { status?: number }
if (err.status === 409 || err.message === 'Version conflict') {
return NextResponse.json(
{
success: false,
error: 'Version conflict. State was modified by another request.',
code: 'VERSION_CONFLICT',
},
{ status: 409 }
)
}
console.error('Failed to save TOM generator state:', error)
return NextResponse.json(
{ success: false, error: 'Failed to save state' },
@@ -221,14 +86,16 @@ export async function DELETE(request: NextRequest) {
)
}
const deleted = await stateStore.delete(tenantId)
const res = await fetch(
`${BACKEND_URL}/api/compliance/tom/state?tenant_id=${encodeURIComponent(tenantId)}`,
{
method: 'DELETE',
headers: { 'Content-Type': 'application/json' },
}
)
return NextResponse.json({
success: true,
tenantId,
deleted,
deletedAt: new Date().toISOString(),
})
const data = await res.json()
return NextResponse.json(data, { status: res.status })
} catch (error) {
console.error('Failed to delete TOM generator state:', error)
return NextResponse.json(

View File

@@ -251,6 +251,7 @@ func main() {
// Statistics
dsgvoRoutes.GET("/stats", dsgvoHandlers.GetStats)
// DEPRECATED: VVT routes - frontend uses backend-compliance proxy instead
// VVT - Verarbeitungsverzeichnis (Art. 30)
vvt := dsgvoRoutes.Group("/processing-activities")
{
@@ -262,6 +263,8 @@ func main() {
}
// TOM - Technische und Organisatorische Maßnahmen (Art. 32)
// DEPRECATED: TOM is now managed by backend-compliance (Python).
// Use: GET/POST /api/compliance/tom/state, /tom/measures, /tom/stats, /tom/export
tom := dsgvoRoutes.Group("/tom")
{
tom.GET("", dsgvoHandlers.ListTOMs)
@@ -299,8 +302,8 @@ func main() {
// Export routes
exports := dsgvoRoutes.Group("/export")
{
exports.GET("/vvt", dsgvoHandlers.ExportVVT)
exports.GET("/tom", dsgvoHandlers.ExportTOM)
exports.GET("/vvt", dsgvoHandlers.ExportVVT) // DEPRECATED: use backend-compliance /vvt/export?format=csv
exports.GET("/tom", dsgvoHandlers.ExportTOM) // DEPRECATED: use backend-compliance /tom/export?format=csv
exports.GET("/dsr", dsgvoHandlers.ExportDSR)
exports.GET("/retention", dsgvoHandlers.ExportRetentionPolicies)
}

View File

@@ -24,6 +24,8 @@ func NewDSGVOHandlers(store *dsgvo.Store) *DSGVOHandlers {
// ============================================================================
// VVT - Verarbeitungsverzeichnis (Processing Activities)
// DEPRECATED: VVT is now managed by backend-compliance (Python).
// These handlers will be removed once all DSGVO sub-modules are consolidated.
// ============================================================================
// ListProcessingActivities returns all processing activities for a tenant
@@ -142,8 +144,12 @@ func (h *DSGVOHandlers) DeleteProcessingActivity(c *gin.Context) {
// ============================================================================
// TOM - Technische und Organisatorische Maßnahmen
// ============================================================================
// DEPRECATED: TOM is now managed by backend-compliance (Python).
// These handlers remain for backwards compatibility but should not be used.
// Use backend-compliance endpoints: GET/POST /api/compliance/tom/...
// ListTOMs returns all TOMs for a tenant
// DEPRECATED: Use backend-compliance GET /api/compliance/tom/measures
func (h *DSGVOHandlers) ListTOMs(c *gin.Context) {
tenantID := rbac.GetTenantID(c)
if tenantID == uuid.Nil {
@@ -551,6 +557,7 @@ func (h *DSGVOHandlers) ExportVVT(c *gin.Context) {
}
// ExportTOM exports the TOM catalog as CSV/JSON
// DEPRECATED: Use backend-compliance GET /api/compliance/tom/export
func (h *DSGVOHandlers) ExportTOM(c *gin.Context) {
tenantID := rbac.GetTenantID(c)
if tenantID == uuid.Nil {

View File

@@ -26,6 +26,7 @@ from .dsr_routes import router as dsr_router
from .email_template_routes import router as email_template_router
from .banner_routes import router as banner_router
from .extraction_routes import router as extraction_router
from .tom_routes import router as tom_router
# Include sub-routers
router.include_router(audit_router)
@@ -53,6 +54,7 @@ router.include_router(dsr_router)
router.include_router(email_template_router)
router.include_router(banner_router)
router.include_router(extraction_router)
router.include_router(tom_router)
__all__ = [
"router",
@@ -80,4 +82,5 @@ __all__ = [
"dsr_router",
"email_template_router",
"banner_router",
"tom_router",
]

View File

@@ -1910,6 +1910,10 @@ class VVTActivityCreate(BaseModel):
status: str = 'DRAFT'
responsible: Optional[str] = None
owner: Optional[str] = None
last_reviewed_at: Optional[datetime] = None
next_review_at: Optional[datetime] = None
created_by: Optional[str] = None
dsfa_id: Optional[str] = None
class VVTActivityUpdate(BaseModel):
@@ -1934,6 +1938,10 @@ class VVTActivityUpdate(BaseModel):
status: Optional[str] = None
responsible: Optional[str] = None
owner: Optional[str] = None
last_reviewed_at: Optional[datetime] = None
next_review_at: Optional[datetime] = None
created_by: Optional[str] = None
dsfa_id: Optional[str] = None
class VVTActivityResponse(BaseModel):
@@ -1960,6 +1968,10 @@ class VVTActivityResponse(BaseModel):
status: str = 'DRAFT'
responsible: Optional[str] = None
owner: Optional[str] = None
last_reviewed_at: Optional[datetime] = None
next_review_at: Optional[datetime] = None
created_by: Optional[str] = None
dsfa_id: Optional[str] = None
created_at: datetime
updated_at: Optional[datetime] = None
@@ -1975,6 +1987,7 @@ class VVTStatsResponse(BaseModel):
third_country_count: int
draft_count: int
approved_count: int
overdue_review_count: int = 0
class VVTAuditLogEntry(BaseModel):
@@ -1989,3 +2002,57 @@ class VVTAuditLogEntry(BaseModel):
class Config:
from_attributes = True
# ============================================================================
# TOM — Technisch-Organisatorische Massnahmen (Art. 32 DSGVO)
# ============================================================================
class TOMStateResponse(BaseModel):
    """Response schema for the persisted TOM generator state of one tenant."""

    tenant_id: str
    # Opaque generator state payload — structure is defined by the frontend
    # wizard, not validated here.
    state: Dict[str, Any] = {}
    # Optimistic-locking counter; 0 means "never saved".
    version: int = 0
    last_modified: Optional[datetime] = None
    # True when no persisted state existed and an empty one was returned.
    is_new: bool = False
class TOMMeasureResponse(BaseModel):
    """Serialized TOM measure (Art. 32 DSGVO) returned by backend-compliance."""

    id: str
    tenant_id: str
    # Stable identifier of the control this measure implements.
    control_id: str
    name: str
    description: Optional[str] = None
    category: str
    type: str
    applicability: str = "REQUIRED"
    applicability_reason: Optional[str] = None
    implementation_status: str = "NOT_IMPLEMENTED"
    responsible_person: Optional[str] = None
    responsible_department: Optional[str] = None
    implementation_date: Optional[datetime] = None
    review_date: Optional[datetime] = None
    review_frequency: Optional[str] = None
    priority: Optional[str] = None
    complexity: Optional[str] = None
    # JSON-column payloads; default to empty containers instead of None.
    linked_evidence: List[Any] = []
    evidence_gaps: List[Any] = []
    related_controls: Dict[str, Any] = {}
    verified_at: Optional[datetime] = None
    verified_by: Optional[str] = None
    effectiveness_rating: Optional[str] = None
    created_by: Optional[str] = None
    created_at: Optional[datetime] = None
    updated_at: Optional[datetime] = None

    class Config:
        # Allow construction directly from ORM rows.
        from_attributes = True
class TOMStatsResponse(BaseModel):
    """Aggregated TOM statistics for one tenant."""

    total: int = 0
    # implementation_status value -> measure count
    by_status: Dict[str, int] = {}
    # category value -> measure count
    by_category: Dict[str, int] = {}
    # Measures whose review_date lies in the past.
    overdue_review_count: int = 0
    # Convenience counters derived from by_status.
    implemented: int = 0
    partial: int = 0
    not_implemented: int = 0

View File

@@ -0,0 +1,575 @@
"""
FastAPI routes for TOM — Technisch-Organisatorische Massnahmen (Art. 32 DSGVO).
Endpoints:
GET /tom/state — Load TOM generator state for tenant
POST /tom/state — Save state (with version check)
DELETE /tom/state — Reset/clear state for tenant
GET /tom/measures — List measures (filter: category, status, tenant_id)
POST /tom/measures — Create single measure
PUT /tom/measures/{id} — Update measure
POST /tom/measures/bulk — Bulk upsert (for deriveTOMs sync)
GET /tom/stats — Statistics
GET /tom/export — Export as CSV or JSON
"""
import csv
import io
import json
import logging
from datetime import datetime, timezone
from typing import Optional, List, Any, Dict
from uuid import UUID, uuid4
from fastapi import APIRouter, Depends, HTTPException, Query
from fastapi.responses import StreamingResponse
from pydantic import BaseModel, Field
from sqlalchemy import func
from sqlalchemy.orm import Session
from classroom_engine.database import get_db
from ..db.tom_models import TOMStateDB, TOMMeasureDB
logger = logging.getLogger(__name__)
router = APIRouter(prefix="/tom", tags=["tom"])
DEFAULT_TENANT_ID = "9282a473-5c95-4b3a-bf78-0ecc0ec71d3e"
# =============================================================================
# Pydantic Schemas (kept close to routes like loeschfristen pattern)
# =============================================================================
class TOMStateBody(BaseModel):
    """Request body for POST /tom/state; tenant id accepted in snake_case or camelCase."""

    tenant_id: Optional[str] = None
    tenantId: Optional[str] = None  # Accept camelCase from frontend
    state: Dict[str, Any]
    # Expected current version for optimistic locking; None skips the check.
    version: Optional[int] = None

    def get_tenant_id(self) -> str:
        """Resolve the tenant id, preferring snake_case, then camelCase, then the default."""
        return self.tenant_id or self.tenantId or DEFAULT_TENANT_ID
class TOMMeasureCreate(BaseModel):
    """Request body for POST /tom/measures (create a single measure).

    Date fields arrive as ISO-8601 strings and are converted to datetime
    by _parse_dt in the route before insertion.
    """

    control_id: str
    name: str
    description: Optional[str] = None
    category: str
    type: str
    applicability: str = "REQUIRED"
    applicability_reason: Optional[str] = None
    implementation_status: str = "NOT_IMPLEMENTED"
    responsible_person: Optional[str] = None
    responsible_department: Optional[str] = None
    # ISO-8601 strings, parsed server-side.
    implementation_date: Optional[str] = None
    review_date: Optional[str] = None
    review_frequency: Optional[str] = None
    priority: Optional[str] = None
    complexity: Optional[str] = None
    # JSON-column payloads; None becomes []/{} on insert.
    linked_evidence: Optional[List[Any]] = None
    evidence_gaps: Optional[List[Any]] = None
    related_controls: Optional[Dict[str, Any]] = None
    verified_at: Optional[str] = None
    verified_by: Optional[str] = None
    effectiveness_rating: Optional[str] = None
class TOMMeasureUpdate(BaseModel):
    """Partial-update body for PUT /tom/measures/{id}.

    All fields are optional; only keys explicitly present in the request are
    applied (the route uses model_dump(exclude_unset=True)).
    """

    name: Optional[str] = None
    description: Optional[str] = None
    category: Optional[str] = None
    type: Optional[str] = None
    applicability: Optional[str] = None
    applicability_reason: Optional[str] = None
    implementation_status: Optional[str] = None
    responsible_person: Optional[str] = None
    responsible_department: Optional[str] = None
    # ISO-8601 strings, parsed via _parse_dt in the route.
    implementation_date: Optional[str] = None
    review_date: Optional[str] = None
    review_frequency: Optional[str] = None
    priority: Optional[str] = None
    complexity: Optional[str] = None
    linked_evidence: Optional[List[Any]] = None
    evidence_gaps: Optional[List[Any]] = None
    related_controls: Optional[Dict[str, Any]] = None
    verified_at: Optional[str] = None
    verified_by: Optional[str] = None
    effectiveness_rating: Optional[str] = None
class TOMMeasureBulkItem(BaseModel):
    """One measure in a bulk-upsert request (matched on control_id).

    Same shape as TOMMeasureCreate but without the verification fields
    (verified_at / verified_by / effectiveness_rating).
    """

    control_id: str
    name: str
    description: Optional[str] = None
    category: str
    type: str
    applicability: str = "REQUIRED"
    applicability_reason: Optional[str] = None
    implementation_status: str = "NOT_IMPLEMENTED"
    responsible_person: Optional[str] = None
    responsible_department: Optional[str] = None
    # ISO-8601 strings, parsed server-side.
    implementation_date: Optional[str] = None
    review_date: Optional[str] = None
    review_frequency: Optional[str] = None
    priority: Optional[str] = None
    complexity: Optional[str] = None
    linked_evidence: Optional[List[Any]] = None
    evidence_gaps: Optional[List[Any]] = None
    related_controls: Optional[Dict[str, Any]] = None
class TOMMeasureBulkBody(BaseModel):
    """Request body for POST /tom/measures/bulk (upsert keyed by control_id)."""

    tenant_id: Optional[str] = None  # falls back to DEFAULT_TENANT_ID
    measures: List[TOMMeasureBulkItem]
# =============================================================================
# Helper: parse optional datetime strings
# =============================================================================
def _parse_dt(val: Optional[str]) -> Optional[datetime]:
if not val:
return None
try:
return datetime.fromisoformat(val.replace("Z", "+00:00"))
except (ValueError, AttributeError):
return None
def _measure_to_dict(m: TOMMeasureDB) -> dict:
    """Serialize a TOMMeasureDB row into a JSON-safe dict.

    Datetimes go out as ISO-8601 strings (None stays None), the UUID id as a
    string, and the JSON columns default to empty containers.
    """

    def iso(dt):
        # ISO string for present datetimes, None otherwise.
        return dt.isoformat() if dt else None

    return {
        "id": str(m.id),
        "tenant_id": m.tenant_id,
        "control_id": m.control_id,
        "name": m.name,
        "description": m.description,
        "category": m.category,
        "type": m.type,
        "applicability": m.applicability,
        "applicability_reason": m.applicability_reason,
        "implementation_status": m.implementation_status,
        "responsible_person": m.responsible_person,
        "responsible_department": m.responsible_department,
        "implementation_date": iso(m.implementation_date),
        "review_date": iso(m.review_date),
        "review_frequency": m.review_frequency,
        "priority": m.priority,
        "complexity": m.complexity,
        "linked_evidence": m.linked_evidence or [],
        "evidence_gaps": m.evidence_gaps or [],
        "related_controls": m.related_controls or {},
        "verified_at": iso(m.verified_at),
        "verified_by": m.verified_by,
        "effectiveness_rating": m.effectiveness_rating,
        "created_by": m.created_by,
        "created_at": iso(m.created_at),
        "updated_at": iso(m.updated_at),
    }
# =============================================================================
# STATE ENDPOINTS
# =============================================================================
@router.get("/state")
async def get_tom_state(
tenant_id: Optional[str] = Query(None, alias="tenant_id"),
tenantId: Optional[str] = Query(None),
db: Session = Depends(get_db),
):
"""Load TOM generator state for a tenant."""
tid = tenant_id or tenantId or DEFAULT_TENANT_ID
row = db.query(TOMStateDB).filter(TOMStateDB.tenant_id == tid).first()
if not row:
return {
"success": True,
"data": {
"tenantId": tid,
"state": {},
"version": 0,
"isNew": True,
},
}
return {
"success": True,
"data": {
"tenantId": tid,
"state": row.state,
"version": row.version,
"lastModified": row.updated_at.isoformat() if row.updated_at else None,
},
}
@router.post("/state")
async def save_tom_state(body: TOMStateBody, db: Session = Depends(get_db)):
"""Save TOM generator state with optimistic locking (version check)."""
tid = body.get_tenant_id()
existing = db.query(TOMStateDB).filter(TOMStateDB.tenant_id == tid).first()
# Version conflict check
if body.version is not None and existing and existing.version != body.version:
raise HTTPException(
status_code=409,
detail={
"success": False,
"error": "Version conflict. State was modified by another request.",
"code": "VERSION_CONFLICT",
},
)
now = datetime.now(timezone.utc)
if existing:
existing.state = body.state
existing.version = existing.version + 1
existing.updated_at = now
else:
existing = TOMStateDB(
tenant_id=tid,
state=body.state,
version=1,
created_at=now,
updated_at=now,
)
db.add(existing)
db.commit()
db.refresh(existing)
return {
"success": True,
"data": {
"tenantId": tid,
"state": existing.state,
"version": existing.version,
"lastModified": existing.updated_at.isoformat() if existing.updated_at else None,
},
}
@router.delete("/state")
async def delete_tom_state(
tenant_id: Optional[str] = Query(None, alias="tenant_id"),
tenantId: Optional[str] = Query(None),
db: Session = Depends(get_db),
):
"""Clear TOM generator state for a tenant."""
tid = tenant_id or tenantId
if not tid:
raise HTTPException(status_code=400, detail="tenant_id is required")
row = db.query(TOMStateDB).filter(TOMStateDB.tenant_id == tid).first()
deleted = False
if row:
db.delete(row)
db.commit()
deleted = True
return {
"success": True,
"tenantId": tid,
"deleted": deleted,
"deletedAt": datetime.now(timezone.utc).isoformat(),
}
# =============================================================================
# MEASURES ENDPOINTS
# =============================================================================
@router.get("/measures")
async def list_measures(
tenant_id: Optional[str] = Query(None),
category: Optional[str] = Query(None),
implementation_status: Optional[str] = Query(None),
priority: Optional[str] = Query(None),
search: Optional[str] = Query(None),
limit: int = Query(100, ge=1, le=500),
offset: int = Query(0, ge=0),
db: Session = Depends(get_db),
):
"""List TOM measures with optional filters."""
tid = tenant_id or DEFAULT_TENANT_ID
q = db.query(TOMMeasureDB).filter(TOMMeasureDB.tenant_id == tid)
if category:
q = q.filter(TOMMeasureDB.category == category)
if implementation_status:
q = q.filter(TOMMeasureDB.implementation_status == implementation_status)
if priority:
q = q.filter(TOMMeasureDB.priority == priority)
if search:
pattern = f"%{search}%"
q = q.filter(
(TOMMeasureDB.name.ilike(pattern))
| (TOMMeasureDB.description.ilike(pattern))
| (TOMMeasureDB.control_id.ilike(pattern))
)
total = q.count()
rows = q.order_by(TOMMeasureDB.control_id).offset(offset).limit(limit).all()
return {
"measures": [_measure_to_dict(r) for r in rows],
"total": total,
"limit": limit,
"offset": offset,
}
@router.post("/measures", status_code=201)
async def create_measure(
body: TOMMeasureCreate,
tenant_id: Optional[str] = Query(None),
db: Session = Depends(get_db),
):
"""Create a single TOM measure."""
tid = tenant_id or DEFAULT_TENANT_ID
# Check for duplicate control_id
existing = (
db.query(TOMMeasureDB)
.filter(TOMMeasureDB.tenant_id == tid, TOMMeasureDB.control_id == body.control_id)
.first()
)
if existing:
raise HTTPException(status_code=409, detail=f"Measure with control_id '{body.control_id}' already exists")
now = datetime.now(timezone.utc)
measure = TOMMeasureDB(
tenant_id=tid,
control_id=body.control_id,
name=body.name,
description=body.description,
category=body.category,
type=body.type,
applicability=body.applicability,
applicability_reason=body.applicability_reason,
implementation_status=body.implementation_status,
responsible_person=body.responsible_person,
responsible_department=body.responsible_department,
implementation_date=_parse_dt(body.implementation_date),
review_date=_parse_dt(body.review_date),
review_frequency=body.review_frequency,
priority=body.priority,
complexity=body.complexity,
linked_evidence=body.linked_evidence or [],
evidence_gaps=body.evidence_gaps or [],
related_controls=body.related_controls or {},
verified_at=_parse_dt(body.verified_at),
verified_by=body.verified_by,
effectiveness_rating=body.effectiveness_rating,
created_at=now,
updated_at=now,
)
db.add(measure)
db.commit()
db.refresh(measure)
return _measure_to_dict(measure)
@router.put("/measures/{measure_id}")
async def update_measure(
measure_id: UUID,
body: TOMMeasureUpdate,
db: Session = Depends(get_db),
):
"""Update a TOM measure."""
row = db.query(TOMMeasureDB).filter(TOMMeasureDB.id == measure_id).first()
if not row:
raise HTTPException(status_code=404, detail="Measure not found")
update_data = body.model_dump(exclude_unset=True)
for key, val in update_data.items():
if key in ("implementation_date", "review_date", "verified_at"):
val = _parse_dt(val)
setattr(row, key, val)
row.updated_at = datetime.now(timezone.utc)
db.commit()
db.refresh(row)
return _measure_to_dict(row)
@router.post("/measures/bulk")
async def bulk_upsert_measures(
body: TOMMeasureBulkBody,
db: Session = Depends(get_db),
):
"""Bulk upsert measures — used by deriveTOMs sync from frontend."""
tid = body.tenant_id or DEFAULT_TENANT_ID
now = datetime.now(timezone.utc)
created = 0
updated = 0
for item in body.measures:
existing = (
db.query(TOMMeasureDB)
.filter(TOMMeasureDB.tenant_id == tid, TOMMeasureDB.control_id == item.control_id)
.first()
)
if existing:
existing.name = item.name
existing.description = item.description
existing.category = item.category
existing.type = item.type
existing.applicability = item.applicability
existing.applicability_reason = item.applicability_reason
existing.implementation_status = item.implementation_status
existing.responsible_person = item.responsible_person
existing.responsible_department = item.responsible_department
existing.implementation_date = _parse_dt(item.implementation_date)
existing.review_date = _parse_dt(item.review_date)
existing.review_frequency = item.review_frequency
existing.priority = item.priority
existing.complexity = item.complexity
existing.linked_evidence = item.linked_evidence or []
existing.evidence_gaps = item.evidence_gaps or []
existing.related_controls = item.related_controls or {}
existing.updated_at = now
updated += 1
else:
measure = TOMMeasureDB(
tenant_id=tid,
control_id=item.control_id,
name=item.name,
description=item.description,
category=item.category,
type=item.type,
applicability=item.applicability,
applicability_reason=item.applicability_reason,
implementation_status=item.implementation_status,
responsible_person=item.responsible_person,
responsible_department=item.responsible_department,
implementation_date=_parse_dt(item.implementation_date),
review_date=_parse_dt(item.review_date),
review_frequency=item.review_frequency,
priority=item.priority,
complexity=item.complexity,
linked_evidence=item.linked_evidence or [],
evidence_gaps=item.evidence_gaps or [],
related_controls=item.related_controls or {},
created_at=now,
updated_at=now,
)
db.add(measure)
created += 1
db.commit()
return {
"success": True,
"tenant_id": tid,
"created": created,
"updated": updated,
"total": created + updated,
}
# =============================================================================
# STATS & EXPORT
# =============================================================================
@router.get("/stats")
async def get_tom_stats(
tenant_id: Optional[str] = Query(None),
db: Session = Depends(get_db),
):
"""Return TOM statistics for a tenant."""
tid = tenant_id or DEFAULT_TENANT_ID
base_q = db.query(TOMMeasureDB).filter(TOMMeasureDB.tenant_id == tid)
total = base_q.count()
# By status
status_rows = (
db.query(TOMMeasureDB.implementation_status, func.count(TOMMeasureDB.id))
.filter(TOMMeasureDB.tenant_id == tid)
.group_by(TOMMeasureDB.implementation_status)
.all()
)
by_status = {row[0]: row[1] for row in status_rows}
# By category
cat_rows = (
db.query(TOMMeasureDB.category, func.count(TOMMeasureDB.id))
.filter(TOMMeasureDB.tenant_id == tid)
.group_by(TOMMeasureDB.category)
.all()
)
by_category = {row[0]: row[1] for row in cat_rows}
# Overdue reviews
now = datetime.now(timezone.utc)
overdue = (
base_q.filter(
TOMMeasureDB.review_date.isnot(None),
TOMMeasureDB.review_date < now,
)
.count()
)
return {
"total": total,
"by_status": by_status,
"by_category": by_category,
"overdue_review_count": overdue,
"implemented": by_status.get("IMPLEMENTED", 0),
"partial": by_status.get("PARTIAL", 0),
"not_implemented": by_status.get("NOT_IMPLEMENTED", 0),
}
@router.get("/export")
async def export_measures(
tenant_id: Optional[str] = Query(None),
format: str = Query("csv"),
db: Session = Depends(get_db),
):
"""Export TOM measures as CSV (semicolon-separated) or JSON."""
tid = tenant_id or DEFAULT_TENANT_ID
rows = (
db.query(TOMMeasureDB)
.filter(TOMMeasureDB.tenant_id == tid)
.order_by(TOMMeasureDB.control_id)
.all()
)
measures = [_measure_to_dict(r) for r in rows]
if format == "json":
return StreamingResponse(
io.BytesIO(json.dumps(measures, ensure_ascii=False, indent=2).encode("utf-8")),
media_type="application/json",
headers={"Content-Disposition": "attachment; filename=tom_export.json"},
)
# CSV (semicolon, like VVT)
output = io.StringIO()
fieldnames = [
"control_id", "name", "description", "category", "type",
"applicability", "implementation_status", "responsible_person",
"responsible_department", "implementation_date", "review_date",
"review_frequency", "priority", "complexity", "effectiveness_rating",
]
writer = csv.DictWriter(output, fieldnames=fieldnames, delimiter=";", extrasaction="ignore")
writer.writeheader()
for m in measures:
writer.writerow(m)
output.seek(0)
return StreamingResponse(
io.BytesIO(output.getvalue().encode("utf-8")),
media_type="text/csv; charset=utf-8",
headers={"Content-Disposition": "attachment; filename=tom_export.csv"},
)

View File

@@ -14,12 +14,15 @@ Endpoints:
GET /vvt/stats — Statistics
"""
import csv
import io
import logging
from datetime import datetime
from datetime import datetime, timezone
from typing import Optional, List
from uuid import uuid4
from fastapi import APIRouter, Depends, HTTPException, Query
from fastapi import APIRouter, Depends, HTTPException, Query, Request
from fastapi.responses import StreamingResponse
from sqlalchemy.orm import Session
from classroom_engine.database import get_db
@@ -150,6 +153,10 @@ def _activity_to_response(act: VVTActivityDB) -> VVTActivityResponse:
status=act.status or 'DRAFT',
responsible=act.responsible,
owner=act.owner,
last_reviewed_at=act.last_reviewed_at,
next_review_at=act.next_review_at,
created_by=act.created_by,
dsfa_id=str(act.dsfa_id) if act.dsfa_id else None,
created_at=act.created_at,
updated_at=act.updated_at,
)
@@ -160,6 +167,7 @@ async def list_activities(
status: Optional[str] = Query(None),
business_function: Optional[str] = Query(None),
search: Optional[str] = Query(None),
review_overdue: Optional[bool] = Query(None),
db: Session = Depends(get_db),
):
"""List all processing activities with optional filters."""
@@ -169,6 +177,12 @@ async def list_activities(
query = query.filter(VVTActivityDB.status == status)
if business_function:
query = query.filter(VVTActivityDB.business_function == business_function)
if review_overdue:
now = datetime.now(timezone.utc)
query = query.filter(
VVTActivityDB.next_review_at.isnot(None),
VVTActivityDB.next_review_at < now,
)
if search:
term = f"%{search}%"
query = query.filter(
@@ -184,6 +198,7 @@ async def list_activities(
@router.post("/activities", response_model=VVTActivityResponse, status_code=201)
async def create_activity(
request: VVTActivityCreate,
http_request: Request,
db: Session = Depends(get_db),
):
"""Create a new processing activity."""
@@ -197,7 +212,12 @@ async def create_activity(
detail=f"Activity with VVT-ID '{request.vvt_id}' already exists"
)
act = VVTActivityDB(**request.dict())
data = request.dict()
# Set created_by from X-User-ID header if not provided in body
if not data.get('created_by'):
data['created_by'] = http_request.headers.get('X-User-ID', 'system')
act = VVTActivityDB(**data)
db.add(act)
db.flush() # get ID before audit log
@@ -312,8 +332,11 @@ async def get_audit_log(
# ============================================================================
@router.get("/export")
async def export_activities(db: Session = Depends(get_db)):
"""JSON export of all activities for external review / PDF generation."""
async def export_activities(
format: str = Query("json", pattern="^(json|csv)$"),
db: Session = Depends(get_db),
):
"""Export all activities as JSON or CSV (semicolon-separated, DE locale)."""
org = db.query(VVTOrganizationDB).order_by(VVTOrganizationDB.created_at).first()
activities = db.query(VVTActivityDB).order_by(VVTActivityDB.created_at).all()
@@ -321,10 +344,13 @@ async def export_activities(db: Session = Depends(get_db)):
db,
action="EXPORT",
entity_type="all_activities",
new_values={"count": len(activities)},
new_values={"count": len(activities), "format": format},
)
db.commit()
if format == "csv":
return _export_csv(activities)
return {
"exported_at": datetime.utcnow().isoformat(),
"organization": {
@@ -351,6 +377,10 @@ async def export_activities(db: Session = Depends(get_db)):
"protection_level": a.protection_level,
"business_function": a.business_function,
"responsible": a.responsible,
"created_by": a.created_by,
"dsfa_id": str(a.dsfa_id) if a.dsfa_id else None,
"last_reviewed_at": a.last_reviewed_at.isoformat() if a.last_reviewed_at else None,
"next_review_at": a.next_review_at.isoformat() if a.next_review_at else None,
"created_at": a.created_at.isoformat(),
"updated_at": a.updated_at.isoformat() if a.updated_at else None,
}
@@ -359,6 +389,48 @@ async def export_activities(db: Session = Depends(get_db)):
}
def _export_csv(activities: list) -> StreamingResponse:
    """Generate semicolon-separated CSV with UTF-8 BOM for German Excel compatibility.

    Args:
        activities: VVTActivityDB rows to serialize, one CSV row each.

    Returns:
        StreamingResponse (text/csv) with a Content-Disposition attachment
        header whose filename carries the current UTC date (vvt_export_YYYYMMDD.csv).
    """
    output = io.StringIO()
    # UTF-8 BOM so German Excel auto-detects the encoding.
    output.write('\ufeff')
    writer = csv.writer(output, delimiter=';', quoting=csv.QUOTE_MINIMAL)
    writer.writerow([
        'ID', 'VVT-ID', 'Name', 'Zweck', 'Rechtsgrundlage',
        'Datenkategorien', 'Betroffene', 'Empfaenger', 'Drittland',
        'Aufbewahrung', 'Status', 'Verantwortlich', 'Erstellt von',
        'Erstellt am',
    ])
    for a in activities:
        writer.writerow([
            str(a.id),
            a.vvt_id,
            a.name,
            # List columns are flattened with '; '; QUOTE_MINIMAL quotes them
            # since they contain the field delimiter.
            '; '.join(a.purposes or []),
            '; '.join(a.legal_bases or []),
            '; '.join(a.personal_data_categories or []),
            '; '.join(a.data_subject_categories or []),
            '; '.join(a.recipient_categories or []),
            'Ja' if a.third_country_transfers else 'Nein',
            str(a.retention_period) if a.retention_period else '',
            a.status or 'DRAFT',
            a.responsible or '',
            a.created_by or 'system',
            a.created_at.strftime('%d.%m.%Y %H:%M') if a.created_at else '',
        ])
    # datetime.utcnow() is deprecated since Python 3.12; use an aware UTC
    # timestamp (same formatted result). getvalue() does not need seek(0).
    stamp = datetime.now(timezone.utc).strftime('%Y%m%d')
    return StreamingResponse(
        iter([output.getvalue()]),
        media_type='text/csv; charset=utf-8',
        headers={
            'Content-Disposition': f'attachment; filename="vvt_export_{stamp}.csv"'
        },
    )
@router.get("/stats", response_model=VVTStatsResponse)
async def get_stats(db: Session = Depends(get_db)):
"""Get VVT statistics summary."""
@@ -366,12 +438,16 @@ async def get_stats(db: Session = Depends(get_db)):
by_status: dict = {}
by_bf: dict = {}
now = datetime.now(timezone.utc)
overdue_count = 0
for a in activities:
status = a.status or 'DRAFT'
bf = a.business_function or 'unknown'
by_status[status] = by_status.get(status, 0) + 1
by_bf[bf] = by_bf.get(bf, 0) + 1
if a.next_review_at and a.next_review_at < now:
overdue_count += 1
return VVTStatsResponse(
total=len(activities),
@@ -381,4 +457,5 @@ async def get_stats(db: Session = Depends(get_db)):
third_country_count=sum(1 for a in activities if a.third_country_transfers),
draft_count=by_status.get('DRAFT', 0),
approved_count=by_status.get('APPROVED', 0),
overdue_review_count=overdue_count,
)

View File

@@ -0,0 +1,79 @@
"""
SQLAlchemy models for TOM — Technisch-Organisatorische Massnahmen (Art. 32 DSGVO).
Tables:
- compliance_tom_state: Full TOM-Generator state per tenant (JSONB blob)
- compliance_tom_measures: Individual TOM measures (flat, queryable)
"""
import uuid
from datetime import datetime
from sqlalchemy import (
Column, String, Text, Integer, DateTime, JSON, Index
)
from sqlalchemy.dialects.postgresql import UUID
from classroom_engine.database import Base
class TOMStateDB(Base):
    """Persists the entire TOM-Generator state per tenant.

    One row per tenant (tenant_id is UNIQUE). The frontend state is stored as
    an opaque JSON blob; `version` supports optimistic concurrency control.
    """
    __tablename__ = 'compliance_tom_state'

    id = Column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4)
    # Tenant scoping: exactly one state row per tenant (unique constraint).
    tenant_id = Column(String(100), nullable=False, unique=True)
    # Full generator state as sent by the frontend; shape is not enforced here.
    state = Column(JSON, nullable=False, default=dict)
    # Monotonically increasing version used for optimistic locking.
    version = Column(Integer, nullable=False, default=1)
    # NOTE(review): datetime.utcnow produces a *naive* datetime although the
    # column is timezone-aware — consider lambda: datetime.now(timezone.utc).
    created_at = Column(DateTime(timezone=True), default=datetime.utcnow, nullable=False)
    updated_at = Column(DateTime(timezone=True), default=datetime.utcnow, onupdate=datetime.utcnow)

    __table_args__ = (
        Index('idx_tom_state_tenant', 'tenant_id'),
    )

    def __repr__(self):
        return f"<TOMState tenant={self.tenant_id} v{self.version}>"
class TOMMeasureDB(Base):
    """Individual TOM measure — flat, queryable, for reports and export.

    Unlike TOMStateDB's JSON blob, each measure is a first-class row so it can
    be filtered by category/status and exported (see migration 034: one row
    per (tenant_id, control_id)).
    """
    __tablename__ = 'compliance_tom_measures'

    id = Column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4)
    tenant_id = Column(String(100), nullable=False)
    # Catalog identifier, e.g. "TOM.GOV.01" (seen in the test fixtures).
    control_id = Column(String(50), nullable=False)
    name = Column(String(300), nullable=False)
    description = Column(Text)
    category = Column(String(50), nullable=False)
    type = Column(String(20), nullable=False)
    # Whether the control applies to this tenant, plus free-text justification.
    applicability = Column(String(20), default='REQUIRED')
    applicability_reason = Column(Text)
    implementation_status = Column(String(20), default='NOT_IMPLEMENTED')
    responsible_person = Column(String(255))
    responsible_department = Column(String(255))
    implementation_date = Column(DateTime(timezone=True))
    review_date = Column(DateTime(timezone=True))
    review_frequency = Column(String(20))
    priority = Column(String(20))
    complexity = Column(String(20))
    # JSON collections: evidence links, open gaps, related-control mapping.
    linked_evidence = Column(JSON, default=list)
    evidence_gaps = Column(JSON, default=list)
    related_controls = Column(JSON, default=dict)
    # Verification/audit trail fields.
    verified_at = Column(DateTime(timezone=True))
    verified_by = Column(String(200))
    effectiveness_rating = Column(String(20))
    created_by = Column(String(200), default='system')
    # NOTE(review): datetime.utcnow yields naive datetimes for tz-aware columns
    # — consider lambda: datetime.now(timezone.utc).
    created_at = Column(DateTime(timezone=True), default=datetime.utcnow, nullable=False)
    updated_at = Column(DateTime(timezone=True), default=datetime.utcnow, onupdate=datetime.utcnow)

    __table_args__ = (
        Index('idx_tom_measures_tenant', 'tenant_id'),
        Index('idx_tom_measures_category', 'tenant_id', 'category'),
        Index('idx_tom_measures_status', 'tenant_id', 'implementation_status'),
    )

    def __repr__(self):
        return f"<TOMMeasure {self.control_id}: {self.name}>"

View File

@@ -73,6 +73,10 @@ class VVTActivityDB(Base):
status = Column(String(20), default='DRAFT')
responsible = Column(String(200))
owner = Column(String(200))
last_reviewed_at = Column(DateTime(timezone=True), nullable=True)
next_review_at = Column(DateTime(timezone=True), nullable=True)
created_by = Column(String(200), default='system')
dsfa_id = Column(UUID(as_uuid=True), nullable=True)
created_at = Column(DateTime, default=datetime.utcnow, nullable=False)
updated_at = Column(DateTime, default=datetime.utcnow, onupdate=datetime.utcnow)

View File

@@ -0,0 +1,14 @@
-- 033_vvt_consolidation.sql
-- Ports Go-exclusive VVT features to Python (source of truth).
-- P0: review dates, created_by | P1: DSFA link

-- Idempotent: IF NOT EXISTS guards allow safe re-runs on a partially
-- migrated database.
ALTER TABLE compliance_vvt_activities
    ADD COLUMN IF NOT EXISTS last_reviewed_at TIMESTAMPTZ,
    ADD COLUMN IF NOT EXISTS next_review_at TIMESTAMPTZ,
    ADD COLUMN IF NOT EXISTS created_by VARCHAR(200) DEFAULT 'system',
    ADD COLUMN IF NOT EXISTS dsfa_id UUID;

-- Partial indexes: only rows that actually carry a value are indexed,
-- keeping the indexes small for the common NULL case.
CREATE INDEX IF NOT EXISTS idx_vvt_activities_dsfa
    ON compliance_vvt_activities(dsfa_id) WHERE dsfa_id IS NOT NULL;
CREATE INDEX IF NOT EXISTS idx_vvt_activities_next_review
    ON compliance_vvt_activities(next_review_at) WHERE next_review_at IS NOT NULL;

View File

@@ -0,0 +1,61 @@
-- Migration 034: TOM (Technisch-Organisatorische Massnahmen, Art. 32 DSGVO)
--
-- Two tables:
--   1. compliance_tom_state:    Persists the full TOM-Generator state per tenant (replaces In-Memory)
--   2. compliance_tom_measures: Individual TOM measures (flat, queryable, for reports/export)
--
-- Wrapped in a transaction so a partial failure leaves no half-created schema.
BEGIN;

-- ---------------------------------------------------------------------------
-- 1. TOM Generator State (one JSONB blob per tenant)
-- ---------------------------------------------------------------------------
CREATE TABLE IF NOT EXISTS compliance_tom_state (
    id          UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    tenant_id   VARCHAR(100) NOT NULL,
    -- Opaque frontend state; versioned for optimistic locking.
    state       JSONB NOT NULL DEFAULT '{}',
    version     INT NOT NULL DEFAULT 1,
    created_at  TIMESTAMPTZ NOT NULL DEFAULT NOW(),
    updated_at  TIMESTAMPTZ DEFAULT NOW(),
    -- One state row per tenant.
    UNIQUE(tenant_id)
);
CREATE INDEX IF NOT EXISTS idx_tom_state_tenant ON compliance_tom_state(tenant_id);

-- ---------------------------------------------------------------------------
-- 2. Individual TOM Measures (flat, queryable)
-- ---------------------------------------------------------------------------
CREATE TABLE IF NOT EXISTS compliance_tom_measures (
    id                      UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    tenant_id               VARCHAR(100) NOT NULL,
    control_id              VARCHAR(50) NOT NULL,
    name                    VARCHAR(300) NOT NULL,
    description             TEXT,
    category                VARCHAR(50) NOT NULL,
    type                    VARCHAR(20) NOT NULL,
    applicability           VARCHAR(20) DEFAULT 'REQUIRED',
    applicability_reason    TEXT,
    implementation_status   VARCHAR(20) DEFAULT 'NOT_IMPLEMENTED',
    responsible_person      VARCHAR(255),
    responsible_department  VARCHAR(255),
    implementation_date     TIMESTAMPTZ,
    review_date             TIMESTAMPTZ,
    review_frequency        VARCHAR(20),
    priority                VARCHAR(20),
    complexity              VARCHAR(20),
    -- JSONB collections: evidence links, open gaps, related-control mapping.
    linked_evidence         JSONB DEFAULT '[]',
    evidence_gaps           JSONB DEFAULT '[]',
    related_controls        JSONB DEFAULT '{}',
    verified_at             TIMESTAMPTZ,
    verified_by             VARCHAR(200),
    effectiveness_rating    VARCHAR(20),
    created_by              VARCHAR(200) DEFAULT 'system',
    created_at              TIMESTAMPTZ NOT NULL DEFAULT NOW(),
    updated_at              TIMESTAMPTZ DEFAULT NOW(),
    -- Each control appears at most once per tenant (bulk upsert key).
    UNIQUE(tenant_id, control_id)
);
CREATE INDEX IF NOT EXISTS idx_tom_measures_tenant   ON compliance_tom_measures(tenant_id);
CREATE INDEX IF NOT EXISTS idx_tom_measures_category ON compliance_tom_measures(tenant_id, category);
CREATE INDEX IF NOT EXISTS idx_tom_measures_status   ON compliance_tom_measures(tenant_id, implementation_status);

COMMIT;

View File

@@ -0,0 +1,735 @@
"""Tests for TOM routes (tom_routes.py, tom_models.py)."""
import pytest
import uuid
from unittest.mock import MagicMock, patch, PropertyMock
from datetime import datetime, timezone
from fastapi.testclient import TestClient
from fastapi import FastAPI
from compliance.api.tom_routes import (
router,
TOMStateBody,
TOMMeasureCreate,
TOMMeasureUpdate,
TOMMeasureBulkBody,
TOMMeasureBulkItem,
_parse_dt,
_measure_to_dict,
DEFAULT_TENANT_ID,
)
from compliance.db.tom_models import TOMStateDB, TOMMeasureDB
from compliance.api.schemas import TOMStatsResponse, TOMMeasureResponse
# =============================================================================
# Test App Setup
# =============================================================================
# Minimal FastAPI app that mounts only the TOM router under test.
app = FastAPI()
app.include_router(router)

DEFAULT_TENANT = DEFAULT_TENANT_ID  # shorthand used throughout the tests
MEASURE_ID = "ffffffff-0001-0001-0001-000000000001"  # fixed UUID of a known measure
UNKNOWN_ID = "aaaaaaaa-9999-9999-9999-999999999999"  # valid UUID matching no row
# =============================================================================
# Helper: create mock DB session
# =============================================================================
def _make_mock_db():
db = MagicMock()
db.query.return_value = db
db.filter.return_value = db
db.first.return_value = None
db.count.return_value = 0
db.all.return_value = []
db.offset.return_value = db
db.limit.return_value = db
db.order_by.return_value = db
db.group_by.return_value = db
return db
def _make_state_row(tenant_id=DEFAULT_TENANT, version=1, state=None):
    """Construct a TOMStateDB row populated with deterministic test values."""
    record = TOMStateDB()
    record.id = uuid.uuid4()
    record.tenant_id = tenant_id
    record.version = version
    # Fall back to a minimal generator state when none (or a falsy one) is given.
    record.state = state or {"steps": [], "derivedTOMs": []}
    record.created_at = datetime(2024, 1, 1, tzinfo=timezone.utc)
    record.updated_at = datetime(2024, 1, 2, tzinfo=timezone.utc)
    return record
def _make_measure_row(control_id="TOM.GOV.01", **kwargs):
    """Construct a TOMMeasureDB row; kwargs override the documented defaults.

    A fresh defaults dict is built per call so mutable defaults ([] / {})
    are never shared between rows.
    """
    defaults = {
        "tenant_id": DEFAULT_TENANT,
        "name": "Datenschutzrichtlinie",
        "description": "Beschreibung",
        "category": "GOVERNANCE",
        "type": "ORGANIZATIONAL",
        "applicability": "REQUIRED",
        "applicability_reason": None,
        "implementation_status": "NOT_IMPLEMENTED",
        "responsible_person": None,
        "responsible_department": None,
        "implementation_date": None,
        "review_date": None,
        "review_frequency": "ANNUAL",
        "priority": "HIGH",
        "complexity": "MEDIUM",
        "linked_evidence": [],
        "evidence_gaps": [],
        "related_controls": {},
        "verified_at": None,
        "verified_by": None,
        "effectiveness_rating": None,
        "created_by": "system",
        "created_at": datetime(2024, 1, 1, tzinfo=timezone.utc),
        "updated_at": datetime(2024, 1, 2, tzinfo=timezone.utc),
    }
    measure = TOMMeasureDB()
    measure.id = uuid.UUID(kwargs.get("id", MEASURE_ID))
    measure.control_id = control_id
    for field, default in defaults.items():
        setattr(measure, field, kwargs.get(field, default))
    return measure
# =============================================================================
# Schema Tests
# =============================================================================
class TestTOMStateBody:
    """Schema tests: tenant-id resolution (snake_case, camelCase, default)."""

    def test_get_tenant_id_from_tenant_id(self):
        body = TOMStateBody(tenant_id="abc", state={})
        assert body.get_tenant_id() == "abc"

    def test_get_tenant_id_from_camelcase(self):
        # The Next.js frontend sends camelCase `tenantId`; the schema accepts it.
        body = TOMStateBody(tenantId="def", state={})
        assert body.get_tenant_id() == "def"

    def test_get_tenant_id_default(self):
        # Neither spelling supplied -> falls back to DEFAULT_TENANT_ID.
        body = TOMStateBody(state={})
        assert body.get_tenant_id() == DEFAULT_TENANT

    def test_version_optional(self):
        # version drives optimistic locking and may be omitted on first save.
        body = TOMStateBody(tenant_id="x", state={"foo": "bar"})
        assert body.version is None
class TestTOMMeasureCreate:
    """Schema tests for the measure-creation payload defaults and full form."""

    def test_defaults(self):
        # Only the four required fields given -> documented defaults apply.
        mc = TOMMeasureCreate(
            control_id="TOM.GOV.01",
            name="Test",
            category="GOVERNANCE",
            type="ORGANIZATIONAL",
        )
        assert mc.applicability == "REQUIRED"
        assert mc.implementation_status == "NOT_IMPLEMENTED"
        assert mc.priority is None
        assert mc.linked_evidence is None

    def test_full_values(self):
        mc = TOMMeasureCreate(
            control_id="TOM.ACC.02",
            name="Zugriffskontrolle",
            description="RBAC implementieren",
            category="ACCESS_CONTROL",
            type="TECHNICAL",
            applicability="REQUIRED",
            implementation_status="IMPLEMENTED",
            priority="CRITICAL",
            complexity="HIGH",
        )
        assert mc.control_id == "TOM.ACC.02"
        assert mc.priority == "CRITICAL"
class TestTOMMeasureUpdate:
    """Schema tests: partial updates only serialize explicitly-set fields."""

    def test_partial(self):
        mu = TOMMeasureUpdate(implementation_status="IMPLEMENTED")
        # exclude_unset keeps PATCH-style payloads minimal.
        data = mu.model_dump(exclude_unset=True)
        assert data == {"implementation_status": "IMPLEMENTED"}

    def test_empty(self):
        mu = TOMMeasureUpdate()
        data = mu.model_dump(exclude_unset=True)
        assert data == {}
class TestTOMStatsResponse:
    """Schema tests: the stats response zeroes all aggregates by default."""

    def test_defaults(self):
        stats = TOMStatsResponse()
        assert stats.total == 0
        assert stats.by_status == {}
        assert stats.overdue_review_count == 0

    def test_full(self):
        stats = TOMStatsResponse(
            total=10,
            by_status={"IMPLEMENTED": 5, "NOT_IMPLEMENTED": 3, "PARTIAL": 2},
            by_category={"GOVERNANCE": 4, "ACCESS_CONTROL": 6},
            overdue_review_count=2,
            implemented=5,
            partial=2,
            not_implemented=3,
        )
        assert stats.total == 10
        assert stats.implemented == 5
class TestTOMMeasureResponse:
    """Schema tests for the measure response model."""

    def test_from_dict(self):
        resp = TOMMeasureResponse(
            id="abc",
            tenant_id=DEFAULT_TENANT,
            category="GOVERNANCE",
            control_id="TOM.GOV.01",
            name="Test",
            type="ORGANIZATIONAL",
        )
        assert resp.id == "abc"
        # Collection fields default to empty lists rather than None.
        assert resp.linked_evidence == []
# =============================================================================
# DB Model Tests
# =============================================================================
class TestTOMModels:
    """Smoke tests for the SQLAlchemy model __repr__ implementations."""

    def test_state_repr(self):
        s = TOMStateDB()
        s.tenant_id = "test"
        s.version = 3
        # repr shows tenant and version for debuggability.
        assert "test" in repr(s)
        assert "v3" in repr(s)

    def test_measure_repr(self):
        m = TOMMeasureDB()
        m.control_id = "TOM.ACC.01"
        m.name = "Zugriffskontrolle"
        assert "TOM.ACC.01" in repr(m)
# =============================================================================
# Helper Function Tests
# =============================================================================
class TestParseDt:
    """Tests for the _parse_dt helper: lenient ISO-8601 parsing, None on failure."""

    def test_none(self):
        assert _parse_dt(None) is None

    def test_empty_string(self):
        assert _parse_dt("") is None

    def test_iso_format(self):
        dt = _parse_dt("2024-01-15T10:30:00+00:00")
        assert dt is not None
        assert dt.year == 2024
        assert dt.month == 1

    def test_iso_with_z(self):
        # 'Z' suffix (common in JS toISOString output) must be accepted too.
        dt = _parse_dt("2024-06-15T12:00:00Z")
        assert dt is not None
        assert dt.year == 2024

    def test_invalid_string(self):
        # Garbage input yields None rather than raising.
        assert _parse_dt("not-a-date") is None
class TestMeasureToDict:
    """Tests for _measure_to_dict: DB row -> JSON-serializable dict mapping."""

    def test_full_conversion(self):
        m = _make_measure_row()
        d = _measure_to_dict(m)
        # UUID is stringified; scalar and collection fields pass through.
        assert d["id"] == MEASURE_ID
        assert d["control_id"] == "TOM.GOV.01"
        assert d["name"] == "Datenschutzrichtlinie"
        assert d["category"] == "GOVERNANCE"
        assert d["type"] == "ORGANIZATIONAL"
        assert d["linked_evidence"] == []
        assert d["related_controls"] == {}
        assert d["created_at"] is not None

    def test_with_dates(self):
        # Datetime columns are rendered as ISO strings.
        m = _make_measure_row(
            implementation_date=datetime(2024, 3, 1, tzinfo=timezone.utc),
            review_date=datetime(2025, 3, 1, tzinfo=timezone.utc),
        )
        d = _measure_to_dict(m)
        assert "2024-03-01" in d["implementation_date"]
        assert "2025-03-01" in d["review_date"]

    def test_null_dates(self):
        # Unset datetime columns map to None, not empty strings.
        m = _make_measure_row()
        d = _measure_to_dict(m)
        assert d["implementation_date"] is None
        assert d["review_date"] is None
        assert d["verified_at"] is None
# =============================================================================
# Route Tests (with mocked DB)
# =============================================================================
from classroom_engine.database import get_db
def override_get_db(mock_db):
    """Return a zero-arg dependency override that always yields mock_db.

    Used with app.dependency_overrides[get_db] to inject the mocked session.
    """
    return lambda: mock_db
class TestStateRoutes:
    """Route tests for /tom/state (GET/POST/DELETE) against a mocked session."""

    def test_get_state_new_tenant(self):
        # Unknown tenant -> empty payload flagged isNew with version 0.
        db = _make_mock_db()
        db.filter.return_value.first.return_value = None
        app.dependency_overrides[get_db] = override_get_db(db)
        client = TestClient(app)
        resp = client.get("/tom/state?tenant_id=new-tenant")
        assert resp.status_code == 200
        data = resp.json()
        assert data["success"] is True
        assert data["data"]["isNew"] is True
        assert data["data"]["version"] == 0
        app.dependency_overrides.clear()

    def test_get_state_existing(self):
        db = _make_mock_db()
        row = _make_state_row(state={"steps": [1, 2, 3]})
        db.filter.return_value.first.return_value = row
        app.dependency_overrides[get_db] = override_get_db(db)
        client = TestClient(app)
        resp = client.get(f"/tom/state?tenant_id={DEFAULT_TENANT}")
        assert resp.status_code == 200
        data = resp.json()
        assert data["success"] is True
        assert data["data"]["version"] == 1
        assert data["data"]["state"]["steps"] == [1, 2, 3]
        app.dependency_overrides.clear()

    def test_post_state_new(self):
        # First save for a tenant performs an INSERT (db.add called once).
        db = _make_mock_db()
        db.filter.return_value.first.return_value = None

        def mock_refresh(obj):
            # Simulate DB-populated columns after INSERT + refresh.
            obj.version = 1
            obj.updated_at = datetime(2024, 1, 1, tzinfo=timezone.utc)
            obj.state = {"test": True}

        db.refresh = mock_refresh
        app.dependency_overrides[get_db] = override_get_db(db)
        client = TestClient(app)
        resp = client.post("/tom/state", json={
            "tenant_id": DEFAULT_TENANT,
            "state": {"test": True},
        })
        assert resp.status_code == 200
        data = resp.json()
        assert data["success"] is True
        db.add.assert_called_once()
        app.dependency_overrides.clear()

    def test_post_state_version_conflict(self):
        # Optimistic locking: stale client version (3 vs stored 5) -> HTTP 409.
        db = _make_mock_db()
        row = _make_state_row(version=5)
        db.filter.return_value.first.return_value = row
        app.dependency_overrides[get_db] = override_get_db(db)
        client = TestClient(app)
        resp = client.post("/tom/state", json={
            "tenant_id": DEFAULT_TENANT,
            "state": {"test": True},
            "version": 3,  # Expected 3, actual 5
        })
        assert resp.status_code == 409
        app.dependency_overrides.clear()

    def test_post_state_update_existing(self):
        # Matching version -> state replaced in place and version bumped.
        db = _make_mock_db()
        row = _make_state_row(version=2)
        db.filter.return_value.first.return_value = row

        def mock_refresh(obj):
            pass  # row already has attributes

        db.refresh = mock_refresh
        app.dependency_overrides[get_db] = override_get_db(db)
        client = TestClient(app)
        resp = client.post("/tom/state", json={
            "tenant_id": DEFAULT_TENANT,
            "state": {"new": "data"},
            "version": 2,
        })
        assert resp.status_code == 200
        assert row.version == 3
        assert row.state == {"new": "data"}
        app.dependency_overrides.clear()

    def test_delete_state(self):
        db = _make_mock_db()
        row = _make_state_row()
        db.filter.return_value.first.return_value = row
        app.dependency_overrides[get_db] = override_get_db(db)
        client = TestClient(app)
        resp = client.delete(f"/tom/state?tenant_id={DEFAULT_TENANT}")
        assert resp.status_code == 200
        data = resp.json()
        assert data["deleted"] is True
        db.delete.assert_called_once()
        app.dependency_overrides.clear()

    def test_delete_state_not_found(self):
        # Deleting a non-existent state is not an error; deleted=False instead.
        db = _make_mock_db()
        db.filter.return_value.first.return_value = None
        app.dependency_overrides[get_db] = override_get_db(db)
        client = TestClient(app)
        resp = client.delete(f"/tom/state?tenant_id={DEFAULT_TENANT}")
        assert resp.status_code == 200
        data = resp.json()
        assert data["deleted"] is False
        app.dependency_overrides.clear()

    def test_delete_state_missing_tenant(self):
        # DELETE without an explicit tenant_id must be rejected with 400.
        db = _make_mock_db()
        app.dependency_overrides[get_db] = override_get_db(db)
        client = TestClient(app)
        resp = client.delete("/tom/state")
        assert resp.status_code == 400
        app.dependency_overrides.clear()
class TestMeasureRoutes:
    """Route tests for /tom/measures: list, create, update, bulk upsert."""

    def test_list_measures_empty(self):
        db = _make_mock_db()
        # Pin the full pagination chain filter().order_by().offset().limit().all().
        db.filter.return_value.order_by.return_value.offset.return_value.limit.return_value.all.return_value = []
        db.filter.return_value.count.return_value = 0
        app.dependency_overrides[get_db] = override_get_db(db)
        client = TestClient(app)
        resp = client.get("/tom/measures")
        assert resp.status_code == 200
        data = resp.json()
        assert data["measures"] == []
        assert data["total"] == 0
        app.dependency_overrides.clear()

    def test_list_measures_with_data(self):
        db = _make_mock_db()
        measures = [_make_measure_row("TOM.GOV.01"), _make_measure_row("TOM.ACC.01", name="Zugriff")]
        db.filter.return_value.order_by.return_value.offset.return_value.limit.return_value.all.return_value = measures
        db.filter.return_value.count.return_value = 2
        app.dependency_overrides[get_db] = override_get_db(db)
        client = TestClient(app)
        resp = client.get("/tom/measures")
        assert resp.status_code == 200
        data = resp.json()
        assert len(data["measures"]) == 2
        assert data["total"] == 2
        app.dependency_overrides.clear()

    def test_create_measure(self):
        db = _make_mock_db()
        # No existing measure with same control_id
        db.filter.return_value.first.return_value = None

        def mock_refresh(obj):
            # Simulate DB-populated columns after INSERT + refresh.
            obj.id = uuid.UUID(MEASURE_ID)
            obj.created_at = datetime(2024, 1, 1, tzinfo=timezone.utc)
            obj.updated_at = datetime(2024, 1, 1, tzinfo=timezone.utc)

        db.refresh = mock_refresh
        app.dependency_overrides[get_db] = override_get_db(db)
        client = TestClient(app)
        resp = client.post("/tom/measures", json={
            "control_id": "TOM.GOV.01",
            "name": "Datenschutzrichtlinie",
            "category": "GOVERNANCE",
            "type": "ORGANIZATIONAL",
        })
        assert resp.status_code == 201
        data = resp.json()
        assert data["control_id"] == "TOM.GOV.01"
        db.add.assert_called_once()
        app.dependency_overrides.clear()

    def test_create_measure_duplicate(self):
        # (tenant_id, control_id) already exists -> HTTP 409.
        db = _make_mock_db()
        db.filter.return_value.first.return_value = _make_measure_row()
        app.dependency_overrides[get_db] = override_get_db(db)
        client = TestClient(app)
        resp = client.post("/tom/measures", json={
            "control_id": "TOM.GOV.01",
            "name": "Duplicate",
            "category": "GOVERNANCE",
            "type": "ORGANIZATIONAL",
        })
        assert resp.status_code == 409
        app.dependency_overrides.clear()

    def test_update_measure(self):
        # Partial update mutates only the supplied fields on the row.
        db = _make_mock_db()
        row = _make_measure_row()
        db.filter.return_value.first.return_value = row

        def mock_refresh(obj):
            pass

        db.refresh = mock_refresh
        app.dependency_overrides[get_db] = override_get_db(db)
        client = TestClient(app)
        resp = client.put(f"/tom/measures/{MEASURE_ID}", json={
            "implementation_status": "IMPLEMENTED",
            "responsible_person": "Max Mustermann",
        })
        assert resp.status_code == 200
        assert row.implementation_status == "IMPLEMENTED"
        assert row.responsible_person == "Max Mustermann"
        app.dependency_overrides.clear()

    def test_update_measure_not_found(self):
        db = _make_mock_db()
        db.filter.return_value.first.return_value = None
        app.dependency_overrides[get_db] = override_get_db(db)
        client = TestClient(app)
        resp = client.put(f"/tom/measures/{UNKNOWN_ID}", json={
            "implementation_status": "IMPLEMENTED",
        })
        assert resp.status_code == 404
        app.dependency_overrides.clear()

    def test_bulk_upsert_create(self):
        # No rows exist yet -> every bulk item is counted as created.
        db = _make_mock_db()
        # No existing measures
        db.filter.return_value.first.return_value = None
        app.dependency_overrides[get_db] = override_get_db(db)
        client = TestClient(app)
        resp = client.post("/tom/measures/bulk", json={
            "tenant_id": DEFAULT_TENANT,
            "measures": [
                {
                    "control_id": "TOM.GOV.01",
                    "name": "Datenschutzrichtlinie",
                    "category": "GOVERNANCE",
                    "type": "ORGANIZATIONAL",
                },
                {
                    "control_id": "TOM.ACC.01",
                    "name": "Zugriffskontrolle",
                    "category": "ACCESS_CONTROL",
                    "type": "TECHNICAL",
                },
            ],
        })
        assert resp.status_code == 200
        data = resp.json()
        assert data["success"] is True
        assert data["created"] == 2
        assert data["updated"] == 0
        assert data["total"] == 2
        app.dependency_overrides.clear()

    def test_bulk_upsert_update(self):
        # An existing control_id is updated in place and counted as updated.
        db = _make_mock_db()
        existing = _make_measure_row("TOM.GOV.01")
        db.filter.return_value.first.return_value = existing
        app.dependency_overrides[get_db] = override_get_db(db)
        client = TestClient(app)
        resp = client.post("/tom/measures/bulk", json={
            "measures": [
                {
                    "control_id": "TOM.GOV.01",
                    "name": "Updated Name",
                    "category": "GOVERNANCE",
                    "type": "ORGANIZATIONAL",
                },
            ],
        })
        assert resp.status_code == 200
        data = resp.json()
        assert data["updated"] == 1
        assert data["created"] == 0
        assert existing.name == "Updated Name"
        app.dependency_overrides.clear()
class TestStatsRoute:
    """Route tests for GET /tom/stats (aggregate counts)."""

    def test_stats_empty(self):
        # With no measures at all, every aggregate comes back zeroed/empty.
        db = _make_mock_db()
        db.filter.return_value.count.return_value = 0
        db.filter.return_value.group_by.return_value.all.return_value = []
        app.dependency_overrides[get_db] = override_get_db(db)
        client = TestClient(app)
        resp = client.get("/tom/stats")
        assert resp.status_code == 200
        data = resp.json()
        assert data["total"] == 0
        assert data["by_status"] == {}
        assert data["by_category"] == {}
        app.dependency_overrides.clear()

    def test_stats_with_data(self):
        # The endpoint issues successive query().filter(...) calls:
        #   1st -> total/overdue counts, 2nd -> group-by status,
        #   3rd -> group-by category. Dispatch a dedicated mock per call.
        # (Cleanup: removed the dead locals original_filter/status_q/cat_q
        # that were assigned but never used.)
        db = _make_mock_db()
        base_q = MagicMock()
        base_q.count.return_value = 10
        base_q.filter.return_value.count.return_value = 2  # overdue reviews

        call_count = [0]

        def mock_filter(*args):
            call_count[0] += 1
            if call_count[0] == 1:
                return base_q
            elif call_count[0] == 2:
                mock_gby = MagicMock()
                mock_gby.all.return_value = [("IMPLEMENTED", 5), ("NOT_IMPLEMENTED", 3), ("PARTIAL", 2)]
                result = MagicMock()
                result.group_by.return_value = mock_gby
                return result
            elif call_count[0] == 3:
                mock_gby = MagicMock()
                mock_gby.all.return_value = [("GOVERNANCE", 4), ("ACCESS_CONTROL", 6)]
                result = MagicMock()
                result.group_by.return_value = mock_gby
                return result
            return MagicMock()

        db.query.return_value.filter = mock_filter
        app.dependency_overrides[get_db] = override_get_db(db)
        client = TestClient(app)
        resp = client.get("/tom/stats")
        assert resp.status_code == 200
        data = resp.json()
        assert data["total"] == 10
        app.dependency_overrides.clear()
class TestExportRoute:
    """Route tests for GET /tom/export in both JSON and CSV formats."""

    def test_export_json(self):
        db = _make_mock_db()
        measures = [_make_measure_row("TOM.GOV.01")]
        db.filter.return_value.order_by.return_value.all.return_value = measures
        app.dependency_overrides[get_db] = override_get_db(db)
        client = TestClient(app)
        resp = client.get("/tom/export?format=json")
        assert resp.status_code == 200
        assert "application/json" in resp.headers.get("content-type", "")
        data = resp.json()
        assert len(data) == 1
        assert data[0]["control_id"] == "TOM.GOV.01"
        app.dependency_overrides.clear()

    def test_export_csv(self):
        db = _make_mock_db()
        measures = [_make_measure_row("TOM.GOV.01")]
        db.filter.return_value.order_by.return_value.all.return_value = measures
        app.dependency_overrides[get_db] = override_get_db(db)
        client = TestClient(app)
        resp = client.get("/tom/export?format=csv")
        assert resp.status_code == 200
        assert "text/csv" in resp.headers.get("content-type", "")
        content = resp.text
        assert "control_id" in content  # Header
        assert "TOM.GOV.01" in content
        app.dependency_overrides.clear()

    def test_export_csv_empty(self):
        # An empty result set still produces a CSV with the header row.
        db = _make_mock_db()
        db.filter.return_value.order_by.return_value.all.return_value = []
        app.dependency_overrides[get_db] = override_get_db(db)
        client = TestClient(app)
        resp = client.get("/tom/export?format=csv")
        assert resp.status_code == 200
        content = resp.text
        assert "control_id" in content  # Header still present
        app.dependency_overrides.clear()
# =============================================================================
# camelCase tenantId alias tests
# =============================================================================
class TestTenantIdAlias:
    """The routes must accept camelCase `tenantId` (frontend convention)."""

    def test_get_state_camelcase(self):
        db = _make_mock_db()
        db.filter.return_value.first.return_value = None
        app.dependency_overrides[get_db] = override_get_db(db)
        client = TestClient(app)
        # Query parameter spelled camelCase instead of snake_case.
        resp = client.get(f"/tom/state?tenantId={DEFAULT_TENANT}")
        assert resp.status_code == 200
        data = resp.json()
        assert data["data"]["tenantId"] == DEFAULT_TENANT
        app.dependency_overrides.clear()

    def test_post_state_camelcase(self):
        db = _make_mock_db()
        db.filter.return_value.first.return_value = None

        def mock_refresh(obj):
            obj.version = 1
            obj.updated_at = datetime(2024, 1, 1, tzinfo=timezone.utc)
            obj.state = {}

        db.refresh = mock_refresh
        app.dependency_overrides[get_db] = override_get_db(db)
        client = TestClient(app)
        # JSON body key spelled camelCase instead of snake_case.
        resp = client.post("/tom/state", json={
            "tenantId": DEFAULT_TENANT,
            "state": {},
        })
        assert resp.status_code == 200
        app.dependency_overrides.clear()

View File

@@ -2,7 +2,7 @@
import pytest
from unittest.mock import MagicMock, patch
from datetime import datetime, date
from datetime import datetime, date, timedelta, timezone
import uuid
from compliance.api.schemas import (
@@ -104,10 +104,24 @@ class TestVVTStatsResponse:
third_country_count=0,
draft_count=3,
approved_count=2,
overdue_review_count=1,
)
assert stats.total == 5
assert stats.by_status["DRAFT"] == 3
assert stats.dpia_required_count == 1
assert stats.overdue_review_count == 1
def test_stats_overdue_default_zero(self):
stats = VVTStatsResponse(
total=0,
by_status={},
by_business_function={},
dpia_required_count=0,
third_country_count=0,
draft_count=0,
approved_count=0,
)
assert stats.overdue_review_count == 0
# =============================================================================
@@ -168,6 +182,10 @@ class TestActivityToResponse:
act.status = kwargs.get("status", "DRAFT")
act.responsible = kwargs.get("responsible", None)
act.owner = kwargs.get("owner", None)
act.last_reviewed_at = kwargs.get("last_reviewed_at", None)
act.next_review_at = kwargs.get("next_review_at", None)
act.created_by = kwargs.get("created_by", None)
act.dsfa_id = kwargs.get("dsfa_id", None)
act.created_at = datetime.utcnow()
act.updated_at = None
return act
@@ -220,3 +238,196 @@ class TestLogAudit:
_log_audit(mock_db, "DELETE", "activity")
added = mock_db.add.call_args[0][0]
assert added.changed_by == "system"
# =============================================================================
# Consolidation Tests (Go → Python feature parity)
# =============================================================================
class TestVVTConsolidationSchemas:
    """Tests for new fields ported from Go: review dates, created_by, dsfa_id."""

    def test_activity_create_with_review_dates(self):
        # Both review timestamps should round-trip through the create schema.
        reviewed_at = datetime.now(timezone.utc)
        due_at = reviewed_at + timedelta(days=365)
        req = VVTActivityCreate(
            vvt_id="VVT-REV-001",
            name="Review-Test",
            last_reviewed_at=reviewed_at,
            next_review_at=due_at,
        )
        assert req.last_reviewed_at == reviewed_at
        assert req.next_review_at == due_at

    def test_activity_create_sets_created_by(self):
        req = VVTActivityCreate(
            vvt_id="VVT-CB-001",
            name="Created-By Test",
            created_by="admin@example.com",
        )
        assert req.created_by == "admin@example.com"

    def test_activity_create_created_by_defaults_none(self):
        # created_by is optional and must default to None when omitted.
        req = VVTActivityCreate(vvt_id="VVT-CB-002", name="Default Test")
        assert req.created_by is None

    def test_activity_create_with_dsfa_id(self):
        linked_dsfa = str(uuid.uuid4())
        req = VVTActivityCreate(
            vvt_id="VVT-DSFA-001",
            name="DSFA-Link Test",
            dsfa_id=linked_dsfa,
        )
        assert req.dsfa_id == linked_dsfa

    def test_activity_update_review_dates(self):
        reviewed_at = datetime.now(timezone.utc)
        req = VVTActivityUpdate(
            last_reviewed_at=reviewed_at,
            next_review_at=reviewed_at + timedelta(days=180),
        )
        # exclude_none drops unset optionals; both dates were set, so both survive.
        payload = req.model_dump(exclude_none=True)
        assert "last_reviewed_at" in payload
        assert "next_review_at" in payload

    def test_activity_update_dsfa_id(self):
        linked_dsfa = str(uuid.uuid4())
        req = VVTActivityUpdate(dsfa_id=linked_dsfa)
        payload = req.model_dump(exclude_none=True)
        assert payload["dsfa_id"] == linked_dsfa
class TestVVTConsolidationResponse:
    """Tests for new fields in response mapping."""

    def _make_activity(self, **kwargs) -> VVTActivityDB:
        # Build a minimal activity row; kwargs override only the fields
        # ported from Go (review dates, created_by, dsfa_id) plus id/name.
        act = VVTActivityDB()
        act.id = uuid.uuid4()
        field_values = {
            "vvt_id": kwargs.get("vvt_id", "VVT-001"),
            "name": kwargs.get("name", "Test"),
            "description": None,
            "purposes": [],
            "legal_bases": [],
            "data_subject_categories": [],
            "personal_data_categories": [],
            "recipient_categories": [],
            "third_country_transfers": [],
            "retention_period": {},
            "tom_description": None,
            "business_function": None,
            "systems": [],
            "deployment_model": None,
            "data_sources": [],
            "data_flows": [],
            "protection_level": "MEDIUM",
            "dpia_required": False,
            "structured_toms": {},
            "status": "DRAFT",
            "responsible": None,
            "owner": None,
            # Consolidation fields — overridable per test case.
            "last_reviewed_at": kwargs.get("last_reviewed_at", None),
            "next_review_at": kwargs.get("next_review_at", None),
            "created_by": kwargs.get("created_by", None),
            "dsfa_id": kwargs.get("dsfa_id", None),
        }
        for attr, value in field_values.items():
            setattr(act, attr, value)
        act.created_at = datetime.utcnow()
        act.updated_at = None
        return act

    def test_response_includes_review_dates(self):
        reviewed_at = datetime.now(timezone.utc)
        due_at = reviewed_at + timedelta(days=365)
        resp = _activity_to_response(
            self._make_activity(last_reviewed_at=reviewed_at, next_review_at=due_at)
        )
        assert resp.last_reviewed_at == reviewed_at
        assert resp.next_review_at == due_at

    def test_response_includes_created_by(self):
        resp = _activity_to_response(self._make_activity(created_by="admin@example.com"))
        assert resp.created_by == "admin@example.com"

    def test_response_includes_dsfa_id(self):
        linked_dsfa = uuid.uuid4()
        resp = _activity_to_response(self._make_activity(dsfa_id=linked_dsfa))
        # The response schema carries the UUID as a string.
        assert resp.dsfa_id == str(linked_dsfa)

    def test_response_null_new_fields(self):
        resp = _activity_to_response(self._make_activity())
        assert resp.last_reviewed_at is None
        assert resp.next_review_at is None
        assert resp.created_by is None
        assert resp.dsfa_id is None
class TestVVTCsvExport:
"""Tests for CSV export functionality."""
def _collect_csv_body(self, response) -> str:
"""Extract text from StreamingResponse (async generator)."""
import asyncio
async def _read():
chunks = []
async for chunk in response.body_iterator:
chunks.append(chunk)
return ''.join(chunks)
return asyncio.get_event_loop().run_until_complete(_read())
def test_export_csv_format(self):
from compliance.api.vvt_routes import _export_csv
act = VVTActivityDB()
act.id = uuid.uuid4()
act.vvt_id = "VVT-CSV-001"
act.name = "CSV Test"
act.purposes = ["Zweck A", "Zweck B"]
act.legal_bases = ["Art. 6 Abs. 1b"]
act.personal_data_categories = ["Email"]
act.data_subject_categories = ["Kunden"]
act.recipient_categories = ["IT-Dienstleister"]
act.third_country_transfers = ["USA"]
act.retention_period = {"duration": "3 Jahre"}
act.status = "APPROVED"
act.responsible = "DSB"
act.created_by = "admin"
act.created_at = datetime(2026, 1, 15, 10, 30)
act.updated_at = None
response = _export_csv([act])
text = self._collect_csv_body(response)
assert 'VVT-CSV-001' in text
assert 'CSV Test' in text
assert 'APPROVED' in text
def test_export_csv_semicolon_separator(self):
from compliance.api.vvt_routes import _export_csv
act = VVTActivityDB()
act.id = uuid.uuid4()
act.vvt_id = "VVT-SEP-001"
act.name = "Separator Test"
act.purposes = []
act.legal_bases = []
act.personal_data_categories = []
act.data_subject_categories = []
act.recipient_categories = []
act.third_country_transfers = []
act.retention_period = {}
act.status = "DRAFT"
act.responsible = ""
act.created_by = "system"
act.created_at = datetime(2026, 3, 1, 12, 0)
act.updated_at = None
response = _export_csv([act])
text = self._collect_csv_body(response)
lines = text.strip().split('\n')
header = lines[0]
assert ';' in header
assert 'ID;VVT-ID;Name' in header.replace('\ufeff', '')
def test_export_csv_empty_list(self):
from compliance.api.vvt_routes import _export_csv
response = _export_csv([])
text = self._collect_csv_body(response)
lines = text.strip().split('\n')
assert len(lines) == 1