"""
Embedding Service Client

HTTP client for communicating with the embedding-service.
Replaces direct sentence-transformers/torch calls in the main service.
"""

import logging
import os
from operator import itemgetter
from typing import Any, Awaitable, Callable, List, Optional, Tuple

import httpx

logger = logging.getLogger(__name__)

# Configuration
EMBEDDING_SERVICE_URL = os.getenv("EMBEDDING_SERVICE_URL", "http://embedding-service:8087")
EMBEDDING_SERVICE_TIMEOUT = float(os.getenv("EMBEDDING_SERVICE_TIMEOUT", "60.0"))

# Fixed timeouts (seconds) for endpoints that do not use the client-wide timeout.
_QUICK_TIMEOUT = 5.0    # /health and /models
_PDF_TIMEOUT = 120.0    # /extract-pdf gets a longer timeout than other calls


class EmbeddingServiceError(Exception):
    """Error communicating with embedding service."""


def _identity(payload: Any) -> Any:
    """Return the decoded JSON payload unchanged."""
    return payload


def _parse_rerank_results(payload: Any) -> List[Tuple[int, float, str]]:
    """Convert a /rerank response payload into (index, score, text) tuples."""
    return [(r["index"], r["score"], r["text"]) for r in payload["results"]]


class EmbeddingClient:
    """
    Client for the embedding-service.

    Provides async methods for:
    - Embedding generation
    - Re-ranking
    - PDF extraction
    - Text chunking

    Every request opens a fresh ``httpx.AsyncClient`` and closes it when the
    call finishes. All failures are reported as ``EmbeddingServiceError``.
    """

    def __init__(self, base_url: str = EMBEDDING_SERVICE_URL, timeout: float = EMBEDDING_SERVICE_TIMEOUT):
        # Strip trailing slashes so that f"{base_url}/path" never yields "//".
        self.base_url = base_url.rstrip("/")
        self.timeout = timeout

    async def _request(
        self,
        send: Callable[[httpx.AsyncClient], Awaitable[httpx.Response]],
        *,
        timeout: float,
        failure_label: str,
        timeout_message: Optional[str] = None,
        status_label: Optional[str] = None,
        parse: Callable[[Any], Any] = _identity,
    ) -> Any:
        """
        Perform one HTTP call and translate every failure into EmbeddingServiceError.

        Args:
            send: Callable that issues the request on the supplied client
            timeout: Timeout (seconds) for the temporary client
            failure_label: Message prefix for unexpected failures; also used
                for timeouts and HTTP errors when no specific text is given
            timeout_message: Complete message to use on timeout (optional)
            status_label: Message prefix for non-2xx responses (optional)
            parse: Extracts the return value from the decoded JSON body

        Returns:
            Whatever ``parse`` returns for the decoded JSON body

        Raises:
            EmbeddingServiceError: On timeout, non-2xx status, or any other
                error (including a malformed response body)
        """
        try:
            async with httpx.AsyncClient(timeout=timeout) as client:
                response = await send(client)
                response.raise_for_status()
                # Parsing stays inside the try so a malformed payload is
                # reported as a service error, not a bare KeyError/TypeError.
                return parse(response.json())
        except httpx.TimeoutException as exc:
            # Timeout exceptions may stringify to an empty message, so an
            # explicit text is always supplied here.
            raise EmbeddingServiceError(
                timeout_message or f"{failure_label}: request timed out"
            ) from exc
        except httpx.HTTPStatusError as exc:
            raise EmbeddingServiceError(
                f"{status_label or failure_label}: "
                f"{exc.response.status_code} - {exc.response.text}"
            ) from exc
        except Exception as exc:
            raise EmbeddingServiceError(f"{failure_label}: {exc}") from exc

    async def health_check(self) -> dict:
        """
        Check if the embedding service is healthy.

        Returns:
            Decoded JSON body of GET /health

        Raises:
            EmbeddingServiceError: If the request fails
        """
        url = f"{self.base_url}/health"
        return await self._request(
            lambda client: client.get(url),
            timeout=_QUICK_TIMEOUT,
            failure_label="Health check failed",
        )

    async def generate_embeddings(self, texts: List[str]) -> List[List[float]]:
        """
        Generate embeddings for multiple texts.

        Args:
            texts: List of texts to embed

        Returns:
            List of embedding vectors (empty list for empty input, in which
            case no request is made)

        Raises:
            EmbeddingServiceError: If the request fails
        """
        if not texts:
            return []

        url = f"{self.base_url}/embed"
        return await self._request(
            lambda client: client.post(url, json={"texts": texts}),
            timeout=self.timeout,
            failure_label="Failed to generate embeddings",
            timeout_message="Embedding service timeout",
            status_label="Embedding service error",
            parse=itemgetter("embeddings"),
        )

    async def generate_single_embedding(self, text: str) -> List[float]:
        """
        Generate embedding for a single text.

        Args:
            text: Text to embed

        Returns:
            Embedding vector

        Raises:
            EmbeddingServiceError: If the request fails
        """
        url = f"{self.base_url}/embed-single"
        return await self._request(
            lambda client: client.post(url, json={"text": text}),
            timeout=self.timeout,
            failure_label="Failed to generate embedding",
            timeout_message="Embedding service timeout",
            status_label="Embedding service error",
            parse=itemgetter("embedding"),
        )

    async def rerank_documents(
        self,
        query: str,
        documents: List[str],
        top_k: int = 5
    ) -> List[Tuple[int, float, str]]:
        """
        Re-rank documents based on query relevance.

        Args:
            query: Search query
            documents: List of document texts
            top_k: Number of top results to return

        Returns:
            List of (original_index, score, text) tuples in the order the
            service returned them (empty list for empty input, in which case
            no request is made)

        Raises:
            EmbeddingServiceError: If the request fails
        """
        if not documents:
            return []

        url = f"{self.base_url}/rerank"
        payload = {
            "query": query,
            "documents": documents,
            "top_k": top_k,
        }
        return await self._request(
            lambda client: client.post(url, json=payload),
            timeout=self.timeout,
            failure_label="Failed to re-rank documents",
            timeout_message="Embedding service timeout during re-ranking",
            status_label="Re-ranking error",
            parse=_parse_rerank_results,
        )

    async def rerank_search_results(
        self,
        query: str,
        results: List[dict],
        text_field: str = "text",
        top_k: int = 5
    ) -> List[dict]:
        """
        Re-rank search results (dictionaries with text field).

        Args:
            query: Search query
            results: List of search result dicts
            text_field: Key containing the text to rank on (a missing key is
                ranked as an empty string)
            top_k: Number of top results

        Returns:
            Shallow copies of the selected results, each with added
            'rerank_score' and 'original_rank' fields; the input dicts are
            not modified

        Raises:
            EmbeddingServiceError: If the request fails or the service
                returns an index that does not refer to an input result
        """
        if not results:
            return []

        texts = [r.get(text_field, "") for r in results]
        reranked = await self.rerank_documents(query, texts, top_k)

        reranked_results = []
        for orig_idx, score, _ in reranked:
            # The index comes from the remote service: reject anything that
            # is not a valid position instead of leaking IndexError or
            # letting a negative value silently select the wrong result.
            if not isinstance(orig_idx, int) or not 0 <= orig_idx < len(results):
                raise EmbeddingServiceError(
                    f"Re-ranking returned invalid document index: {orig_idx!r}"
                )
            result = results[orig_idx].copy()
            result["rerank_score"] = score
            result["original_rank"] = orig_idx
            reranked_results.append(result)

        return reranked_results

    async def chunk_text(
        self,
        text: str,
        chunk_size: int = 1000,
        overlap: int = 200,
        strategy: str = "semantic"
    ) -> List[str]:
        """
        Chunk text into smaller pieces.

        Args:
            text: Text to chunk
            chunk_size: Target chunk size
            overlap: Overlap between chunks
            strategy: "semantic" or "recursive"

        Returns:
            List of text chunks (empty list for empty input, in which case
            no request is made)

        Raises:
            EmbeddingServiceError: If the request fails
        """
        if not text:
            return []

        url = f"{self.base_url}/chunk"
        payload = {
            "text": text,
            "chunk_size": chunk_size,
            "overlap": overlap,
            "strategy": strategy,
        }
        return await self._request(
            lambda client: client.post(url, json=payload),
            timeout=self.timeout,
            failure_label="Failed to chunk text",
            parse=itemgetter("chunks"),
        )

    async def extract_pdf(self, pdf_content: bytes) -> dict:
        """
        Extract text from PDF file.

        Args:
            pdf_content: PDF file content as bytes

        Returns:
            Dict with 'text', 'backend_used', 'pages', 'table_count'

        Raises:
            EmbeddingServiceError: If the request fails
        """
        url = f"{self.base_url}/extract-pdf"
        return await self._request(
            lambda client: client.post(
                url,
                content=pdf_content,
                headers={"Content-Type": "application/octet-stream"},
            ),
            timeout=_PDF_TIMEOUT,  # Longer timeout for PDFs
            failure_label="Failed to extract PDF",
        )

    async def get_model_info(self) -> dict:
        """
        Get information about configured models.

        Returns:
            Decoded JSON body of GET /models

        Raises:
            EmbeddingServiceError: If the request fails
        """
        url = f"{self.base_url}/models"
        return await self._request(
            lambda client: client.get(url),
            timeout=_QUICK_TIMEOUT,
            failure_label="Failed to get model info",
        )


# Global client instance (lazy initialization)
_client: Optional[EmbeddingClient] = None


def get_embedding_client() -> EmbeddingClient:
    """Get the global embedding client instance, creating it on first use."""
    global _client
    if _client is None:
        _client = EmbeddingClient()
    return _client


# =============================================================================
# Compatibility functions (drop-in replacements for eh_pipeline.py)
# =============================================================================

async def generate_embeddings(texts: List[str]) -> List[List[float]]:
    """
    Generate embeddings for texts (compatibility function).

    This is a drop-in replacement for eh_pipeline.generate_embeddings().
    """
    client = get_embedding_client()
    return await client.generate_embeddings(texts)


async def generate_single_embedding(text: str) -> List[float]:
    """
    Generate embedding for a single text (compatibility function).

    This is a drop-in replacement for eh_pipeline.generate_single_embedding().
    """
    client = get_embedding_client()
    return await client.generate_single_embedding(text)


async def rerank_documents(query: str, documents: List[str], top_k: int = 5) -> List[Tuple[int, float, str]]:
    """
    Re-rank documents (compatibility function).

    This is a drop-in replacement for reranker.rerank_documents().
    """
    client = get_embedding_client()
    return await client.rerank_documents(query, documents, top_k)


async def rerank_search_results(
    query: str,
    results: List[dict],
    text_field: str = "text",
    top_k: int = 5
) -> List[dict]:
    """
    Re-rank search results (compatibility function).

    This is a drop-in replacement for reranker.rerank_search_results().
    """
    client = get_embedding_client()
    return await client.rerank_search_results(query, results, text_field, top_k)