Files
breakpilot-core/rag-service/api/search.py
Benjamin Boenisch ad111d5e69 Initial commit: breakpilot-core - Shared Infrastructure
Docker Compose with 24+ services:
- PostgreSQL (PostGIS), Valkey, MinIO, Qdrant
- Vault (PKI/TLS), Nginx (Reverse Proxy)
- Backend Core API, Consent Service, Billing Service
- RAG Service, Embedding Service
- Gitea, Woodpecker CI/CD
- Night Scheduler, Health Aggregator
- Jitsi (Web/XMPP/JVB/Jicofo), Mailpit

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-11 23:47:13 +01:00

201 lines
6.5 KiB
Python

import logging
from typing import Any, Optional
from fastapi import APIRouter, HTTPException, Request
from pydantic import BaseModel, Field
from api.auth import optional_jwt_auth
from embedding_client import embedding_client
from qdrant_client_wrapper import qdrant_wrapper
# Module-level logger; records emitted from this file carry the name
# "rag-service.api.search".
logger = logging.getLogger("rag-service.api.search")
# Router for the endpoints in this module; every route registered on it is
# served under the "/api/v1" prefix.
router = APIRouter(prefix="/api/v1")
# ---- Request / Response models --------------------------------------------
class SemanticSearchRequest(BaseModel):
    """Request body accepted by the pure vector search endpoint (``POST /search``)."""

    # Free-text query; it is embedded before the vector search runs.
    query: str
    # Name of the collection to search.
    collection: str = Field(default="bp_eh")
    # Maximum number of hits to return (validated to 1..100).
    limit: int = Field(default=10, ge=1, le=100)
    # Optional filter mapping, handed unchanged to the vector search call.
    filters: Optional[dict[str, Any]] = Field(default=None)
    # Optional score threshold, handed unchanged to the vector search call.
    score_threshold: Optional[float] = Field(default=None)
class HybridSearchRequest(BaseModel):
    """Request body accepted by the hybrid search endpoint (``POST /search/hybrid``)."""

    # Free-text query; used for the embedding, the keyword boost and re-ranking.
    query: str
    # Name of the collection to search.
    collection: str = Field(default="bp_eh")
    # Maximum number of hits to return (validated to 1..100).
    limit: int = Field(default=10, ge=1, le=100)
    # Optional filter mapping, handed unchanged to the vector search call.
    filters: Optional[dict[str, Any]] = Field(default=None)
    # Optional score threshold, handed unchanged to the vector search call.
    score_threshold: Optional[float] = Field(default=None)
    # Weight (0.0..1.0) applied to the fraction of query terms found in a chunk;
    # the product is added to the hit's vector score.
    keyword_boost: float = Field(default=0.3, ge=0.0, le=1.0)
    # Whether the top hits are sent to the embedding service for re-ranking.
    rerank: bool = Field(default=True)
    # How many of the top hits are sent for re-ranking (validated to 1..50).
    rerank_top_k: int = Field(default=10, ge=1, le=50)
class RerankRequest(BaseModel):
    """Request body accepted by the standalone re-ranking endpoint (``POST /rerank``)."""

    # Query the documents are ranked against.
    query: str = Field(...)
    # Document texts to be re-ranked.
    documents: list[str] = Field(...)
    # Value forwarded as ``top_k`` to the re-ranker (validated to 1..100).
    top_k: int = Field(default=10, ge=1, le=100)
class SearchResult(BaseModel):
    """One search hit as returned to the client."""

    # Identifier of the hit.
    id: str = Field(...)
    # Score of the hit; the hybrid endpoint may replace the raw vector score
    # with a keyword-boosted or re-ranked score.
    score: float = Field(...)
    # Payload stored with the hit; an empty mapping when none is supplied.
    payload: dict[str, Any] = Field(default={})
class SearchResponse(BaseModel):
    """Response envelope shared by the ``/search`` and ``/search/hybrid`` endpoints."""

    # The hits, in the order they are returned to the client.
    results: list[SearchResult] = Field(...)
    # Number of entries in ``results``.
    count: int = Field(...)
    # Echo of the query text from the request.
    query: str = Field(...)
    # Echo of the collection name from the request.
    collection: str = Field(...)
# ---- Endpoints ------------------------------------------------------------
@router.post("/search", response_model=SearchResponse)
async def semantic_search(body: SemanticSearchRequest, request: Request):
    """
    Vector-only search.

    The query text is embedded first; the resulting vector is then used to
    look up the nearest neighbours in Qdrant.

    Raises:
        HTTPException: 502 when the embedding call fails, 500 when the
            vector search fails.
    """
    optional_jwt_auth(request)

    # Step 1: turn the query text into a vector.
    try:
        embedding = await embedding_client.generate_single_embedding(body.query)
    except Exception as exc:
        logger.error("Failed to embed query: %s", exc)
        raise HTTPException(status_code=502, detail=f"Embedding service error: {exc}")

    # Step 2: nearest-neighbour lookup with the caller's limit and filters.
    search_kwargs = {
        "collection": body.collection,
        "query_vector": embedding,
        "limit": body.limit,
        "filters": body.filters,
        "score_threshold": body.score_threshold,
    }
    try:
        hits = await qdrant_wrapper.search(**search_kwargs)
    except Exception as exc:
        logger.error("Qdrant search failed: %s", exc)
        raise HTTPException(status_code=500, detail=f"Vector search failed: {exc}")

    # Step 3: wrap the raw hits in the response model.
    items = []
    for hit in hits:
        items.append(SearchResult(**hit))
    return SearchResponse(
        results=items,
        count=len(hits),
        query=body.query,
        collection=body.collection,
    )
def _chunk_text(result: dict[str, Any]) -> str:
    """Return ``result["payload"]["chunk_text"]`` if it is a string, else ``""``."""
    payload = result.get("payload")
    if not isinstance(payload, dict):
        return ""
    text = payload.get("chunk_text")
    return text if isinstance(text, str) else ""


@router.post("/search/hybrid", response_model=SearchResponse)
async def hybrid_search(body: HybridSearchRequest, request: Request):
    """
    Hybrid search: vector search + keyword boost + optional re-ranking.

    1. Embed the query and run a vector search with a higher initial limit
       (``max(limit * 3, 30)``) so later steps have candidates to reorder.
    2. Add a keyword bonus to each hit: the fraction of query terms found in
       the hit's ``chunk_text`` multiplied by ``keyword_boost``.
    3. Optionally send the top ``rerank_top_k`` hits to the embedding service
       for re-ranking. Re-ranked hits come first; hits that were not
       re-ranked keep their keyword-boosted order behind them, so the
       response can still be filled up to ``limit``.

    Raises:
        HTTPException: 502 when the embedding call fails, 500 when the
            vector search fails. A re-ranking failure is only logged and the
            keyword-boosted order is returned instead.
    """
    optional_jwt_auth(request)

    # --- Step 1: Vector search (fetch more than needed for re-ranking) ---
    fetch_limit = max(body.limit * 3, 30)
    try:
        query_vector = await embedding_client.generate_single_embedding(body.query)
    except Exception as exc:
        logger.error("Failed to embed query: %s", exc)
        raise HTTPException(
            status_code=502, detail=f"Embedding service error: {exc}"
        ) from exc
    try:
        vector_results = await qdrant_wrapper.search(
            collection=body.collection,
            query_vector=query_vector,
            limit=fetch_limit,
            filters=body.filters,
            score_threshold=body.score_threshold,
        )
    except Exception as exc:
        logger.error("Qdrant search failed: %s", exc)
        raise HTTPException(
            status_code=500, detail=f"Vector search failed: {exc}"
        ) from exc
    if not vector_results:
        return SearchResponse(
            results=[],
            count=0,
            query=body.query,
            collection=body.collection,
        )

    # --- Step 2: Keyword boost ---
    query_terms = body.query.lower().split()
    term_count = max(len(query_terms), 1)  # avoid division by zero on blank queries
    for result in vector_results:
        # _chunk_text tolerates a missing/null payload or non-string chunk_text.
        chunk_text = _chunk_text(result).lower()
        keyword_hits = sum(1 for term in query_terms if term in chunk_text)
        keyword_score = (keyword_hits / term_count) * body.keyword_boost
        result["score"] = result["score"] + keyword_score
    # Sort by boosted score
    vector_results.sort(key=lambda x: x["score"], reverse=True)

    # --- Step 3: Optional re-ranking ---
    if body.rerank and len(vector_results) > 1:
        # Only this window is sent to the re-ranker; returned indices refer to it.
        candidates = vector_results[: body.rerank_top_k]
        try:
            reranked = await embedding_client.rerank_documents(
                query=body.query,
                documents=[_chunk_text(r) for r in candidates],
                top_k=body.limit,
            )
            # Rebuild results in re-ranked order
            reranked_results = []
            used = set()
            for item in reranked:
                idx = item.get("index", 0)
                # Ignore indices outside the candidate window and duplicates.
                if 0 <= idx < len(candidates) and idx not in used:
                    used.add(idx)
                    entry = candidates[idx].copy()
                    entry["score"] = item.get("score", entry["score"])
                    reranked_results.append(entry)
            # Keep hits the re-ranker did not return, then the hits beyond the
            # window, in their keyword-boosted order. Their scores stay on the
            # vector+keyword scale, not the re-ranker's scale.
            leftovers = [r for i, r in enumerate(candidates) if i not in used]
            vector_results = (
                reranked_results + leftovers + vector_results[body.rerank_top_k:]
            )
        except Exception as exc:
            logger.warning("Re-ranking failed, falling back to vector+keyword scores: %s", exc)

    # Trim to requested limit
    final_results = vector_results[: body.limit]
    return SearchResponse(
        results=[SearchResult(**r) for r in final_results],
        count=len(final_results),
        query=body.query,
        collection=body.collection,
    )
@router.post("/rerank")
async def rerank(body: RerankRequest, request: Request):
    """
    Standalone re-ranking endpoint.

    Sends query + documents to the embedding service for re-ranking and
    returns a dict with the keys ``results``, ``count`` and ``query``.
    An empty ``documents`` list short-circuits to an empty result without
    calling the embedding service.

    Raises:
        HTTPException: 502 when the re-ranking call fails.
    """
    optional_jwt_auth(request)
    if not body.documents:
        # Same keys as the non-empty response so clients can parse both
        # shapes uniformly.
        return {"results": [], "count": 0, "query": body.query}
    try:
        results = await embedding_client.rerank_documents(
            query=body.query,
            documents=body.documents,
            top_k=body.top_k,
        )
        return {"results": results, "count": len(results), "query": body.query}
    except Exception as exc:
        logger.error("Re-ranking failed: %s", exc)
        raise HTTPException(
            status_code=502, detail=f"Re-ranking failed: {exc}"
        ) from exc