#!/usr/bin/env python3 """ D5 Re-Ingestion: Re-chunk all ~297 legal sources with structural metadata. Usage: # Dry-run: build manifest, no changes python3 scripts/reingest_d5.py --dry-run # Re-ingest one collection (test) python3 scripts/reingest_d5.py --collection bp_compliance_gesetze # Re-ingest all collections (resume-capable) python3 scripts/reingest_d5.py --resume # Custom URLs python3 scripts/reingest_d5.py --rag-url https://macmini:8097 --qdrant-url http://macmini:6333 """ import argparse import json import logging import random import sys import time from datetime import datetime, timezone import httpx from reingest_d5_config import ( CHUNK_OVERLAP, CHUNK_SIZE, CHUNK_STRATEGY, DEFAULT_QDRANT_URL, DEFAULT_RAG_URL, MANIFEST_FILE, TARGET_COLLECTIONS, content_type_from_filename, doc_key, extract_doc_metadata, load_progress, save_progress, ) logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s %(message)s") logger = logging.getLogger("d5-reingest") UPLOAD_TIMEOUT = httpx.Timeout(timeout=3600.0, connect=30.0) SCROLL_TIMEOUT = httpx.Timeout(timeout=60.0, connect=10.0) # --------------------------------------------------------------------------- # Phase 0: Preflight # --------------------------------------------------------------------------- def preflight_checks(rag_url: str, qdrant_url: str) -> dict: """Verify services are reachable and record baseline chunk counts.""" logger.info("Phase 0: Preflight checks...") with httpx.Client(timeout=10.0, verify=False) as c: r = c.get(f"{rag_url}/health") r.raise_for_status() logger.info(" RAG service: OK") with httpx.Client(timeout=10.0) as c: r = c.get(f"{qdrant_url}/collections") r.raise_for_status() logger.info(" Qdrant: OK") before_counts = {} with httpx.Client(timeout=10.0) as c: for coll in TARGET_COLLECTIONS: try: r = c.post(f"{qdrant_url}/collections/{coll}/points/count", json={"exact": True}) r.raise_for_status() count = r.json()["result"]["count"] except Exception: count = 0 before_counts[coll] = count logger.info(" %s: %d chunks", coll, count) return before_counts # --------------------------------------------------------------------------- # Phase 1: Build manifest # --------------------------------------------------------------------------- def build_manifest(qdrant_url: str, collections: list[str]) -> list[dict]: """Scroll Qdrant and build a deduplicated document manifest.""" logger.info("Phase 1: Building document manifest...") documents: dict[str, dict] = {} # keyed by doc_key(object_name, collection) with httpx.Client(timeout=SCROLL_TIMEOUT) as client: for coll in collections: logger.info(" Scrolling %s...", coll) offset = None points_seen = 0 while True: body: dict = { "limit": 250, "with_payload": True, "with_vector": False, } if offset: body["offset"] = offset resp = client.post( f"{qdrant_url}/collections/{coll}/points/scroll", json=body, ) resp.raise_for_status() data = resp.json()["result"] points = data["points"] for pt in points: payload = pt.get("payload", {}) obj_name = payload.get("object_name", "") if not obj_name: continue key = doc_key(obj_name, coll) if key not in documents: meta = extract_doc_metadata(payload) documents[key] = { "object_name": obj_name, "collection": coll, "filename": payload.get("filename", obj_name.split("/")[-1]), "form": meta["form"], "extra_metadata": meta["extra"], "old_chunk_count": 0, } documents[key]["old_chunk_count"] += 1 points_seen += len(points) offset = data.get("next_page_offset") if not offset: break logger.info(" %d points → %d unique docs", points_seen, sum(1 for d in documents.values() if d["collection"] == coll)) manifest = list(documents.values()) logger.info(" Total: %d unique documents across %d collections", len(manifest), len(collections)) return manifest # --------------------------------------------------------------------------- # Phase 2: Per-document re-ingestion # --------------------------------------------------------------------------- def download_file(rag_url: str, object_name: str) -> bytes: """Download file bytes via MinIO presigned URL.""" with httpx.Client(timeout=60.0, verify=False) as c: resp = c.get(f"{rag_url}/api/v1/documents/download/{object_name}") resp.raise_for_status() presigned_url = resp.json()["url"] with httpx.Client(timeout=120.0, verify=False) as c: resp = c.get(presigned_url) resp.raise_for_status() return resp.content def delete_old_chunks(qdrant_url: str, collection: str, object_name: str) -> int: """Delete all chunks for a document from Qdrant. Returns estimated count.""" with httpx.Client(timeout=30.0) as c: resp = c.post( f"{qdrant_url}/collections/{collection}/points/delete", json={ "filter": { "must": [{ "key": "object_name", "match": {"value": object_name}, }] } }, ) resp.raise_for_status() return 0 # Qdrant delete doesn't return count def reupload_document( rag_url: str, file_bytes: bytes, filename: str, collection: str, form_fields: dict, extra_metadata: dict, ) -> dict: """Upload document to RAG service with new chunking parameters.""" ct = content_type_from_filename(filename) form_data = { "collection": collection, "data_type": form_fields.get("data_type", "compliance"), "bundesland": form_fields.get("bundesland", "bund"), "use_case": form_fields.get("use_case", "compliance"), "year": form_fields.get("year", "2026"), "chunk_strategy": CHUNK_STRATEGY, "chunk_size": str(CHUNK_SIZE), "chunk_overlap": str(CHUNK_OVERLAP), "metadata_json": json.dumps(extra_metadata, ensure_ascii=False), } with httpx.Client(timeout=UPLOAD_TIMEOUT, verify=False) as c: resp = c.post( f"{rag_url}/api/v1/documents/upload", files={"file": (filename, file_bytes, ct)}, data=form_data, ) resp.raise_for_status() return resp.json() def process_document( doc: dict, rag_url: str, qdrant_url: str, progress: dict, max_retries: int = 2, ) -> bool: """Process a single document: download → delete → re-upload. Returns success.""" key = doc_key(doc["object_name"], doc["collection"]) # Skip if already done if progress.get("documents", {}).get(key, {}).get("status") == "done": return True for attempt in range(max_retries + 1): try: # 1. Download file_bytes = download_file(rag_url, doc["object_name"]) if not file_bytes: logger.warning(" Empty file: %s — skipping", doc["object_name"]) progress.setdefault("documents", {})[key] = { "status": "skipped", "reason": "empty_file"} return False # 2. Delete old chunks delete_old_chunks(qdrant_url, doc["collection"], doc["object_name"]) # 3. Re-upload result = reupload_document( rag_url, file_bytes, doc["filename"], doc["collection"], doc["form"], doc["extra_metadata"], ) # 4. Record success progress.setdefault("documents", {})[key] = { "status": "done", "old_chunks": doc["old_chunk_count"], "new_chunks": result.get("chunks_count", 0), "new_document_id": result.get("document_id", ""), "completed_at": datetime.now(timezone.utc).isoformat(), } return True except httpx.HTTPStatusError as e: if e.response.status_code == 404: logger.warning(" File not in MinIO (404): %s — skipping", doc["object_name"]) progress.setdefault("documents", {})[key] = { "status": "skipped", "reason": "not_in_minio"} return False if attempt < max_retries: wait = 5 * (attempt + 1) logger.warning(" HTTP %d on attempt %d, retrying in %ds...", e.response.status_code, attempt + 1, wait) time.sleep(wait) else: logger.error(" FAILED after %d retries: %s", max_retries, e) progress.setdefault("documents", {})[key] = { "status": "error", "error": str(e), "retries": max_retries} return False except Exception as e: if attempt < max_retries: wait = 10 * (attempt + 1) logger.warning(" Error on attempt %d: %s — retrying in %ds", attempt + 1, e, wait) time.sleep(wait) else: logger.error(" FAILED after %d retries: %s", max_retries, e) progress.setdefault("documents", {})[key] = { "status": "error", "error": str(e), "retries": max_retries} return False return False # --------------------------------------------------------------------------- # Phase 3: Verification # --------------------------------------------------------------------------- def verify_results( qdrant_url: str, before_counts: dict, collections: list[str], manifest: list[dict], ): """Compare before/after counts and spot-check metadata.""" logger.info("Phase 3: Verification...") print("\n" + "=" * 65) print("D5 RE-INGESTION VERIFICATION REPORT") print("=" * 65) after_counts = {} with httpx.Client(timeout=10.0) as c: for coll in collections: try: r = c.post(f"{qdrant_url}/collections/{coll}/points/count", json={"exact": True}) r.raise_for_status() after_counts[coll] = r.json()["result"]["count"] except Exception: after_counts[coll] = -1 print(f"\n{'Collection':<35} {'Before':>8} {'After':>8} {'Delta':>8}") print("-" * 65) for coll in collections: before = before_counts.get(coll, 0) after = after_counts.get(coll, -1) delta = after - before if after >= 0 else "?" print(f"{coll:<35} {before:>8} {after:>8} {str(delta):>8}") # Spot-check: pick 3 random docs and verify metadata print("\nSpot-check (3 random docs):") sample = random.sample(manifest, min(3, len(manifest))) with httpx.Client(timeout=30.0) as c: for doc in sample: resp = c.post( f"{qdrant_url}/collections/{doc['collection']}/points/scroll", json={ "limit": 3, "with_payload": True, "with_vector": False, "filter": { "must": [{ "key": "object_name", "match": {"value": doc["object_name"]}, }] }, }, ) if resp.status_code != 200: print(f" {doc['object_name']}: QUERY FAILED") continue points = resp.json()["result"]["points"] if not points: print(f" {doc['object_name']}: NO CHUNKS FOUND") continue has_section = sum(1 for p in points if p["payload"].get("section")) has_para = sum(1 for p in points if p["payload"].get("paragraph")) print(f" {doc['filename'][:40]:<42} " f"chunks={len(points):>3} " f"with_section={has_section}/{len(points)} " f"with_para={has_para}/{len(points)}") print() # --------------------------------------------------------------------------- # Main # --------------------------------------------------------------------------- def main(): parser = argparse.ArgumentParser(description="D5 Re-Ingestion Script") parser.add_argument("--rag-url", default=DEFAULT_RAG_URL) parser.add_argument("--qdrant-url", default=DEFAULT_QDRANT_URL) parser.add_argument("--dry-run", action="store_true", help="Build manifest only, no changes") parser.add_argument("--collection", default=None, help="Process only this collection") parser.add_argument("--resume", action="store_true", help="Resume from progress file") args = parser.parse_args() collections = [args.collection] if args.collection else TARGET_COLLECTIONS # Phase 0 before_counts = preflight_checks(args.rag_url, args.qdrant_url) # Phase 1 manifest = build_manifest(args.qdrant_url, collections) # Save manifest for inspection with open(MANIFEST_FILE, "w", encoding="utf-8") as f: json.dump(manifest, f, indent=2, ensure_ascii=False) logger.info("Manifest saved to %s", MANIFEST_FILE) if args.dry_run: print(f"\nDRY RUN: {len(manifest)} documents found. See {MANIFEST_FILE}") for doc in manifest[:10]: reg = doc["extra_metadata"].get("regulation_code", "?") print(f" {reg:<30} {doc['collection']:<35} chunks={doc['old_chunk_count']}") if len(manifest) > 10: print(f" ... and {len(manifest) - 10} more") sys.exit(0) # Phase 2 progress = load_progress() if args.resume else {"documents": {}} progress["started_at"] = datetime.now(timezone.utc).isoformat() progress["before_counts"] = before_counts done = 0 skipped = 0 failed = 0 for i, doc in enumerate(manifest, 1): key = doc_key(doc["object_name"], doc["collection"]) reg = doc["extra_metadata"].get("regulation_code", "?") if progress.get("documents", {}).get(key, {}).get("status") == "done": done += 1 continue logger.info("[%d/%d] %s (%s) — %d old chunks", i, len(manifest), reg, doc["collection"], doc["old_chunk_count"]) ok = process_document(doc, args.rag_url, args.qdrant_url, progress) if ok: done += 1 new_chunks = progress["documents"][key].get("new_chunks", "?") logger.info(" OK: %d old → %s new chunks", doc["old_chunk_count"], new_chunks) elif progress["documents"][key].get("status") == "skipped": skipped += 1 else: failed += 1 save_progress(progress) time.sleep(2) logger.info("Phase 2 complete: %d done, %d skipped, %d failed", done, skipped, failed) # Phase 3 verify_results(args.qdrant_url, before_counts, collections, manifest) print(f"Summary: {done} done, {skipped} skipped, {failed} failed") if failed: print(f"Re-run with --resume to retry {failed} failed documents") sys.exit(1) if __name__ == "__main__": main()