diff --git a/control-pipeline/scripts/reingest_d5.py b/control-pipeline/scripts/reingest_d5.py
new file mode 100644
index 0000000..cd40b5f
--- /dev/null
+++ b/control-pipeline/scripts/reingest_d5.py
@@ -0,0 +1,447 @@
+#!/usr/bin/env python3
+"""
+D5 Re-Ingestion: Re-chunk all ~297 legal sources with structural metadata.
+
+Usage:
+ # Dry-run: build manifest, no changes
+ python3 scripts/reingest_d5.py --dry-run
+
+ # Re-ingest one collection (test)
+ python3 scripts/reingest_d5.py --collection bp_compliance_gesetze
+
+ # Re-ingest all collections (resume-capable)
+ python3 scripts/reingest_d5.py --resume
+
+ # Custom URLs
+ python3 scripts/reingest_d5.py --rag-url https://macmini:8097 --qdrant-url http://macmini:6333
+"""
+
+import argparse
+import json
+import logging
+import random
+import sys
+import time
+from datetime import datetime, timezone
+
+import httpx
+
+from reingest_d5_config import (
+ CHUNK_OVERLAP,
+ CHUNK_SIZE,
+ CHUNK_STRATEGY,
+ DEFAULT_QDRANT_URL,
+ DEFAULT_RAG_URL,
+ MANIFEST_FILE,
+ TARGET_COLLECTIONS,
+ content_type_from_filename,
+ doc_key,
+ extract_doc_metadata,
+ load_progress,
+ save_progress,
+)
+
+logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s %(message)s")
+logger = logging.getLogger("d5-reingest")
+
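+# Re-upload triggers server-side extraction, chunking, and embedding, which can
+# take many minutes for large legal PDFs, hence the generous upload timeout.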
+UPLOAD_TIMEOUT = httpx.Timeout(timeout=1800.0, connect=30.0)
+SCROLL_TIMEOUT = httpx.Timeout(timeout=60.0, connect=10.0)
+
+
+# ---------------------------------------------------------------------------
+# Phase 0: Preflight
+# ---------------------------------------------------------------------------
+def preflight_checks(rag_url: str, qdrant_url: str) -> dict:
+ """Verify services are reachable and record baseline chunk counts."""
+ logger.info("Phase 0: Preflight checks...")
+
+ with httpx.Client(timeout=10.0, verify=False) as c:
+ r = c.get(f"{rag_url}/health")
+ r.raise_for_status()
+ logger.info(" RAG service: OK")
+
+ with httpx.Client(timeout=10.0) as c:
+ r = c.get(f"{qdrant_url}/collections")
+ r.raise_for_status()
+ logger.info(" Qdrant: OK")
+
+ before_counts = {}
+ with httpx.Client(timeout=10.0) as c:
+ for coll in TARGET_COLLECTIONS:
+ try:
+ r = c.post(f"{qdrant_url}/collections/{coll}/points/count",
+ json={"exact": True})
+ r.raise_for_status()
+ count = r.json()["result"]["count"]
+ except Exception:
+ count = 0
+ before_counts[coll] = count
+ logger.info(" %s: %d chunks", coll, count)
+
+ return before_counts
+
+
+# ---------------------------------------------------------------------------
+# Phase 1: Build manifest
+# ---------------------------------------------------------------------------
+def build_manifest(qdrant_url: str, collections: list[str]) -> list[dict]:
+ """Scroll Qdrant and build a deduplicated document manifest."""
+ logger.info("Phase 1: Building document manifest...")
+ documents: dict[str, dict] = {} # keyed by doc_key(object_name, collection)
+
+ with httpx.Client(timeout=SCROLL_TIMEOUT) as client:
+ for coll in collections:
+ logger.info(" Scrolling %s...", coll)
+ offset = None
+ points_seen = 0
+
+ while True:
+ body: dict = {
+ "limit": 250,
+ "with_payload": True,
+ "with_vector": False,
+ }
+ if offset:
+ body["offset"] = offset
+
+ resp = client.post(
+ f"{qdrant_url}/collections/{coll}/points/scroll",
+ json=body,
+ )
+ resp.raise_for_status()
+ data = resp.json()["result"]
+ points = data["points"]
+
+ for pt in points:
+ payload = pt.get("payload", {})
+ obj_name = payload.get("object_name", "")
+ if not obj_name:
+ continue
+
+ key = doc_key(obj_name, coll)
+ if key not in documents:
+ meta = extract_doc_metadata(payload)
+ documents[key] = {
+ "object_name": obj_name,
+ "collection": coll,
+ "filename": payload.get("filename", obj_name.split("/")[-1]),
+ "form": meta["form"],
+ "extra_metadata": meta["extra"],
+ "old_chunk_count": 0,
+ }
+ documents[key]["old_chunk_count"] += 1
+
+ points_seen += len(points)
+ offset = data.get("next_page_offset")
+ if not offset:
+ break
+
+ logger.info(" %d points → %d unique docs",
+ points_seen,
+ sum(1 for d in documents.values() if d["collection"] == coll))
+
+ manifest = list(documents.values())
+ logger.info(" Total: %d unique documents across %d collections",
+ len(manifest), len(collections))
+ return manifest
+
+
+# ---------------------------------------------------------------------------
+# Phase 2: Per-document re-ingestion
+# ---------------------------------------------------------------------------
+def download_file(rag_url: str, object_name: str) -> bytes:
+ """Download file bytes via MinIO presigned URL."""
+ with httpx.Client(timeout=60.0, verify=False) as c:
+ resp = c.get(f"{rag_url}/api/v1/documents/download/{object_name}")
+ resp.raise_for_status()
+ presigned_url = resp.json()["url"]
+
+ with httpx.Client(timeout=120.0, verify=False) as c:
+ resp = c.get(presigned_url)
+ resp.raise_for_status()
+ return resp.content
+
+
+def delete_old_chunks(qdrant_url: str, collection: str, object_name: str) -> int:
+ """Delete all chunks for a document from Qdrant. Returns estimated count."""
+ with httpx.Client(timeout=30.0) as c:
+ resp = c.post(
+ f"{qdrant_url}/collections/{collection}/points/delete",
+ json={
+ "filter": {
+ "must": [{
+ "key": "object_name",
+ "match": {"value": object_name},
+ }]
+ }
+ },
+ )
+ resp.raise_for_status()
+ return 0 # Qdrant delete doesn't return count
+
+
+def reupload_document(
+ rag_url: str,
+ file_bytes: bytes,
+ filename: str,
+ collection: str,
+ form_fields: dict,
+ extra_metadata: dict,
+) -> dict:
+ """Upload document to RAG service with new chunking parameters."""
+ ct = content_type_from_filename(filename)
+
+ form_data = {
+ "collection": collection,
+ "data_type": form_fields.get("data_type", "compliance"),
+ "bundesland": form_fields.get("bundesland", "bund"),
+ "use_case": form_fields.get("use_case", "compliance"),
+ "year": form_fields.get("year", "2026"),
+ "chunk_strategy": CHUNK_STRATEGY,
+ "chunk_size": str(CHUNK_SIZE),
+ "chunk_overlap": str(CHUNK_OVERLAP),
+ "metadata_json": json.dumps(extra_metadata, ensure_ascii=False),
+ }
+
+ with httpx.Client(timeout=UPLOAD_TIMEOUT, verify=False) as c:
+ resp = c.post(
+ f"{rag_url}/api/v1/documents/upload",
+ files={"file": (filename, file_bytes, ct)},
+ data=form_data,
+ )
+ resp.raise_for_status()
+ return resp.json()
+
+
+def process_document(
+ doc: dict,
+ rag_url: str,
+ qdrant_url: str,
+ progress: dict,
+ max_retries: int = 2,
+) -> bool:
+ """Process a single document: download → delete → re-upload. Returns success."""
+ key = doc_key(doc["object_name"], doc["collection"])
+
+ # Skip if already done
+ if progress.get("documents", {}).get(key, {}).get("status") == "done":
+ return True
+
+ for attempt in range(max_retries + 1):
+ try:
+ # 1. Download
+ file_bytes = download_file(rag_url, doc["object_name"])
+ if not file_bytes:
+ logger.warning(" Empty file: %s — skipping", doc["object_name"])
+ progress.setdefault("documents", {})[key] = {
+ "status": "skipped", "reason": "empty_file"}
+ return False
+
+ # 2. Delete old chunks
+ delete_old_chunks(qdrant_url, doc["collection"], doc["object_name"])
+
+ # 3. Re-upload
+ result = reupload_document(
+ rag_url, file_bytes, doc["filename"],
+ doc["collection"], doc["form"], doc["extra_metadata"],
+ )
+
+ # 4. Record success
+ progress.setdefault("documents", {})[key] = {
+ "status": "done",
+ "old_chunks": doc["old_chunk_count"],
+ "new_chunks": result.get("chunks_count", 0),
+ "new_document_id": result.get("document_id", ""),
+ "completed_at": datetime.now(timezone.utc).isoformat(),
+ }
+ return True
+
+ except httpx.HTTPStatusError as e:
+ if e.response.status_code == 404:
+ logger.warning(" File not in MinIO (404): %s — skipping", doc["object_name"])
+ progress.setdefault("documents", {})[key] = {
+ "status": "skipped", "reason": "not_in_minio"}
+ return False
+ if attempt < max_retries:
+ wait = 5 * (attempt + 1)
+ logger.warning(" HTTP %d on attempt %d, retrying in %ds...",
+ e.response.status_code, attempt + 1, wait)
+ time.sleep(wait)
+ else:
+ logger.error(" FAILED after %d retries: %s", max_retries, e)
+ progress.setdefault("documents", {})[key] = {
+ "status": "error", "error": str(e), "retries": max_retries}
+ return False
+
+ except Exception as e:
+ if attempt < max_retries:
+ wait = 10 * (attempt + 1)
+ logger.warning(" Error on attempt %d: %s — retrying in %ds",
+ attempt + 1, e, wait)
+ time.sleep(wait)
+ else:
+ logger.error(" FAILED after %d retries: %s", max_retries, e)
+ progress.setdefault("documents", {})[key] = {
+ "status": "error", "error": str(e), "retries": max_retries}
+ return False
+
+ return False
+
+
+# ---------------------------------------------------------------------------
+# Phase 3: Verification
+# ---------------------------------------------------------------------------
+def verify_results(
+ qdrant_url: str,
+ before_counts: dict,
+ collections: list[str],
+ manifest: list[dict],
+):
+ """Compare before/after counts and spot-check metadata."""
+ logger.info("Phase 3: Verification...")
+
+ print("\n" + "=" * 65)
+ print("D5 RE-INGESTION VERIFICATION REPORT")
+ print("=" * 65)
+
+ after_counts = {}
+ with httpx.Client(timeout=10.0) as c:
+ for coll in collections:
+ try:
+ r = c.post(f"{qdrant_url}/collections/{coll}/points/count",
+ json={"exact": True})
+ r.raise_for_status()
+ after_counts[coll] = r.json()["result"]["count"]
+ except Exception:
+ after_counts[coll] = -1
+
+ print(f"\n{'Collection':<35} {'Before':>8} {'After':>8} {'Delta':>8}")
+ print("-" * 65)
+ for coll in collections:
+ before = before_counts.get(coll, 0)
+ after = after_counts.get(coll, -1)
+ delta = after - before if after >= 0 else "?"
+ print(f"{coll:<35} {before:>8} {after:>8} {str(delta):>8}")
+
+ # Spot-check: pick 3 random docs and verify metadata
+ print("\nSpot-check (3 random docs):")
+ sample = random.sample(manifest, min(3, len(manifest)))
+
+ with httpx.Client(timeout=30.0) as c:
+ for doc in sample:
+ resp = c.post(
+ f"{qdrant_url}/collections/{doc['collection']}/points/scroll",
+ json={
+ "limit": 3,
+ "with_payload": True,
+ "with_vector": False,
+ "filter": {
+ "must": [{
+ "key": "object_name",
+ "match": {"value": doc["object_name"]},
+ }]
+ },
+ },
+ )
+ if resp.status_code != 200:
+ print(f" {doc['object_name']}: QUERY FAILED")
+ continue
+
+ points = resp.json()["result"]["points"]
+ if not points:
+ print(f" {doc['object_name']}: NO CHUNKS FOUND")
+ continue
+
+ has_section = sum(1 for p in points if p["payload"].get("section"))
+ has_para = sum(1 for p in points if p["payload"].get("paragraph"))
+ print(f" {doc['filename'][:40]:<42} "
+ f"chunks={len(points):>3} "
+ f"with_section={has_section}/{len(points)} "
+ f"with_para={has_para}/{len(points)}")
+
+ print()
+
+
+# ---------------------------------------------------------------------------
+# Main
+# ---------------------------------------------------------------------------
+def main():
+ parser = argparse.ArgumentParser(description="D5 Re-Ingestion Script")
+ parser.add_argument("--rag-url", default=DEFAULT_RAG_URL)
+ parser.add_argument("--qdrant-url", default=DEFAULT_QDRANT_URL)
+ parser.add_argument("--dry-run", action="store_true",
+ help="Build manifest only, no changes")
+ parser.add_argument("--collection", default=None,
+ help="Process only this collection")
+ parser.add_argument("--resume", action="store_true",
+ help="Resume from progress file")
+ args = parser.parse_args()
+
+ collections = [args.collection] if args.collection else TARGET_COLLECTIONS
+
+ # Phase 0
+ before_counts = preflight_checks(args.rag_url, args.qdrant_url)
+
+ # Phase 1
+ manifest = build_manifest(args.qdrant_url, collections)
+
+ # Save manifest for inspection
+ with open(MANIFEST_FILE, "w", encoding="utf-8") as f:
+ json.dump(manifest, f, indent=2, ensure_ascii=False)
+ logger.info("Manifest saved to %s", MANIFEST_FILE)
+
+ if args.dry_run:
+ print(f"\nDRY RUN: {len(manifest)} documents found. See {MANIFEST_FILE}")
+ for doc in manifest[:10]:
+ reg = doc["extra_metadata"].get("regulation_code", "?")
+ print(f" {reg:<30} {doc['collection']:<35} chunks={doc['old_chunk_count']}")
+ if len(manifest) > 10:
+ print(f" ... and {len(manifest) - 10} more")
+ sys.exit(0)
+
+ # Phase 2
+ progress = load_progress() if args.resume else {"documents": {}}
+ progress["started_at"] = datetime.now(timezone.utc).isoformat()
+ progress["before_counts"] = before_counts
+
+ done = 0
+ skipped = 0
+ failed = 0
+
+ for i, doc in enumerate(manifest, 1):
+ key = doc_key(doc["object_name"], doc["collection"])
+ reg = doc["extra_metadata"].get("regulation_code", "?")
+
+ if progress.get("documents", {}).get(key, {}).get("status") == "done":
+ done += 1
+ continue
+
+ logger.info("[%d/%d] %s (%s) — %d old chunks",
+ i, len(manifest), reg, doc["collection"], doc["old_chunk_count"])
+
+ ok = process_document(doc, args.rag_url, args.qdrant_url, progress)
+ if ok:
+ done += 1
+ new_chunks = progress["documents"][key].get("new_chunks", "?")
+ logger.info(" OK: %d old → %s new chunks", doc["old_chunk_count"], new_chunks)
+ elif progress["documents"][key].get("status") == "skipped":
+ skipped += 1
+ else:
+ failed += 1
+
+ save_progress(progress)
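+        # Brief pause between documents (assumption: avoids back-to-back load spikes on the RAG service)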
+ time.sleep(2)
+
+ logger.info("Phase 2 complete: %d done, %d skipped, %d failed", done, skipped, failed)
+
+ # Phase 3
+ verify_results(args.qdrant_url, before_counts, collections, manifest)
+
+ print(f"Summary: {done} done, {skipped} skipped, {failed} failed")
+ if failed:
+ print(f"Re-run with --resume to retry {failed} failed documents")
+ sys.exit(1)
+
+
+if __name__ == "__main__":
+ main()
diff --git a/control-pipeline/scripts/reingest_d5_config.py b/control-pipeline/scripts/reingest_d5_config.py
new file mode 100644
index 0000000..d0b2c1b
--- /dev/null
+++ b/control-pipeline/scripts/reingest_d5_config.py
@@ -0,0 +1,92 @@
+"""D5 Re-Ingestion: Constants, helpers, progress tracking."""
+
+import json
+import logging
+import os
+
+logger = logging.getLogger("d5-reingest")
+
+# ---------------------------------------------------------------------------
+# Defaults (overridable via CLI args)
+# ---------------------------------------------------------------------------
+DEFAULT_RAG_URL = "https://macmini:8097"
+DEFAULT_QDRANT_URL = "http://macmini:6333"
+
+TARGET_COLLECTIONS = [
+ "bp_compliance_ce",
+ "bp_compliance_gesetze",
+ "bp_compliance_datenschutz",
+ "bp_dsfa_corpus",
+ "bp_legal_templates",
+ "bp_compliance_schulrecht",
+]
+
+# New chunking parameters (D1-D4 validated)
+CHUNK_STRATEGY = "recursive"
+CHUNK_SIZE = 1500
+CHUNK_OVERLAP = 100
+
+PROGRESS_FILE = "d5_reingest_progress.json"
+MANIFEST_FILE = "d5_manifest.json"
+
+# Per-chunk fields (NOT carried as extra metadata during re-upload)
+PER_CHUNK_FIELDS = frozenset({
+ "chunk_text", "chunk_index", "document_id", "object_name",
+ "filename", "data_type", "bundesland", "use_case", "year",
+ "section", "section_title", "paragraph", "paragraph_num", "page",
+})
+
+# Upload form fields that come from the payload (not metadata_json)
+FORM_FIELDS = frozenset({"data_type", "bundesland", "use_case", "year"})
+
+
+# ---------------------------------------------------------------------------
+# Progress tracking
+# ---------------------------------------------------------------------------
+def load_progress(path: str = PROGRESS_FILE) -> dict:
+ if os.path.exists(path):
+ with open(path, encoding="utf-8") as f:
+ return json.load(f)
+ return {"documents": {}}
+
+
+def save_progress(data: dict, path: str = PROGRESS_FILE):
+ with open(path, "w", encoding="utf-8") as f:
+ json.dump(data, f, indent=2, ensure_ascii=False, default=str)
+
+
+# ---------------------------------------------------------------------------
+# Metadata extraction
+# ---------------------------------------------------------------------------
+def extract_doc_metadata(payload: dict) -> dict:
+ """Split Qdrant payload into form fields + extra metadata.
+
+ Returns: {"form": {data_type, bundesland, ...}, "extra": {regulation_code, ...}}
+ """
+ form = {}
+ extra = {}
+    for k, v in payload.items():
+        # Check FORM_FIELDS before PER_CHUNK_FIELDS: the two sets overlap, and
+        # data_type/bundesland/use_case/year must be carried over as form fields.
+        if k in FORM_FIELDS:
+            form[k] = v
+        elif k in PER_CHUNK_FIELDS:
+            continue
+        else:
+            extra[k] = v
+ return {"form": form, "extra": extra}
+
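+# Illustrative example (hypothetical payload): {"data_type": "compliance",
+# "regulation_code": "DSGVO", "chunk_text": "..."} yields
+# {"form": {"data_type": "compliance"}, "extra": {"regulation_code": "DSGVO"}}.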
+
+def doc_key(object_name: str, collection: str) -> str:
+ """Unique key for a document in the progress file."""
+ return f"{object_name}|{collection}"
+
+
+def content_type_from_filename(filename: str) -> str:
+ """Infer MIME type from file extension."""
+ ext = os.path.splitext(filename)[1].lower()
+ return {
+ ".pdf": "application/pdf",
+ ".html": "text/html",
+ ".htm": "text/html",
+ ".md": "text/markdown",
+ ".txt": "text/plain",
+ }.get(ext, "application/octet-stream")
diff --git a/rag-service/api/documents.py b/rag-service/api/documents.py
index c9bb6f6..a8170d1 100644
--- a/rag-service/api/documents.py
+++ b/rag-service/api/documents.py
@@ -7,6 +7,7 @@ from pydantic import BaseModel
from api.auth import optional_jwt_auth
from embedding_client import embedding_client
+from html_utils import looks_like_html, strip_html
from minio_client_wrapper import minio_wrapper
from qdrant_client_wrapper import qdrant_wrapper
@@ -111,6 +112,11 @@ async def upload_document(
if not text or not text.strip():
raise HTTPException(status_code=400, detail="Could not extract any text from the document")
+ # --- Strip HTML if detected ---
+ if looks_like_html(text):
+ text = strip_html(text)
+ logger.info("Stripped HTML tags from %s", filename)
+
# --- Chunk ---
try:
chunk_result = await embedding_client.chunk_text(
diff --git a/rag-service/html_utils.py b/rag-service/html_utils.py
new file mode 100644
index 0000000..3fa9bd4
--- /dev/null
+++ b/rag-service/html_utils.py
@@ -0,0 +1,31 @@
+"""HTML detection and stripping for legal document ingestion."""
+
+import re
+from html import unescape
+
+_HTML_TAG_RE = re.compile(r'<(html|head|body|div|p|span|table)\b', re.IGNORECASE)
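+# Only common structural tags are matched, so an isolated "<" in legal prose
+# does not trigger HTML stripping.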
+
+
+def looks_like_html(text: str) -> bool:
+ """Check if text contains HTML tags."""
+ return bool(_HTML_TAG_RE.search(text[:500]))
+
+
+def strip_html(html_text: str) -> str:
+ """Convert HTML to plain text preserving legal document structure."""
+ text = html_text
+ # Remove script/style blocks
+    text = re.sub(r'<(script|style)[^>]*>.*?</\1>', '', text, flags=re.DOTALL | re.IGNORECASE)
+ # Block elements → newline (preserves § paragraph structure)
+ text = re.sub(
+        r'</(div|p|h[1-6]|li|tr|section|article|blockquote)>',
+ '\n', text, flags=re.IGNORECASE,
+ )
+    text = re.sub(r'<br\s*/?>', '\n', text, flags=re.IGNORECASE)
+ # Strip remaining tags
+ text = re.sub(r'<[^>]+>', '', text)
+    # Decode HTML entities (&ouml; → ö, &sect; → §)
+ text = unescape(text)
+ # Clean up excessive whitespace
+ text = re.sub(r'\n{3,}', '\n\n', text)
+ return text.strip()
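+
+# Illustrative behaviour, assuming the regexes above:
+#   strip_html('<p>&sect; 1 Geltungsbereich</p><p>Text</p>')
+#   → "§ 1 Geltungsbereich\nText"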
diff --git a/rag-service/tests/test_html_stripping.py b/rag-service/tests/test_html_stripping.py
new file mode 100644
index 0000000..159469d
--- /dev/null
+++ b/rag-service/tests/test_html_stripping.py
@@ -0,0 +1,122 @@
+"""Tests for HTML detection and stripping in document upload."""
+
+from html_utils import looks_like_html as _looks_like_html, strip_html as _strip_html
+
+
+class TestLooksLikeHtml:
+
+ def test_html_document(self):
+ assert _looks_like_html("
Text
") + + def test_html_div(self): + assert _looks_like_html('Absatz 1
Absatz 2
" + result = _strip_html(html) + assert "Absatz 1" in result + assert "Absatz 2" in result + # Paragraphs should be on separate lines + lines = [ln.strip() for ln in result.split("\n") if ln.strip()] + assert len(lines) >= 2 + + def test_preserves_section_headers(self): + """§ signs must be at line starts after stripping.""" + html = '§ 1 Text
' + result = _strip_html(html) + assert "color" not in result + assert "alert" not in result + assert "§ 1 Text" in result + + def test_br_becomes_newline(self): + html = "Zeile 1