fix(rag): strip HTML tags before chunking + D5 re-ingestion scripts

HTML files from gesetze-im-internet.de were decoded as raw UTF-8 text with
<div>/<p> tags left intact. The legal chunker regex requires § at the start
of a line, which never matched because the § signs sat inside HTML markup
→ 0% section metadata for HTML docs.

Fix: detect HTML content and strip the tags before sending text to the
embedding service. Block elements become newlines and entities are decoded,
so § signs now appear at line starts → section detection works.
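
A minimal sketch of the stripping step (stdlib only; the helper names and
the HTML-detection heuristic are illustrative, not the actual rag-service
implementation):

    import re
    from html import unescape

    BLOCK_TAGS = r"p|div|br|li|h[1-6]|tr|table|section"

    def looks_like_html(text: str) -> bool:
        # Heuristic: structural tags near the top of the document.
        return bool(re.search(r"<(!doctype|html|body|div|p)\b", text[:2000], re.I))

    def strip_html(text: str) -> str:
        # Block-level tags become newlines, remaining tags are dropped,
        # entities are decoded, and blank lines are squeezed out.
        text = re.sub(rf"</?(?:{BLOCK_TAGS})\b[^>]*>", "\n", text, flags=re.I)
        text = re.sub(r"<[^>]+>", "", text)
        text = unescape(text)  # &sect; -> §, &amp; -> &, ...
        return "\n".join(ln.strip() for ln in text.splitlines() if ln.strip())

    # "<p>§ 3 Begriffsbestimmungen</p>" -> "§ 3 Begriffsbestimmungen"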

Also adds D5 re-ingestion scripts (reingest_d5.py + config) for
batch re-processing of all documents in Qdrant collections.

27 rag-service tests passing.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
author Benjamin Admin
date 2026-05-02 08:18:25 +02:00
parent 93099b2770
commit ddad58f607
5 changed files with 698 additions and 0 deletions
scripts/reingest_d5.py  +447
@@ -0,0 +1,447 @@
#!/usr/bin/env python3
"""
D5 Re-Ingestion: Re-chunk all ~297 legal sources with structural metadata.
Usage:
# Dry-run: build manifest, no changes
python3 scripts/reingest_d5.py --dry-run
# Re-ingest one collection (test)
python3 scripts/reingest_d5.py --collection bp_compliance_gesetze
# Re-ingest all collections (resume-capable)
python3 scripts/reingest_d5.py --resume
# Custom URLs
python3 scripts/reingest_d5.py --rag-url https://macmini:8097 --qdrant-url http://macmini:6333
"""
import argparse
import json
import logging
import random
import sys
import time
from datetime import datetime, timezone
import httpx
from reingest_d5_config import (
CHUNK_OVERLAP,
CHUNK_SIZE,
CHUNK_STRATEGY,
DEFAULT_QDRANT_URL,
DEFAULT_RAG_URL,
MANIFEST_FILE,
TARGET_COLLECTIONS,
content_type_from_filename,
doc_key,
extract_doc_metadata,
load_progress,
save_progress,
)
logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s %(message)s")
logger = logging.getLogger("d5-reingest")
UPLOAD_TIMEOUT = httpx.Timeout(timeout=1800.0, connect=30.0)
SCROLL_TIMEOUT = httpx.Timeout(timeout=60.0, connect=10.0)
# ---------------------------------------------------------------------------
# Phase 0: Preflight
# ---------------------------------------------------------------------------
def preflight_checks(rag_url: str, qdrant_url: str) -> dict:
"""Verify services are reachable and record baseline chunk counts."""
logger.info("Phase 0: Preflight checks...")
with httpx.Client(timeout=10.0, verify=False) as c:
r = c.get(f"{rag_url}/health")
r.raise_for_status()
logger.info(" RAG service: OK")
with httpx.Client(timeout=10.0) as c:
r = c.get(f"{qdrant_url}/collections")
r.raise_for_status()
logger.info(" Qdrant: OK")
before_counts = {}
with httpx.Client(timeout=10.0) as c:
for coll in TARGET_COLLECTIONS:
try:
r = c.post(f"{qdrant_url}/collections/{coll}/points/count",
json={"exact": True})
r.raise_for_status()
count = r.json()["result"]["count"]
except Exception:
count = 0
before_counts[coll] = count
logger.info(" %s: %d chunks", coll, count)
return before_counts
# ---------------------------------------------------------------------------
# Phase 1: Build manifest
# ---------------------------------------------------------------------------
def build_manifest(qdrant_url: str, collections: list[str]) -> list[dict]:
"""Scroll Qdrant and build a deduplicated document manifest."""
logger.info("Phase 1: Building document manifest...")
documents: dict[str, dict] = {} # keyed by doc_key(object_name, collection)
with httpx.Client(timeout=SCROLL_TIMEOUT) as client:
for coll in collections:
logger.info(" Scrolling %s...", coll)
offset = None
points_seen = 0
while True:
body: dict = {
"limit": 250,
"with_payload": True,
"with_vector": False,
}
                if offset is not None:
                    body["offset"] = offset
resp = client.post(
f"{qdrant_url}/collections/{coll}/points/scroll",
json=body,
)
resp.raise_for_status()
data = resp.json()["result"]
points = data["points"]
for pt in points:
payload = pt.get("payload", {})
obj_name = payload.get("object_name", "")
if not obj_name:
continue
key = doc_key(obj_name, coll)
if key not in documents:
meta = extract_doc_metadata(payload)
documents[key] = {
"object_name": obj_name,
"collection": coll,
"filename": payload.get("filename", obj_name.split("/")[-1]),
"form": meta["form"],
"extra_metadata": meta["extra"],
"old_chunk_count": 0,
}
documents[key]["old_chunk_count"] += 1
points_seen += len(points)
                offset = data.get("next_page_offset")
                # next_page_offset can legitimately be 0 (integer point IDs), so compare to None
                if offset is None:
                    break
logger.info(" %d points → %d unique docs",
points_seen,
sum(1 for d in documents.values() if d["collection"] == coll))
manifest = list(documents.values())
logger.info(" Total: %d unique documents across %d collections",
len(manifest), len(collections))
return manifest
# ---------------------------------------------------------------------------
# Phase 2: Per-document re-ingestion
# ---------------------------------------------------------------------------
def download_file(rag_url: str, object_name: str) -> bytes:
"""Download file bytes via MinIO presigned URL."""
with httpx.Client(timeout=60.0, verify=False) as c:
resp = c.get(f"{rag_url}/api/v1/documents/download/{object_name}")
resp.raise_for_status()
presigned_url = resp.json()["url"]
with httpx.Client(timeout=120.0, verify=False) as c:
resp = c.get(presigned_url)
resp.raise_for_status()
return resp.content
def delete_old_chunks(qdrant_url: str, collection: str, object_name: str) -> int:
"""Delete all chunks for a document from Qdrant. Returns estimated count."""
with httpx.Client(timeout=30.0) as c:
resp = c.post(
f"{qdrant_url}/collections/{collection}/points/delete",
json={
"filter": {
"must": [{
"key": "object_name",
"match": {"value": object_name},
}]
}
},
)
resp.raise_for_status()
return 0 # Qdrant delete doesn't return count
def reupload_document(
rag_url: str,
file_bytes: bytes,
filename: str,
collection: str,
form_fields: dict,
extra_metadata: dict,
) -> dict:
"""Upload document to RAG service with new chunking parameters."""
ct = content_type_from_filename(filename)
form_data = {
"collection": collection,
"data_type": form_fields.get("data_type", "compliance"),
"bundesland": form_fields.get("bundesland", "bund"),
"use_case": form_fields.get("use_case", "compliance"),
"year": form_fields.get("year", "2026"),
"chunk_strategy": CHUNK_STRATEGY,
"chunk_size": str(CHUNK_SIZE),
"chunk_overlap": str(CHUNK_OVERLAP),
"metadata_json": json.dumps(extra_metadata, ensure_ascii=False),
}
with httpx.Client(timeout=UPLOAD_TIMEOUT, verify=False) as c:
resp = c.post(
f"{rag_url}/api/v1/documents/upload",
files={"file": (filename, file_bytes, ct)},
data=form_data,
)
resp.raise_for_status()
return resp.json()
def process_document(
doc: dict,
rag_url: str,
qdrant_url: str,
progress: dict,
max_retries: int = 2,
) -> bool:
"""Process a single document: download → delete → re-upload. Returns success."""
key = doc_key(doc["object_name"], doc["collection"])
# Skip if already done
if progress.get("documents", {}).get(key, {}).get("status") == "done":
return True
for attempt in range(max_retries + 1):
try:
# 1. Download
file_bytes = download_file(rag_url, doc["object_name"])
if not file_bytes:
logger.warning(" Empty file: %s — skipping", doc["object_name"])
progress.setdefault("documents", {})[key] = {
"status": "skipped", "reason": "empty_file"}
return False
# 2. Delete old chunks
delete_old_chunks(qdrant_url, doc["collection"], doc["object_name"])
# 3. Re-upload
result = reupload_document(
rag_url, file_bytes, doc["filename"],
doc["collection"], doc["form"], doc["extra_metadata"],
)
# 4. Record success
progress.setdefault("documents", {})[key] = {
"status": "done",
"old_chunks": doc["old_chunk_count"],
"new_chunks": result.get("chunks_count", 0),
"new_document_id": result.get("document_id", ""),
"completed_at": datetime.now(timezone.utc).isoformat(),
}
return True
except httpx.HTTPStatusError as e:
if e.response.status_code == 404:
logger.warning(" File not in MinIO (404): %s — skipping", doc["object_name"])
progress.setdefault("documents", {})[key] = {
"status": "skipped", "reason": "not_in_minio"}
return False
if attempt < max_retries:
wait = 5 * (attempt + 1)
logger.warning(" HTTP %d on attempt %d, retrying in %ds...",
e.response.status_code, attempt + 1, wait)
time.sleep(wait)
else:
logger.error(" FAILED after %d retries: %s", max_retries, e)
progress.setdefault("documents", {})[key] = {
"status": "error", "error": str(e), "retries": max_retries}
return False
except Exception as e:
if attempt < max_retries:
wait = 10 * (attempt + 1)
logger.warning(" Error on attempt %d: %s — retrying in %ds",
attempt + 1, e, wait)
time.sleep(wait)
else:
logger.error(" FAILED after %d retries: %s", max_retries, e)
progress.setdefault("documents", {})[key] = {
"status": "error", "error": str(e), "retries": max_retries}
return False
return False
# ---------------------------------------------------------------------------
# Phase 3: Verification
# ---------------------------------------------------------------------------
def verify_results(
qdrant_url: str,
before_counts: dict,
collections: list[str],
manifest: list[dict],
):
"""Compare before/after counts and spot-check metadata."""
logger.info("Phase 3: Verification...")
print("\n" + "=" * 65)
print("D5 RE-INGESTION VERIFICATION REPORT")
print("=" * 65)
after_counts = {}
with httpx.Client(timeout=10.0) as c:
for coll in collections:
try:
r = c.post(f"{qdrant_url}/collections/{coll}/points/count",
json={"exact": True})
r.raise_for_status()
after_counts[coll] = r.json()["result"]["count"]
except Exception:
after_counts[coll] = -1
print(f"\n{'Collection':<35} {'Before':>8} {'After':>8} {'Delta':>8}")
print("-" * 65)
for coll in collections:
before = before_counts.get(coll, 0)
after = after_counts.get(coll, -1)
delta = after - before if after >= 0 else "?"
print(f"{coll:<35} {before:>8} {after:>8} {str(delta):>8}")
# Spot-check: pick 3 random docs and verify metadata
print("\nSpot-check (3 random docs):")
sample = random.sample(manifest, min(3, len(manifest)))
with httpx.Client(timeout=30.0) as c:
for doc in sample:
resp = c.post(
f"{qdrant_url}/collections/{doc['collection']}/points/scroll",
json={
"limit": 3,
"with_payload": True,
"with_vector": False,
"filter": {
"must": [{
"key": "object_name",
"match": {"value": doc["object_name"]},
}]
},
},
)
if resp.status_code != 200:
print(f" {doc['object_name']}: QUERY FAILED")
continue
points = resp.json()["result"]["points"]
if not points:
print(f" {doc['object_name']}: NO CHUNKS FOUND")
continue
has_section = sum(1 for p in points if p["payload"].get("section"))
has_para = sum(1 for p in points if p["payload"].get("paragraph"))
print(f" {doc['filename'][:40]:<42} "
f"chunks={len(points):>3} "
f"with_section={has_section}/{len(points)} "
f"with_para={has_para}/{len(points)}")
print()
# ---------------------------------------------------------------------------
# Main
# ---------------------------------------------------------------------------
def main():
parser = argparse.ArgumentParser(description="D5 Re-Ingestion Script")
parser.add_argument("--rag-url", default=DEFAULT_RAG_URL)
parser.add_argument("--qdrant-url", default=DEFAULT_QDRANT_URL)
parser.add_argument("--dry-run", action="store_true",
help="Build manifest only, no changes")
parser.add_argument("--collection", default=None,
help="Process only this collection")
parser.add_argument("--resume", action="store_true",
help="Resume from progress file")
args = parser.parse_args()
collections = [args.collection] if args.collection else TARGET_COLLECTIONS
# Phase 0
before_counts = preflight_checks(args.rag_url, args.qdrant_url)
# Phase 1
manifest = build_manifest(args.qdrant_url, collections)
# Save manifest for inspection
with open(MANIFEST_FILE, "w", encoding="utf-8") as f:
json.dump(manifest, f, indent=2, ensure_ascii=False)
logger.info("Manifest saved to %s", MANIFEST_FILE)
if args.dry_run:
print(f"\nDRY RUN: {len(manifest)} documents found. See {MANIFEST_FILE}")
for doc in manifest[:10]:
reg = doc["extra_metadata"].get("regulation_code", "?")
print(f" {reg:<30} {doc['collection']:<35} chunks={doc['old_chunk_count']}")
if len(manifest) > 10:
print(f" ... and {len(manifest) - 10} more")
sys.exit(0)
# Phase 2
progress = load_progress() if args.resume else {"documents": {}}
progress["started_at"] = datetime.now(timezone.utc).isoformat()
progress["before_counts"] = before_counts
done = 0
skipped = 0
failed = 0
for i, doc in enumerate(manifest, 1):
key = doc_key(doc["object_name"], doc["collection"])
reg = doc["extra_metadata"].get("regulation_code", "?")
if progress.get("documents", {}).get(key, {}).get("status") == "done":
done += 1
continue
logger.info("[%d/%d] %s (%s) — %d old chunks",
i, len(manifest), reg, doc["collection"], doc["old_chunk_count"])
ok = process_document(doc, args.rag_url, args.qdrant_url, progress)
if ok:
done += 1
new_chunks = progress["documents"][key].get("new_chunks", "?")
logger.info(" OK: %d old → %s new chunks", doc["old_chunk_count"], new_chunks)
elif progress["documents"][key].get("status") == "skipped":
skipped += 1
else:
failed += 1
save_progress(progress)
time.sleep(2)
logger.info("Phase 2 complete: %d done, %d skipped, %d failed", done, skipped, failed)
# Phase 3
verify_results(args.qdrant_url, before_counts, collections, manifest)
print(f"Summary: {done} done, {skipped} skipped, {failed} failed")
if failed:
print(f"Re-run with --resume to retry {failed} failed documents")
sys.exit(1)
if __name__ == "__main__":
main()
reingest_d5_config.py  +92
@@ -0,0 +1,92 @@
"""D5 Re-Ingestion: Constants, helpers, progress tracking."""
import json
import logging
import os
logger = logging.getLogger("d5-reingest")
# ---------------------------------------------------------------------------
# Defaults (overridable via CLI args)
# ---------------------------------------------------------------------------
DEFAULT_RAG_URL = "https://macmini:8097"
DEFAULT_QDRANT_URL = "http://macmini:6333"
TARGET_COLLECTIONS = [
"bp_compliance_ce",
"bp_compliance_gesetze",
"bp_compliance_datenschutz",
"bp_dsfa_corpus",
"bp_legal_templates",
"bp_compliance_schulrecht",
]
# New chunking parameters (D1-D4 validated)
CHUNK_STRATEGY = "recursive"
CHUNK_SIZE = 1500
CHUNK_OVERLAP = 100
PROGRESS_FILE = "d5_reingest_progress.json"
MANIFEST_FILE = "d5_manifest.json"
# Per-chunk fields (NOT carried as extra metadata during re-upload)
PER_CHUNK_FIELDS = frozenset({
"chunk_text", "chunk_index", "document_id", "object_name",
"filename", "data_type", "bundesland", "use_case", "year",
"section", "section_title", "paragraph", "paragraph_num", "page",
})
# Upload form fields that come from the payload (not metadata_json)
FORM_FIELDS = frozenset({"data_type", "bundesland", "use_case", "year"})
# ---------------------------------------------------------------------------
# Progress tracking
# ---------------------------------------------------------------------------
def load_progress(path: str = PROGRESS_FILE) -> dict:
if os.path.exists(path):
with open(path, encoding="utf-8") as f:
return json.load(f)
return {"documents": {}}
def save_progress(data: dict, path: str = PROGRESS_FILE):
with open(path, "w", encoding="utf-8") as f:
json.dump(data, f, indent=2, ensure_ascii=False, default=str)
# ---------------------------------------------------------------------------
# Metadata extraction
# ---------------------------------------------------------------------------
def extract_doc_metadata(payload: dict) -> dict:
"""Split Qdrant payload into form fields + extra metadata.
Returns: {"form": {data_type, bundesland, ...}, "extra": {regulation_code, ...}}
"""
form = {}
extra = {}
    for k, v in payload.items():
        # FORM_FIELDS is a subset of PER_CHUNK_FIELDS, so check it first;
        # otherwise data_type/bundesland/use_case/year would be skipped and
        # the re-upload would silently fall back to hardcoded defaults.
        if k in FORM_FIELDS:
            form[k] = v
        elif k in PER_CHUNK_FIELDS:
            continue
        else:
            extra[k] = v
return {"form": form, "extra": extra}
def doc_key(object_name: str, collection: str) -> str:
"""Unique key for a document in the progress file."""
return f"{object_name}|{collection}"
def content_type_from_filename(filename: str) -> str:
"""Infer MIME type from file extension."""
ext = os.path.splitext(filename)[1].lower()
return {
".pdf": "application/pdf",
".html": "text/html",
".htm": "text/html",
".md": "text/markdown",
".txt": "text/plain",
}.get(ext, "application/octet-stream")