feat: Package 4 Rechtliche Texte — DB-Persistenz fuer Legal Documents, Einwilligungen und Cookie Banner
All checks were successful
CI / go-lint (push) Has been skipped
CI / python-lint (push) Has been skipped
CI / nodejs-lint (push) Has been skipped
CI / test-go-ai-compliance (push) Successful in 46s
CI / test-python-backend-compliance (push) Successful in 32s
CI / test-python-document-crawler (push) Successful in 22s
CI / test-python-dsms-gateway (push) Successful in 17s

- Migration 007: compliance_legal_documents, _versions, _approvals (Approval-Workflow)
- Migration 008: compliance_einwilligungen_catalog, _company, _cookies, _consents
- Backend: legal_document_routes.py (11 Endpoints + draft→review→approved→published Workflow)
- Backend: einwilligungen_routes.py (10 Endpoints inkl. Stats, Pagination, Revoke)
- Frontend: /api/admin/consent/[[...path]] Catch-All-Proxy fuer Legal Documents
- Frontend: catalog/consent/cookie-banner routes von In-Memory auf DB-Proxy umgestellt
- Frontend: einwilligungen/page.tsx + cookie-banner/page.tsx laden jetzt via API (kein Mock)
- Tests: 44/44 pass (test_legal_document_routes.py + test_einwilligungen_routes.py)
- Deploy-Scripts: apply_legal_docs_migration.sh + apply_einwilligungen_migration.sh

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
Benjamin Admin
2026-03-03 08:25:13 +01:00
parent 799668e472
commit 113ecdfa77
17 changed files with 2501 additions and 664 deletions

View File

@@ -10,6 +10,8 @@ from .scraper_routes import router as scraper_router
from .module_routes import router as module_router
from .isms_routes import router as isms_router
from .vvt_routes import router as vvt_router
from .legal_document_routes import router as legal_document_router
from .einwilligungen_routes import router as einwilligungen_router
# Include sub-routers
router.include_router(audit_router)
@@ -21,6 +23,8 @@ router.include_router(scraper_router)
router.include_router(module_router)
router.include_router(isms_router)
router.include_router(vvt_router)
router.include_router(legal_document_router)
router.include_router(einwilligungen_router)
__all__ = [
"router",
@@ -33,4 +37,6 @@ __all__ = [
"module_router",
"isms_router",
"vvt_router",
"legal_document_router",
"einwilligungen_router",
]

View File

@@ -0,0 +1,390 @@
"""
FastAPI routes for Einwilligungen — Consent-Tracking, Cookie-Banner und Datenpunktkatalog.
Endpoints:
GET /einwilligungen/catalog — Katalog laden
PUT /einwilligungen/catalog — Katalog speichern (Upsert by tenant_id)
GET /einwilligungen/company — Firmeninfo laden
PUT /einwilligungen/company — Firmeninfo speichern (Upsert)
GET /einwilligungen/cookies — Cookie-Banner-Config laden
PUT /einwilligungen/cookies — Cookie-Banner-Config speichern (Upsert)
GET /einwilligungen/consents — Consent-Liste (Pagination + Filter)
POST /einwilligungen/consents — Consent erfassen
PUT /einwilligungen/consents/{id}/revoke — Consent widerrufen
GET /einwilligungen/consents/stats — Statistiken
"""
import logging
from datetime import datetime
from typing import Optional, List, Any, Dict
from fastapi import APIRouter, Depends, HTTPException, Query, Header
from pydantic import BaseModel
from sqlalchemy.orm import Session
from classroom_engine.database import get_db
from ..db.einwilligungen_models import (
EinwilligungenCatalogDB,
EinwilligungenCompanyDB,
EinwilligungenCookiesDB,
EinwilligungenConsentDB,
)
logger = logging.getLogger(__name__)
router = APIRouter(prefix="/einwilligungen", tags=["einwilligungen"])
# ============================================================================
# Pydantic Schemas
# ============================================================================
class CatalogUpsert(BaseModel):
selected_data_point_ids: List[str] = []
custom_data_points: List[Dict[str, Any]] = []
class CompanyUpsert(BaseModel):
data: Dict[str, Any] = {}
class CookiesUpsert(BaseModel):
categories: List[Dict[str, Any]] = []
config: Dict[str, Any] = {}
class ConsentCreate(BaseModel):
user_id: str
data_point_id: str
granted: bool
consent_version: str = '1.0'
source: Optional[str] = None
ip_address: Optional[str] = None
user_agent: Optional[str] = None
# ============================================================================
# Helpers
# ============================================================================
def _get_tenant(x_tenant_id: Optional[str] = Header(None, alias='X-Tenant-ID')) -> str:
if not x_tenant_id:
raise HTTPException(status_code=400, detail="X-Tenant-ID header required")
return x_tenant_id
# ============================================================================
# Catalog
# ============================================================================
@router.get("/catalog")
async def get_catalog(
tenant_id: str = Depends(_get_tenant),
db: Session = Depends(get_db),
):
"""Load the data point catalog for a tenant."""
record = db.query(EinwilligungenCatalogDB).filter(
EinwilligungenCatalogDB.tenant_id == tenant_id
).first()
if not record:
return {
"tenant_id": tenant_id,
"selected_data_point_ids": [],
"custom_data_points": [],
"updated_at": None,
}
return {
"tenant_id": tenant_id,
"selected_data_point_ids": record.selected_data_point_ids or [],
"custom_data_points": record.custom_data_points or [],
"updated_at": record.updated_at,
}
@router.put("/catalog")
async def upsert_catalog(
request: CatalogUpsert,
tenant_id: str = Depends(_get_tenant),
db: Session = Depends(get_db),
):
"""Create or update the data point catalog for a tenant."""
record = db.query(EinwilligungenCatalogDB).filter(
EinwilligungenCatalogDB.tenant_id == tenant_id
).first()
if record:
record.selected_data_point_ids = request.selected_data_point_ids
record.custom_data_points = request.custom_data_points
record.updated_at = datetime.utcnow()
else:
record = EinwilligungenCatalogDB(
tenant_id=tenant_id,
selected_data_point_ids=request.selected_data_point_ids,
custom_data_points=request.custom_data_points,
)
db.add(record)
db.commit()
db.refresh(record)
return {
"success": True,
"tenant_id": tenant_id,
"selected_data_point_ids": record.selected_data_point_ids,
"custom_data_points": record.custom_data_points,
"updated_at": record.updated_at,
}
# ============================================================================
# Company Info
# ============================================================================
@router.get("/company")
async def get_company(
tenant_id: str = Depends(_get_tenant),
db: Session = Depends(get_db),
):
"""Load company information for DSI generation."""
record = db.query(EinwilligungenCompanyDB).filter(
EinwilligungenCompanyDB.tenant_id == tenant_id
).first()
if not record:
return {"tenant_id": tenant_id, "data": {}, "updated_at": None}
return {"tenant_id": tenant_id, "data": record.data or {}, "updated_at": record.updated_at}
@router.put("/company")
async def upsert_company(
request: CompanyUpsert,
tenant_id: str = Depends(_get_tenant),
db: Session = Depends(get_db),
):
"""Create or update company information for a tenant."""
record = db.query(EinwilligungenCompanyDB).filter(
EinwilligungenCompanyDB.tenant_id == tenant_id
).first()
if record:
record.data = request.data
record.updated_at = datetime.utcnow()
else:
record = EinwilligungenCompanyDB(tenant_id=tenant_id, data=request.data)
db.add(record)
db.commit()
db.refresh(record)
return {"success": True, "tenant_id": tenant_id, "data": record.data, "updated_at": record.updated_at}
# ============================================================================
# Cookie Banner Config
# ============================================================================
@router.get("/cookies")
async def get_cookies(
tenant_id: str = Depends(_get_tenant),
db: Session = Depends(get_db),
):
"""Load cookie banner configuration for a tenant."""
record = db.query(EinwilligungenCookiesDB).filter(
EinwilligungenCookiesDB.tenant_id == tenant_id
).first()
if not record:
return {"tenant_id": tenant_id, "categories": [], "config": {}, "updated_at": None}
return {
"tenant_id": tenant_id,
"categories": record.categories or [],
"config": record.config or {},
"updated_at": record.updated_at,
}
@router.put("/cookies")
async def upsert_cookies(
request: CookiesUpsert,
tenant_id: str = Depends(_get_tenant),
db: Session = Depends(get_db),
):
"""Create or update cookie banner configuration for a tenant."""
record = db.query(EinwilligungenCookiesDB).filter(
EinwilligungenCookiesDB.tenant_id == tenant_id
).first()
if record:
record.categories = request.categories
record.config = request.config
record.updated_at = datetime.utcnow()
else:
record = EinwilligungenCookiesDB(
tenant_id=tenant_id,
categories=request.categories,
config=request.config,
)
db.add(record)
db.commit()
db.refresh(record)
return {
"success": True,
"tenant_id": tenant_id,
"categories": record.categories,
"config": record.config,
"updated_at": record.updated_at,
}
# ============================================================================
# Consents
# ============================================================================
@router.get("/consents/stats")
async def get_consent_stats(
tenant_id: str = Depends(_get_tenant),
db: Session = Depends(get_db),
):
"""Get consent statistics for a tenant."""
all_consents = db.query(EinwilligungenConsentDB).filter(
EinwilligungenConsentDB.tenant_id == tenant_id
).all()
total = len(all_consents)
active = sum(1 for c in all_consents if c.granted and not c.revoked_at)
revoked = sum(1 for c in all_consents if c.revoked_at)
# Unique users
unique_users = len(set(c.user_id for c in all_consents))
users_with_active = len(set(c.user_id for c in all_consents if c.granted and not c.revoked_at))
conversion_rate = round((users_with_active / unique_users * 100), 1) if unique_users > 0 else 0.0
# By data point
by_data_point: Dict[str, Dict] = {}
for c in all_consents:
dp = c.data_point_id
if dp not in by_data_point:
by_data_point[dp] = {"total": 0, "active": 0, "revoked": 0}
by_data_point[dp]["total"] += 1
if c.granted and not c.revoked_at:
by_data_point[dp]["active"] += 1
if c.revoked_at:
by_data_point[dp]["revoked"] += 1
return {
"total_consents": total,
"active_consents": active,
"revoked_consents": revoked,
"unique_users": unique_users,
"conversion_rate": conversion_rate,
"by_data_point": by_data_point,
}
@router.get("/consents")
async def list_consents(
tenant_id: str = Depends(_get_tenant),
user_id: Optional[str] = Query(None),
data_point_id: Optional[str] = Query(None),
granted: Optional[bool] = Query(None),
limit: int = Query(50, ge=1, le=500),
offset: int = Query(0, ge=0),
db: Session = Depends(get_db),
):
"""List consent records with optional filters and pagination."""
query = db.query(EinwilligungenConsentDB).filter(
EinwilligungenConsentDB.tenant_id == tenant_id
)
if user_id:
query = query.filter(EinwilligungenConsentDB.user_id == user_id)
if data_point_id:
query = query.filter(EinwilligungenConsentDB.data_point_id == data_point_id)
if granted is not None:
query = query.filter(EinwilligungenConsentDB.granted == granted)
total = query.count()
consents = query.order_by(EinwilligungenConsentDB.created_at.desc()).offset(offset).limit(limit).all()
return {
"total": total,
"offset": offset,
"limit": limit,
"consents": [
{
"id": str(c.id),
"tenant_id": c.tenant_id,
"user_id": c.user_id,
"data_point_id": c.data_point_id,
"granted": c.granted,
"granted_at": c.granted_at,
"revoked_at": c.revoked_at,
"consent_version": c.consent_version,
"source": c.source,
"created_at": c.created_at,
}
for c in consents
],
}
@router.post("/consents", status_code=201)
async def create_consent(
request: ConsentCreate,
tenant_id: str = Depends(_get_tenant),
db: Session = Depends(get_db),
):
"""Record a new consent entry."""
consent = EinwilligungenConsentDB(
tenant_id=tenant_id,
user_id=request.user_id,
data_point_id=request.data_point_id,
granted=request.granted,
granted_at=datetime.utcnow(),
consent_version=request.consent_version,
source=request.source,
ip_address=request.ip_address,
user_agent=request.user_agent,
)
db.add(consent)
db.commit()
db.refresh(consent)
return {
"success": True,
"id": str(consent.id),
"user_id": consent.user_id,
"data_point_id": consent.data_point_id,
"granted": consent.granted,
"granted_at": consent.granted_at,
}
@router.put("/consents/{consent_id}/revoke")
async def revoke_consent(
consent_id: str,
tenant_id: str = Depends(_get_tenant),
db: Session = Depends(get_db),
):
"""Revoke an active consent."""
consent = db.query(EinwilligungenConsentDB).filter(
EinwilligungenConsentDB.id == consent_id,
EinwilligungenConsentDB.tenant_id == tenant_id,
).first()
if not consent:
raise HTTPException(status_code=404, detail=f"Consent {consent_id} not found")
if consent.revoked_at:
raise HTTPException(status_code=400, detail="Consent is already revoked")
consent.revoked_at = datetime.utcnow()
db.commit()
db.refresh(consent)
return {
"success": True,
"id": str(consent.id),
"revoked_at": consent.revoked_at,
}

View File

@@ -0,0 +1,406 @@
"""
FastAPI routes for Legal Documents — Rechtliche Texte mit Versionierung und Approval-Workflow.
Endpoints:
GET /legal-documents/documents — Liste aller Dokumente
POST /legal-documents/documents — Dokument erstellen
GET /legal-documents/documents/{id}/versions — Versionen eines Dokuments
POST /legal-documents/versions — Neue Version erstellen
PUT /legal-documents/versions/{id} — Version aktualisieren
POST /legal-documents/versions/upload-word — DOCX → HTML
POST /legal-documents/versions/{id}/submit-review — Status: draft → review
POST /legal-documents/versions/{id}/approve — Status: review → approved
POST /legal-documents/versions/{id}/reject — Status: review → rejected
POST /legal-documents/versions/{id}/publish — Status: approved → published
GET /legal-documents/versions/{id}/approval-history — Approval-Audit-Trail
"""
import logging
from datetime import datetime
from typing import Optional, List, Any, Dict
from fastapi import APIRouter, Depends, HTTPException, Query, UploadFile, File
from pydantic import BaseModel
from sqlalchemy.orm import Session
from classroom_engine.database import get_db
from ..db.legal_document_models import (
LegalDocumentDB,
LegalDocumentVersionDB,
LegalDocumentApprovalDB,
)
logger = logging.getLogger(__name__)
router = APIRouter(prefix="/legal-documents", tags=["legal-documents"])
# ============================================================================
# Pydantic Schemas
# ============================================================================
class DocumentCreate(BaseModel):
type: str
name: str
description: Optional[str] = None
mandatory: bool = False
tenant_id: Optional[str] = None
class DocumentResponse(BaseModel):
id: str
tenant_id: Optional[str]
type: str
name: str
description: Optional[str]
mandatory: bool
created_at: datetime
updated_at: Optional[datetime]
class VersionCreate(BaseModel):
document_id: str
version: str
language: str = 'de'
title: str
content: str
summary: Optional[str] = None
created_by: Optional[str] = None
class VersionUpdate(BaseModel):
title: Optional[str] = None
content: Optional[str] = None
summary: Optional[str] = None
version: Optional[str] = None
language: Optional[str] = None
class VersionResponse(BaseModel):
id: str
document_id: str
version: str
language: str
title: str
content: str
summary: Optional[str]
status: str
created_by: Optional[str]
approved_by: Optional[str]
approved_at: Optional[datetime]
rejection_reason: Optional[str]
created_at: datetime
updated_at: Optional[datetime]
class ApprovalHistoryEntry(BaseModel):
id: str
version_id: str
action: str
approver: Optional[str]
comment: Optional[str]
created_at: datetime
class ActionRequest(BaseModel):
approver: Optional[str] = None
comment: Optional[str] = None
# ============================================================================
# Helpers
# ============================================================================
def _doc_to_response(doc: LegalDocumentDB) -> DocumentResponse:
return DocumentResponse(
id=str(doc.id),
tenant_id=doc.tenant_id,
type=doc.type,
name=doc.name,
description=doc.description,
mandatory=doc.mandatory or False,
created_at=doc.created_at,
updated_at=doc.updated_at,
)
def _version_to_response(v: LegalDocumentVersionDB) -> VersionResponse:
return VersionResponse(
id=str(v.id),
document_id=str(v.document_id),
version=v.version,
language=v.language or 'de',
title=v.title,
content=v.content,
summary=v.summary,
status=v.status or 'draft',
created_by=v.created_by,
approved_by=v.approved_by,
approved_at=v.approved_at,
rejection_reason=v.rejection_reason,
created_at=v.created_at,
updated_at=v.updated_at,
)
def _log_approval(
db: Session,
version_id: Any,
action: str,
approver: Optional[str] = None,
comment: Optional[str] = None,
) -> LegalDocumentApprovalDB:
entry = LegalDocumentApprovalDB(
version_id=version_id,
action=action,
approver=approver,
comment=comment,
)
db.add(entry)
return entry
# ============================================================================
# Documents
# ============================================================================
@router.get("/documents", response_model=Dict[str, Any])
async def list_documents(
tenant_id: Optional[str] = Query(None),
type: Optional[str] = Query(None),
db: Session = Depends(get_db),
):
"""List all legal documents, optionally filtered by tenant or type."""
query = db.query(LegalDocumentDB)
if tenant_id:
query = query.filter(LegalDocumentDB.tenant_id == tenant_id)
if type:
query = query.filter(LegalDocumentDB.type == type)
docs = query.order_by(LegalDocumentDB.created_at.desc()).all()
return {"documents": [_doc_to_response(d).dict() for d in docs]}
@router.post("/documents", response_model=DocumentResponse, status_code=201)
async def create_document(
request: DocumentCreate,
db: Session = Depends(get_db),
):
"""Create a new legal document type."""
doc = LegalDocumentDB(
tenant_id=request.tenant_id,
type=request.type,
name=request.name,
description=request.description,
mandatory=request.mandatory,
)
db.add(doc)
db.commit()
db.refresh(doc)
return _doc_to_response(doc)
@router.get("/documents/{document_id}/versions", response_model=List[VersionResponse])
async def list_versions(document_id: str, db: Session = Depends(get_db)):
"""List all versions for a legal document."""
doc = db.query(LegalDocumentDB).filter(LegalDocumentDB.id == document_id).first()
if not doc:
raise HTTPException(status_code=404, detail=f"Document {document_id} not found")
versions = (
db.query(LegalDocumentVersionDB)
.filter(LegalDocumentVersionDB.document_id == document_id)
.order_by(LegalDocumentVersionDB.created_at.desc())
.all()
)
return [_version_to_response(v) for v in versions]
# ============================================================================
# Versions
# ============================================================================
@router.post("/versions", response_model=VersionResponse, status_code=201)
async def create_version(
request: VersionCreate,
db: Session = Depends(get_db),
):
"""Create a new version for a legal document."""
doc = db.query(LegalDocumentDB).filter(LegalDocumentDB.id == request.document_id).first()
if not doc:
raise HTTPException(status_code=404, detail=f"Document {request.document_id} not found")
version = LegalDocumentVersionDB(
document_id=request.document_id,
version=request.version,
language=request.language,
title=request.title,
content=request.content,
summary=request.summary,
created_by=request.created_by,
status='draft',
)
db.add(version)
db.flush()
_log_approval(db, version.id, action='created', approver=request.created_by)
db.commit()
db.refresh(version)
return _version_to_response(version)
@router.put("/versions/{version_id}", response_model=VersionResponse)
async def update_version(
version_id: str,
request: VersionUpdate,
db: Session = Depends(get_db),
):
"""Update a draft legal document version."""
version = db.query(LegalDocumentVersionDB).filter(LegalDocumentVersionDB.id == version_id).first()
if not version:
raise HTTPException(status_code=404, detail=f"Version {version_id} not found")
if version.status not in ('draft', 'rejected'):
raise HTTPException(status_code=400, detail=f"Only draft/rejected versions can be edited (current: {version.status})")
for field, value in request.dict(exclude_none=True).items():
setattr(version, field, value)
version.updated_at = datetime.utcnow()
db.commit()
db.refresh(version)
return _version_to_response(version)
@router.post("/versions/upload-word", response_model=Dict[str, Any])
async def upload_word(file: UploadFile = File(...)):
"""Convert DOCX to HTML using mammoth (if available) or return raw text."""
if not file.filename or not file.filename.lower().endswith('.docx'):
raise HTTPException(status_code=400, detail="Only .docx files are supported")
content_bytes = await file.read()
html_content = ""
try:
import mammoth # type: ignore
import io
result = mammoth.convert_to_html(io.BytesIO(content_bytes))
html_content = result.value
except ImportError:
# Fallback: return placeholder if mammoth not installed
html_content = f"<p>[DOCX-Import: {file.filename}]</p><p>Bitte installieren Sie 'mammoth' fuer DOCX-Konvertierung.</p>"
return {"html": html_content, "filename": file.filename}
# ============================================================================
# Approval Workflow Actions
# ============================================================================
def _transition(
db: Session,
version_id: str,
from_statuses: List[str],
to_status: str,
action: str,
approver: Optional[str],
comment: Optional[str],
extra_updates: Optional[Dict] = None,
) -> VersionResponse:
version = db.query(LegalDocumentVersionDB).filter(LegalDocumentVersionDB.id == version_id).first()
if not version:
raise HTTPException(status_code=404, detail=f"Version {version_id} not found")
if version.status not in from_statuses:
raise HTTPException(
status_code=400,
detail=f"Cannot perform '{action}' on version with status '{version.status}' (expected: {from_statuses})"
)
version.status = to_status
version.updated_at = datetime.utcnow()
if extra_updates:
for k, v in extra_updates.items():
setattr(version, k, v)
_log_approval(db, version.id, action=action, approver=approver, comment=comment)
db.commit()
db.refresh(version)
return _version_to_response(version)
@router.post("/versions/{version_id}/submit-review", response_model=VersionResponse)
async def submit_review(
version_id: str,
request: ActionRequest,
db: Session = Depends(get_db),
):
"""Submit a draft version for review."""
return _transition(db, version_id, ['draft', 'rejected'], 'review', 'submitted', request.approver, request.comment)
@router.post("/versions/{version_id}/approve", response_model=VersionResponse)
async def approve_version(
version_id: str,
request: ActionRequest,
db: Session = Depends(get_db),
):
"""Approve a version under review."""
return _transition(
db, version_id, ['review'], 'approved', 'approved',
request.approver, request.comment,
extra_updates={'approved_by': request.approver, 'approved_at': datetime.utcnow()}
)
@router.post("/versions/{version_id}/reject", response_model=VersionResponse)
async def reject_version(
version_id: str,
request: ActionRequest,
db: Session = Depends(get_db),
):
"""Reject a version under review."""
return _transition(
db, version_id, ['review'], 'rejected', 'rejected',
request.approver, request.comment,
extra_updates={'rejection_reason': request.comment}
)
@router.post("/versions/{version_id}/publish", response_model=VersionResponse)
async def publish_version(
version_id: str,
request: ActionRequest,
db: Session = Depends(get_db),
):
"""Publish an approved version."""
return _transition(db, version_id, ['approved'], 'published', 'published', request.approver, request.comment)
# ============================================================================
# Approval History
# ============================================================================
@router.get("/versions/{version_id}/approval-history", response_model=List[ApprovalHistoryEntry])
async def get_approval_history(version_id: str, db: Session = Depends(get_db)):
"""Get the full approval audit trail for a version."""
version = db.query(LegalDocumentVersionDB).filter(LegalDocumentVersionDB.id == version_id).first()
if not version:
raise HTTPException(status_code=404, detail=f"Version {version_id} not found")
entries = (
db.query(LegalDocumentApprovalDB)
.filter(LegalDocumentApprovalDB.version_id == version_id)
.order_by(LegalDocumentApprovalDB.created_at.asc())
.all()
)
return [
ApprovalHistoryEntry(
id=str(e.id),
version_id=str(e.version_id),
action=e.action,
approver=e.approver,
comment=e.comment,
created_at=e.created_at,
)
for e in entries
]

View File

@@ -0,0 +1,99 @@
"""
SQLAlchemy models for Einwilligungen — Consent-Tracking und Cookie-Banner Konfiguration.
Tables:
- compliance_einwilligungen_catalog: Tenant-Katalog (aktive Datenpunkte)
- compliance_einwilligungen_company: Firmeninformationen fuer DSI-Generierung
- compliance_einwilligungen_cookies: Cookie-Banner-Konfiguration
- compliance_einwilligungen_consents: Endnutzer-Consent-Aufzeichnungen
"""
import uuid
from datetime import datetime
from sqlalchemy import (
Column, String, Text, Boolean, DateTime, JSON, Index
)
from sqlalchemy.dialects.postgresql import UUID
from classroom_engine.database import Base
class EinwilligungenCatalogDB(Base):
"""Tenant-spezifischer Datenpunktkatalog — welche Datenpunkte sind aktiv?"""
__tablename__ = 'compliance_einwilligungen_catalog'
id = Column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4)
tenant_id = Column(String(100), nullable=False, unique=True)
selected_data_point_ids = Column(JSON, default=list)
custom_data_points = Column(JSON, default=list)
updated_at = Column(DateTime, default=datetime.utcnow, onupdate=datetime.utcnow)
__table_args__ = (
Index('idx_einw_catalog_tenant', 'tenant_id'),
)
def __repr__(self):
return f"<EinwilligungenCatalog tenant={self.tenant_id}>"
class EinwilligungenCompanyDB(Base):
"""Firmeninformationen fuer die DSI-Generierung."""
__tablename__ = 'compliance_einwilligungen_company'
id = Column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4)
tenant_id = Column(String(100), nullable=False, unique=True)
data = Column(JSON, nullable=False, default=dict)
updated_at = Column(DateTime, default=datetime.utcnow, onupdate=datetime.utcnow)
def __repr__(self):
return f"<EinwilligungenCompany tenant={self.tenant_id}>"
class EinwilligungenCookiesDB(Base):
"""Cookie-Banner-Konfiguration pro Tenant."""
__tablename__ = 'compliance_einwilligungen_cookies'
id = Column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4)
tenant_id = Column(String(100), nullable=False, unique=True)
categories = Column(JSON, default=list)
config = Column(JSON, default=dict)
updated_at = Column(DateTime, default=datetime.utcnow, onupdate=datetime.utcnow)
__table_args__ = (
Index('idx_einw_cookies_tenant', 'tenant_id'),
)
def __repr__(self):
return f"<EinwilligungenCookies tenant={self.tenant_id}>"
class EinwilligungenConsentDB(Base):
"""Endnutzer-Consent-Aufzeichnung — granulare Einwilligungen pro Datenpunkt."""
__tablename__ = 'compliance_einwilligungen_consents'
id = Column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4)
tenant_id = Column(String(100), nullable=False)
user_id = Column(String(200), nullable=False)
data_point_id = Column(String(100), nullable=False)
granted = Column(Boolean, nullable=False, default=True)
granted_at = Column(DateTime, nullable=False, default=datetime.utcnow)
revoked_at = Column(DateTime)
ip_address = Column(String(45))
user_agent = Column(Text)
consent_version = Column(String(20), default='1.0')
source = Column(String(100))
created_at = Column(DateTime, default=datetime.utcnow, nullable=False)
__table_args__ = (
Index('idx_einw_consents_tenant', 'tenant_id'),
Index('idx_einw_consents_user', 'tenant_id', 'user_id'),
Index('idx_einw_consents_dpid', 'data_point_id'),
)
def __repr__(self):
return f"<EinwilligungenConsent user={self.user_id} dp={self.data_point_id} granted={self.granted}>"

View File

@@ -0,0 +1,90 @@
"""
SQLAlchemy models for Legal Documents — Rechtliche Texte mit Versionierung und Approval-Workflow.
Tables:
- compliance_legal_documents: Dokumenttypen (DSE, AGB, Cookie-Policy etc.)
- compliance_legal_document_versions: Versionen mit Status-Workflow
- compliance_legal_document_approvals: Audit-Trail fuer Freigaben
"""
import uuid
from datetime import datetime
from sqlalchemy import (
Column, String, Text, Boolean, DateTime, Index, ForeignKey
)
from sqlalchemy.dialects.postgresql import UUID
from classroom_engine.database import Base
class LegalDocumentDB(Base):
"""Legal document type — DSE, AGB, Cookie-Policy, Impressum, AVV etc."""
__tablename__ = 'compliance_legal_documents'
id = Column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4)
tenant_id = Column(String(100))
type = Column(String(50), nullable=False) # privacy_policy|terms|cookie_policy|imprint|dpa
name = Column(String(300), nullable=False)
description = Column(Text)
mandatory = Column(Boolean, default=False)
created_at = Column(DateTime, default=datetime.utcnow, nullable=False)
updated_at = Column(DateTime, default=datetime.utcnow, onupdate=datetime.utcnow)
__table_args__ = (
Index('idx_legal_docs_tenant', 'tenant_id'),
Index('idx_legal_docs_type', 'type'),
)
def __repr__(self):
return f"<LegalDocument {self.type}: {self.name}>"
class LegalDocumentVersionDB(Base):
"""Version of a legal document with Approval-Workflow status."""
__tablename__ = 'compliance_legal_document_versions'
id = Column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4)
document_id = Column(UUID(as_uuid=True), ForeignKey('compliance_legal_documents.id', ondelete='CASCADE'), nullable=False)
version = Column(String(20), nullable=False)
language = Column(String(10), default='de')
title = Column(String(300), nullable=False)
content = Column(Text, nullable=False)
summary = Column(Text)
status = Column(String(20), default='draft') # draft|review|approved|published|archived|rejected
created_by = Column(String(200))
approved_by = Column(String(200))
approved_at = Column(DateTime)
rejection_reason = Column(Text)
created_at = Column(DateTime, default=datetime.utcnow, nullable=False)
updated_at = Column(DateTime, default=datetime.utcnow, onupdate=datetime.utcnow)
__table_args__ = (
Index('idx_legal_doc_versions_doc', 'document_id'),
Index('idx_legal_doc_versions_status', 'status'),
)
def __repr__(self):
return f"<LegalDocumentVersion {self.version} [{self.status}]>"
class LegalDocumentApprovalDB(Base):
"""Audit trail for all approval actions on document versions."""
__tablename__ = 'compliance_legal_document_approvals'
id = Column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4)
version_id = Column(UUID(as_uuid=True), ForeignKey('compliance_legal_document_versions.id', ondelete='CASCADE'), nullable=False)
action = Column(String(50), nullable=False) # submitted|approved|rejected|published|archived
approver = Column(String(200))
comment = Column(Text)
created_at = Column(DateTime, default=datetime.utcnow, nullable=False)
__table_args__ = (
Index('idx_legal_doc_approvals_version', 'version_id'),
)
def __repr__(self):
return f"<LegalDocumentApproval {self.action} on version {self.version_id}>"

View File

@@ -0,0 +1,51 @@
-- =========================================================
-- Migration 007: Legal Documents — Rechtliche Texte mit Versionierung
-- Consent/Vorlagen, Dokumentengenerator, Workflow
-- =========================================================
-- compliance_legal_documents: Rechtsdokument-Typen (DSE, AGB, Cookie etc.)
CREATE TABLE IF NOT EXISTS compliance_legal_documents (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
tenant_id VARCHAR(100),
type VARCHAR(50) NOT NULL, -- privacy_policy | terms | cookie_policy | imprint | dpa
name VARCHAR(300) NOT NULL,
description TEXT,
mandatory BOOLEAN DEFAULT FALSE,
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
updated_at TIMESTAMPTZ DEFAULT NOW()
);
-- compliance_legal_document_versions: Versionierung mit Approval-Workflow
CREATE TABLE IF NOT EXISTS compliance_legal_document_versions (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
document_id UUID NOT NULL REFERENCES compliance_legal_documents(id) ON DELETE CASCADE,
version VARCHAR(20) NOT NULL,
language VARCHAR(10) DEFAULT 'de',
title VARCHAR(300) NOT NULL,
content TEXT NOT NULL,
summary TEXT,
status VARCHAR(20) DEFAULT 'draft', -- draft|review|approved|published|archived|rejected
created_by VARCHAR(200),
approved_by VARCHAR(200),
approved_at TIMESTAMPTZ,
rejection_reason TEXT,
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
updated_at TIMESTAMPTZ DEFAULT NOW()
);
-- compliance_legal_document_approvals: Audit-Trail fuer Freigaben
CREATE TABLE IF NOT EXISTS compliance_legal_document_approvals (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
version_id UUID NOT NULL REFERENCES compliance_legal_document_versions(id) ON DELETE CASCADE,
action VARCHAR(50) NOT NULL, -- submitted|approved|rejected|published|archived
approver VARCHAR(200),
comment TEXT,
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);
-- Indizes
CREATE INDEX IF NOT EXISTS idx_legal_docs_tenant ON compliance_legal_documents(tenant_id);
CREATE INDEX IF NOT EXISTS idx_legal_docs_type ON compliance_legal_documents(type);
CREATE INDEX IF NOT EXISTS idx_legal_doc_versions_doc ON compliance_legal_document_versions(document_id);
CREATE INDEX IF NOT EXISTS idx_legal_doc_versions_status ON compliance_legal_document_versions(status);
CREATE INDEX IF NOT EXISTS idx_legal_doc_approvals_version ON compliance_legal_document_approvals(version_id);

View File

@@ -0,0 +1,51 @@
-- =========================================================
-- Migration 008: Einwilligungen — Consent-Tracking & Cookie-Banner
-- =========================================================
-- Tenant-Katalog: Welche Datenpunkte sind aktiv?
CREATE TABLE IF NOT EXISTS compliance_einwilligungen_catalog (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
tenant_id VARCHAR(100) NOT NULL UNIQUE,
selected_data_point_ids JSONB DEFAULT '[]',
custom_data_points JSONB DEFAULT '[]',
updated_at TIMESTAMPTZ DEFAULT NOW()
);
-- Firmeninformationen fuer DSI-Generierung
CREATE TABLE IF NOT EXISTS compliance_einwilligungen_company (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
tenant_id VARCHAR(100) NOT NULL UNIQUE,
data JSONB NOT NULL DEFAULT '{}',
updated_at TIMESTAMPTZ DEFAULT NOW()
);
-- Cookie-Banner-Konfiguration (persistiert)
CREATE TABLE IF NOT EXISTS compliance_einwilligungen_cookies (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
tenant_id VARCHAR(100) NOT NULL UNIQUE,
categories JSONB DEFAULT '[]',
config JSONB DEFAULT '{}',
updated_at TIMESTAMPTZ DEFAULT NOW()
);
-- Consent-Aufzeichnungen (Endnutzer-Einwilligungen)
CREATE TABLE IF NOT EXISTS compliance_einwilligungen_consents (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
tenant_id VARCHAR(100) NOT NULL,
user_id VARCHAR(200) NOT NULL,
data_point_id VARCHAR(100) NOT NULL,
granted BOOLEAN NOT NULL DEFAULT TRUE,
granted_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
revoked_at TIMESTAMPTZ,
ip_address VARCHAR(45),
user_agent TEXT,
consent_version VARCHAR(20) DEFAULT '1.0',
source VARCHAR(100),
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);
CREATE INDEX IF NOT EXISTS idx_einw_consents_tenant ON compliance_einwilligungen_consents(tenant_id);
CREATE INDEX IF NOT EXISTS idx_einw_consents_user ON compliance_einwilligungen_consents(tenant_id, user_id);
CREATE INDEX IF NOT EXISTS idx_einw_consents_dpid ON compliance_einwilligungen_consents(data_point_id);
CREATE INDEX IF NOT EXISTS idx_einw_catalog_tenant ON compliance_einwilligungen_catalog(tenant_id);
CREATE INDEX IF NOT EXISTS idx_einw_cookies_tenant ON compliance_einwilligungen_cookies(tenant_id);

View File

@@ -0,0 +1,389 @@
"""
Tests for Einwilligungen Routes — 008_einwilligungen migration.
Tests: Catalog Upsert, Company Info, Cookie-Config, Consent erfassen,
Consent widerrufen, Statistiken.
"""
import pytest
from unittest.mock import MagicMock, patch
from datetime import datetime
import uuid
# ============================================================================
# Shared Fixtures
# ============================================================================
def make_uuid():
return str(uuid.uuid4())
def make_catalog(tenant_id='test-tenant'):
rec = MagicMock()
rec.id = uuid.uuid4()
rec.tenant_id = tenant_id
rec.selected_data_point_ids = ['dp-001', 'dp-002']
rec.custom_data_points = []
rec.updated_at = datetime.utcnow()
return rec
def make_company(tenant_id='test-tenant'):
rec = MagicMock()
rec.id = uuid.uuid4()
rec.tenant_id = tenant_id
rec.data = {'company_name': 'Test GmbH', 'email': 'datenschutz@test.de'}
rec.updated_at = datetime.utcnow()
return rec
def make_cookies(tenant_id='test-tenant'):
rec = MagicMock()
rec.id = uuid.uuid4()
rec.tenant_id = tenant_id
rec.categories = [
{'id': 'necessary', 'name': 'Notwendig', 'isRequired': True, 'defaultEnabled': True},
{'id': 'analytics', 'name': 'Analyse', 'isRequired': False, 'defaultEnabled': False},
]
rec.config = {'position': 'bottom', 'style': 'bar'}
rec.updated_at = datetime.utcnow()
return rec
def make_consent(tenant_id='test-tenant', user_id='user-001', data_point_id='dp-001', granted=True):
rec = MagicMock()
rec.id = uuid.uuid4()
rec.tenant_id = tenant_id
rec.user_id = user_id
rec.data_point_id = data_point_id
rec.granted = granted
rec.granted_at = datetime.utcnow()
rec.revoked_at = None
rec.consent_version = '1.0'
rec.source = 'website'
rec.ip_address = None
rec.user_agent = None
rec.created_at = datetime.utcnow()
return rec
# ============================================================================
# Pydantic Schema Tests
# ============================================================================
class TestCatalogUpsert:
def test_catalog_upsert_defaults(self):
from compliance.api.einwilligungen_routes import CatalogUpsert
data = CatalogUpsert()
assert data.selected_data_point_ids == []
assert data.custom_data_points == []
def test_catalog_upsert_with_data(self):
from compliance.api.einwilligungen_routes import CatalogUpsert
data = CatalogUpsert(
selected_data_point_ids=['dp-001', 'dp-002', 'dp-003'],
custom_data_points=[{'id': 'custom-1', 'name': 'Eigener Datenpunkt'}],
)
assert len(data.selected_data_point_ids) == 3
assert len(data.custom_data_points) == 1
class TestCompanyUpsert:
def test_company_upsert_empty(self):
from compliance.api.einwilligungen_routes import CompanyUpsert
data = CompanyUpsert()
assert data.data == {}
def test_company_upsert_with_data(self):
from compliance.api.einwilligungen_routes import CompanyUpsert
data = CompanyUpsert(data={
'company_name': 'Test GmbH',
'address': 'Musterstraße 1, 12345 Berlin',
'email': 'datenschutz@test.de',
'dpo_name': 'Max Mustermann',
})
assert data.data['company_name'] == 'Test GmbH'
assert data.data['dpo_name'] == 'Max Mustermann'
class TestCookiesUpsert:
def test_cookies_upsert_defaults(self):
from compliance.api.einwilligungen_routes import CookiesUpsert
data = CookiesUpsert()
assert data.categories == []
assert data.config == {}
def test_cookies_upsert_with_categories(self):
from compliance.api.einwilligungen_routes import CookiesUpsert
data = CookiesUpsert(
categories=[
{'id': 'necessary', 'name': 'Notwendig', 'isRequired': True},
{'id': 'analytics', 'name': 'Analyse', 'isRequired': False},
],
config={'position': 'bottom'},
)
assert len(data.categories) == 2
assert data.config['position'] == 'bottom'
class TestConsentCreate:
def test_consent_create_valid(self):
from compliance.api.einwilligungen_routes import ConsentCreate
data = ConsentCreate(
user_id='user-001',
data_point_id='dp-marketing',
granted=True,
)
assert data.user_id == 'user-001'
assert data.granted is True
assert data.consent_version == '1.0'
assert data.source is None
def test_consent_create_revoke(self):
from compliance.api.einwilligungen_routes import ConsentCreate
data = ConsentCreate(
user_id='user-001',
data_point_id='dp-analytics',
granted=False,
consent_version='2.0',
source='cookie-banner',
)
assert data.granted is False
assert data.consent_version == '2.0'
# ============================================================================
# Catalog Upsert Tests
# ============================================================================
class TestCatalogDB:
def test_catalog_returns_empty_when_not_found(self):
"""GET catalog should return empty defaults when no record exists."""
from compliance.db.einwilligungen_models import EinwilligungenCatalogDB
mock_db = MagicMock()
mock_db.query.return_value.filter.return_value.first.return_value = None
result = mock_db.query(EinwilligungenCatalogDB).filter().first()
assert result is None
def test_catalog_upsert_creates_new(self):
"""PUT catalog should create a new record if none exists."""
from compliance.db.einwilligungen_models import EinwilligungenCatalogDB
mock_db = MagicMock()
mock_db.query.return_value.filter.return_value.first.return_value = None
new_record = EinwilligungenCatalogDB(
tenant_id='test-tenant',
selected_data_point_ids=['dp-001'],
custom_data_points=[],
)
assert new_record.tenant_id == 'test-tenant'
assert new_record.selected_data_point_ids == ['dp-001']
def test_catalog_upsert_updates_existing(self):
"""PUT catalog should update existing record."""
existing = make_catalog()
existing.selected_data_point_ids = ['dp-001', 'dp-002', 'dp-003']
existing.custom_data_points = [{'id': 'custom-1'}]
assert len(existing.selected_data_point_ids) == 3
assert len(existing.custom_data_points) == 1
# ============================================================================
# Cookie Config Tests
# ============================================================================
class TestCookieConfig:
def test_cookie_config_returns_empty_when_not_found(self):
"""GET cookies should return empty defaults for new tenant."""
mock_db = MagicMock()
mock_db.query.return_value.filter.return_value.first.return_value = None
result = mock_db.query().filter().first()
assert result is None
def test_cookie_config_upsert_with_categories(self):
"""PUT cookies should store categories and config."""
from compliance.db.einwilligungen_models import EinwilligungenCookiesDB
categories = [
{'id': 'necessary', 'name': 'Notwendig', 'isRequired': True, 'defaultEnabled': True},
]
config = {'position': 'bottom', 'primaryColor': '#6366f1'}
rec = EinwilligungenCookiesDB(
tenant_id='test-tenant',
categories=categories,
config=config,
)
assert rec.categories[0]['id'] == 'necessary'
assert rec.config['position'] == 'bottom'
def test_essential_cookies_cannot_be_disabled(self):
"""Category with isRequired=True should not allow enabled=False."""
categories = [
{'id': 'necessary', 'name': 'Notwendig', 'isRequired': True, 'defaultEnabled': True},
{'id': 'analytics', 'name': 'Analyse', 'isRequired': False, 'defaultEnabled': True},
]
# Simulate the toggle logic
category_id = 'necessary'
enabled = False
updated = []
for cat in categories:
if cat['id'] == category_id:
if cat.get('isRequired') and not enabled:
updated.append(cat) # Not changed
else:
updated.append({**cat, 'defaultEnabled': enabled})
else:
updated.append(cat)
necessary_cat = next(c for c in updated if c['id'] == 'necessary')
assert necessary_cat['defaultEnabled'] is True # Not changed
# ============================================================================
# Consent Tests
# ============================================================================
class TestConsentDB:
def test_consent_record_creation(self):
"""Consent record should store all required fields."""
from compliance.db.einwilligungen_models import EinwilligungenConsentDB
consent = EinwilligungenConsentDB(
tenant_id='test-tenant',
user_id='user-001',
data_point_id='dp-marketing',
granted=True,
granted_at=datetime.utcnow(),
consent_version='1.0',
source='website',
)
assert consent.tenant_id == 'test-tenant'
assert consent.granted is True
assert consent.revoked_at is None
def test_consent_revoke_sets_revoked_at(self):
"""Revoking a consent should set revoked_at timestamp."""
consent = make_consent()
assert consent.revoked_at is None
consent.revoked_at = datetime.utcnow()
assert consent.revoked_at is not None
def test_cannot_revoke_already_revoked(self):
"""Should not be possible to revoke an already revoked consent."""
consent = make_consent()
consent.revoked_at = datetime.utcnow()
# Simulate the guard logic from the route
already_revoked = consent.revoked_at is not None
assert already_revoked is True # Route would raise HTTPException 400
# ============================================================================
# Statistics Tests
# ============================================================================
class TestConsentStats:
def test_stats_empty_tenant(self):
"""Stats for tenant with no consents should return zeros."""
consents = []
total = len(consents)
active = sum(1 for c in consents if c.granted and not c.revoked_at)
revoked = sum(1 for c in consents if c.revoked_at)
unique_users = len(set(c.user_id for c in consents))
assert total == 0
assert active == 0
assert revoked == 0
assert unique_users == 0
def test_stats_with_mixed_consents(self):
"""Stats should correctly count active and revoked consents."""
consents = [
make_consent(user_id='user-1', data_point_id='dp-1', granted=True),
make_consent(user_id='user-1', data_point_id='dp-2', granted=True),
make_consent(user_id='user-2', data_point_id='dp-1', granted=True),
]
# Revoke one
consents[1].revoked_at = datetime.utcnow()
total = len(consents)
active = sum(1 for c in consents if c.granted and not c.revoked_at)
revoked = sum(1 for c in consents if c.revoked_at)
unique_users = len(set(c.user_id for c in consents))
assert total == 3
assert active == 2
assert revoked == 1
assert unique_users == 2
def test_stats_conversion_rate(self):
"""Conversion rate = users with active consent / total unique users."""
consents = [
make_consent(user_id='user-1', granted=True),
make_consent(user_id='user-2', granted=True),
make_consent(user_id='user-3', granted=True),
]
consents[2].revoked_at = datetime.utcnow() # user-3 revoked
unique_users = len(set(c.user_id for c in consents))
users_with_active = len(set(c.user_id for c in consents if c.granted and not c.revoked_at))
rate = round((users_with_active / unique_users * 100), 1) if unique_users > 0 else 0.0
assert unique_users == 3
assert users_with_active == 2
assert rate == pytest.approx(66.7, 0.1)
def test_stats_by_data_point(self):
"""Stats should group consents by data_point_id."""
consents = [
make_consent(data_point_id='dp-marketing', granted=True),
make_consent(data_point_id='dp-marketing', granted=True),
make_consent(data_point_id='dp-analytics', granted=True),
]
by_dp: dict = {}
for c in consents:
dp = c.data_point_id
if dp not in by_dp:
by_dp[dp] = {'total': 0, 'active': 0}
by_dp[dp]['total'] += 1
if c.granted and not c.revoked_at:
by_dp[dp]['active'] += 1
assert by_dp['dp-marketing']['total'] == 2
assert by_dp['dp-analytics']['total'] == 1
# ============================================================================
# Model Repr Tests
# ============================================================================
class TestModelReprs:
def test_catalog_repr(self):
from compliance.db.einwilligungen_models import EinwilligungenCatalogDB
rec = EinwilligungenCatalogDB(tenant_id='my-tenant')
assert 'my-tenant' in repr(rec)
def test_cookies_repr(self):
from compliance.db.einwilligungen_models import EinwilligungenCookiesDB
rec = EinwilligungenCookiesDB(tenant_id='my-tenant')
assert 'my-tenant' in repr(rec)
def test_consent_repr(self):
from compliance.db.einwilligungen_models import EinwilligungenConsentDB
rec = EinwilligungenConsentDB(
tenant_id='t1', user_id='u1', data_point_id='dp1', granted=True
)
assert 'u1' in repr(rec)
assert 'dp1' in repr(rec)

View File

@@ -0,0 +1,313 @@
"""
Tests for Legal Document Routes — 007_legal_documents migration.
Tests: Document CRUD, Version creation, Approval-Workflow (submit→approve→publish),
Rejection-Flow, approval history.
"""
import pytest
from unittest.mock import MagicMock, patch
from datetime import datetime
import uuid
# ============================================================================
# Shared Fixtures
# ============================================================================
def make_uuid():
return str(uuid.uuid4())
def make_document(type='privacy_policy', name='Datenschutzerklärung', tenant_id='test-tenant'):
doc = MagicMock()
doc.id = uuid.uuid4()
doc.tenant_id = tenant_id
doc.type = type
doc.name = name
doc.description = 'Test description'
doc.mandatory = False
doc.created_at = datetime.utcnow()
doc.updated_at = None
return doc
def make_version(document_id=None, version='1.0', status='draft', title='Test Version'):
v = MagicMock()
v.id = uuid.uuid4()
v.document_id = document_id or uuid.uuid4()
v.version = version
v.language = 'de'
v.title = title
v.content = '<p>Inhalt der Datenschutzerklärung</p>'
v.summary = 'Kurzzusammenfassung'
v.status = status
v.created_by = 'admin@test.de'
v.approved_by = None
v.approved_at = None
v.rejection_reason = None
v.created_at = datetime.utcnow()
v.updated_at = None
return v
def make_approval(version_id=None, action='created'):
a = MagicMock()
a.id = uuid.uuid4()
a.version_id = version_id or uuid.uuid4()
a.action = action
a.approver = 'admin@test.de'
a.comment = None
a.created_at = datetime.utcnow()
return a
# ============================================================================
# Pydantic Schema Tests
# ============================================================================
class TestDocumentCreate:
def test_document_create_valid(self):
from compliance.api.legal_document_routes import DocumentCreate
doc = DocumentCreate(
type='privacy_policy',
name='Datenschutzerklärung',
description='DSE für Webseite',
mandatory=True,
tenant_id='tenant-abc',
)
assert doc.type == 'privacy_policy'
assert doc.mandatory is True
assert doc.tenant_id == 'tenant-abc'
def test_document_create_minimal(self):
from compliance.api.legal_document_routes import DocumentCreate
doc = DocumentCreate(type='terms', name='AGB')
assert doc.mandatory is False
assert doc.tenant_id is None
assert doc.description is None
def test_document_create_all_types(self):
from compliance.api.legal_document_routes import DocumentCreate
for doc_type in ['privacy_policy', 'terms', 'cookie_policy', 'imprint', 'dpa']:
doc = DocumentCreate(type=doc_type, name=f'{doc_type} document')
assert doc.type == doc_type
class TestVersionCreate:
def test_version_create_valid(self):
from compliance.api.legal_document_routes import VersionCreate
doc_id = make_uuid()
v = VersionCreate(
document_id=doc_id,
version='1.0',
title='DSE Version 1.0',
content='<p>Inhalt</p>',
summary='Zusammenfassung',
created_by='admin@test.de',
)
assert v.version == '1.0'
assert v.language == 'de'
assert v.document_id == doc_id
def test_version_create_defaults(self):
from compliance.api.legal_document_routes import VersionCreate
v = VersionCreate(
document_id=make_uuid(),
version='2.0',
title='Version 2',
content='Content',
)
assert v.language == 'de'
assert v.created_by is None
assert v.summary is None
def test_version_update_partial(self):
from compliance.api.legal_document_routes import VersionUpdate
update = VersionUpdate(title='Neuer Titel', content='Neuer Inhalt')
data = update.dict(exclude_none=True)
assert 'title' in data
assert 'content' in data
assert 'language' not in data
class TestActionRequest:
def test_action_request_defaults(self):
from compliance.api.legal_document_routes import ActionRequest
req = ActionRequest()
assert req.approver is None
assert req.comment is None
def test_action_request_with_data(self):
from compliance.api.legal_document_routes import ActionRequest
req = ActionRequest(approver='dpo@company.de', comment='Alles korrekt')
assert req.approver == 'dpo@company.de'
assert req.comment == 'Alles korrekt'
# ============================================================================
# Helper Function Tests
# ============================================================================
class TestDocToResponse:
def test_doc_to_response(self):
from compliance.api.legal_document_routes import _doc_to_response
doc = make_document()
resp = _doc_to_response(doc)
assert resp.id == str(doc.id)
assert resp.type == 'privacy_policy'
assert resp.mandatory is False
def test_doc_to_response_mandatory(self):
from compliance.api.legal_document_routes import _doc_to_response
doc = make_document()
doc.mandatory = True
resp = _doc_to_response(doc)
assert resp.mandatory is True
class TestVersionToResponse:
def test_version_to_response_draft(self):
from compliance.api.legal_document_routes import _version_to_response
v = make_version(status='draft')
resp = _version_to_response(v)
assert resp.status == 'draft'
assert resp.approved_by is None
assert resp.rejection_reason is None
def test_version_to_response_approved(self):
from compliance.api.legal_document_routes import _version_to_response
v = make_version(status='approved')
v.approved_by = 'dpo@company.de'
v.approved_at = datetime.utcnow()
resp = _version_to_response(v)
assert resp.status == 'approved'
assert resp.approved_by == 'dpo@company.de'
def test_version_to_response_rejected(self):
from compliance.api.legal_document_routes import _version_to_response
v = make_version(status='rejected')
v.rejection_reason = 'Inhalt unvollständig'
resp = _version_to_response(v)
assert resp.status == 'rejected'
assert resp.rejection_reason == 'Inhalt unvollständig'
# ============================================================================
# Approval Workflow Transition Tests
# ============================================================================
class TestApprovalWorkflow:
def test_transition_raises_on_wrong_status(self):
"""_transition should raise HTTPException if version is in wrong status."""
from compliance.api.legal_document_routes import _transition
from fastapi import HTTPException
mock_db = MagicMock()
v = make_version(status='draft')
mock_db.query.return_value.filter.return_value.first.return_value = v
with pytest.raises(HTTPException) as exc_info:
_transition(mock_db, str(v.id), ['review'], 'approved', 'approved', None, None)
assert exc_info.value.status_code == 400
assert 'draft' in exc_info.value.detail
def test_transition_raises_on_not_found(self):
"""_transition should raise 404 if version not found."""
from compliance.api.legal_document_routes import _transition
from fastapi import HTTPException
mock_db = MagicMock()
mock_db.query.return_value.filter.return_value.first.return_value = None
with pytest.raises(HTTPException) as exc_info:
_transition(mock_db, make_uuid(), ['draft'], 'review', 'submitted', None, None)
assert exc_info.value.status_code == 404
def test_transition_success(self):
"""_transition should change status and log approval."""
from compliance.api.legal_document_routes import _transition
mock_db = MagicMock()
v = make_version(status='draft')
mock_db.query.return_value.filter.return_value.first.return_value = v
result = _transition(mock_db, str(v.id), ['draft'], 'review', 'submitted', 'admin', None)
assert v.status == 'review'
mock_db.commit.assert_called_once()
def test_full_workflow_draft_to_published(self):
"""Simulate the full approval workflow: draft → review → approved → published."""
from compliance.api.legal_document_routes import _transition
mock_db = MagicMock()
v = make_version(status='draft')
mock_db.query.return_value.filter.return_value.first.return_value = v
# Step 1: Submit for review
_transition(mock_db, str(v.id), ['draft'], 'review', 'submitted', 'author', None)
assert v.status == 'review'
# Step 2: Approve
mock_db.reset_mock()
_transition(mock_db, str(v.id), ['review'], 'approved', 'approved', 'dpo', 'Korrekt',
extra_updates={'approved_by': 'dpo', 'approved_at': datetime.utcnow()})
assert v.status == 'approved'
# Step 3: Publish
mock_db.reset_mock()
_transition(mock_db, str(v.id), ['approved'], 'published', 'published', 'dpo', None)
assert v.status == 'published'
def test_rejection_flow(self):
"""Review → Rejected → draft (re-edit) → review again."""
from compliance.api.legal_document_routes import _transition
mock_db = MagicMock()
v = make_version(status='review')
mock_db.query.return_value.filter.return_value.first.return_value = v
# Reject
_transition(mock_db, str(v.id), ['review'], 'rejected', 'rejected', 'dpo', 'Überarbeitung nötig',
extra_updates={'rejection_reason': 'Überarbeitung nötig'})
assert v.status == 'rejected'
# After rejection, version is editable again (draft/rejected allowed)
# Re-submit for review
_transition(mock_db, str(v.id), ['draft', 'rejected'], 'review', 'submitted', 'author', None)
assert v.status == 'review'
# ============================================================================
# Log Approval Tests
# ============================================================================
class TestLogApproval:
def test_log_approval_creates_entry(self):
from compliance.api.legal_document_routes import _log_approval
from compliance.db.legal_document_models import LegalDocumentApprovalDB
mock_db = MagicMock()
version_id = uuid.uuid4()
entry = _log_approval(mock_db, version_id, 'approved', 'dpo@test.de', 'Gut')
mock_db.add.assert_called_once()
added = mock_db.add.call_args[0][0]
assert isinstance(added, LegalDocumentApprovalDB)
assert added.action == 'approved'
assert added.approver == 'dpo@test.de'
def test_log_approval_without_approver(self):
from compliance.api.legal_document_routes import _log_approval
from compliance.db.legal_document_models import LegalDocumentApprovalDB
mock_db = MagicMock()
_log_approval(mock_db, uuid.uuid4(), 'created')
added = mock_db.add.call_args[0][0]
assert added.approver is None
assert added.comment is None