package services import ( "context" "encoding/json" "fmt" "strings" "time" "github.com/breakpilot/consent-service/internal/models" "github.com/google/uuid" "github.com/jackc/pgx/v5/pgxpool" ) // DSRService handles Data Subject Request business logic type DSRService struct { pool *pgxpool.Pool notificationService *NotificationService emailService *EmailService } // NewDSRService creates a new DSRService func NewDSRService(pool *pgxpool.Pool, notificationService *NotificationService, emailService *EmailService) *DSRService { return &DSRService{ pool: pool, notificationService: notificationService, emailService: emailService, } } // GetPool returns the database pool for direct queries func (s *DSRService) GetPool() *pgxpool.Pool { return s.pool } // generateRequestNumber generates a unique request number like DSR-2025-000001 func (s *DSRService) generateRequestNumber(ctx context.Context) (string, error) { var seqNum int64 err := s.pool.QueryRow(ctx, "SELECT nextval('dsr_request_number_seq')").Scan(&seqNum) if err != nil { return "", fmt.Errorf("failed to get next sequence number: %w", err) } year := time.Now().Year() return fmt.Sprintf("DSR-%d-%06d", year, seqNum), nil } // CreateRequest creates a new data subject request func (s *DSRService) CreateRequest(ctx context.Context, req models.CreateDSRRequest, createdBy *uuid.UUID) (*models.DataSubjectRequest, error) { // Validate request type requestType := models.DSRRequestType(req.RequestType) if !isValidRequestType(requestType) { return nil, fmt.Errorf("invalid request type: %s", req.RequestType) } // Generate request number requestNumber, err := s.generateRequestNumber(ctx) if err != nil { return nil, err } // Calculate deadline deadlineDays := requestType.DeadlineDays() deadline := time.Now().AddDate(0, 0, deadlineDays) // Determine priority priority := models.DSRPriorityNormal if req.Priority != "" { priority = models.DSRPriority(req.Priority) } else if requestType.IsExpedited() { priority = models.DSRPriorityExpedited } // Determine source source := models.DSRSourceAPI if req.Source != "" { source = models.DSRSource(req.Source) } // Serialize request details detailsJSON, err := json.Marshal(req.RequestDetails) if err != nil { detailsJSON = []byte("{}") } // Try to find existing user by email var userID *uuid.UUID var foundUserID uuid.UUID err = s.pool.QueryRow(ctx, "SELECT id FROM users WHERE email = $1", req.RequesterEmail).Scan(&foundUserID) if err == nil { userID = &foundUserID } // Insert request var dsr models.DataSubjectRequest err = s.pool.QueryRow(ctx, ` INSERT INTO data_subject_requests ( user_id, request_number, request_type, status, priority, source, requester_email, requester_name, requester_phone, request_details, deadline_at, legal_deadline_days, created_by ) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13) RETURNING id, user_id, request_number, request_type, status, priority, source, requester_email, requester_name, requester_phone, identity_verified, request_details, deadline_at, legal_deadline_days, created_at, updated_at, created_by `, userID, requestNumber, requestType, models.DSRStatusIntake, priority, source, req.RequesterEmail, req.RequesterName, req.RequesterPhone, detailsJSON, deadline, deadlineDays, createdBy, ).Scan( &dsr.ID, &dsr.UserID, &dsr.RequestNumber, &dsr.RequestType, &dsr.Status, &dsr.Priority, &dsr.Source, &dsr.RequesterEmail, &dsr.RequesterName, &dsr.RequesterPhone, &dsr.IdentityVerified, &detailsJSON, &dsr.DeadlineAt, &dsr.LegalDeadlineDays, &dsr.CreatedAt, &dsr.UpdatedAt, &dsr.CreatedBy, ) if err != nil { return nil, fmt.Errorf("failed to create DSR: %w", err) } // Parse details back json.Unmarshal(detailsJSON, &dsr.RequestDetails) // Record initial status s.recordStatusChange(ctx, dsr.ID, nil, models.DSRStatusIntake, createdBy, "Anfrage eingegangen") // Notify DPOs about new request go s.notifyNewRequest(context.Background(), &dsr) return &dsr, nil } // GetByID retrieves a DSR by ID func (s *DSRService) GetByID(ctx context.Context, id uuid.UUID) (*models.DataSubjectRequest, error) { var dsr models.DataSubjectRequest var detailsJSON, resultDataJSON []byte err := s.pool.QueryRow(ctx, ` SELECT id, user_id, request_number, request_type, status, priority, source, requester_email, requester_name, requester_phone, identity_verified, identity_verified_at, identity_verified_by, identity_verification_method, request_details, deadline_at, legal_deadline_days, extended_deadline_at, extension_reason, assigned_to, processing_notes, completed_at, completed_by, result_summary, result_data, rejected_at, rejected_by, rejection_reason, rejection_legal_basis, created_at, updated_at, created_by FROM data_subject_requests WHERE id = $1 `, id).Scan( &dsr.ID, &dsr.UserID, &dsr.RequestNumber, &dsr.RequestType, &dsr.Status, &dsr.Priority, &dsr.Source, &dsr.RequesterEmail, &dsr.RequesterName, &dsr.RequesterPhone, &dsr.IdentityVerified, &dsr.IdentityVerifiedAt, &dsr.IdentityVerifiedBy, &dsr.IdentityVerificationMethod, &detailsJSON, &dsr.DeadlineAt, &dsr.LegalDeadlineDays, &dsr.ExtendedDeadlineAt, &dsr.ExtensionReason, &dsr.AssignedTo, &dsr.ProcessingNotes, &dsr.CompletedAt, &dsr.CompletedBy, &dsr.ResultSummary, &resultDataJSON, &dsr.RejectedAt, &dsr.RejectedBy, &dsr.RejectionReason, &dsr.RejectionLegalBasis, &dsr.CreatedAt, &dsr.UpdatedAt, &dsr.CreatedBy, ) if err != nil { return nil, fmt.Errorf("DSR not found: %w", err) } json.Unmarshal(detailsJSON, &dsr.RequestDetails) json.Unmarshal(resultDataJSON, &dsr.ResultData) return &dsr, nil } // GetByNumber retrieves a DSR by request number func (s *DSRService) GetByNumber(ctx context.Context, requestNumber string) (*models.DataSubjectRequest, error) { var id uuid.UUID err := s.pool.QueryRow(ctx, "SELECT id FROM data_subject_requests WHERE request_number = $1", requestNumber).Scan(&id) if err != nil { return nil, fmt.Errorf("DSR not found: %w", err) } return s.GetByID(ctx, id) } // List retrieves DSRs with filters and pagination func (s *DSRService) List(ctx context.Context, filters models.DSRListFilters, limit, offset int) ([]models.DataSubjectRequest, int, error) { // Build query baseQuery := "FROM data_subject_requests WHERE 1=1" args := []interface{}{} argIndex := 1 if filters.Status != nil && *filters.Status != "" { baseQuery += fmt.Sprintf(" AND status = $%d", argIndex) args = append(args, *filters.Status) argIndex++ } if filters.RequestType != nil && *filters.RequestType != "" { baseQuery += fmt.Sprintf(" AND request_type = $%d", argIndex) args = append(args, *filters.RequestType) argIndex++ } if filters.AssignedTo != nil && *filters.AssignedTo != "" { baseQuery += fmt.Sprintf(" AND assigned_to = $%d", argIndex) args = append(args, *filters.AssignedTo) argIndex++ } if filters.Priority != nil && *filters.Priority != "" { baseQuery += fmt.Sprintf(" AND priority = $%d", argIndex) args = append(args, *filters.Priority) argIndex++ } if filters.OverdueOnly { baseQuery += " AND deadline_at < NOW() AND status NOT IN ('completed', 'rejected', 'cancelled')" } if filters.FromDate != nil { baseQuery += fmt.Sprintf(" AND created_at >= $%d", argIndex) args = append(args, *filters.FromDate) argIndex++ } if filters.ToDate != nil { baseQuery += fmt.Sprintf(" AND created_at <= $%d", argIndex) args = append(args, *filters.ToDate) argIndex++ } if filters.Search != nil && *filters.Search != "" { searchPattern := "%" + *filters.Search + "%" baseQuery += fmt.Sprintf(" AND (request_number ILIKE $%d OR requester_email ILIKE $%d OR requester_name ILIKE $%d)", argIndex, argIndex, argIndex) args = append(args, searchPattern) argIndex++ } // Get total count var total int err := s.pool.QueryRow(ctx, "SELECT COUNT(*) "+baseQuery, args...).Scan(&total) if err != nil { return nil, 0, fmt.Errorf("failed to count DSRs: %w", err) } // Get paginated results query := fmt.Sprintf(` SELECT id, user_id, request_number, request_type, status, priority, source, requester_email, requester_name, requester_phone, identity_verified, deadline_at, legal_deadline_days, assigned_to, created_at, updated_at %s ORDER BY created_at DESC LIMIT $%d OFFSET $%d `, baseQuery, argIndex, argIndex+1) args = append(args, limit, offset) rows, err := s.pool.Query(ctx, query, args...) if err != nil { return nil, 0, fmt.Errorf("failed to query DSRs: %w", err) } defer rows.Close() var dsrs []models.DataSubjectRequest for rows.Next() { var dsr models.DataSubjectRequest err := rows.Scan( &dsr.ID, &dsr.UserID, &dsr.RequestNumber, &dsr.RequestType, &dsr.Status, &dsr.Priority, &dsr.Source, &dsr.RequesterEmail, &dsr.RequesterName, &dsr.RequesterPhone, &dsr.IdentityVerified, &dsr.DeadlineAt, &dsr.LegalDeadlineDays, &dsr.AssignedTo, &dsr.CreatedAt, &dsr.UpdatedAt, ) if err != nil { return nil, 0, fmt.Errorf("failed to scan DSR: %w", err) } dsrs = append(dsrs, dsr) } return dsrs, total, nil } // ListByUser retrieves DSRs for a specific user func (s *DSRService) ListByUser(ctx context.Context, userID uuid.UUID) ([]models.DataSubjectRequest, error) { rows, err := s.pool.Query(ctx, ` SELECT id, user_id, request_number, request_type, status, priority, source, requester_email, requester_name, deadline_at, created_at, updated_at FROM data_subject_requests WHERE user_id = $1 OR requester_email = (SELECT email FROM users WHERE id = $1) ORDER BY created_at DESC `, userID) if err != nil { return nil, fmt.Errorf("failed to query user DSRs: %w", err) } defer rows.Close() var dsrs []models.DataSubjectRequest for rows.Next() { var dsr models.DataSubjectRequest err := rows.Scan( &dsr.ID, &dsr.UserID, &dsr.RequestNumber, &dsr.RequestType, &dsr.Status, &dsr.Priority, &dsr.Source, &dsr.RequesterEmail, &dsr.RequesterName, &dsr.DeadlineAt, &dsr.CreatedAt, &dsr.UpdatedAt, ) if err != nil { return nil, fmt.Errorf("failed to scan DSR: %w", err) } dsrs = append(dsrs, dsr) } return dsrs, nil } // UpdateStatus changes the status of a DSR func (s *DSRService) UpdateStatus(ctx context.Context, id uuid.UUID, newStatus models.DSRStatus, comment string, changedBy *uuid.UUID) error { // Get current status var currentStatus models.DSRStatus err := s.pool.QueryRow(ctx, "SELECT status FROM data_subject_requests WHERE id = $1", id).Scan(¤tStatus) if err != nil { return fmt.Errorf("DSR not found: %w", err) } // Validate transition if !isValidStatusTransition(currentStatus, newStatus) { return fmt.Errorf("invalid status transition from %s to %s", currentStatus, newStatus) } // Update status _, err = s.pool.Exec(ctx, ` UPDATE data_subject_requests SET status = $1, updated_at = NOW() WHERE id = $2 `, newStatus, id) if err != nil { return fmt.Errorf("failed to update status: %w", err) } // Record status change s.recordStatusChange(ctx, id, ¤tStatus, newStatus, changedBy, comment) return nil } // VerifyIdentity marks identity as verified func (s *DSRService) VerifyIdentity(ctx context.Context, id uuid.UUID, method string, verifiedBy uuid.UUID) error { _, err := s.pool.Exec(ctx, ` UPDATE data_subject_requests SET identity_verified = TRUE, identity_verified_at = NOW(), identity_verified_by = $1, identity_verification_method = $2, status = CASE WHEN status = 'intake' THEN 'identity_verification' ELSE status END, updated_at = NOW() WHERE id = $3 `, verifiedBy, method, id) if err != nil { return fmt.Errorf("failed to verify identity: %w", err) } s.recordStatusChange(ctx, id, nil, models.DSRStatusIdentityVerification, &verifiedBy, "Identität verifiziert via "+method) return nil } // AssignRequest assigns a DSR to a handler func (s *DSRService) AssignRequest(ctx context.Context, id uuid.UUID, assigneeID uuid.UUID, assignedBy uuid.UUID) error { _, err := s.pool.Exec(ctx, ` UPDATE data_subject_requests SET assigned_to = $1, updated_at = NOW() WHERE id = $2 `, assigneeID, id) if err != nil { return fmt.Errorf("failed to assign DSR: %w", err) } // Get assignee name for comment var assigneeName string s.pool.QueryRow(ctx, "SELECT COALESCE(name, email) FROM users WHERE id = $1", assigneeID).Scan(&assigneeName) s.recordStatusChange(ctx, id, nil, "", &assignedBy, "Zugewiesen an "+assigneeName) // Notify assignee go s.notifyAssignment(context.Background(), id, assigneeID) return nil } // ExtendDeadline extends the deadline for a DSR func (s *DSRService) ExtendDeadline(ctx context.Context, id uuid.UUID, reason string, days int, extendedBy uuid.UUID) error { // Default extension is 2 months (60 days) per Art. 12(3) if days <= 0 { days = 60 } _, err := s.pool.Exec(ctx, ` UPDATE data_subject_requests SET extended_deadline_at = deadline_at + ($1 || ' days')::INTERVAL, extension_reason = $2, updated_at = NOW() WHERE id = $3 `, days, reason, id) if err != nil { return fmt.Errorf("failed to extend deadline: %w", err) } s.recordStatusChange(ctx, id, nil, "", &extendedBy, fmt.Sprintf("Frist um %d Tage verlängert: %s", days, reason)) return nil } // CompleteRequest marks a DSR as completed func (s *DSRService) CompleteRequest(ctx context.Context, id uuid.UUID, summary string, resultData map[string]interface{}, completedBy uuid.UUID) error { resultJSON, _ := json.Marshal(resultData) // Get current status var currentStatus models.DSRStatus s.pool.QueryRow(ctx, "SELECT status FROM data_subject_requests WHERE id = $1", id).Scan(¤tStatus) _, err := s.pool.Exec(ctx, ` UPDATE data_subject_requests SET status = 'completed', completed_at = NOW(), completed_by = $1, result_summary = $2, result_data = $3, updated_at = NOW() WHERE id = $4 `, completedBy, summary, resultJSON, id) if err != nil { return fmt.Errorf("failed to complete DSR: %w", err) } s.recordStatusChange(ctx, id, ¤tStatus, models.DSRStatusCompleted, &completedBy, summary) return nil } // RejectRequest rejects a DSR with legal basis func (s *DSRService) RejectRequest(ctx context.Context, id uuid.UUID, reason, legalBasis string, rejectedBy uuid.UUID) error { // Get current status var currentStatus models.DSRStatus s.pool.QueryRow(ctx, "SELECT status FROM data_subject_requests WHERE id = $1", id).Scan(¤tStatus) _, err := s.pool.Exec(ctx, ` UPDATE data_subject_requests SET status = 'rejected', rejected_at = NOW(), rejected_by = $1, rejection_reason = $2, rejection_legal_basis = $3, updated_at = NOW() WHERE id = $4 `, rejectedBy, reason, legalBasis, id) if err != nil { return fmt.Errorf("failed to reject DSR: %w", err) } s.recordStatusChange(ctx, id, ¤tStatus, models.DSRStatusRejected, &rejectedBy, fmt.Sprintf("Abgelehnt (%s): %s", legalBasis, reason)) return nil } // CancelRequest cancels a DSR (by user) func (s *DSRService) CancelRequest(ctx context.Context, id uuid.UUID, cancelledBy uuid.UUID) error { // Verify ownership var userID *uuid.UUID err := s.pool.QueryRow(ctx, "SELECT user_id FROM data_subject_requests WHERE id = $1", id).Scan(&userID) if err != nil { return fmt.Errorf("DSR not found: %w", err) } if userID == nil || *userID != cancelledBy { return fmt.Errorf("unauthorized: can only cancel own requests") } // Get current status var currentStatus models.DSRStatus s.pool.QueryRow(ctx, "SELECT status FROM data_subject_requests WHERE id = $1", id).Scan(¤tStatus) _, err = s.pool.Exec(ctx, ` UPDATE data_subject_requests SET status = 'cancelled', updated_at = NOW() WHERE id = $1 `, id) if err != nil { return fmt.Errorf("failed to cancel DSR: %w", err) } s.recordStatusChange(ctx, id, ¤tStatus, models.DSRStatusCancelled, &cancelledBy, "Vom Antragsteller storniert") return nil } // GetDashboardStats returns statistics for the admin dashboard func (s *DSRService) GetDashboardStats(ctx context.Context) (*models.DSRDashboardStats, error) { stats := &models.DSRDashboardStats{ ByType: make(map[string]int), ByStatus: make(map[string]int), } // Total requests s.pool.QueryRow(ctx, "SELECT COUNT(*) FROM data_subject_requests").Scan(&stats.TotalRequests) // Pending requests (not completed, rejected, or cancelled) s.pool.QueryRow(ctx, ` SELECT COUNT(*) FROM data_subject_requests WHERE status NOT IN ('completed', 'rejected', 'cancelled') `).Scan(&stats.PendingRequests) // Overdue requests s.pool.QueryRow(ctx, ` SELECT COUNT(*) FROM data_subject_requests WHERE COALESCE(extended_deadline_at, deadline_at) < NOW() AND status NOT IN ('completed', 'rejected', 'cancelled') `).Scan(&stats.OverdueRequests) // Completed this month s.pool.QueryRow(ctx, ` SELECT COUNT(*) FROM data_subject_requests WHERE status = 'completed' AND completed_at >= DATE_TRUNC('month', NOW()) `).Scan(&stats.CompletedThisMonth) // Average processing days s.pool.QueryRow(ctx, ` SELECT COALESCE(AVG(EXTRACT(EPOCH FROM (completed_at - created_at)) / 86400), 0) FROM data_subject_requests WHERE status = 'completed' `).Scan(&stats.AverageProcessingDays) // Count by type rows, _ := s.pool.Query(ctx, ` SELECT request_type, COUNT(*) FROM data_subject_requests GROUP BY request_type `) for rows.Next() { var t string var count int rows.Scan(&t, &count) stats.ByType[t] = count } rows.Close() // Count by status rows, _ = s.pool.Query(ctx, ` SELECT status, COUNT(*) FROM data_subject_requests GROUP BY status `) for rows.Next() { var s string var count int rows.Scan(&s, &count) stats.ByStatus[s] = count } rows.Close() // Upcoming deadlines (next 7 days) rows, _ = s.pool.Query(ctx, ` SELECT id, request_number, request_type, status, requester_email, deadline_at FROM data_subject_requests WHERE COALESCE(extended_deadline_at, deadline_at) BETWEEN NOW() AND NOW() + INTERVAL '7 days' AND status NOT IN ('completed', 'rejected', 'cancelled') ORDER BY deadline_at ASC LIMIT 10 `) for rows.Next() { var dsr models.DataSubjectRequest rows.Scan(&dsr.ID, &dsr.RequestNumber, &dsr.RequestType, &dsr.Status, &dsr.RequesterEmail, &dsr.DeadlineAt) stats.UpcomingDeadlines = append(stats.UpcomingDeadlines, dsr) } rows.Close() return stats, nil } // GetStatusHistory retrieves the status history for a DSR func (s *DSRService) GetStatusHistory(ctx context.Context, requestID uuid.UUID) ([]models.DSRStatusHistory, error) { rows, err := s.pool.Query(ctx, ` SELECT id, request_id, from_status, to_status, changed_by, comment, metadata, created_at FROM dsr_status_history WHERE request_id = $1 ORDER BY created_at DESC `, requestID) if err != nil { return nil, fmt.Errorf("failed to query status history: %w", err) } defer rows.Close() var history []models.DSRStatusHistory for rows.Next() { var h models.DSRStatusHistory var metadataJSON []byte err := rows.Scan(&h.ID, &h.RequestID, &h.FromStatus, &h.ToStatus, &h.ChangedBy, &h.Comment, &metadataJSON, &h.CreatedAt) if err != nil { continue } json.Unmarshal(metadataJSON, &h.Metadata) history = append(history, h) } return history, nil } // GetCommunications retrieves communications for a DSR func (s *DSRService) GetCommunications(ctx context.Context, requestID uuid.UUID) ([]models.DSRCommunication, error) { rows, err := s.pool.Query(ctx, ` SELECT id, request_id, direction, channel, communication_type, template_version_id, subject, body_html, body_text, recipient_email, sent_at, error_message, attachments, created_at, created_by FROM dsr_communications WHERE request_id = $1 ORDER BY created_at DESC `, requestID) if err != nil { return nil, fmt.Errorf("failed to query communications: %w", err) } defer rows.Close() var comms []models.DSRCommunication for rows.Next() { var c models.DSRCommunication var attachmentsJSON []byte err := rows.Scan(&c.ID, &c.RequestID, &c.Direction, &c.Channel, &c.CommunicationType, &c.TemplateVersionID, &c.Subject, &c.BodyHTML, &c.BodyText, &c.RecipientEmail, &c.SentAt, &c.ErrorMessage, &attachmentsJSON, &c.CreatedAt, &c.CreatedBy) if err != nil { continue } json.Unmarshal(attachmentsJSON, &c.Attachments) comms = append(comms, c) } return comms, nil } // SendCommunication sends a communication for a DSR func (s *DSRService) SendCommunication(ctx context.Context, requestID uuid.UUID, req models.SendDSRCommunicationRequest, sentBy uuid.UUID) error { // Get DSR details dsr, err := s.GetByID(ctx, requestID) if err != nil { return err } // Get template if specified var subject, bodyHTML, bodyText string if req.TemplateVersionID != nil { templateVersionID, _ := uuid.Parse(*req.TemplateVersionID) err := s.pool.QueryRow(ctx, ` SELECT subject, body_html, body_text FROM dsr_template_versions WHERE id = $1 AND status = 'published' `, templateVersionID).Scan(&subject, &bodyHTML, &bodyText) if err != nil { return fmt.Errorf("template version not found or not published: %w", err) } } // Use custom content if provided if req.CustomSubject != nil { subject = *req.CustomSubject } if req.CustomBody != nil { bodyHTML = *req.CustomBody bodyText = stripHTML(*req.CustomBody) } // Replace variables variables := map[string]string{ "requester_name": stringOrDefault(dsr.RequesterName, "Antragsteller/in"), "request_number": dsr.RequestNumber, "request_type_de": dsr.RequestType.Label(), "request_date": dsr.CreatedAt.Format("02.01.2006"), "deadline_date": dsr.DeadlineAt.Format("02.01.2006"), } for k, v := range req.Variables { variables[k] = v } subject = replaceVariables(subject, variables) bodyHTML = replaceVariables(bodyHTML, variables) bodyText = replaceVariables(bodyText, variables) // Send email if s.emailService != nil { err = s.emailService.SendEmail(dsr.RequesterEmail, subject, bodyHTML, bodyText) if err != nil { // Log error but continue _, _ = s.pool.Exec(ctx, ` INSERT INTO dsr_communications (request_id, direction, channel, communication_type, template_version_id, subject, body_html, body_text, recipient_email, error_message, created_by) VALUES ($1, 'outbound', 'email', $2, $3, $4, $5, $6, $7, $8, $9) `, requestID, req.CommunicationType, req.TemplateVersionID, subject, bodyHTML, bodyText, dsr.RequesterEmail, err.Error(), sentBy) return fmt.Errorf("failed to send email: %w", err) } } // Log communication now := time.Now() _, err = s.pool.Exec(ctx, ` INSERT INTO dsr_communications (request_id, direction, channel, communication_type, template_version_id, subject, body_html, body_text, recipient_email, sent_at, created_by) VALUES ($1, 'outbound', 'email', $2, $3, $4, $5, $6, $7, $8, $9) `, requestID, req.CommunicationType, req.TemplateVersionID, subject, bodyHTML, bodyText, dsr.RequesterEmail, now, sentBy) return err } // InitErasureExceptionChecks initializes exception checks for an erasure request func (s *DSRService) InitErasureExceptionChecks(ctx context.Context, requestID uuid.UUID) error { exceptions := []struct { Type string Description string }{ {models.DSRExceptionFreedomExpression, "Ausübung des Rechts auf freie Meinungsäußerung und Information (Art. 17 Abs. 3 lit. a)"}, {models.DSRExceptionLegalObligation, "Erfüllung einer rechtlichen Verpflichtung oder öffentlichen Aufgabe (Art. 17 Abs. 3 lit. b)"}, {models.DSRExceptionPublicHealth, "Gründe des öffentlichen Interesses im Bereich der öffentlichen Gesundheit (Art. 17 Abs. 3 lit. c)"}, {models.DSRExceptionArchiving, "Im öffentlichen Interesse liegende Archivzwecke, Forschung oder Statistik (Art. 17 Abs. 3 lit. d)"}, {models.DSRExceptionLegalClaims, "Geltendmachung, Ausübung oder Verteidigung von Rechtsansprüchen (Art. 17 Abs. 3 lit. e)"}, } for _, exc := range exceptions { _, err := s.pool.Exec(ctx, ` INSERT INTO dsr_exception_checks (request_id, exception_type, description) VALUES ($1, $2, $3) ON CONFLICT DO NOTHING `, requestID, exc.Type, exc.Description) if err != nil { return fmt.Errorf("failed to create exception check: %w", err) } } return nil } // GetExceptionChecks retrieves exception checks for a DSR func (s *DSRService) GetExceptionChecks(ctx context.Context, requestID uuid.UUID) ([]models.DSRExceptionCheck, error) { rows, err := s.pool.Query(ctx, ` SELECT id, request_id, exception_type, description, applies, checked_by, checked_at, notes, created_at FROM dsr_exception_checks WHERE request_id = $1 ORDER BY created_at `, requestID) if err != nil { return nil, fmt.Errorf("failed to query exception checks: %w", err) } defer rows.Close() var checks []models.DSRExceptionCheck for rows.Next() { var c models.DSRExceptionCheck err := rows.Scan(&c.ID, &c.RequestID, &c.ExceptionType, &c.Description, &c.Applies, &c.CheckedBy, &c.CheckedAt, &c.Notes, &c.CreatedAt) if err != nil { continue } checks = append(checks, c) } return checks, nil } // UpdateExceptionCheck updates an exception check func (s *DSRService) UpdateExceptionCheck(ctx context.Context, checkID uuid.UUID, applies bool, notes *string, checkedBy uuid.UUID) error { _, err := s.pool.Exec(ctx, ` UPDATE dsr_exception_checks SET applies = $1, notes = $2, checked_by = $3, checked_at = NOW() WHERE id = $4 `, applies, notes, checkedBy, checkID) return err } // ProcessDeadlines checks for approaching and overdue deadlines func (s *DSRService) ProcessDeadlines(ctx context.Context) error { now := time.Now() // Find requests with deadlines in 3 days threeDaysAhead := now.AddDate(0, 0, 3) rows, _ := s.pool.Query(ctx, ` SELECT id, request_number, request_type, assigned_to, deadline_at FROM data_subject_requests WHERE COALESCE(extended_deadline_at, deadline_at) BETWEEN $1 AND $2 AND status NOT IN ('completed', 'rejected', 'cancelled') `, now, threeDaysAhead) for rows.Next() { var id uuid.UUID var requestNumber, requestType string var assignedTo *uuid.UUID var deadline time.Time rows.Scan(&id, &requestNumber, &requestType, &assignedTo, &deadline) // Notify assigned user or all DPOs if assignedTo != nil { s.notifyDeadlineWarning(ctx, id, *assignedTo, requestNumber, deadline, 3) } else { s.notifyAllDPOs(ctx, id, requestNumber, "Frist in 3 Tagen", deadline) } } rows.Close() // Find requests with deadlines in 1 day oneDayAhead := now.AddDate(0, 0, 1) rows, _ = s.pool.Query(ctx, ` SELECT id, request_number, request_type, assigned_to, deadline_at FROM data_subject_requests WHERE COALESCE(extended_deadline_at, deadline_at) BETWEEN $1 AND $2 AND status NOT IN ('completed', 'rejected', 'cancelled') `, now, oneDayAhead) for rows.Next() { var id uuid.UUID var requestNumber, requestType string var assignedTo *uuid.UUID var deadline time.Time rows.Scan(&id, &requestNumber, &requestType, &assignedTo, &deadline) if assignedTo != nil { s.notifyDeadlineWarning(ctx, id, *assignedTo, requestNumber, deadline, 1) } else { s.notifyAllDPOs(ctx, id, requestNumber, "Frist morgen!", deadline) } } rows.Close() // Find overdue requests rows, _ = s.pool.Query(ctx, ` SELECT id, request_number, request_type, assigned_to, deadline_at FROM data_subject_requests WHERE COALESCE(extended_deadline_at, deadline_at) < $1 AND status NOT IN ('completed', 'rejected', 'cancelled') `, now) for rows.Next() { var id uuid.UUID var requestNumber, requestType string var assignedTo *uuid.UUID var deadline time.Time rows.Scan(&id, &requestNumber, &requestType, &assignedTo, &deadline) // Notify all DPOs for overdue s.notifyAllDPOs(ctx, id, requestNumber, "ÜBERFÄLLIG!", deadline) // Log to audit s.pool.Exec(ctx, ` INSERT INTO consent_audit_log (action, entity_type, entity_id, details) VALUES ('dsr_overdue', 'dsr', $1, $2) `, id, fmt.Sprintf(`{"request_number": "%s", "deadline": "%s"}`, requestNumber, deadline.Format(time.RFC3339))) } rows.Close() return nil } // Helper functions func (s *DSRService) recordStatusChange(ctx context.Context, requestID uuid.UUID, fromStatus *models.DSRStatus, toStatus models.DSRStatus, changedBy *uuid.UUID, comment string) { s.pool.Exec(ctx, ` INSERT INTO dsr_status_history (request_id, from_status, to_status, changed_by, comment) VALUES ($1, $2, $3, $4, $5) `, requestID, fromStatus, toStatus, changedBy, comment) } func (s *DSRService) notifyNewRequest(ctx context.Context, dsr *models.DataSubjectRequest) { if s.notificationService == nil { return } // Notify all DPOs rows, _ := s.pool.Query(ctx, "SELECT id FROM users WHERE role = 'data_protection_officer'") defer rows.Close() for rows.Next() { var userID uuid.UUID rows.Scan(&userID) s.notificationService.CreateNotification(ctx, userID, NotificationTypeDSRReceived, "Neue Betroffenenanfrage", fmt.Sprintf("Neue %s eingegangen: %s", dsr.RequestType.Label(), dsr.RequestNumber), map[string]interface{}{"dsr_id": dsr.ID, "request_number": dsr.RequestNumber}) } } func (s *DSRService) notifyAssignment(ctx context.Context, dsrID, assigneeID uuid.UUID) { if s.notificationService == nil { return } dsr, _ := s.GetByID(ctx, dsrID) if dsr != nil { s.notificationService.CreateNotification(ctx, assigneeID, NotificationTypeDSRAssigned, "Betroffenenanfrage zugewiesen", fmt.Sprintf("Ihnen wurde die Anfrage %s zugewiesen", dsr.RequestNumber), map[string]interface{}{"dsr_id": dsrID, "request_number": dsr.RequestNumber}) } } func (s *DSRService) notifyDeadlineWarning(ctx context.Context, dsrID, userID uuid.UUID, requestNumber string, deadline time.Time, daysLeft int) { if s.notificationService == nil { return } s.notificationService.CreateNotification(ctx, userID, NotificationTypeDSRDeadline, fmt.Sprintf("Fristwarnung: %s", requestNumber), fmt.Sprintf("Die Frist für %s läuft in %d Tag(en) ab (%s)", requestNumber, daysLeft, deadline.Format("02.01.2006")), map[string]interface{}{"dsr_id": dsrID, "deadline": deadline, "days_left": daysLeft}) } func (s *DSRService) notifyAllDPOs(ctx context.Context, dsrID uuid.UUID, requestNumber, message string, deadline time.Time) { if s.notificationService == nil { return } rows, _ := s.pool.Query(ctx, "SELECT id FROM users WHERE role = 'data_protection_officer'") defer rows.Close() for rows.Next() { var userID uuid.UUID rows.Scan(&userID) s.notificationService.CreateNotification(ctx, userID, NotificationTypeDSRDeadline, fmt.Sprintf("%s: %s", message, requestNumber), fmt.Sprintf("Anfrage %s: %s (Frist: %s)", requestNumber, message, deadline.Format("02.01.2006")), map[string]interface{}{"dsr_id": dsrID, "deadline": deadline}) } } func isValidRequestType(rt models.DSRRequestType) bool { switch rt { case models.DSRTypeAccess, models.DSRTypeRectification, models.DSRTypeErasure, models.DSRTypeRestriction, models.DSRTypePortability: return true } return false } func isValidStatusTransition(from, to models.DSRStatus) bool { validTransitions := map[models.DSRStatus][]models.DSRStatus{ models.DSRStatusIntake: {models.DSRStatusIdentityVerification, models.DSRStatusProcessing, models.DSRStatusRejected, models.DSRStatusCancelled}, models.DSRStatusIdentityVerification: {models.DSRStatusProcessing, models.DSRStatusRejected, models.DSRStatusCancelled}, models.DSRStatusProcessing: {models.DSRStatusCompleted, models.DSRStatusRejected, models.DSRStatusCancelled}, models.DSRStatusCompleted: {}, models.DSRStatusRejected: {}, models.DSRStatusCancelled: {}, } allowed, exists := validTransitions[from] if !exists { return false } for _, s := range allowed { if s == to { return true } } return false } func stringOrDefault(s *string, def string) string { if s != nil { return *s } return def } func replaceVariables(text string, variables map[string]string) string { for k, v := range variables { text = strings.ReplaceAll(text, "{{"+k+"}}", v) } return text } func stripHTML(html string) string { // Simple HTML stripping - in production use a proper library text := strings.ReplaceAll(html, "
", "\n") text = strings.ReplaceAll(text, "
", "\n") text = strings.ReplaceAll(text, "
", "\n") text = strings.ReplaceAll(text, "

", "\n\n") // Remove all remaining tags for { start := strings.Index(text, "<") if start == -1 { break } end := strings.Index(text[start:], ">") if end == -1 { break } text = text[:start] + text[start+end+1:] } return strings.TrimSpace(text) }