feat: Consent-Service Module nach Compliance migriert (DSR, E-Mail-Templates, Legal Docs, Banner)
All checks were successful
CI / go-lint (push) Has been skipped
CI / python-lint (push) Has been skipped
CI / nodejs-lint (push) Has been skipped
CI / test-go-ai-compliance (push) Successful in 36s
CI / test-python-backend-compliance (push) Successful in 31s
CI / test-python-document-crawler (push) Successful in 23s
CI / test-python-dsms-gateway (push) Successful in 18s

5-Phasen-Migration: Go consent-service Proxies durch native Python/FastAPI ersetzt.

Phase 1 — DSR (Betroffenenrechte): 6 Tabellen, 30 Endpoints, Frontend-API umgestellt
Phase 2 — E-Mail-Templates: 5 Tabellen, 20 Endpoints, neues Frontend, SDK_STEPS erweitert
Phase 3 — Legal Documents Extension: User Consents, Audit Log, Cookie-Kategorien
Phase 4 — Banner Consent: Device-Consents, Site-Configs, Kategorien, Vendors
Phase 5 — Cleanup: DSR-Proxy aus main.py entfernt, Frontend-URLs aktualisiert

148 neue Tests (50 + 47 + 26 + 25), alle bestanden.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
Benjamin Admin
2026-03-05 00:36:24 +01:00
parent 2211cb9349
commit b7c1a5da1a
23 changed files with 7146 additions and 542 deletions

View File

@@ -22,6 +22,9 @@ from .loeschfristen_routes import router as loeschfristen_router
from .legal_template_routes import router as legal_template_router
from .compliance_scope_routes import router as compliance_scope_router
from .dsfa_routes import router as dsfa_router
from .dsr_routes import router as dsr_router
from .email_template_routes import router as email_template_router
from .banner_routes import router as banner_router
# Include sub-routers
router.include_router(audit_router)
@@ -45,6 +48,9 @@ router.include_router(loeschfristen_router)
router.include_router(legal_template_router)
router.include_router(compliance_scope_router)
router.include_router(dsfa_router)
router.include_router(dsr_router)
router.include_router(email_template_router)
router.include_router(banner_router)
__all__ = [
"router",
@@ -69,4 +75,7 @@ __all__ = [
"legal_template_router",
"compliance_scope_router",
"dsfa_router",
"dsr_router",
"email_template_router",
"banner_router",
]

View File

@@ -0,0 +1,654 @@
"""
Banner Consent Routes — Device-basierte Cookie-Consents fuer Kunden-Websites.
Public SDK-Endpoints (fuer Einbettung) + Admin-Endpoints (Konfiguration & Stats).
"""
import uuid
import hashlib
from datetime import datetime, timedelta
from typing import Optional, List
from fastapi import APIRouter, Depends, HTTPException, Query, Header
from pydantic import BaseModel
from sqlalchemy.orm import Session
from sqlalchemy import func
from classroom_engine.database import get_db
from ..db.banner_models import (
BannerConsentDB, BannerConsentAuditLogDB,
BannerSiteConfigDB, BannerCategoryConfigDB, BannerVendorConfigDB,
)
router = APIRouter(prefix="/banner", tags=["compliance-banner"])
DEFAULT_TENANT = "9282a473-5c95-4b3a-bf78-0ecc0ec71d3e"
# =============================================================================
# Schemas
# =============================================================================
class ConsentCreate(BaseModel):
site_id: str
device_fingerprint: str
categories: List[str] = []
vendors: List[str] = []
ip_address: Optional[str] = None
user_agent: Optional[str] = None
consent_string: Optional[str] = None
class SiteConfigCreate(BaseModel):
site_id: str
site_name: Optional[str] = None
site_url: Optional[str] = None
banner_title: Optional[str] = None
banner_description: Optional[str] = None
privacy_url: Optional[str] = None
imprint_url: Optional[str] = None
dsb_name: Optional[str] = None
dsb_email: Optional[str] = None
theme: Optional[dict] = None
tcf_enabled: bool = False
class SiteConfigUpdate(BaseModel):
site_name: Optional[str] = None
site_url: Optional[str] = None
banner_title: Optional[str] = None
banner_description: Optional[str] = None
privacy_url: Optional[str] = None
imprint_url: Optional[str] = None
dsb_name: Optional[str] = None
dsb_email: Optional[str] = None
theme: Optional[dict] = None
tcf_enabled: Optional[bool] = None
is_active: Optional[bool] = None
class CategoryConfigCreate(BaseModel):
category_key: str
name_de: str
name_en: Optional[str] = None
description_de: Optional[str] = None
description_en: Optional[str] = None
is_required: bool = False
sort_order: int = 0
class VendorConfigCreate(BaseModel):
vendor_name: str
vendor_url: Optional[str] = None
category_key: str
description_de: Optional[str] = None
description_en: Optional[str] = None
cookie_names: List[str] = []
retention_days: int = 365
# =============================================================================
# Helpers
# =============================================================================
def _get_tenant(x_tenant_id: Optional[str] = Header(None, alias='X-Tenant-ID')) -> str:
return x_tenant_id or DEFAULT_TENANT
def _hash_ip(ip: Optional[str]) -> Optional[str]:
if not ip:
return None
return hashlib.sha256(ip.encode()).hexdigest()[:16]
def _consent_to_dict(c: BannerConsentDB) -> dict:
return {
"id": str(c.id),
"site_id": c.site_id,
"device_fingerprint": c.device_fingerprint,
"categories": c.categories or [],
"vendors": c.vendors or [],
"ip_hash": c.ip_hash,
"consent_string": c.consent_string,
"expires_at": c.expires_at.isoformat() if c.expires_at else None,
"created_at": c.created_at.isoformat() if c.created_at else None,
"updated_at": c.updated_at.isoformat() if c.updated_at else None,
}
def _site_config_to_dict(s: BannerSiteConfigDB) -> dict:
return {
"id": str(s.id),
"site_id": s.site_id,
"site_name": s.site_name,
"site_url": s.site_url,
"banner_title": s.banner_title,
"banner_description": s.banner_description,
"privacy_url": s.privacy_url,
"imprint_url": s.imprint_url,
"dsb_name": s.dsb_name,
"dsb_email": s.dsb_email,
"theme": s.theme or {},
"tcf_enabled": s.tcf_enabled,
"is_active": s.is_active,
"created_at": s.created_at.isoformat() if s.created_at else None,
"updated_at": s.updated_at.isoformat() if s.updated_at else None,
}
def _category_to_dict(c: BannerCategoryConfigDB) -> dict:
return {
"id": str(c.id),
"site_config_id": str(c.site_config_id),
"category_key": c.category_key,
"name_de": c.name_de,
"name_en": c.name_en,
"description_de": c.description_de,
"description_en": c.description_en,
"is_required": c.is_required,
"sort_order": c.sort_order,
"is_active": c.is_active,
}
def _vendor_to_dict(v: BannerVendorConfigDB) -> dict:
return {
"id": str(v.id),
"site_config_id": str(v.site_config_id),
"vendor_name": v.vendor_name,
"vendor_url": v.vendor_url,
"category_key": v.category_key,
"description_de": v.description_de,
"description_en": v.description_en,
"cookie_names": v.cookie_names or [],
"retention_days": v.retention_days,
"is_active": v.is_active,
}
def _log_banner_audit(db, tenant_id, consent_id, action, site_id, device_fingerprint=None, categories=None, ip_hash=None):
entry = BannerConsentAuditLogDB(
tenant_id=tenant_id,
consent_id=consent_id,
action=action,
site_id=site_id,
device_fingerprint=device_fingerprint,
categories=categories or [],
ip_hash=ip_hash,
)
db.add(entry)
return entry
# =============================================================================
# Public SDK Endpoints (fuer Einbettung in Kunden-Websites)
# =============================================================================
@router.post("/consent")
async def record_consent(
body: ConsentCreate,
tenant_id: str = Depends(_get_tenant),
db: Session = Depends(get_db),
):
"""Record device consent (upsert by site_id + device_fingerprint)."""
tid = uuid.UUID(tenant_id)
ip_hash = _hash_ip(body.ip_address)
# Upsert: check existing
existing = db.query(BannerConsentDB).filter(
BannerConsentDB.tenant_id == tid,
BannerConsentDB.site_id == body.site_id,
BannerConsentDB.device_fingerprint == body.device_fingerprint,
).first()
if existing:
existing.categories = body.categories
existing.vendors = body.vendors
existing.ip_hash = ip_hash
existing.user_agent = body.user_agent
existing.consent_string = body.consent_string
existing.expires_at = datetime.utcnow() + timedelta(days=365)
existing.updated_at = datetime.utcnow()
db.flush()
_log_banner_audit(
db, tid, existing.id, "consent_updated",
body.site_id, body.device_fingerprint, body.categories, ip_hash,
)
db.commit()
db.refresh(existing)
return _consent_to_dict(existing)
consent = BannerConsentDB(
tenant_id=tid,
site_id=body.site_id,
device_fingerprint=body.device_fingerprint,
categories=body.categories,
vendors=body.vendors,
ip_hash=ip_hash,
user_agent=body.user_agent,
consent_string=body.consent_string,
expires_at=datetime.utcnow() + timedelta(days=365),
)
db.add(consent)
db.flush()
_log_banner_audit(
db, tid, consent.id, "consent_given",
body.site_id, body.device_fingerprint, body.categories, ip_hash,
)
db.commit()
db.refresh(consent)
return _consent_to_dict(consent)
@router.get("/consent")
async def get_consent(
site_id: str = Query(...),
device_fingerprint: str = Query(...),
tenant_id: str = Depends(_get_tenant),
db: Session = Depends(get_db),
):
"""Retrieve consent for a device."""
tid = uuid.UUID(tenant_id)
consent = db.query(BannerConsentDB).filter(
BannerConsentDB.tenant_id == tid,
BannerConsentDB.site_id == site_id,
BannerConsentDB.device_fingerprint == device_fingerprint,
).first()
if not consent:
return {"has_consent": False, "consent": None}
return {"has_consent": True, "consent": _consent_to_dict(consent)}
@router.delete("/consent/{consent_id}")
async def withdraw_consent(
consent_id: str,
tenant_id: str = Depends(_get_tenant),
db: Session = Depends(get_db),
):
"""Withdraw a banner consent."""
tid = uuid.UUID(tenant_id)
try:
cid = uuid.UUID(consent_id)
except ValueError:
raise HTTPException(status_code=400, detail="Invalid consent ID")
consent = db.query(BannerConsentDB).filter(
BannerConsentDB.id == cid,
BannerConsentDB.tenant_id == tid,
).first()
if not consent:
raise HTTPException(status_code=404, detail="Consent not found")
_log_banner_audit(
db, tid, cid, "consent_withdrawn",
consent.site_id, consent.device_fingerprint,
)
db.delete(consent)
db.commit()
return {"success": True, "message": "Consent withdrawn"}
@router.get("/config/{site_id}")
async def get_site_config(
site_id: str,
tenant_id: str = Depends(_get_tenant),
db: Session = Depends(get_db),
):
"""Load site configuration for banner display."""
tid = uuid.UUID(tenant_id)
config = db.query(BannerSiteConfigDB).filter(
BannerSiteConfigDB.tenant_id == tid,
BannerSiteConfigDB.site_id == site_id,
).first()
if not config:
return {
"site_id": site_id,
"banner_title": "Cookie-Einstellungen",
"banner_description": "Wir verwenden Cookies, um Ihnen die bestmoegliche Erfahrung zu bieten.",
"categories": [],
"vendors": [],
}
categories = db.query(BannerCategoryConfigDB).filter(
BannerCategoryConfigDB.site_config_id == config.id,
BannerCategoryConfigDB.is_active == True,
).order_by(BannerCategoryConfigDB.sort_order).all()
vendors = db.query(BannerVendorConfigDB).filter(
BannerVendorConfigDB.site_config_id == config.id,
BannerVendorConfigDB.is_active == True,
).all()
result = _site_config_to_dict(config)
result["categories"] = [_category_to_dict(c) for c in categories]
result["vendors"] = [_vendor_to_dict(v) for v in vendors]
return result
@router.get("/consent/export")
async def export_consent(
site_id: str = Query(...),
device_fingerprint: str = Query(...),
tenant_id: str = Depends(_get_tenant),
db: Session = Depends(get_db),
):
"""DSGVO export of all consent data for a device."""
tid = uuid.UUID(tenant_id)
consents = db.query(BannerConsentDB).filter(
BannerConsentDB.tenant_id == tid,
BannerConsentDB.site_id == site_id,
BannerConsentDB.device_fingerprint == device_fingerprint,
).all()
audit = db.query(BannerConsentAuditLogDB).filter(
BannerConsentAuditLogDB.tenant_id == tid,
BannerConsentAuditLogDB.site_id == site_id,
BannerConsentAuditLogDB.device_fingerprint == device_fingerprint,
).order_by(BannerConsentAuditLogDB.created_at.desc()).all()
return {
"device_fingerprint": device_fingerprint,
"site_id": site_id,
"consents": [_consent_to_dict(c) for c in consents],
"audit_trail": [
{
"id": str(a.id),
"action": a.action,
"categories": a.categories or [],
"created_at": a.created_at.isoformat() if a.created_at else None,
}
for a in audit
],
}
# =============================================================================
# Admin Endpoints
# =============================================================================
@router.get("/admin/stats/{site_id}")
async def get_site_stats(
site_id: str,
tenant_id: str = Depends(_get_tenant),
db: Session = Depends(get_db),
):
"""Consent statistics per site."""
tid = uuid.UUID(tenant_id)
base = db.query(BannerConsentDB).filter(
BannerConsentDB.tenant_id == tid,
BannerConsentDB.site_id == site_id,
)
total = base.count()
# Count category acceptance rates
category_stats = {}
all_consents = base.all()
for c in all_consents:
for cat in (c.categories or []):
category_stats[cat] = category_stats.get(cat, 0) + 1
return {
"site_id": site_id,
"total_consents": total,
"category_acceptance": {
cat: {"count": count, "rate": round(count / total * 100, 1) if total > 0 else 0}
for cat, count in category_stats.items()
},
}
@router.get("/admin/sites")
async def list_site_configs(
tenant_id: str = Depends(_get_tenant),
db: Session = Depends(get_db),
):
"""List all site configurations."""
tid = uuid.UUID(tenant_id)
configs = db.query(BannerSiteConfigDB).filter(
BannerSiteConfigDB.tenant_id == tid,
).order_by(BannerSiteConfigDB.created_at.desc()).all()
return [_site_config_to_dict(c) for c in configs]
@router.post("/admin/sites")
async def create_site_config(
body: SiteConfigCreate,
tenant_id: str = Depends(_get_tenant),
db: Session = Depends(get_db),
):
"""Create a site configuration."""
tid = uuid.UUID(tenant_id)
existing = db.query(BannerSiteConfigDB).filter(
BannerSiteConfigDB.tenant_id == tid,
BannerSiteConfigDB.site_id == body.site_id,
).first()
if existing:
raise HTTPException(status_code=409, detail=f"Site config for '{body.site_id}' already exists")
config = BannerSiteConfigDB(
tenant_id=tid,
site_id=body.site_id,
site_name=body.site_name,
site_url=body.site_url,
banner_title=body.banner_title or "Cookie-Einstellungen",
banner_description=body.banner_description,
privacy_url=body.privacy_url,
imprint_url=body.imprint_url,
dsb_name=body.dsb_name,
dsb_email=body.dsb_email,
theme=body.theme or {},
tcf_enabled=body.tcf_enabled,
)
db.add(config)
db.commit()
db.refresh(config)
return _site_config_to_dict(config)
@router.put("/admin/sites/{site_id}")
async def update_site_config(
site_id: str,
body: SiteConfigUpdate,
tenant_id: str = Depends(_get_tenant),
db: Session = Depends(get_db),
):
"""Update a site configuration."""
tid = uuid.UUID(tenant_id)
config = db.query(BannerSiteConfigDB).filter(
BannerSiteConfigDB.tenant_id == tid,
BannerSiteConfigDB.site_id == site_id,
).first()
if not config:
raise HTTPException(status_code=404, detail="Site config not found")
for field in ["site_name", "site_url", "banner_title", "banner_description",
"privacy_url", "imprint_url", "dsb_name", "dsb_email",
"theme", "tcf_enabled", "is_active"]:
val = getattr(body, field, None)
if val is not None:
setattr(config, field, val)
config.updated_at = datetime.utcnow()
db.commit()
db.refresh(config)
return _site_config_to_dict(config)
@router.delete("/admin/sites/{site_id}", status_code=204)
async def delete_site_config(
site_id: str,
tenant_id: str = Depends(_get_tenant),
db: Session = Depends(get_db),
):
"""Delete a site configuration."""
tid = uuid.UUID(tenant_id)
config = db.query(BannerSiteConfigDB).filter(
BannerSiteConfigDB.tenant_id == tid,
BannerSiteConfigDB.site_id == site_id,
).first()
if not config:
raise HTTPException(status_code=404, detail="Site config not found")
db.delete(config)
db.commit()
# =============================================================================
# Admin Category Endpoints
# =============================================================================
@router.get("/admin/sites/{site_id}/categories")
async def list_categories(
site_id: str,
tenant_id: str = Depends(_get_tenant),
db: Session = Depends(get_db),
):
"""List categories for a site."""
tid = uuid.UUID(tenant_id)
config = db.query(BannerSiteConfigDB).filter(
BannerSiteConfigDB.tenant_id == tid,
BannerSiteConfigDB.site_id == site_id,
).first()
if not config:
raise HTTPException(status_code=404, detail="Site config not found")
cats = db.query(BannerCategoryConfigDB).filter(
BannerCategoryConfigDB.site_config_id == config.id,
).order_by(BannerCategoryConfigDB.sort_order).all()
return [_category_to_dict(c) for c in cats]
@router.post("/admin/sites/{site_id}/categories")
async def create_category(
site_id: str,
body: CategoryConfigCreate,
tenant_id: str = Depends(_get_tenant),
db: Session = Depends(get_db),
):
"""Create a category for a site."""
tid = uuid.UUID(tenant_id)
config = db.query(BannerSiteConfigDB).filter(
BannerSiteConfigDB.tenant_id == tid,
BannerSiteConfigDB.site_id == site_id,
).first()
if not config:
raise HTTPException(status_code=404, detail="Site config not found")
cat = BannerCategoryConfigDB(
site_config_id=config.id,
category_key=body.category_key,
name_de=body.name_de,
name_en=body.name_en,
description_de=body.description_de,
description_en=body.description_en,
is_required=body.is_required,
sort_order=body.sort_order,
)
db.add(cat)
db.commit()
db.refresh(cat)
return _category_to_dict(cat)
@router.delete("/admin/categories/{category_id}", status_code=204)
async def delete_category(
category_id: str,
db: Session = Depends(get_db),
):
"""Delete a category."""
try:
cid = uuid.UUID(category_id)
except ValueError:
raise HTTPException(status_code=400, detail="Invalid category ID")
cat = db.query(BannerCategoryConfigDB).filter(BannerCategoryConfigDB.id == cid).first()
if not cat:
raise HTTPException(status_code=404, detail="Category not found")
db.delete(cat)
db.commit()
# =============================================================================
# Admin Vendor Endpoints
# =============================================================================
@router.get("/admin/sites/{site_id}/vendors")
async def list_vendors(
site_id: str,
tenant_id: str = Depends(_get_tenant),
db: Session = Depends(get_db),
):
"""List vendors for a site."""
tid = uuid.UUID(tenant_id)
config = db.query(BannerSiteConfigDB).filter(
BannerSiteConfigDB.tenant_id == tid,
BannerSiteConfigDB.site_id == site_id,
).first()
if not config:
raise HTTPException(status_code=404, detail="Site config not found")
vendors = db.query(BannerVendorConfigDB).filter(
BannerVendorConfigDB.site_config_id == config.id,
).all()
return [_vendor_to_dict(v) for v in vendors]
@router.post("/admin/sites/{site_id}/vendors")
async def create_vendor(
site_id: str,
body: VendorConfigCreate,
tenant_id: str = Depends(_get_tenant),
db: Session = Depends(get_db),
):
"""Create a vendor for a site."""
tid = uuid.UUID(tenant_id)
config = db.query(BannerSiteConfigDB).filter(
BannerSiteConfigDB.tenant_id == tid,
BannerSiteConfigDB.site_id == site_id,
).first()
if not config:
raise HTTPException(status_code=404, detail="Site config not found")
vendor = BannerVendorConfigDB(
site_config_id=config.id,
vendor_name=body.vendor_name,
vendor_url=body.vendor_url,
category_key=body.category_key,
description_de=body.description_de,
description_en=body.description_en,
cookie_names=body.cookie_names,
retention_days=body.retention_days,
)
db.add(vendor)
db.commit()
db.refresh(vendor)
return _vendor_to_dict(vendor)
@router.delete("/admin/vendors/{vendor_id}", status_code=204)
async def delete_vendor(
vendor_id: str,
db: Session = Depends(get_db),
):
"""Delete a vendor."""
try:
vid = uuid.UUID(vendor_id)
except ValueError:
raise HTTPException(status_code=400, detail="Invalid vendor ID")
vendor = db.query(BannerVendorConfigDB).filter(BannerVendorConfigDB.id == vid).first()
if not vendor:
raise HTTPException(status_code=404, detail="Vendor not found")
db.delete(vendor)
db.commit()

File diff suppressed because it is too large Load Diff

View File

@@ -0,0 +1,825 @@
"""
E-Mail-Template Routes — Benachrichtigungsvorlagen fuer DSGVO-Compliance.
Verwaltet Templates fuer DSR, Consent, Breach, Vendor und Training E-Mails.
Inklusive Versionierung, Approval-Workflow, Vorschau und Send-Logging.
"""
import uuid
import re
from datetime import datetime
from typing import Optional, List, Dict, Any
from fastapi import APIRouter, Depends, HTTPException, Query, Header
from pydantic import BaseModel
from sqlalchemy.orm import Session
from sqlalchemy import func
from classroom_engine.database import get_db
from ..db.email_template_models import (
EmailTemplateDB, EmailTemplateVersionDB, EmailTemplateApprovalDB,
EmailSendLogDB, EmailTemplateSettingsDB,
)
router = APIRouter(prefix="/email-templates", tags=["compliance-email-templates"])
DEFAULT_TENANT = "9282a473-5c95-4b3a-bf78-0ecc0ec71d3e"
# Template-Typen und zugehoerige Variablen
TEMPLATE_TYPES = {
"welcome": {"name": "Willkommen", "category": "general", "variables": ["user_name", "company_name", "login_url"]},
"verification": {"name": "E-Mail-Verifizierung", "category": "general", "variables": ["user_name", "verification_url", "expiry_hours"]},
"password_reset": {"name": "Passwort zuruecksetzen", "category": "general", "variables": ["user_name", "reset_url", "expiry_hours"]},
"dsr_receipt": {"name": "DSR Eingangsbestaetigung", "category": "dsr", "variables": ["requester_name", "reference_number", "request_type", "deadline"]},
"dsr_identity_request": {"name": "DSR Identitaetsanfrage", "category": "dsr", "variables": ["requester_name", "reference_number"]},
"dsr_completion": {"name": "DSR Abschluss", "category": "dsr", "variables": ["requester_name", "reference_number", "request_type", "completion_date"]},
"dsr_rejection": {"name": "DSR Ablehnung", "category": "dsr", "variables": ["requester_name", "reference_number", "rejection_reason", "legal_basis"]},
"dsr_extension": {"name": "DSR Fristverlaengerung", "category": "dsr", "variables": ["requester_name", "reference_number", "new_deadline", "extension_reason"]},
"consent_request": {"name": "Einwilligungsanfrage", "category": "consent", "variables": ["user_name", "purpose", "consent_url"]},
"consent_confirmation": {"name": "Einwilligungsbestaetigung", "category": "consent", "variables": ["user_name", "purpose", "consent_date"]},
"consent_withdrawal": {"name": "Widerruf bestaetigt", "category": "consent", "variables": ["user_name", "purpose", "withdrawal_date"]},
"consent_reminder": {"name": "Einwilligungs-Erinnerung", "category": "consent", "variables": ["user_name", "purpose", "expiry_date"]},
"breach_notification_authority": {"name": "Datenpanne Aufsichtsbehoerde", "category": "breach", "variables": ["incident_date", "incident_description", "affected_count", "measures_taken", "authority_name"]},
"breach_notification_affected": {"name": "Datenpanne Betroffene", "category": "breach", "variables": ["user_name", "incident_date", "incident_description", "measures_taken", "contact_info"]},
"breach_internal": {"name": "Datenpanne intern", "category": "breach", "variables": ["reporter_name", "incident_date", "incident_description", "severity"]},
"vendor_dpa_request": {"name": "AVV-Anfrage", "category": "vendor", "variables": ["vendor_name", "contact_name", "deadline", "requirements"]},
"vendor_review_reminder": {"name": "Vendor-Pruefung Erinnerung", "category": "vendor", "variables": ["vendor_name", "review_due_date", "last_review_date"]},
"training_invitation": {"name": "Schulungseinladung", "category": "training", "variables": ["user_name", "training_title", "training_date", "training_url"]},
"training_reminder": {"name": "Schulungs-Erinnerung", "category": "training", "variables": ["user_name", "training_title", "deadline"]},
"training_completion": {"name": "Schulung abgeschlossen", "category": "training", "variables": ["user_name", "training_title", "completion_date", "certificate_url"]},
}
VALID_STATUSES = ["draft", "review", "approved", "published"]
VALID_CATEGORIES = ["general", "dsr", "consent", "breach", "vendor", "training"]
# =============================================================================
# Pydantic Schemas
# =============================================================================
class TemplateCreate(BaseModel):
template_type: str
name: Optional[str] = None
description: Optional[str] = None
category: Optional[str] = None
is_active: bool = True
class VersionCreate(BaseModel):
version: str = "1.0"
language: str = "de"
subject: str
body_html: str
body_text: Optional[str] = None
class VersionUpdate(BaseModel):
subject: Optional[str] = None
body_html: Optional[str] = None
body_text: Optional[str] = None
class PreviewRequest(BaseModel):
variables: Optional[Dict[str, str]] = None
class SendTestRequest(BaseModel):
recipient: str
variables: Optional[Dict[str, str]] = None
class SettingsUpdate(BaseModel):
sender_name: Optional[str] = None
sender_email: Optional[str] = None
reply_to: Optional[str] = None
logo_url: Optional[str] = None
primary_color: Optional[str] = None
secondary_color: Optional[str] = None
footer_text: Optional[str] = None
company_name: Optional[str] = None
company_address: Optional[str] = None
# =============================================================================
# Helpers
# =============================================================================
def _get_tenant(x_tenant_id: Optional[str] = Header(None, alias='X-Tenant-ID')) -> str:
return x_tenant_id or DEFAULT_TENANT
def _template_to_dict(t: EmailTemplateDB, latest_version=None) -> dict:
result = {
"id": str(t.id),
"tenant_id": str(t.tenant_id),
"template_type": t.template_type,
"name": t.name,
"description": t.description,
"category": t.category,
"is_active": t.is_active,
"sort_order": t.sort_order,
"variables": t.variables or [],
"created_at": t.created_at.isoformat() if t.created_at else None,
"updated_at": t.updated_at.isoformat() if t.updated_at else None,
}
if latest_version:
result["latest_version"] = _version_to_dict(latest_version)
return result
def _version_to_dict(v: EmailTemplateVersionDB) -> dict:
return {
"id": str(v.id),
"template_id": str(v.template_id),
"version": v.version,
"language": v.language,
"subject": v.subject,
"body_html": v.body_html,
"body_text": v.body_text,
"status": v.status,
"submitted_at": v.submitted_at.isoformat() if v.submitted_at else None,
"submitted_by": v.submitted_by,
"published_at": v.published_at.isoformat() if v.published_at else None,
"published_by": v.published_by,
"created_at": v.created_at.isoformat() if v.created_at else None,
"created_by": v.created_by,
}
def _render_template(html: str, variables: Dict[str, str]) -> str:
"""Replace {{variable}} placeholders with values."""
result = html
for key, value in variables.items():
result = result.replace(f"{{{{{key}}}}}", str(value))
return result
# =============================================================================
# Template Type Info (MUST be before parameterized routes)
# =============================================================================
@router.get("/types")
async def get_template_types():
"""Gibt alle verfuegbaren Template-Typen mit Variablen zurueck."""
return [
{
"type": ttype,
"name": info["name"],
"category": info["category"],
"variables": info["variables"],
}
for ttype, info in TEMPLATE_TYPES.items()
]
@router.get("/stats")
async def get_stats(
tenant_id: str = Depends(_get_tenant),
db: Session = Depends(get_db),
):
"""Statistiken ueber E-Mail-Templates."""
tid = uuid.UUID(tenant_id)
base = db.query(EmailTemplateDB).filter(EmailTemplateDB.tenant_id == tid)
total = base.count()
active = base.filter(EmailTemplateDB.is_active == True).count()
# Count templates with published versions
published_count = 0
templates = base.all()
for t in templates:
has_published = db.query(EmailTemplateVersionDB).filter(
EmailTemplateVersionDB.template_id == t.id,
EmailTemplateVersionDB.status == "published",
).count() > 0
if has_published:
published_count += 1
# By category
by_category = {}
for cat in VALID_CATEGORIES:
by_category[cat] = base.filter(EmailTemplateDB.category == cat).count()
# Send logs stats
total_sent = db.query(EmailSendLogDB).filter(EmailSendLogDB.tenant_id == tid).count()
return {
"total": total,
"active": active,
"published": published_count,
"draft": total - published_count,
"by_category": by_category,
"total_sent": total_sent,
}
@router.get("/settings")
async def get_settings(
tenant_id: str = Depends(_get_tenant),
db: Session = Depends(get_db),
):
"""Globale E-Mail-Einstellungen laden."""
tid = uuid.UUID(tenant_id)
settings = db.query(EmailTemplateSettingsDB).filter(
EmailTemplateSettingsDB.tenant_id == tid,
).first()
if not settings:
return {
"sender_name": "Datenschutzbeauftragter",
"sender_email": "datenschutz@example.de",
"reply_to": None,
"logo_url": None,
"primary_color": "#4F46E5",
"secondary_color": "#7C3AED",
"footer_text": "Datenschutzhinweis: Diese E-Mail enthaelt vertrauliche Informationen.",
"company_name": None,
"company_address": None,
}
return {
"sender_name": settings.sender_name,
"sender_email": settings.sender_email,
"reply_to": settings.reply_to,
"logo_url": settings.logo_url,
"primary_color": settings.primary_color,
"secondary_color": settings.secondary_color,
"footer_text": settings.footer_text,
"company_name": settings.company_name,
"company_address": settings.company_address,
}
@router.put("/settings")
async def update_settings(
body: SettingsUpdate,
tenant_id: str = Depends(_get_tenant),
db: Session = Depends(get_db),
):
"""Globale E-Mail-Einstellungen speichern."""
tid = uuid.UUID(tenant_id)
settings = db.query(EmailTemplateSettingsDB).filter(
EmailTemplateSettingsDB.tenant_id == tid,
).first()
if not settings:
settings = EmailTemplateSettingsDB(tenant_id=tid)
db.add(settings)
for field in ["sender_name", "sender_email", "reply_to", "logo_url",
"primary_color", "secondary_color", "footer_text",
"company_name", "company_address"]:
val = getattr(body, field, None)
if val is not None:
setattr(settings, field, val)
settings.updated_at = datetime.utcnow()
db.commit()
db.refresh(settings)
return {
"sender_name": settings.sender_name,
"sender_email": settings.sender_email,
"reply_to": settings.reply_to,
"logo_url": settings.logo_url,
"primary_color": settings.primary_color,
"secondary_color": settings.secondary_color,
"footer_text": settings.footer_text,
"company_name": settings.company_name,
"company_address": settings.company_address,
}
@router.get("/logs")
async def get_send_logs(
limit: int = Query(20, ge=1, le=100),
offset: int = Query(0, ge=0),
template_type: Optional[str] = Query(None),
tenant_id: str = Depends(_get_tenant),
db: Session = Depends(get_db),
):
"""Send-Logs (paginiert)."""
tid = uuid.UUID(tenant_id)
query = db.query(EmailSendLogDB).filter(EmailSendLogDB.tenant_id == tid)
if template_type:
query = query.filter(EmailSendLogDB.template_type == template_type)
total = query.count()
logs = query.order_by(EmailSendLogDB.sent_at.desc()).offset(offset).limit(limit).all()
return {
"logs": [
{
"id": str(l.id),
"template_type": l.template_type,
"recipient": l.recipient,
"subject": l.subject,
"status": l.status,
"variables": l.variables or {},
"error_message": l.error_message,
"sent_at": l.sent_at.isoformat() if l.sent_at else None,
}
for l in logs
],
"total": total,
"limit": limit,
"offset": offset,
}
@router.post("/initialize")
async def initialize_defaults(
tenant_id: str = Depends(_get_tenant),
db: Session = Depends(get_db),
):
"""Default-Templates fuer einen Tenant initialisieren."""
tid = uuid.UUID(tenant_id)
existing = db.query(EmailTemplateDB).filter(EmailTemplateDB.tenant_id == tid).count()
if existing > 0:
return {"message": "Templates already initialized", "count": existing}
created = 0
for idx, (ttype, info) in enumerate(TEMPLATE_TYPES.items()):
t = EmailTemplateDB(
tenant_id=tid,
template_type=ttype,
name=info["name"],
category=info["category"],
sort_order=idx * 10,
variables=info["variables"],
)
db.add(t)
created += 1
db.commit()
return {"message": f"{created} templates created", "count": created}
@router.get("/default/{template_type}")
async def get_default_content(template_type: str):
"""Default-Content fuer einen Template-Typ."""
if template_type not in TEMPLATE_TYPES:
raise HTTPException(status_code=404, detail=f"Unknown template type: {template_type}")
info = TEMPLATE_TYPES[template_type]
vars_html = " ".join([f'<span style="color:#4F46E5">{{{{{v}}}}}</span>' for v in info["variables"]])
return {
"template_type": template_type,
"name": info["name"],
"category": info["category"],
"variables": info["variables"],
"default_subject": f"{info['name']} - {{{{company_name}}}}",
"default_body_html": f"<p>Sehr geehrte(r) {{{{user_name}}}},</p>\n<p>[Inhalt hier einfuegen]</p>\n<p>Verfuegbare Variablen: {vars_html}</p>\n<p>Mit freundlichen Gruessen<br/>{{{{sender_name}}}}</p>",
}
# =============================================================================
# Template CRUD (MUST be before /{id} parameterized routes)
# =============================================================================
@router.get("")
async def list_templates(
category: Optional[str] = Query(None),
tenant_id: str = Depends(_get_tenant),
db: Session = Depends(get_db),
):
"""Alle Templates mit letzter publizierter Version."""
tid = uuid.UUID(tenant_id)
query = db.query(EmailTemplateDB).filter(EmailTemplateDB.tenant_id == tid)
if category:
query = query.filter(EmailTemplateDB.category == category)
templates = query.order_by(EmailTemplateDB.sort_order).all()
result = []
for t in templates:
latest = db.query(EmailTemplateVersionDB).filter(
EmailTemplateVersionDB.template_id == t.id,
).order_by(EmailTemplateVersionDB.created_at.desc()).first()
result.append(_template_to_dict(t, latest))
return result
@router.post("")
async def create_template(
body: TemplateCreate,
tenant_id: str = Depends(_get_tenant),
db: Session = Depends(get_db),
):
"""Template erstellen."""
if body.template_type not in TEMPLATE_TYPES:
raise HTTPException(status_code=400, detail=f"Unknown template type: {body.template_type}")
tid = uuid.UUID(tenant_id)
existing = db.query(EmailTemplateDB).filter(
EmailTemplateDB.tenant_id == tid,
EmailTemplateDB.template_type == body.template_type,
).first()
if existing:
raise HTTPException(status_code=409, detail=f"Template type '{body.template_type}' already exists")
info = TEMPLATE_TYPES[body.template_type]
t = EmailTemplateDB(
tenant_id=tid,
template_type=body.template_type,
name=body.name or info["name"],
description=body.description,
category=body.category or info["category"],
is_active=body.is_active,
variables=info["variables"],
)
db.add(t)
db.commit()
db.refresh(t)
return _template_to_dict(t)
# =============================================================================
# Version Management (static paths before parameterized)
# =============================================================================
@router.post("/versions")
async def create_version(
body: VersionCreate,
template_id: str = Query(..., alias="template_id"),
tenant_id: str = Depends(_get_tenant),
db: Session = Depends(get_db),
):
"""Neue Version erstellen (via query param template_id)."""
try:
tid = uuid.UUID(template_id)
except ValueError:
raise HTTPException(status_code=400, detail="Invalid template ID")
template = db.query(EmailTemplateDB).filter(
EmailTemplateDB.id == tid,
EmailTemplateDB.tenant_id == uuid.UUID(tenant_id),
).first()
if not template:
raise HTTPException(status_code=404, detail="Template not found")
v = EmailTemplateVersionDB(
template_id=tid,
version=body.version,
language=body.language,
subject=body.subject,
body_html=body.body_html,
body_text=body.body_text,
status="draft",
)
db.add(v)
db.commit()
db.refresh(v)
return _version_to_dict(v)
# =============================================================================
# Single Template (parameterized — after all static paths)
# =============================================================================
@router.get("/{template_id}")
async def get_template(
template_id: str,
tenant_id: str = Depends(_get_tenant),
db: Session = Depends(get_db),
):
"""Template-Detail."""
try:
tid = uuid.UUID(template_id)
except ValueError:
raise HTTPException(status_code=400, detail="Invalid template ID")
t = db.query(EmailTemplateDB).filter(
EmailTemplateDB.id == tid,
EmailTemplateDB.tenant_id == uuid.UUID(tenant_id),
).first()
if not t:
raise HTTPException(status_code=404, detail="Template not found")
latest = db.query(EmailTemplateVersionDB).filter(
EmailTemplateVersionDB.template_id == t.id,
).order_by(EmailTemplateVersionDB.created_at.desc()).first()
return _template_to_dict(t, latest)
@router.get("/{template_id}/versions")
async def get_versions(
template_id: str,
tenant_id: str = Depends(_get_tenant),
db: Session = Depends(get_db),
):
"""Versionen eines Templates."""
try:
tid = uuid.UUID(template_id)
except ValueError:
raise HTTPException(status_code=400, detail="Invalid template ID")
template = db.query(EmailTemplateDB).filter(
EmailTemplateDB.id == tid,
EmailTemplateDB.tenant_id == uuid.UUID(tenant_id),
).first()
if not template:
raise HTTPException(status_code=404, detail="Template not found")
versions = db.query(EmailTemplateVersionDB).filter(
EmailTemplateVersionDB.template_id == tid,
).order_by(EmailTemplateVersionDB.created_at.desc()).all()
return [_version_to_dict(v) for v in versions]
@router.post("/{template_id}/versions")
async def create_version_for_template(
template_id: str,
body: VersionCreate,
tenant_id: str = Depends(_get_tenant),
db: Session = Depends(get_db),
):
"""Neue Version fuer ein Template erstellen."""
try:
tid = uuid.UUID(template_id)
except ValueError:
raise HTTPException(status_code=400, detail="Invalid template ID")
template = db.query(EmailTemplateDB).filter(
EmailTemplateDB.id == tid,
EmailTemplateDB.tenant_id == uuid.UUID(tenant_id),
).first()
if not template:
raise HTTPException(status_code=404, detail="Template not found")
v = EmailTemplateVersionDB(
template_id=tid,
version=body.version,
language=body.language,
subject=body.subject,
body_html=body.body_html,
body_text=body.body_text,
status="draft",
)
db.add(v)
db.commit()
db.refresh(v)
return _version_to_dict(v)
# =============================================================================
# Version Workflow (parameterized by version_id)
# =============================================================================
@router.get("/versions/{version_id}")
async def get_version(
version_id: str,
db: Session = Depends(get_db),
):
"""Version-Detail."""
try:
vid = uuid.UUID(version_id)
except ValueError:
raise HTTPException(status_code=400, detail="Invalid version ID")
v = db.query(EmailTemplateVersionDB).filter(
EmailTemplateVersionDB.id == vid,
).first()
if not v:
raise HTTPException(status_code=404, detail="Version not found")
return _version_to_dict(v)
@router.put("/versions/{version_id}")
async def update_version(
version_id: str,
body: VersionUpdate,
db: Session = Depends(get_db),
):
"""Draft aktualisieren."""
try:
vid = uuid.UUID(version_id)
except ValueError:
raise HTTPException(status_code=400, detail="Invalid version ID")
v = db.query(EmailTemplateVersionDB).filter(
EmailTemplateVersionDB.id == vid,
).first()
if not v:
raise HTTPException(status_code=404, detail="Version not found")
if v.status != "draft":
raise HTTPException(status_code=400, detail="Only draft versions can be edited")
if body.subject is not None:
v.subject = body.subject
if body.body_html is not None:
v.body_html = body.body_html
if body.body_text is not None:
v.body_text = body.body_text
db.commit()
db.refresh(v)
return _version_to_dict(v)
@router.post("/versions/{version_id}/submit")
async def submit_version(
version_id: str,
db: Session = Depends(get_db),
):
"""Zur Pruefung einreichen."""
try:
vid = uuid.UUID(version_id)
except ValueError:
raise HTTPException(status_code=400, detail="Invalid version ID")
v = db.query(EmailTemplateVersionDB).filter(
EmailTemplateVersionDB.id == vid,
).first()
if not v:
raise HTTPException(status_code=404, detail="Version not found")
if v.status != "draft":
raise HTTPException(status_code=400, detail="Only draft versions can be submitted")
v.status = "review"
v.submitted_at = datetime.utcnow()
v.submitted_by = "admin"
db.commit()
db.refresh(v)
return _version_to_dict(v)
@router.post("/versions/{version_id}/approve")
async def approve_version(
version_id: str,
comment: Optional[str] = None,
db: Session = Depends(get_db),
):
"""Genehmigen."""
try:
vid = uuid.UUID(version_id)
except ValueError:
raise HTTPException(status_code=400, detail="Invalid version ID")
v = db.query(EmailTemplateVersionDB).filter(
EmailTemplateVersionDB.id == vid,
).first()
if not v:
raise HTTPException(status_code=404, detail="Version not found")
if v.status != "review":
raise HTTPException(status_code=400, detail="Only review versions can be approved")
v.status = "approved"
approval = EmailTemplateApprovalDB(
version_id=vid,
action="approve",
comment=comment,
approved_by="admin",
)
db.add(approval)
db.commit()
db.refresh(v)
return _version_to_dict(v)
@router.post("/versions/{version_id}/reject")
async def reject_version(
version_id: str,
comment: Optional[str] = None,
db: Session = Depends(get_db),
):
"""Ablehnen."""
try:
vid = uuid.UUID(version_id)
except ValueError:
raise HTTPException(status_code=400, detail="Invalid version ID")
v = db.query(EmailTemplateVersionDB).filter(
EmailTemplateVersionDB.id == vid,
).first()
if not v:
raise HTTPException(status_code=404, detail="Version not found")
if v.status != "review":
raise HTTPException(status_code=400, detail="Only review versions can be rejected")
v.status = "draft" # Back to draft
approval = EmailTemplateApprovalDB(
version_id=vid,
action="reject",
comment=comment,
approved_by="admin",
)
db.add(approval)
db.commit()
db.refresh(v)
return _version_to_dict(v)
@router.post("/versions/{version_id}/publish")
async def publish_version(
version_id: str,
db: Session = Depends(get_db),
):
"""Publizieren."""
try:
vid = uuid.UUID(version_id)
except ValueError:
raise HTTPException(status_code=400, detail="Invalid version ID")
v = db.query(EmailTemplateVersionDB).filter(
EmailTemplateVersionDB.id == vid,
).first()
if not v:
raise HTTPException(status_code=404, detail="Version not found")
if v.status not in ("approved", "review", "draft"):
raise HTTPException(status_code=400, detail="Version cannot be published")
now = datetime.utcnow()
v.status = "published"
v.published_at = now
v.published_by = "admin"
db.commit()
db.refresh(v)
return _version_to_dict(v)
@router.post("/versions/{version_id}/preview")
async def preview_version(
version_id: str,
body: PreviewRequest,
db: Session = Depends(get_db),
):
"""Vorschau mit Test-Variablen."""
try:
vid = uuid.UUID(version_id)
except ValueError:
raise HTTPException(status_code=400, detail="Invalid version ID")
v = db.query(EmailTemplateVersionDB).filter(
EmailTemplateVersionDB.id == vid,
).first()
if not v:
raise HTTPException(status_code=404, detail="Version not found")
variables = body.variables or {}
# Fill in defaults for missing variables
template = db.query(EmailTemplateDB).filter(
EmailTemplateDB.id == v.template_id,
).first()
if template and template.variables:
for var in template.variables:
if var not in variables:
variables[var] = f"[{var}]"
rendered_subject = _render_template(v.subject, variables)
rendered_html = _render_template(v.body_html, variables)
return {
"subject": rendered_subject,
"body_html": rendered_html,
"variables_used": variables,
}
@router.post("/versions/{version_id}/send-test")
async def send_test_email(
version_id: str,
body: SendTestRequest,
tenant_id: str = Depends(_get_tenant),
db: Session = Depends(get_db),
):
"""Test-E-Mail senden (Simulation — loggt nur)."""
try:
vid = uuid.UUID(version_id)
except ValueError:
raise HTTPException(status_code=400, detail="Invalid version ID")
v = db.query(EmailTemplateVersionDB).filter(
EmailTemplateVersionDB.id == vid,
).first()
if not v:
raise HTTPException(status_code=404, detail="Version not found")
template = db.query(EmailTemplateDB).filter(
EmailTemplateDB.id == v.template_id,
).first()
variables = body.variables or {}
rendered_subject = _render_template(v.subject, variables)
# Log the send attempt
log = EmailSendLogDB(
tenant_id=uuid.UUID(tenant_id),
template_type=template.template_type if template else "unknown",
version_id=vid,
recipient=body.recipient,
subject=rendered_subject,
status="test_sent",
variables=variables,
)
db.add(log)
db.commit()
return {
"success": True,
"message": f"Test-E-Mail an {body.recipient} gesendet (Simulation)",
"subject": rendered_subject,
}

View File

@@ -1,27 +1,18 @@
"""
FastAPI routes for Legal Documents — Rechtliche Texte mit Versionierung und Approval-Workflow.
Endpoints:
GET /legal-documents/documents — Liste aller Dokumente
POST /legal-documents/documents — Dokument erstellen
GET /legal-documents/documents/{id}/versions — Versionen eines Dokuments
POST /legal-documents/versions — Neue Version erstellen
PUT /legal-documents/versions/{id} — Version aktualisieren
POST /legal-documents/versions/upload-word — DOCX → HTML
POST /legal-documents/versions/{id}/submit-review — Status: draft → review
POST /legal-documents/versions/{id}/approve — Status: review → approved
POST /legal-documents/versions/{id}/reject — Status: review → rejected
POST /legal-documents/versions/{id}/publish — Status: approved → published
GET /legal-documents/versions/{id}/approval-history — Approval-Audit-Trail
Extended with: Public endpoints, User Consents, Consent Audit Log, Cookie Categories.
"""
import uuid as uuid_mod
import logging
from datetime import datetime
from typing import Optional, List, Any, Dict
from fastapi import APIRouter, Depends, HTTPException, Query, UploadFile, File
from fastapi import APIRouter, Depends, HTTPException, Query, Header, UploadFile, File
from pydantic import BaseModel
from sqlalchemy.orm import Session
from sqlalchemy import func
from classroom_engine.database import get_db
from ..db.legal_document_models import (
@@ -29,10 +20,21 @@ from ..db.legal_document_models import (
LegalDocumentVersionDB,
LegalDocumentApprovalDB,
)
from ..db.legal_document_extend_models import (
UserConsentDB,
ConsentAuditLogDB,
CookieCategoryDB,
)
logger = logging.getLogger(__name__)
router = APIRouter(prefix="/legal-documents", tags=["legal-documents"])
DEFAULT_TENANT = "9282a473-5c95-4b3a-bf78-0ecc0ec71d3e"
def _get_tenant(x_tenant_id: Optional[str] = Header(None, alias='X-Tenant-ID')) -> str:
return x_tenant_id or DEFAULT_TENANT
# ============================================================================
# Pydantic Schemas
@@ -432,3 +434,500 @@ async def get_approval_history(version_id: str, db: Session = Depends(get_db)):
)
for e in entries
]
# ============================================================================
# Extended Schemas
# ============================================================================
class UserConsentCreate(BaseModel):
user_id: str
document_id: str
document_version_id: Optional[str] = None
document_type: str
consented: bool = True
ip_address: Optional[str] = None
user_agent: Optional[str] = None
class CookieCategoryCreate(BaseModel):
name_de: str
name_en: Optional[str] = None
description_de: Optional[str] = None
description_en: Optional[str] = None
is_required: bool = False
sort_order: int = 0
class CookieCategoryUpdate(BaseModel):
name_de: Optional[str] = None
name_en: Optional[str] = None
description_de: Optional[str] = None
description_en: Optional[str] = None
is_required: Optional[bool] = None
sort_order: Optional[int] = None
is_active: Optional[bool] = None
# ============================================================================
# Extended Helpers
# ============================================================================
def _log_consent_audit(
db: Session,
tenant_id,
action: str,
entity_type: str,
entity_id=None,
user_id: Optional[str] = None,
details: Optional[dict] = None,
ip_address: Optional[str] = None,
):
entry = ConsentAuditLogDB(
tenant_id=tenant_id,
action=action,
entity_type=entity_type,
entity_id=entity_id,
user_id=user_id,
details=details or {},
ip_address=ip_address,
)
db.add(entry)
return entry
def _consent_to_dict(c: UserConsentDB) -> dict:
return {
"id": str(c.id),
"tenant_id": str(c.tenant_id),
"user_id": c.user_id,
"document_id": str(c.document_id),
"document_version_id": str(c.document_version_id) if c.document_version_id else None,
"document_type": c.document_type,
"consented": c.consented,
"ip_address": c.ip_address,
"user_agent": c.user_agent,
"consented_at": c.consented_at.isoformat() if c.consented_at else None,
"withdrawn_at": c.withdrawn_at.isoformat() if c.withdrawn_at else None,
"created_at": c.created_at.isoformat() if c.created_at else None,
}
def _cookie_cat_to_dict(c: CookieCategoryDB) -> dict:
return {
"id": str(c.id),
"tenant_id": str(c.tenant_id),
"name_de": c.name_de,
"name_en": c.name_en,
"description_de": c.description_de,
"description_en": c.description_en,
"is_required": c.is_required,
"sort_order": c.sort_order,
"is_active": c.is_active,
"created_at": c.created_at.isoformat() if c.created_at else None,
"updated_at": c.updated_at.isoformat() if c.updated_at else None,
}
# ============================================================================
# Public Endpoints (for end users)
# ============================================================================
@router.get("/public")
async def list_public_documents(
tenant_id: str = Depends(_get_tenant),
db: Session = Depends(get_db),
):
"""Active documents for end-user display."""
docs = (
db.query(LegalDocumentDB)
.filter(LegalDocumentDB.tenant_id == tenant_id)
.order_by(LegalDocumentDB.created_at.desc())
.all()
)
result = []
for doc in docs:
# Find latest published version
published = (
db.query(LegalDocumentVersionDB)
.filter(
LegalDocumentVersionDB.document_id == doc.id,
LegalDocumentVersionDB.status == "published",
)
.order_by(LegalDocumentVersionDB.created_at.desc())
.first()
)
if published:
result.append({
"id": str(doc.id),
"type": doc.type,
"name": doc.name,
"version": published.version,
"title": published.title,
"content": published.content,
"language": published.language,
"published_at": published.approved_at.isoformat() if published.approved_at else None,
})
return result
@router.get("/public/{document_type}/latest")
async def get_latest_published(
document_type: str,
language: str = Query("de"),
tenant_id: str = Depends(_get_tenant),
db: Session = Depends(get_db),
):
"""Get the latest published version of a document type."""
doc = (
db.query(LegalDocumentDB)
.filter(
LegalDocumentDB.tenant_id == tenant_id,
LegalDocumentDB.type == document_type,
)
.first()
)
if not doc:
raise HTTPException(status_code=404, detail=f"No document of type '{document_type}' found")
version = (
db.query(LegalDocumentVersionDB)
.filter(
LegalDocumentVersionDB.document_id == doc.id,
LegalDocumentVersionDB.status == "published",
LegalDocumentVersionDB.language == language,
)
.order_by(LegalDocumentVersionDB.created_at.desc())
.first()
)
if not version:
raise HTTPException(status_code=404, detail=f"No published version for type '{document_type}' in language '{language}'")
return {
"document_id": str(doc.id),
"type": doc.type,
"name": doc.name,
"version_id": str(version.id),
"version": version.version,
"title": version.title,
"content": version.content,
"language": version.language,
}
# ============================================================================
# User Consents
# ============================================================================
@router.post("/consents")
async def record_consent(
body: UserConsentCreate,
tenant_id: str = Depends(_get_tenant),
db: Session = Depends(get_db),
):
"""Record user consent for a legal document."""
tid = uuid_mod.UUID(tenant_id)
doc_id = uuid_mod.UUID(body.document_id)
doc = db.query(LegalDocumentDB).filter(LegalDocumentDB.id == doc_id).first()
if not doc:
raise HTTPException(status_code=404, detail="Document not found")
consent = UserConsentDB(
tenant_id=tid,
user_id=body.user_id,
document_id=doc_id,
document_version_id=uuid_mod.UUID(body.document_version_id) if body.document_version_id else None,
document_type=body.document_type,
consented=body.consented,
ip_address=body.ip_address,
user_agent=body.user_agent,
)
db.add(consent)
db.flush()
_log_consent_audit(
db, tid, "consent_given", "user_consent",
entity_id=consent.id, user_id=body.user_id,
details={"document_type": body.document_type, "document_id": body.document_id},
ip_address=body.ip_address,
)
db.commit()
db.refresh(consent)
return _consent_to_dict(consent)
@router.get("/consents/my")
async def get_my_consents(
user_id: str = Query(...),
tenant_id: str = Depends(_get_tenant),
db: Session = Depends(get_db),
):
"""Get all consents for a specific user."""
tid = uuid_mod.UUID(tenant_id)
consents = (
db.query(UserConsentDB)
.filter(
UserConsentDB.tenant_id == tid,
UserConsentDB.user_id == user_id,
UserConsentDB.withdrawn_at == None,
)
.order_by(UserConsentDB.consented_at.desc())
.all()
)
return [_consent_to_dict(c) for c in consents]
@router.get("/consents/check/{document_type}")
async def check_consent(
document_type: str,
user_id: str = Query(...),
tenant_id: str = Depends(_get_tenant),
db: Session = Depends(get_db),
):
"""Check if user has active consent for a document type."""
tid = uuid_mod.UUID(tenant_id)
consent = (
db.query(UserConsentDB)
.filter(
UserConsentDB.tenant_id == tid,
UserConsentDB.user_id == user_id,
UserConsentDB.document_type == document_type,
UserConsentDB.consented == True,
UserConsentDB.withdrawn_at == None,
)
.order_by(UserConsentDB.consented_at.desc())
.first()
)
return {
"has_consent": consent is not None,
"consent": _consent_to_dict(consent) if consent else None,
}
@router.delete("/consents/{consent_id}")
async def withdraw_consent(
consent_id: str,
tenant_id: str = Depends(_get_tenant),
db: Session = Depends(get_db),
):
"""Withdraw a consent (DSGVO Art. 7 Abs. 3)."""
tid = uuid_mod.UUID(tenant_id)
try:
cid = uuid_mod.UUID(consent_id)
except ValueError:
raise HTTPException(status_code=400, detail="Invalid consent ID")
consent = db.query(UserConsentDB).filter(
UserConsentDB.id == cid,
UserConsentDB.tenant_id == tid,
).first()
if not consent:
raise HTTPException(status_code=404, detail="Consent not found")
if consent.withdrawn_at:
raise HTTPException(status_code=400, detail="Consent already withdrawn")
consent.withdrawn_at = datetime.utcnow()
consent.consented = False
_log_consent_audit(
db, tid, "consent_withdrawn", "user_consent",
entity_id=cid, user_id=consent.user_id,
details={"document_type": consent.document_type},
)
db.commit()
db.refresh(consent)
return _consent_to_dict(consent)
# ============================================================================
# Consent Statistics
# ============================================================================
@router.get("/stats/consents")
async def get_consent_stats(
tenant_id: str = Depends(_get_tenant),
db: Session = Depends(get_db),
):
"""Consent statistics for dashboard."""
tid = uuid_mod.UUID(tenant_id)
base = db.query(UserConsentDB).filter(UserConsentDB.tenant_id == tid)
total = base.count()
active = base.filter(
UserConsentDB.consented == True,
UserConsentDB.withdrawn_at == None,
).count()
withdrawn = base.filter(UserConsentDB.withdrawn_at != None).count()
# By document type
by_type = {}
type_counts = (
db.query(UserConsentDB.document_type, func.count(UserConsentDB.id))
.filter(UserConsentDB.tenant_id == tid)
.group_by(UserConsentDB.document_type)
.all()
)
for dtype, count in type_counts:
by_type[dtype] = count
# Unique users
unique_users = (
db.query(func.count(func.distinct(UserConsentDB.user_id)))
.filter(UserConsentDB.tenant_id == tid)
.scalar()
) or 0
return {
"total": total,
"active": active,
"withdrawn": withdrawn,
"unique_users": unique_users,
"by_type": by_type,
}
# ============================================================================
# Audit Log
# ============================================================================
@router.get("/audit-log")
async def get_audit_log(
limit: int = Query(50, ge=1, le=200),
offset: int = Query(0, ge=0),
action: Optional[str] = Query(None),
entity_type: Optional[str] = Query(None),
tenant_id: str = Depends(_get_tenant),
db: Session = Depends(get_db),
):
"""Consent audit trail (paginated)."""
tid = uuid_mod.UUID(tenant_id)
query = db.query(ConsentAuditLogDB).filter(ConsentAuditLogDB.tenant_id == tid)
if action:
query = query.filter(ConsentAuditLogDB.action == action)
if entity_type:
query = query.filter(ConsentAuditLogDB.entity_type == entity_type)
total = query.count()
entries = query.order_by(ConsentAuditLogDB.created_at.desc()).offset(offset).limit(limit).all()
return {
"entries": [
{
"id": str(e.id),
"action": e.action,
"entity_type": e.entity_type,
"entity_id": str(e.entity_id) if e.entity_id else None,
"user_id": e.user_id,
"details": e.details or {},
"ip_address": e.ip_address,
"created_at": e.created_at.isoformat() if e.created_at else None,
}
for e in entries
],
"total": total,
"limit": limit,
"offset": offset,
}
# ============================================================================
# Cookie Categories CRUD
# ============================================================================
@router.get("/cookie-categories")
async def list_cookie_categories(
tenant_id: str = Depends(_get_tenant),
db: Session = Depends(get_db),
):
"""List all cookie categories."""
tid = uuid_mod.UUID(tenant_id)
cats = (
db.query(CookieCategoryDB)
.filter(CookieCategoryDB.tenant_id == tid)
.order_by(CookieCategoryDB.sort_order)
.all()
)
return [_cookie_cat_to_dict(c) for c in cats]
@router.post("/cookie-categories")
async def create_cookie_category(
body: CookieCategoryCreate,
tenant_id: str = Depends(_get_tenant),
db: Session = Depends(get_db),
):
"""Create a cookie category."""
tid = uuid_mod.UUID(tenant_id)
cat = CookieCategoryDB(
tenant_id=tid,
name_de=body.name_de,
name_en=body.name_en,
description_de=body.description_de,
description_en=body.description_en,
is_required=body.is_required,
sort_order=body.sort_order,
)
db.add(cat)
db.commit()
db.refresh(cat)
return _cookie_cat_to_dict(cat)
@router.put("/cookie-categories/{category_id}")
async def update_cookie_category(
category_id: str,
body: CookieCategoryUpdate,
tenant_id: str = Depends(_get_tenant),
db: Session = Depends(get_db),
):
"""Update a cookie category."""
tid = uuid_mod.UUID(tenant_id)
try:
cid = uuid_mod.UUID(category_id)
except ValueError:
raise HTTPException(status_code=400, detail="Invalid category ID")
cat = db.query(CookieCategoryDB).filter(
CookieCategoryDB.id == cid,
CookieCategoryDB.tenant_id == tid,
).first()
if not cat:
raise HTTPException(status_code=404, detail="Cookie category not found")
for field in ["name_de", "name_en", "description_de", "description_en",
"is_required", "sort_order", "is_active"]:
val = getattr(body, field, None)
if val is not None:
setattr(cat, field, val)
cat.updated_at = datetime.utcnow()
db.commit()
db.refresh(cat)
return _cookie_cat_to_dict(cat)
@router.delete("/cookie-categories/{category_id}", status_code=204)
async def delete_cookie_category(
category_id: str,
tenant_id: str = Depends(_get_tenant),
db: Session = Depends(get_db),
):
"""Delete a cookie category."""
tid = uuid_mod.UUID(tenant_id)
try:
cid = uuid_mod.UUID(category_id)
except ValueError:
raise HTTPException(status_code=400, detail="Invalid category ID")
cat = db.query(CookieCategoryDB).filter(
CookieCategoryDB.id == cid,
CookieCategoryDB.tenant_id == tid,
).first()
if not cat:
raise HTTPException(status_code=404, detail="Cookie category not found")
db.delete(cat)
db.commit()