"""Document endpoints mounted under ``/api/v1/documents``.

Provides upload (store + extract + chunk + embed + index), delete, list,
presigned download URL and storage statistics.
"""

import json
import logging
import uuid
from typing import Optional

from fastapi import APIRouter, File, Form, HTTPException, Request, UploadFile
from pydantic import BaseModel

from api.auth import optional_jwt_auth
from embedding_client import embedding_client
from minio_client_wrapper import minio_wrapper
from qdrant_client_wrapper import qdrant_wrapper

logger = logging.getLogger("rag-service.api.documents")

router = APIRouter(prefix="/api/v1/documents")

# NOTE(review): optional_jwt_auth(request) is called without ``await`` in every
# endpoint below -- confirm it is a plain function and not a coroutine.

# Payload keys that client-supplied metadata must never overwrite.
# ``object_name`` is the key delete_document filters on and ``document_id`` is
# returned to the caller; ``chunk_index`` / ``chunk_text`` describe the chunk.
_RESERVED_PAYLOAD_KEYS = frozenset(
    {"document_id", "object_name", "chunk_index", "chunk_text"}
)


# ---- Request / Response models --------------------------------------------


class DocumentUploadResponse(BaseModel):
    """Response body of a successful ``POST /upload``."""

    document_id: str  # UUID4 generated for this upload
    object_name: str  # object path the file was stored under in MinIO
    chunks_count: int  # number of chunks produced from the text
    vectors_indexed: int  # value returned by qdrant_wrapper.index_documents
    collection: str  # Qdrant collection the chunks were indexed into


class DocumentDeleteRequest(BaseModel):
    """Request body of ``DELETE /api/v1/documents``."""

    object_name: str  # MinIO object to remove; also used as the Qdrant filter
    collection: str  # Qdrant collection to delete the vectors from


# ---- Helpers --------------------------------------------------------------


def _parse_extra_metadata(metadata_json: Optional[str]) -> dict:
    """Parse the optional ``metadata_json`` form field, best-effort.

    Returns an empty dict when the field is missing, is not valid JSON, or
    is valid JSON but not an object; the upload proceeds in all of these
    cases. Keys listed in ``_RESERVED_PAYLOAD_KEYS`` are dropped.
    """
    if not metadata_json:
        return {}

    try:
        parsed = json.loads(metadata_json)
    except json.JSONDecodeError:
        logger.warning("Invalid metadata_json, ignoring")
        return {}

    # A JSON list/string/number/null parses fine but cannot be spread into
    # the payload dict; treat it like invalid input instead of crashing.
    if not isinstance(parsed, dict):
        logger.warning(
            "metadata_json is not a JSON object (got %s), ignoring",
            type(parsed).__name__,
        )
        return {}

    shadowed = _RESERVED_PAYLOAD_KEYS.intersection(parsed)
    if shadowed:
        logger.warning(
            "metadata_json contains reserved keys %s, dropping them",
            sorted(shadowed),
        )
    return {
        key: value
        for key, value in parsed.items()
        if key not in _RESERVED_PAYLOAD_KEYS
    }


# ---- Endpoints ------------------------------------------------------------


@router.post("/upload", response_model=DocumentUploadResponse)
async def upload_document(
    request: Request,
    file: UploadFile = File(...),
    collection: str = Form(default="bp_eh"),
    data_type: str = Form(default="eh"),
    bundesland: str = Form(default="niedersachsen"),
    use_case: str = Form(default="general"),
    year: str = Form(default="2024"),
    chunk_strategy: str = Form(default="recursive"),
    chunk_size: int = Form(default=512),
    chunk_overlap: int = Form(default=50),
    metadata_json: Optional[str] = Form(default=None),
):
    """
    Upload a document:
    1. Store original file in MinIO
    2. Extract text (if PDF) via embedding-service
    3. Chunk the text via embedding-service
    4. Generate embeddings for each chunk
    5. Index chunks + embeddings in Qdrant

    ``metadata_json`` is an optional JSON object merged into every chunk
    payload; anything else is ignored with a warning, and it cannot override
    ``document_id``, ``object_name``, ``chunk_index`` or ``chunk_text``.

    Raises ``HTTPException`` 400 for an unreadable/empty file, no extractable
    text or zero chunks, and 500 when storage, extraction, chunking,
    embedding or indexing fails.

    NOTE(review): the file is stored in MinIO before processing and is not
    removed when a later step fails -- confirm this is intended.
    """
    optional_jwt_auth(request)

    document_id = str(uuid.uuid4())

    # --- Read file bytes ---
    try:
        file_bytes = await file.read()
    except Exception as exc:
        raise HTTPException(
            status_code=400, detail=f"Could not read uploaded file: {exc}"
        ) from exc

    if not file_bytes:
        raise HTTPException(status_code=400, detail="Uploaded file is empty")

    filename = file.filename or f"{document_id}.bin"
    content_type = file.content_type or "application/octet-stream"

    # --- Store in MinIO ---
    object_name = minio_wrapper.get_minio_path(
        data_type=data_type,
        bundesland=bundesland,
        use_case=use_case,
        year=year,
        filename=filename,
    )

    try:
        minio_meta = {
            "document_id": document_id,
            "original_filename": filename,
        }
        await minio_wrapper.upload_document(
            object_name=object_name,
            data=file_bytes,
            content_type=content_type,
            metadata=minio_meta,
        )
    except Exception as exc:
        logger.exception("MinIO upload failed: %s", exc)
        raise HTTPException(
            status_code=500, detail=f"Failed to store file in MinIO: {exc}"
        ) from exc

    # --- Extract text ---
    try:
        if content_type == "application/pdf" or filename.lower().endswith(".pdf"):
            text = await embedding_client.extract_pdf(file_bytes)
        else:
            # Anything that is not a PDF is treated as UTF-8 text; undecodable
            # bytes become U+FFFD rather than failing the upload.
            text = file_bytes.decode("utf-8", errors="replace")
    except Exception as exc:
        logger.exception("Text extraction failed: %s", exc)
        raise HTTPException(
            status_code=500, detail=f"Text extraction failed: {exc}"
        ) from exc

    if not text or not text.strip():
        raise HTTPException(
            status_code=400, detail="Could not extract any text from the document"
        )

    # --- Chunk ---
    try:
        chunks = await embedding_client.chunk_text(
            text=text,
            strategy=chunk_strategy,
            chunk_size=chunk_size,
            overlap=chunk_overlap,
        )
    except Exception as exc:
        logger.exception("Chunking failed: %s", exc)
        raise HTTPException(
            status_code=500, detail=f"Chunking failed: {exc}"
        ) from exc

    if not chunks:
        raise HTTPException(status_code=400, detail="Chunking produced zero chunks")

    # --- Embed ---
    try:
        embeddings = await embedding_client.generate_embeddings(chunks)
    except Exception as exc:
        logger.exception("Embedding generation failed: %s", exc)
        raise HTTPException(
            status_code=500, detail=f"Embedding generation failed: {exc}"
        ) from exc

    # --- Parse extra metadata ---
    extra_metadata = _parse_extra_metadata(metadata_json)

    # --- Build payloads ---
    # Non-reserved client metadata is spread last, so it may still override
    # descriptive fields such as "year" exactly as before.
    payloads = [
        {
            "document_id": document_id,
            "object_name": object_name,
            "filename": filename,
            "chunk_index": index,
            "chunk_text": chunk,
            "data_type": data_type,
            "bundesland": bundesland,
            "use_case": use_case,
            "year": year,
            **extra_metadata,
        }
        for index, chunk in enumerate(chunks)
    ]

    # --- Index in Qdrant ---
    try:
        indexed = await qdrant_wrapper.index_documents(
            collection=collection,
            vectors=embeddings,
            payloads=payloads,
        )
    except Exception as exc:
        logger.exception("Qdrant indexing failed: %s", exc)
        raise HTTPException(
            status_code=500, detail=f"Qdrant indexing failed: {exc}"
        ) from exc

    return DocumentUploadResponse(
        document_id=document_id,
        object_name=object_name,
        chunks_count=len(chunks),
        vectors_indexed=indexed,
        collection=collection,
    )


@router.delete("")
async def delete_document(body: DocumentDeleteRequest, request: Request):
    """Delete a document from both MinIO and Qdrant.

    Both deletions are always attempted; a failure of one does not skip the
    other. Returns ``{"deleted": False, "errors": [...]}`` if either failed,
    otherwise ``{"deleted": True, "object_name": ..., "collection": ...}``.
    """
    optional_jwt_auth(request)

    errors: list[str] = []

    # Delete from MinIO
    try:
        await minio_wrapper.delete_document(body.object_name)
    except Exception as exc:
        logger.exception("MinIO delete failed for '%s': %s", body.object_name, exc)
        errors.append(f"MinIO delete failed: {exc}")

    # Delete vectors from Qdrant
    try:
        await qdrant_wrapper.delete_by_filter(
            collection=body.collection,
            filter_conditions={"object_name": body.object_name},
        )
    except Exception as exc:
        logger.exception("Qdrant delete failed for '%s': %s", body.object_name, exc)
        errors.append(f"Qdrant delete failed: {exc}")

    if errors:
        return {"deleted": False, "errors": errors}

    return {
        "deleted": True,
        "object_name": body.object_name,
        "collection": body.collection,
    }


@router.get("/list")
async def list_documents(
    request: Request,
    prefix: Optional[str] = None,
):
    """List documents stored in MinIO.

    ``prefix`` is passed through to ``minio_wrapper.list_documents``.
    Raises ``HTTPException`` 500 when the listing fails.
    """
    optional_jwt_auth(request)

    try:
        docs = await minio_wrapper.list_documents(prefix=prefix)
        return {"documents": docs, "count": len(docs)}
    except Exception as exc:
        logger.exception("Failed to list documents: %s", exc)
        raise HTTPException(status_code=500, detail=str(exc)) from exc


@router.get("/download/{object_name:path}")
async def download_document(object_name: str, request: Request):
    """Get a presigned download URL for a document.

    Any failure while generating the URL is reported as 404.
    """
    optional_jwt_auth(request)

    try:
        url = await minio_wrapper.get_presigned_url(object_name)
        return {"url": url, "object_name": object_name}
    except Exception as exc:
        logger.exception(
            "Failed to generate presigned URL for '%s': %s", object_name, exc
        )
        raise HTTPException(
            status_code=404, detail=f"Document not found: {exc}"
        ) from exc


@router.get("/stats")
async def storage_stats(
    request: Request,
    prefix: Optional[str] = None,
):
    """Get storage stats (size, count) for a given prefix.

    Returns whatever ``minio_wrapper.get_storage_stats`` produces.
    Raises ``HTTPException`` 500 when the lookup fails.
    """
    optional_jwt_auth(request)

    try:
        stats = await minio_wrapper.get_storage_stats(prefix=prefix)
        return stats
    except Exception as exc:
        logger.exception("Failed to get storage stats: %s", exc)
        raise HTTPException(status_code=500, detail=str(exc)) from exc