Docker Compose with 24+ services: - PostgreSQL (PostGIS), Valkey, MinIO, Qdrant - Vault (PKI/TLS), Nginx (Reverse Proxy) - Backend Core API, Consent Service, Billing Service - RAG Service, Embedding Service - Gitea, Woodpecker CI/CD - Night Scheduler, Health Aggregator - Jitsi (Web/XMPP/JVB/Jicofo), Mailpit Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
948 lines
32 KiB
Go
948 lines
32 KiB
Go
package services
|
|
|
|
import (
|
|
"context"
|
|
"encoding/json"
|
|
"fmt"
|
|
"strings"
|
|
"time"
|
|
|
|
"github.com/breakpilot/consent-service/internal/models"
|
|
"github.com/google/uuid"
|
|
"github.com/jackc/pgx/v5/pgxpool"
|
|
)
|
|
|
|
// DSRService handles Data Subject Request business logic
|
|
type DSRService struct {
|
|
pool *pgxpool.Pool
|
|
notificationService *NotificationService
|
|
emailService *EmailService
|
|
}
|
|
|
|
// NewDSRService creates a new DSRService
|
|
func NewDSRService(pool *pgxpool.Pool, notificationService *NotificationService, emailService *EmailService) *DSRService {
|
|
return &DSRService{
|
|
pool: pool,
|
|
notificationService: notificationService,
|
|
emailService: emailService,
|
|
}
|
|
}
|
|
|
|
// GetPool returns the database pool for direct queries
|
|
func (s *DSRService) GetPool() *pgxpool.Pool {
|
|
return s.pool
|
|
}
|
|
|
|
// generateRequestNumber generates a unique request number like DSR-2025-000001
|
|
func (s *DSRService) generateRequestNumber(ctx context.Context) (string, error) {
|
|
var seqNum int64
|
|
err := s.pool.QueryRow(ctx, "SELECT nextval('dsr_request_number_seq')").Scan(&seqNum)
|
|
if err != nil {
|
|
return "", fmt.Errorf("failed to get next sequence number: %w", err)
|
|
}
|
|
year := time.Now().Year()
|
|
return fmt.Sprintf("DSR-%d-%06d", year, seqNum), nil
|
|
}
|
|
|
|
// CreateRequest creates a new data subject request
|
|
func (s *DSRService) CreateRequest(ctx context.Context, req models.CreateDSRRequest, createdBy *uuid.UUID) (*models.DataSubjectRequest, error) {
|
|
// Validate request type
|
|
requestType := models.DSRRequestType(req.RequestType)
|
|
if !isValidRequestType(requestType) {
|
|
return nil, fmt.Errorf("invalid request type: %s", req.RequestType)
|
|
}
|
|
|
|
// Generate request number
|
|
requestNumber, err := s.generateRequestNumber(ctx)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
// Calculate deadline
|
|
deadlineDays := requestType.DeadlineDays()
|
|
deadline := time.Now().AddDate(0, 0, deadlineDays)
|
|
|
|
// Determine priority
|
|
priority := models.DSRPriorityNormal
|
|
if req.Priority != "" {
|
|
priority = models.DSRPriority(req.Priority)
|
|
} else if requestType.IsExpedited() {
|
|
priority = models.DSRPriorityExpedited
|
|
}
|
|
|
|
// Determine source
|
|
source := models.DSRSourceAPI
|
|
if req.Source != "" {
|
|
source = models.DSRSource(req.Source)
|
|
}
|
|
|
|
// Serialize request details
|
|
detailsJSON, err := json.Marshal(req.RequestDetails)
|
|
if err != nil {
|
|
detailsJSON = []byte("{}")
|
|
}
|
|
|
|
// Try to find existing user by email
|
|
var userID *uuid.UUID
|
|
var foundUserID uuid.UUID
|
|
err = s.pool.QueryRow(ctx, "SELECT id FROM users WHERE email = $1", req.RequesterEmail).Scan(&foundUserID)
|
|
if err == nil {
|
|
userID = &foundUserID
|
|
}
|
|
|
|
// Insert request
|
|
var dsr models.DataSubjectRequest
|
|
err = s.pool.QueryRow(ctx, `
|
|
INSERT INTO data_subject_requests (
|
|
user_id, request_number, request_type, status, priority, source,
|
|
requester_email, requester_name, requester_phone,
|
|
request_details, deadline_at, legal_deadline_days, created_by
|
|
) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13)
|
|
RETURNING id, user_id, request_number, request_type, status, priority, source,
|
|
requester_email, requester_name, requester_phone, identity_verified,
|
|
request_details, deadline_at, legal_deadline_days, created_at, updated_at, created_by
|
|
`, userID, requestNumber, requestType, models.DSRStatusIntake, priority, source,
|
|
req.RequesterEmail, req.RequesterName, req.RequesterPhone,
|
|
detailsJSON, deadline, deadlineDays, createdBy,
|
|
).Scan(
|
|
&dsr.ID, &dsr.UserID, &dsr.RequestNumber, &dsr.RequestType, &dsr.Status,
|
|
&dsr.Priority, &dsr.Source, &dsr.RequesterEmail, &dsr.RequesterName,
|
|
&dsr.RequesterPhone, &dsr.IdentityVerified, &detailsJSON,
|
|
&dsr.DeadlineAt, &dsr.LegalDeadlineDays, &dsr.CreatedAt, &dsr.UpdatedAt, &dsr.CreatedBy,
|
|
)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("failed to create DSR: %w", err)
|
|
}
|
|
|
|
// Parse details back
|
|
json.Unmarshal(detailsJSON, &dsr.RequestDetails)
|
|
|
|
// Record initial status
|
|
s.recordStatusChange(ctx, dsr.ID, nil, models.DSRStatusIntake, createdBy, "Anfrage eingegangen")
|
|
|
|
// Notify DPOs about new request
|
|
go s.notifyNewRequest(context.Background(), &dsr)
|
|
|
|
return &dsr, nil
|
|
}
|
|
|
|
// GetByID retrieves a DSR by ID
|
|
func (s *DSRService) GetByID(ctx context.Context, id uuid.UUID) (*models.DataSubjectRequest, error) {
|
|
var dsr models.DataSubjectRequest
|
|
var detailsJSON, resultDataJSON []byte
|
|
|
|
err := s.pool.QueryRow(ctx, `
|
|
SELECT id, user_id, request_number, request_type, status, priority, source,
|
|
requester_email, requester_name, requester_phone,
|
|
identity_verified, identity_verified_at, identity_verified_by, identity_verification_method,
|
|
request_details, deadline_at, legal_deadline_days, extended_deadline_at, extension_reason,
|
|
assigned_to, processing_notes, completed_at, completed_by, result_summary, result_data,
|
|
rejected_at, rejected_by, rejection_reason, rejection_legal_basis,
|
|
created_at, updated_at, created_by
|
|
FROM data_subject_requests WHERE id = $1
|
|
`, id).Scan(
|
|
&dsr.ID, &dsr.UserID, &dsr.RequestNumber, &dsr.RequestType, &dsr.Status,
|
|
&dsr.Priority, &dsr.Source, &dsr.RequesterEmail, &dsr.RequesterName,
|
|
&dsr.RequesterPhone, &dsr.IdentityVerified, &dsr.IdentityVerifiedAt,
|
|
&dsr.IdentityVerifiedBy, &dsr.IdentityVerificationMethod,
|
|
&detailsJSON, &dsr.DeadlineAt, &dsr.LegalDeadlineDays,
|
|
&dsr.ExtendedDeadlineAt, &dsr.ExtensionReason, &dsr.AssignedTo,
|
|
&dsr.ProcessingNotes, &dsr.CompletedAt, &dsr.CompletedBy,
|
|
&dsr.ResultSummary, &resultDataJSON, &dsr.RejectedAt, &dsr.RejectedBy,
|
|
&dsr.RejectionReason, &dsr.RejectionLegalBasis,
|
|
&dsr.CreatedAt, &dsr.UpdatedAt, &dsr.CreatedBy,
|
|
)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("DSR not found: %w", err)
|
|
}
|
|
|
|
json.Unmarshal(detailsJSON, &dsr.RequestDetails)
|
|
json.Unmarshal(resultDataJSON, &dsr.ResultData)
|
|
|
|
return &dsr, nil
|
|
}
|
|
|
|
// GetByNumber retrieves a DSR by request number
|
|
func (s *DSRService) GetByNumber(ctx context.Context, requestNumber string) (*models.DataSubjectRequest, error) {
|
|
var id uuid.UUID
|
|
err := s.pool.QueryRow(ctx, "SELECT id FROM data_subject_requests WHERE request_number = $1", requestNumber).Scan(&id)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("DSR not found: %w", err)
|
|
}
|
|
return s.GetByID(ctx, id)
|
|
}
|
|
|
|
// List retrieves DSRs with filters and pagination
|
|
func (s *DSRService) List(ctx context.Context, filters models.DSRListFilters, limit, offset int) ([]models.DataSubjectRequest, int, error) {
|
|
// Build query
|
|
baseQuery := "FROM data_subject_requests WHERE 1=1"
|
|
args := []interface{}{}
|
|
argIndex := 1
|
|
|
|
if filters.Status != nil && *filters.Status != "" {
|
|
baseQuery += fmt.Sprintf(" AND status = $%d", argIndex)
|
|
args = append(args, *filters.Status)
|
|
argIndex++
|
|
}
|
|
|
|
if filters.RequestType != nil && *filters.RequestType != "" {
|
|
baseQuery += fmt.Sprintf(" AND request_type = $%d", argIndex)
|
|
args = append(args, *filters.RequestType)
|
|
argIndex++
|
|
}
|
|
|
|
if filters.AssignedTo != nil && *filters.AssignedTo != "" {
|
|
baseQuery += fmt.Sprintf(" AND assigned_to = $%d", argIndex)
|
|
args = append(args, *filters.AssignedTo)
|
|
argIndex++
|
|
}
|
|
|
|
if filters.Priority != nil && *filters.Priority != "" {
|
|
baseQuery += fmt.Sprintf(" AND priority = $%d", argIndex)
|
|
args = append(args, *filters.Priority)
|
|
argIndex++
|
|
}
|
|
|
|
if filters.OverdueOnly {
|
|
baseQuery += " AND deadline_at < NOW() AND status NOT IN ('completed', 'rejected', 'cancelled')"
|
|
}
|
|
|
|
if filters.FromDate != nil {
|
|
baseQuery += fmt.Sprintf(" AND created_at >= $%d", argIndex)
|
|
args = append(args, *filters.FromDate)
|
|
argIndex++
|
|
}
|
|
|
|
if filters.ToDate != nil {
|
|
baseQuery += fmt.Sprintf(" AND created_at <= $%d", argIndex)
|
|
args = append(args, *filters.ToDate)
|
|
argIndex++
|
|
}
|
|
|
|
if filters.Search != nil && *filters.Search != "" {
|
|
searchPattern := "%" + *filters.Search + "%"
|
|
baseQuery += fmt.Sprintf(" AND (request_number ILIKE $%d OR requester_email ILIKE $%d OR requester_name ILIKE $%d)", argIndex, argIndex, argIndex)
|
|
args = append(args, searchPattern)
|
|
argIndex++
|
|
}
|
|
|
|
// Get total count
|
|
var total int
|
|
err := s.pool.QueryRow(ctx, "SELECT COUNT(*) "+baseQuery, args...).Scan(&total)
|
|
if err != nil {
|
|
return nil, 0, fmt.Errorf("failed to count DSRs: %w", err)
|
|
}
|
|
|
|
// Get paginated results
|
|
query := fmt.Sprintf(`
|
|
SELECT id, user_id, request_number, request_type, status, priority, source,
|
|
requester_email, requester_name, requester_phone, identity_verified,
|
|
deadline_at, legal_deadline_days, assigned_to, created_at, updated_at
|
|
%s ORDER BY created_at DESC LIMIT $%d OFFSET $%d
|
|
`, baseQuery, argIndex, argIndex+1)
|
|
args = append(args, limit, offset)
|
|
|
|
rows, err := s.pool.Query(ctx, query, args...)
|
|
if err != nil {
|
|
return nil, 0, fmt.Errorf("failed to query DSRs: %w", err)
|
|
}
|
|
defer rows.Close()
|
|
|
|
var dsrs []models.DataSubjectRequest
|
|
for rows.Next() {
|
|
var dsr models.DataSubjectRequest
|
|
err := rows.Scan(
|
|
&dsr.ID, &dsr.UserID, &dsr.RequestNumber, &dsr.RequestType, &dsr.Status,
|
|
&dsr.Priority, &dsr.Source, &dsr.RequesterEmail, &dsr.RequesterName,
|
|
&dsr.RequesterPhone, &dsr.IdentityVerified, &dsr.DeadlineAt,
|
|
&dsr.LegalDeadlineDays, &dsr.AssignedTo, &dsr.CreatedAt, &dsr.UpdatedAt,
|
|
)
|
|
if err != nil {
|
|
return nil, 0, fmt.Errorf("failed to scan DSR: %w", err)
|
|
}
|
|
dsrs = append(dsrs, dsr)
|
|
}
|
|
|
|
return dsrs, total, nil
|
|
}
|
|
|
|
// ListByUser retrieves DSRs for a specific user
|
|
func (s *DSRService) ListByUser(ctx context.Context, userID uuid.UUID) ([]models.DataSubjectRequest, error) {
|
|
rows, err := s.pool.Query(ctx, `
|
|
SELECT id, user_id, request_number, request_type, status, priority, source,
|
|
requester_email, requester_name, deadline_at, created_at, updated_at
|
|
FROM data_subject_requests
|
|
WHERE user_id = $1 OR requester_email = (SELECT email FROM users WHERE id = $1)
|
|
ORDER BY created_at DESC
|
|
`, userID)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("failed to query user DSRs: %w", err)
|
|
}
|
|
defer rows.Close()
|
|
|
|
var dsrs []models.DataSubjectRequest
|
|
for rows.Next() {
|
|
var dsr models.DataSubjectRequest
|
|
err := rows.Scan(
|
|
&dsr.ID, &dsr.UserID, &dsr.RequestNumber, &dsr.RequestType, &dsr.Status,
|
|
&dsr.Priority, &dsr.Source, &dsr.RequesterEmail, &dsr.RequesterName,
|
|
&dsr.DeadlineAt, &dsr.CreatedAt, &dsr.UpdatedAt,
|
|
)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("failed to scan DSR: %w", err)
|
|
}
|
|
dsrs = append(dsrs, dsr)
|
|
}
|
|
|
|
return dsrs, nil
|
|
}
|
|
|
|
// UpdateStatus changes the status of a DSR
|
|
func (s *DSRService) UpdateStatus(ctx context.Context, id uuid.UUID, newStatus models.DSRStatus, comment string, changedBy *uuid.UUID) error {
|
|
// Get current status
|
|
var currentStatus models.DSRStatus
|
|
err := s.pool.QueryRow(ctx, "SELECT status FROM data_subject_requests WHERE id = $1", id).Scan(¤tStatus)
|
|
if err != nil {
|
|
return fmt.Errorf("DSR not found: %w", err)
|
|
}
|
|
|
|
// Validate transition
|
|
if !isValidStatusTransition(currentStatus, newStatus) {
|
|
return fmt.Errorf("invalid status transition from %s to %s", currentStatus, newStatus)
|
|
}
|
|
|
|
// Update status
|
|
_, err = s.pool.Exec(ctx, `
|
|
UPDATE data_subject_requests SET status = $1, updated_at = NOW() WHERE id = $2
|
|
`, newStatus, id)
|
|
if err != nil {
|
|
return fmt.Errorf("failed to update status: %w", err)
|
|
}
|
|
|
|
// Record status change
|
|
s.recordStatusChange(ctx, id, ¤tStatus, newStatus, changedBy, comment)
|
|
|
|
return nil
|
|
}
|
|
|
|
// VerifyIdentity marks identity as verified
|
|
func (s *DSRService) VerifyIdentity(ctx context.Context, id uuid.UUID, method string, verifiedBy uuid.UUID) error {
|
|
_, err := s.pool.Exec(ctx, `
|
|
UPDATE data_subject_requests
|
|
SET identity_verified = TRUE,
|
|
identity_verified_at = NOW(),
|
|
identity_verified_by = $1,
|
|
identity_verification_method = $2,
|
|
status = CASE WHEN status = 'intake' THEN 'identity_verification' ELSE status END,
|
|
updated_at = NOW()
|
|
WHERE id = $3
|
|
`, verifiedBy, method, id)
|
|
if err != nil {
|
|
return fmt.Errorf("failed to verify identity: %w", err)
|
|
}
|
|
|
|
s.recordStatusChange(ctx, id, nil, models.DSRStatusIdentityVerification, &verifiedBy, "Identität verifiziert via "+method)
|
|
|
|
return nil
|
|
}
|
|
|
|
// AssignRequest assigns a DSR to a handler
|
|
func (s *DSRService) AssignRequest(ctx context.Context, id uuid.UUID, assigneeID uuid.UUID, assignedBy uuid.UUID) error {
|
|
_, err := s.pool.Exec(ctx, `
|
|
UPDATE data_subject_requests SET assigned_to = $1, updated_at = NOW() WHERE id = $2
|
|
`, assigneeID, id)
|
|
if err != nil {
|
|
return fmt.Errorf("failed to assign DSR: %w", err)
|
|
}
|
|
|
|
// Get assignee name for comment
|
|
var assigneeName string
|
|
s.pool.QueryRow(ctx, "SELECT COALESCE(name, email) FROM users WHERE id = $1", assigneeID).Scan(&assigneeName)
|
|
|
|
s.recordStatusChange(ctx, id, nil, "", &assignedBy, "Zugewiesen an "+assigneeName)
|
|
|
|
// Notify assignee
|
|
go s.notifyAssignment(context.Background(), id, assigneeID)
|
|
|
|
return nil
|
|
}
|
|
|
|
// ExtendDeadline extends the deadline for a DSR
|
|
func (s *DSRService) ExtendDeadline(ctx context.Context, id uuid.UUID, reason string, days int, extendedBy uuid.UUID) error {
|
|
// Default extension is 2 months (60 days) per Art. 12(3)
|
|
if days <= 0 {
|
|
days = 60
|
|
}
|
|
|
|
_, err := s.pool.Exec(ctx, `
|
|
UPDATE data_subject_requests
|
|
SET extended_deadline_at = deadline_at + ($1 || ' days')::INTERVAL,
|
|
extension_reason = $2,
|
|
updated_at = NOW()
|
|
WHERE id = $3
|
|
`, days, reason, id)
|
|
if err != nil {
|
|
return fmt.Errorf("failed to extend deadline: %w", err)
|
|
}
|
|
|
|
s.recordStatusChange(ctx, id, nil, "", &extendedBy, fmt.Sprintf("Frist um %d Tage verlängert: %s", days, reason))
|
|
|
|
return nil
|
|
}
|
|
|
|
// CompleteRequest marks a DSR as completed
|
|
func (s *DSRService) CompleteRequest(ctx context.Context, id uuid.UUID, summary string, resultData map[string]interface{}, completedBy uuid.UUID) error {
|
|
resultJSON, _ := json.Marshal(resultData)
|
|
|
|
// Get current status
|
|
var currentStatus models.DSRStatus
|
|
s.pool.QueryRow(ctx, "SELECT status FROM data_subject_requests WHERE id = $1", id).Scan(¤tStatus)
|
|
|
|
_, err := s.pool.Exec(ctx, `
|
|
UPDATE data_subject_requests
|
|
SET status = 'completed',
|
|
completed_at = NOW(),
|
|
completed_by = $1,
|
|
result_summary = $2,
|
|
result_data = $3,
|
|
updated_at = NOW()
|
|
WHERE id = $4
|
|
`, completedBy, summary, resultJSON, id)
|
|
if err != nil {
|
|
return fmt.Errorf("failed to complete DSR: %w", err)
|
|
}
|
|
|
|
s.recordStatusChange(ctx, id, ¤tStatus, models.DSRStatusCompleted, &completedBy, summary)
|
|
|
|
return nil
|
|
}
|
|
|
|
// RejectRequest rejects a DSR with legal basis
|
|
func (s *DSRService) RejectRequest(ctx context.Context, id uuid.UUID, reason, legalBasis string, rejectedBy uuid.UUID) error {
|
|
// Get current status
|
|
var currentStatus models.DSRStatus
|
|
s.pool.QueryRow(ctx, "SELECT status FROM data_subject_requests WHERE id = $1", id).Scan(¤tStatus)
|
|
|
|
_, err := s.pool.Exec(ctx, `
|
|
UPDATE data_subject_requests
|
|
SET status = 'rejected',
|
|
rejected_at = NOW(),
|
|
rejected_by = $1,
|
|
rejection_reason = $2,
|
|
rejection_legal_basis = $3,
|
|
updated_at = NOW()
|
|
WHERE id = $4
|
|
`, rejectedBy, reason, legalBasis, id)
|
|
if err != nil {
|
|
return fmt.Errorf("failed to reject DSR: %w", err)
|
|
}
|
|
|
|
s.recordStatusChange(ctx, id, ¤tStatus, models.DSRStatusRejected, &rejectedBy, fmt.Sprintf("Abgelehnt (%s): %s", legalBasis, reason))
|
|
|
|
return nil
|
|
}
|
|
|
|
// CancelRequest cancels a DSR (by user)
|
|
func (s *DSRService) CancelRequest(ctx context.Context, id uuid.UUID, cancelledBy uuid.UUID) error {
|
|
// Verify ownership
|
|
var userID *uuid.UUID
|
|
err := s.pool.QueryRow(ctx, "SELECT user_id FROM data_subject_requests WHERE id = $1", id).Scan(&userID)
|
|
if err != nil {
|
|
return fmt.Errorf("DSR not found: %w", err)
|
|
}
|
|
if userID == nil || *userID != cancelledBy {
|
|
return fmt.Errorf("unauthorized: can only cancel own requests")
|
|
}
|
|
|
|
// Get current status
|
|
var currentStatus models.DSRStatus
|
|
s.pool.QueryRow(ctx, "SELECT status FROM data_subject_requests WHERE id = $1", id).Scan(¤tStatus)
|
|
|
|
_, err = s.pool.Exec(ctx, `
|
|
UPDATE data_subject_requests SET status = 'cancelled', updated_at = NOW() WHERE id = $1
|
|
`, id)
|
|
if err != nil {
|
|
return fmt.Errorf("failed to cancel DSR: %w", err)
|
|
}
|
|
|
|
s.recordStatusChange(ctx, id, ¤tStatus, models.DSRStatusCancelled, &cancelledBy, "Vom Antragsteller storniert")
|
|
|
|
return nil
|
|
}
|
|
|
|
// GetDashboardStats returns statistics for the admin dashboard
|
|
func (s *DSRService) GetDashboardStats(ctx context.Context) (*models.DSRDashboardStats, error) {
|
|
stats := &models.DSRDashboardStats{
|
|
ByType: make(map[string]int),
|
|
ByStatus: make(map[string]int),
|
|
}
|
|
|
|
// Total requests
|
|
s.pool.QueryRow(ctx, "SELECT COUNT(*) FROM data_subject_requests").Scan(&stats.TotalRequests)
|
|
|
|
// Pending requests (not completed, rejected, or cancelled)
|
|
s.pool.QueryRow(ctx, `
|
|
SELECT COUNT(*) FROM data_subject_requests
|
|
WHERE status NOT IN ('completed', 'rejected', 'cancelled')
|
|
`).Scan(&stats.PendingRequests)
|
|
|
|
// Overdue requests
|
|
s.pool.QueryRow(ctx, `
|
|
SELECT COUNT(*) FROM data_subject_requests
|
|
WHERE COALESCE(extended_deadline_at, deadline_at) < NOW()
|
|
AND status NOT IN ('completed', 'rejected', 'cancelled')
|
|
`).Scan(&stats.OverdueRequests)
|
|
|
|
// Completed this month
|
|
s.pool.QueryRow(ctx, `
|
|
SELECT COUNT(*) FROM data_subject_requests
|
|
WHERE status = 'completed'
|
|
AND completed_at >= DATE_TRUNC('month', NOW())
|
|
`).Scan(&stats.CompletedThisMonth)
|
|
|
|
// Average processing days
|
|
s.pool.QueryRow(ctx, `
|
|
SELECT COALESCE(AVG(EXTRACT(EPOCH FROM (completed_at - created_at)) / 86400), 0)
|
|
FROM data_subject_requests WHERE status = 'completed'
|
|
`).Scan(&stats.AverageProcessingDays)
|
|
|
|
// Count by type
|
|
rows, _ := s.pool.Query(ctx, `
|
|
SELECT request_type, COUNT(*) FROM data_subject_requests GROUP BY request_type
|
|
`)
|
|
for rows.Next() {
|
|
var t string
|
|
var count int
|
|
rows.Scan(&t, &count)
|
|
stats.ByType[t] = count
|
|
}
|
|
rows.Close()
|
|
|
|
// Count by status
|
|
rows, _ = s.pool.Query(ctx, `
|
|
SELECT status, COUNT(*) FROM data_subject_requests GROUP BY status
|
|
`)
|
|
for rows.Next() {
|
|
var s string
|
|
var count int
|
|
rows.Scan(&s, &count)
|
|
stats.ByStatus[s] = count
|
|
}
|
|
rows.Close()
|
|
|
|
// Upcoming deadlines (next 7 days)
|
|
rows, _ = s.pool.Query(ctx, `
|
|
SELECT id, request_number, request_type, status, requester_email, deadline_at
|
|
FROM data_subject_requests
|
|
WHERE COALESCE(extended_deadline_at, deadline_at) BETWEEN NOW() AND NOW() + INTERVAL '7 days'
|
|
AND status NOT IN ('completed', 'rejected', 'cancelled')
|
|
ORDER BY deadline_at ASC LIMIT 10
|
|
`)
|
|
for rows.Next() {
|
|
var dsr models.DataSubjectRequest
|
|
rows.Scan(&dsr.ID, &dsr.RequestNumber, &dsr.RequestType, &dsr.Status, &dsr.RequesterEmail, &dsr.DeadlineAt)
|
|
stats.UpcomingDeadlines = append(stats.UpcomingDeadlines, dsr)
|
|
}
|
|
rows.Close()
|
|
|
|
return stats, nil
|
|
}
|
|
|
|
// GetStatusHistory retrieves the status history for a DSR
|
|
func (s *DSRService) GetStatusHistory(ctx context.Context, requestID uuid.UUID) ([]models.DSRStatusHistory, error) {
|
|
rows, err := s.pool.Query(ctx, `
|
|
SELECT id, request_id, from_status, to_status, changed_by, comment, metadata, created_at
|
|
FROM dsr_status_history WHERE request_id = $1 ORDER BY created_at DESC
|
|
`, requestID)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("failed to query status history: %w", err)
|
|
}
|
|
defer rows.Close()
|
|
|
|
var history []models.DSRStatusHistory
|
|
for rows.Next() {
|
|
var h models.DSRStatusHistory
|
|
var metadataJSON []byte
|
|
err := rows.Scan(&h.ID, &h.RequestID, &h.FromStatus, &h.ToStatus, &h.ChangedBy, &h.Comment, &metadataJSON, &h.CreatedAt)
|
|
if err != nil {
|
|
continue
|
|
}
|
|
json.Unmarshal(metadataJSON, &h.Metadata)
|
|
history = append(history, h)
|
|
}
|
|
|
|
return history, nil
|
|
}
|
|
|
|
// GetCommunications retrieves communications for a DSR
|
|
func (s *DSRService) GetCommunications(ctx context.Context, requestID uuid.UUID) ([]models.DSRCommunication, error) {
|
|
rows, err := s.pool.Query(ctx, `
|
|
SELECT id, request_id, direction, channel, communication_type, template_version_id,
|
|
subject, body_html, body_text, recipient_email, sent_at, error_message,
|
|
attachments, created_at, created_by
|
|
FROM dsr_communications WHERE request_id = $1 ORDER BY created_at DESC
|
|
`, requestID)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("failed to query communications: %w", err)
|
|
}
|
|
defer rows.Close()
|
|
|
|
var comms []models.DSRCommunication
|
|
for rows.Next() {
|
|
var c models.DSRCommunication
|
|
var attachmentsJSON []byte
|
|
err := rows.Scan(&c.ID, &c.RequestID, &c.Direction, &c.Channel, &c.CommunicationType,
|
|
&c.TemplateVersionID, &c.Subject, &c.BodyHTML, &c.BodyText, &c.RecipientEmail,
|
|
&c.SentAt, &c.ErrorMessage, &attachmentsJSON, &c.CreatedAt, &c.CreatedBy)
|
|
if err != nil {
|
|
continue
|
|
}
|
|
json.Unmarshal(attachmentsJSON, &c.Attachments)
|
|
comms = append(comms, c)
|
|
}
|
|
|
|
return comms, nil
|
|
}
|
|
|
|
// SendCommunication sends a communication for a DSR
|
|
func (s *DSRService) SendCommunication(ctx context.Context, requestID uuid.UUID, req models.SendDSRCommunicationRequest, sentBy uuid.UUID) error {
|
|
// Get DSR details
|
|
dsr, err := s.GetByID(ctx, requestID)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
// Get template if specified
|
|
var subject, bodyHTML, bodyText string
|
|
if req.TemplateVersionID != nil {
|
|
templateVersionID, _ := uuid.Parse(*req.TemplateVersionID)
|
|
err := s.pool.QueryRow(ctx, `
|
|
SELECT subject, body_html, body_text FROM dsr_template_versions WHERE id = $1 AND status = 'published'
|
|
`, templateVersionID).Scan(&subject, &bodyHTML, &bodyText)
|
|
if err != nil {
|
|
return fmt.Errorf("template version not found or not published: %w", err)
|
|
}
|
|
}
|
|
|
|
// Use custom content if provided
|
|
if req.CustomSubject != nil {
|
|
subject = *req.CustomSubject
|
|
}
|
|
if req.CustomBody != nil {
|
|
bodyHTML = *req.CustomBody
|
|
bodyText = stripHTML(*req.CustomBody)
|
|
}
|
|
|
|
// Replace variables
|
|
variables := map[string]string{
|
|
"requester_name": stringOrDefault(dsr.RequesterName, "Antragsteller/in"),
|
|
"request_number": dsr.RequestNumber,
|
|
"request_type_de": dsr.RequestType.Label(),
|
|
"request_date": dsr.CreatedAt.Format("02.01.2006"),
|
|
"deadline_date": dsr.DeadlineAt.Format("02.01.2006"),
|
|
}
|
|
for k, v := range req.Variables {
|
|
variables[k] = v
|
|
}
|
|
subject = replaceVariables(subject, variables)
|
|
bodyHTML = replaceVariables(bodyHTML, variables)
|
|
bodyText = replaceVariables(bodyText, variables)
|
|
|
|
// Send email
|
|
if s.emailService != nil {
|
|
err = s.emailService.SendEmail(dsr.RequesterEmail, subject, bodyHTML, bodyText)
|
|
if err != nil {
|
|
// Log error but continue
|
|
_, _ = s.pool.Exec(ctx, `
|
|
INSERT INTO dsr_communications (request_id, direction, channel, communication_type,
|
|
template_version_id, subject, body_html, body_text, recipient_email, error_message, created_by)
|
|
VALUES ($1, 'outbound', 'email', $2, $3, $4, $5, $6, $7, $8, $9)
|
|
`, requestID, req.CommunicationType, req.TemplateVersionID, subject, bodyHTML, bodyText,
|
|
dsr.RequesterEmail, err.Error(), sentBy)
|
|
return fmt.Errorf("failed to send email: %w", err)
|
|
}
|
|
}
|
|
|
|
// Log communication
|
|
now := time.Now()
|
|
_, err = s.pool.Exec(ctx, `
|
|
INSERT INTO dsr_communications (request_id, direction, channel, communication_type,
|
|
template_version_id, subject, body_html, body_text, recipient_email, sent_at, created_by)
|
|
VALUES ($1, 'outbound', 'email', $2, $3, $4, $5, $6, $7, $8, $9)
|
|
`, requestID, req.CommunicationType, req.TemplateVersionID, subject, bodyHTML, bodyText,
|
|
dsr.RequesterEmail, now, sentBy)
|
|
|
|
return err
|
|
}
|
|
|
|
// InitErasureExceptionChecks initializes exception checks for an erasure request
|
|
func (s *DSRService) InitErasureExceptionChecks(ctx context.Context, requestID uuid.UUID) error {
|
|
exceptions := []struct {
|
|
Type string
|
|
Description string
|
|
}{
|
|
{models.DSRExceptionFreedomExpression, "Ausübung des Rechts auf freie Meinungsäußerung und Information (Art. 17 Abs. 3 lit. a)"},
|
|
{models.DSRExceptionLegalObligation, "Erfüllung einer rechtlichen Verpflichtung oder öffentlichen Aufgabe (Art. 17 Abs. 3 lit. b)"},
|
|
{models.DSRExceptionPublicHealth, "Gründe des öffentlichen Interesses im Bereich der öffentlichen Gesundheit (Art. 17 Abs. 3 lit. c)"},
|
|
{models.DSRExceptionArchiving, "Im öffentlichen Interesse liegende Archivzwecke, Forschung oder Statistik (Art. 17 Abs. 3 lit. d)"},
|
|
{models.DSRExceptionLegalClaims, "Geltendmachung, Ausübung oder Verteidigung von Rechtsansprüchen (Art. 17 Abs. 3 lit. e)"},
|
|
}
|
|
|
|
for _, exc := range exceptions {
|
|
_, err := s.pool.Exec(ctx, `
|
|
INSERT INTO dsr_exception_checks (request_id, exception_type, description)
|
|
VALUES ($1, $2, $3) ON CONFLICT DO NOTHING
|
|
`, requestID, exc.Type, exc.Description)
|
|
if err != nil {
|
|
return fmt.Errorf("failed to create exception check: %w", err)
|
|
}
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// GetExceptionChecks retrieves exception checks for a DSR
|
|
func (s *DSRService) GetExceptionChecks(ctx context.Context, requestID uuid.UUID) ([]models.DSRExceptionCheck, error) {
|
|
rows, err := s.pool.Query(ctx, `
|
|
SELECT id, request_id, exception_type, description, applies, checked_by, checked_at, notes, created_at
|
|
FROM dsr_exception_checks WHERE request_id = $1 ORDER BY created_at
|
|
`, requestID)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("failed to query exception checks: %w", err)
|
|
}
|
|
defer rows.Close()
|
|
|
|
var checks []models.DSRExceptionCheck
|
|
for rows.Next() {
|
|
var c models.DSRExceptionCheck
|
|
err := rows.Scan(&c.ID, &c.RequestID, &c.ExceptionType, &c.Description, &c.Applies,
|
|
&c.CheckedBy, &c.CheckedAt, &c.Notes, &c.CreatedAt)
|
|
if err != nil {
|
|
continue
|
|
}
|
|
checks = append(checks, c)
|
|
}
|
|
|
|
return checks, nil
|
|
}
|
|
|
|
// UpdateExceptionCheck updates an exception check
|
|
func (s *DSRService) UpdateExceptionCheck(ctx context.Context, checkID uuid.UUID, applies bool, notes *string, checkedBy uuid.UUID) error {
|
|
_, err := s.pool.Exec(ctx, `
|
|
UPDATE dsr_exception_checks
|
|
SET applies = $1, notes = $2, checked_by = $3, checked_at = NOW()
|
|
WHERE id = $4
|
|
`, applies, notes, checkedBy, checkID)
|
|
return err
|
|
}
|
|
|
|
// ProcessDeadlines checks for approaching and overdue deadlines
|
|
func (s *DSRService) ProcessDeadlines(ctx context.Context) error {
|
|
now := time.Now()
|
|
|
|
// Find requests with deadlines in 3 days
|
|
threeDaysAhead := now.AddDate(0, 0, 3)
|
|
rows, _ := s.pool.Query(ctx, `
|
|
SELECT id, request_number, request_type, assigned_to, deadline_at
|
|
FROM data_subject_requests
|
|
WHERE COALESCE(extended_deadline_at, deadline_at) BETWEEN $1 AND $2
|
|
AND status NOT IN ('completed', 'rejected', 'cancelled')
|
|
`, now, threeDaysAhead)
|
|
for rows.Next() {
|
|
var id uuid.UUID
|
|
var requestNumber, requestType string
|
|
var assignedTo *uuid.UUID
|
|
var deadline time.Time
|
|
rows.Scan(&id, &requestNumber, &requestType, &assignedTo, &deadline)
|
|
|
|
// Notify assigned user or all DPOs
|
|
if assignedTo != nil {
|
|
s.notifyDeadlineWarning(ctx, id, *assignedTo, requestNumber, deadline, 3)
|
|
} else {
|
|
s.notifyAllDPOs(ctx, id, requestNumber, "Frist in 3 Tagen", deadline)
|
|
}
|
|
}
|
|
rows.Close()
|
|
|
|
// Find requests with deadlines in 1 day
|
|
oneDayAhead := now.AddDate(0, 0, 1)
|
|
rows, _ = s.pool.Query(ctx, `
|
|
SELECT id, request_number, request_type, assigned_to, deadline_at
|
|
FROM data_subject_requests
|
|
WHERE COALESCE(extended_deadline_at, deadline_at) BETWEEN $1 AND $2
|
|
AND status NOT IN ('completed', 'rejected', 'cancelled')
|
|
`, now, oneDayAhead)
|
|
for rows.Next() {
|
|
var id uuid.UUID
|
|
var requestNumber, requestType string
|
|
var assignedTo *uuid.UUID
|
|
var deadline time.Time
|
|
rows.Scan(&id, &requestNumber, &requestType, &assignedTo, &deadline)
|
|
|
|
if assignedTo != nil {
|
|
s.notifyDeadlineWarning(ctx, id, *assignedTo, requestNumber, deadline, 1)
|
|
} else {
|
|
s.notifyAllDPOs(ctx, id, requestNumber, "Frist morgen!", deadline)
|
|
}
|
|
}
|
|
rows.Close()
|
|
|
|
// Find overdue requests
|
|
rows, _ = s.pool.Query(ctx, `
|
|
SELECT id, request_number, request_type, assigned_to, deadline_at
|
|
FROM data_subject_requests
|
|
WHERE COALESCE(extended_deadline_at, deadline_at) < $1
|
|
AND status NOT IN ('completed', 'rejected', 'cancelled')
|
|
`, now)
|
|
for rows.Next() {
|
|
var id uuid.UUID
|
|
var requestNumber, requestType string
|
|
var assignedTo *uuid.UUID
|
|
var deadline time.Time
|
|
rows.Scan(&id, &requestNumber, &requestType, &assignedTo, &deadline)
|
|
|
|
// Notify all DPOs for overdue
|
|
s.notifyAllDPOs(ctx, id, requestNumber, "ÜBERFÄLLIG!", deadline)
|
|
|
|
// Log to audit
|
|
s.pool.Exec(ctx, `
|
|
INSERT INTO consent_audit_log (action, entity_type, entity_id, details)
|
|
VALUES ('dsr_overdue', 'dsr', $1, $2)
|
|
`, id, fmt.Sprintf(`{"request_number": "%s", "deadline": "%s"}`, requestNumber, deadline.Format(time.RFC3339)))
|
|
}
|
|
rows.Close()
|
|
|
|
return nil
|
|
}
|
|
|
|
// Helper functions
|
|
|
|
func (s *DSRService) recordStatusChange(ctx context.Context, requestID uuid.UUID, fromStatus *models.DSRStatus, toStatus models.DSRStatus, changedBy *uuid.UUID, comment string) {
|
|
s.pool.Exec(ctx, `
|
|
INSERT INTO dsr_status_history (request_id, from_status, to_status, changed_by, comment)
|
|
VALUES ($1, $2, $3, $4, $5)
|
|
`, requestID, fromStatus, toStatus, changedBy, comment)
|
|
}
|
|
|
|
func (s *DSRService) notifyNewRequest(ctx context.Context, dsr *models.DataSubjectRequest) {
|
|
if s.notificationService == nil {
|
|
return
|
|
}
|
|
// Notify all DPOs
|
|
rows, _ := s.pool.Query(ctx, "SELECT id FROM users WHERE role = 'data_protection_officer'")
|
|
defer rows.Close()
|
|
for rows.Next() {
|
|
var userID uuid.UUID
|
|
rows.Scan(&userID)
|
|
s.notificationService.CreateNotification(ctx, userID, NotificationTypeDSRReceived,
|
|
"Neue Betroffenenanfrage",
|
|
fmt.Sprintf("Neue %s eingegangen: %s", dsr.RequestType.Label(), dsr.RequestNumber),
|
|
map[string]interface{}{"dsr_id": dsr.ID, "request_number": dsr.RequestNumber})
|
|
}
|
|
}
|
|
|
|
func (s *DSRService) notifyAssignment(ctx context.Context, dsrID, assigneeID uuid.UUID) {
|
|
if s.notificationService == nil {
|
|
return
|
|
}
|
|
dsr, _ := s.GetByID(ctx, dsrID)
|
|
if dsr != nil {
|
|
s.notificationService.CreateNotification(ctx, assigneeID, NotificationTypeDSRAssigned,
|
|
"Betroffenenanfrage zugewiesen",
|
|
fmt.Sprintf("Ihnen wurde die Anfrage %s zugewiesen", dsr.RequestNumber),
|
|
map[string]interface{}{"dsr_id": dsrID, "request_number": dsr.RequestNumber})
|
|
}
|
|
}
|
|
|
|
func (s *DSRService) notifyDeadlineWarning(ctx context.Context, dsrID, userID uuid.UUID, requestNumber string, deadline time.Time, daysLeft int) {
|
|
if s.notificationService == nil {
|
|
return
|
|
}
|
|
s.notificationService.CreateNotification(ctx, userID, NotificationTypeDSRDeadline,
|
|
fmt.Sprintf("Fristwarnung: %s", requestNumber),
|
|
fmt.Sprintf("Die Frist für %s läuft in %d Tag(en) ab (%s)", requestNumber, daysLeft, deadline.Format("02.01.2006")),
|
|
map[string]interface{}{"dsr_id": dsrID, "deadline": deadline, "days_left": daysLeft})
|
|
}
|
|
|
|
func (s *DSRService) notifyAllDPOs(ctx context.Context, dsrID uuid.UUID, requestNumber, message string, deadline time.Time) {
|
|
if s.notificationService == nil {
|
|
return
|
|
}
|
|
rows, _ := s.pool.Query(ctx, "SELECT id FROM users WHERE role = 'data_protection_officer'")
|
|
defer rows.Close()
|
|
for rows.Next() {
|
|
var userID uuid.UUID
|
|
rows.Scan(&userID)
|
|
s.notificationService.CreateNotification(ctx, userID, NotificationTypeDSRDeadline,
|
|
fmt.Sprintf("%s: %s", message, requestNumber),
|
|
fmt.Sprintf("Anfrage %s: %s (Frist: %s)", requestNumber, message, deadline.Format("02.01.2006")),
|
|
map[string]interface{}{"dsr_id": dsrID, "deadline": deadline})
|
|
}
|
|
}
|
|
|
|
func isValidRequestType(rt models.DSRRequestType) bool {
|
|
switch rt {
|
|
case models.DSRTypeAccess, models.DSRTypeRectification, models.DSRTypeErasure,
|
|
models.DSRTypeRestriction, models.DSRTypePortability:
|
|
return true
|
|
}
|
|
return false
|
|
}
|
|
|
|
func isValidStatusTransition(from, to models.DSRStatus) bool {
|
|
validTransitions := map[models.DSRStatus][]models.DSRStatus{
|
|
models.DSRStatusIntake: {models.DSRStatusIdentityVerification, models.DSRStatusProcessing, models.DSRStatusRejected, models.DSRStatusCancelled},
|
|
models.DSRStatusIdentityVerification: {models.DSRStatusProcessing, models.DSRStatusRejected, models.DSRStatusCancelled},
|
|
models.DSRStatusProcessing: {models.DSRStatusCompleted, models.DSRStatusRejected, models.DSRStatusCancelled},
|
|
models.DSRStatusCompleted: {},
|
|
models.DSRStatusRejected: {},
|
|
models.DSRStatusCancelled: {},
|
|
}
|
|
|
|
allowed, exists := validTransitions[from]
|
|
if !exists {
|
|
return false
|
|
}
|
|
for _, s := range allowed {
|
|
if s == to {
|
|
return true
|
|
}
|
|
}
|
|
return false
|
|
}
|
|
|
|
func stringOrDefault(s *string, def string) string {
|
|
if s != nil {
|
|
return *s
|
|
}
|
|
return def
|
|
}
|
|
|
|
func replaceVariables(text string, variables map[string]string) string {
|
|
for k, v := range variables {
|
|
text = strings.ReplaceAll(text, "{{"+k+"}}", v)
|
|
}
|
|
return text
|
|
}
|
|
|
|
func stripHTML(html string) string {
|
|
// Simple HTML stripping - in production use a proper library
|
|
text := strings.ReplaceAll(html, "<br>", "\n")
|
|
text = strings.ReplaceAll(text, "<br/>", "\n")
|
|
text = strings.ReplaceAll(text, "<br />", "\n")
|
|
text = strings.ReplaceAll(text, "</p>", "\n\n")
|
|
// Remove all remaining tags
|
|
for {
|
|
start := strings.Index(text, "<")
|
|
if start == -1 {
|
|
break
|
|
}
|
|
end := strings.Index(text[start:], ">")
|
|
if end == -1 {
|
|
break
|
|
}
|
|
text = text[:start] + text[start+end+1:]
|
|
}
|
|
return strings.TrimSpace(text)
|
|
}
|