Files
breakpilot-core/rag-service/minio_client_wrapper.py
Benjamin Boenisch ad111d5e69 Initial commit: breakpilot-core - Shared Infrastructure
Docker Compose with 24+ services:
- PostgreSQL (PostGIS), Valkey, MinIO, Qdrant
- Vault (PKI/TLS), Nginx (Reverse Proxy)
- Backend Core API, Consent Service, Billing Service
- RAG Service, Embedding Service
- Gitea, Woodpecker CI/CD
- Night Scheduler, Health Aggregator
- Jitsi (Web/XMPP/JVB/Jicofo), Mailpit

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-11 23:47:13 +01:00

192 lines
6.3 KiB
Python

import asyncio
import io
import logging
from datetime import timedelta
from typing import Any, Optional

from minio import Minio
from minio.error import S3Error

from config import settings
# Named child of the "rag-service" logger so MinIO-related output can be filtered or
# re-levelled independently of the rest of the service.
_LOGGER_NAME = "rag-service.minio"
logger = logging.getLogger(_LOGGER_NAME)
class MinioClientWrapper:
    """Thin async-facing wrapper around the Minio Python client for BreakPilot document storage.

    The underlying ``Minio`` client is synchronous, so every coroutine below runs
    the actual MinIO call (including lazy listing iteration) in a worker thread via
    ``asyncio.to_thread``. Awaiting these methods therefore does not block the
    event loop. All operations target the bucket named by ``settings.MINIO_BUCKET``.
    """

    def __init__(self) -> None:
        # Created lazily by the ``client`` property on first use.
        self._client: Optional[Minio] = None

    @property
    def client(self) -> Minio:
        """Return the shared Minio client, constructing it from ``settings`` on first access.

        The coroutines below read this property on the event-loop thread *before*
        handing work to a worker thread, so the lazy initialisation is never raced
        by concurrent worker threads.
        """
        if self._client is None:
            self._client = Minio(
                endpoint=settings.MINIO_ENDPOINT,
                access_key=settings.MINIO_ACCESS_KEY,
                secret_key=settings.MINIO_SECRET_KEY,
                secure=settings.MINIO_SECURE,
            )
            # NOTE(review): constructing Minio likely does not contact the server,
            # so connection problems may only surface on the first real request.
            logger.info("Connected to MinIO at %s", settings.MINIO_ENDPOINT)
        return self._client

    # ------------------------------------------------------------------
    # Bucket init
    # ------------------------------------------------------------------
    async def init_bucket(self) -> None:
        """Create the configured bucket if it does not exist.

        A concurrent creation by another process between the existence check and
        ``make_bucket`` (reported by S3 as ``BucketAlreadyOwnedByYou``) is treated
        as success.

        Raises:
            S3Error: for any other failure while checking or creating the bucket.
        """
        bucket = settings.MINIO_BUCKET
        client = self.client
        try:
            if await asyncio.to_thread(client.bucket_exists, bucket):
                logger.debug("MinIO bucket '%s' already exists", bucket)
                return
            await asyncio.to_thread(client.make_bucket, bucket)
            logger.info("Created MinIO bucket '%s'", bucket)
        except S3Error as exc:
            # Lost the check-then-create race against another replica: the bucket
            # exists and belongs to us, which is exactly the desired end state.
            if exc.code == "BucketAlreadyOwnedByYou":
                logger.debug("MinIO bucket '%s' already exists", bucket)
                return
            logger.error("Failed to init bucket '%s': %s", bucket, exc)
            raise

    # ------------------------------------------------------------------
    # Upload / Download
    # ------------------------------------------------------------------
    async def upload_document(
        self,
        object_name: str,
        data: bytes,
        content_type: str = "application/octet-stream",
        metadata: Optional[dict[str, str]] = None,
    ) -> dict[str, Any]:
        """Upload ``data`` as ``object_name`` and return storage info.

        Args:
            object_name: Key of the object inside the configured bucket.
            data: Raw object content.
            content_type: MIME type stored with the object.
            metadata: Optional user metadata stored with the object.

        Returns:
            Dict with ``object_name``, ``bucket``, ``etag`` and ``size`` (bytes).

        Raises:
            S3Error: if MinIO rejects the upload (logged before re-raising).
        """
        bucket = settings.MINIO_BUCKET
        size = len(data)
        client = self.client
        try:
            result = await asyncio.to_thread(
                client.put_object,
                bucket_name=bucket,
                object_name=object_name,
                data=io.BytesIO(data),
                length=size,
                content_type=content_type,
                metadata=metadata,
            )
        except S3Error as exc:
            logger.error("Failed to upload '%s': %s", object_name, exc)
            raise
        logger.info("Uploaded '%s' (%d bytes)", object_name, size)
        return {
            "object_name": object_name,
            "bucket": bucket,
            "etag": result.etag,
            "size": size,
        }

    @staticmethod
    def _read_object(client: Minio, object_name: str) -> bytes:
        """Blocking helper: fetch an object's full body and always return the connection to the pool."""
        response = client.get_object(settings.MINIO_BUCKET, object_name)
        try:
            return response.read()
        finally:
            # Runs even if read() fails mid-stream, so the pooled connection is not leaked.
            response.close()
            response.release_conn()

    async def download_document(self, object_name: str) -> bytes:
        """Download ``object_name`` from the configured bucket and return its raw bytes.

        Raises:
            S3Error: if the object cannot be fetched (logged before re-raising).
        """
        client = self.client
        try:
            data = await asyncio.to_thread(self._read_object, client, object_name)
        except S3Error as exc:
            logger.error("Failed to download '%s': %s", object_name, exc)
            raise
        logger.debug("Downloaded '%s' (%d bytes)", object_name, len(data))
        return data

    # ------------------------------------------------------------------
    # List / Delete
    # ------------------------------------------------------------------
    async def list_documents(
        self, prefix: Optional[str] = None
    ) -> list[dict[str, Any]]:
        """List all objects under ``prefix`` (recursively; the whole bucket when ``None``).

        Returns:
            One dict per object with ``object_name``, ``size``, ``last_modified``
            (ISO-8601 string or ``None``), ``etag`` and ``content_type``.
        """
        client = self.client

        def _collect() -> list[dict[str, Any]]:
            # list_objects pages lazily, so the iteration itself performs the
            # network requests and must run inside the worker thread.
            return [
                {
                    "object_name": obj.object_name,
                    "size": obj.size,
                    "last_modified": obj.last_modified.isoformat() if obj.last_modified else None,
                    "etag": obj.etag,
                    "content_type": obj.content_type,
                }
                for obj in client.list_objects(
                    settings.MINIO_BUCKET, prefix=prefix, recursive=True
                )
            ]

        return await asyncio.to_thread(_collect)

    async def delete_document(self, object_name: str) -> bool:
        """Remove a single object from the configured bucket.

        Returns:
            ``True`` on success; failures raise rather than return ``False``.

        Raises:
            S3Error: if MinIO rejects the removal (logged before re-raising).
        """
        bucket = settings.MINIO_BUCKET
        client = self.client
        try:
            await asyncio.to_thread(client.remove_object, bucket, object_name)
        except S3Error as exc:
            logger.error("Failed to delete '%s': %s", object_name, exc)
            raise
        logger.info("Deleted '%s' from bucket '%s'", object_name, bucket)
        return True

    # ------------------------------------------------------------------
    # Presigned URL
    # ------------------------------------------------------------------
    async def get_presigned_url(
        self, object_name: str, expires_hours: int = 1
    ) -> str:
        """Generate a temporary presigned GET URL valid for ``expires_hours`` hours."""
        # Run off the event loop like the other calls: the synchronous client may
        # perform network I/O while presigning (NOTE(review): e.g. region lookup).
        return await asyncio.to_thread(
            self.client.presigned_get_object,
            settings.MINIO_BUCKET,
            object_name,
            expires=timedelta(hours=expires_hours),
        )

    # ------------------------------------------------------------------
    # Storage stats
    # ------------------------------------------------------------------
    async def get_storage_stats(
        self, prefix: Optional[str] = None
    ) -> dict[str, Any]:
        """Calculate total size and file count of all objects under ``prefix``.

        Returns:
            Dict with ``prefix``, ``total_size_bytes``, ``total_size_mb`` (MiB,
            rounded to 2 decimals) and ``file_count``.
        """
        client = self.client

        def _tally() -> tuple[int, int]:
            # Stream over the listing inside the worker thread instead of
            # materialising every object; objects without a size count as 0.
            total = 0
            count = 0
            for obj in client.list_objects(
                settings.MINIO_BUCKET, prefix=prefix, recursive=True
            ):
                total += obj.size or 0
                count += 1
            return total, count

        total_size, count = await asyncio.to_thread(_tally)
        return {
            "prefix": prefix,
            "total_size_bytes": total_size,
            "total_size_mb": round(total_size / (1024 * 1024), 2),
            "file_count": count,
        }

    # ------------------------------------------------------------------
    # Structured path helper
    # ------------------------------------------------------------------
    @staticmethod
    def get_minio_path(
        data_type: str,
        bundesland: str,
        use_case: str,
        year: str,
        filename: str,
    ) -> str:
        """
        Build a structured object path.

        Leading/trailing slashes are stripped from every segment. ``bundesland``
        and ``use_case`` are lower-cased; ``data_type`` and ``filename`` keep their
        case. ``year`` goes through ``str()``, so an int also works.

        Example: eh/niedersachsen/mathematik/2024/aufgabe_01.pdf
        """
        parts = [
            data_type.strip("/"),
            bundesland.lower().strip("/"),
            use_case.lower().strip("/"),
            str(year).strip("/"),
            filename.strip("/"),
        ]
        return "/".join(parts)
# Module-wide shared instance. It is a singleton by convention only: the class
# itself does not prevent further instances from being created.
minio_wrapper: "MinioClientWrapper" = MinioClientWrapper()