diff --git a/backend-compliance/compliance/api/dsr_routes.py b/backend-compliance/compliance/api/dsr_routes.py index 506c1e4..45503d6 100644 --- a/backend-compliance/compliance/api/dsr_routes.py +++ b/backend-compliance/compliance/api/dsr_routes.py @@ -1,321 +1,79 @@ """ DSR (Data Subject Request) Routes — Betroffenenanfragen nach DSGVO Art. 15-21. -Native Python/FastAPI Implementierung, ersetzt Go consent-service Proxy. +Phase 1 Step 4 refactor: thin handlers delegate to DSRService (CRUD/stats/ +export/deadlines) and DSRWorkflowService (status/identity/assign/complete/ +reject/communications/exception-checks/templates). """ -import io -import csv -import uuid -from datetime import datetime, timedelta, timezone -from typing import Optional, List, Dict, Any +from typing import Optional -from fastapi import APIRouter, Depends, HTTPException, Query, Header +from fastapi import APIRouter, Depends, Query, Header from fastapi.responses import StreamingResponse -from pydantic import BaseModel from sqlalchemy.orm import Session -from sqlalchemy import text, func, and_, or_ from classroom_engine.database import get_db -from ..db.dsr_models import ( - DSRRequestDB, DSRStatusHistoryDB, DSRCommunicationDB, - DSRTemplateDB, DSRTemplateVersionDB, DSRExceptionCheckDB, +from compliance.api._http_errors import translate_domain_errors +from compliance.schemas.dsr import ( + AssignRequest, + CompleteDSR, + CreateTemplateVersion, + DSRCreate, + DSRUpdate, + ExtendDeadline, + RejectDSR, + SendCommunication, + StatusChange, + UpdateExceptionCheck, + VerifyIdentity, ) +from compliance.services.dsr_service import ( + ART17_EXCEPTIONS, + DEFAULT_TENANT, + DEADLINE_DAYS, + DSRService, + VALID_PRIORITIES, + VALID_REQUEST_TYPES, + VALID_SOURCES, + VALID_STATUSES, + _dsr_to_dict, + _generate_request_number, + _get_dsr_or_404, + _record_history, +) +from compliance.services.dsr_workflow_service import DSRWorkflowService router = APIRouter(prefix="/dsr", tags=["compliance-dsr"]) -# Default-Tenant -DEFAULT_TENANT = "9282a473-5c95-4b3a-bf78-0ecc0ec71d3e" -# Art. 17(3) Ausnahmen -ART17_EXCEPTIONS = [ - { - "check_code": "art17_3_a", - "article": "17(3)(a)", - "label": "Meinungs- und Informationsfreiheit", - "description": "Ausuebung des Rechts auf freie Meinungsaeusserung und Information", - }, - { - "check_code": "art17_3_b", - "article": "17(3)(b)", - "label": "Rechtliche Verpflichtung", - "description": "Erfuellung einer rechtlichen Verpflichtung (z.B. Aufbewahrungspflichten)", - }, - { - "check_code": "art17_3_c", - "article": "17(3)(c)", - "label": "Oeffentliches Interesse", - "description": "Gruende des oeffentlichen Interesses im Bereich Gesundheit", - }, - { - "check_code": "art17_3_d", - "article": "17(3)(d)", - "label": "Archivzwecke", - "description": "Archivzwecke, wissenschaftliche/historische Forschung, Statistik", - }, - { - "check_code": "art17_3_e", - "article": "17(3)(e)", - "label": "Rechtsansprueche", - "description": "Geltendmachung, Ausuebung oder Verteidigung von Rechtsanspruechen", - }, -] - -VALID_REQUEST_TYPES = ["access", "rectification", "erasure", "restriction", "portability", "objection"] -VALID_STATUSES = ["intake", "identity_verification", "processing", "completed", "rejected", "cancelled"] -VALID_PRIORITIES = ["low", "normal", "high", "critical"] -VALID_SOURCES = ["web_form", "email", "letter", "phone", "in_person", "other"] - -# Deadline-Tage pro Typ (DSGVO Art. 12 Abs. 3) -DEADLINE_DAYS = { - "access": 30, - "rectification": 14, - "erasure": 14, - "restriction": 14, - "portability": 30, - "objection": 30, -} - - -# ============================================================================= -# Pydantic Schemas -# ============================================================================= - -class DSRCreate(BaseModel): - request_type: str = "access" - requester_name: str - requester_email: str - requester_phone: Optional[str] = None - requester_address: Optional[str] = None - requester_customer_id: Optional[str] = None - source: str = "email" - source_details: Optional[str] = None - request_text: Optional[str] = None - priority: Optional[str] = "normal" - notes: Optional[str] = None - - -class DSRUpdate(BaseModel): - priority: Optional[str] = None - notes: Optional[str] = None - internal_notes: Optional[str] = None - assigned_to: Optional[str] = None - request_text: Optional[str] = None - affected_systems: Optional[List[str]] = None - erasure_checklist: Optional[List[Dict[str, Any]]] = None - rectification_details: Optional[Dict[str, Any]] = None - objection_details: Optional[Dict[str, Any]] = None - - -class StatusChange(BaseModel): - status: str - comment: Optional[str] = None - - -class VerifyIdentity(BaseModel): - method: str - notes: Optional[str] = None - document_ref: Optional[str] = None - - -class AssignRequest(BaseModel): - assignee_id: str - - -class ExtendDeadline(BaseModel): - reason: str - days: Optional[int] = 60 - - -class CompleteDSR(BaseModel): - summary: Optional[str] = None - result_data: Optional[Dict[str, Any]] = None - - -class RejectDSR(BaseModel): - reason: str - legal_basis: Optional[str] = None - - -class SendCommunication(BaseModel): - communication_type: str = "outgoing" - channel: str = "email" - subject: Optional[str] = None - content: str - template_used: Optional[str] = None - - -class UpdateExceptionCheck(BaseModel): - applies: bool - notes: Optional[str] = None - - -class CreateTemplateVersion(BaseModel): - version: str = "1.0" - language: Optional[str] = "de" - subject: str - body_html: str - body_text: Optional[str] = None - - -# ============================================================================= -# Helpers -# ============================================================================= +# --------------------------------------------------------------------------- +# DI helpers +# --------------------------------------------------------------------------- def _get_tenant(x_tenant_id: Optional[str] = Header(None, alias='X-Tenant-ID')) -> str: return x_tenant_id or DEFAULT_TENANT -def _generate_request_number(db: Session, tenant_id: str) -> str: - """Generate next request number: DSR-YYYY-NNNNNN""" - year = datetime.now(timezone.utc).year - try: - result = db.execute(text("SELECT nextval('compliance_dsr_request_number_seq')")) - seq = result.scalar() - except Exception: - # Fallback for non-PostgreSQL (e.g. SQLite tests): count existing + 1 - count = db.query(DSRRequestDB).count() - seq = count + 1 - return f"DSR-{year}-{str(seq).zfill(6)}" +def _dsr_svc(db: Session = Depends(get_db)) -> DSRService: + return DSRService(db) -def _record_history(db: Session, dsr: DSRRequestDB, new_status: str, changed_by: str = "system", comment: str = None): - """Record status change in history.""" - entry = DSRStatusHistoryDB( - tenant_id=dsr.tenant_id, - dsr_id=dsr.id, - previous_status=dsr.status, - new_status=new_status, - changed_by=changed_by, - comment=comment, - ) - db.add(entry) +def _wf_svc(db: Session = Depends(get_db)) -> DSRWorkflowService: + return DSRWorkflowService(db) -def _dsr_to_dict(dsr: DSRRequestDB) -> dict: - """Convert DSR DB record to API response dict.""" - return { - "id": str(dsr.id), - "tenant_id": str(dsr.tenant_id), - "request_number": dsr.request_number, - "request_type": dsr.request_type, - "status": dsr.status, - "priority": dsr.priority, - "requester_name": dsr.requester_name, - "requester_email": dsr.requester_email, - "requester_phone": dsr.requester_phone, - "requester_address": dsr.requester_address, - "requester_customer_id": dsr.requester_customer_id, - "source": dsr.source, - "source_details": dsr.source_details, - "request_text": dsr.request_text, - "notes": dsr.notes, - "internal_notes": dsr.internal_notes, - "received_at": dsr.received_at.isoformat() if dsr.received_at else None, - "deadline_at": dsr.deadline_at.isoformat() if dsr.deadline_at else None, - "extended_deadline_at": dsr.extended_deadline_at.isoformat() if dsr.extended_deadline_at else None, - "extension_reason": dsr.extension_reason, - "extension_approved_by": dsr.extension_approved_by, - "extension_approved_at": dsr.extension_approved_at.isoformat() if dsr.extension_approved_at else None, - "identity_verified": dsr.identity_verified, - "verification_method": dsr.verification_method, - "verified_at": dsr.verified_at.isoformat() if dsr.verified_at else None, - "verified_by": dsr.verified_by, - "verification_notes": dsr.verification_notes, - "verification_document_ref": dsr.verification_document_ref, - "assigned_to": dsr.assigned_to, - "assigned_at": dsr.assigned_at.isoformat() if dsr.assigned_at else None, - "assigned_by": dsr.assigned_by, - "completed_at": dsr.completed_at.isoformat() if dsr.completed_at else None, - "completion_notes": dsr.completion_notes, - "rejection_reason": dsr.rejection_reason, - "rejection_legal_basis": dsr.rejection_legal_basis, - "erasure_checklist": dsr.erasure_checklist or [], - "data_export": dsr.data_export or {}, - "rectification_details": dsr.rectification_details or {}, - "objection_details": dsr.objection_details or {}, - "affected_systems": dsr.affected_systems or [], - "created_at": dsr.created_at.isoformat() if dsr.created_at else None, - "updated_at": dsr.updated_at.isoformat() if dsr.updated_at else None, - "created_by": dsr.created_by, - "updated_by": dsr.updated_by, - } - - -def _get_dsr_or_404(db: Session, dsr_id: str, tenant_id: str) -> DSRRequestDB: - """Get DSR by ID or raise 404.""" - try: - uid = uuid.UUID(dsr_id) - except ValueError: - raise HTTPException(status_code=400, detail="Invalid DSR ID format") - dsr = db.query(DSRRequestDB).filter( - DSRRequestDB.id == uid, - DSRRequestDB.tenant_id == uuid.UUID(tenant_id), - ).first() - if not dsr: - raise HTTPException(status_code=404, detail="DSR not found") - return dsr - - -# ============================================================================= -# DSR CRUD Endpoints -# ============================================================================= +# --------------------------------------------------------------------------- +# CRUD +# --------------------------------------------------------------------------- @router.post("") async def create_dsr( body: DSRCreate, tenant_id: str = Depends(_get_tenant), - db: Session = Depends(get_db), + svc: DSRService = Depends(_dsr_svc), ): - """Erstellt eine neue Betroffenenanfrage.""" - if body.request_type not in VALID_REQUEST_TYPES: - raise HTTPException(status_code=400, detail=f"Invalid request_type. Must be one of: {VALID_REQUEST_TYPES}") - if body.source not in VALID_SOURCES: - raise HTTPException(status_code=400, detail=f"Invalid source. Must be one of: {VALID_SOURCES}") - if body.priority and body.priority not in VALID_PRIORITIES: - raise HTTPException(status_code=400, detail=f"Invalid priority. Must be one of: {VALID_PRIORITIES}") - - now = datetime.now(timezone.utc) - deadline_days = DEADLINE_DAYS.get(body.request_type, 30) - request_number = _generate_request_number(db, tenant_id) - - dsr = DSRRequestDB( - tenant_id=uuid.UUID(tenant_id), - request_number=request_number, - request_type=body.request_type, - status="intake", - priority=body.priority or "normal", - requester_name=body.requester_name, - requester_email=body.requester_email, - requester_phone=body.requester_phone, - requester_address=body.requester_address, - requester_customer_id=body.requester_customer_id, - source=body.source, - source_details=body.source_details, - request_text=body.request_text, - notes=body.notes, - received_at=now, - deadline_at=now + timedelta(days=deadline_days), - created_at=now, - updated_at=now, - ) - db.add(dsr) - db.flush() # Ensure dsr.id is assigned before referencing - - # Initial history entry - history = DSRStatusHistoryDB( - tenant_id=uuid.UUID(tenant_id), - dsr_id=dsr.id, - previous_status=None, - new_status="intake", - changed_by="system", - comment="DSR erstellt", - ) - db.add(history) - - db.commit() - db.refresh(dsr) - return _dsr_to_dict(dsr) + with translate_domain_errors(): + return svc.create(body, tenant_id) @router.get("") @@ -331,237 +89,64 @@ async def list_dsrs( limit: int = Query(20, ge=1, le=100), offset: int = Query(0, ge=0), tenant_id: str = Depends(_get_tenant), - db: Session = Depends(get_db), + svc: DSRService = Depends(_dsr_svc), ): - """Liste aller DSRs mit Filtern.""" - query = db.query(DSRRequestDB).filter( - DSRRequestDB.tenant_id == uuid.UUID(tenant_id), - ) - - if status: - query = query.filter(DSRRequestDB.status == status) - if request_type: - query = query.filter(DSRRequestDB.request_type == request_type) - if assigned_to: - query = query.filter(DSRRequestDB.assigned_to == assigned_to) - if priority: - query = query.filter(DSRRequestDB.priority == priority) - if overdue_only: - query = query.filter( - DSRRequestDB.deadline_at < datetime.now(timezone.utc), - DSRRequestDB.status.notin_(["completed", "rejected", "cancelled"]), + with translate_domain_errors(): + return svc.list( + tenant_id, status=status, request_type=request_type, + assigned_to=assigned_to, priority=priority, + overdue_only=overdue_only, search=search, + from_date=from_date, to_date=to_date, + limit=limit, offset=offset, ) - if search: - search_term = f"%{search.lower()}%" - query = query.filter( - or_( - func.lower(func.coalesce(DSRRequestDB.requester_name, '')).like(search_term), - func.lower(func.coalesce(DSRRequestDB.requester_email, '')).like(search_term), - func.lower(func.coalesce(DSRRequestDB.request_number, '')).like(search_term), - func.lower(func.coalesce(DSRRequestDB.request_text, '')).like(search_term), - ) - ) - if from_date: - query = query.filter(DSRRequestDB.received_at >= from_date) - if to_date: - query = query.filter(DSRRequestDB.received_at <= to_date) - - total = query.count() - dsrs = query.order_by(DSRRequestDB.created_at.desc()).offset(offset).limit(limit).all() - - return { - "requests": [_dsr_to_dict(d) for d in dsrs], - "total": total, - "limit": limit, - "offset": offset, - } @router.get("/stats") async def get_dsr_stats( tenant_id: str = Depends(_get_tenant), - db: Session = Depends(get_db), + svc: DSRService = Depends(_dsr_svc), ): - """Dashboard-Statistiken fuer DSRs.""" - tid = uuid.UUID(tenant_id) - base = db.query(DSRRequestDB).filter(DSRRequestDB.tenant_id == tid) + with translate_domain_errors(): + return svc.stats(tenant_id) - total = base.count() - - # By status - by_status = {} - for s in VALID_STATUSES: - by_status[s] = base.filter(DSRRequestDB.status == s).count() - - # By type - by_type = {} - for t in VALID_REQUEST_TYPES: - by_type[t] = base.filter(DSRRequestDB.request_type == t).count() - - # Overdue - now = datetime.now(timezone.utc) - overdue = base.filter( - DSRRequestDB.deadline_at < now, - DSRRequestDB.status.notin_(["completed", "rejected", "cancelled"]), - ).count() - - # Due this week - week_from_now = now + timedelta(days=7) - due_this_week = base.filter( - DSRRequestDB.deadline_at >= now, - DSRRequestDB.deadline_at <= week_from_now, - DSRRequestDB.status.notin_(["completed", "rejected", "cancelled"]), - ).count() - - # Completed this month - month_start = now.replace(day=1, hour=0, minute=0, second=0, microsecond=0) - completed_this_month = base.filter( - DSRRequestDB.status == "completed", - DSRRequestDB.completed_at >= month_start, - ).count() - - # Average processing days (completed DSRs) - completed = base.filter(DSRRequestDB.status == "completed", DSRRequestDB.completed_at.isnot(None)).all() - if completed: - total_days = sum( - (d.completed_at - d.received_at).days for d in completed if d.completed_at and d.received_at - ) - avg_days = total_days / len(completed) - else: - avg_days = 0 - - return { - "total": total, - "by_status": by_status, - "by_type": by_type, - "overdue": overdue, - "due_this_week": due_this_week, - "average_processing_days": round(avg_days, 1), - "completed_this_month": completed_this_month, - } - - -# ============================================================================= -# Export -# ============================================================================= @router.get("/export") async def export_dsrs( format: str = Query("csv", pattern="^(csv|json)$"), tenant_id: str = Depends(_get_tenant), - db: Session = Depends(get_db), + svc: DSRService = Depends(_dsr_svc), ): - """Exportiert alle DSRs als CSV oder JSON.""" - tid = uuid.UUID(tenant_id) - dsrs = db.query(DSRRequestDB).filter( - DSRRequestDB.tenant_id == tid, - ).order_by(DSRRequestDB.created_at.desc()).all() - + with translate_domain_errors(): + result = svc.export(tenant_id, fmt=format) if format == "json": - return { - "exported_at": datetime.now(timezone.utc).isoformat(), - "total": len(dsrs), - "requests": [_dsr_to_dict(d) for d in dsrs], - } - - # CSV export (semicolon-separated, matching Go format + extended fields) - output = io.StringIO() - writer = csv.writer(output, delimiter=';', quoting=csv.QUOTE_MINIMAL) - writer.writerow([ - "ID", "Referenznummer", "Typ", "Name", "E-Mail", "Status", - "Prioritaet", "Eingegangen", "Frist", "Abgeschlossen", "Quelle", "Zugewiesen", - ]) - - for dsr in dsrs: - writer.writerow([ - str(dsr.id), - dsr.request_number or "", - dsr.request_type or "", - dsr.requester_name or "", - dsr.requester_email or "", - dsr.status or "", - dsr.priority or "", - dsr.received_at.strftime("%Y-%m-%d") if dsr.received_at else "", - dsr.deadline_at.strftime("%Y-%m-%d") if dsr.deadline_at else "", - dsr.completed_at.strftime("%Y-%m-%d") if dsr.completed_at else "", - dsr.source or "", - dsr.assigned_to or "", - ]) - - output.seek(0) + return result return StreamingResponse( - output, + result, media_type="text/csv; charset=utf-8", headers={"Content-Disposition": "attachment; filename=dsr_export.csv"}, ) -# ============================================================================= -# Deadline Processing (MUST be before /{dsr_id} to avoid path conflicts) -# ============================================================================= - @router.post("/deadlines/process") async def process_deadlines( tenant_id: str = Depends(_get_tenant), - db: Session = Depends(get_db), + svc: DSRService = Depends(_dsr_svc), ): - """Verarbeitet Fristen und markiert ueberfaellige DSRs.""" - now = datetime.now(timezone.utc) - tid = uuid.UUID(tenant_id) - - overdue = db.query(DSRRequestDB).filter( - DSRRequestDB.tenant_id == tid, - DSRRequestDB.status.notin_(["completed", "rejected", "cancelled"]), - or_( - and_(DSRRequestDB.extended_deadline_at.isnot(None), DSRRequestDB.extended_deadline_at < now), - and_(DSRRequestDB.extended_deadline_at.is_(None), DSRRequestDB.deadline_at < now), - ), - ).all() - - processed = [] - for dsr in overdue: - processed.append({ - "id": str(dsr.id), - "request_number": dsr.request_number, - "status": dsr.status, - "deadline_at": dsr.deadline_at.isoformat() if dsr.deadline_at else None, - "extended_deadline_at": dsr.extended_deadline_at.isoformat() if dsr.extended_deadline_at else None, - "days_overdue": (now - (dsr.extended_deadline_at or dsr.deadline_at)).days, - }) - - return { - "processed": len(processed), - "overdue_requests": processed, - } + with translate_domain_errors(): + return svc.process_deadlines(tenant_id) -# ============================================================================= -# DSR Templates (MUST be before /{dsr_id} to avoid path conflicts) -# ============================================================================= +# --------------------------------------------------------------------------- +# Templates (static paths before /{dsr_id}) +# --------------------------------------------------------------------------- @router.get("/templates") async def get_templates( tenant_id: str = Depends(_get_tenant), - db: Session = Depends(get_db), + svc: DSRWorkflowService = Depends(_wf_svc), ): - """Gibt alle DSR-Vorlagen zurueck.""" - templates = db.query(DSRTemplateDB).filter( - DSRTemplateDB.tenant_id == uuid.UUID(tenant_id), - ).order_by(DSRTemplateDB.template_type).all() - - return [ - { - "id": str(t.id), - "name": t.name, - "template_type": t.template_type, - "request_type": t.request_type, - "language": t.language, - "is_active": t.is_active, - "created_at": t.created_at.isoformat() if t.created_at else None, - "updated_at": t.updated_at.isoformat() if t.updated_at else None, - } - for t in templates - ] + with translate_domain_errors(): + return svc.get_templates(tenant_id) @router.get("/templates/published") @@ -569,87 +154,22 @@ async def get_published_templates( request_type: Optional[str] = Query(None), language: str = Query("de"), tenant_id: str = Depends(_get_tenant), - db: Session = Depends(get_db), + svc: DSRWorkflowService = Depends(_wf_svc), ): - """Gibt publizierte Vorlagen zurueck.""" - query = db.query(DSRTemplateDB).filter( - DSRTemplateDB.tenant_id == uuid.UUID(tenant_id), - DSRTemplateDB.is_active, - DSRTemplateDB.language == language, - ) - if request_type: - query = query.filter( - or_( - DSRTemplateDB.request_type == request_type, - DSRTemplateDB.request_type.is_(None), - ) + with translate_domain_errors(): + return svc.get_published_templates( + tenant_id, request_type=request_type, language=language, ) - templates = query.all() - result = [] - for t in templates: - latest = db.query(DSRTemplateVersionDB).filter( - DSRTemplateVersionDB.template_id == t.id, - DSRTemplateVersionDB.status == "published", - ).order_by(DSRTemplateVersionDB.created_at.desc()).first() - - result.append({ - "id": str(t.id), - "name": t.name, - "template_type": t.template_type, - "request_type": t.request_type, - "language": t.language, - "latest_version": { - "id": str(latest.id), - "version": latest.version, - "subject": latest.subject, - "body_html": latest.body_html, - "body_text": latest.body_text, - } if latest else None, - }) - - return result - @router.get("/templates/{template_id}/versions") async def get_template_versions( template_id: str, tenant_id: str = Depends(_get_tenant), - db: Session = Depends(get_db), + svc: DSRWorkflowService = Depends(_wf_svc), ): - """Gibt alle Versionen einer Vorlage zurueck.""" - try: - tid = uuid.UUID(template_id) - except ValueError: - raise HTTPException(status_code=400, detail="Invalid template ID") - - template = db.query(DSRTemplateDB).filter( - DSRTemplateDB.id == tid, - DSRTemplateDB.tenant_id == uuid.UUID(tenant_id), - ).first() - if not template: - raise HTTPException(status_code=404, detail="Template not found") - - versions = db.query(DSRTemplateVersionDB).filter( - DSRTemplateVersionDB.template_id == tid, - ).order_by(DSRTemplateVersionDB.created_at.desc()).all() - - return [ - { - "id": str(v.id), - "template_id": str(v.template_id), - "version": v.version, - "subject": v.subject, - "body_html": v.body_html, - "body_text": v.body_text, - "status": v.status, - "published_at": v.published_at.isoformat() if v.published_at else None, - "published_by": v.published_by, - "created_at": v.created_at.isoformat() if v.created_at else None, - "created_by": v.created_by, - } - for v in versions - ] + with translate_domain_errors(): + return svc.get_template_versions(template_id, tenant_id) @router.post("/templates/{template_id}/versions") @@ -657,93 +177,34 @@ async def create_template_version( template_id: str, body: CreateTemplateVersion, tenant_id: str = Depends(_get_tenant), - db: Session = Depends(get_db), + svc: DSRWorkflowService = Depends(_wf_svc), ): - """Erstellt eine neue Version einer Vorlage.""" - try: - tid = uuid.UUID(template_id) - except ValueError: - raise HTTPException(status_code=400, detail="Invalid template ID") - - template = db.query(DSRTemplateDB).filter( - DSRTemplateDB.id == tid, - DSRTemplateDB.tenant_id == uuid.UUID(tenant_id), - ).first() - if not template: - raise HTTPException(status_code=404, detail="Template not found") - - version = DSRTemplateVersionDB( - template_id=tid, - version=body.version, - subject=body.subject, - body_html=body.body_html, - body_text=body.body_text, - status="draft", - ) - db.add(version) - db.commit() - db.refresh(version) - - return { - "id": str(version.id), - "template_id": str(version.template_id), - "version": version.version, - "subject": version.subject, - "body_html": version.body_html, - "body_text": version.body_text, - "status": version.status, - "created_at": version.created_at.isoformat() if version.created_at else None, - } + with translate_domain_errors(): + return svc.create_template_version(template_id, body, tenant_id) @router.put("/template-versions/{version_id}/publish") async def publish_template_version( version_id: str, tenant_id: str = Depends(_get_tenant), - db: Session = Depends(get_db), + svc: DSRWorkflowService = Depends(_wf_svc), ): - """Veroeffentlicht eine Vorlagen-Version.""" - try: - vid = uuid.UUID(version_id) - except ValueError: - raise HTTPException(status_code=400, detail="Invalid version ID") - - version = db.query(DSRTemplateVersionDB).filter( - DSRTemplateVersionDB.id == vid, - ).first() - if not version: - raise HTTPException(status_code=404, detail="Version not found") - - now = datetime.now(timezone.utc) - version.status = "published" - version.published_at = now - version.published_by = "admin" - db.commit() - db.refresh(version) - - return { - "id": str(version.id), - "template_id": str(version.template_id), - "version": version.version, - "status": version.status, - "published_at": version.published_at.isoformat(), - "published_by": version.published_by, - } + with translate_domain_errors(): + return svc.publish_template_version(version_id, tenant_id) -# ============================================================================= -# Single DSR Endpoints (parameterized — MUST come after static paths) -# ============================================================================= +# --------------------------------------------------------------------------- +# Single DSR (parameterized — after static paths) +# --------------------------------------------------------------------------- @router.get("/{dsr_id}") async def get_dsr( dsr_id: str, tenant_id: str = Depends(_get_tenant), - db: Session = Depends(get_db), + svc: DSRService = Depends(_dsr_svc), ): - """Detail einer Betroffenenanfrage.""" - dsr = _get_dsr_or_404(db, dsr_id, tenant_id) - return _dsr_to_dict(dsr) + with translate_domain_errors(): + return svc.get(dsr_id, tenant_id) @router.put("/{dsr_id}") @@ -751,79 +212,35 @@ async def update_dsr( dsr_id: str, body: DSRUpdate, tenant_id: str = Depends(_get_tenant), - db: Session = Depends(get_db), + svc: DSRService = Depends(_dsr_svc), ): - """Aktualisiert eine Betroffenenanfrage.""" - dsr = _get_dsr_or_404(db, dsr_id, tenant_id) - - if body.priority is not None: - if body.priority not in VALID_PRIORITIES: - raise HTTPException(status_code=400, detail=f"Invalid priority: {body.priority}") - dsr.priority = body.priority - if body.notes is not None: - dsr.notes = body.notes - if body.internal_notes is not None: - dsr.internal_notes = body.internal_notes - if body.assigned_to is not None: - dsr.assigned_to = body.assigned_to - dsr.assigned_at = datetime.now(timezone.utc) - if body.request_text is not None: - dsr.request_text = body.request_text - if body.affected_systems is not None: - dsr.affected_systems = body.affected_systems - if body.erasure_checklist is not None: - dsr.erasure_checklist = body.erasure_checklist - if body.rectification_details is not None: - dsr.rectification_details = body.rectification_details - if body.objection_details is not None: - dsr.objection_details = body.objection_details - - dsr.updated_at = datetime.now(timezone.utc) - db.commit() - db.refresh(dsr) - return _dsr_to_dict(dsr) + with translate_domain_errors(): + return svc.update(dsr_id, body, tenant_id) @router.delete("/{dsr_id}") async def delete_dsr( dsr_id: str, tenant_id: str = Depends(_get_tenant), - db: Session = Depends(get_db), + svc: DSRService = Depends(_dsr_svc), ): - """Storniert eine DSR (Soft Delete → Status cancelled).""" - dsr = _get_dsr_or_404(db, dsr_id, tenant_id) - if dsr.status in ("completed", "cancelled"): - raise HTTPException(status_code=400, detail="DSR already completed or cancelled") - - _record_history(db, dsr, "cancelled", comment="DSR storniert") - dsr.status = "cancelled" - dsr.updated_at = datetime.now(timezone.utc) - db.commit() - return {"success": True, "message": "DSR cancelled"} + with translate_domain_errors(): + return svc.delete(dsr_id, tenant_id) -# ============================================================================= -# Workflow Actions -# ============================================================================= +# --------------------------------------------------------------------------- +# Workflow actions +# --------------------------------------------------------------------------- @router.post("/{dsr_id}/status") async def change_status( dsr_id: str, body: StatusChange, tenant_id: str = Depends(_get_tenant), - db: Session = Depends(get_db), + svc: DSRWorkflowService = Depends(_wf_svc), ): - """Aendert den Status einer DSR.""" - if body.status not in VALID_STATUSES: - raise HTTPException(status_code=400, detail=f"Invalid status: {body.status}") - - dsr = _get_dsr_or_404(db, dsr_id, tenant_id) - _record_history(db, dsr, body.status, comment=body.comment) - dsr.status = body.status - dsr.updated_at = datetime.now(timezone.utc) - db.commit() - db.refresh(dsr) - return _dsr_to_dict(dsr) + with translate_domain_errors(): + return svc.change_status(dsr_id, body, tenant_id) @router.post("/{dsr_id}/verify-identity") @@ -831,31 +248,10 @@ async def verify_identity( dsr_id: str, body: VerifyIdentity, tenant_id: str = Depends(_get_tenant), - db: Session = Depends(get_db), + svc: DSRWorkflowService = Depends(_wf_svc), ): - """Verifiziert die Identitaet des Antragstellers.""" - dsr = _get_dsr_or_404(db, dsr_id, tenant_id) - now = datetime.now(timezone.utc) - - dsr.identity_verified = True - dsr.verification_method = body.method - dsr.verified_at = now - dsr.verified_by = "admin" - dsr.verification_notes = body.notes - dsr.verification_document_ref = body.document_ref - - # Auto-advance to processing if in identity_verification - if dsr.status == "identity_verification": - _record_history(db, dsr, "processing", comment="Identitaet verifiziert") - dsr.status = "processing" - elif dsr.status == "intake": - _record_history(db, dsr, "identity_verification", comment="Identitaet verifiziert") - dsr.status = "identity_verification" - - dsr.updated_at = now - db.commit() - db.refresh(dsr) - return _dsr_to_dict(dsr) + with translate_domain_errors(): + return svc.verify_identity(dsr_id, body, tenant_id) @router.post("/{dsr_id}/assign") @@ -863,17 +259,10 @@ async def assign_dsr( dsr_id: str, body: AssignRequest, tenant_id: str = Depends(_get_tenant), - db: Session = Depends(get_db), + svc: DSRWorkflowService = Depends(_wf_svc), ): - """Weist eine DSR einem Bearbeiter zu.""" - dsr = _get_dsr_or_404(db, dsr_id, tenant_id) - dsr.assigned_to = body.assignee_id - dsr.assigned_at = datetime.now(timezone.utc) - dsr.assigned_by = "admin" - dsr.updated_at = datetime.now(timezone.utc) - db.commit() - db.refresh(dsr) - return _dsr_to_dict(dsr) + with translate_domain_errors(): + return svc.assign(dsr_id, body, tenant_id) @router.post("/{dsr_id}/extend") @@ -881,27 +270,10 @@ async def extend_deadline( dsr_id: str, body: ExtendDeadline, tenant_id: str = Depends(_get_tenant), - db: Session = Depends(get_db), + svc: DSRWorkflowService = Depends(_wf_svc), ): - """Verlaengert die Bearbeitungsfrist (Art. 12 Abs. 3 DSGVO).""" - dsr = _get_dsr_or_404(db, dsr_id, tenant_id) - if dsr.status in ("completed", "rejected", "cancelled"): - raise HTTPException(status_code=400, detail="Cannot extend deadline for closed DSR") - - now = datetime.now(timezone.utc) - current_deadline = dsr.extended_deadline_at or dsr.deadline_at - new_deadline = current_deadline + timedelta(days=body.days or 60) - - dsr.extended_deadline_at = new_deadline - dsr.extension_reason = body.reason - dsr.extension_approved_by = "admin" - dsr.extension_approved_at = now - dsr.updated_at = now - - _record_history(db, dsr, dsr.status, comment=f"Frist verlaengert: {body.reason}") - db.commit() - db.refresh(dsr) - return _dsr_to_dict(dsr) + with translate_domain_errors(): + return svc.extend_deadline(dsr_id, body, tenant_id) @router.post("/{dsr_id}/complete") @@ -909,24 +281,10 @@ async def complete_dsr( dsr_id: str, body: CompleteDSR, tenant_id: str = Depends(_get_tenant), - db: Session = Depends(get_db), + svc: DSRWorkflowService = Depends(_wf_svc), ): - """Schliesst eine DSR erfolgreich ab.""" - dsr = _get_dsr_or_404(db, dsr_id, tenant_id) - if dsr.status in ("completed", "cancelled"): - raise HTTPException(status_code=400, detail="DSR already completed or cancelled") - - now = datetime.now(timezone.utc) - _record_history(db, dsr, "completed", comment=body.summary) - dsr.status = "completed" - dsr.completed_at = now - dsr.completion_notes = body.summary - if body.result_data: - dsr.data_export = body.result_data - dsr.updated_at = now - db.commit() - db.refresh(dsr) - return _dsr_to_dict(dsr) + with translate_domain_errors(): + return svc.complete(dsr_id, body, tenant_id) @router.post("/{dsr_id}/reject") @@ -934,85 +292,34 @@ async def reject_dsr( dsr_id: str, body: RejectDSR, tenant_id: str = Depends(_get_tenant), - db: Session = Depends(get_db), + svc: DSRWorkflowService = Depends(_wf_svc), ): - """Lehnt eine DSR mit Rechtsgrundlage ab.""" - dsr = _get_dsr_or_404(db, dsr_id, tenant_id) - if dsr.status in ("completed", "rejected", "cancelled"): - raise HTTPException(status_code=400, detail="DSR already closed") - - now = datetime.now(timezone.utc) - _record_history(db, dsr, "rejected", comment=f"{body.reason} ({body.legal_basis})") - dsr.status = "rejected" - dsr.rejection_reason = body.reason - dsr.rejection_legal_basis = body.legal_basis - dsr.completed_at = now - dsr.updated_at = now - db.commit() - db.refresh(dsr) - return _dsr_to_dict(dsr) + with translate_domain_errors(): + return svc.reject(dsr_id, body, tenant_id) -# ============================================================================= +# --------------------------------------------------------------------------- # History & Communications -# ============================================================================= +# --------------------------------------------------------------------------- @router.get("/{dsr_id}/history") async def get_history( dsr_id: str, tenant_id: str = Depends(_get_tenant), - db: Session = Depends(get_db), + svc: DSRWorkflowService = Depends(_wf_svc), ): - """Gibt die Status-Historie zurueck.""" - dsr = _get_dsr_or_404(db, dsr_id, tenant_id) - entries = db.query(DSRStatusHistoryDB).filter( - DSRStatusHistoryDB.dsr_id == dsr.id, - ).order_by(DSRStatusHistoryDB.created_at.desc()).all() - - return [ - { - "id": str(e.id), - "dsr_id": str(e.dsr_id), - "previous_status": e.previous_status, - "new_status": e.new_status, - "changed_by": e.changed_by, - "comment": e.comment, - "created_at": e.created_at.isoformat() if e.created_at else None, - } - for e in entries - ] + with translate_domain_errors(): + return svc.get_history(dsr_id, tenant_id) @router.get("/{dsr_id}/communications") async def get_communications( dsr_id: str, tenant_id: str = Depends(_get_tenant), - db: Session = Depends(get_db), + svc: DSRWorkflowService = Depends(_wf_svc), ): - """Gibt die Kommunikationshistorie zurueck.""" - dsr = _get_dsr_or_404(db, dsr_id, tenant_id) - comms = db.query(DSRCommunicationDB).filter( - DSRCommunicationDB.dsr_id == dsr.id, - ).order_by(DSRCommunicationDB.created_at.desc()).all() - - return [ - { - "id": str(c.id), - "dsr_id": str(c.dsr_id), - "communication_type": c.communication_type, - "channel": c.channel, - "subject": c.subject, - "content": c.content, - "template_used": c.template_used, - "attachments": c.attachments or [], - "sent_at": c.sent_at.isoformat() if c.sent_at else None, - "sent_by": c.sent_by, - "received_at": c.received_at.isoformat() if c.received_at else None, - "created_at": c.created_at.isoformat() if c.created_at else None, - "created_by": c.created_by, - } - for c in comms - ] + with translate_domain_errors(): + return svc.get_communications(dsr_id, tenant_id) @router.post("/{dsr_id}/communicate") @@ -1020,117 +327,34 @@ async def send_communication( dsr_id: str, body: SendCommunication, tenant_id: str = Depends(_get_tenant), - db: Session = Depends(get_db), + svc: DSRWorkflowService = Depends(_wf_svc), ): - """Sendet eine Kommunikation.""" - dsr = _get_dsr_or_404(db, dsr_id, tenant_id) - now = datetime.now(timezone.utc) - - comm = DSRCommunicationDB( - tenant_id=uuid.UUID(tenant_id), - dsr_id=dsr.id, - communication_type=body.communication_type, - channel=body.channel, - subject=body.subject, - content=body.content, - template_used=body.template_used, - sent_at=now if body.communication_type == "outgoing" else None, - sent_by="admin" if body.communication_type == "outgoing" else None, - received_at=now if body.communication_type == "incoming" else None, - created_at=now, - ) - db.add(comm) - db.commit() - db.refresh(comm) - - return { - "id": str(comm.id), - "dsr_id": str(comm.dsr_id), - "communication_type": comm.communication_type, - "channel": comm.channel, - "subject": comm.subject, - "content": comm.content, - "sent_at": comm.sent_at.isoformat() if comm.sent_at else None, - "created_at": comm.created_at.isoformat() if comm.created_at else None, - } + with translate_domain_errors(): + return svc.send_communication(dsr_id, body, tenant_id) -# ============================================================================= +# --------------------------------------------------------------------------- # Exception Checks (Art. 17) -# ============================================================================= +# --------------------------------------------------------------------------- @router.get("/{dsr_id}/exception-checks") async def get_exception_checks( dsr_id: str, tenant_id: str = Depends(_get_tenant), - db: Session = Depends(get_db), + svc: DSRWorkflowService = Depends(_wf_svc), ): - """Gibt die Art. 17(3) Ausnahmepruefungen zurueck.""" - dsr = _get_dsr_or_404(db, dsr_id, tenant_id) - checks = db.query(DSRExceptionCheckDB).filter( - DSRExceptionCheckDB.dsr_id == dsr.id, - ).order_by(DSRExceptionCheckDB.check_code).all() - - return [ - { - "id": str(c.id), - "dsr_id": str(c.dsr_id), - "check_code": c.check_code, - "article": c.article, - "label": c.label, - "description": c.description, - "applies": c.applies, - "notes": c.notes, - "checked_by": c.checked_by, - "checked_at": c.checked_at.isoformat() if c.checked_at else None, - } - for c in checks - ] + with translate_domain_errors(): + return svc.get_exception_checks(dsr_id, tenant_id) @router.post("/{dsr_id}/exception-checks/init") async def init_exception_checks( dsr_id: str, tenant_id: str = Depends(_get_tenant), - db: Session = Depends(get_db), + svc: DSRWorkflowService = Depends(_wf_svc), ): - """Initialisiert die Art. 17(3) Ausnahmepruefungen fuer eine Loeschanfrage.""" - dsr = _get_dsr_or_404(db, dsr_id, tenant_id) - if dsr.request_type != "erasure": - raise HTTPException(status_code=400, detail="Exception checks only for erasure requests") - - # Check if already initialized - existing = db.query(DSRExceptionCheckDB).filter(DSRExceptionCheckDB.dsr_id == dsr.id).count() - if existing > 0: - raise HTTPException(status_code=400, detail="Exception checks already initialized") - - checks = [] - for exc in ART17_EXCEPTIONS: - check = DSRExceptionCheckDB( - tenant_id=uuid.UUID(tenant_id), - dsr_id=dsr.id, - check_code=exc["check_code"], - article=exc["article"], - label=exc["label"], - description=exc["description"], - ) - db.add(check) - checks.append(check) - - db.commit() - return [ - { - "id": str(c.id), - "dsr_id": str(c.dsr_id), - "check_code": c.check_code, - "article": c.article, - "label": c.label, - "description": c.description, - "applies": c.applies, - "notes": c.notes, - } - for c in checks - ] + with translate_domain_errors(): + return svc.init_exception_checks(dsr_id, tenant_id) @router.put("/{dsr_id}/exception-checks/{check_id}") @@ -1139,38 +363,7 @@ async def update_exception_check( check_id: str, body: UpdateExceptionCheck, tenant_id: str = Depends(_get_tenant), - db: Session = Depends(get_db), + svc: DSRWorkflowService = Depends(_wf_svc), ): - """Aktualisiert eine einzelne Ausnahmepruefung.""" - dsr = _get_dsr_or_404(db, dsr_id, tenant_id) - try: - cid = uuid.UUID(check_id) - except ValueError: - raise HTTPException(status_code=400, detail="Invalid check ID") - - check = db.query(DSRExceptionCheckDB).filter( - DSRExceptionCheckDB.id == cid, - DSRExceptionCheckDB.dsr_id == dsr.id, - ).first() - if not check: - raise HTTPException(status_code=404, detail="Exception check not found") - - check.applies = body.applies - check.notes = body.notes - check.checked_by = "admin" - check.checked_at = datetime.now(timezone.utc) - db.commit() - db.refresh(check) - - return { - "id": str(check.id), - "dsr_id": str(check.dsr_id), - "check_code": check.check_code, - "article": check.article, - "label": check.label, - "description": check.description, - "applies": check.applies, - "notes": check.notes, - "checked_by": check.checked_by, - "checked_at": check.checked_at.isoformat() if check.checked_at else None, - } + with translate_domain_errors(): + return svc.update_exception_check(dsr_id, check_id, body, tenant_id) diff --git a/backend-compliance/compliance/schemas/dsr.py b/backend-compliance/compliance/schemas/dsr.py new file mode 100644 index 0000000..428ac6e --- /dev/null +++ b/backend-compliance/compliance/schemas/dsr.py @@ -0,0 +1,101 @@ +""" +DSR (Data Subject Request) schemas — Betroffenenanfragen nach DSGVO Art. 15-21. + +Phase 1 Step 4: extracted from ``compliance.api.dsr_routes``. +""" + +from typing import Any, Dict, List, Optional + +from pydantic import BaseModel + + +class DSRCreate(BaseModel): + request_type: str = "access" + requester_name: str + requester_email: str + requester_phone: Optional[str] = None + requester_address: Optional[str] = None + requester_customer_id: Optional[str] = None + source: str = "email" + source_details: Optional[str] = None + request_text: Optional[str] = None + priority: Optional[str] = "normal" + notes: Optional[str] = None + + +class DSRUpdate(BaseModel): + priority: Optional[str] = None + notes: Optional[str] = None + internal_notes: Optional[str] = None + assigned_to: Optional[str] = None + request_text: Optional[str] = None + affected_systems: Optional[List[str]] = None + erasure_checklist: Optional[List[Dict[str, Any]]] = None + rectification_details: Optional[Dict[str, Any]] = None + objection_details: Optional[Dict[str, Any]] = None + + +class StatusChange(BaseModel): + status: str + comment: Optional[str] = None + + +class VerifyIdentity(BaseModel): + method: str + notes: Optional[str] = None + document_ref: Optional[str] = None + + +class AssignRequest(BaseModel): + assignee_id: str + + +class ExtendDeadline(BaseModel): + reason: str + days: Optional[int] = 60 + + +class CompleteDSR(BaseModel): + summary: Optional[str] = None + result_data: Optional[Dict[str, Any]] = None + + +class RejectDSR(BaseModel): + reason: str + legal_basis: Optional[str] = None + + +class SendCommunication(BaseModel): + communication_type: str = "outgoing" + channel: str = "email" + subject: Optional[str] = None + content: str + template_used: Optional[str] = None + + +class UpdateExceptionCheck(BaseModel): + applies: bool + notes: Optional[str] = None + + +class CreateTemplateVersion(BaseModel): + version: str = "1.0" + language: Optional[str] = "de" + subject: str + body_html: str + body_text: Optional[str] = None + + +__all__ = [ + "DSRCreate", + "DSRUpdate", + "StatusChange", + "VerifyIdentity", + "AssignRequest", + "ExtendDeadline", + "CompleteDSR", + "RejectDSR", + "SendCommunication", + "UpdateExceptionCheck", + "CreateTemplateVersion", +] diff --git a/backend-compliance/compliance/services/dsr_service.py b/backend-compliance/compliance/services/dsr_service.py new file mode 100644 index 0000000..829b748 --- /dev/null +++ b/backend-compliance/compliance/services/dsr_service.py @@ -0,0 +1,469 @@ +# mypy: disable-error-code="arg-type,assignment,union-attr,no-any-return" +""" +DSR service — CRUD, stats, export, deadline processing. + +Phase 1 Step 4: extracted from ``compliance.api.dsr_routes``. Workflow +actions (status changes, identity verification, assignment, completion, +rejection, communications, exception checks, templates) live in +``compliance.services.dsr_workflow_service``. + +Helpers ``_dsr_to_dict``, ``_get_dsr_or_404``, ``_record_history``, +``_generate_request_number`` and constants are defined here and +re-exported from ``compliance.api.dsr_routes`` for legacy test imports. +""" + +import csv +import io +import uuid +from datetime import datetime, timedelta, timezone +from typing import Any, Dict, List, Optional + +from sqlalchemy import func, or_, text +from sqlalchemy.orm import Session + +from compliance.db.dsr_models import DSRRequestDB, DSRStatusHistoryDB +from compliance.domain import NotFoundError, ValidationError +from compliance.schemas.dsr import DSRCreate, DSRUpdate + +# ============================================================================ +# Constants +# ============================================================================ + +DEFAULT_TENANT = "9282a473-5c95-4b3a-bf78-0ecc0ec71d3e" + +ART17_EXCEPTIONS: List[Dict[str, str]] = [ + { + "check_code": "art17_3_a", + "article": "17(3)(a)", + "label": "Meinungs- und Informationsfreiheit", + "description": "Ausuebung des Rechts auf freie Meinungsaeusserung und Information", + }, + { + "check_code": "art17_3_b", + "article": "17(3)(b)", + "label": "Rechtliche Verpflichtung", + "description": "Erfuellung einer rechtlichen Verpflichtung (z.B. Aufbewahrungspflichten)", + }, + { + "check_code": "art17_3_c", + "article": "17(3)(c)", + "label": "Oeffentliches Interesse", + "description": "Gruende des oeffentlichen Interesses im Bereich Gesundheit", + }, + { + "check_code": "art17_3_d", + "article": "17(3)(d)", + "label": "Archivzwecke", + "description": "Archivzwecke, wissenschaftliche/historische Forschung, Statistik", + }, + { + "check_code": "art17_3_e", + "article": "17(3)(e)", + "label": "Rechtsansprueche", + "description": "Geltendmachung, Ausuebung oder Verteidigung von Rechtsanspruechen", + }, +] + +VALID_REQUEST_TYPES = [ + "access", "rectification", "erasure", "restriction", "portability", "objection", +] +VALID_STATUSES = [ + "intake", "identity_verification", "processing", "completed", "rejected", "cancelled", +] +VALID_PRIORITIES = ["low", "normal", "high", "critical"] +VALID_SOURCES = ["web_form", "email", "letter", "phone", "in_person", "other"] + +DEADLINE_DAYS: Dict[str, int] = { + "access": 30, + "rectification": 14, + "erasure": 14, + "restriction": 14, + "portability": 30, + "objection": 30, +} + + +# ============================================================================ +# Module-level helpers (re-exported by compliance.api.dsr_routes) +# ============================================================================ + + +def _generate_request_number(db: Session, tenant_id: str) -> str: + """Generate next request number: DSR-YYYY-NNNNNN.""" + year = datetime.now(timezone.utc).year + try: + result = db.execute(text("SELECT nextval('compliance_dsr_request_number_seq')")) + seq = result.scalar() + except Exception: + count = db.query(DSRRequestDB).count() + seq = count + 1 + return f"DSR-{year}-{str(seq).zfill(6)}" + + +def _record_history( + db: Session, + dsr: DSRRequestDB, + new_status: str, + changed_by: str = "system", + comment: Optional[str] = None, +) -> None: + """Record status change in history.""" + entry = DSRStatusHistoryDB( + tenant_id=dsr.tenant_id, + dsr_id=dsr.id, + previous_status=dsr.status, + new_status=new_status, + changed_by=changed_by, + comment=comment, + ) + db.add(entry) + + +def _dsr_to_dict(dsr: DSRRequestDB) -> Dict[str, Any]: + """Convert DSR DB record to API response dict.""" + return { + "id": str(dsr.id), + "tenant_id": str(dsr.tenant_id), + "request_number": dsr.request_number, + "request_type": dsr.request_type, + "status": dsr.status, + "priority": dsr.priority, + "requester_name": dsr.requester_name, + "requester_email": dsr.requester_email, + "requester_phone": dsr.requester_phone, + "requester_address": dsr.requester_address, + "requester_customer_id": dsr.requester_customer_id, + "source": dsr.source, + "source_details": dsr.source_details, + "request_text": dsr.request_text, + "notes": dsr.notes, + "internal_notes": dsr.internal_notes, + "received_at": dsr.received_at.isoformat() if dsr.received_at else None, + "deadline_at": dsr.deadline_at.isoformat() if dsr.deadline_at else None, + "extended_deadline_at": dsr.extended_deadline_at.isoformat() if dsr.extended_deadline_at else None, + "extension_reason": dsr.extension_reason, + "extension_approved_by": dsr.extension_approved_by, + "extension_approved_at": dsr.extension_approved_at.isoformat() if dsr.extension_approved_at else None, + "identity_verified": dsr.identity_verified, + "verification_method": dsr.verification_method, + "verified_at": dsr.verified_at.isoformat() if dsr.verified_at else None, + "verified_by": dsr.verified_by, + "verification_notes": dsr.verification_notes, + "verification_document_ref": dsr.verification_document_ref, + "assigned_to": dsr.assigned_to, + "assigned_at": dsr.assigned_at.isoformat() if dsr.assigned_at else None, + "assigned_by": dsr.assigned_by, + "completed_at": dsr.completed_at.isoformat() if dsr.completed_at else None, + "completion_notes": dsr.completion_notes, + "rejection_reason": dsr.rejection_reason, + "rejection_legal_basis": dsr.rejection_legal_basis, + "erasure_checklist": dsr.erasure_checklist or [], + "data_export": dsr.data_export or {}, + "rectification_details": dsr.rectification_details or {}, + "objection_details": dsr.objection_details or {}, + "affected_systems": dsr.affected_systems or [], + "created_at": dsr.created_at.isoformat() if dsr.created_at else None, + "updated_at": dsr.updated_at.isoformat() if dsr.updated_at else None, + "created_by": dsr.created_by, + "updated_by": dsr.updated_by, + } + + +def _get_dsr_or_404(db: Session, dsr_id: str, tenant_id: str) -> DSRRequestDB: + """Get DSR by ID or raise NotFoundError.""" + try: + uid = uuid.UUID(dsr_id) + except ValueError: + raise ValidationError("Invalid DSR ID format") + dsr = db.query(DSRRequestDB).filter( + DSRRequestDB.id == uid, + DSRRequestDB.tenant_id == uuid.UUID(tenant_id), + ).first() + if not dsr: + raise NotFoundError("DSR not found") + return dsr + + +# ============================================================================ +# Service class +# ============================================================================ + + +class DSRService: + """CRUD, stats, export, and deadline processing for DSRs.""" + + def __init__(self, db: Session) -> None: + self._db = db + + # -- Create -------------------------------------------------------------- + + def create(self, body: DSRCreate, tenant_id: str) -> Dict[str, Any]: + if body.request_type not in VALID_REQUEST_TYPES: + raise ValidationError(f"Invalid request_type. Must be one of: {VALID_REQUEST_TYPES}") + if body.source not in VALID_SOURCES: + raise ValidationError(f"Invalid source. Must be one of: {VALID_SOURCES}") + if body.priority and body.priority not in VALID_PRIORITIES: + raise ValidationError(f"Invalid priority. Must be one of: {VALID_PRIORITIES}") + + now = datetime.now(timezone.utc) + deadline_days = DEADLINE_DAYS.get(body.request_type, 30) + request_number = _generate_request_number(self._db, tenant_id) + + dsr = DSRRequestDB( + tenant_id=uuid.UUID(tenant_id), + request_number=request_number, + request_type=body.request_type, + status="intake", + priority=body.priority or "normal", + requester_name=body.requester_name, + requester_email=body.requester_email, + requester_phone=body.requester_phone, + requester_address=body.requester_address, + requester_customer_id=body.requester_customer_id, + source=body.source, + source_details=body.source_details, + request_text=body.request_text, + notes=body.notes, + received_at=now, + deadline_at=now + timedelta(days=deadline_days), + created_at=now, + updated_at=now, + ) + self._db.add(dsr) + self._db.flush() + + history = DSRStatusHistoryDB( + tenant_id=uuid.UUID(tenant_id), + dsr_id=dsr.id, + previous_status=None, + new_status="intake", + changed_by="system", + comment="DSR erstellt", + ) + self._db.add(history) + self._db.commit() + self._db.refresh(dsr) + return _dsr_to_dict(dsr) + + # -- List ---------------------------------------------------------------- + + def list( # noqa: C901 — many filter params, straightforward + self, + tenant_id: str, + *, + status: Optional[str] = None, + request_type: Optional[str] = None, + assigned_to: Optional[str] = None, + priority: Optional[str] = None, + overdue_only: bool = False, + search: Optional[str] = None, + from_date: Optional[str] = None, + to_date: Optional[str] = None, + limit: int = 20, + offset: int = 0, + ) -> Dict[str, Any]: + query = self._db.query(DSRRequestDB).filter( + DSRRequestDB.tenant_id == uuid.UUID(tenant_id), + ) + if status: + query = query.filter(DSRRequestDB.status == status) + if request_type: + query = query.filter(DSRRequestDB.request_type == request_type) + if assigned_to: + query = query.filter(DSRRequestDB.assigned_to == assigned_to) + if priority: + query = query.filter(DSRRequestDB.priority == priority) + if overdue_only: + query = query.filter( + DSRRequestDB.deadline_at < datetime.now(timezone.utc), + DSRRequestDB.status.notin_(["completed", "rejected", "cancelled"]), + ) + if search: + search_term = f"%{search.lower()}%" + query = query.filter( + or_( + func.lower(func.coalesce(DSRRequestDB.requester_name, '')).like(search_term), + func.lower(func.coalesce(DSRRequestDB.requester_email, '')).like(search_term), + func.lower(func.coalesce(DSRRequestDB.request_number, '')).like(search_term), + func.lower(func.coalesce(DSRRequestDB.request_text, '')).like(search_term), + ) + ) + if from_date: + query = query.filter(DSRRequestDB.received_at >= from_date) + if to_date: + query = query.filter(DSRRequestDB.received_at <= to_date) + + total = query.count() + dsrs = query.order_by(DSRRequestDB.created_at.desc()).offset(offset).limit(limit).all() + return { + "requests": [_dsr_to_dict(d) for d in dsrs], + "total": total, + "limit": limit, + "offset": offset, + } + + # -- Get ----------------------------------------------------------------- + + def get(self, dsr_id: str, tenant_id: str) -> Dict[str, Any]: + dsr = _get_dsr_or_404(self._db, dsr_id, tenant_id) + return _dsr_to_dict(dsr) + + # -- Update -------------------------------------------------------------- + + def update(self, dsr_id: str, body: DSRUpdate, tenant_id: str) -> Dict[str, Any]: + dsr = _get_dsr_or_404(self._db, dsr_id, tenant_id) + if body.priority is not None: + if body.priority not in VALID_PRIORITIES: + raise ValidationError(f"Invalid priority: {body.priority}") + dsr.priority = body.priority + if body.notes is not None: + dsr.notes = body.notes + if body.internal_notes is not None: + dsr.internal_notes = body.internal_notes + if body.assigned_to is not None: + dsr.assigned_to = body.assigned_to + dsr.assigned_at = datetime.now(timezone.utc) + if body.request_text is not None: + dsr.request_text = body.request_text + if body.affected_systems is not None: + dsr.affected_systems = body.affected_systems + if body.erasure_checklist is not None: + dsr.erasure_checklist = body.erasure_checklist + if body.rectification_details is not None: + dsr.rectification_details = body.rectification_details + if body.objection_details is not None: + dsr.objection_details = body.objection_details + + dsr.updated_at = datetime.now(timezone.utc) + self._db.commit() + self._db.refresh(dsr) + return _dsr_to_dict(dsr) + + # -- Delete (soft) ------------------------------------------------------- + + def delete(self, dsr_id: str, tenant_id: str) -> Dict[str, Any]: + dsr = _get_dsr_or_404(self._db, dsr_id, tenant_id) + if dsr.status in ("completed", "cancelled"): + raise ValidationError("DSR already completed or cancelled") + _record_history(self._db, dsr, "cancelled", comment="DSR storniert") + dsr.status = "cancelled" + dsr.updated_at = datetime.now(timezone.utc) + self._db.commit() + return {"success": True, "message": "DSR cancelled"} + + # -- Stats --------------------------------------------------------------- + + def stats(self, tenant_id: str) -> Dict[str, Any]: + tid = uuid.UUID(tenant_id) + base = self._db.query(DSRRequestDB).filter(DSRRequestDB.tenant_id == tid) + total = base.count() + + by_status = {s: base.filter(DSRRequestDB.status == s).count() for s in VALID_STATUSES} + by_type = {t: base.filter(DSRRequestDB.request_type == t).count() for t in VALID_REQUEST_TYPES} + + now = datetime.now(timezone.utc) + overdue = base.filter( + DSRRequestDB.deadline_at < now, + DSRRequestDB.status.notin_(["completed", "rejected", "cancelled"]), + ).count() + + week_from_now = now + timedelta(days=7) + due_this_week = base.filter( + DSRRequestDB.deadline_at >= now, + DSRRequestDB.deadline_at <= week_from_now, + DSRRequestDB.status.notin_(["completed", "rejected", "cancelled"]), + ).count() + + month_start = now.replace(day=1, hour=0, minute=0, second=0, microsecond=0) + completed_this_month = base.filter( + DSRRequestDB.status == "completed", + DSRRequestDB.completed_at >= month_start, + ).count() + + completed = base.filter( + DSRRequestDB.status == "completed", DSRRequestDB.completed_at.isnot(None), + ).all() + if completed: + total_days = sum( + (d.completed_at - d.received_at).days + for d in completed if d.completed_at and d.received_at + ) + avg_days = total_days / len(completed) + else: + avg_days = 0 + + return { + "total": total, + "by_status": by_status, + "by_type": by_type, + "overdue": overdue, + "due_this_week": due_this_week, + "average_processing_days": round(avg_days, 1), + "completed_this_month": completed_this_month, + } + + # -- Export -------------------------------------------------------------- + + def export(self, tenant_id: str, fmt: str = "csv") -> Any: + tid = uuid.UUID(tenant_id) + dsrs = self._db.query(DSRRequestDB).filter( + DSRRequestDB.tenant_id == tid, + ).order_by(DSRRequestDB.created_at.desc()).all() + + if fmt == "json": + return { + "exported_at": datetime.now(timezone.utc).isoformat(), + "total": len(dsrs), + "requests": [_dsr_to_dict(d) for d in dsrs], + } + + output = io.StringIO() + writer = csv.writer(output, delimiter=';', quoting=csv.QUOTE_MINIMAL) + writer.writerow([ + "ID", "Referenznummer", "Typ", "Name", "E-Mail", "Status", + "Prioritaet", "Eingegangen", "Frist", "Abgeschlossen", "Quelle", "Zugewiesen", + ]) + for dsr in dsrs: + writer.writerow([ + str(dsr.id), + dsr.request_number or "", + dsr.request_type or "", + dsr.requester_name or "", + dsr.requester_email or "", + dsr.status or "", + dsr.priority or "", + dsr.received_at.strftime("%Y-%m-%d") if dsr.received_at else "", + dsr.deadline_at.strftime("%Y-%m-%d") if dsr.deadline_at else "", + dsr.completed_at.strftime("%Y-%m-%d") if dsr.completed_at else "", + dsr.source or "", + dsr.assigned_to or "", + ]) + output.seek(0) + return output + + # -- Deadline processing ------------------------------------------------- + + def process_deadlines(self, tenant_id: str) -> Dict[str, Any]: + now = datetime.now(timezone.utc) + tid = uuid.UUID(tenant_id) + from sqlalchemy import and_ + overdue = self._db.query(DSRRequestDB).filter( + DSRRequestDB.tenant_id == tid, + DSRRequestDB.status.notin_(["completed", "rejected", "cancelled"]), + or_( + and_(DSRRequestDB.extended_deadline_at.isnot(None), DSRRequestDB.extended_deadline_at < now), + and_(DSRRequestDB.extended_deadline_at.is_(None), DSRRequestDB.deadline_at < now), + ), + ).all() + + processed = [] + for dsr in overdue: + processed.append({ + "id": str(dsr.id), + "request_number": dsr.request_number, + "status": dsr.status, + "deadline_at": dsr.deadline_at.isoformat() if dsr.deadline_at else None, + "extended_deadline_at": dsr.extended_deadline_at.isoformat() if dsr.extended_deadline_at else None, + "days_overdue": (now - (dsr.extended_deadline_at or dsr.deadline_at)).days, + }) + return {"processed": len(processed), "overdue_requests": processed} diff --git a/backend-compliance/compliance/services/dsr_workflow_service.py b/backend-compliance/compliance/services/dsr_workflow_service.py new file mode 100644 index 0000000..9e8e921 --- /dev/null +++ b/backend-compliance/compliance/services/dsr_workflow_service.py @@ -0,0 +1,487 @@ +# mypy: disable-error-code="arg-type,assignment,union-attr,no-any-return" +""" +DSR workflow service — status changes, identity verification, assignment, +completion, rejection, communications, exception checks, and templates. + +Phase 1 Step 4: extracted from ``compliance.api.dsr_routes``. CRUD, stats, +export, and deadline processing live in ``compliance.services.dsr_service``. +""" + +import uuid +from datetime import datetime, timezone +from typing import Any, Dict, List, Optional + +from sqlalchemy import or_ +from sqlalchemy.orm import Session + +from compliance.db.dsr_models import ( + DSRCommunicationDB, + DSRExceptionCheckDB, + DSRRequestDB, + DSRTemplateDB, + DSRTemplateVersionDB, +) +from compliance.domain import NotFoundError, ValidationError +from compliance.schemas.dsr import ( + AssignRequest, + CompleteDSR, + CreateTemplateVersion, + ExtendDeadline, + RejectDSR, + SendCommunication, + StatusChange, + UpdateExceptionCheck, + VerifyIdentity, +) +from compliance.services.dsr_service import ( + ART17_EXCEPTIONS, + VALID_STATUSES, + _dsr_to_dict, + _get_dsr_or_404, + _record_history, +) + + +class DSRWorkflowService: + """Workflow actions for DSRs: status, identity, assign, complete, reject, + communications, exception checks, templates.""" + + def __init__(self, db: Session) -> None: + self._db = db + + # -- Status change ------------------------------------------------------- + + def change_status( + self, dsr_id: str, body: StatusChange, tenant_id: str, + ) -> Dict[str, Any]: + if body.status not in VALID_STATUSES: + raise ValidationError(f"Invalid status: {body.status}") + dsr = _get_dsr_or_404(self._db, dsr_id, tenant_id) + _record_history(self._db, dsr, body.status, comment=body.comment) + dsr.status = body.status + dsr.updated_at = datetime.now(timezone.utc) + self._db.commit() + self._db.refresh(dsr) + return _dsr_to_dict(dsr) + + # -- Identity verification ----------------------------------------------- + + def verify_identity( + self, dsr_id: str, body: VerifyIdentity, tenant_id: str, + ) -> Dict[str, Any]: + dsr = _get_dsr_or_404(self._db, dsr_id, tenant_id) + now = datetime.now(timezone.utc) + dsr.identity_verified = True + dsr.verification_method = body.method + dsr.verified_at = now + dsr.verified_by = "admin" + dsr.verification_notes = body.notes + dsr.verification_document_ref = body.document_ref + + if dsr.status == "identity_verification": + _record_history(self._db, dsr, "processing", comment="Identitaet verifiziert") + dsr.status = "processing" + elif dsr.status == "intake": + _record_history(self._db, dsr, "identity_verification", comment="Identitaet verifiziert") + dsr.status = "identity_verification" + + dsr.updated_at = now + self._db.commit() + self._db.refresh(dsr) + return _dsr_to_dict(dsr) + + # -- Assignment ---------------------------------------------------------- + + def assign( + self, dsr_id: str, body: AssignRequest, tenant_id: str, + ) -> Dict[str, Any]: + dsr = _get_dsr_or_404(self._db, dsr_id, tenant_id) + dsr.assigned_to = body.assignee_id + dsr.assigned_at = datetime.now(timezone.utc) + dsr.assigned_by = "admin" + dsr.updated_at = datetime.now(timezone.utc) + self._db.commit() + self._db.refresh(dsr) + return _dsr_to_dict(dsr) + + # -- Extend deadline ----------------------------------------------------- + + def extend_deadline( + self, dsr_id: str, body: ExtendDeadline, tenant_id: str, + ) -> Dict[str, Any]: + dsr = _get_dsr_or_404(self._db, dsr_id, tenant_id) + if dsr.status in ("completed", "rejected", "cancelled"): + raise ValidationError("Cannot extend deadline for closed DSR") + now = datetime.now(timezone.utc) + from datetime import timedelta + current_deadline = dsr.extended_deadline_at or dsr.deadline_at + new_deadline = current_deadline + timedelta(days=body.days or 60) + dsr.extended_deadline_at = new_deadline + dsr.extension_reason = body.reason + dsr.extension_approved_by = "admin" + dsr.extension_approved_at = now + dsr.updated_at = now + _record_history(self._db, dsr, dsr.status, comment=f"Frist verlaengert: {body.reason}") + self._db.commit() + self._db.refresh(dsr) + return _dsr_to_dict(dsr) + + # -- Complete ------------------------------------------------------------ + + def complete( + self, dsr_id: str, body: CompleteDSR, tenant_id: str, + ) -> Dict[str, Any]: + dsr = _get_dsr_or_404(self._db, dsr_id, tenant_id) + if dsr.status in ("completed", "cancelled"): + raise ValidationError("DSR already completed or cancelled") + now = datetime.now(timezone.utc) + _record_history(self._db, dsr, "completed", comment=body.summary) + dsr.status = "completed" + dsr.completed_at = now + dsr.completion_notes = body.summary + if body.result_data: + dsr.data_export = body.result_data + dsr.updated_at = now + self._db.commit() + self._db.refresh(dsr) + return _dsr_to_dict(dsr) + + # -- Reject -------------------------------------------------------------- + + def reject( + self, dsr_id: str, body: RejectDSR, tenant_id: str, + ) -> Dict[str, Any]: + dsr = _get_dsr_or_404(self._db, dsr_id, tenant_id) + if dsr.status in ("completed", "rejected", "cancelled"): + raise ValidationError("DSR already closed") + now = datetime.now(timezone.utc) + _record_history(self._db, dsr, "rejected", comment=f"{body.reason} ({body.legal_basis})") + dsr.status = "rejected" + dsr.rejection_reason = body.reason + dsr.rejection_legal_basis = body.legal_basis + dsr.completed_at = now + dsr.updated_at = now + self._db.commit() + self._db.refresh(dsr) + return _dsr_to_dict(dsr) + + # -- History ------------------------------------------------------------- + + def get_history(self, dsr_id: str, tenant_id: str) -> List[Dict[str, Any]]: + from compliance.db.dsr_models import DSRStatusHistoryDB + dsr = _get_dsr_or_404(self._db, dsr_id, tenant_id) + entries = self._db.query(DSRStatusHistoryDB).filter( + DSRStatusHistoryDB.dsr_id == dsr.id, + ).order_by(DSRStatusHistoryDB.created_at.desc()).all() + return [ + { + "id": str(e.id), + "dsr_id": str(e.dsr_id), + "previous_status": e.previous_status, + "new_status": e.new_status, + "changed_by": e.changed_by, + "comment": e.comment, + "created_at": e.created_at.isoformat() if e.created_at else None, + } + for e in entries + ] + + # -- Communications ------------------------------------------------------ + + def get_communications(self, dsr_id: str, tenant_id: str) -> List[Dict[str, Any]]: + dsr = _get_dsr_or_404(self._db, dsr_id, tenant_id) + comms = self._db.query(DSRCommunicationDB).filter( + DSRCommunicationDB.dsr_id == dsr.id, + ).order_by(DSRCommunicationDB.created_at.desc()).all() + return [ + { + "id": str(c.id), + "dsr_id": str(c.dsr_id), + "communication_type": c.communication_type, + "channel": c.channel, + "subject": c.subject, + "content": c.content, + "template_used": c.template_used, + "attachments": c.attachments or [], + "sent_at": c.sent_at.isoformat() if c.sent_at else None, + "sent_by": c.sent_by, + "received_at": c.received_at.isoformat() if c.received_at else None, + "created_at": c.created_at.isoformat() if c.created_at else None, + "created_by": c.created_by, + } + for c in comms + ] + + def send_communication( + self, dsr_id: str, body: SendCommunication, tenant_id: str, + ) -> Dict[str, Any]: + dsr = _get_dsr_or_404(self._db, dsr_id, tenant_id) + now = datetime.now(timezone.utc) + comm = DSRCommunicationDB( + tenant_id=uuid.UUID(tenant_id), + dsr_id=dsr.id, + communication_type=body.communication_type, + channel=body.channel, + subject=body.subject, + content=body.content, + template_used=body.template_used, + sent_at=now if body.communication_type == "outgoing" else None, + sent_by="admin" if body.communication_type == "outgoing" else None, + received_at=now if body.communication_type == "incoming" else None, + created_at=now, + ) + self._db.add(comm) + self._db.commit() + self._db.refresh(comm) + return { + "id": str(comm.id), + "dsr_id": str(comm.dsr_id), + "communication_type": comm.communication_type, + "channel": comm.channel, + "subject": comm.subject, + "content": comm.content, + "sent_at": comm.sent_at.isoformat() if comm.sent_at else None, + "created_at": comm.created_at.isoformat() if comm.created_at else None, + } + + # -- Exception checks ---------------------------------------------------- + + def get_exception_checks(self, dsr_id: str, tenant_id: str) -> List[Dict[str, Any]]: + dsr = _get_dsr_or_404(self._db, dsr_id, tenant_id) + checks = self._db.query(DSRExceptionCheckDB).filter( + DSRExceptionCheckDB.dsr_id == dsr.id, + ).order_by(DSRExceptionCheckDB.check_code).all() + return [ + { + "id": str(c.id), + "dsr_id": str(c.dsr_id), + "check_code": c.check_code, + "article": c.article, + "label": c.label, + "description": c.description, + "applies": c.applies, + "notes": c.notes, + "checked_by": c.checked_by, + "checked_at": c.checked_at.isoformat() if c.checked_at else None, + } + for c in checks + ] + + def init_exception_checks(self, dsr_id: str, tenant_id: str) -> List[Dict[str, Any]]: + dsr = _get_dsr_or_404(self._db, dsr_id, tenant_id) + if dsr.request_type != "erasure": + raise ValidationError("Exception checks only for erasure requests") + existing = self._db.query(DSRExceptionCheckDB).filter( + DSRExceptionCheckDB.dsr_id == dsr.id, + ).count() + if existing > 0: + raise ValidationError("Exception checks already initialized") + + checks = [] + for exc in ART17_EXCEPTIONS: + check = DSRExceptionCheckDB( + tenant_id=uuid.UUID(tenant_id), + dsr_id=dsr.id, + check_code=exc["check_code"], + article=exc["article"], + label=exc["label"], + description=exc["description"], + ) + self._db.add(check) + checks.append(check) + self._db.commit() + return [ + { + "id": str(c.id), + "dsr_id": str(c.dsr_id), + "check_code": c.check_code, + "article": c.article, + "label": c.label, + "description": c.description, + "applies": c.applies, + "notes": c.notes, + } + for c in checks + ] + + def update_exception_check( + self, dsr_id: str, check_id: str, body: UpdateExceptionCheck, tenant_id: str, + ) -> Dict[str, Any]: + dsr = _get_dsr_or_404(self._db, dsr_id, tenant_id) + try: + cid = uuid.UUID(check_id) + except ValueError: + raise ValidationError("Invalid check ID") + check = self._db.query(DSRExceptionCheckDB).filter( + DSRExceptionCheckDB.id == cid, + DSRExceptionCheckDB.dsr_id == dsr.id, + ).first() + if not check: + raise NotFoundError("Exception check not found") + check.applies = body.applies + check.notes = body.notes + check.checked_by = "admin" + check.checked_at = datetime.now(timezone.utc) + self._db.commit() + self._db.refresh(check) + return { + "id": str(check.id), + "dsr_id": str(check.dsr_id), + "check_code": check.check_code, + "article": check.article, + "label": check.label, + "description": check.description, + "applies": check.applies, + "notes": check.notes, + "checked_by": check.checked_by, + "checked_at": check.checked_at.isoformat() if check.checked_at else None, + } + + # -- Templates ----------------------------------------------------------- + + def get_templates(self, tenant_id: str) -> List[Dict[str, Any]]: + templates = self._db.query(DSRTemplateDB).filter( + DSRTemplateDB.tenant_id == uuid.UUID(tenant_id), + ).order_by(DSRTemplateDB.template_type).all() + return [ + { + "id": str(t.id), + "name": t.name, + "template_type": t.template_type, + "request_type": t.request_type, + "language": t.language, + "is_active": t.is_active, + "created_at": t.created_at.isoformat() if t.created_at else None, + "updated_at": t.updated_at.isoformat() if t.updated_at else None, + } + for t in templates + ] + + def get_published_templates( + self, tenant_id: str, *, request_type: Optional[str] = None, language: str = "de", + ) -> List[Dict[str, Any]]: + query = self._db.query(DSRTemplateDB).filter( + DSRTemplateDB.tenant_id == uuid.UUID(tenant_id), + DSRTemplateDB.is_active, + DSRTemplateDB.language == language, + ) + if request_type: + query = query.filter( + or_( + DSRTemplateDB.request_type == request_type, + DSRTemplateDB.request_type.is_(None), + ) + ) + templates = query.all() + result = [] + for t in templates: + latest = self._db.query(DSRTemplateVersionDB).filter( + DSRTemplateVersionDB.template_id == t.id, + DSRTemplateVersionDB.status == "published", + ).order_by(DSRTemplateVersionDB.created_at.desc()).first() + result.append({ + "id": str(t.id), + "name": t.name, + "template_type": t.template_type, + "request_type": t.request_type, + "language": t.language, + "latest_version": { + "id": str(latest.id), + "version": latest.version, + "subject": latest.subject, + "body_html": latest.body_html, + "body_text": latest.body_text, + } if latest else None, + }) + return result + + def get_template_versions(self, template_id: str, tenant_id: str) -> List[Dict[str, Any]]: + try: + tid = uuid.UUID(template_id) + except ValueError: + raise ValidationError("Invalid template ID") + template = self._db.query(DSRTemplateDB).filter( + DSRTemplateDB.id == tid, + DSRTemplateDB.tenant_id == uuid.UUID(tenant_id), + ).first() + if not template: + raise NotFoundError("Template not found") + versions = self._db.query(DSRTemplateVersionDB).filter( + DSRTemplateVersionDB.template_id == tid, + ).order_by(DSRTemplateVersionDB.created_at.desc()).all() + return [ + { + "id": str(v.id), + "template_id": str(v.template_id), + "version": v.version, + "subject": v.subject, + "body_html": v.body_html, + "body_text": v.body_text, + "status": v.status, + "published_at": v.published_at.isoformat() if v.published_at else None, + "published_by": v.published_by, + "created_at": v.created_at.isoformat() if v.created_at else None, + "created_by": v.created_by, + } + for v in versions + ] + + def create_template_version( + self, template_id: str, body: CreateTemplateVersion, tenant_id: str, + ) -> Dict[str, Any]: + try: + tid = uuid.UUID(template_id) + except ValueError: + raise ValidationError("Invalid template ID") + template = self._db.query(DSRTemplateDB).filter( + DSRTemplateDB.id == tid, + DSRTemplateDB.tenant_id == uuid.UUID(tenant_id), + ).first() + if not template: + raise NotFoundError("Template not found") + version = DSRTemplateVersionDB( + template_id=tid, + version=body.version, + subject=body.subject, + body_html=body.body_html, + body_text=body.body_text, + status="draft", + ) + self._db.add(version) + self._db.commit() + self._db.refresh(version) + return { + "id": str(version.id), + "template_id": str(version.template_id), + "version": version.version, + "subject": version.subject, + "body_html": version.body_html, + "body_text": version.body_text, + "status": version.status, + "created_at": version.created_at.isoformat() if version.created_at else None, + } + + def publish_template_version(self, version_id: str, tenant_id: str) -> Dict[str, Any]: + try: + vid = uuid.UUID(version_id) + except ValueError: + raise ValidationError("Invalid version ID") + version = self._db.query(DSRTemplateVersionDB).filter( + DSRTemplateVersionDB.id == vid, + ).first() + if not version: + raise NotFoundError("Version not found") + now = datetime.now(timezone.utc) + version.status = "published" + version.published_at = now + version.published_by = "admin" + self._db.commit() + self._db.refresh(version) + return { + "id": str(version.id), + "template_id": str(version.template_id), + "version": version.version, + "status": version.status, + "published_at": version.published_at.isoformat(), + "published_by": version.published_by, + }