# NOTE(review): The commit-message text below was pasted at the top of this
# module without comment markers, which made the file unparseable Python.
# Preserved here as a comment for history:
#
# A previous `git pull --rebase origin main` dropped 177 local commits,
# losing 3400+ files across admin-v2, backend, studio-v2, website,
# klausur-service, and many other services. The partial restore attempt
# (660295e2) only recovered some files.
# This commit restores all missing files from pre-rebase ref 98933f5e
# while preserving post-rebase additions (night-scheduler, night-mode UI,
# NightModeWidget dashboard integration).
# Restored features include:
# - AI Module Sidebar (FAB), OCR Labeling, OCR Compare
# - GPU Dashboard, RAG Pipeline, Magic Help
# - Klausur-Korrektur (8 files), Abitur-Archiv (5+ files)
# - Companion, Zeugnisse-Crawler, Screen Flow
# - Full backend, studio-v2, website, klausur-service
# - All compliance SDKs, agent-core, voice-service
# - CI/CD configs, documentation, scripts
# Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
"""
|
|
Hybrid Search Module
|
|
|
|
Combines dense (semantic) search with sparse (BM25/keyword) search
|
|
for better retrieval, especially for German compound words.
|
|
|
|
Why Hybrid Search?
|
|
- Dense search: Great for semantic similarity ("Analyse" ≈ "Untersuchung")
|
|
- Sparse search: Great for exact matches, compound words ("Erwartungshorizont")
|
|
- Combined: Best of both worlds, 10-15% better recall
|
|
|
|
German compound nouns like "Erwartungshorizont", "Bewertungskriterien"
|
|
may not match semantically but should match lexically.
|
|
"""
|
|
|
|
import os
|
|
import re
|
|
from typing import List, Dict, Optional, Tuple
|
|
from collections import Counter
|
|
import math
|
|
|
|
# Configuration
|
|
HYBRID_ENABLED = os.getenv("HYBRID_SEARCH_ENABLED", "true").lower() == "true"
|
|
DENSE_WEIGHT = float(os.getenv("HYBRID_DENSE_WEIGHT", "0.7")) # 70% dense
|
|
SPARSE_WEIGHT = float(os.getenv("HYBRID_SPARSE_WEIGHT", "0.3")) # 30% sparse
|
|
|
|
# German stopwords for BM25
|
|
GERMAN_STOPWORDS = {
|
|
'der', 'die', 'das', 'den', 'dem', 'des', 'ein', 'eine', 'einer', 'eines',
|
|
'einem', 'einen', 'und', 'oder', 'aber', 'als', 'auch', 'auf', 'aus', 'bei',
|
|
'bis', 'durch', 'für', 'gegen', 'in', 'mit', 'nach', 'ohne', 'über', 'unter',
|
|
'von', 'vor', 'zu', 'zum', 'zur', 'ist', 'sind', 'war', 'waren', 'wird',
|
|
'werden', 'hat', 'haben', 'kann', 'können', 'muss', 'müssen', 'soll', 'sollen',
|
|
'nicht', 'sich', 'es', 'er', 'sie', 'wir', 'ihr', 'man', 'was', 'wie', 'wo',
|
|
'wenn', 'weil', 'dass', 'ob', 'so', 'sehr', 'nur', 'noch', 'schon', 'mehr',
|
|
'also', 'dabei', 'dabei', 'sowie', 'bzw', 'etc', 'ca', 'vgl'
|
|
}
|
|
|
|
|
|
class BM25:
    """
    BM25 (Best Matching 25) implementation for German text.

    BM25 is a ranking function used for keyword-based retrieval.
    It considers term frequency, document length, and inverse document
    frequency. Call fit() once on a corpus, then score()/search() queries.
    """

    def __init__(self, k1: float = 1.5, b: float = 0.75):
        """
        Initialize BM25 with tuning parameters.

        Args:
            k1: Term frequency saturation parameter (1.2-2.0 typical)
            b: Document length normalization (0.75 typical)
        """
        self.k1 = k1
        self.b = b
        self.corpus: List[List[str]] = []
        self.doc_lengths: List[int] = []
        self.avg_doc_length = 0
        self.doc_freqs = Counter()
        self.idf: Dict[str, float] = {}
        self.N = 0
        # Per-document term frequencies, precomputed in fit() so score()
        # does not rebuild a Counter on every call (perf fix).
        self._doc_term_freqs: List[Counter] = []

    def _tokenize(self, text: str) -> List[str]:
        """Tokenize German text: lowercase, keep umlauts, drop stopwords."""
        text = text.lower()
        # Letters only, including German umlauts/ß; splits on everything else.
        tokens = re.findall(r'[a-zäöüß]+', text)
        # Remove stopwords and tokens of length <= 2.
        return [t for t in tokens if t not in GERMAN_STOPWORDS and len(t) > 2]

    def fit(self, documents: List[str]):
        """
        Fit BM25 on a corpus of documents.

        Args:
            documents: List of document texts
        """
        self.corpus = [self._tokenize(doc) for doc in documents]
        self.N = len(self.corpus)
        self.doc_lengths = [len(doc) for doc in self.corpus]
        # max(N, 1) guards the empty-corpus case.
        self.avg_doc_length = sum(self.doc_lengths) / max(self.N, 1)

        # Precompute term frequencies once per document; score() reuses them.
        self._doc_term_freqs = [Counter(doc) for doc in self.corpus]

        # Document frequencies: number of documents containing each term.
        self.doc_freqs = Counter()
        for term_freqs in self._doc_term_freqs:
            self.doc_freqs.update(term_freqs.keys())

        # IDF with smoothing; the +1 keeps values positive for common terms.
        self.idf = {
            term: math.log((self.N - df + 0.5) / (df + 0.5) + 1)
            for term, df in self.doc_freqs.items()
        }

    def score(self, query: str, doc_idx: int) -> float:
        """
        Calculate BM25 score for a query against a document.

        Args:
            query: Query text
            doc_idx: Index of document in corpus

        Returns:
            BM25 score (higher = more relevant)
        """
        query_tokens = self._tokenize(query)
        doc_len = self.doc_lengths[doc_idx]
        doc_term_freqs = self._doc_term_freqs[doc_idx]

        score = 0.0
        for term in query_tokens:
            # Terms unseen during fit() have no IDF and contribute nothing.
            if term not in self.idf:
                continue

            tf = doc_term_freqs.get(term, 0)
            idf = self.idf[term]

            # BM25 formula: tf saturation (k1) + length normalization (b).
            numerator = tf * (self.k1 + 1)
            denominator = tf + self.k1 * (1 - self.b + self.b * doc_len / self.avg_doc_length)
            score += idf * numerator / denominator

        return score

    def search(self, query: str, top_k: int = 10) -> List[Tuple[int, float]]:
        """
        Search corpus for query.

        Args:
            query: Query text
            top_k: Number of results to return

        Returns:
            List of (doc_idx, score) tuples, sorted by score descending
        """
        scores = [(i, self.score(query, i)) for i in range(self.N)]
        scores.sort(key=lambda x: x[1], reverse=True)
        return scores[:top_k]
def normalize_scores(scores: List[float]) -> List[float]:
    """Min-max normalize a score list into the 0-1 range.

    Empty input yields an empty list; if every score is identical
    (zero spread), all scores map to 1.0.
    """
    if not scores:
        return []
    lo, hi = min(scores), max(scores)
    spread = hi - lo
    if spread == 0:
        return [1.0] * len(scores)
    return [(value - lo) / spread for value in scores]
def combine_scores(
    dense_results: List[Dict],
    sparse_scores: List[Tuple[int, float]],
    documents: List[str],
    dense_weight: float = DENSE_WEIGHT,
    sparse_weight: float = SPARSE_WEIGHT,
) -> List[Dict]:
    """
    Combine dense and sparse search results using weighted Reciprocal
    Rank Fusion (RRF).

    Args:
        dense_results: Results from dense (vector) search with 'score' field
        sparse_scores: BM25 scores as (idx, score) tuples; each idx refers
            to a position in dense_results
        documents: Original documents (accepted for interface compatibility;
            currently unused by this function)
        dense_weight: Weight for dense scores
        sparse_weight: Weight for sparse scores

    Returns:
        Combined results with hybrid_score field, sorted descending
    """
    # Map document ID -> merged result entry.
    result_map = {}

    # Add dense results. Bug fix: previously the score/rank assignments ran
    # even for duplicate IDs, so a later (worse-ranked) duplicate overwrote
    # the first occurrence's rank and re-zeroed its sparse fields. Now only
    # the first (best-ranked) occurrence of each ID is recorded.
    for rank, result in enumerate(dense_results):
        doc_id = result.get("id", str(rank))
        if doc_id in result_map:
            continue
        entry = result.copy()
        entry["dense_score"] = result.get("score", 0)
        entry["dense_rank"] = rank + 1
        # Defaults in case BM25 did not rank this document at all.
        entry["sparse_score"] = 0
        entry["sparse_rank"] = len(dense_results) + 1
        result_map[doc_id] = entry

    # Add sparse scores, matched to dense results by candidate index.
    for rank, (doc_idx, score) in enumerate(sparse_scores):
        if doc_idx < len(dense_results):
            doc_id = dense_results[doc_idx].get("id", str(doc_idx))
            if doc_id in result_map:
                result_map[doc_id]["sparse_score"] = score
                result_map[doc_id]["sparse_rank"] = rank + 1

    # Calculate hybrid scores using weighted RRF: 1 / (k + rank) per signal.
    k = 60  # RRF constant (standard choice; dampens the top-rank advantage)
    for doc_id, result in result_map.items():
        dense_rrf = 1 / (k + result.get("dense_rank", 1000))
        sparse_rrf = 1 / (k + result.get("sparse_rank", 1000))
        result["hybrid_score"] = (dense_weight * dense_rrf + sparse_weight * sparse_rrf)

    # Sort by hybrid score, best first.
    results = list(result_map.values())
    results.sort(key=lambda x: x.get("hybrid_score", 0), reverse=True)

    return results
async def hybrid_search(
    query: str,
    documents: List[str],
    dense_search_func,
    top_k: int = 10,
    dense_weight: float = DENSE_WEIGHT,
    sparse_weight: float = SPARSE_WEIGHT,
    **dense_kwargs
) -> Dict:
    """
    Perform hybrid search combining dense and sparse retrieval.

    The BM25 index is built over the texts of the dense candidates, so the
    sparse pass re-ranks the dense hits. NOTE(review): the `documents`
    argument is only forwarded to combine_scores — confirm whether it is
    still needed by callers.

    Args:
        query: Search query
        documents: List of document texts for BM25
        dense_search_func: Async function for dense search
        top_k: Number of results to return
        dense_weight: Weight for dense (semantic) scores
        sparse_weight: Weight for sparse (BM25) scores
        **dense_kwargs: Additional args for dense search

    Returns:
        Combined results with metadata
    """
    # Feature flag off -> dense-only search, weights reported accordingly.
    if not HYBRID_ENABLED:
        dense_only = await dense_search_func(query=query, limit=top_k, **dense_kwargs)
        return {
            "results": dense_only,
            "hybrid_enabled": False,
            "dense_weight": 1.0,
            "sparse_weight": 0.0,
        }

    # Over-fetch (2x) so fusion has a wider candidate pool to re-rank.
    candidate_count = top_k * 2
    dense_results = await dense_search_func(query=query, limit=candidate_count, **dense_kwargs)

    # Sparse (BM25) pass over the dense candidates' texts.
    candidate_texts = [hit.get("text", "") for hit in dense_results]
    if candidate_texts:
        ranker = BM25()
        ranker.fit(candidate_texts)
        sparse_scores = ranker.search(query, top_k=candidate_count)
    else:
        sparse_scores = []

    # Fuse both rankings via weighted RRF.
    fused = combine_scores(
        dense_results=dense_results,
        sparse_scores=sparse_scores,
        documents=candidate_texts,
        dense_weight=dense_weight,
        sparse_weight=sparse_weight,
    )

    return {
        "results": fused[:top_k],
        "hybrid_enabled": True,
        "dense_weight": dense_weight,
        "sparse_weight": sparse_weight,
    }
def get_hybrid_search_info() -> dict:
    """Report the current hybrid search configuration as a dict."""
    info = {
        "enabled": HYBRID_ENABLED,
        "dense_weight": DENSE_WEIGHT,
        "sparse_weight": SPARSE_WEIGHT,
        "algorithm": "BM25 + Dense Vector (RRF fusion)",
    }
    return info