Files
breakpilot-compliance/ai-compliance-sdk/internal/ucca/escalation_store.go
Sharang Parnerkar f7a5f9e1ed refactor(go/ucca): split license_policy, models, pdf_export, escalation_store, obligations_registry
Split 5 oversized files (501-583 LOC each) into focused units all under 500 LOC:
- license_policy.go → +_types.go (engine logic / type definitions)
- models.go → +_intake.go, +_assessment.go (enums+domains / intake structs / output+DB types)
- pdf_export.go → +_markdown.go (PDF export / markdown export)
- escalation_store.go → +_dsb.go (main escalation ops / DSB pool ops)
- obligations_registry.go → +_grouping.go (registry core / grouping methods)

All files remain in package ucca. Zero behavior changes.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-19 10:03:51 +02:00

390 lines
11 KiB
Go

package ucca
import (
"context"
"encoding/json"
"fmt"
"time"
"github.com/google/uuid"
"github.com/jackc/pgx/v5/pgxpool"
)
// EscalationStore handles database operations for escalations.
type EscalationStore struct {
pool *pgxpool.Pool
}
// NewEscalationStore creates a new escalation store.
func NewEscalationStore(pool *pgxpool.Pool) *EscalationStore {
return &EscalationStore{pool: pool}
}
// CreateEscalation creates a new escalation for an assessment.
func (s *EscalationStore) CreateEscalation(ctx context.Context, e *Escalation) error {
conditionsJSON, err := json.Marshal(e.Conditions)
if err != nil {
conditionsJSON = []byte("[]")
}
query := `
INSERT INTO ucca_escalations (
id, tenant_id, assessment_id, escalation_level, escalation_reason,
status, conditions, due_date, created_at, updated_at
) VALUES (
$1, $2, $3, $4, $5, $6, $7, $8, NOW(), NOW()
)
`
e.ID = uuid.New()
e.CreatedAt = time.Now().UTC()
e.UpdatedAt = e.CreatedAt
_, err = s.pool.Exec(ctx, query,
e.ID, e.TenantID, e.AssessmentID, e.EscalationLevel, e.EscalationReason,
e.Status, conditionsJSON, e.DueDate,
)
return err
}
// GetEscalation retrieves an escalation by ID.
func (s *EscalationStore) GetEscalation(ctx context.Context, id uuid.UUID) (*Escalation, error) {
query := `
SELECT id, tenant_id, assessment_id, escalation_level, escalation_reason,
assigned_to, assigned_role, assigned_at, status, reviewer_id,
reviewer_notes, reviewed_at, decision, decision_notes, decision_at,
conditions, created_at, updated_at, due_date,
notification_sent, notification_sent_at
FROM ucca_escalations
WHERE id = $1
`
var e Escalation
var conditionsJSON []byte
err := s.pool.QueryRow(ctx, query, id).Scan(
&e.ID, &e.TenantID, &e.AssessmentID, &e.EscalationLevel, &e.EscalationReason,
&e.AssignedTo, &e.AssignedRole, &e.AssignedAt, &e.Status, &e.ReviewerID,
&e.ReviewerNotes, &e.ReviewedAt, &e.Decision, &e.DecisionNotes, &e.DecisionAt,
&conditionsJSON, &e.CreatedAt, &e.UpdatedAt, &e.DueDate,
&e.NotificationSent, &e.NotificationSentAt,
)
if err != nil {
return nil, err
}
if len(conditionsJSON) > 0 {
json.Unmarshal(conditionsJSON, &e.Conditions)
}
return &e, nil
}
// GetEscalationByAssessment retrieves an escalation for an assessment.
func (s *EscalationStore) GetEscalationByAssessment(ctx context.Context, assessmentID uuid.UUID) (*Escalation, error) {
query := `
SELECT id, tenant_id, assessment_id, escalation_level, escalation_reason,
assigned_to, assigned_role, assigned_at, status, reviewer_id,
reviewer_notes, reviewed_at, decision, decision_notes, decision_at,
conditions, created_at, updated_at, due_date,
notification_sent, notification_sent_at
FROM ucca_escalations
WHERE assessment_id = $1
ORDER BY created_at DESC
LIMIT 1
`
var e Escalation
var conditionsJSON []byte
err := s.pool.QueryRow(ctx, query, assessmentID).Scan(
&e.ID, &e.TenantID, &e.AssessmentID, &e.EscalationLevel, &e.EscalationReason,
&e.AssignedTo, &e.AssignedRole, &e.AssignedAt, &e.Status, &e.ReviewerID,
&e.ReviewerNotes, &e.ReviewedAt, &e.Decision, &e.DecisionNotes, &e.DecisionAt,
&conditionsJSON, &e.CreatedAt, &e.UpdatedAt, &e.DueDate,
&e.NotificationSent, &e.NotificationSentAt,
)
if err != nil {
return nil, err
}
if len(conditionsJSON) > 0 {
json.Unmarshal(conditionsJSON, &e.Conditions)
}
return &e, nil
}
// ListEscalations lists escalations for a tenant with optional filters.
func (s *EscalationStore) ListEscalations(ctx context.Context, tenantID uuid.UUID, status string, level string, assignedTo *uuid.UUID) ([]EscalationWithAssessment, error) {
query := `
SELECT e.id, e.tenant_id, e.assessment_id, e.escalation_level, e.escalation_reason,
e.assigned_to, e.assigned_role, e.assigned_at, e.status, e.reviewer_id,
e.reviewer_notes, e.reviewed_at, e.decision, e.decision_notes, e.decision_at,
e.conditions, e.created_at, e.updated_at, e.due_date,
e.notification_sent, e.notification_sent_at,
a.title, a.feasibility, a.risk_score, a.domain
FROM ucca_escalations e
JOIN ucca_assessments a ON e.assessment_id = a.id
WHERE e.tenant_id = $1
`
args := []interface{}{tenantID}
argCount := 1
if status != "" {
argCount++
query += fmt.Sprintf(" AND e.status = $%d", argCount)
args = append(args, status)
}
if level != "" {
argCount++
query += fmt.Sprintf(" AND e.escalation_level = $%d", argCount)
args = append(args, level)
}
if assignedTo != nil {
argCount++
query += fmt.Sprintf(" AND e.assigned_to = $%d", argCount)
args = append(args, *assignedTo)
}
query += " ORDER BY e.created_at DESC"
rows, err := s.pool.Query(ctx, query, args...)
if err != nil {
return nil, err
}
defer rows.Close()
var escalations []EscalationWithAssessment
for rows.Next() {
var e EscalationWithAssessment
var conditionsJSON []byte
err := rows.Scan(
&e.ID, &e.TenantID, &e.AssessmentID, &e.EscalationLevel, &e.EscalationReason,
&e.AssignedTo, &e.AssignedRole, &e.AssignedAt, &e.Status, &e.ReviewerID,
&e.ReviewerNotes, &e.ReviewedAt, &e.Decision, &e.DecisionNotes, &e.DecisionAt,
&conditionsJSON, &e.CreatedAt, &e.UpdatedAt, &e.DueDate,
&e.NotificationSent, &e.NotificationSentAt,
&e.AssessmentTitle, &e.AssessmentFeasibility, &e.AssessmentRiskScore, &e.AssessmentDomain,
)
if err != nil {
return nil, err
}
if len(conditionsJSON) > 0 {
json.Unmarshal(conditionsJSON, &e.Conditions)
}
escalations = append(escalations, e)
}
return escalations, nil
}
// AssignEscalation assigns an escalation to a reviewer.
func (s *EscalationStore) AssignEscalation(ctx context.Context, id uuid.UUID, assignedTo uuid.UUID, role string) error {
query := `
UPDATE ucca_escalations
SET assigned_to = $2, assigned_role = $3, assigned_at = NOW(),
status = 'assigned', updated_at = NOW()
WHERE id = $1
`
_, err := s.pool.Exec(ctx, query, id, assignedTo, role)
return err
}
// StartReview marks an escalation as being reviewed.
func (s *EscalationStore) StartReview(ctx context.Context, id uuid.UUID, reviewerID uuid.UUID) error {
query := `
UPDATE ucca_escalations
SET reviewer_id = $2, status = 'in_review', updated_at = NOW()
WHERE id = $1
`
_, err := s.pool.Exec(ctx, query, id, reviewerID)
return err
}
// DecideEscalation records a decision on an escalation.
func (s *EscalationStore) DecideEscalation(ctx context.Context, id uuid.UUID, decision EscalationDecision, notes string, conditions []string) error {
var newStatus EscalationStatus
switch decision {
case EscalationDecisionApprove:
newStatus = EscalationStatusApproved
case EscalationDecisionReject:
newStatus = EscalationStatusRejected
case EscalationDecisionModify:
newStatus = EscalationStatusReturned
case EscalationDecisionEscalate:
// Keep in review for re-assignment
newStatus = EscalationStatusPending
default:
newStatus = EscalationStatusPending
}
conditionsJSON, _ := json.Marshal(conditions)
query := `
UPDATE ucca_escalations
SET decision = $2, decision_notes = $3, decision_at = NOW(),
status = $4, conditions = $5, updated_at = NOW()
WHERE id = $1
`
_, err := s.pool.Exec(ctx, query, id, decision, notes, newStatus, conditionsJSON)
return err
}
// AddEscalationHistory adds an audit entry for an escalation.
func (s *EscalationStore) AddEscalationHistory(ctx context.Context, h *EscalationHistory) error {
query := `
INSERT INTO ucca_escalation_history (
id, escalation_id, action, old_status, new_status,
old_level, new_level, actor_id, actor_role, notes, created_at
) VALUES (
$1, $2, $3, $4, $5, $6, $7, $8, $9, $10, NOW()
)
`
h.ID = uuid.New()
h.CreatedAt = time.Now().UTC()
_, err := s.pool.Exec(ctx, query,
h.ID, h.EscalationID, h.Action, h.OldStatus, h.NewStatus,
h.OldLevel, h.NewLevel, h.ActorID, h.ActorRole, h.Notes,
)
return err
}
// GetEscalationHistory retrieves the audit history for an escalation.
func (s *EscalationStore) GetEscalationHistory(ctx context.Context, escalationID uuid.UUID) ([]EscalationHistory, error) {
query := `
SELECT id, escalation_id, action, old_status, new_status,
old_level, new_level, actor_id, actor_role, notes, created_at
FROM ucca_escalation_history
WHERE escalation_id = $1
ORDER BY created_at ASC
`
rows, err := s.pool.Query(ctx, query, escalationID)
if err != nil {
return nil, err
}
defer rows.Close()
var history []EscalationHistory
for rows.Next() {
var h EscalationHistory
err := rows.Scan(
&h.ID, &h.EscalationID, &h.Action, &h.OldStatus, &h.NewStatus,
&h.OldLevel, &h.NewLevel, &h.ActorID, &h.ActorRole, &h.Notes, &h.CreatedAt,
)
if err != nil {
return nil, err
}
history = append(history, h)
}
return history, nil
}
// GetEscalationStats retrieves escalation statistics for a tenant.
func (s *EscalationStore) GetEscalationStats(ctx context.Context, tenantID uuid.UUID) (*EscalationStats, error) {
stats := &EscalationStats{
ByLevel: make(map[EscalationLevel]int),
}
// Count by status
statusQuery := `
SELECT status, COUNT(*) as count
FROM ucca_escalations
WHERE tenant_id = $1
GROUP BY status
`
rows, err := s.pool.Query(ctx, statusQuery, tenantID)
if err != nil {
return nil, err
}
for rows.Next() {
var status string
var count int
if err := rows.Scan(&status, &count); err != nil {
continue
}
switch EscalationStatus(status) {
case EscalationStatusPending:
stats.TotalPending = count
case EscalationStatusInReview, EscalationStatusAssigned:
stats.TotalInReview += count
case EscalationStatusApproved:
stats.TotalApproved = count
case EscalationStatusRejected:
stats.TotalRejected = count
}
}
rows.Close()
// Count by level
levelQuery := `
SELECT escalation_level, COUNT(*) as count
FROM ucca_escalations
WHERE tenant_id = $1 AND status NOT IN ('approved', 'rejected')
GROUP BY escalation_level
`
rows, err = s.pool.Query(ctx, levelQuery, tenantID)
if err != nil {
return nil, err
}
for rows.Next() {
var level string
var count int
if err := rows.Scan(&level, &count); err != nil {
continue
}
stats.ByLevel[EscalationLevel(level)] = count
}
rows.Close()
// Count overdue SLA
overdueQuery := `
SELECT COUNT(*)
FROM ucca_escalations
WHERE tenant_id = $1
AND status NOT IN ('approved', 'rejected')
AND due_date < NOW()
`
s.pool.QueryRow(ctx, overdueQuery, tenantID).Scan(&stats.OverdueSLA)
// Count approaching SLA (within 8 hours)
approachingQuery := `
SELECT COUNT(*)
FROM ucca_escalations
WHERE tenant_id = $1
AND status NOT IN ('approved', 'rejected')
AND due_date > NOW()
AND due_date < NOW() + INTERVAL '8 hours'
`
s.pool.QueryRow(ctx, approachingQuery, tenantID).Scan(&stats.ApproachingSLA)
// Average resolution time
avgQuery := `
SELECT COALESCE(AVG(EXTRACT(EPOCH FROM (decision_at - created_at)) / 3600), 0)
FROM ucca_escalations
WHERE tenant_id = $1 AND decision_at IS NOT NULL
`
s.pool.QueryRow(ctx, avgQuery, tenantID).Scan(&stats.AvgResolutionHours)
return stats, nil
}