Files
breakpilot-core/rag-service/api/documents.py
T
Benjamin Admin dac2a9f685 feat(rag): Legal-Metadaten — article_label + deterministische IDs + chunk_hash
Neues pures Modul legal_metadata.py (nur stdlib, lokal+CI testbar): §3-Normalisierung
section->article, strikte Header-Extraktion (Datum/Seiten-Rauschen -> kein Falsch-Zitat),
citation_style pro Regulierung (EU/CH=article, DE=paragraph), Urteil=Aktenzeichen statt §,
camelCase-Klarnamen (ProdHaftG), deterministische uuid5-Point-ID + chunk_hash (sha256).
documents.py verdrahtet build_legal_fields in den Payload-Build + document_version.
10 Tests gruen. Vertrag: rag_reingest_spec.md (§2/§3).

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-06-20 14:35:07 +02:00

289 lines
9.7 KiB
Python

import logging
import uuid
from typing import Optional
from fastapi import APIRouter, File, Form, HTTPException, Request, UploadFile
from pydantic import BaseModel
from api.auth import optional_jwt_auth
from embedding_client import embedding_client
from html_utils import decode_html_bytes, looks_like_html, strip_html
from minio_client_wrapper import minio_wrapper
from qdrant_client_wrapper import qdrant_wrapper
from legal_metadata import build_legal_fields, compute_chunk_hash, deterministic_point_id
# Module-level logger; the logger name is a fixed string rather than __name__.
logger = logging.getLogger("rag-service.api.documents")
# Every route declared on this router is served under the /api/v1/documents prefix.
router = APIRouter(prefix="/api/v1/documents")
# Structural metadata fields from embedding-service chunks_with_metadata (D2)
# NOTE(review): not referenced anywhere in the visible part of this file — confirm it is still needed.
_STRUCT_FIELDS = ("section", "section_title", "paragraph", "paragraph_num", "page")
# ---- Request / Response models --------------------------------------------
class DocumentUploadResponse(BaseModel):
    """Response body returned by the upload endpoint after a successful ingest."""

    # Identifier generated for the document during the upload request.
    document_id: str
    # Object path under which the original file was stored in MinIO.
    object_name: str
    # Number of text chunks produced for the document.
    chunks_count: int
    # Value reported by the Qdrant wrapper for the indexing call.
    vectors_indexed: int
    # Name of the Qdrant collection the chunks were indexed into.
    collection: str
class DocumentDeleteRequest(BaseModel):
    """Request body for the delete endpoint."""

    # Object path of the stored file; also used as the Qdrant filter value.
    object_name: str
    # Name of the Qdrant collection to delete the matching vectors from.
    collection: str
# ---- Endpoints ------------------------------------------------------------
def _parse_extra_metadata(metadata_json: Optional[str]) -> dict:
    """Parse the optional ``metadata_json`` form field into a dict.

    Invalid JSON and valid JSON that is not an object (list, string, number,
    ...) are both ignored with a warning, so callers can always rely on
    ``.get(...)`` and ``**``-unpacking of the result.
    """
    if not metadata_json:
        return {}

    import json

    try:
        parsed = json.loads(metadata_json)
    except json.JSONDecodeError:
        logger.warning("Invalid metadata_json, ignoring")
        return {}
    if not isinstance(parsed, dict):
        logger.warning("metadata_json is not a JSON object, ignoring")
        return {}
    return parsed


@router.post("/upload", response_model=DocumentUploadResponse)
async def upload_document(
    request: Request,
    file: UploadFile = File(...),
    collection: str = Form(default="bp_eh"),
    data_type: str = Form(default="eh"),
    bundesland: str = Form(default="niedersachsen"),
    use_case: str = Form(default="general"),
    year: str = Form(default="2024"),
    chunk_strategy: str = Form(default="recursive"),
    chunk_size: int = Form(default=512),
    chunk_overlap: int = Form(default=50),
    metadata_json: Optional[str] = Form(default=None),
    document_version: str = Form(default="1"),
):
    """
    Upload a document and index it for retrieval.

    Steps:
    1. Store the original file in MinIO
    2. Extract text (PDF via embedding-service, HTML via html_utils,
       everything else decoded as UTF-8)
    3. Chunk the text via embedding-service
    4. Generate embeddings for each chunk
    5. Build one payload per chunk (core fields, extra metadata, legal fields)
    6. Index chunks + embeddings in Qdrant under deterministic point IDs

    Args:
        request: Incoming request, handed to ``optional_jwt_auth``.
        file: The uploaded file.
        collection: Target Qdrant collection.
        data_type, bundesland, use_case, year: Used to build the MinIO object
            path and copied into every chunk payload.
        chunk_strategy, chunk_size, chunk_overlap: Forwarded to the chunker.
        metadata_json: Optional JSON object with additional payload fields;
            ignored (with a warning) when it is not a valid JSON object.
        document_version: Stored in every payload and passed to the point-ID
            generator.

    Returns:
        DocumentUploadResponse with the generated document id, the MinIO
        object name, the chunk count and the value returned by the Qdrant
        indexing call.

    Raises:
        HTTPException: 400 when the file cannot be read, is empty, yields no
            text or yields zero chunks; 500 when MinIO storage, text
            extraction, chunking, embedding or Qdrant indexing fails.
    """
    optional_jwt_auth(request)
    document_id = str(uuid.uuid4())

    # --- Read file bytes ---
    try:
        file_bytes = await file.read()
    except Exception as exc:
        raise HTTPException(
            status_code=400, detail=f"Could not read uploaded file: {exc}"
        ) from exc
    if not file_bytes:
        raise HTTPException(status_code=400, detail="Uploaded file is empty")

    filename = file.filename or f"{document_id}.bin"
    content_type = file.content_type or "application/octet-stream"

    # --- Store in MinIO ---
    object_name = minio_wrapper.get_minio_path(
        data_type=data_type,
        bundesland=bundesland,
        use_case=use_case,
        year=year,
        filename=filename,
    )
    try:
        minio_meta = {
            "document_id": document_id,
            "original_filename": filename,
        }
        await minio_wrapper.upload_document(
            object_name=object_name,
            data=file_bytes,
            content_type=content_type,
            metadata=minio_meta,
        )
    except Exception as exc:
        logger.error("MinIO upload failed: %s", exc)
        raise HTTPException(
            status_code=500, detail=f"Failed to store file in MinIO: {exc}"
        ) from exc

    # --- Extract text ---
    lowered_name = filename.lower()
    try:
        if content_type == "application/pdf" or lowered_name.endswith(".pdf"):
            text = await embedding_client.extract_pdf(file_bytes)
        elif lowered_name.endswith((".html", ".htm")):
            text = decode_html_bytes(file_bytes)
            text = strip_html(text)
            logger.info("Decoded + stripped HTML from %s", filename)
        else:
            text = file_bytes.decode("utf-8", errors="replace")
            # Strip HTML if content looks like HTML despite extension
            if looks_like_html(text):
                text = strip_html(text)
                logger.info("Stripped HTML tags from %s", filename)
    except Exception as exc:
        logger.error("Text extraction failed: %s", exc)
        raise HTTPException(
            status_code=500, detail=f"Text extraction failed: {exc}"
        ) from exc
    if not text or not text.strip():
        raise HTTPException(
            status_code=400, detail="Could not extract any text from the document"
        )

    # --- Chunk ---
    try:
        chunk_result = await embedding_client.chunk_text(
            text=text,
            strategy=chunk_strategy,
            chunk_size=chunk_size,
            overlap=chunk_overlap,
        )
    except Exception as exc:
        logger.error("Chunking failed: %s", exc)
        raise HTTPException(status_code=500, detail=f"Chunking failed: {exc}") from exc

    chunks = chunk_result.chunks
    # Structural metadata may be absent; treat that as "no metadata for any chunk".
    chunks_meta = chunk_result.chunks_with_metadata or []
    if not chunks:
        raise HTTPException(status_code=400, detail="Chunking produced zero chunks")

    # --- Embed ---
    try:
        embeddings = await embedding_client.generate_embeddings(chunks)
    except Exception as exc:
        logger.error("Embedding generation failed: %s", exc)
        raise HTTPException(
            status_code=500, detail=f"Embedding generation failed: {exc}"
        ) from exc
    # Vectors, payloads and ids are indexed as parallel lists, so a count
    # mismatch would attach vectors to the wrong chunks or drop some silently.
    if len(embeddings) != len(chunks):
        logger.error(
            "Embedding generation failed: got %d embeddings for %d chunks",
            len(embeddings),
            len(chunks),
        )
        raise HTTPException(
            status_code=500,
            detail=(
                "Embedding generation failed: "
                f"got {len(embeddings)} embeddings for {len(chunks)} chunks"
            ),
        )

    # --- Parse extra metadata ---
    extra_metadata = _parse_extra_metadata(metadata_json)

    # --- Build payloads (rag_reingest_spec.md §2/§3: citable legal metadata) ---
    # str(...) guards against non-string JSON values (e.g. a numeric regulation_id).
    reg_code = str(
        extra_metadata.get("regulation_code")
        or extra_metadata.get("regulation_short")
        or extra_metadata.get("regulation_id")
        or ""
    ).strip()

    payloads = []
    ids = []
    for i, chunk in enumerate(chunks):
        # Chunks without a metadata entry (or with a null entry) get an empty dict.
        meta = (chunks_meta[i] if i < len(chunks_meta) else None) or {}
        legal = build_legal_fields(
            meta,
            reg_code,
            chunk,
            citation_style=extra_metadata.get("citation_style"),
            display_name=extra_metadata.get("regulation_short"),
            source_type=extra_metadata.get("source_type"),
        )
        # NOTE(review): later entries win, so keys in extra_metadata override the
        # core fields above them (including document_id / object_name), and
        # non-empty legal fields override both — confirm this is intended.
        payload = {
            "document_id": document_id,
            "document_version": document_version,
            "object_name": object_name,
            "filename": filename,
            "chunk_index": i,
            "chunk_text": chunk,
            "chunk_hash": compute_chunk_hash(chunk),
            "data_type": data_type,
            "bundesland": bundesland,
            "use_case": use_case,
            "year": year,
            **extra_metadata,
            **{k: v for k, v in legal.items() if v not in (None, "")},
        }
        # Take the page from the structural metadata (it is not part of legal)
        if meta.get("page") not in (None, ""):
            payload["page"] = meta["page"]
        payloads.append(payload)
        # NOTE(review): the ID inputs contain nothing document-specific besides
        # reg_code; with an empty reg_code, confirm deterministic_point_id cannot
        # collide across unrelated documents.
        ids.append(
            deterministic_point_id(
                reg_code, legal["article"], legal["paragraph"], i, document_version
            )
        )

    # --- Index in Qdrant (deterministic IDs for stable re-linking) ---
    try:
        indexed = await qdrant_wrapper.index_documents(
            collection=collection,
            vectors=embeddings,
            payloads=payloads,
            ids=ids,
        )
    except Exception as exc:
        logger.error("Qdrant indexing failed: %s", exc)
        raise HTTPException(
            status_code=500, detail=f"Qdrant indexing failed: {exc}"
        ) from exc

    return DocumentUploadResponse(
        document_id=document_id,
        object_name=object_name,
        chunks_count=len(chunks),
        vectors_indexed=indexed,
        collection=collection,
    )
@router.delete("")
async def delete_document(body: DocumentDeleteRequest, request: Request):
    """Delete a document from both MinIO and Qdrant.

    Both deletions are always attempted; a failure in one does not skip the
    other. Failures are logged and reported in the response body instead of
    being raised.

    Args:
        body: Object name of the stored file and the Qdrant collection to purge.
        request: Incoming request, handed to ``optional_jwt_auth``.

    Returns:
        ``{"deleted": True, "object_name": ..., "collection": ...}`` when both
        deletions succeed, otherwise ``{"deleted": False, "errors": [...]}``.
    """
    optional_jwt_auth(request)
    errors: list[str] = []

    # Delete from MinIO
    try:
        await minio_wrapper.delete_document(body.object_name)
    except Exception as exc:
        logger.error("MinIO delete failed for '%s': %s", body.object_name, exc)
        errors.append(f"MinIO delete failed: {exc}")

    # Delete vectors from Qdrant (matched by the object_name payload field)
    try:
        await qdrant_wrapper.delete_by_filter(
            collection=body.collection,
            filter_conditions={"object_name": body.object_name},
        )
    except Exception as exc:
        logger.error(
            "Qdrant delete failed for '%s' in collection '%s': %s",
            body.object_name,
            body.collection,
            exc,
        )
        errors.append(f"Qdrant delete failed: {exc}")

    if errors:
        return {"deleted": False, "errors": errors}
    return {"deleted": True, "object_name": body.object_name, "collection": body.collection}
@router.get("/list")
async def list_documents(
    request: Request,
    prefix: Optional[str] = None,
):
    """Return the documents stored in MinIO, optionally limited to a prefix.

    The response carries the listing under ``documents`` and its length under
    ``count``. Any failure is logged and surfaced as an HTTP 500.
    """
    optional_jwt_auth(request)
    try:
        found = await minio_wrapper.list_documents(prefix=prefix)
        listing = {"documents": found, "count": len(found)}
    except Exception as exc:
        logger.error("Failed to list documents: %s", exc)
        raise HTTPException(status_code=500, detail=str(exc))
    return listing
@router.get("/download/{object_name:path}")
async def download_document(object_name: str, request: Request):
    """Return a presigned download URL for the given object.

    Any failure while creating the URL is logged and reported as an HTTP 404.
    """
    optional_jwt_auth(request)
    try:
        presigned = await minio_wrapper.get_presigned_url(object_name)
    except Exception as exc:
        logger.error("Failed to generate presigned URL for '%s': %s", object_name, exc)
        raise HTTPException(status_code=404, detail=f"Document not found: {exc}")
    return {"url": presigned, "object_name": object_name}
@router.get("/stats")
async def storage_stats(
    request: Request,
    prefix: Optional[str] = None,
):
    """Return the storage statistics reported by the MinIO wrapper for a prefix.

    The wrapper's result is passed through unchanged; any failure is logged
    and surfaced as an HTTP 500.
    """
    optional_jwt_auth(request)
    try:
        return await minio_wrapper.get_storage_stats(prefix=prefix)
    except Exception as exc:
        logger.error("Failed to get storage stats: %s", exc)
        raise HTTPException(status_code=500, detail=str(exc))