""" MinIO Client Utility S3-compatible storage operations for AOI bundles """ import os from typing import Optional, BinaryIO import structlog from minio import Minio from minio.error import S3Error from config import settings logger = structlog.get_logger(__name__) class MinioClient: """ Client for MinIO S3-compatible storage. Used for storing generated AOI bundles and assets. """ def __init__(self): self.endpoint = settings.minio_endpoint self.access_key = settings.minio_access_key self.secret_key = settings.minio_secret_key self.bucket = settings.minio_bucket self.secure = settings.minio_secure self._client: Optional[Minio] = None @property def client(self) -> Minio: """Get or create MinIO client instance.""" if self._client is None: self._client = Minio( self.endpoint, access_key=self.access_key, secret_key=self.secret_key, secure=self.secure, ) self._ensure_bucket_exists() return self._client def _ensure_bucket_exists(self): """Create the bucket if it doesn't exist.""" try: if not self._client.bucket_exists(self.bucket): self._client.make_bucket(self.bucket) logger.info("Created MinIO bucket", bucket=self.bucket) except S3Error as e: logger.error("Error creating bucket", error=str(e)) async def upload_file( self, local_path: str, object_name: str, content_type: str = "application/octet-stream", ) -> Optional[str]: """ Upload a file to MinIO. Args: local_path: Path to local file object_name: Name in MinIO (can include path) content_type: MIME type of the file Returns: Object URL or None on failure """ try: self.client.fput_object( self.bucket, object_name, local_path, content_type=content_type, ) logger.info("Uploaded file to MinIO", object_name=object_name) return f"{self.endpoint}/{self.bucket}/{object_name}" except S3Error as e: logger.error("Error uploading file", error=str(e)) return None async def upload_bytes( self, data: bytes, object_name: str, content_type: str = "application/octet-stream", ) -> Optional[str]: """ Upload bytes to MinIO. Args: data: Bytes to upload object_name: Name in MinIO content_type: MIME type Returns: Object URL or None on failure """ from io import BytesIO try: stream = BytesIO(data) self.client.put_object( self.bucket, object_name, stream, length=len(data), content_type=content_type, ) logger.info("Uploaded bytes to MinIO", object_name=object_name, size=len(data)) return f"{self.endpoint}/{self.bucket}/{object_name}" except S3Error as e: logger.error("Error uploading bytes", error=str(e)) return None async def download_file( self, object_name: str, local_path: str, ) -> bool: """ Download a file from MinIO. Args: object_name: Name in MinIO local_path: Destination path Returns: True on success """ try: self.client.fget_object(self.bucket, object_name, local_path) logger.info("Downloaded file from MinIO", object_name=object_name) return True except S3Error as e: logger.error("Error downloading file", error=str(e)) return False async def get_bytes(self, object_name: str) -> Optional[bytes]: """ Get object content as bytes. Args: object_name: Name in MinIO Returns: File content or None """ try: response = self.client.get_object(self.bucket, object_name) data = response.read() response.close() response.release_conn() return data except S3Error as e: logger.error("Error getting bytes", error=str(e)) return None async def delete_object(self, object_name: str) -> bool: """ Delete an object from MinIO. Args: object_name: Name in MinIO Returns: True on success """ try: self.client.remove_object(self.bucket, object_name) logger.info("Deleted object from MinIO", object_name=object_name) return True except S3Error as e: logger.error("Error deleting object", error=str(e)) return False async def list_objects(self, prefix: str = "") -> list[str]: """ List objects in the bucket. Args: prefix: Filter by prefix Returns: List of object names """ try: objects = self.client.list_objects(self.bucket, prefix=prefix) return [obj.object_name for obj in objects] except S3Error as e: logger.error("Error listing objects", error=str(e)) return [] async def get_presigned_url( self, object_name: str, expiry_hours: int = 24, ) -> Optional[str]: """ Get a presigned URL for downloading an object. Args: object_name: Name in MinIO expiry_hours: URL expiry time in hours Returns: Presigned URL or None """ from datetime import timedelta try: url = self.client.presigned_get_object( self.bucket, object_name, expires=timedelta(hours=expiry_hours), ) return url except S3Error as e: logger.error("Error generating presigned URL", error=str(e)) return None async def object_exists(self, object_name: str) -> bool: """Check if an object exists.""" try: self.client.stat_object(self.bucket, object_name) return True except S3Error: return False async def get_object_size(self, object_name: str) -> Optional[int]: """Get the size of an object in bytes.""" try: stat = self.client.stat_object(self.bucket, object_name) return stat.size except S3Error: return None