8510af46eb
Phase 0: Quality Audit script (Claude Sonnet, 1750 samples) Phase 1: Object ontology expanded 31 → 74 tokens with descriptions + boundaries Phase 2: 174K controls re-classified via Haiku (10 batches, $50) - Generic tokens removed (documentation, procedure, process) - L2 sub-topics added (108K + 64K controls) - Bad subtopics fixed (stakeholder_*, escalation fragments) Phase 3: Re-clustering K=18704 (37K objects → 16.7K groups) Phase 4: Direct MC generation from canonical tokens (gpre2_direct_mc.py) Phase 5: Regulation-source split (gpre3, dry-run tested) New features: - Tenant-isolated document upload API (rag-service) - BAuA crawler (Playwright, 131 PDFs downloaded) - OSHA Technical Manual crawler (23 chapters) - CE obligation extractor (6141 obligations from Qdrant) RAG ingestion: - 126 BAuA PDFs (TRBS/TRGS/ASR): 27,664 chunks - OSHA Technical Manual: 7,241 chunks - OSHA 1910 Subpart O (full): 745 chunks - EuGH C-588/21 P: 216 chunks - EU 2018/1725: 842 chunks Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
202 lines
6.8 KiB
Python
202 lines
6.8 KiB
Python
import asyncio
import io
import logging
from datetime import timedelta
from typing import Any, Optional

from minio import Minio
from minio.error import S3Error

from config import settings
|
|
|
|
# Module logger. The dotted name makes it a child of the "rag-service"
# logger, so it inherits that logger's handlers and level configuration.
logger = logging.getLogger("rag-service.minio")
|
|
|
|
|
|
class MinioClientWrapper:
    """Thin wrapper around the Minio Python client for BreakPilot document storage.

    Every object operation targets the single bucket configured in
    ``settings.MINIO_BUCKET``. The underlying ``Minio`` client is built
    lazily on first access to ``client``.

    The public methods are coroutines, but the ``minio`` SDK is synchronous.
    Each SDK call, including iteration over ``list_objects`` results, is run
    in a worker thread via ``asyncio.to_thread`` so it does not block the
    event loop. The client is resolved on the calling (event-loop) thread
    before dispatch, so lazy construction never happens inside a worker.
    """

    def __init__(self) -> None:
        # Built on first access to the ``client`` property.
        self._client: Optional[Minio] = None

    @property
    def client(self) -> Minio:
        """Return the shared ``Minio`` client, creating it on first access.

        Endpoint, credentials and the ``secure`` flag are read from
        ``settings`` at creation time.
        """
        if self._client is None:
            self._client = Minio(
                endpoint=settings.MINIO_ENDPOINT,
                access_key=settings.MINIO_ACCESS_KEY,
                secret_key=settings.MINIO_SECRET_KEY,
                secure=settings.MINIO_SECURE,
            )
            logger.info("Connected to MinIO at %s", settings.MINIO_ENDPOINT)
        return self._client

    # ------------------------------------------------------------------
    # Bucket init
    # ------------------------------------------------------------------

    async def init_bucket(self) -> None:
        """Create the configured bucket if it does not exist.

        If another process creates the bucket between the existence check
        and ``make_bucket``, the resulting ``BucketAlreadyOwnedByYou`` error
        is treated as success.

        Raises:
            S3Error: if the existence check or bucket creation fails for any
                other reason (logged before re-raising).
        """
        bucket = settings.MINIO_BUCKET
        client = self.client
        try:
            if await asyncio.to_thread(client.bucket_exists, bucket):
                logger.debug("MinIO bucket '%s' already exists", bucket)
                return
            await asyncio.to_thread(client.make_bucket, bucket)
            logger.info("Created MinIO bucket '%s'", bucket)
        except S3Error as exc:
            if exc.code == "BucketAlreadyOwnedByYou":
                # Lost the creation race to a concurrent initializer.
                logger.debug("MinIO bucket '%s' already exists", bucket)
                return
            logger.error("Failed to init bucket '%s': %s", bucket, exc)
            raise

    # ------------------------------------------------------------------
    # Upload / Download
    # ------------------------------------------------------------------

    async def upload_document(
        self,
        object_name: str,
        data: bytes,
        content_type: str = "application/octet-stream",
        metadata: Optional[dict[str, str]] = None,
    ) -> dict[str, Any]:
        """Upload bytes to MinIO and return storage info.

        Args:
            object_name: Key of the object inside the configured bucket.
            data: Complete object payload.
            content_type: MIME type stored with the object.
            metadata: Optional metadata passed through to ``put_object``.

        Returns:
            A dict with ``object_name``, ``bucket``, ``etag`` (from the
            upload result) and ``size`` (``len(data)``).

        Raises:
            S3Error: if the upload fails (logged before re-raising).
        """
        size = len(data)
        client = self.client
        try:
            result = await asyncio.to_thread(
                client.put_object,
                bucket_name=settings.MINIO_BUCKET,
                object_name=object_name,
                data=io.BytesIO(data),
                length=size,
                content_type=content_type,
                metadata=metadata,
            )
        except S3Error as exc:
            logger.error("Failed to upload '%s': %s", object_name, exc)
            raise
        logger.info("Uploaded '%s' (%d bytes)", object_name, size)
        return {
            "object_name": object_name,
            "bucket": settings.MINIO_BUCKET,
            "etag": result.etag,
            "size": size,
        }

    async def download_document(self, object_name: str) -> bytes:
        """Download a document from MinIO and return its raw bytes.

        Raises:
            S3Error: if the object cannot be fetched (logged before
                re-raising).
        """
        client = self.client

        def _read() -> bytes:
            response = client.get_object(settings.MINIO_BUCKET, object_name)
            try:
                return response.read()
            finally:
                # Release the pooled connection even if read() fails midway.
                response.close()
                response.release_conn()

        try:
            data = await asyncio.to_thread(_read)
        except S3Error as exc:
            logger.error("Failed to download '%s': %s", object_name, exc)
            raise
        logger.debug("Downloaded '%s' (%d bytes)", object_name, len(data))
        return data

    # ------------------------------------------------------------------
    # List / Delete
    # ------------------------------------------------------------------

    async def list_documents(
        self, prefix: Optional[str] = None
    ) -> list[dict[str, Any]]:
        """List all objects under ``prefix``, recursing into sub-"folders".

        Returns:
            One dict per object with ``object_name``, ``size``,
            ``last_modified`` (ISO-8601 string or ``None``), ``etag`` and
            ``content_type``.
        """
        client = self.client

        def _collect() -> list[dict[str, Any]]:
            return [
                {
                    "object_name": obj.object_name,
                    "size": obj.size,
                    "last_modified": (
                        obj.last_modified.isoformat() if obj.last_modified else None
                    ),
                    "etag": obj.etag,
                    "content_type": obj.content_type,
                }
                for obj in client.list_objects(
                    settings.MINIO_BUCKET, prefix=prefix, recursive=True
                )
            ]

        return await asyncio.to_thread(_collect)

    async def delete_document(self, object_name: str) -> bool:
        """Remove a single object and return ``True`` on success.

        Raises:
            S3Error: if the removal fails (logged before re-raising).
        """
        client = self.client
        try:
            await asyncio.to_thread(
                client.remove_object, settings.MINIO_BUCKET, object_name
            )
        except S3Error as exc:
            logger.error("Failed to delete '%s': %s", object_name, exc)
            raise
        logger.info("Deleted '%s' from bucket '%s'", object_name, settings.MINIO_BUCKET)
        return True

    async def delete_by_prefix(self, prefix: str) -> int:
        """Remove all objects under ``prefix`` and return how many were removed.

        Raises:
            ValueError: if ``prefix`` is empty or ``None``. Such a prefix
                would match every object in the shared bucket.
            S3Error: if listing or removing fails. Objects removed before the
                failure stay removed. The error is logged before re-raising.
        """
        if not prefix:
            raise ValueError("delete_by_prefix requires a non-empty prefix")

        client = self.client
        bucket = settings.MINIO_BUCKET

        def _delete_all() -> int:
            removed = 0
            for obj in client.list_objects(bucket, prefix=prefix, recursive=True):
                client.remove_object(bucket, obj.object_name)
                removed += 1
            return removed

        try:
            count = await asyncio.to_thread(_delete_all)
        except S3Error as exc:
            logger.error("Failed to delete objects with prefix '%s': %s", prefix, exc)
            raise
        logger.info("Deleted %d objects with prefix '%s'", count, prefix)
        return count

    # ------------------------------------------------------------------
    # Presigned URL
    # ------------------------------------------------------------------

    async def get_presigned_url(
        self, object_name: str, expires_hours: int = 1
    ) -> str:
        """Generate a temporary presigned GET URL for ``object_name``.

        ``expires_hours`` is passed to the SDK as a ``timedelta``. Range
        validation of the expiry is left to the SDK.
        """
        client = self.client
        return await asyncio.to_thread(
            client.presigned_get_object,
            settings.MINIO_BUCKET,
            object_name,
            expires=timedelta(hours=expires_hours),
        )

    # ------------------------------------------------------------------
    # Storage stats
    # ------------------------------------------------------------------

    async def get_storage_stats(
        self, prefix: Optional[str] = None
    ) -> dict[str, Any]:
        """Calculate the total size and file count under ``prefix``.

        Objects that report no size count as 0 bytes. ``total_size_mb`` uses
        1 MB = 1024 * 1024 bytes and is rounded to two decimals.
        """
        client = self.client

        def _tally() -> tuple[int, int]:
            total = 0
            files = 0
            for obj in client.list_objects(
                settings.MINIO_BUCKET, prefix=prefix, recursive=True
            ):
                total += obj.size or 0
                files += 1
            return total, files

        total_size, count = await asyncio.to_thread(_tally)
        return {
            "prefix": prefix,
            "total_size_bytes": total_size,
            "total_size_mb": round(total_size / (1024 * 1024), 2),
            "file_count": count,
        }

    # ------------------------------------------------------------------
    # Structured path helper
    # ------------------------------------------------------------------

    @staticmethod
    def get_minio_path(
        data_type: str,
        bundesland: str,
        use_case: str,
        year: str,
        filename: str,
    ) -> str:
        """Build a structured object path ``data_type/bundesland/use_case/year/filename``.

        Leading and trailing "/" are stripped from every component.
        ``bundesland`` and ``use_case`` are lower-cased, while ``data_type``
        and ``filename`` keep their case. ``year`` goes through ``str()``, so
        an int also works.

        Example: eh/niedersachsen/mathematik/2024/aufgabe_01.pdf

        NOTE(review): components are not checked for embedded "/" or ".."
        segments. If any of them can come from user input, validate them
        before calling this helper.
        """
        parts = [
            data_type.strip("/"),
            bundesland.lower().strip("/"),
            use_case.lower().strip("/"),
            str(year).strip("/"),
            filename.strip("/"),
        ]
        return "/".join(parts)
|
|
|
|
|
|
# Module-level shared instance. Creating it does not build the underlying
# Minio client; that happens lazily on first access to `.client`.
minio_wrapper = MinioClientWrapper()
|