This repository was archived on 2026-02-15. You can view files and clone it, but you cannot open issues or pull requests or push a commit.
Files
breakpilot-pwa/geo-service/utils/minio_client.py
BreakPilot Dev 19855efacc
Some checks failed
Tests / Go Tests (push) Has been cancelled
Tests / Python Tests (push) Has been cancelled
Tests / Integration Tests (push) Has been cancelled
Tests / Go Lint (push) Has been cancelled
Tests / Python Lint (push) Has been cancelled
Tests / Security Scan (push) Has been cancelled
Tests / All Checks Passed (push) Has been cancelled
Security Scanning / Secret Scanning (push) Has been cancelled
Security Scanning / Dependency Vulnerability Scan (push) Has been cancelled
Security Scanning / Go Security Scan (push) Has been cancelled
Security Scanning / Python Security Scan (push) Has been cancelled
Security Scanning / Node.js Security Scan (push) Has been cancelled
Security Scanning / Docker Image Security (push) Has been cancelled
Security Scanning / Security Summary (push) Has been cancelled
CI/CD Pipeline / Go Tests (push) Has been cancelled
CI/CD Pipeline / Python Tests (push) Has been cancelled
CI/CD Pipeline / Website Tests (push) Has been cancelled
CI/CD Pipeline / Linting (push) Has been cancelled
CI/CD Pipeline / Security Scan (push) Has been cancelled
CI/CD Pipeline / Docker Build & Push (push) Has been cancelled
CI/CD Pipeline / Integration Tests (push) Has been cancelled
CI/CD Pipeline / Deploy to Staging (push) Has been cancelled
CI/CD Pipeline / Deploy to Production (push) Has been cancelled
CI/CD Pipeline / CI Summary (push) Has been cancelled
ci/woodpecker/manual/build-ci-image Pipeline was successful
ci/woodpecker/manual/main Pipeline failed
feat: BreakPilot PWA - Full codebase (clean push without large binaries)
All services: admin-v2, studio-v2, website, ai-compliance-sdk,
consent-service, klausur-service, voice-service, and infrastructure.
Large PDFs and compiled binaries excluded via .gitignore.
2026-02-11 13:25:58 +01:00

238 lines
6.7 KiB
Python

"""
MinIO Client Utility
S3-compatible storage operations for AOI bundles
"""
import os
from typing import Optional, BinaryIO
import structlog
from minio import Minio
from minio.error import S3Error
from config import settings
logger = structlog.get_logger(__name__)
class MinioClient:
    """
    Client for MinIO S3-compatible storage.

    Used for storing generated AOI bundles and assets.

    NOTE(review): the underlying ``minio`` SDK is synchronous, so the
    ``async`` methods below block the event loop while a request runs.
    The signatures stay ``async`` for caller compatibility; consider
    wrapping calls in ``asyncio.to_thread`` if this becomes a problem.
    """

    def __init__(self):
        # Connection parameters are taken from application settings.
        self.endpoint = settings.minio_endpoint
        self.access_key = settings.minio_access_key
        self.secret_key = settings.minio_secret_key
        self.bucket = settings.minio_bucket
        self.secure = settings.minio_secure
        # Created lazily on first access via the `client` property.
        self._client: Optional[Minio] = None

    @property
    def client(self) -> Minio:
        """Get or create MinIO client instance (lazy, cached)."""
        if self._client is None:
            self._client = Minio(
                self.endpoint,
                access_key=self.access_key,
                secret_key=self.secret_key,
                secure=self.secure,
            )
            # Only runs once, right after the client is first created.
            self._ensure_bucket_exists()
        return self._client

    def _ensure_bucket_exists(self):
        """Create the bucket if it doesn't exist (best effort)."""
        try:
            if not self._client.bucket_exists(self.bucket):
                self._client.make_bucket(self.bucket)
                logger.info("Created MinIO bucket", bucket=self.bucket)
        except S3Error as e:
            # Best effort by design: a persistent failure will surface
            # again on the first actual bucket operation.
            logger.error("Error creating bucket", error=str(e))

    def _object_url(self, object_name: str) -> str:
        """
        Build a direct (non-presigned) URL for an object.

        Fix: the previous code returned ``endpoint/bucket/object`` with
        no scheme, which is not a usable URL; derive the scheme from the
        ``secure`` setting.
        """
        scheme = "https" if self.secure else "http"
        return f"{scheme}://{self.endpoint}/{self.bucket}/{object_name}"

    async def upload_file(
        self,
        local_path: str,
        object_name: str,
        content_type: str = "application/octet-stream",
    ) -> Optional[str]:
        """
        Upload a file to MinIO.

        Args:
            local_path: Path to local file
            object_name: Name in MinIO (can include path)
            content_type: MIME type of the file

        Returns:
            Object URL or None on failure
        """
        try:
            self.client.fput_object(
                self.bucket,
                object_name,
                local_path,
                content_type=content_type,
            )
            logger.info("Uploaded file to MinIO", object_name=object_name)
            return self._object_url(object_name)
        except S3Error as e:
            logger.error("Error uploading file", error=str(e))
            return None

    async def upload_bytes(
        self,
        data: bytes,
        object_name: str,
        content_type: str = "application/octet-stream",
    ) -> Optional[str]:
        """
        Upload bytes to MinIO.

        Args:
            data: Bytes to upload
            object_name: Name in MinIO
            content_type: MIME type

        Returns:
            Object URL or None on failure
        """
        from io import BytesIO

        try:
            # put_object requires a stream plus an explicit length.
            stream = BytesIO(data)
            self.client.put_object(
                self.bucket,
                object_name,
                stream,
                length=len(data),
                content_type=content_type,
            )
            logger.info("Uploaded bytes to MinIO", object_name=object_name, size=len(data))
            return self._object_url(object_name)
        except S3Error as e:
            logger.error("Error uploading bytes", error=str(e))
            return None

    async def download_file(
        self,
        object_name: str,
        local_path: str,
    ) -> bool:
        """
        Download a file from MinIO.

        Args:
            object_name: Name in MinIO
            local_path: Destination path

        Returns:
            True on success
        """
        try:
            self.client.fget_object(self.bucket, object_name, local_path)
            logger.info("Downloaded file from MinIO", object_name=object_name)
            return True
        except S3Error as e:
            logger.error("Error downloading file", error=str(e))
            return False

    async def get_bytes(self, object_name: str) -> Optional[bytes]:
        """
        Get object content as bytes.

        Args:
            object_name: Name in MinIO

        Returns:
            File content or None
        """
        try:
            response = self.client.get_object(self.bucket, object_name)
            try:
                return response.read()
            finally:
                # Fix: always release the HTTP connection, even when
                # read() raises (previously leaked on a failed read).
                response.close()
                response.release_conn()
        except S3Error as e:
            logger.error("Error getting bytes", error=str(e))
            return None

    async def delete_object(self, object_name: str) -> bool:
        """
        Delete an object from MinIO.

        Args:
            object_name: Name in MinIO

        Returns:
            True on success
        """
        try:
            self.client.remove_object(self.bucket, object_name)
            logger.info("Deleted object from MinIO", object_name=object_name)
            return True
        except S3Error as e:
            logger.error("Error deleting object", error=str(e))
            return False

    async def list_objects(self, prefix: str = "") -> list[str]:
        """
        List objects in the bucket.

        Args:
            prefix: Filter by prefix

        Returns:
            List of object names (empty on error)
        """
        try:
            objects = self.client.list_objects(self.bucket, prefix=prefix)
            return [obj.object_name for obj in objects]
        except S3Error as e:
            logger.error("Error listing objects", error=str(e))
            return []

    async def get_presigned_url(
        self,
        object_name: str,
        expiry_hours: int = 24,
    ) -> Optional[str]:
        """
        Get a presigned URL for downloading an object.

        Args:
            object_name: Name in MinIO
            expiry_hours: URL expiry time in hours

        Returns:
            Presigned URL or None
        """
        from datetime import timedelta

        try:
            url = self.client.presigned_get_object(
                self.bucket,
                object_name,
                expires=timedelta(hours=expiry_hours),
            )
            return url
        except S3Error as e:
            logger.error("Error generating presigned URL", error=str(e))
            return None

    async def object_exists(self, object_name: str) -> bool:
        """Check if an object exists (any S3 error counts as absent)."""
        try:
            self.client.stat_object(self.bucket, object_name)
            return True
        except S3Error:
            return False

    async def get_object_size(self, object_name: str) -> Optional[int]:
        """Get the size of an object in bytes, or None if unavailable."""
        try:
            stat = self.client.stat_object(self.bucket, object_name)
            return stat.size
        except S3Error:
            return None