Docker Compose with 24+ services: - PostgreSQL (PostGIS), Valkey, MinIO, Qdrant - Vault (PKI/TLS), Nginx (Reverse Proxy) - Backend Core API, Consent Service, Billing Service - RAG Service, Embedding Service - Gitea, Woodpecker CI/CD - Night Scheduler, Health Aggregator - Jitsi (Web/XMPP/JVB/Jicofo), Mailpit Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
247 lines
7.7 KiB
Python
247 lines
7.7 KiB
Python
import logging
|
|
import uuid
|
|
from typing import Optional
|
|
|
|
from fastapi import APIRouter, File, Form, HTTPException, Request, UploadFile
|
|
from pydantic import BaseModel
|
|
|
|
from api.auth import optional_jwt_auth
|
|
from embedding_client import embedding_client
|
|
from minio_client_wrapper import minio_wrapper
|
|
from qdrant_client_wrapper import qdrant_wrapper
|
|
|
|
# Module-level logger shared by every endpoint in this file; the logger name
# is a fixed string rather than ``__name__``.
logger = logging.getLogger("rag-service.api.documents")
|
|
|
|
# Router for the document endpoints; all routes registered on it are served
# under the /api/v1/documents prefix.
router = APIRouter(prefix="/api/v1/documents")
|
|
|
|
|
|
# ---- Request / Response models --------------------------------------------
|
|
|
|
class DocumentUploadResponse(BaseModel):
    """Response body of ``POST /upload`` describing the stored and indexed document."""

    # UUID4 string generated for this upload.
    document_id: str
    # Object path in MinIO as returned by ``minio_wrapper.get_minio_path``.
    object_name: str
    # Number of text chunks produced for the document.
    chunks_count: int
    # Value returned by ``qdrant_wrapper.index_documents``.
    vectors_indexed: int
    # Qdrant collection the chunks were indexed into (echoes the request field).
    collection: str
|
|
|
|
|
|
class DocumentDeleteRequest(BaseModel):
    """Request body of ``DELETE /api/v1/documents``."""

    # MinIO object to remove; also used as the ``object_name`` filter value
    # when deleting vectors from Qdrant.
    object_name: str
    # Qdrant collection from which the matching vectors are deleted.
    collection: str
|
|
|
|
|
|
# ---- Endpoints ------------------------------------------------------------
|
|
|
|
# Payload keys that identify a chunk. Caller-supplied metadata must not replace
# them: ``object_name`` is the key the delete endpoint filters on, and
# ``document_id`` is reported back to the client.
_RESERVED_PAYLOAD_KEYS = frozenset(
    {"document_id", "object_name", "chunk_index", "chunk_text"}
)


def _parse_extra_metadata(metadata_json: Optional[str]) -> dict:
    """Decode the optional ``metadata_json`` form field into a payload dict.

    Invalid JSON and JSON values that are not objects are ignored with a
    warning (best-effort, the upload continues). Keys listed in
    ``_RESERVED_PAYLOAD_KEYS`` are dropped so they cannot overwrite the
    identifying fields of a chunk payload.
    """
    if not metadata_json:
        return {}

    import json

    try:
        parsed = json.loads(metadata_json)
    except json.JSONDecodeError:
        logger.warning("Invalid metadata_json, ignoring")
        return {}

    # A JSON list/string/number cannot be merged into the payload with ``**``.
    if not isinstance(parsed, dict):
        logger.warning("metadata_json is not a JSON object, ignoring")
        return {}

    rejected = sorted(_RESERVED_PAYLOAD_KEYS.intersection(parsed))
    if rejected:
        logger.warning("Ignoring reserved keys in metadata_json: %s", rejected)

    return {
        key: value
        for key, value in parsed.items()
        if key not in _RESERVED_PAYLOAD_KEYS
    }


async def _extract_text(file_bytes: bytes, filename: str, content_type: str) -> str:
    """Return the document text: PDFs go through the embedding client, anything else is decoded as UTF-8."""
    if content_type == "application/pdf" or filename.lower().endswith(".pdf"):
        return await embedding_client.extract_pdf(file_bytes)
    # Undecodable bytes become U+FFFD instead of failing the upload.
    return file_bytes.decode("utf-8", errors="replace")


@router.post("/upload", response_model=DocumentUploadResponse)
async def upload_document(
    request: Request,
    file: UploadFile = File(...),
    collection: str = Form(default="bp_eh"),
    data_type: str = Form(default="eh"),
    bundesland: str = Form(default="niedersachsen"),
    use_case: str = Form(default="general"),
    year: str = Form(default="2024"),
    chunk_strategy: str = Form(default="recursive"),
    chunk_size: int = Form(default=512),
    chunk_overlap: int = Form(default=50),
    metadata_json: Optional[str] = Form(default=None),
):
    """
    Upload a document and index it for retrieval.

    Steps:
      1. Store the original file in MinIO
      2. Extract text (PDF via the embedding client, otherwise UTF-8 decode)
      3. Chunk the text via the embedding client
      4. Generate embeddings for each chunk
      5. Index chunks + embeddings in Qdrant

    Args:
        request: Incoming request, handed to ``optional_jwt_auth``.
        file: The uploaded file.
        collection: Qdrant collection to index into.
        data_type, bundesland, use_case, year: Used to build the MinIO object
            path and stored in every chunk payload.
        chunk_strategy, chunk_size, chunk_overlap: Forwarded to
            ``embedding_client.chunk_text``.
        metadata_json: Optional JSON object merged into every chunk payload.
            Invalid or non-object JSON is ignored; reserved identifying keys
            are dropped.

    Returns:
        DocumentUploadResponse with the generated document id, the MinIO
        object name, the chunk count and the indexing result.

    Raises:
        HTTPException: 400 if the file cannot be read, is empty, yields no
            text or yields no chunks; 500 if MinIO storage, text extraction,
            chunking, embedding or Qdrant indexing fails.
    """
    # NOTE(review): called without ``await`` and its result is unused — confirm
    # it is synchronous and signals rejection by raising.
    optional_jwt_auth(request)

    document_id = str(uuid.uuid4())

    # --- Read file bytes ---
    try:
        file_bytes = await file.read()
    except Exception as exc:
        raise HTTPException(
            status_code=400, detail=f"Could not read uploaded file: {exc}"
        ) from exc

    if not file_bytes:
        raise HTTPException(status_code=400, detail="Uploaded file is empty")

    filename = file.filename or f"{document_id}.bin"
    content_type = file.content_type or "application/octet-stream"

    # --- Store in MinIO ---
    object_name = minio_wrapper.get_minio_path(
        data_type=data_type,
        bundesland=bundesland,
        use_case=use_case,
        year=year,
        filename=filename,
    )

    try:
        await minio_wrapper.upload_document(
            object_name=object_name,
            data=file_bytes,
            content_type=content_type,
            metadata={
                "document_id": document_id,
                "original_filename": filename,
            },
        )
    except Exception as exc:
        logger.error("MinIO upload failed: %s", exc)
        raise HTTPException(
            status_code=500, detail=f"Failed to store file in MinIO: {exc}"
        ) from exc

    # NOTE(review): the object stored above is not removed if a later step
    # fails, so a failed upload can leave a file in MinIO without vectors.

    # --- Extract text ---
    try:
        text = await _extract_text(file_bytes, filename, content_type)
    except Exception as exc:
        logger.error("Text extraction failed: %s", exc)
        raise HTTPException(
            status_code=500, detail=f"Text extraction failed: {exc}"
        ) from exc

    if not text or not text.strip():
        raise HTTPException(
            status_code=400, detail="Could not extract any text from the document"
        )

    # --- Chunk ---
    try:
        chunks = await embedding_client.chunk_text(
            text=text,
            strategy=chunk_strategy,
            chunk_size=chunk_size,
            overlap=chunk_overlap,
        )
    except Exception as exc:
        logger.error("Chunking failed: %s", exc)
        raise HTTPException(status_code=500, detail=f"Chunking failed: {exc}") from exc

    if not chunks:
        raise HTTPException(status_code=400, detail="Chunking produced zero chunks")

    # --- Embed ---
    try:
        embeddings = await embedding_client.generate_embeddings(chunks)
    except Exception as exc:
        logger.error("Embedding generation failed: %s", exc)
        raise HTTPException(
            status_code=500, detail=f"Embedding generation failed: {exc}"
        ) from exc

    # Vectors and payloads are paired positionally when indexing, so a count
    # mismatch would attach embeddings to the wrong chunks. Only checked when
    # the result is sized (presumably a list — confirm against the client).
    if hasattr(embeddings, "__len__") and len(embeddings) != len(chunks):
        logger.error(
            "Embedding count mismatch: %d embeddings for %d chunks",
            len(embeddings),
            len(chunks),
        )
        raise HTTPException(
            status_code=500,
            detail="Embedding generation failed: embedding count does not match chunk count",
        )

    # --- Parse extra metadata ---
    extra_metadata = _parse_extra_metadata(metadata_json)

    # --- Build payloads ---
    payloads = [
        {
            "document_id": document_id,
            "object_name": object_name,
            "filename": filename,
            "chunk_index": index,
            "chunk_text": chunk,
            "data_type": data_type,
            "bundesland": bundesland,
            "use_case": use_case,
            "year": year,
            # Reserved keys were already removed, so this can only override
            # the descriptive fields above or add new ones.
            **extra_metadata,
        }
        for index, chunk in enumerate(chunks)
    ]

    # --- Index in Qdrant ---
    try:
        indexed = await qdrant_wrapper.index_documents(
            collection=collection,
            vectors=embeddings,
            payloads=payloads,
        )
    except Exception as exc:
        logger.error("Qdrant indexing failed: %s", exc)
        raise HTTPException(
            status_code=500, detail=f"Qdrant indexing failed: {exc}"
        ) from exc

    return DocumentUploadResponse(
        document_id=document_id,
        object_name=object_name,
        chunks_count=len(chunks),
        vectors_indexed=indexed,
        collection=collection,
    )
|
|
|
|
|
|
@router.delete("")
async def delete_document(body: DocumentDeleteRequest, request: Request):
    """Delete a document from both MinIO and Qdrant.

    Both deletions are always attempted; a failure in one does not skip the
    other. Failures are logged and reported in the response body rather than
    raised.

    Args:
        body: Object name and Qdrant collection identifying the document.
        request: Incoming request, handed to ``optional_jwt_auth``.

    Returns:
        ``{"deleted": True, "object_name": ..., "collection": ...}`` when both
        deletions succeed, otherwise ``{"deleted": False, "errors": [...]}``.
    """
    optional_jwt_auth(request)

    errors: list[str] = []

    # Delete from MinIO
    try:
        await minio_wrapper.delete_document(body.object_name)
    except Exception as exc:
        # Log as well as report, so partial deletions are visible server-side.
        logger.error("MinIO delete failed for '%s': %s", body.object_name, exc)
        errors.append(f"MinIO delete failed: {exc}")

    # Delete vectors from Qdrant
    try:
        await qdrant_wrapper.delete_by_filter(
            collection=body.collection,
            filter_conditions={"object_name": body.object_name},
        )
    except Exception as exc:
        logger.error(
            "Qdrant delete failed for '%s' in collection '%s': %s",
            body.object_name,
            body.collection,
            exc,
        )
        errors.append(f"Qdrant delete failed: {exc}")

    if errors:
        return {"deleted": False, "errors": errors}

    return {"deleted": True, "object_name": body.object_name, "collection": body.collection}
|
|
|
|
|
|
@router.get("/list")
async def list_documents(
    request: Request,
    prefix: Optional[str] = None,
):
    """Return the documents stored in MinIO together with their count.

    ``prefix`` is forwarded unchanged to ``minio_wrapper.list_documents``.
    Any failure is logged and surfaced as an HTTP 500.
    """
    optional_jwt_auth(request)

    try:
        found = await minio_wrapper.list_documents(prefix=prefix)
        listing = {"documents": found, "count": len(found)}
    except Exception as err:
        logger.error("Failed to list documents: %s", err)
        raise HTTPException(status_code=500, detail=str(err))

    return listing
|
|
|
|
|
|
@router.get("/download/{object_name:path}")
async def download_document(object_name: str, request: Request):
    """Return a presigned download URL for the given MinIO object.

    The ``:path`` converter lets ``object_name`` contain slashes.
    """
    optional_jwt_auth(request)

    try:
        presigned = await minio_wrapper.get_presigned_url(object_name)
    except Exception as err:
        # NOTE(review): every failure is reported as 404, including ones that
        # may not mean "missing object" (e.g. storage unreachable) — confirm
        # this is intended.
        logger.error("Failed to generate presigned URL for '%s': %s", object_name, err)
        raise HTTPException(status_code=404, detail=f"Document not found: {err}")

    return {"url": presigned, "object_name": object_name}
|
|
|
|
|
|
@router.get("/stats")
async def storage_stats(
    request: Request,
    prefix: Optional[str] = None,
):
    """Return whatever ``minio_wrapper.get_storage_stats`` reports for *prefix*.

    Any failure is logged and surfaced as an HTTP 500.
    """
    optional_jwt_auth(request)

    try:
        result = await minio_wrapper.get_storage_stats(prefix=prefix)
    except Exception as err:
        logger.error("Failed to get storage stats: %s", err)
        raise HTTPException(status_code=500, detail=str(err))

    return result
|