Docker Compose with 24+ services: - PostgreSQL (PostGIS), Valkey, MinIO, Qdrant - Vault (PKI/TLS), Nginx (Reverse Proxy) - Backend Core API, Consent Service, Billing Service - RAG Service, Embedding Service - Gitea, Woodpecker CI/CD - Night Scheduler, Health Aggregator - Jitsi (Web/XMPP/JVB/Jicofo), Mailpit Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
192 lines
6.3 KiB
Python
192 lines
6.3 KiB
Python
import io
|
|
import logging
|
|
from datetime import timedelta
|
|
from typing import Any, Optional
|
|
|
|
from minio import Minio
|
|
from minio.error import S3Error
|
|
|
|
from config import settings
|
|
|
|
# Module-level logger shared by all MinIO wrapper methods in this file.
logger = logging.getLogger("rag-service.minio")
|
|
|
|
|
|
class MinioClientWrapper:
    """Thin wrapper around the Minio Python client for BreakPilot document storage.

    The underlying ``Minio`` client is created lazily on first access to
    :attr:`client`, using the ``MINIO_*`` values from ``settings``. Every
    object operation targets the single bucket named by
    ``settings.MINIO_BUCKET``.

    NOTE(review): the public methods are declared ``async`` but call the
    client without awaiting anything, so each call appears to run
    synchronously on the event loop -- confirm this is acceptable for callers.
    """

    def __init__(self) -> None:
        """Create the wrapper without opening a connection."""
        self._client: Optional[Minio] = None

    @property
    def client(self) -> Minio:
        """Return the Minio client, constructing it on first use."""
        if self._client is None:
            self._client = Minio(
                endpoint=settings.MINIO_ENDPOINT,
                access_key=settings.MINIO_ACCESS_KEY,
                secret_key=settings.MINIO_SECRET_KEY,
                secure=settings.MINIO_SECURE,
            )
            logger.info("Connected to MinIO at %s", settings.MINIO_ENDPOINT)
        return self._client

    # ------------------------------------------------------------------
    # Bucket init
    # ------------------------------------------------------------------

    async def init_bucket(self) -> None:
        """Create the configured bucket if it does not exist.

        Raises:
            S3Error: Re-raised after logging if the existence check or the
                bucket creation fails.
        """
        bucket = settings.MINIO_BUCKET
        try:
            if not self.client.bucket_exists(bucket):
                self.client.make_bucket(bucket)
                logger.info("Created MinIO bucket '%s'", bucket)
            else:
                logger.debug("MinIO bucket '%s' already exists", bucket)
        except S3Error as exc:
            logger.error("Failed to init bucket '%s': %s", bucket, exc)
            raise

    # ------------------------------------------------------------------
    # Upload / Download
    # ------------------------------------------------------------------

    async def upload_document(
        self,
        object_name: str,
        data: bytes,
        content_type: str = "application/octet-stream",
        metadata: Optional[dict[str, str]] = None,
    ) -> dict[str, Any]:
        """Upload bytes to MinIO and return storage info.

        Args:
            object_name: Key of the object inside the configured bucket.
            data: Raw payload to store.
            content_type: MIME type recorded for the object.
            metadata: Optional user metadata forwarded to ``put_object``.

        Returns:
            A dict with the keys ``object_name``, ``bucket``, ``etag`` and
            ``size`` (payload length in bytes).

        Raises:
            S3Error: Re-raised after logging if the upload fails.
        """
        size = len(data)
        try:
            result = self.client.put_object(
                bucket_name=settings.MINIO_BUCKET,
                object_name=object_name,
                data=io.BytesIO(data),
                length=size,
                content_type=content_type,
                metadata=metadata,
            )
        except S3Error as exc:
            # Log like the download/delete paths do, then let callers handle it.
            logger.error("Failed to upload '%s': %s", object_name, exc)
            raise
        logger.info("Uploaded '%s' (%d bytes)", object_name, size)
        return {
            "object_name": object_name,
            "bucket": settings.MINIO_BUCKET,
            "etag": result.etag,
            "size": size,
        }

    async def download_document(self, object_name: str) -> bytes:
        """Download a document from MinIO and return raw bytes.

        Args:
            object_name: Key of the object inside the configured bucket.

        Returns:
            The complete object payload.

        Raises:
            S3Error: Re-raised after logging if the object cannot be fetched.
        """
        try:
            response = self.client.get_object(settings.MINIO_BUCKET, object_name)
            try:
                data = response.read()
            finally:
                # Always hand the connection back, even when read() fails;
                # otherwise the HTTP connection stays checked out.
                response.close()
                response.release_conn()
        except S3Error as exc:
            logger.error("Failed to download '%s': %s", object_name, exc)
            raise
        logger.debug("Downloaded '%s' (%d bytes)", object_name, len(data))
        return data

    # ------------------------------------------------------------------
    # List / Delete
    # ------------------------------------------------------------------

    async def list_documents(
        self, prefix: Optional[str] = None
    ) -> list[dict[str, Any]]:
        """List objects under the given prefix.

        Args:
            prefix: Optional key prefix; ``None`` is passed through to
                ``list_objects`` unchanged.

        Returns:
            One dict per object with the keys ``object_name``, ``size``,
            ``last_modified`` (ISO-8601 string or ``None``), ``etag`` and
            ``content_type``.
        """
        objects = self.client.list_objects(
            settings.MINIO_BUCKET, prefix=prefix, recursive=True
        )
        return [
            {
                "object_name": obj.object_name,
                "size": obj.size,
                "last_modified": (
                    obj.last_modified.isoformat() if obj.last_modified else None
                ),
                "etag": obj.etag,
                "content_type": obj.content_type,
            }
            for obj in objects
        ]

    async def delete_document(self, object_name: str) -> bool:
        """Remove a single object.

        Args:
            object_name: Key of the object inside the configured bucket.

        Returns:
            ``True`` when the removal call completed without error.

        Raises:
            S3Error: Re-raised after logging if the removal fails.
        """
        try:
            self.client.remove_object(settings.MINIO_BUCKET, object_name)
            logger.info(
                "Deleted '%s' from bucket '%s'", object_name, settings.MINIO_BUCKET
            )
            return True
        except S3Error as exc:
            logger.error("Failed to delete '%s': %s", object_name, exc)
            raise

    # ------------------------------------------------------------------
    # Presigned URL
    # ------------------------------------------------------------------

    async def get_presigned_url(
        self, object_name: str, expires_hours: int = 1
    ) -> str:
        """Generate a temporary presigned download URL.

        Args:
            object_name: Key of the object inside the configured bucket.
            expires_hours: Lifetime of the URL in hours.

        Returns:
            The presigned GET URL produced by the client.
        """
        return self.client.presigned_get_object(
            settings.MINIO_BUCKET,
            object_name,
            expires=timedelta(hours=expires_hours),
        )

    # ------------------------------------------------------------------
    # Storage stats
    # ------------------------------------------------------------------

    async def get_storage_stats(
        self, prefix: Optional[str] = None
    ) -> dict[str, Any]:
        """Calculate total size and file count under prefix.

        Args:
            prefix: Optional key prefix; ``None`` is passed through to
                ``list_objects`` unchanged.

        Returns:
            A dict with the keys ``prefix``, ``total_size_bytes``,
            ``total_size_mb`` (rounded to two decimals) and ``file_count``.
        """
        objects = self.client.list_objects(
            settings.MINIO_BUCKET, prefix=prefix, recursive=True
        )
        total_size = 0
        count = 0
        for obj in objects:
            # A missing size is counted as zero bytes.
            total_size += obj.size or 0
            count += 1
        return {
            "prefix": prefix,
            "total_size_bytes": total_size,
            "total_size_mb": round(total_size / (1024 * 1024), 2),
            "file_count": count,
        }

    # ------------------------------------------------------------------
    # Structured path helper
    # ------------------------------------------------------------------

    @staticmethod
    def get_minio_path(
        data_type: str,
        bundesland: str,
        use_case: str,
        year: str,
        filename: str,
    ) -> str:
        """Build a structured object path.

        Leading and trailing slashes are stripped from every component;
        ``bundesland`` and ``use_case`` are additionally lower-cased, and
        ``year`` is converted with ``str()`` before joining.

        Example: eh/niedersachsen/mathematik/2024/aufgabe_01.pdf
        """
        parts = [
            data_type.strip("/"),
            bundesland.lower().strip("/"),
            use_case.lower().strip("/"),
            str(year).strip("/"),
            filename.strip("/"),
        ]
        return "/".join(parts)
|
|
|
|
|
|
# Singleton
|
|
# Constructing the wrapper only sets `_client` to None; the Minio client
# itself is built on the first access to `.client`.
minio_wrapper = MinioClientWrapper()
|