Files
breakpilot-core/consent-service/internal/services/dsr_service.go
Benjamin Boenisch ad111d5e69 Initial commit: breakpilot-core - Shared Infrastructure
Docker Compose with 24+ services:
- PostgreSQL (PostGIS), Valkey, MinIO, Qdrant
- Vault (PKI/TLS), Nginx (Reverse Proxy)
- Backend Core API, Consent Service, Billing Service
- RAG Service, Embedding Service
- Gitea, Woodpecker CI/CD
- Night Scheduler, Health Aggregator
- Jitsi (Web/XMPP/JVB/Jicofo), Mailpit

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-11 23:47:13 +01:00

948 lines
32 KiB
Go

package services
import (
"context"
"encoding/json"
"fmt"
"strings"
"time"
"github.com/breakpilot/consent-service/internal/models"
"github.com/google/uuid"
"github.com/jackc/pgx/v5/pgxpool"
)
// DSRService handles Data Subject Request business logic
type DSRService struct {
pool *pgxpool.Pool
notificationService *NotificationService
emailService *EmailService
}
// NewDSRService creates a new DSRService
func NewDSRService(pool *pgxpool.Pool, notificationService *NotificationService, emailService *EmailService) *DSRService {
return &DSRService{
pool: pool,
notificationService: notificationService,
emailService: emailService,
}
}
// GetPool returns the database pool for direct queries
func (s *DSRService) GetPool() *pgxpool.Pool {
return s.pool
}
// generateRequestNumber generates a unique request number like DSR-2025-000001
func (s *DSRService) generateRequestNumber(ctx context.Context) (string, error) {
var seqNum int64
err := s.pool.QueryRow(ctx, "SELECT nextval('dsr_request_number_seq')").Scan(&seqNum)
if err != nil {
return "", fmt.Errorf("failed to get next sequence number: %w", err)
}
year := time.Now().Year()
return fmt.Sprintf("DSR-%d-%06d", year, seqNum), nil
}
// CreateRequest creates a new data subject request
func (s *DSRService) CreateRequest(ctx context.Context, req models.CreateDSRRequest, createdBy *uuid.UUID) (*models.DataSubjectRequest, error) {
// Validate request type
requestType := models.DSRRequestType(req.RequestType)
if !isValidRequestType(requestType) {
return nil, fmt.Errorf("invalid request type: %s", req.RequestType)
}
// Generate request number
requestNumber, err := s.generateRequestNumber(ctx)
if err != nil {
return nil, err
}
// Calculate deadline
deadlineDays := requestType.DeadlineDays()
deadline := time.Now().AddDate(0, 0, deadlineDays)
// Determine priority
priority := models.DSRPriorityNormal
if req.Priority != "" {
priority = models.DSRPriority(req.Priority)
} else if requestType.IsExpedited() {
priority = models.DSRPriorityExpedited
}
// Determine source
source := models.DSRSourceAPI
if req.Source != "" {
source = models.DSRSource(req.Source)
}
// Serialize request details
detailsJSON, err := json.Marshal(req.RequestDetails)
if err != nil {
detailsJSON = []byte("{}")
}
// Try to find existing user by email
var userID *uuid.UUID
var foundUserID uuid.UUID
err = s.pool.QueryRow(ctx, "SELECT id FROM users WHERE email = $1", req.RequesterEmail).Scan(&foundUserID)
if err == nil {
userID = &foundUserID
}
// Insert request
var dsr models.DataSubjectRequest
err = s.pool.QueryRow(ctx, `
INSERT INTO data_subject_requests (
user_id, request_number, request_type, status, priority, source,
requester_email, requester_name, requester_phone,
request_details, deadline_at, legal_deadline_days, created_by
) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13)
RETURNING id, user_id, request_number, request_type, status, priority, source,
requester_email, requester_name, requester_phone, identity_verified,
request_details, deadline_at, legal_deadline_days, created_at, updated_at, created_by
`, userID, requestNumber, requestType, models.DSRStatusIntake, priority, source,
req.RequesterEmail, req.RequesterName, req.RequesterPhone,
detailsJSON, deadline, deadlineDays, createdBy,
).Scan(
&dsr.ID, &dsr.UserID, &dsr.RequestNumber, &dsr.RequestType, &dsr.Status,
&dsr.Priority, &dsr.Source, &dsr.RequesterEmail, &dsr.RequesterName,
&dsr.RequesterPhone, &dsr.IdentityVerified, &detailsJSON,
&dsr.DeadlineAt, &dsr.LegalDeadlineDays, &dsr.CreatedAt, &dsr.UpdatedAt, &dsr.CreatedBy,
)
if err != nil {
return nil, fmt.Errorf("failed to create DSR: %w", err)
}
// Parse details back
json.Unmarshal(detailsJSON, &dsr.RequestDetails)
// Record initial status
s.recordStatusChange(ctx, dsr.ID, nil, models.DSRStatusIntake, createdBy, "Anfrage eingegangen")
// Notify DPOs about new request
go s.notifyNewRequest(context.Background(), &dsr)
return &dsr, nil
}
// GetByID retrieves a DSR by ID
func (s *DSRService) GetByID(ctx context.Context, id uuid.UUID) (*models.DataSubjectRequest, error) {
var dsr models.DataSubjectRequest
var detailsJSON, resultDataJSON []byte
err := s.pool.QueryRow(ctx, `
SELECT id, user_id, request_number, request_type, status, priority, source,
requester_email, requester_name, requester_phone,
identity_verified, identity_verified_at, identity_verified_by, identity_verification_method,
request_details, deadline_at, legal_deadline_days, extended_deadline_at, extension_reason,
assigned_to, processing_notes, completed_at, completed_by, result_summary, result_data,
rejected_at, rejected_by, rejection_reason, rejection_legal_basis,
created_at, updated_at, created_by
FROM data_subject_requests WHERE id = $1
`, id).Scan(
&dsr.ID, &dsr.UserID, &dsr.RequestNumber, &dsr.RequestType, &dsr.Status,
&dsr.Priority, &dsr.Source, &dsr.RequesterEmail, &dsr.RequesterName,
&dsr.RequesterPhone, &dsr.IdentityVerified, &dsr.IdentityVerifiedAt,
&dsr.IdentityVerifiedBy, &dsr.IdentityVerificationMethod,
&detailsJSON, &dsr.DeadlineAt, &dsr.LegalDeadlineDays,
&dsr.ExtendedDeadlineAt, &dsr.ExtensionReason, &dsr.AssignedTo,
&dsr.ProcessingNotes, &dsr.CompletedAt, &dsr.CompletedBy,
&dsr.ResultSummary, &resultDataJSON, &dsr.RejectedAt, &dsr.RejectedBy,
&dsr.RejectionReason, &dsr.RejectionLegalBasis,
&dsr.CreatedAt, &dsr.UpdatedAt, &dsr.CreatedBy,
)
if err != nil {
return nil, fmt.Errorf("DSR not found: %w", err)
}
json.Unmarshal(detailsJSON, &dsr.RequestDetails)
json.Unmarshal(resultDataJSON, &dsr.ResultData)
return &dsr, nil
}
// GetByNumber retrieves a DSR by request number
func (s *DSRService) GetByNumber(ctx context.Context, requestNumber string) (*models.DataSubjectRequest, error) {
var id uuid.UUID
err := s.pool.QueryRow(ctx, "SELECT id FROM data_subject_requests WHERE request_number = $1", requestNumber).Scan(&id)
if err != nil {
return nil, fmt.Errorf("DSR not found: %w", err)
}
return s.GetByID(ctx, id)
}
// List retrieves DSRs with filters and pagination
func (s *DSRService) List(ctx context.Context, filters models.DSRListFilters, limit, offset int) ([]models.DataSubjectRequest, int, error) {
// Build query
baseQuery := "FROM data_subject_requests WHERE 1=1"
args := []interface{}{}
argIndex := 1
if filters.Status != nil && *filters.Status != "" {
baseQuery += fmt.Sprintf(" AND status = $%d", argIndex)
args = append(args, *filters.Status)
argIndex++
}
if filters.RequestType != nil && *filters.RequestType != "" {
baseQuery += fmt.Sprintf(" AND request_type = $%d", argIndex)
args = append(args, *filters.RequestType)
argIndex++
}
if filters.AssignedTo != nil && *filters.AssignedTo != "" {
baseQuery += fmt.Sprintf(" AND assigned_to = $%d", argIndex)
args = append(args, *filters.AssignedTo)
argIndex++
}
if filters.Priority != nil && *filters.Priority != "" {
baseQuery += fmt.Sprintf(" AND priority = $%d", argIndex)
args = append(args, *filters.Priority)
argIndex++
}
if filters.OverdueOnly {
baseQuery += " AND deadline_at < NOW() AND status NOT IN ('completed', 'rejected', 'cancelled')"
}
if filters.FromDate != nil {
baseQuery += fmt.Sprintf(" AND created_at >= $%d", argIndex)
args = append(args, *filters.FromDate)
argIndex++
}
if filters.ToDate != nil {
baseQuery += fmt.Sprintf(" AND created_at <= $%d", argIndex)
args = append(args, *filters.ToDate)
argIndex++
}
if filters.Search != nil && *filters.Search != "" {
searchPattern := "%" + *filters.Search + "%"
baseQuery += fmt.Sprintf(" AND (request_number ILIKE $%d OR requester_email ILIKE $%d OR requester_name ILIKE $%d)", argIndex, argIndex, argIndex)
args = append(args, searchPattern)
argIndex++
}
// Get total count
var total int
err := s.pool.QueryRow(ctx, "SELECT COUNT(*) "+baseQuery, args...).Scan(&total)
if err != nil {
return nil, 0, fmt.Errorf("failed to count DSRs: %w", err)
}
// Get paginated results
query := fmt.Sprintf(`
SELECT id, user_id, request_number, request_type, status, priority, source,
requester_email, requester_name, requester_phone, identity_verified,
deadline_at, legal_deadline_days, assigned_to, created_at, updated_at
%s ORDER BY created_at DESC LIMIT $%d OFFSET $%d
`, baseQuery, argIndex, argIndex+1)
args = append(args, limit, offset)
rows, err := s.pool.Query(ctx, query, args...)
if err != nil {
return nil, 0, fmt.Errorf("failed to query DSRs: %w", err)
}
defer rows.Close()
var dsrs []models.DataSubjectRequest
for rows.Next() {
var dsr models.DataSubjectRequest
err := rows.Scan(
&dsr.ID, &dsr.UserID, &dsr.RequestNumber, &dsr.RequestType, &dsr.Status,
&dsr.Priority, &dsr.Source, &dsr.RequesterEmail, &dsr.RequesterName,
&dsr.RequesterPhone, &dsr.IdentityVerified, &dsr.DeadlineAt,
&dsr.LegalDeadlineDays, &dsr.AssignedTo, &dsr.CreatedAt, &dsr.UpdatedAt,
)
if err != nil {
return nil, 0, fmt.Errorf("failed to scan DSR: %w", err)
}
dsrs = append(dsrs, dsr)
}
return dsrs, total, nil
}
// ListByUser retrieves DSRs for a specific user
func (s *DSRService) ListByUser(ctx context.Context, userID uuid.UUID) ([]models.DataSubjectRequest, error) {
rows, err := s.pool.Query(ctx, `
SELECT id, user_id, request_number, request_type, status, priority, source,
requester_email, requester_name, deadline_at, created_at, updated_at
FROM data_subject_requests
WHERE user_id = $1 OR requester_email = (SELECT email FROM users WHERE id = $1)
ORDER BY created_at DESC
`, userID)
if err != nil {
return nil, fmt.Errorf("failed to query user DSRs: %w", err)
}
defer rows.Close()
var dsrs []models.DataSubjectRequest
for rows.Next() {
var dsr models.DataSubjectRequest
err := rows.Scan(
&dsr.ID, &dsr.UserID, &dsr.RequestNumber, &dsr.RequestType, &dsr.Status,
&dsr.Priority, &dsr.Source, &dsr.RequesterEmail, &dsr.RequesterName,
&dsr.DeadlineAt, &dsr.CreatedAt, &dsr.UpdatedAt,
)
if err != nil {
return nil, fmt.Errorf("failed to scan DSR: %w", err)
}
dsrs = append(dsrs, dsr)
}
return dsrs, nil
}
// UpdateStatus changes the status of a DSR
func (s *DSRService) UpdateStatus(ctx context.Context, id uuid.UUID, newStatus models.DSRStatus, comment string, changedBy *uuid.UUID) error {
// Get current status
var currentStatus models.DSRStatus
err := s.pool.QueryRow(ctx, "SELECT status FROM data_subject_requests WHERE id = $1", id).Scan(&currentStatus)
if err != nil {
return fmt.Errorf("DSR not found: %w", err)
}
// Validate transition
if !isValidStatusTransition(currentStatus, newStatus) {
return fmt.Errorf("invalid status transition from %s to %s", currentStatus, newStatus)
}
// Update status
_, err = s.pool.Exec(ctx, `
UPDATE data_subject_requests SET status = $1, updated_at = NOW() WHERE id = $2
`, newStatus, id)
if err != nil {
return fmt.Errorf("failed to update status: %w", err)
}
// Record status change
s.recordStatusChange(ctx, id, &currentStatus, newStatus, changedBy, comment)
return nil
}
// VerifyIdentity marks identity as verified
func (s *DSRService) VerifyIdentity(ctx context.Context, id uuid.UUID, method string, verifiedBy uuid.UUID) error {
_, err := s.pool.Exec(ctx, `
UPDATE data_subject_requests
SET identity_verified = TRUE,
identity_verified_at = NOW(),
identity_verified_by = $1,
identity_verification_method = $2,
status = CASE WHEN status = 'intake' THEN 'identity_verification' ELSE status END,
updated_at = NOW()
WHERE id = $3
`, verifiedBy, method, id)
if err != nil {
return fmt.Errorf("failed to verify identity: %w", err)
}
s.recordStatusChange(ctx, id, nil, models.DSRStatusIdentityVerification, &verifiedBy, "Identität verifiziert via "+method)
return nil
}
// AssignRequest assigns a DSR to a handler
func (s *DSRService) AssignRequest(ctx context.Context, id uuid.UUID, assigneeID uuid.UUID, assignedBy uuid.UUID) error {
_, err := s.pool.Exec(ctx, `
UPDATE data_subject_requests SET assigned_to = $1, updated_at = NOW() WHERE id = $2
`, assigneeID, id)
if err != nil {
return fmt.Errorf("failed to assign DSR: %w", err)
}
// Get assignee name for comment
var assigneeName string
s.pool.QueryRow(ctx, "SELECT COALESCE(name, email) FROM users WHERE id = $1", assigneeID).Scan(&assigneeName)
s.recordStatusChange(ctx, id, nil, "", &assignedBy, "Zugewiesen an "+assigneeName)
// Notify assignee
go s.notifyAssignment(context.Background(), id, assigneeID)
return nil
}
// ExtendDeadline extends the deadline for a DSR
func (s *DSRService) ExtendDeadline(ctx context.Context, id uuid.UUID, reason string, days int, extendedBy uuid.UUID) error {
// Default extension is 2 months (60 days) per Art. 12(3)
if days <= 0 {
days = 60
}
_, err := s.pool.Exec(ctx, `
UPDATE data_subject_requests
SET extended_deadline_at = deadline_at + ($1 || ' days')::INTERVAL,
extension_reason = $2,
updated_at = NOW()
WHERE id = $3
`, days, reason, id)
if err != nil {
return fmt.Errorf("failed to extend deadline: %w", err)
}
s.recordStatusChange(ctx, id, nil, "", &extendedBy, fmt.Sprintf("Frist um %d Tage verlängert: %s", days, reason))
return nil
}
// CompleteRequest marks a DSR as completed
func (s *DSRService) CompleteRequest(ctx context.Context, id uuid.UUID, summary string, resultData map[string]interface{}, completedBy uuid.UUID) error {
resultJSON, _ := json.Marshal(resultData)
// Get current status
var currentStatus models.DSRStatus
s.pool.QueryRow(ctx, "SELECT status FROM data_subject_requests WHERE id = $1", id).Scan(&currentStatus)
_, err := s.pool.Exec(ctx, `
UPDATE data_subject_requests
SET status = 'completed',
completed_at = NOW(),
completed_by = $1,
result_summary = $2,
result_data = $3,
updated_at = NOW()
WHERE id = $4
`, completedBy, summary, resultJSON, id)
if err != nil {
return fmt.Errorf("failed to complete DSR: %w", err)
}
s.recordStatusChange(ctx, id, &currentStatus, models.DSRStatusCompleted, &completedBy, summary)
return nil
}
// RejectRequest rejects a DSR with legal basis
func (s *DSRService) RejectRequest(ctx context.Context, id uuid.UUID, reason, legalBasis string, rejectedBy uuid.UUID) error {
// Get current status
var currentStatus models.DSRStatus
s.pool.QueryRow(ctx, "SELECT status FROM data_subject_requests WHERE id = $1", id).Scan(&currentStatus)
_, err := s.pool.Exec(ctx, `
UPDATE data_subject_requests
SET status = 'rejected',
rejected_at = NOW(),
rejected_by = $1,
rejection_reason = $2,
rejection_legal_basis = $3,
updated_at = NOW()
WHERE id = $4
`, rejectedBy, reason, legalBasis, id)
if err != nil {
return fmt.Errorf("failed to reject DSR: %w", err)
}
s.recordStatusChange(ctx, id, &currentStatus, models.DSRStatusRejected, &rejectedBy, fmt.Sprintf("Abgelehnt (%s): %s", legalBasis, reason))
return nil
}
// CancelRequest cancels a DSR (by user)
func (s *DSRService) CancelRequest(ctx context.Context, id uuid.UUID, cancelledBy uuid.UUID) error {
// Verify ownership
var userID *uuid.UUID
err := s.pool.QueryRow(ctx, "SELECT user_id FROM data_subject_requests WHERE id = $1", id).Scan(&userID)
if err != nil {
return fmt.Errorf("DSR not found: %w", err)
}
if userID == nil || *userID != cancelledBy {
return fmt.Errorf("unauthorized: can only cancel own requests")
}
// Get current status
var currentStatus models.DSRStatus
s.pool.QueryRow(ctx, "SELECT status FROM data_subject_requests WHERE id = $1", id).Scan(&currentStatus)
_, err = s.pool.Exec(ctx, `
UPDATE data_subject_requests SET status = 'cancelled', updated_at = NOW() WHERE id = $1
`, id)
if err != nil {
return fmt.Errorf("failed to cancel DSR: %w", err)
}
s.recordStatusChange(ctx, id, &currentStatus, models.DSRStatusCancelled, &cancelledBy, "Vom Antragsteller storniert")
return nil
}
// GetDashboardStats returns statistics for the admin dashboard
func (s *DSRService) GetDashboardStats(ctx context.Context) (*models.DSRDashboardStats, error) {
stats := &models.DSRDashboardStats{
ByType: make(map[string]int),
ByStatus: make(map[string]int),
}
// Total requests
s.pool.QueryRow(ctx, "SELECT COUNT(*) FROM data_subject_requests").Scan(&stats.TotalRequests)
// Pending requests (not completed, rejected, or cancelled)
s.pool.QueryRow(ctx, `
SELECT COUNT(*) FROM data_subject_requests
WHERE status NOT IN ('completed', 'rejected', 'cancelled')
`).Scan(&stats.PendingRequests)
// Overdue requests
s.pool.QueryRow(ctx, `
SELECT COUNT(*) FROM data_subject_requests
WHERE COALESCE(extended_deadline_at, deadline_at) < NOW()
AND status NOT IN ('completed', 'rejected', 'cancelled')
`).Scan(&stats.OverdueRequests)
// Completed this month
s.pool.QueryRow(ctx, `
SELECT COUNT(*) FROM data_subject_requests
WHERE status = 'completed'
AND completed_at >= DATE_TRUNC('month', NOW())
`).Scan(&stats.CompletedThisMonth)
// Average processing days
s.pool.QueryRow(ctx, `
SELECT COALESCE(AVG(EXTRACT(EPOCH FROM (completed_at - created_at)) / 86400), 0)
FROM data_subject_requests WHERE status = 'completed'
`).Scan(&stats.AverageProcessingDays)
// Count by type
rows, _ := s.pool.Query(ctx, `
SELECT request_type, COUNT(*) FROM data_subject_requests GROUP BY request_type
`)
for rows.Next() {
var t string
var count int
rows.Scan(&t, &count)
stats.ByType[t] = count
}
rows.Close()
// Count by status
rows, _ = s.pool.Query(ctx, `
SELECT status, COUNT(*) FROM data_subject_requests GROUP BY status
`)
for rows.Next() {
var s string
var count int
rows.Scan(&s, &count)
stats.ByStatus[s] = count
}
rows.Close()
// Upcoming deadlines (next 7 days)
rows, _ = s.pool.Query(ctx, `
SELECT id, request_number, request_type, status, requester_email, deadline_at
FROM data_subject_requests
WHERE COALESCE(extended_deadline_at, deadline_at) BETWEEN NOW() AND NOW() + INTERVAL '7 days'
AND status NOT IN ('completed', 'rejected', 'cancelled')
ORDER BY deadline_at ASC LIMIT 10
`)
for rows.Next() {
var dsr models.DataSubjectRequest
rows.Scan(&dsr.ID, &dsr.RequestNumber, &dsr.RequestType, &dsr.Status, &dsr.RequesterEmail, &dsr.DeadlineAt)
stats.UpcomingDeadlines = append(stats.UpcomingDeadlines, dsr)
}
rows.Close()
return stats, nil
}
// GetStatusHistory retrieves the status history for a DSR
func (s *DSRService) GetStatusHistory(ctx context.Context, requestID uuid.UUID) ([]models.DSRStatusHistory, error) {
rows, err := s.pool.Query(ctx, `
SELECT id, request_id, from_status, to_status, changed_by, comment, metadata, created_at
FROM dsr_status_history WHERE request_id = $1 ORDER BY created_at DESC
`, requestID)
if err != nil {
return nil, fmt.Errorf("failed to query status history: %w", err)
}
defer rows.Close()
var history []models.DSRStatusHistory
for rows.Next() {
var h models.DSRStatusHistory
var metadataJSON []byte
err := rows.Scan(&h.ID, &h.RequestID, &h.FromStatus, &h.ToStatus, &h.ChangedBy, &h.Comment, &metadataJSON, &h.CreatedAt)
if err != nil {
continue
}
json.Unmarshal(metadataJSON, &h.Metadata)
history = append(history, h)
}
return history, nil
}
// GetCommunications retrieves communications for a DSR
func (s *DSRService) GetCommunications(ctx context.Context, requestID uuid.UUID) ([]models.DSRCommunication, error) {
rows, err := s.pool.Query(ctx, `
SELECT id, request_id, direction, channel, communication_type, template_version_id,
subject, body_html, body_text, recipient_email, sent_at, error_message,
attachments, created_at, created_by
FROM dsr_communications WHERE request_id = $1 ORDER BY created_at DESC
`, requestID)
if err != nil {
return nil, fmt.Errorf("failed to query communications: %w", err)
}
defer rows.Close()
var comms []models.DSRCommunication
for rows.Next() {
var c models.DSRCommunication
var attachmentsJSON []byte
err := rows.Scan(&c.ID, &c.RequestID, &c.Direction, &c.Channel, &c.CommunicationType,
&c.TemplateVersionID, &c.Subject, &c.BodyHTML, &c.BodyText, &c.RecipientEmail,
&c.SentAt, &c.ErrorMessage, &attachmentsJSON, &c.CreatedAt, &c.CreatedBy)
if err != nil {
continue
}
json.Unmarshal(attachmentsJSON, &c.Attachments)
comms = append(comms, c)
}
return comms, nil
}
// SendCommunication sends a communication for a DSR
func (s *DSRService) SendCommunication(ctx context.Context, requestID uuid.UUID, req models.SendDSRCommunicationRequest, sentBy uuid.UUID) error {
// Get DSR details
dsr, err := s.GetByID(ctx, requestID)
if err != nil {
return err
}
// Get template if specified
var subject, bodyHTML, bodyText string
if req.TemplateVersionID != nil {
templateVersionID, _ := uuid.Parse(*req.TemplateVersionID)
err := s.pool.QueryRow(ctx, `
SELECT subject, body_html, body_text FROM dsr_template_versions WHERE id = $1 AND status = 'published'
`, templateVersionID).Scan(&subject, &bodyHTML, &bodyText)
if err != nil {
return fmt.Errorf("template version not found or not published: %w", err)
}
}
// Use custom content if provided
if req.CustomSubject != nil {
subject = *req.CustomSubject
}
if req.CustomBody != nil {
bodyHTML = *req.CustomBody
bodyText = stripHTML(*req.CustomBody)
}
// Replace variables
variables := map[string]string{
"requester_name": stringOrDefault(dsr.RequesterName, "Antragsteller/in"),
"request_number": dsr.RequestNumber,
"request_type_de": dsr.RequestType.Label(),
"request_date": dsr.CreatedAt.Format("02.01.2006"),
"deadline_date": dsr.DeadlineAt.Format("02.01.2006"),
}
for k, v := range req.Variables {
variables[k] = v
}
subject = replaceVariables(subject, variables)
bodyHTML = replaceVariables(bodyHTML, variables)
bodyText = replaceVariables(bodyText, variables)
// Send email
if s.emailService != nil {
err = s.emailService.SendEmail(dsr.RequesterEmail, subject, bodyHTML, bodyText)
if err != nil {
// Log error but continue
_, _ = s.pool.Exec(ctx, `
INSERT INTO dsr_communications (request_id, direction, channel, communication_type,
template_version_id, subject, body_html, body_text, recipient_email, error_message, created_by)
VALUES ($1, 'outbound', 'email', $2, $3, $4, $5, $6, $7, $8, $9)
`, requestID, req.CommunicationType, req.TemplateVersionID, subject, bodyHTML, bodyText,
dsr.RequesterEmail, err.Error(), sentBy)
return fmt.Errorf("failed to send email: %w", err)
}
}
// Log communication
now := time.Now()
_, err = s.pool.Exec(ctx, `
INSERT INTO dsr_communications (request_id, direction, channel, communication_type,
template_version_id, subject, body_html, body_text, recipient_email, sent_at, created_by)
VALUES ($1, 'outbound', 'email', $2, $3, $4, $5, $6, $7, $8, $9)
`, requestID, req.CommunicationType, req.TemplateVersionID, subject, bodyHTML, bodyText,
dsr.RequesterEmail, now, sentBy)
return err
}
// InitErasureExceptionChecks initializes exception checks for an erasure request
func (s *DSRService) InitErasureExceptionChecks(ctx context.Context, requestID uuid.UUID) error {
exceptions := []struct {
Type string
Description string
}{
{models.DSRExceptionFreedomExpression, "Ausübung des Rechts auf freie Meinungsäußerung und Information (Art. 17 Abs. 3 lit. a)"},
{models.DSRExceptionLegalObligation, "Erfüllung einer rechtlichen Verpflichtung oder öffentlichen Aufgabe (Art. 17 Abs. 3 lit. b)"},
{models.DSRExceptionPublicHealth, "Gründe des öffentlichen Interesses im Bereich der öffentlichen Gesundheit (Art. 17 Abs. 3 lit. c)"},
{models.DSRExceptionArchiving, "Im öffentlichen Interesse liegende Archivzwecke, Forschung oder Statistik (Art. 17 Abs. 3 lit. d)"},
{models.DSRExceptionLegalClaims, "Geltendmachung, Ausübung oder Verteidigung von Rechtsansprüchen (Art. 17 Abs. 3 lit. e)"},
}
for _, exc := range exceptions {
_, err := s.pool.Exec(ctx, `
INSERT INTO dsr_exception_checks (request_id, exception_type, description)
VALUES ($1, $2, $3) ON CONFLICT DO NOTHING
`, requestID, exc.Type, exc.Description)
if err != nil {
return fmt.Errorf("failed to create exception check: %w", err)
}
}
return nil
}
// GetExceptionChecks retrieves exception checks for a DSR
func (s *DSRService) GetExceptionChecks(ctx context.Context, requestID uuid.UUID) ([]models.DSRExceptionCheck, error) {
rows, err := s.pool.Query(ctx, `
SELECT id, request_id, exception_type, description, applies, checked_by, checked_at, notes, created_at
FROM dsr_exception_checks WHERE request_id = $1 ORDER BY created_at
`, requestID)
if err != nil {
return nil, fmt.Errorf("failed to query exception checks: %w", err)
}
defer rows.Close()
var checks []models.DSRExceptionCheck
for rows.Next() {
var c models.DSRExceptionCheck
err := rows.Scan(&c.ID, &c.RequestID, &c.ExceptionType, &c.Description, &c.Applies,
&c.CheckedBy, &c.CheckedAt, &c.Notes, &c.CreatedAt)
if err != nil {
continue
}
checks = append(checks, c)
}
return checks, nil
}
// UpdateExceptionCheck updates an exception check
func (s *DSRService) UpdateExceptionCheck(ctx context.Context, checkID uuid.UUID, applies bool, notes *string, checkedBy uuid.UUID) error {
_, err := s.pool.Exec(ctx, `
UPDATE dsr_exception_checks
SET applies = $1, notes = $2, checked_by = $3, checked_at = NOW()
WHERE id = $4
`, applies, notes, checkedBy, checkID)
return err
}
// ProcessDeadlines checks for approaching and overdue deadlines
func (s *DSRService) ProcessDeadlines(ctx context.Context) error {
now := time.Now()
// Find requests with deadlines in 3 days
threeDaysAhead := now.AddDate(0, 0, 3)
rows, _ := s.pool.Query(ctx, `
SELECT id, request_number, request_type, assigned_to, deadline_at
FROM data_subject_requests
WHERE COALESCE(extended_deadline_at, deadline_at) BETWEEN $1 AND $2
AND status NOT IN ('completed', 'rejected', 'cancelled')
`, now, threeDaysAhead)
for rows.Next() {
var id uuid.UUID
var requestNumber, requestType string
var assignedTo *uuid.UUID
var deadline time.Time
rows.Scan(&id, &requestNumber, &requestType, &assignedTo, &deadline)
// Notify assigned user or all DPOs
if assignedTo != nil {
s.notifyDeadlineWarning(ctx, id, *assignedTo, requestNumber, deadline, 3)
} else {
s.notifyAllDPOs(ctx, id, requestNumber, "Frist in 3 Tagen", deadline)
}
}
rows.Close()
// Find requests with deadlines in 1 day
oneDayAhead := now.AddDate(0, 0, 1)
rows, _ = s.pool.Query(ctx, `
SELECT id, request_number, request_type, assigned_to, deadline_at
FROM data_subject_requests
WHERE COALESCE(extended_deadline_at, deadline_at) BETWEEN $1 AND $2
AND status NOT IN ('completed', 'rejected', 'cancelled')
`, now, oneDayAhead)
for rows.Next() {
var id uuid.UUID
var requestNumber, requestType string
var assignedTo *uuid.UUID
var deadline time.Time
rows.Scan(&id, &requestNumber, &requestType, &assignedTo, &deadline)
if assignedTo != nil {
s.notifyDeadlineWarning(ctx, id, *assignedTo, requestNumber, deadline, 1)
} else {
s.notifyAllDPOs(ctx, id, requestNumber, "Frist morgen!", deadline)
}
}
rows.Close()
// Find overdue requests
rows, _ = s.pool.Query(ctx, `
SELECT id, request_number, request_type, assigned_to, deadline_at
FROM data_subject_requests
WHERE COALESCE(extended_deadline_at, deadline_at) < $1
AND status NOT IN ('completed', 'rejected', 'cancelled')
`, now)
for rows.Next() {
var id uuid.UUID
var requestNumber, requestType string
var assignedTo *uuid.UUID
var deadline time.Time
rows.Scan(&id, &requestNumber, &requestType, &assignedTo, &deadline)
// Notify all DPOs for overdue
s.notifyAllDPOs(ctx, id, requestNumber, "ÜBERFÄLLIG!", deadline)
// Log to audit
s.pool.Exec(ctx, `
INSERT INTO consent_audit_log (action, entity_type, entity_id, details)
VALUES ('dsr_overdue', 'dsr', $1, $2)
`, id, fmt.Sprintf(`{"request_number": "%s", "deadline": "%s"}`, requestNumber, deadline.Format(time.RFC3339)))
}
rows.Close()
return nil
}
// Helper functions
func (s *DSRService) recordStatusChange(ctx context.Context, requestID uuid.UUID, fromStatus *models.DSRStatus, toStatus models.DSRStatus, changedBy *uuid.UUID, comment string) {
s.pool.Exec(ctx, `
INSERT INTO dsr_status_history (request_id, from_status, to_status, changed_by, comment)
VALUES ($1, $2, $3, $4, $5)
`, requestID, fromStatus, toStatus, changedBy, comment)
}
func (s *DSRService) notifyNewRequest(ctx context.Context, dsr *models.DataSubjectRequest) {
if s.notificationService == nil {
return
}
// Notify all DPOs
rows, _ := s.pool.Query(ctx, "SELECT id FROM users WHERE role = 'data_protection_officer'")
defer rows.Close()
for rows.Next() {
var userID uuid.UUID
rows.Scan(&userID)
s.notificationService.CreateNotification(ctx, userID, NotificationTypeDSRReceived,
"Neue Betroffenenanfrage",
fmt.Sprintf("Neue %s eingegangen: %s", dsr.RequestType.Label(), dsr.RequestNumber),
map[string]interface{}{"dsr_id": dsr.ID, "request_number": dsr.RequestNumber})
}
}
func (s *DSRService) notifyAssignment(ctx context.Context, dsrID, assigneeID uuid.UUID) {
if s.notificationService == nil {
return
}
dsr, _ := s.GetByID(ctx, dsrID)
if dsr != nil {
s.notificationService.CreateNotification(ctx, assigneeID, NotificationTypeDSRAssigned,
"Betroffenenanfrage zugewiesen",
fmt.Sprintf("Ihnen wurde die Anfrage %s zugewiesen", dsr.RequestNumber),
map[string]interface{}{"dsr_id": dsrID, "request_number": dsr.RequestNumber})
}
}
func (s *DSRService) notifyDeadlineWarning(ctx context.Context, dsrID, userID uuid.UUID, requestNumber string, deadline time.Time, daysLeft int) {
if s.notificationService == nil {
return
}
s.notificationService.CreateNotification(ctx, userID, NotificationTypeDSRDeadline,
fmt.Sprintf("Fristwarnung: %s", requestNumber),
fmt.Sprintf("Die Frist für %s läuft in %d Tag(en) ab (%s)", requestNumber, daysLeft, deadline.Format("02.01.2006")),
map[string]interface{}{"dsr_id": dsrID, "deadline": deadline, "days_left": daysLeft})
}
func (s *DSRService) notifyAllDPOs(ctx context.Context, dsrID uuid.UUID, requestNumber, message string, deadline time.Time) {
if s.notificationService == nil {
return
}
rows, _ := s.pool.Query(ctx, "SELECT id FROM users WHERE role = 'data_protection_officer'")
defer rows.Close()
for rows.Next() {
var userID uuid.UUID
rows.Scan(&userID)
s.notificationService.CreateNotification(ctx, userID, NotificationTypeDSRDeadline,
fmt.Sprintf("%s: %s", message, requestNumber),
fmt.Sprintf("Anfrage %s: %s (Frist: %s)", requestNumber, message, deadline.Format("02.01.2006")),
map[string]interface{}{"dsr_id": dsrID, "deadline": deadline})
}
}
func isValidRequestType(rt models.DSRRequestType) bool {
switch rt {
case models.DSRTypeAccess, models.DSRTypeRectification, models.DSRTypeErasure,
models.DSRTypeRestriction, models.DSRTypePortability:
return true
}
return false
}
func isValidStatusTransition(from, to models.DSRStatus) bool {
validTransitions := map[models.DSRStatus][]models.DSRStatus{
models.DSRStatusIntake: {models.DSRStatusIdentityVerification, models.DSRStatusProcessing, models.DSRStatusRejected, models.DSRStatusCancelled},
models.DSRStatusIdentityVerification: {models.DSRStatusProcessing, models.DSRStatusRejected, models.DSRStatusCancelled},
models.DSRStatusProcessing: {models.DSRStatusCompleted, models.DSRStatusRejected, models.DSRStatusCancelled},
models.DSRStatusCompleted: {},
models.DSRStatusRejected: {},
models.DSRStatusCancelled: {},
}
allowed, exists := validTransitions[from]
if !exists {
return false
}
for _, s := range allowed {
if s == to {
return true
}
}
return false
}
func stringOrDefault(s *string, def string) string {
if s != nil {
return *s
}
return def
}
func replaceVariables(text string, variables map[string]string) string {
for k, v := range variables {
text = strings.ReplaceAll(text, "{{"+k+"}}", v)
}
return text
}
func stripHTML(html string) string {
// Simple HTML stripping - in production use a proper library
text := strings.ReplaceAll(html, "<br>", "\n")
text = strings.ReplaceAll(text, "<br/>", "\n")
text = strings.ReplaceAll(text, "<br />", "\n")
text = strings.ReplaceAll(text, "</p>", "\n\n")
// Remove all remaining tags
for {
start := strings.Index(text, "<")
if start == -1 {
break
}
end := strings.Index(text[start:], ">")
if end == -1 {
break
}
text = text[:start] + text[start+end+1:]
}
return strings.TrimSpace(text)
}