A previous `git pull --rebase origin main` dropped 177 local commits,
losing 3400+ files across admin-v2, backend, studio-v2, website,
klausur-service, and many other services. The partial restore attempt
(660295e2) only recovered some files.
This commit restores all missing files from pre-rebase ref 98933f5e
while preserving post-rebase additions (night-scheduler, night-mode UI,
NightModeWidget dashboard integration).
Restored features include:
- AI Module Sidebar (FAB), OCR Labeling, OCR Compare
- GPU Dashboard, RAG Pipeline, Magic Help
- Klausur-Korrektur (8 files), Abitur-Archiv (5+ files)
- Companion, Zeugnisse-Crawler, Screen Flow
- Full backend, studio-v2, website, klausur-service
- All compliance SDKs, agent-core, voice-service
- CI/CD configs, documentation, scripts
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
315 lines
10 KiB
Python
315 lines
10 KiB
Python
"""
|
|
Embedding Service Client
|
|
|
|
HTTP client for communicating with the embedding-service.
|
|
Replaces direct sentence-transformers/torch calls in the main service.
|
|
"""
|
|
|
|
import os
|
|
import logging
|
|
from typing import List, Tuple, Optional
|
|
|
|
import httpx
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
# Configuration
|
|
EMBEDDING_SERVICE_URL = os.getenv("EMBEDDING_SERVICE_URL", "http://embedding-service:8087")
|
|
EMBEDDING_SERVICE_TIMEOUT = float(os.getenv("EMBEDDING_SERVICE_TIMEOUT", "60.0"))
|
|
|
|
|
|
class EmbeddingServiceError(Exception):
    """Raised when a request to the embedding service fails or times out."""
|
|
|
|
|
|
class EmbeddingClient:
    """
    Client for the embedding-service.

    Provides async methods for:
    - Embedding generation
    - Re-ranking
    - PDF extraction
    - Text chunking

    Each call opens a short-lived ``httpx.AsyncClient``, so instances hold
    no connection state and are safe to share across tasks.
    """

    def __init__(self, base_url: str = EMBEDDING_SERVICE_URL, timeout: float = EMBEDDING_SERVICE_TIMEOUT):
        """
        Args:
            base_url: Root URL of the embedding-service.
            timeout: Default per-request timeout in seconds.
        """
        # Strip trailing slash so f"{base_url}/path" joins cleanly.
        self.base_url = base_url.rstrip("/")
        self.timeout = timeout

    async def health_check(self) -> dict:
        """
        Check if the embedding service is healthy.

        Returns:
            The /health endpoint's JSON payload.

        Raises:
            EmbeddingServiceError: If the service is unreachable or unhealthy.
        """
        try:
            # Fixed short timeout: health probes should fail fast regardless of self.timeout.
            async with httpx.AsyncClient(timeout=5.0) as client:
                response = await client.get(f"{self.base_url}/health")
                response.raise_for_status()
                return response.json()
        except Exception as e:
            raise EmbeddingServiceError(f"Health check failed: {e}") from e

    async def generate_embeddings(self, texts: List[str]) -> List[List[float]]:
        """
        Generate embeddings for multiple texts.

        Args:
            texts: List of texts to embed

        Returns:
            List of embedding vectors, one per input text

        Raises:
            EmbeddingServiceError: On timeout, HTTP error status, or transport failure.
        """
        if not texts:
            return []

        try:
            async with httpx.AsyncClient(timeout=self.timeout) as client:
                response = await client.post(
                    f"{self.base_url}/embed",
                    json={"texts": texts}
                )
                response.raise_for_status()
                data = response.json()
                return data["embeddings"]
        except httpx.TimeoutException as e:
            raise EmbeddingServiceError("Embedding service timeout") from e
        except httpx.HTTPStatusError as e:
            raise EmbeddingServiceError(f"Embedding service error: {e.response.status_code} - {e.response.text}") from e
        except Exception as e:
            raise EmbeddingServiceError(f"Failed to generate embeddings: {e}") from e

    async def generate_single_embedding(self, text: str) -> List[float]:
        """
        Generate embedding for a single text.

        Args:
            text: Text to embed

        Returns:
            Embedding vector

        Raises:
            EmbeddingServiceError: On timeout, HTTP error status, or transport failure.
        """
        try:
            async with httpx.AsyncClient(timeout=self.timeout) as client:
                response = await client.post(
                    f"{self.base_url}/embed-single",
                    json={"text": text}
                )
                response.raise_for_status()
                data = response.json()
                return data["embedding"]
        except httpx.TimeoutException as e:
            raise EmbeddingServiceError("Embedding service timeout") from e
        except httpx.HTTPStatusError as e:
            raise EmbeddingServiceError(f"Embedding service error: {e.response.status_code} - {e.response.text}") from e
        except Exception as e:
            raise EmbeddingServiceError(f"Failed to generate embedding: {e}") from e

    async def rerank_documents(
        self,
        query: str,
        documents: List[str],
        top_k: int = 5
    ) -> List[Tuple[int, float, str]]:
        """
        Re-rank documents based on query relevance.

        Args:
            query: Search query
            documents: List of document texts
            top_k: Number of top results to return

        Returns:
            List of (original_index, score, text) tuples, sorted by score descending

        Raises:
            EmbeddingServiceError: On timeout, HTTP error status, or transport failure.
        """
        if not documents:
            return []

        try:
            async with httpx.AsyncClient(timeout=self.timeout) as client:
                response = await client.post(
                    f"{self.base_url}/rerank",
                    json={
                        "query": query,
                        "documents": documents,
                        "top_k": top_k
                    }
                )
                response.raise_for_status()
                data = response.json()

                return [
                    (r["index"], r["score"], r["text"])
                    for r in data["results"]
                ]
        except httpx.TimeoutException as e:
            raise EmbeddingServiceError("Embedding service timeout during re-ranking") from e
        except httpx.HTTPStatusError as e:
            raise EmbeddingServiceError(f"Re-ranking error: {e.response.status_code} - {e.response.text}") from e
        except Exception as e:
            raise EmbeddingServiceError(f"Failed to re-rank documents: {e}") from e

    async def rerank_search_results(
        self,
        query: str,
        results: List[dict],
        text_field: str = "text",
        top_k: int = 5
    ) -> List[dict]:
        """
        Re-rank search results (dictionaries with a text field).

        Args:
            query: Search query
            results: List of search result dicts
            text_field: Key containing the text to rank on
            top_k: Number of top results

        Returns:
            Re-ranked results with added 'rerank_score' and 'original_rank' fields

        Raises:
            EmbeddingServiceError: Propagated from rerank_documents().
        """
        if not results:
            return []

        # Missing text fields rank as empty strings instead of raising KeyError.
        texts = [r.get(text_field, "") for r in results]
        reranked = await self.rerank_documents(query, texts, top_k)

        reranked_results = []
        for orig_idx, score, _ in reranked:
            # Copy so the caller's original result dicts are not mutated.
            result = results[orig_idx].copy()
            result["rerank_score"] = score
            result["original_rank"] = orig_idx
            reranked_results.append(result)

        return reranked_results

    async def chunk_text(
        self,
        text: str,
        chunk_size: int = 1000,
        overlap: int = 200,
        strategy: str = "semantic"
    ) -> List[str]:
        """
        Chunk text into smaller pieces.

        Args:
            text: Text to chunk
            chunk_size: Target chunk size
            overlap: Overlap between chunks
            strategy: "semantic" or "recursive"

        Returns:
            List of text chunks

        Raises:
            EmbeddingServiceError: On timeout, HTTP error status, or transport failure.
        """
        if not text:
            return []

        try:
            async with httpx.AsyncClient(timeout=self.timeout) as client:
                response = await client.post(
                    f"{self.base_url}/chunk",
                    json={
                        "text": text,
                        "chunk_size": chunk_size,
                        "overlap": overlap,
                        "strategy": strategy
                    }
                )
                response.raise_for_status()
                data = response.json()
                return data["chunks"]
        # Specific handlers added for consistency with the other endpoints.
        except httpx.TimeoutException as e:
            raise EmbeddingServiceError("Embedding service timeout during chunking") from e
        except httpx.HTTPStatusError as e:
            raise EmbeddingServiceError(f"Chunking error: {e.response.status_code} - {e.response.text}") from e
        except Exception as e:
            raise EmbeddingServiceError(f"Failed to chunk text: {e}") from e

    async def extract_pdf(self, pdf_content: bytes) -> dict:
        """
        Extract text from PDF file.

        Args:
            pdf_content: PDF file content as bytes

        Returns:
            Dict with 'text', 'backend_used', 'pages', 'table_count'

        Raises:
            EmbeddingServiceError: On timeout, HTTP error status, or transport failure.
        """
        try:
            # Fixed longer timeout: PDF extraction is much slower than embedding.
            async with httpx.AsyncClient(timeout=120.0) as client:
                response = await client.post(
                    f"{self.base_url}/extract-pdf",
                    content=pdf_content,
                    headers={"Content-Type": "application/octet-stream"}
                )
                response.raise_for_status()
                return response.json()
        # Specific handlers added for consistency with the other endpoints.
        except httpx.TimeoutException as e:
            raise EmbeddingServiceError("Embedding service timeout during PDF extraction") from e
        except httpx.HTTPStatusError as e:
            raise EmbeddingServiceError(f"PDF extraction error: {e.response.status_code} - {e.response.text}") from e
        except Exception as e:
            raise EmbeddingServiceError(f"Failed to extract PDF: {e}") from e

    async def get_model_info(self) -> dict:
        """
        Get information about configured models.

        Returns:
            The /models endpoint's JSON payload.

        Raises:
            EmbeddingServiceError: If the models endpoint cannot be reached.
        """
        try:
            # Metadata lookup should be quick; use a short fixed timeout.
            async with httpx.AsyncClient(timeout=5.0) as client:
                response = await client.get(f"{self.base_url}/models")
                response.raise_for_status()
                return response.json()
        except Exception as e:
            raise EmbeddingServiceError(f"Failed to get model info: {e}") from e
|
|
|
|
|
|
# Process-wide client singleton, created on first use.
_client: Optional[EmbeddingClient] = None


def get_embedding_client() -> EmbeddingClient:
    """Return the shared EmbeddingClient, constructing it lazily on first call."""
    global _client
    client = _client
    if client is None:
        client = EmbeddingClient()
        _client = client
    return client
|
|
|
|
|
|
# =============================================================================
|
|
# Compatibility functions (drop-in replacements for eh_pipeline.py)
|
|
# =============================================================================
|
|
|
|
async def generate_embeddings(texts: List[str]) -> List[List[float]]:
    """Compatibility shim for eh_pipeline.generate_embeddings().

    Delegates to the shared EmbeddingClient instance.
    """
    return await get_embedding_client().generate_embeddings(texts)
|
|
|
|
|
|
async def generate_single_embedding(text: str) -> List[float]:
    """Compatibility shim for eh_pipeline.generate_single_embedding().

    Delegates to the shared EmbeddingClient instance.
    """
    return await get_embedding_client().generate_single_embedding(text)
|
|
|
|
|
|
async def rerank_documents(query: str, documents: List[str], top_k: int = 5) -> List[Tuple[int, float, str]]:
    """Compatibility shim for reranker.rerank_documents().

    Delegates to the shared EmbeddingClient instance.
    """
    return await get_embedding_client().rerank_documents(query, documents, top_k)
|
|
|
|
|
|
async def rerank_search_results(
    query: str,
    results: List[dict],
    text_field: str = "text",
    top_k: int = 5
) -> List[dict]:
    """Compatibility shim for reranker.rerank_search_results().

    Delegates to the shared EmbeddingClient instance.
    """
    return await get_embedding_client().rerank_search_results(query, results, text_field, top_k)
|