Initial commit: breakpilot-core - Shared Infrastructure

Docker Compose with 24+ services:
- PostgreSQL (PostGIS), Valkey, MinIO, Qdrant
- Vault (PKI/TLS), Nginx (Reverse Proxy)
- Backend Core API, Consent Service, Billing Service
- RAG Service, Embedding Service
- Gitea, Woodpecker CI/CD
- Night Scheduler, Health Aggregator
- Jitsi (Web/XMPP/JVB/Jicofo), Mailpit

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
Benjamin Boenisch
2026-02-11 23:47:13 +01:00
commit ad111d5e69
244 changed files with 84288 additions and 0 deletions

16
rag-service/Dockerfile Normal file
View File

@@ -0,0 +1,16 @@
# Slim Python 3.11 base keeps the image small.
FROM python:3.11-slim
WORKDIR /app
# curl is installed -- presumably for container health checks (confirm);
# the apt cache is removed in the same layer to keep the image lean.
RUN apt-get update && apt-get install -y --no-install-recommends \
    curl \
    && rm -rf /var/lib/apt/lists/*
# Copy requirements first so the pip layer is cached across code-only changes.
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
COPY . .
# The service listens on 8097 (matches the PORT default in config.py).
EXPOSE 8097
CMD ["python", "main.py"]

View File

@@ -0,0 +1,11 @@
"""Aggregate router for the rag-service HTTP API.

Mounts the collections, documents and search sub-routers onto a single
APIRouter, which main.py includes into the FastAPI app.
"""
from fastapi import APIRouter
from api.collections import router as collections_router
from api.documents import router as documents_router
from api.search import router as search_router

# Each sub-router carries its own URL prefix; the tags only group the
# endpoints in the generated OpenAPI docs.
router = APIRouter()
router.include_router(collections_router, tags=["Collections"])
router.include_router(documents_router, tags=["Documents"])
router.include_router(search_router, tags=["Search"])

46
rag-service/api/auth.py Normal file
View File

@@ -0,0 +1,46 @@
"""Optional JWT authentication helper.
If JWT_SECRET is configured and an Authorization header is present, the token
is verified. If no header is present or JWT_SECRET is empty, the request is
allowed through (public access).
"""
import logging
from typing import Optional
from fastapi import HTTPException, Request
from jose import JWTError, jwt
from config import settings
logger = logging.getLogger("rag-service.auth")
def optional_jwt_auth(request: Request) -> Optional[dict]:
    """
    Validate the JWT from the Authorization header if present.

    Returns the decoded token payload, or None when either no Authorization
    header was sent or JWT_SECRET is not configured (validation skipped --
    public access).
    Raises HTTPException 401 if a token IS provided but is malformed or
    fails verification.
    """
    auth_header: Optional[str] = request.headers.get("authorization")
    if not auth_header:
        # No credentials supplied -- public access is allowed.
        return None
    if not settings.JWT_SECRET:
        # No secret configured -- skip validation.
        return None
    # Expect exactly "Bearer <token>"
    parts = auth_header.split()
    if len(parts) != 2 or parts[0].lower() != "bearer":
        raise HTTPException(status_code=401, detail="Invalid Authorization header format")
    token = parts[1]
    try:
        payload = jwt.decode(token, settings.JWT_SECRET, algorithms=["HS256"])
    except JWTError as exc:
        logger.warning("JWT verification failed: %s", exc)
        # Chain the JWTError so the original cause survives in tracebacks
        # (the original re-raise dropped it).
        raise HTTPException(status_code=401, detail="Invalid or expired token") from exc
    return payload

View File

@@ -0,0 +1,77 @@
import logging
from typing import Optional
from fastapi import APIRouter, HTTPException, Request
from pydantic import BaseModel
from api.auth import optional_jwt_auth
from qdrant_client_wrapper import qdrant_wrapper, ALL_DEFAULT_COLLECTIONS
logger = logging.getLogger("rag-service.api.collections")
router = APIRouter(prefix="/api/v1/collections")
# ---- Request / Response models --------------------------------------------
class CreateCollectionRequest(BaseModel):
    """Body for POST /api/v1/collections."""

    # Name of the Qdrant collection to create.
    name: str
    # Embedding dimensionality; defaults to the OpenAI-style 1536 dims used
    # by the Lehrer/EH default collections.
    vector_size: int = 1536
class CollectionInfoResponse(BaseModel):
    """Stats for a single Qdrant collection (all counters optional).

    NOTE(review): this model is defined but the GET /{collection_name}
    endpoint returns the raw dict from qdrant_wrapper without declaring a
    response_model -- confirm whether the model should be wired up.
    """

    name: str
    vectors_count: Optional[int] = None
    points_count: Optional[int] = None
    status: Optional[str] = None
    vector_size: Optional[int] = None
# ---- Endpoints ------------------------------------------------------------
@router.post("", status_code=201)
async def create_collection(body: CreateCollectionRequest, request: Request):
    """Create a new Qdrant collection.

    Returns 201 with {collection, vector_size, created}; ``created`` is
    False when the collection already existed (see
    QdrantClientWrapper.create_collection). 500 on any backend failure.
    """
    optional_jwt_auth(request)
    try:
        created = await qdrant_wrapper.create_collection(body.name, body.vector_size)
    except Exception as exc:
        logger.error("Failed to create collection '%s': %s", body.name, exc)
        # Chain the cause so the original traceback is preserved.
        raise HTTPException(status_code=500, detail=str(exc)) from exc
    return {
        "collection": body.name,
        "vector_size": body.vector_size,
        "created": created,
    }
@router.get("")
async def list_collections(request: Request):
    """List the names of all Qdrant collections; 500 on backend failure."""
    optional_jwt_auth(request)
    try:
        result = qdrant_wrapper.client.get_collections()
        names = [c.name for c in result.collections]
    except Exception as exc:
        logger.error("Failed to list collections: %s", exc)
        # Chain the cause so the original traceback is preserved.
        raise HTTPException(status_code=500, detail=str(exc)) from exc
    return {"collections": names, "count": len(names)}
@router.get("/defaults")
async def list_default_collections(request: Request):
"""Return the pre-configured default collections and their dimensions."""
optional_jwt_auth(request)
return {"defaults": ALL_DEFAULT_COLLECTIONS}
@router.get("/{collection_name}")
async def get_collection_info(collection_name: str, request: Request):
    """Get stats for a single collection.

    Any backend failure is reported as 404 (the collection is presumed
    missing), with the underlying error text included in the detail.
    """
    optional_jwt_auth(request)
    try:
        return await qdrant_wrapper.get_collection_info(collection_name)
    except Exception as exc:
        logger.error("Failed to get collection info for '%s': %s", collection_name, exc)
        # Chain the cause so the original traceback is preserved.
        raise HTTPException(
            status_code=404,
            detail=f"Collection '{collection_name}' not found or error: {exc}",
        ) from exc

View File

@@ -0,0 +1,246 @@
import logging
import uuid
from typing import Optional
from fastapi import APIRouter, File, Form, HTTPException, Request, UploadFile
from pydantic import BaseModel
from api.auth import optional_jwt_auth
from embedding_client import embedding_client
from minio_client_wrapper import minio_wrapper
from qdrant_client_wrapper import qdrant_wrapper
logger = logging.getLogger("rag-service.api.documents")
router = APIRouter(prefix="/api/v1/documents")
# ---- Request / Response models --------------------------------------------
class DocumentUploadResponse(BaseModel):
    """Result of POST /api/v1/documents/upload."""

    # UUID4 generated per upload; stored in every chunk payload.
    document_id: str
    # Structured MinIO object path (see MinioClientWrapper.get_minio_path).
    object_name: str
    # Number of text chunks produced by the embedding service.
    chunks_count: int
    # Number of points upserted into Qdrant.
    vectors_indexed: int
    # Target Qdrant collection.
    collection: str
class DocumentDeleteRequest(BaseModel):
    """Body for DELETE /api/v1/documents: identifies the object and the
    Qdrant collection holding its vectors."""

    object_name: str
    collection: str
# ---- Endpoints ------------------------------------------------------------
@router.post("/upload", response_model=DocumentUploadResponse)
async def upload_document(
request: Request,
file: UploadFile = File(...),
collection: str = Form(default="bp_eh"),
data_type: str = Form(default="eh"),
bundesland: str = Form(default="niedersachsen"),
use_case: str = Form(default="general"),
year: str = Form(default="2024"),
chunk_strategy: str = Form(default="recursive"),
chunk_size: int = Form(default=512),
chunk_overlap: int = Form(default=50),
metadata_json: Optional[str] = Form(default=None),
):
"""
Upload a document:
1. Store original file in MinIO
2. Extract text (if PDF) via embedding-service
3. Chunk the text via embedding-service
4. Generate embeddings for each chunk
5. Index chunks + embeddings in Qdrant
"""
optional_jwt_auth(request)
document_id = str(uuid.uuid4())
# --- Read file bytes ---
try:
file_bytes = await file.read()
except Exception as exc:
raise HTTPException(status_code=400, detail=f"Could not read uploaded file: {exc}")
if len(file_bytes) == 0:
raise HTTPException(status_code=400, detail="Uploaded file is empty")
filename = file.filename or f"{document_id}.bin"
content_type = file.content_type or "application/octet-stream"
# --- Store in MinIO ---
object_name = minio_wrapper.get_minio_path(
data_type=data_type,
bundesland=bundesland,
use_case=use_case,
year=year,
filename=filename,
)
try:
minio_meta = {
"document_id": document_id,
"original_filename": filename,
}
await minio_wrapper.upload_document(
object_name=object_name,
data=file_bytes,
content_type=content_type,
metadata=minio_meta,
)
except Exception as exc:
logger.error("MinIO upload failed: %s", exc)
raise HTTPException(status_code=500, detail=f"Failed to store file in MinIO: {exc}")
# --- Extract text ---
try:
if content_type == "application/pdf" or filename.lower().endswith(".pdf"):
text = await embedding_client.extract_pdf(file_bytes)
else:
# Try to decode as text
text = file_bytes.decode("utf-8", errors="replace")
except Exception as exc:
logger.error("Text extraction failed: %s", exc)
raise HTTPException(status_code=500, detail=f"Text extraction failed: {exc}")
if not text or not text.strip():
raise HTTPException(status_code=400, detail="Could not extract any text from the document")
# --- Chunk ---
try:
chunks = await embedding_client.chunk_text(
text=text,
strategy=chunk_strategy,
chunk_size=chunk_size,
overlap=chunk_overlap,
)
except Exception as exc:
logger.error("Chunking failed: %s", exc)
raise HTTPException(status_code=500, detail=f"Chunking failed: {exc}")
if not chunks:
raise HTTPException(status_code=400, detail="Chunking produced zero chunks")
# --- Embed ---
try:
embeddings = await embedding_client.generate_embeddings(chunks)
except Exception as exc:
logger.error("Embedding generation failed: %s", exc)
raise HTTPException(status_code=500, detail=f"Embedding generation failed: {exc}")
# --- Parse extra metadata ---
extra_metadata: dict = {}
if metadata_json:
import json
try:
extra_metadata = json.loads(metadata_json)
except json.JSONDecodeError:
logger.warning("Invalid metadata_json, ignoring")
# --- Build payloads ---
payloads = []
for i, chunk in enumerate(chunks):
payload = {
"document_id": document_id,
"object_name": object_name,
"filename": filename,
"chunk_index": i,
"chunk_text": chunk,
"data_type": data_type,
"bundesland": bundesland,
"use_case": use_case,
"year": year,
**extra_metadata,
}
payloads.append(payload)
# --- Index in Qdrant ---
try:
indexed = await qdrant_wrapper.index_documents(
collection=collection,
vectors=embeddings,
payloads=payloads,
)
except Exception as exc:
logger.error("Qdrant indexing failed: %s", exc)
raise HTTPException(status_code=500, detail=f"Qdrant indexing failed: {exc}")
return DocumentUploadResponse(
document_id=document_id,
object_name=object_name,
chunks_count=len(chunks),
vectors_indexed=indexed,
collection=collection,
)
@router.delete("")
async def delete_document(body: DocumentDeleteRequest, request: Request):
"""Delete a document from both MinIO and Qdrant."""
optional_jwt_auth(request)
errors: list[str] = []
# Delete from MinIO
try:
await minio_wrapper.delete_document(body.object_name)
except Exception as exc:
errors.append(f"MinIO delete failed: {exc}")
# Delete vectors from Qdrant
try:
await qdrant_wrapper.delete_by_filter(
collection=body.collection,
filter_conditions={"object_name": body.object_name},
)
except Exception as exc:
errors.append(f"Qdrant delete failed: {exc}")
if errors:
return {"deleted": False, "errors": errors}
return {"deleted": True, "object_name": body.object_name, "collection": body.collection}
@router.get("/list")
async def list_documents(
request: Request,
prefix: Optional[str] = None,
):
"""List documents stored in MinIO."""
optional_jwt_auth(request)
try:
docs = await minio_wrapper.list_documents(prefix=prefix)
return {"documents": docs, "count": len(docs)}
except Exception as exc:
logger.error("Failed to list documents: %s", exc)
raise HTTPException(status_code=500, detail=str(exc))
@router.get("/download/{object_name:path}")
async def download_document(object_name: str, request: Request):
"""Get a presigned download URL for a document."""
optional_jwt_auth(request)
try:
url = await minio_wrapper.get_presigned_url(object_name)
return {"url": url, "object_name": object_name}
except Exception as exc:
logger.error("Failed to generate presigned URL for '%s': %s", object_name, exc)
raise HTTPException(status_code=404, detail=f"Document not found: {exc}")
@router.get("/stats")
async def storage_stats(
request: Request,
prefix: Optional[str] = None,
):
"""Get storage stats (size, count) for a given prefix."""
optional_jwt_auth(request)
try:
stats = await minio_wrapper.get_storage_stats(prefix=prefix)
return stats
except Exception as exc:
logger.error("Failed to get storage stats: %s", exc)
raise HTTPException(status_code=500, detail=str(exc))

200
rag-service/api/search.py Normal file
View File

@@ -0,0 +1,200 @@
import logging
from typing import Any, Optional
from fastapi import APIRouter, HTTPException, Request
from pydantic import BaseModel, Field
from api.auth import optional_jwt_auth
from embedding_client import embedding_client
from qdrant_client_wrapper import qdrant_wrapper
logger = logging.getLogger("rag-service.api.search")
router = APIRouter(prefix="/api/v1")
# ---- Request / Response models --------------------------------------------
class SemanticSearchRequest(BaseModel):
    """Body for POST /api/v1/search (pure vector search)."""

    query: str
    collection: str = "bp_eh"
    # Number of hits to return (1-100).
    limit: int = Field(default=10, ge=1, le=100)
    # Optional payload filters, passed through to Qdrant: a single value per
    # key means exact match, a list means match-any.
    filters: Optional[dict[str, Any]] = None
    # Passed through to Qdrant's score_threshold.
    score_threshold: Optional[float] = None
class HybridSearchRequest(BaseModel):
    """Body for POST /api/v1/search/hybrid (vector + keyword + rerank)."""

    query: str
    collection: str = "bp_eh"
    limit: int = Field(default=10, ge=1, le=100)
    filters: Optional[dict[str, Any]] = None
    score_threshold: Optional[float] = None
    # Max score added per result: (matched query terms / total terms) * boost.
    keyword_boost: float = Field(default=0.3, ge=0.0, le=1.0)
    # When True, the top rerank_top_k chunks are re-ranked via the
    # embedding service (best-effort; failure falls back to boosted scores).
    rerank: bool = True
    rerank_top_k: int = Field(default=10, ge=1, le=50)
class RerankRequest(BaseModel):
    """Body for POST /api/v1/rerank (standalone re-ranking)."""

    query: str
    # Raw document texts to score against the query.
    documents: list[str]
    top_k: int = Field(default=10, ge=1, le=100)
class SearchResult(BaseModel):
    """Single search hit as returned by qdrant_wrapper.search()."""

    # Stringified Qdrant point id.
    id: str
    # Similarity score (possibly keyword-boosted / reranked).
    score: float
    # Use a factory instead of a literal `{}` default: idiomatic for mutable
    # defaults and guarantees each instance gets its own dict.
    payload: dict[str, Any] = Field(default_factory=dict)
class SearchResponse(BaseModel):
    """Response envelope shared by /search and /search/hybrid."""

    results: list[SearchResult]
    # len(results), returned for client convenience.
    count: int
    # Echo of the request's query and collection.
    query: str
    collection: str
# ---- Endpoints ------------------------------------------------------------
@router.post("/search", response_model=SearchResponse)
async def semantic_search(body: SemanticSearchRequest, request: Request):
"""
Pure semantic (vector) search.
Embeds the query, then searches Qdrant for nearest neighbours.
"""
optional_jwt_auth(request)
# Generate query embedding
try:
query_vector = await embedding_client.generate_single_embedding(body.query)
except Exception as exc:
logger.error("Failed to embed query: %s", exc)
raise HTTPException(status_code=502, detail=f"Embedding service error: {exc}")
# Search Qdrant
try:
results = await qdrant_wrapper.search(
collection=body.collection,
query_vector=query_vector,
limit=body.limit,
filters=body.filters,
score_threshold=body.score_threshold,
)
except Exception as exc:
logger.error("Qdrant search failed: %s", exc)
raise HTTPException(status_code=500, detail=f"Vector search failed: {exc}")
return SearchResponse(
results=[SearchResult(**r) for r in results],
count=len(results),
query=body.query,
collection=body.collection,
)
@router.post("/search/hybrid", response_model=SearchResponse)
async def hybrid_search(body: HybridSearchRequest, request: Request):
"""
Hybrid search: vector search + keyword filtering + optional re-ranking.
1. Embed query and do vector search with a higher initial limit
2. Apply keyword matching on chunk_text to boost relevant results
3. Optionally re-rank the top results via the embedding service
"""
optional_jwt_auth(request)
# --- Step 1: Vector search (fetch more than needed for re-ranking) ---
fetch_limit = max(body.limit * 3, 30)
try:
query_vector = await embedding_client.generate_single_embedding(body.query)
except Exception as exc:
logger.error("Failed to embed query: %s", exc)
raise HTTPException(status_code=502, detail=f"Embedding service error: {exc}")
try:
vector_results = await qdrant_wrapper.search(
collection=body.collection,
query_vector=query_vector,
limit=fetch_limit,
filters=body.filters,
score_threshold=body.score_threshold,
)
except Exception as exc:
logger.error("Qdrant search failed: %s", exc)
raise HTTPException(status_code=500, detail=f"Vector search failed: {exc}")
if not vector_results:
return SearchResponse(
results=[],
count=0,
query=body.query,
collection=body.collection,
)
# --- Step 2: Keyword boost ---
query_terms = body.query.lower().split()
for result in vector_results:
chunk_text = result.get("payload", {}).get("chunk_text", "").lower()
keyword_hits = sum(1 for term in query_terms if term in chunk_text)
keyword_score = (keyword_hits / max(len(query_terms), 1)) * body.keyword_boost
result["score"] = result["score"] + keyword_score
# Sort by boosted score
vector_results.sort(key=lambda x: x["score"], reverse=True)
# --- Step 3: Optional re-ranking ---
if body.rerank and len(vector_results) > 1:
try:
documents = [
r.get("payload", {}).get("chunk_text", "")
for r in vector_results[: body.rerank_top_k]
]
reranked = await embedding_client.rerank_documents(
query=body.query,
documents=documents,
top_k=body.limit,
)
# Rebuild results in re-ranked order
reranked_results = []
for item in reranked:
idx = item.get("index", 0)
if idx < len(vector_results):
entry = vector_results[idx].copy()
entry["score"] = item.get("score", entry["score"])
reranked_results.append(entry)
vector_results = reranked_results
except Exception as exc:
logger.warning("Re-ranking failed, falling back to vector+keyword scores: %s", exc)
# Trim to requested limit
final_results = vector_results[: body.limit]
return SearchResponse(
results=[SearchResult(**r) for r in final_results],
count=len(final_results),
query=body.query,
collection=body.collection,
)
@router.post("/rerank")
async def rerank(body: RerankRequest, request: Request):
"""
Standalone re-ranking endpoint.
Sends query + documents to the embedding service for re-ranking.
"""
optional_jwt_auth(request)
if not body.documents:
return {"results": [], "count": 0}
try:
results = await embedding_client.rerank_documents(
query=body.query,
documents=body.documents,
top_k=body.top_k,
)
return {"results": results, "count": len(results), "query": body.query}
except Exception as exc:
logger.error("Re-ranking failed: %s", exc)
raise HTTPException(status_code=502, detail=f"Re-ranking failed: {exc}")

29
rag-service/config.py Normal file
View File

@@ -0,0 +1,29 @@
import os
class Settings:
    """Environment-based configuration for rag-service.

    Values are read once at import time; every setting has a
    development-friendly default.
    """

    # Qdrant
    QDRANT_URL: str = os.getenv("QDRANT_URL", "http://localhost:6333")
    # MinIO
    MINIO_ENDPOINT: str = os.getenv("MINIO_ENDPOINT", "localhost:9000")
    # NOTE(review): these defaults are MinIO's well-known out-of-the-box
    # credentials -- confirm production deployments always override them.
    MINIO_ACCESS_KEY: str = os.getenv("MINIO_ACCESS_KEY", "minioadmin")
    MINIO_SECRET_KEY: str = os.getenv("MINIO_SECRET_KEY", "minioadmin")
    MINIO_BUCKET: str = os.getenv("MINIO_BUCKET", "breakpilot-documents")
    # Only the literal string "true" (case-insensitive) enables TLS.
    MINIO_SECURE: bool = os.getenv("MINIO_SECURE", "false").lower() == "true"
    # Embedding Service
    EMBEDDING_SERVICE_URL: str = os.getenv(
        "EMBEDDING_SERVICE_URL", "http://embedding-service:8087"
    )
    # Auth -- an empty secret disables JWT validation (see api/auth.py).
    JWT_SECRET: str = os.getenv("JWT_SECRET", "")
    # Server
    PORT: int = int(os.getenv("PORT", "8097"))


# Shared settings instance imported across the service.
settings = Settings()

View File

@@ -0,0 +1,123 @@
import logging
from typing import Optional
import httpx
from config import settings
logger = logging.getLogger("rag-service.embedding")
_TIMEOUT = httpx.Timeout(timeout=120.0, connect=10.0)
class EmbeddingClient:
    """HTTP client for the embedding-service (port 8087).

    Each call opens a short-lived ``httpx.AsyncClient``; the base URL comes
    from ``settings.EMBEDDING_SERVICE_URL``. All methods raise
    ``httpx.HTTPStatusError`` on non-2xx responses.
    """

    def __init__(self) -> None:
        # Normalise once so _url() can concatenate without double slashes.
        self._base_url: str = settings.EMBEDDING_SERVICE_URL.rstrip("/")

    def _url(self, path: str) -> str:
        """Join an absolute API *path* onto the configured base URL."""
        return f"{self._base_url}{path}"

    # ------------------------------------------------------------------
    # Embeddings
    # ------------------------------------------------------------------
    async def generate_embeddings(self, texts: list[str]) -> list[list[float]]:
        """
        Send a batch of texts to the embedding service and return a list of
        embedding vectors (one per input text).
        """
        if not texts:
            # Short-circuit: an empty batch needs no network round-trip.
            return []
        async with httpx.AsyncClient(timeout=_TIMEOUT) as client:
            response = await client.post(
                self._url("/api/v1/embeddings"),
                json={"texts": texts},
            )
            response.raise_for_status()
            data = response.json()
            return data.get("embeddings", [])

    async def generate_single_embedding(self, text: str) -> list[float]:
        """Convenience wrapper for a single text.

        Raises ValueError if the service returns no embeddings.
        """
        results = await self.generate_embeddings([text])
        if not results:
            raise ValueError("Embedding service returned empty result")
        return results[0]

    # ------------------------------------------------------------------
    # Reranking
    # ------------------------------------------------------------------
    async def rerank_documents(
        self,
        query: str,
        documents: list[str],
        top_k: int = 10,
    ) -> list[dict]:
        """
        Ask the embedding service to re-rank documents for a given query.
        Returns a list of {index, score, text}.
        """
        if not documents:
            # Short-circuit: nothing to rank, skip the round-trip.
            return []
        async with httpx.AsyncClient(timeout=_TIMEOUT) as client:
            response = await client.post(
                self._url("/api/v1/rerank"),
                json={
                    "query": query,
                    "documents": documents,
                    "top_k": top_k,
                },
            )
            response.raise_for_status()
            data = response.json()
            return data.get("results", [])

    # ------------------------------------------------------------------
    # Chunking
    # ------------------------------------------------------------------
    async def chunk_text(
        self,
        text: str,
        strategy: str = "recursive",
        chunk_size: int = 512,
        overlap: int = 50,
    ) -> list[str]:
        """
        Ask the embedding service to chunk a long text.
        Returns a list of chunk strings.
        """
        async with httpx.AsyncClient(timeout=_TIMEOUT) as client:
            response = await client.post(
                self._url("/api/v1/chunk"),
                json={
                    "text": text,
                    "strategy": strategy,
                    "chunk_size": chunk_size,
                    "overlap": overlap,
                },
            )
            response.raise_for_status()
            data = response.json()
            return data.get("chunks", [])

    # ------------------------------------------------------------------
    # PDF extraction
    # ------------------------------------------------------------------
    async def extract_pdf(self, pdf_bytes: bytes) -> str:
        """
        Send raw PDF bytes to the embedding service for text extraction.
        Returns the extracted text as a string.
        """
        async with httpx.AsyncClient(timeout=_TIMEOUT) as client:
            response = await client.post(
                self._url("/api/v1/extract-pdf"),
                files={"file": ("document.pdf", pdf_bytes, "application/pdf")},
            )
            response.raise_for_status()
            data = response.json()
            return data.get("text", "")
# Module-level singleton shared by the API modules.
embedding_client = EmbeddingClient()

101
rag-service/main.py Normal file
View File

@@ -0,0 +1,101 @@
import logging
from contextlib import asynccontextmanager
import uvicorn
from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware
from config import settings
from qdrant_client_wrapper import qdrant_wrapper
from minio_client_wrapper import minio_wrapper
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s [%(name)s] %(levelname)s: %(message)s",
)
logger = logging.getLogger("rag-service")
@asynccontextmanager
async def lifespan(app: FastAPI):
    """Startup: initialise Qdrant collections and MinIO bucket.

    Both init steps are best-effort -- failures are logged and the service
    still starts, retrying lazily on first use.
    """
    logger.info("RAG-Service starting up ...")

    async def _best_effort(init, success_msg: str, failure_fmt: str) -> None:
        # Run one init step; swallow and log any error instead of aborting.
        try:
            await init()
            logger.info(success_msg)
        except Exception as exc:
            logger.error(failure_fmt, exc)

    await _best_effort(
        qdrant_wrapper.init_collections,
        "Qdrant collections ready",
        "Qdrant init failed (will retry on first request): %s",
    )
    await _best_effort(
        minio_wrapper.init_bucket,
        "MinIO bucket ready",
        "MinIO init failed (will retry on first request): %s",
    )
    yield
    logger.info("RAG-Service shutting down ...")
app = FastAPI(
    title="BreakPilot RAG Service",
    description="Wraps Qdrant vector search and MinIO document storage for the BreakPilot platform.",
    version="1.0.0",
    lifespan=lifespan,
)
# ---- CORS -----------------------------------------------------------------
# NOTE(review): wildcard origins combined with allow_credentials=True is very
# permissive (and browsers reject "*" when credentials are sent) -- confirm
# this is intended, e.g. because the service sits behind the Nginx proxy.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)
# ---- Routers --------------------------------------------------------------
# Deferred import (hence the noqa: E402) -- presumably so app/config are set
# up before the api package loads; confirm before moving to the top.
from api import router as api_router  # noqa: E402
app.include_router(api_router)
# ---- Health ---------------------------------------------------------------
@app.get("/health")
async def health():
    """Basic liveness probe reporting Qdrant and MinIO reachability."""

    def _reachable(check) -> bool:
        # Any exception from the dependency counts as "unavailable".
        try:
            check()
            return True
        except Exception:
            return False

    qdrant_ok = _reachable(lambda: qdrant_wrapper.client.get_collections())
    minio_ok = _reachable(lambda: minio_wrapper.client.bucket_exists(settings.MINIO_BUCKET))
    status = "healthy" if (qdrant_ok and minio_ok) else "degraded"
    return {
        "status": status,
        "service": "rag-service",
        "version": "1.0.0",
        "dependencies": {
            "qdrant": "ok" if qdrant_ok else "unavailable",
            "minio": "ok" if minio_ok else "unavailable",
        },
    }
# ---- Main -----------------------------------------------------------------
if __name__ == "__main__":
    # Run uvicorn directly when executed as a script (the container CMD is
    # "python main.py"); reload is off.
    uvicorn.run(
        "main:app",
        host="0.0.0.0",
        port=settings.PORT,
        reload=False,
        log_level="info",
    )

View File

@@ -0,0 +1,191 @@
import io
import logging
from datetime import timedelta
from typing import Any, Optional
from minio import Minio
from minio.error import S3Error
from config import settings
logger = logging.getLogger("rag-service.minio")
class MinioClientWrapper:
    """Thin wrapper around the Minio Python client for BreakPilot document storage.

    The client connects lazily on first use. Methods are declared ``async``
    for interface symmetry with the rest of the service, but the underlying
    MinIO SDK calls are blocking -- NOTE(review): they run on the event
    loop; confirm this is acceptable for the expected request volume.
    """

    def __init__(self) -> None:
        # Lazily-created client; see the `client` property.
        self._client: Optional[Minio] = None

    @property
    def client(self) -> Minio:
        """Connect on first access using settings from the environment."""
        if self._client is None:
            self._client = Minio(
                endpoint=settings.MINIO_ENDPOINT,
                access_key=settings.MINIO_ACCESS_KEY,
                secret_key=settings.MINIO_SECRET_KEY,
                secure=settings.MINIO_SECURE,
            )
            logger.info("Connected to MinIO at %s", settings.MINIO_ENDPOINT)
        return self._client

    # ------------------------------------------------------------------
    # Bucket init
    # ------------------------------------------------------------------
    async def init_bucket(self) -> None:
        """Create the configured bucket if it does not exist.

        Raises S3Error if the existence check or creation fails.
        """
        bucket = settings.MINIO_BUCKET
        try:
            if not self.client.bucket_exists(bucket):
                self.client.make_bucket(bucket)
                logger.info("Created MinIO bucket '%s'", bucket)
            else:
                logger.debug("MinIO bucket '%s' already exists", bucket)
        except S3Error as exc:
            logger.error("Failed to init bucket '%s': %s", bucket, exc)
            raise

    # ------------------------------------------------------------------
    # Upload / Download
    # ------------------------------------------------------------------
    async def upload_document(
        self,
        object_name: str,
        data: bytes,
        content_type: str = "application/octet-stream",
        metadata: Optional[dict[str, str]] = None,
    ) -> dict[str, Any]:
        """Upload bytes to MinIO and return storage info.

        Returns {object_name, bucket, etag, size}. SDK errors propagate
        directly (no try/except here, unlike the other methods).
        """
        stream = io.BytesIO(data)
        result = self.client.put_object(
            bucket_name=settings.MINIO_BUCKET,
            object_name=object_name,
            data=stream,
            length=len(data),
            content_type=content_type,
            metadata=metadata,
        )
        logger.info("Uploaded '%s' (%d bytes)", object_name, len(data))
        return {
            "object_name": object_name,
            "bucket": settings.MINIO_BUCKET,
            "etag": result.etag,
            "size": len(data),
        }

    async def download_document(self, object_name: str) -> bytes:
        """Download a document from MinIO and return raw bytes.

        Raises S3Error when the object cannot be fetched.
        """
        try:
            response = self.client.get_object(settings.MINIO_BUCKET, object_name)
            # Read fully, then close and release the connection back to the
            # pool -- keep this order.
            data = response.read()
            response.close()
            response.release_conn()
            logger.debug("Downloaded '%s' (%d bytes)", object_name, len(data))
            return data
        except S3Error as exc:
            logger.error("Failed to download '%s': %s", object_name, exc)
            raise

    # ------------------------------------------------------------------
    # List / Delete
    # ------------------------------------------------------------------
    async def list_documents(
        self, prefix: Optional[str] = None
    ) -> list[dict[str, Any]]:
        """List objects under the given prefix (recursive).

        Returns one dict per object: object_name, size, last_modified
        (ISO string or None), etag and content_type.
        """
        objects = self.client.list_objects(
            settings.MINIO_BUCKET, prefix=prefix, recursive=True
        )
        results = []
        for obj in objects:
            results.append(
                {
                    "object_name": obj.object_name,
                    "size": obj.size,
                    "last_modified": obj.last_modified.isoformat() if obj.last_modified else None,
                    "etag": obj.etag,
                    "content_type": obj.content_type,
                }
            )
        return results

    async def delete_document(self, object_name: str) -> bool:
        """Remove a single object. Returns True; raises S3Error on failure."""
        try:
            self.client.remove_object(settings.MINIO_BUCKET, object_name)
            logger.info("Deleted '%s' from bucket '%s'", object_name, settings.MINIO_BUCKET)
            return True
        except S3Error as exc:
            logger.error("Failed to delete '%s': %s", object_name, exc)
            raise

    # ------------------------------------------------------------------
    # Presigned URL
    # ------------------------------------------------------------------
    async def get_presigned_url(
        self, object_name: str, expires_hours: int = 1
    ) -> str:
        """Generate a temporary presigned download URL (default 1 hour)."""
        url = self.client.presigned_get_object(
            settings.MINIO_BUCKET,
            object_name,
            expires=timedelta(hours=expires_hours),
        )
        return url

    # ------------------------------------------------------------------
    # Storage stats
    # ------------------------------------------------------------------
    async def get_storage_stats(
        self, prefix: Optional[str] = None
    ) -> dict[str, Any]:
        """Calculate total size and file count under prefix.

        Iterates a full listing, so cost grows with the number of objects.
        """
        objects = self.client.list_objects(
            settings.MINIO_BUCKET, prefix=prefix, recursive=True
        )
        total_size = 0
        count = 0
        for obj in objects:
            total_size += obj.size or 0
            count += 1
        return {
            "prefix": prefix,
            "total_size_bytes": total_size,
            "total_size_mb": round(total_size / (1024 * 1024), 2),
            "file_count": count,
        }

    # ------------------------------------------------------------------
    # Structured path helper
    # ------------------------------------------------------------------
    @staticmethod
    def get_minio_path(
        data_type: str,
        bundesland: str,
        use_case: str,
        year: str,
        filename: str,
    ) -> str:
        """
        Build a structured object path.
        Example: eh/niedersachsen/mathematik/2024/aufgabe_01.pdf

        NOTE(review): bundesland and use_case are lower-cased but data_type
        is not -- confirm whether the asymmetry is intentional.
        """
        parts = [
            data_type.strip("/"),
            bundesland.lower().strip("/"),
            use_case.lower().strip("/"),
            str(year).strip("/"),
            filename.strip("/"),
        ]
        return "/".join(parts)


# Module-level singleton shared by the API modules.
minio_wrapper = MinioClientWrapper()

View File

@@ -0,0 +1,245 @@
import logging
import uuid
from typing import Any, Optional
from qdrant_client import QdrantClient
from qdrant_client.http import models as qmodels
from qdrant_client.http.exceptions import UnexpectedResponse
from config import settings
logger = logging.getLogger("rag-service.qdrant")
# ------------------------------------------------------------------
# Default collections with their vector dimensions
# ------------------------------------------------------------------
# Lehrer / EH collections (OpenAI-style 1536-dim embeddings)
_LEHRER_COLLECTIONS = {
    "bp_eh": 1536,
    "bp_nibis_eh": 1536,
    "bp_nibis": 1536,
    "bp_vocab": 1536,
}
# Compliance / Legal collections (1024-dim embeddings, e.g. from a smaller model)
_COMPLIANCE_COLLECTIONS = {
    "bp_legal_templates": 1024,
    "bp_compliance_gdpr": 1024,
    "bp_compliance_schulrecht": 1024,
    "bp_compliance_datenschutz": 1024,
    "bp_dsfa_templates": 1024,
    "bp_dsfa_risks": 1024,
}
# Merged map of every collection created at startup (init_collections) and
# exposed via GET /api/v1/collections/defaults.
ALL_DEFAULT_COLLECTIONS: dict[str, int] = {
    **_LEHRER_COLLECTIONS,
    **_COMPLIANCE_COLLECTIONS,
}
class QdrantClientWrapper:
    """Thin wrapper around QdrantClient with BreakPilot-specific helpers.

    The client is created lazily on first access, so importing this module
    does not require a reachable Qdrant. Methods are ``async`` for interface
    symmetry; the underlying client calls are synchronous.
    """

    def __init__(self) -> None:
        self._client: Optional[QdrantClient] = None

    @property
    def client(self) -> QdrantClient:
        """Lazily-initialised QdrantClient (30s timeout)."""
        if self._client is None:
            self._client = QdrantClient(url=settings.QDRANT_URL, timeout=30)
            logger.info("Connected to Qdrant at %s", settings.QDRANT_URL)
        return self._client

    # ------------------------------------------------------------------
    # Initialisation
    # ------------------------------------------------------------------
    async def init_collections(self) -> None:
        """Create all default collections if they do not already exist."""
        for name, dim in ALL_DEFAULT_COLLECTIONS.items():
            await self.create_collection(name, dim)
        logger.info(
            "All default collections initialised (%d total)",
            len(ALL_DEFAULT_COLLECTIONS),
        )

    async def create_collection(
        self,
        name: str,
        vector_size: int,
        distance: qmodels.Distance = qmodels.Distance.COSINE,
    ) -> bool:
        """Create a single collection. Returns True if newly created.

        Re-raises whatever the client raises when the create call itself fails.
        """
        # Probe for an existing collection. ANY failure here (not-found or a
        # connectivity problem) falls through to create_collection below,
        # which then surfaces a real error to the caller. The original
        # caught `(UnexpectedResponse, Exception)` -- the tuple was
        # redundant since Exception already subsumes UnexpectedResponse.
        try:
            self.client.get_collection(name)
            logger.debug("Collection '%s' already exists - skipping", name)
            return False
        except Exception:
            pass
        try:
            self.client.create_collection(
                collection_name=name,
                vectors_config=qmodels.VectorParams(
                    size=vector_size,
                    distance=distance,
                ),
                optimizers_config=qmodels.OptimizersConfigDiff(
                    indexing_threshold=20_000,
                ),
            )
            logger.info(
                "Created collection '%s' (dims=%d, distance=%s)",
                name,
                vector_size,
                distance.value,
            )
            return True
        except Exception as exc:
            logger.error("Failed to create collection '%s': %s", name, exc)
            raise

    # ------------------------------------------------------------------
    # Indexing
    # ------------------------------------------------------------------
    async def index_documents(
        self,
        collection: str,
        vectors: list[list[float]],
        payloads: list[dict[str, Any]],
        ids: Optional[list[str]] = None,
    ) -> int:
        """Batch-upsert vectors + payloads. Returns number of points upserted.

        Raises ValueError when vectors and payloads differ in length. When
        ``ids`` is omitted, random UUID4 ids are generated.
        """
        if len(vectors) != len(payloads):
            raise ValueError(
                f"vectors ({len(vectors)}) and payloads ({len(payloads)}) must have equal length"
            )
        if ids is None:
            ids = [str(uuid.uuid4()) for _ in vectors]
        points = [
            qmodels.PointStruct(id=pid, vector=vec, payload=pay)
            for pid, vec, pay in zip(ids, vectors, payloads)
        ]
        # Upsert in batches of 100 with wait=True so each batch is applied
        # before the next is sent.
        batch_size = 100
        total_upserted = 0
        for i in range(0, len(points), batch_size):
            batch = points[i : i + batch_size]
            self.client.upsert(collection_name=collection, points=batch, wait=True)
            total_upserted += len(batch)
        logger.info(
            "Upserted %d points into '%s'", total_upserted, collection
        )
        return total_upserted

    # ------------------------------------------------------------------
    # Search
    # ------------------------------------------------------------------
    async def search(
        self,
        collection: str,
        query_vector: list[float],
        limit: int = 10,
        filters: Optional[dict[str, Any]] = None,
        score_threshold: Optional[float] = None,
    ) -> list[dict[str, Any]]:
        """Semantic search. Returns list of {id, score, payload}.

        ``filters`` maps payload keys to either a single value (exact match)
        or a list (match-any); all conditions are AND-ed.
        """
        qdrant_filter = (
            qmodels.Filter(must=self._build_conditions(filters)) if filters else None
        )
        results = self.client.search(
            collection_name=collection,
            query_vector=query_vector,
            limit=limit,
            query_filter=qdrant_filter,
            score_threshold=score_threshold,
        )
        return [
            {
                "id": str(hit.id),
                "score": hit.score,
                "payload": hit.payload or {},
            }
            for hit in results
        ]

    # ------------------------------------------------------------------
    # Delete
    # ------------------------------------------------------------------
    async def delete_by_filter(
        self, collection: str, filter_conditions: dict[str, Any]
    ) -> bool:
        """Delete all points matching the given filter dict (AND-ed)."""
        self.client.delete(
            collection_name=collection,
            points_selector=qmodels.FilterSelector(
                filter=qmodels.Filter(must=self._build_conditions(filter_conditions))
            ),
            wait=True,
        )
        logger.info("Deleted points from '%s' with filter %s", collection, filter_conditions)
        return True

    @staticmethod
    def _build_conditions(conditions: dict[str, Any]) -> list:
        """Translate {key: value-or-list} into Qdrant FieldConditions.

        Shared by search() and delete_by_filter(), which previously
        duplicated this loop verbatim.
        """
        must_conditions = []
        for key, value in conditions.items():
            if isinstance(value, list):
                must_conditions.append(
                    qmodels.FieldCondition(
                        key=key, match=qmodels.MatchAny(any=value)
                    )
                )
            else:
                must_conditions.append(
                    qmodels.FieldCondition(
                        key=key, match=qmodels.MatchValue(value=value)
                    )
                )
        return must_conditions

    # ------------------------------------------------------------------
    # Info
    # ------------------------------------------------------------------
    async def get_collection_info(self, collection: str) -> dict[str, Any]:
        """Return basic stats for a collection; re-raises client errors."""
        try:
            info = self.client.get_collection(collection)
            return {
                "name": collection,
                "vectors_count": info.vectors_count,
                "points_count": info.points_count,
                "status": info.status.value if info.status else "unknown",
                # Named-vector configs have no flat `.size`; report None then.
                "vector_size": (
                    info.config.params.vectors.size
                    if hasattr(info.config.params.vectors, "size")
                    else None
                ),
            }
        except Exception as exc:
            logger.error("Failed to get info for '%s': %s", collection, exc)
            raise
# Module-level singleton shared by the API modules.
qdrant_wrapper = QdrantClientWrapper()

View File

@@ -0,0 +1,8 @@
fastapi>=0.104.0
uvicorn[standard]>=0.24.0
qdrant-client>=1.7.0
minio>=7.2.0
httpx>=0.25.0
pydantic>=2.5.0
python-multipart>=0.0.6
python-jose[cryptography]>=3.3.0