breakpilot-pwa/edu-search-service/internal/pipeline/pipeline.go
Benjamin Admin 21a844cb8a fix: Restore all files lost during destructive rebase
A previous `git pull --rebase origin main` dropped 177 local commits,
losing 3400+ files across admin-v2, backend, studio-v2, website,
klausur-service, and many other services. The partial restore attempt
(660295e2) only recovered some files.

This commit restores all missing files from pre-rebase ref 98933f5e
while preserving post-rebase additions (night-scheduler, night-mode UI,
NightModeWidget dashboard integration).

Restored features include:
- AI Module Sidebar (FAB), OCR Labeling, OCR Compare
- GPU Dashboard, RAG Pipeline, Magic Help
- Klausur-Korrektur (8 files), Abitur-Archiv (5+ files)
- Companion, Zeugnisse-Crawler, Screen Flow
- Full backend, studio-v2, website, klausur-service
- All compliance SDKs, agent-core, voice-service
- CI/CD configs, documentation, scripts

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-09 09:51:32 +01:00

package pipeline

import (
	"context"
	"log"
	"strings"
	"sync"
	"time"

	"github.com/breakpilot/edu-search-service/internal/crawler"
	"github.com/breakpilot/edu-search-service/internal/extractor"
	"github.com/breakpilot/edu-search-service/internal/indexer"
	"github.com/breakpilot/edu-search-service/internal/tagger"
)
// Pipeline orchestrates crawling, extraction, tagging, and indexing
type Pipeline struct {
	crawler     *crawler.Crawler
	tagger      *tagger.Tagger
	indexClient *indexer.Client
	maxPages    int
	workers     int
}

// Stats tracks pipeline execution statistics
type Stats struct {
	StartTime        time.Time
	EndTime          time.Time
	URLsProcessed    int
	URLsSuccessful   int
	URLsFailed       int
	URLsSkipped      int
	DocumentsIndexed int
}
// NewPipeline creates a new crawl pipeline
func NewPipeline(
	crawlerInstance *crawler.Crawler,
	taggerInstance *tagger.Tagger,
	indexClient *indexer.Client,
	maxPages int,
) *Pipeline {
	return &Pipeline{
		crawler:     crawlerInstance,
		tagger:      taggerInstance,
		indexClient: indexClient,
		maxPages:    maxPages,
		workers:     5, // concurrent workers
	}
}
// Run executes the crawl pipeline
func (p *Pipeline) Run(ctx context.Context, seedsDir string) (*Stats, error) {
	stats := &Stats{
		StartTime: time.Now(),
	}

	// Load seed URLs
	seeds, err := p.crawler.LoadSeeds(seedsDir)
	if err != nil {
		return nil, err
	}
	log.Printf("Pipeline starting with %d seeds, max %d pages", len(seeds), p.maxPages)

	// Create URL queue
	urlQueue := make(chan string, len(seeds)*10)
	visited := &sync.Map{}

	// Add seeds to queue
	for _, seed := range seeds {
		normalized := crawler.NormalizeURL(seed)
		if _, loaded := visited.LoadOrStore(normalized, true); !loaded {
			urlQueue <- seed
		}
	}

	// Results channel
	results := make(chan *processResult, p.workers*2)
	var wg sync.WaitGroup

	// Derived context used to stop the workers once the page limit is reached.
	// Cancelling it (rather than closing urlQueue) avoids a panic when a worker
	// tries to enqueue newly discovered links after the limit is hit.
	workerCtx, cancel := context.WithCancel(ctx)
	defer cancel()

	// Start workers
	for i := 0; i < p.workers; i++ {
		wg.Add(1)
		go p.worker(workerCtx, i, urlQueue, results, visited, &wg)
	}

	// Close results when all workers are done
	go func() {
		wg.Wait()
		close(results)
	}()
	// Process results and collect stats
	var documents []indexer.Document
	processed := 0
	for result := range results {
		stats.URLsProcessed++
		if result.err != nil {
			stats.URLsFailed++
			continue
		}
		if result.skipped {
			stats.URLsSkipped++
			continue
		}
		if result.document != nil {
			documents = append(documents, *result.document)
			stats.URLsSuccessful++
			// Bulk index every 50 documents
			if len(documents) >= 50 {
				if err := p.indexClient.BulkIndex(ctx, documents); err != nil {
					log.Printf("Bulk index error: %v", err)
				} else {
					stats.DocumentsIndexed += len(documents)
				}
				documents = nil
			}
		}
		processed++
		if processed >= p.maxPages {
			log.Printf("Reached max pages limit (%d)", p.maxPages)
			// Stop the workers; closing urlQueue here would risk a panic from
			// workers still sending newly discovered links.
			cancel()
			break
		}
	}

	// Index remaining documents
	if len(documents) > 0 {
		if err := p.indexClient.BulkIndex(ctx, documents); err != nil {
			log.Printf("Final bulk index error: %v", err)
		} else {
			stats.DocumentsIndexed += len(documents)
		}
	}

	stats.EndTime = time.Now()
	log.Printf("Pipeline completed: %d processed, %d indexed, %d failed, %d skipped in %v",
		stats.URLsProcessed, stats.DocumentsIndexed, stats.URLsFailed, stats.URLsSkipped,
		stats.EndTime.Sub(stats.StartTime))
	return stats, nil
}

type processResult struct {
	url      string
	document *indexer.Document
	err      error
	skipped  bool
}

func (p *Pipeline) worker(
	ctx context.Context,
	id int,
	urlQueue chan string,
	results chan<- *processResult,
	visited *sync.Map,
	wg *sync.WaitGroup,
) {
	defer wg.Done()
	for {
		select {
		case <-ctx.Done():
			return
		case url, ok := <-urlQueue:
			if !ok {
				return
			}
			result := p.processURL(ctx, url, urlQueue, visited)
			// Deliver the result, but bail out if the pipeline has been stopped
			// so the worker cannot block forever on a full results channel.
			select {
			case results <- result:
			case <-ctx.Done():
				return
			}
		}
	}
}

func (p *Pipeline) processURL(
	ctx context.Context,
	url string,
	urlQueue chan<- string,
	visited *sync.Map,
) *processResult {
	result := &processResult{url: url}

	// Fetch URL
	fetchResult, err := p.crawler.Fetch(ctx, url)
	if err != nil {
		result.err = err
		return result
	}

	// Check content type
	contentType := strings.ToLower(fetchResult.ContentType)
	if !strings.Contains(contentType, "text/html") && !strings.Contains(contentType, "application/pdf") {
		result.skipped = true
		return result
	}

	// Extract content
	var extracted *extractor.ExtractedContent
	if strings.Contains(contentType, "text/html") {
		extracted, err = extractor.ExtractHTML(fetchResult.Body)
	} else if strings.Contains(contentType, "application/pdf") {
		extracted, err = extractor.ExtractPDF(fetchResult.Body)
	}
	if err != nil {
		result.err = err
		return result
	}

	// Skip if too little content
	if extracted.ContentLength < 100 {
		result.skipped = true
		return result
	}

	// Tag content
	features := tagger.ContentFeatures{
		AdDensity:     extracted.Features.AdDensity,
		LinkDensity:   extracted.Features.LinkDensity,
		ContentLength: extracted.ContentLength,
	}
	tags := p.tagger.Tag(fetchResult.CanonicalURL, extracted.Title, extracted.ContentText, features)

	// Create document
	doc := &indexer.Document{
		DocID:        crawler.GenerateDocID(),
		URL:          fetchResult.CanonicalURL,
		Domain:       crawler.ExtractDomain(fetchResult.CanonicalURL),
		Title:        extracted.Title,
		ContentText:  extracted.ContentText,
		SnippetText:  extracted.SnippetText,
		ContentHash:  fetchResult.ContentHash,
		DocType:      tags.DocType,
		Subjects:     tags.Subjects,
		SchoolLevel:  tags.SchoolLevel,
		State:        tags.State,
		Language:     extracted.Language,
		TrustScore:   tags.TrustScore,
		QualityScore: calculateQualityScore(extracted, tags),
		FetchedAt:    fetchResult.FetchTime,
		UpdatedAt:    time.Now(),
	}
	result.document = doc

	// Extract and queue new links (limited to same domain for now)
	docDomain := crawler.ExtractDomain(url)
	for _, link := range extracted.Links {
		linkDomain := crawler.ExtractDomain(link)
		if linkDomain == docDomain {
			normalized := crawler.NormalizeURL(link)
			if _, loaded := visited.LoadOrStore(normalized, true); !loaded {
				select {
				case urlQueue <- link:
				default:
					// Queue full, skip
				}
			}
		}
	}

	return result
}

func calculateQualityScore(extracted *extractor.ExtractedContent, tags tagger.TagResult) float64 {
	score := 0.5 // base

	// Content length bonus
	if extracted.ContentLength > 1000 {
		score += 0.1
	}
	if extracted.ContentLength > 5000 {
		score += 0.1
	}

	// Has headings
	if len(extracted.Headings) > 0 {
		score += 0.1
	}

	// Low ad density
	if extracted.Features.AdDensity < 0.1 {
		score += 0.1
	}

	// Good text/HTML ratio
	if extracted.Features.TextToHTMLRatio > 0.2 {
		score += 0.1
	}

	// Clamp
	if score > 1 {
		score = 1
	}
	return score
}