import io import logging from datetime import timedelta from typing import Any, Optional from minio import Minio from minio.error import S3Error from config import settings logger = logging.getLogger("rag-service.minio") class MinioClientWrapper: """Thin wrapper around the Minio Python client for BreakPilot document storage.""" def __init__(self) -> None: self._client: Optional[Minio] = None @property def client(self) -> Minio: if self._client is None: self._client = Minio( endpoint=settings.MINIO_ENDPOINT, access_key=settings.MINIO_ACCESS_KEY, secret_key=settings.MINIO_SECRET_KEY, secure=settings.MINIO_SECURE, ) logger.info("Connected to MinIO at %s", settings.MINIO_ENDPOINT) return self._client # ------------------------------------------------------------------ # Bucket init # ------------------------------------------------------------------ async def init_bucket(self) -> None: """Create the configured bucket if it does not exist.""" bucket = settings.MINIO_BUCKET try: if not self.client.bucket_exists(bucket): self.client.make_bucket(bucket) logger.info("Created MinIO bucket '%s'", bucket) else: logger.debug("MinIO bucket '%s' already exists", bucket) except S3Error as exc: logger.error("Failed to init bucket '%s': %s", bucket, exc) raise # ------------------------------------------------------------------ # Upload / Download # ------------------------------------------------------------------ async def upload_document( self, object_name: str, data: bytes, content_type: str = "application/octet-stream", metadata: Optional[dict[str, str]] = None, ) -> dict[str, Any]: """Upload bytes to MinIO and return storage info.""" stream = io.BytesIO(data) result = self.client.put_object( bucket_name=settings.MINIO_BUCKET, object_name=object_name, data=stream, length=len(data), content_type=content_type, metadata=metadata, ) logger.info("Uploaded '%s' (%d bytes)", object_name, len(data)) return { "object_name": object_name, "bucket": settings.MINIO_BUCKET, "etag": result.etag, "size": len(data), } async def download_document(self, object_name: str) -> bytes: """Download a document from MinIO and return raw bytes.""" try: response = self.client.get_object(settings.MINIO_BUCKET, object_name) data = response.read() response.close() response.release_conn() logger.debug("Downloaded '%s' (%d bytes)", object_name, len(data)) return data except S3Error as exc: logger.error("Failed to download '%s': %s", object_name, exc) raise # ------------------------------------------------------------------ # List / Delete # ------------------------------------------------------------------ async def list_documents( self, prefix: Optional[str] = None ) -> list[dict[str, Any]]: """List objects under the given prefix.""" objects = self.client.list_objects( settings.MINIO_BUCKET, prefix=prefix, recursive=True ) results = [] for obj in objects: results.append( { "object_name": obj.object_name, "size": obj.size, "last_modified": obj.last_modified.isoformat() if obj.last_modified else None, "etag": obj.etag, "content_type": obj.content_type, } ) return results async def delete_document(self, object_name: str) -> bool: """Remove a single object.""" try: self.client.remove_object(settings.MINIO_BUCKET, object_name) logger.info("Deleted '%s' from bucket '%s'", object_name, settings.MINIO_BUCKET) return True except S3Error as exc: logger.error("Failed to delete '%s': %s", object_name, exc) raise # ------------------------------------------------------------------ # Presigned URL # ------------------------------------------------------------------ async def get_presigned_url( self, object_name: str, expires_hours: int = 1 ) -> str: """Generate a temporary presigned download URL.""" url = self.client.presigned_get_object( settings.MINIO_BUCKET, object_name, expires=timedelta(hours=expires_hours), ) return url # ------------------------------------------------------------------ # Storage stats # ------------------------------------------------------------------ async def get_storage_stats( self, prefix: Optional[str] = None ) -> dict[str, Any]: """Calculate total size and file count under prefix.""" objects = self.client.list_objects( settings.MINIO_BUCKET, prefix=prefix, recursive=True ) total_size = 0 count = 0 for obj in objects: total_size += obj.size or 0 count += 1 return { "prefix": prefix, "total_size_bytes": total_size, "total_size_mb": round(total_size / (1024 * 1024), 2), "file_count": count, } # ------------------------------------------------------------------ # Structured path helper # ------------------------------------------------------------------ @staticmethod def get_minio_path( data_type: str, bundesland: str, use_case: str, year: str, filename: str, ) -> str: """ Build a structured object path. Example: eh/niedersachsen/mathematik/2024/aufgabe_01.pdf """ parts = [ data_type.strip("/"), bundesland.lower().strip("/"), use_case.lower().strip("/"), str(year).strip("/"), filename.strip("/"), ] return "/".join(parts) # Singleton minio_wrapper = MinioClientWrapper()