package pipeline

import (
	"context"
	"log"
	"strings"
	"sync"
	"sync/atomic"
	"time"

	"github.com/breakpilot/edu-search-service/internal/crawler"
	"github.com/breakpilot/edu-search-service/internal/extractor"
	"github.com/breakpilot/edu-search-service/internal/indexer"
	"github.com/breakpilot/edu-search-service/internal/tagger"
)

// Pipeline orchestrates crawling, extraction, tagging, and indexing.
type Pipeline struct {
	crawler     *crawler.Crawler
	tagger      *tagger.Tagger
	indexClient *indexer.Client
	maxPages    int
	workers     int
}

// Stats tracks pipeline execution statistics.
type Stats struct {
	StartTime        time.Time
	EndTime          time.Time
	URLsProcessed    int
	URLsSuccessful   int
	URLsFailed       int
	URLsSkipped      int
	DocumentsIndexed int
}

// NewPipeline creates a new crawl pipeline.
func NewPipeline(
	crawlerInstance *crawler.Crawler,
	taggerInstance *tagger.Tagger,
	indexClient *indexer.Client,
	maxPages int,
) *Pipeline {
	return &Pipeline{
		crawler:     crawlerInstance,
		tagger:      taggerInstance,
		indexClient: indexClient,
		maxPages:    maxPages,
		workers:     5, // concurrent workers
	}
}

// Run executes the crawl pipeline.
func (p *Pipeline) Run(ctx context.Context, seedsDir string) (*Stats, error) {
	stats := &Stats{
		StartTime: time.Now(),
	}

	// Load seed URLs.
	seeds, err := p.crawler.LoadSeeds(seedsDir)
	if err != nil {
		return nil, err
	}

	log.Printf("Pipeline starting with %d seeds, max %d pages", len(seeds), p.maxPages)

	// crawlCtx lets the result loop stop the workers once the page limit is
	// reached; the parent ctx stays alive for the final bulk index calls.
	crawlCtx, stopWorkers := context.WithCancel(ctx)
	defer stopWorkers()

	// URL queue and deduplication set.
	urlQueue := make(chan string, len(seeds)*10)
	visited := &sync.Map{}

	// pending counts URLs that have been queued but not yet fully processed.
	// When it reaches zero the queue is closed so idle workers can exit
	// instead of blocking forever on an empty channel.
	var pending int64
	var closeOnce sync.Once
	closeQueue := func() { closeOnce.Do(func() { close(urlQueue) }) }

	// enqueue deduplicates a URL, reserves a pending slot, and drops the URL
	// if the queue is full. Workers only call it while their current URL still
	// holds a reservation, and the buffer holds every seed, so a send can
	// never race with closeQueue.
	enqueue := func(u string) {
		normalized := crawler.NormalizeURL(u)
		if _, loaded := visited.LoadOrStore(normalized, true); loaded {
			return
		}
		atomic.AddInt64(&pending, 1)
		select {
		case urlQueue <- u:
		default:
			// Queue full: skip the URL and release its reservation.
			if atomic.AddInt64(&pending, -1) == 0 {
				closeQueue()
			}
		}
	}

	// markDone releases a URL's reservation after it has been processed and
	// its links enqueued; the last one out closes the queue.
	markDone := func() {
		if atomic.AddInt64(&pending, -1) == 0 {
			closeQueue()
		}
	}

	// Add seeds to the queue.
	for _, seed := range seeds {
		enqueue(seed)
	}
	if atomic.LoadInt64(&pending) == 0 {
		// No usable seeds: close the queue so the workers exit immediately.
		closeQueue()
	}

	// Results channel.
	results := make(chan *processResult, p.workers*2)

	var wg sync.WaitGroup

	// Start workers.
	for i := 0; i < p.workers; i++ {
		wg.Add(1)
		go p.worker(crawlCtx, i, urlQueue, results, enqueue, markDone, &wg)
	}

	// Close results when all workers are done.
	go func() {
		wg.Wait()
		close(results)
	}()

	// Process results and collect stats.
	var documents []indexer.Document
	processed := 0

	for result := range results {
		stats.URLsProcessed++

		if result.err != nil {
			stats.URLsFailed++
			continue
		}
		if result.skipped {
			stats.URLsSkipped++
			continue
		}

		if result.document != nil {
			documents = append(documents, *result.document)
			stats.URLsSuccessful++

			// Bulk index every 50 documents.
			if len(documents) >= 50 {
				if err := p.indexClient.BulkIndex(ctx, documents); err != nil {
					log.Printf("Bulk index error: %v", err)
				} else {
					stats.DocumentsIndexed += len(documents)
				}
				documents = nil
			}
		}

		processed++
		if processed >= p.maxPages {
			log.Printf("Reached max pages limit (%d)", p.maxPages)
			// Cancel instead of closing urlQueue: workers may still be trying
			// to enqueue links, and a send on a closed channel would panic.
			stopWorkers()
			break
		}
	}

	// Index remaining documents.
	if len(documents) > 0 {
		if err := p.indexClient.BulkIndex(ctx, documents); err != nil {
			log.Printf("Final bulk index error: %v", err)
		} else {
			stats.DocumentsIndexed += len(documents)
		}
	}

	stats.EndTime = time.Now()
	log.Printf("Pipeline completed: %d processed, %d indexed, %d failed, %d skipped in %v",
		stats.URLsProcessed, stats.DocumentsIndexed, stats.URLsFailed, stats.URLsSkipped,
		stats.EndTime.Sub(stats.StartTime))

	return stats, nil
}

type processResult struct {
	url      string
	document *indexer.Document
	err      error
	skipped  bool
}

func (p *Pipeline) worker(
	ctx context.Context,
	id int,
	urlQueue <-chan string,
	results chan<- *processResult,
	enqueue func(string),
	markDone func(),
	wg *sync.WaitGroup,
) {
	defer wg.Done()

	for {
		select {
		case <-ctx.Done():
			return
		case url, ok := <-urlQueue:
			if !ok {
				return
			}
			result := p.processURL(ctx, url, enqueue)
			// Release the reservation only after processURL has enqueued the
			// page's links, so the queue is not closed prematurely.
			markDone()
			select {
			case results <- result:
			case <-ctx.Done():
				return
			}
		}
	}
}

// processURL fetches one URL, extracts and tags its content, and builds the
// document to index. Newly discovered same-domain links are handed to enqueue.
func (p *Pipeline) processURL(
	ctx context.Context,
	url string,
	enqueue func(string),
) *processResult {
	result := &processResult{url: url}

	// Fetch URL.
	fetchResult, err := p.crawler.Fetch(ctx, url)
	if err != nil {
		result.err = err
		return result
	}

	// Only HTML and PDF responses are handled; everything else is skipped.
	contentType := strings.ToLower(fetchResult.ContentType)
	if !strings.Contains(contentType, "text/html") && !strings.Contains(contentType, "application/pdf") {
		result.skipped = true
		return result
	}

	// Extract content.
	var extracted *extractor.ExtractedContent
	if strings.Contains(contentType, "text/html") {
		extracted, err = extractor.ExtractHTML(fetchResult.Body)
	} else {
		extracted, err = extractor.ExtractPDF(fetchResult.Body)
	}
	if err != nil {
		result.err = err
		return result
	}

	// Skip pages with too little content.
	if extracted.ContentLength < 100 {
		result.skipped = true
		return result
	}

	// Tag content.
	features := tagger.ContentFeatures{
		AdDensity:     extracted.Features.AdDensity,
		LinkDensity:   extracted.Features.LinkDensity,
		ContentLength: extracted.ContentLength,
	}
	tags := p.tagger.Tag(fetchResult.CanonicalURL, extracted.Title, extracted.ContentText, features)

	// Create the document.
	doc := &indexer.Document{
		DocID:        crawler.GenerateDocID(),
		URL:          fetchResult.CanonicalURL,
		Domain:       crawler.ExtractDomain(fetchResult.CanonicalURL),
		Title:        extracted.Title,
		ContentText:  extracted.ContentText,
		SnippetText:  extracted.SnippetText,
		ContentHash:  fetchResult.ContentHash,
		DocType:      tags.DocType,
		Subjects:     tags.Subjects,
		SchoolLevel:  tags.SchoolLevel,
		State:        tags.State,
		Language:     extracted.Language,
		TrustScore:   tags.TrustScore,
		QualityScore: calculateQualityScore(extracted, tags),
		FetchedAt:    fetchResult.FetchTime,
		UpdatedAt:    time.Now(),
	}
	result.document = doc

	// Queue newly discovered links (limited to the same domain for now).
	docDomain := crawler.ExtractDomain(url)
	for _, link := range extracted.Links {
		if crawler.ExtractDomain(link) == docDomain {
			enqueue(link)
		}
	}

	return result
}

func calculateQualityScore(extracted *extractor.ExtractedContent, tags tagger.TagResult) float64 {
	score := 0.5 // base

	// Content length bonus.
	if extracted.ContentLength > 1000 {
		score += 0.1
	}
	if extracted.ContentLength > 5000 {
		score += 0.1
	}

	// Has headings.
	if len(extracted.Headings) > 0 {
		score += 0.1
	}

	// Low ad density.
	if extracted.Features.AdDensity < 0.1 {
		score += 0.1
	}

	// Good text-to-HTML ratio.
	if extracted.Features.TextToHTMLRatio > 0.2 {
		score += 0.1
	}

	// Clamp to [0, 1].
	if score > 1 {
		score = 1
	}

	return score
}
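
// Example wiring (a minimal sketch, not part of this package's API): the
// constructors used below — crawler.New, tagger.New, indexer.NewClient and
// the "http://localhost:9200" address — are assumptions for illustration;
// this file only defines NewPipeline and Run.
//
//	c := crawler.New()                               // hypothetical constructor
//	tg := tagger.New()                               // hypothetical constructor
//	idx := indexer.NewClient("http://localhost:9200") // hypothetical constructor
//
//	p := pipeline.NewPipeline(c, tg, idx, 1000)
//	stats, err := p.Run(context.Background(), "configs/seeds")
//	if err != nil {
//		log.Fatalf("pipeline failed: %v", err)
//	}
//	log.Printf("indexed %d documents", stats.DocumentsIndexed)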