""" Tenant-isolated document upload, listing, and deletion. Each tenant gets their own Qdrant collection (bp_docs_tenant_{short_id}). Documents are stored in MinIO under tenant-specific paths. No data crosses tenant boundaries. Endpoints: POST /api/v1/tenant/documents - Upload + process PDF GET /api/v1/tenant/documents - List tenant's documents DELETE /api/v1/tenant/documents/{doc_id} - Delete document + vectors GET /api/v1/tenant/documents/{doc_id}/status - Processing status """ import json import logging import uuid from typing import Optional from fastapi import APIRouter, File, Form, HTTPException, Header, Request, UploadFile from pydantic import BaseModel from api.auth import optional_jwt_auth from embedding_client import embedding_client from html_utils import decode_html_bytes, looks_like_html, strip_html from minio_client_wrapper import minio_wrapper from qdrant_client_wrapper import qdrant_wrapper logger = logging.getLogger("rag-service.api.tenant-documents") router = APIRouter(prefix="/api/v1/tenant/documents") VECTOR_DIM = 1024 # bge-m3 dimension MAX_FILE_SIZE = 50 * 1024 * 1024 # 50 MB ALLOWED_TYPES = {"application/pdf", "text/html", "text/plain"} PDF_MAGIC = b"%PDF" def _collection_name(tenant_id: str) -> str: """Derive tenant-specific Qdrant collection name.""" short = tenant_id.replace("-", "")[:12] return f"bp_docs_tenant_{short}" def _storage_path(tenant_id: str, document_id: str, filename: str) -> str: """Derive tenant-isolated storage path.""" short = tenant_id.replace("-", "")[:12] return f"tenant_docs/{short}/{document_id}/{filename}" def _extract_tenant_id( request: Request, x_tenant_id: Optional[str] = Header(None), ) -> str: """Extract tenant ID from header. Required for all tenant endpoints.""" tid = x_tenant_id or request.headers.get("x-tenant-id", "") if not tid: raise HTTPException(status_code=400, detail="X-Tenant-ID header required") return tid # ── Response models ──────────────────────────────────────────────── class DocumentResponse(BaseModel): id: str filename: str file_size: int status: str chunk_count: int collection: str created_at: Optional[str] = None class DocumentListResponse(BaseModel): documents: list[DocumentResponse] total: int # ── Endpoints ────────────────────────────────────────────────────── @router.post("", response_model=DocumentResponse) async def upload_tenant_document( request: Request, file: UploadFile = File(...), x_tenant_id: Optional[str] = Header(None), chunk_size: int = Form(default=512), chunk_overlap: int = Form(default=50), metadata_json: Optional[str] = Form(default=None), ): """Upload a document, process it, and index in tenant-specific collection.""" optional_jwt_auth(request) tenant_id = _extract_tenant_id(request, x_tenant_id) # Read + validate file_bytes = await file.read() if len(file_bytes) == 0: raise HTTPException(status_code=400, detail="Empty file") if len(file_bytes) > MAX_FILE_SIZE: raise HTTPException(status_code=413, detail=f"File too large (max {MAX_FILE_SIZE // 1024 // 1024} MB)") filename = file.filename or "document.pdf" content_type = file.content_type or "application/octet-stream" # PDF magic bytes check if filename.lower().endswith(".pdf") and not file_bytes[:4].startswith(PDF_MAGIC): raise HTTPException(status_code=400, detail="File claims to be PDF but magic bytes don't match") document_id = str(uuid.uuid4()) collection = _collection_name(tenant_id) object_name = _storage_path(tenant_id, document_id, filename) # Ensure collection exists await qdrant_wrapper.create_collection(collection, VECTOR_DIM) # Store in MinIO try: await minio_wrapper.upload_document( object_name=object_name, data=file_bytes, content_type=content_type, metadata={"document_id": document_id, "tenant_id": tenant_id}, ) except Exception as exc: logger.error("MinIO upload failed for tenant %s: %s", tenant_id, exc) raise HTTPException(status_code=500, detail="Storage failed") # Extract text try: text = await _extract_text(file_bytes, filename, content_type) except Exception as exc: logger.error("Text extraction failed: %s", exc) raise HTTPException(status_code=500, detail=f"Text extraction failed: {exc}") if not text or not text.strip(): raise HTTPException(status_code=400, detail="No text could be extracted") # Chunk chunk_result = await embedding_client.chunk_text( text=text, strategy="recursive", chunk_size=chunk_size, overlap=chunk_overlap, ) chunks = chunk_result.chunks chunks_meta = chunk_result.chunks_with_metadata if not chunks: raise HTTPException(status_code=400, detail="Chunking produced zero chunks") # Embed embeddings = await embedding_client.generate_embeddings(chunks) # Parse extra metadata extra_metadata = {} if metadata_json: try: extra_metadata = json.loads(metadata_json) except json.JSONDecodeError: pass # Build payloads with tenant isolation _STRUCT_FIELDS = ("section", "section_title", "paragraph", "paragraph_num", "page") payloads = [] for i, chunk in enumerate(chunks): payload = { "document_id": document_id, "tenant_id": tenant_id, "filename": filename, "chunk_index": i, "chunk_text": chunk, **extra_metadata, } if i < len(chunks_meta): for field in _STRUCT_FIELDS: value = chunks_meta[i].get(field) if value is not None and value != "": payload[field] = value payloads.append(payload) # Index in tenant collection indexed = await qdrant_wrapper.index_documents( collection=collection, vectors=embeddings, payloads=payloads, ) logger.info( "Tenant %s: uploaded %s (%d chunks, %d vectors) to %s", tenant_id[:8], filename, len(chunks), indexed, collection, ) return DocumentResponse( id=document_id, filename=filename, file_size=len(file_bytes), status="indexed", chunk_count=len(chunks), collection=collection, ) @router.get("", response_model=DocumentListResponse) async def list_tenant_documents( request: Request, x_tenant_id: Optional[str] = Header(None), ): """List all documents for this tenant.""" optional_jwt_auth(request) tenant_id = _extract_tenant_id(request, x_tenant_id) collection = _collection_name(tenant_id) try: # Get unique document_ids from Qdrant docs = await qdrant_wrapper.get_unique_documents(collection) except Exception: # Collection doesn't exist yet → no documents docs = [] return DocumentListResponse(documents=docs, total=len(docs)) @router.delete("/{doc_id}") async def delete_tenant_document( doc_id: str, request: Request, x_tenant_id: Optional[str] = Header(None), ): """Delete a document and all its vectors from tenant collection.""" optional_jwt_auth(request) tenant_id = _extract_tenant_id(request, x_tenant_id) collection = _collection_name(tenant_id) errors = [] # Delete vectors from Qdrant try: await qdrant_wrapper.delete_by_filter( collection=collection, filter_conditions={"document_id": doc_id}, ) except Exception as exc: errors.append(f"Qdrant: {exc}") # Delete file from MinIO try: prefix = f"tenant_docs/{tenant_id.replace('-', '')[:12]}/{doc_id}/" await minio_wrapper.delete_by_prefix(prefix) except Exception as exc: errors.append(f"MinIO: {exc}") if errors: logger.warning("Partial delete for %s/%s: %s", tenant_id[:8], doc_id[:8], errors) return {"deleted": True, "warnings": errors} logger.info("Tenant %s: deleted document %s", tenant_id[:8], doc_id[:8]) return {"deleted": True, "document_id": doc_id} @router.get("/{doc_id}/status") async def document_status( doc_id: str, request: Request, x_tenant_id: Optional[str] = Header(None), ): """Get processing status for a document.""" optional_jwt_auth(request) tenant_id = _extract_tenant_id(request, x_tenant_id) collection = _collection_name(tenant_id) try: count = await qdrant_wrapper.count_by_filter( collection=collection, filter_conditions={"document_id": doc_id}, ) status = "indexed" if count > 0 else "not_found" except Exception: count = 0 status = "not_found" return {"document_id": doc_id, "status": status, "chunk_count": count} # ── Helpers ──────────────────────────────────────────────────────── async def _extract_text(file_bytes: bytes, filename: str, content_type: str) -> str: """Extract text from PDF, HTML, or plain text.""" if content_type == "application/pdf" or filename.lower().endswith(".pdf"): return await embedding_client.extract_pdf(file_bytes) if filename.lower().endswith((".html", ".htm")): text = decode_html_bytes(file_bytes) return strip_html(text) text = file_bytes.decode("utf-8", errors="replace") if looks_like_html(text): return strip_html(text) return text