#!/usr/bin/env python3 """ D4 Integration Test: Upload BGB excerpt → verify Qdrant payloads. Usage: # Dry-run (local chunking only, no services needed) python3 scripts/test_d4_integration.py --dry-run # Against Mac Mini python3 scripts/test_d4_integration.py \ --rag-url https://macmini:8097 \ --qdrant-url http://macmini:6333 # Against production python3 scripts/test_d4_integration.py \ --rag-url https://rag-prod:8097 \ --qdrant-url http://qdrant-prod:6333 """ import argparse import json import os import sys import time import httpx FIXTURE_PATH = os.path.join( os.path.dirname(__file__), "..", "..", "embedding-service", "tests", "fixtures", "bgb_312_excerpt.txt", ) COLLECTION = "bp_compliance_gesetze" REG_CODE = "BGB_D4_TEST" # Expected sections in the BGB excerpt EXPECTED_SECTIONS = {"§ 312", "§ 312a", "§ 312g", "§ 312k"} def load_fixture() -> str: with open(FIXTURE_PATH, encoding="utf-8") as f: return f.read() def upload_document(rag_url: str, text: str) -> dict: """Upload BGB excerpt to RAG service.""" metadata = json.dumps({ "regulation_code": REG_CODE, "regulation_name_de": "BGB (D4 Test)", "source_type": "law", }) with httpx.Client(timeout=60.0, verify=False) as client: resp = client.post( f"{rag_url}/api/v1/documents/upload", files={"file": ("bgb_312_test.txt", text.encode(), "text/plain")}, data={ "collection": COLLECTION, "data_type": "law", "bundesland": "bund", "use_case": "compliance", "year": "2026", "chunk_strategy": "recursive", "chunk_size": "1500", "chunk_overlap": "100", "metadata_json": metadata, }, ) resp.raise_for_status() return resp.json() def scroll_chunks(qdrant_url: str, document_id: str) -> list[dict]: """Scroll Qdrant for chunks matching this document_id.""" all_points = [] offset = None with httpx.Client(timeout=30.0) as client: while True: body: dict = { "limit": 100, "with_payload": True, "with_vector": False, "filter": { "must": [{ "key": "document_id", "match": {"value": document_id}, }] }, } if offset: body["offset"] = offset resp = client.post( f"{qdrant_url}/collections/{COLLECTION}/points/scroll", json=body, ) resp.raise_for_status() data = resp.json()["result"] all_points.extend(data["points"]) offset = data.get("next_page_offset") if not offset: break return all_points def delete_test_data(qdrant_url: str, document_id: str): """Clean up test chunks from Qdrant.""" with httpx.Client(timeout=30.0) as client: resp = client.post( f"{qdrant_url}/collections/{COLLECTION}/points/delete", json={ "filter": { "must": [{ "key": "document_id", "match": {"value": document_id}, }] } }, ) resp.raise_for_status() def verify_chunks(points: list[dict]) -> dict: """Analyze chunks and return a verification report.""" report = { "total_chunks": len(points), "sections_found": set(), "chunks_with_section": 0, "chunks_with_paragraph": 0, "chunks_with_page": 0, "section_details": [], "issues": [], } for pt in points: payload = pt.get("payload", {}) section = payload.get("section", "") section_title = payload.get("section_title", "") paragraph = payload.get("paragraph", "") paragraph_num = payload.get("paragraph_num") page = payload.get("page") chunk_idx = payload.get("chunk_index", "?") if section: report["sections_found"].add(section) report["chunks_with_section"] += 1 if paragraph: report["chunks_with_paragraph"] += 1 if page is not None: report["chunks_with_page"] += 1 report["section_details"].append({ "chunk_index": chunk_idx, "section": section, "section_title": section_title[:40], "paragraph": paragraph, "paragraph_num": paragraph_num, "page": page, "text_preview": payload.get("chunk_text", "")[:60], }) # Checks missing = EXPECTED_SECTIONS - report["sections_found"] if missing: report["issues"].append(f"Missing sections: {missing}") if "§ 312k" not in report["sections_found"]: report["issues"].append("CRITICAL: § 312k not found!") section_ratio = report["chunks_with_section"] / max(report["total_chunks"], 1) if section_ratio < 0.9: report["issues"].append( f"Only {section_ratio:.0%} chunks have section metadata (expected >= 90%)" ) return report def print_report(report: dict): """Print verification report.""" print("\n" + "=" * 60) print("D4 VALIDATION REPORT") print("=" * 60) print(f"Total chunks: {report['total_chunks']}") print(f"With section: {report['chunks_with_section']}") print(f"With paragraph: {report['chunks_with_paragraph']}") print(f"With page: {report['chunks_with_page']}") print(f"Sections found: {sorted(report['sections_found'])}") print("\nChunk details:") for d in sorted(report["section_details"], key=lambda x: x["chunk_index"]): print( f" [{d['chunk_index']:2}] " f"section={d['section']!r:12s} " f"title={d['section_title']!r:30s} " f"para={d['paragraph']!r:8s}" ) if report["issues"]: print(f"\nISSUES ({len(report['issues'])}):") for issue in report["issues"]: print(f" - {issue}") print("\nRESULT: FAIL") else: print("\nRESULT: PASS — all sections detected, metadata quality OK") def main(): parser = argparse.ArgumentParser(description="D4 Integration Test") parser.add_argument("--rag-url", default="https://macmini:8097") parser.add_argument("--qdrant-url", default="http://macmini:6333") parser.add_argument("--dry-run", action="store_true", help="Only test local chunking, no upload") parser.add_argument("--keep", action="store_true", help="Don't delete test data after verification") args = parser.parse_args() text = load_fixture() print(f"Loaded BGB excerpt: {len(text)} chars") if args.dry_run: # Import chunking directly sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..", "..", "embedding-service")) from main import chunk_text_legal_structured chunks = chunk_text_legal_structured(text, 1500, 100) # Build fake points for verification points = [{"payload": { "chunk_index": c["index"], "chunk_text": c["text"], "section": c["section"], "section_title": c["section_title"], "paragraph": c["paragraph"], "paragraph_num": c["paragraph_num"], "page": c["page"], }} for c in chunks] report = verify_chunks(points) print_report(report) sys.exit(1 if report["issues"] else 0) # Full integration test print(f"Uploading to {args.rag_url} → collection={COLLECTION}...") result = upload_document(args.rag_url, text) doc_id = result["document_id"] print(f" document_id: {doc_id}") print(f" chunks_count: {result['chunks_count']}") print(f" vectors_indexed: {result['vectors_indexed']}") print("Waiting 2s for indexing...") time.sleep(2) print(f"Scrolling Qdrant at {args.qdrant_url}...") points = scroll_chunks(args.qdrant_url, doc_id) print(f" Found {len(points)} points") report = verify_chunks(points) print_report(report) if not args.keep: print(f"\nCleaning up test data (document_id={doc_id})...") delete_test_data(args.qdrant_url, doc_id) print(" Deleted.") sys.exit(1 if report["issues"] else 0) if __name__ == "__main__": main()