// Package orchestrator implements multi-phase university crawling with queue management package orchestrator import ( "context" "fmt" "time" "github.com/google/uuid" "github.com/jackc/pgx/v5" "github.com/jackc/pgx/v5/pgxpool" ) // PostgresRepository implements the Repository interface using PostgreSQL type PostgresRepository struct { pool *pgxpool.Pool } // NewPostgresRepository creates a new PostgresRepository func NewPostgresRepository(pool *pgxpool.Pool) *PostgresRepository { return &PostgresRepository{pool: pool} } // ============================================================================ // QUEUE OPERATIONS // ============================================================================ // GetQueueItems retrieves all items in the crawl queue func (r *PostgresRepository) GetQueueItems(ctx context.Context) ([]CrawlQueueItem, error) { query := ` SELECT cq.id, cq.university_id, u.name, COALESCE(u.short_name, ''), cq.queue_position, cq.priority, cq.current_phase, cq.discovery_completed, cq.discovery_completed_at, cq.professors_completed, cq.professors_completed_at, cq.all_staff_completed, cq.all_staff_completed_at, cq.publications_completed, cq.publications_completed_at, cq.discovery_count, cq.professors_count, cq.staff_count, cq.publications_count, cq.retry_count, cq.max_retries, COALESCE(cq.last_error, ''), cq.started_at, cq.completed_at, CASE WHEN cq.current_phase = 'pending' THEN 0 WHEN cq.current_phase = 'discovery' THEN 10 WHEN cq.current_phase = 'professors' THEN 30 WHEN cq.current_phase = 'all_staff' THEN 60 WHEN cq.current_phase = 'publications' THEN 90 WHEN cq.current_phase = 'completed' THEN 100 ELSE 0 END as progress_percent, cq.created_at, cq.updated_at FROM crawl_queue cq JOIN universities u ON cq.university_id = u.id ORDER BY cq.queue_position NULLS LAST, cq.priority DESC ` rows, err := r.pool.Query(ctx, query) if err != nil { return nil, fmt.Errorf("failed to query queue items: %w", err) } defer rows.Close() var items []CrawlQueueItem for rows.Next() { var item CrawlQueueItem var phase string if err := rows.Scan( &item.ID, &item.UniversityID, &item.UniversityName, &item.UniversityShort, &item.QueuePosition, &item.Priority, &phase, &item.DiscoveryCompleted, &item.DiscoveryCompletedAt, &item.ProfessorsCompleted, &item.ProfessorsCompletedAt, &item.AllStaffCompleted, &item.AllStaffCompletedAt, &item.PublicationsCompleted, &item.PublicationsCompletedAt, &item.DiscoveryCount, &item.ProfessorsCount, &item.StaffCount, &item.PublicationsCount, &item.RetryCount, &item.MaxRetries, &item.LastError, &item.StartedAt, &item.CompletedAt, &item.ProgressPercent, &item.CreatedAt, &item.UpdatedAt, ); err != nil { return nil, fmt.Errorf("failed to scan queue item: %w", err) } item.CurrentPhase = CrawlPhase(phase) items = append(items, item) } return items, rows.Err() } // GetNextInQueue retrieves the next item to process func (r *PostgresRepository) GetNextInQueue(ctx context.Context) (*CrawlQueueItem, error) { query := ` SELECT cq.id, cq.university_id, u.name, COALESCE(u.short_name, ''), cq.queue_position, cq.priority, cq.current_phase, cq.discovery_completed, cq.discovery_completed_at, cq.professors_completed, cq.professors_completed_at, cq.all_staff_completed, cq.all_staff_completed_at, cq.publications_completed, cq.publications_completed_at, cq.discovery_count, cq.professors_count, cq.staff_count, cq.publications_count, cq.retry_count, cq.max_retries, COALESCE(cq.last_error, ''), cq.started_at, cq.completed_at, cq.created_at, cq.updated_at FROM crawl_queue cq JOIN universities u ON cq.university_id = u.id WHERE cq.current_phase NOT IN ('completed', 'failed', 'paused') AND cq.queue_position IS NOT NULL ORDER BY cq.queue_position ASC, cq.priority DESC LIMIT 1 ` var item CrawlQueueItem var phase string err := r.pool.QueryRow(ctx, query).Scan( &item.ID, &item.UniversityID, &item.UniversityName, &item.UniversityShort, &item.QueuePosition, &item.Priority, &phase, &item.DiscoveryCompleted, &item.DiscoveryCompletedAt, &item.ProfessorsCompleted, &item.ProfessorsCompletedAt, &item.AllStaffCompleted, &item.AllStaffCompletedAt, &item.PublicationsCompleted, &item.PublicationsCompletedAt, &item.DiscoveryCount, &item.ProfessorsCount, &item.StaffCount, &item.PublicationsCount, &item.RetryCount, &item.MaxRetries, &item.LastError, &item.StartedAt, &item.CompletedAt, &item.CreatedAt, &item.UpdatedAt, ) if err == pgx.ErrNoRows { return nil, nil } if err != nil { return nil, fmt.Errorf("failed to get next queue item: %w", err) } item.CurrentPhase = CrawlPhase(phase) return &item, nil } // AddToQueue adds a university to the crawl queue func (r *PostgresRepository) AddToQueue(ctx context.Context, universityID uuid.UUID, priority int, initiatedBy string) (*CrawlQueueItem, error) { // Get next queue position var nextPosition int err := r.pool.QueryRow(ctx, `SELECT COALESCE(MAX(queue_position), 0) + 1 FROM crawl_queue WHERE queue_position IS NOT NULL`).Scan(&nextPosition) if err != nil { return nil, fmt.Errorf("failed to get next queue position: %w", err) } query := ` INSERT INTO crawl_queue (university_id, queue_position, priority, initiated_by) VALUES ($1, $2, $3, $4) ON CONFLICT (university_id) DO UPDATE SET queue_position = EXCLUDED.queue_position, priority = EXCLUDED.priority, current_phase = 'pending', retry_count = 0, last_error = NULL, updated_at = NOW() RETURNING id, created_at, updated_at ` item := &CrawlQueueItem{ UniversityID: universityID, QueuePosition: &nextPosition, Priority: priority, CurrentPhase: PhasePending, MaxRetries: 3, } err = r.pool.QueryRow(ctx, query, universityID, nextPosition, priority, initiatedBy).Scan( &item.ID, &item.CreatedAt, &item.UpdatedAt, ) if err != nil { return nil, fmt.Errorf("failed to add to queue: %w", err) } // Get university name r.pool.QueryRow(ctx, `SELECT name, short_name FROM universities WHERE id = $1`, universityID).Scan( &item.UniversityName, &item.UniversityShort, ) return item, nil } // RemoveFromQueue removes a university from the queue func (r *PostgresRepository) RemoveFromQueue(ctx context.Context, universityID uuid.UUID) error { _, err := r.pool.Exec(ctx, `DELETE FROM crawl_queue WHERE university_id = $1`, universityID) return err } // UpdateQueueItem updates a queue item func (r *PostgresRepository) UpdateQueueItem(ctx context.Context, item *CrawlQueueItem) error { query := ` UPDATE crawl_queue SET queue_position = $2, priority = $3, current_phase = $4, discovery_completed = $5, discovery_completed_at = $6, professors_completed = $7, professors_completed_at = $8, all_staff_completed = $9, all_staff_completed_at = $10, publications_completed = $11, publications_completed_at = $12, discovery_count = $13, professors_count = $14, staff_count = $15, publications_count = $16, retry_count = $17, last_error = $18, started_at = $19, completed_at = $20, updated_at = NOW() WHERE university_id = $1 ` _, err := r.pool.Exec(ctx, query, item.UniversityID, item.QueuePosition, item.Priority, string(item.CurrentPhase), item.DiscoveryCompleted, item.DiscoveryCompletedAt, item.ProfessorsCompleted, item.ProfessorsCompletedAt, item.AllStaffCompleted, item.AllStaffCompletedAt, item.PublicationsCompleted, item.PublicationsCompletedAt, item.DiscoveryCount, item.ProfessorsCount, item.StaffCount, item.PublicationsCount, item.RetryCount, item.LastError, item.StartedAt, item.CompletedAt, ) return err } // PauseQueueItem pauses a crawl func (r *PostgresRepository) PauseQueueItem(ctx context.Context, universityID uuid.UUID) error { _, err := r.pool.Exec(ctx, `UPDATE crawl_queue SET current_phase = 'paused', updated_at = NOW() WHERE university_id = $1`, universityID) return err } // ResumeQueueItem resumes a paused crawl func (r *PostgresRepository) ResumeQueueItem(ctx context.Context, universityID uuid.UUID) error { // Determine what phase to resume from query := ` UPDATE crawl_queue SET current_phase = CASE WHEN NOT discovery_completed THEN 'discovery' WHEN NOT professors_completed THEN 'professors' WHEN NOT all_staff_completed THEN 'all_staff' WHEN NOT publications_completed THEN 'publications' ELSE 'pending' END, updated_at = NOW() WHERE university_id = $1 AND current_phase = 'paused' ` _, err := r.pool.Exec(ctx, query, universityID) return err } // ============================================================================ // PHASE UPDATES // ============================================================================ // CompletePhase marks a phase as completed func (r *PostgresRepository) CompletePhase(ctx context.Context, universityID uuid.UUID, phase CrawlPhase, count int) error { now := time.Now() var query string switch phase { case PhaseDiscovery: query = `UPDATE crawl_queue SET discovery_completed = true, discovery_completed_at = $2, discovery_count = $3, updated_at = NOW() WHERE university_id = $1` case PhaseProfessors: query = `UPDATE crawl_queue SET professors_completed = true, professors_completed_at = $2, professors_count = $3, updated_at = NOW() WHERE university_id = $1` case PhaseAllStaff: query = `UPDATE crawl_queue SET all_staff_completed = true, all_staff_completed_at = $2, staff_count = $3, updated_at = NOW() WHERE university_id = $1` case PhasePublications: query = `UPDATE crawl_queue SET publications_completed = true, publications_completed_at = $2, publications_count = $3, updated_at = NOW() WHERE university_id = $1` default: return fmt.Errorf("unknown phase: %s", phase) } _, err := r.pool.Exec(ctx, query, universityID, now, count) return err } // FailPhase records a phase failure func (r *PostgresRepository) FailPhase(ctx context.Context, universityID uuid.UUID, phase CrawlPhase, errMsg string) error { query := ` UPDATE crawl_queue SET retry_count = retry_count + 1, last_error = $2, current_phase = CASE WHEN retry_count + 1 >= max_retries THEN 'failed' ELSE current_phase END, updated_at = NOW() WHERE university_id = $1 ` _, err := r.pool.Exec(ctx, query, universityID, errMsg) return err } // ============================================================================ // STATS // ============================================================================ // GetCompletedTodayCount returns the number of universities completed today func (r *PostgresRepository) GetCompletedTodayCount(ctx context.Context) (int, error) { var count int err := r.pool.QueryRow(ctx, ` SELECT COUNT(*) FROM crawl_queue WHERE current_phase = 'completed' AND completed_at >= CURRENT_DATE `).Scan(&count) return count, err } // GetTotalProcessedCount returns the total number of processed universities func (r *PostgresRepository) GetTotalProcessedCount(ctx context.Context) (int, error) { var count int err := r.pool.QueryRow(ctx, `SELECT COUNT(*) FROM crawl_queue WHERE current_phase = 'completed'`).Scan(&count) return count, err }