"""
Robust Legal Corpus Ingestion for UCCA RAG Integration.

This version handles large documents and unstable embedding services by:
- Processing one text at a time
- Running a health check every HEALTH_CHECK_INTERVAL embeddings
- Retrying automatically with exponential backoff
- Tracking progress (via existing chunk counts) so interrupted runs can resume
- Pausing between embeddings to prevent service overload

Usage:
    python legal_corpus_robust.py --ingest DPF
    python legal_corpus_robust.py --ingest-all-missing
    python legal_corpus_robust.py --status
"""

import asyncio
import hashlib
import html
import json
import logging
import os
import re
import sys
from dataclasses import dataclass
from datetime import datetime, timezone
from pathlib import Path
from typing import Dict, List, Optional, Tuple

import httpx
from qdrant_client import QdrantClient
from qdrant_client.models import (
    Distance,
    FieldCondition,
    Filter,
    MatchValue,
    PointStruct,
    VectorParams,
)

# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s - %(levelname)s - %(message)s"
)
logger = logging.getLogger(__name__)

# Configuration
QDRANT_HOST = os.getenv("QDRANT_HOST", "localhost")
QDRANT_PORT = int(os.getenv("QDRANT_PORT", "6333"))
EMBEDDING_SERVICE_URL = os.getenv("EMBEDDING_SERVICE_URL", "http://localhost:8087")
LEGAL_CORPUS_COLLECTION = "bp_legal_corpus"
VECTOR_SIZE = 1024
CHUNK_SIZE = 800
CHUNK_OVERLAP = 150

# Robust settings
MAX_RETRIES = 5
INITIAL_DELAY = 2.0
DELAY_BETWEEN_EMBEDDINGS = 2.0
HEALTH_CHECK_INTERVAL = 10  # Check health every N embeddings

# Precompiled patterns used for text extraction and chunking.
# Split after . ! ? when the next sentence starts with an (German) uppercase letter.
_SENTENCE_BOUNDARY = re.compile(r'(?<=[.!?])\s+(?=[A-ZÄÖÜ])')
_SCRIPT_BLOCK = re.compile(r'<script\b[^>]*>.*?</script\s*>', re.DOTALL | re.IGNORECASE)
_STYLE_BLOCK = re.compile(r'<style\b[^>]*>.*?</style\s*>', re.DOTALL | re.IGNORECASE)
_HTML_TAG = re.compile(r'<[^>]+>')
_WHITESPACE_RUN = re.compile(r'\s+')


@dataclass
class Regulation:
    """Regulation metadata."""
    code: str  # short identifier, also stored as "regulation_code" in Qdrant
    name: str
    full_name: str
    regulation_type: str
    source_url: str  # URL the document text is fetched from
    description: str
    language: str = "de"


# Regulations that need robust loading
ROBUST_REGULATIONS: List[Regulation] = [
    Regulation(
        code="DPF",
        name="EU-US Data Privacy Framework",
        full_name="Durchführungsbeschluss (EU) 2023/1795",
        regulation_type="eu_regulation",
        source_url="https://eur-lex.europa.eu/eli/dec_impl/2023/1795/oj",
        description="Angemessenheitsbeschluss für USA-Transfers.",
    ),
    Regulation(
        code="BSI-TR-03161-1",
        name="BSI-TR-03161 Teil 1",
        full_name="BSI Technische Richtlinie - Allgemeine Anforderungen",
        regulation_type="bsi_standard",
        source_url="https://www.bsi.bund.de/SharedDocs/Downloads/DE/BSI/Publikationen/TechnischeRichtlinien/TR03161/BSI-TR-03161-1.pdf",
        description="Allgemeine Sicherheitsanforderungen (45 Prüfaspekte).",
    ),
    Regulation(
        code="BSI-TR-03161-2",
        name="BSI-TR-03161 Teil 2",
        full_name="BSI Technische Richtlinie - Web-Anwendungen",
        regulation_type="bsi_standard",
        source_url="https://www.bsi.bund.de/SharedDocs/Downloads/DE/BSI/Publikationen/TechnischeRichtlinien/TR03161/BSI-TR-03161-2.pdf",
        description="Web-Sicherheit (40 Prüfaspekte).",
    ),
    Regulation(
        code="BSI-TR-03161-3",
        name="BSI-TR-03161 Teil 3",
        full_name="BSI Technische Richtlinie - Hintergrundsysteme",
        regulation_type="bsi_standard",
        source_url="https://www.bsi.bund.de/SharedDocs/Downloads/DE/BSI/Publikationen/TechnischeRichtlinien/TR03161/BSI-TR-03161-3.pdf",
        description="Backend-Sicherheit (35 Prüfaspekte).",
    ),
]


class RobustLegalCorpusIngestion:
    """Handles robust ingestion of large legal documents into Qdrant."""

    def __init__(self):
        self.qdrant = QdrantClient(host=QDRANT_HOST, port=QDRANT_PORT)
        # Created lazily by _get_client(); reset to None whenever the client
        # must be recreated (connection errors, close()).
        self.http_client: Optional[httpx.AsyncClient] = None
        self.embeddings_since_health_check = 0
        self._ensure_collection()

    def _ensure_collection(self):
        """Create the legal corpus collection if it doesn't exist."""
        collections = self.qdrant.get_collections().collections
        collection_names = [c.name for c in collections]

        if LEGAL_CORPUS_COLLECTION not in collection_names:
            logger.info(f"Creating collection: {LEGAL_CORPUS_COLLECTION}")
            self.qdrant.create_collection(
                collection_name=LEGAL_CORPUS_COLLECTION,
                vectors_config=VectorParams(
                    size=VECTOR_SIZE,
                    distance=Distance.COSINE,
                ),
            )

    async def _get_client(self) -> httpx.AsyncClient:
        """Get or create the shared HTTP client."""
        if self.http_client is None:
            self.http_client = httpx.AsyncClient(timeout=120.0)
        return self.http_client

    async def _reset_http_client(self):
        """Close the shared HTTP client (if any) so the next call recreates it."""
        if self.http_client:
            await self.http_client.aclose()
        self.http_client = None

    async def _check_embedding_service_health(self) -> bool:
        """Return True if the embedding service's /health endpoint answers 200.

        Never raises; any error is logged and reported as unhealthy.
        """
        try:
            client = await self._get_client()
            response = await client.get(f"{EMBEDDING_SERVICE_URL}/health", timeout=10.0)
            return response.status_code == 200
        except Exception as e:
            logger.warning(f"Health check failed: {e}")
            return False

    async def _wait_for_healthy_service(self, max_wait: int = 60) -> bool:
        """Poll the health endpoint every 5 seconds for up to ``max_wait`` seconds.

        Returns:
            True as soon as the service reports healthy, False on timeout.
        """
        logger.info("Waiting for embedding service to become healthy...")
        # Monotonic clock: unaffected by wall-clock adjustments.
        loop = asyncio.get_running_loop()
        deadline = loop.time() + max_wait
        while loop.time() < deadline:
            if await self._check_embedding_service_health():
                logger.info("Embedding service is healthy")
                return True
            await asyncio.sleep(5)

        logger.error("Embedding service did not become healthy")
        return False

    async def _generate_single_embedding(self, text: str) -> Optional[List[float]]:
        """Generate the embedding for a single text with robust retry.

        Retries up to MAX_RETRIES times with exponential backoff
        (INITIAL_DELAY * 2**attempt). On connection-level failures the HTTP
        client is recreated and the service is given time to recover.

        Returns:
            The embedding vector, or None if every attempt failed.
        """
        # Periodic health check; counts embeddings, not individual attempts.
        self.embeddings_since_health_check += 1
        if self.embeddings_since_health_check >= HEALTH_CHECK_INTERVAL:
            if not await self._check_embedding_service_health():
                await self._wait_for_healthy_service()
            self.embeddings_since_health_check = 0

        for attempt in range(MAX_RETRIES):
            try:
                client = await self._get_client()
                response = await client.post(
                    f"{EMBEDDING_SERVICE_URL}/embed",
                    json={"texts": [text]},
                    timeout=60.0,
                )
                response.raise_for_status()
                data = response.json()
                return data["embeddings"][0]

            except Exception as e:
                logger.warning(f"Embedding attempt {attempt + 1}/{MAX_RETRIES} failed: {e}")
                if attempt == MAX_RETRIES - 1:
                    # Final attempt: sleeping before giving up would only waste time.
                    break

                delay = INITIAL_DELAY * (2 ** attempt)
                logger.info(f"Waiting {delay}s before retry...")

                message = str(e).lower()
                is_connection_error = (
                    isinstance(e, httpx.TransportError)
                    or "disconnect" in message
                    or "connection" in message
                )
                if is_connection_error:
                    # A broken connection can poison the pooled client; recreate it.
                    await self._reset_http_client()
                    await asyncio.sleep(delay)
                    # Give the service time to recover; retry regardless of outcome.
                    await self._wait_for_healthy_service()
                else:
                    await asyncio.sleep(delay)

        logger.error(f"Failed to generate embedding after {MAX_RETRIES} attempts")
        return None

    def _chunk_text_semantic(self, text: str) -> List[Tuple[str, int]]:
        """Chunk text semantically, respecting German sentence boundaries.

        Sentences are packed into chunks of roughly CHUNK_SIZE characters
        (separators are not counted against the limit); each new chunk starts
        with trailing sentences of the previous one totalling at most
        CHUNK_OVERLAP characters. A single sentence longer than CHUNK_SIZE
        becomes its own oversized chunk.

        Returns:
            List of (chunk_text, start_offset) pairs. The offset is measured in
            the text formed by joining the stripped sentences with single
            spaces (identical to ``text`` when it is already whitespace-
            normalized, as produced by _fetch_document_text).
        """
        sentences = _SENTENCE_BOUNDARY.split(text)

        chunks = []
        current_chunk: List[str] = []
        current_length = 0
        chunk_start = 0
        position = 0  # offset of the sentence being processed

        for sentence in sentences:
            sentence = sentence.strip()
            if not sentence:
                continue

            sentence_length = len(sentence)

            if current_length + sentence_length > CHUNK_SIZE and current_chunk:
                chunks.append((" ".join(current_chunk), chunk_start))

                # Keep some sentences for overlap. The selection budget ignores
                # separators; keep it that way so chunk boundaries (and thus the
                # deterministic chunk indices used for resume) stay stable.
                overlap_sentences: List[str] = []
                overlap_length = 0
                for s in reversed(current_chunk):
                    if overlap_length + len(s) > CHUNK_OVERLAP:
                        break
                    overlap_sentences.insert(0, s)
                    overlap_length += len(s)

                current_chunk = overlap_sentences
                current_length = overlap_length
                # Each overlap sentence occupies len(s) + 1 chars (incl. separator).
                chunk_start = position - sum(len(s) + 1 for s in overlap_sentences)

            current_chunk.append(sentence)
            current_length += sentence_length
            position += sentence_length + 1

        if current_chunk:
            chunks.append((" ".join(current_chunk), chunk_start))

        return chunks

    def _extract_article_info(self, text: str) -> Optional[Dict]:
        """Extract the first article number and paragraph number found in text.

        Returns:
            {"article": str, "paragraph": str | None}, or None if no article
            reference ("Artikel N" / "Art. N") is present.
        """
        article_match = re.search(r'(?:Artikel|Art\.?)\s+(\d+)', text)
        paragraph_match = re.search(r'(?:Absatz|Abs\.?)\s+(\d+)', text)

        if article_match:
            return {
                "article": article_match.group(1),
                "paragraph": paragraph_match.group(1) if paragraph_match else None,
            }
        return None

    async def _fetch_document_text(self, regulation: Regulation) -> Optional[str]:
        """Fetch an HTML document and reduce it to whitespace-normalized plain text.

        Returns:
            The extracted text, or None if the request failed or the source is
            a PDF (PDF text extraction is not implemented; decoding PDF bytes
            as text would index binary garbage).
        """
        logger.info(f"Fetching {regulation.code} from: {regulation.source_url}")

        try:
            client = await self._get_client()
            response = await client.get(
                regulation.source_url,
                follow_redirects=True,
                headers={"Accept": "text/html,application/xhtml+xml"},
                timeout=60.0,
            )
            response.raise_for_status()

            content_type = response.headers.get("content-type", "").lower()
            if "application/pdf" in content_type or response.content.startswith(b"%PDF"):
                logger.error(
                    f"{regulation.code}: source is a PDF; PDF text extraction "
                    "is not supported, skipping"
                )
                return None

            page = _SCRIPT_BLOCK.sub("", response.text)
            page = _STYLE_BLOCK.sub("", page)
            text = _HTML_TAG.sub(" ", page)
            # Decode entities (e.g. &auml;, &nbsp;) after tag removal so that an
            # escaped "&lt;" in the text cannot be mistaken for a tag.
            text = html.unescape(text)
            return _WHITESPACE_RUN.sub(" ", text).strip()

        except Exception as e:
            logger.error(f"Failed to fetch {regulation.code}: {e}")
            return None

    def get_existing_chunk_count(self, regulation_code: str) -> int:
        """Count the points stored for a regulation; 0 if counting fails."""
        try:
            result = self.qdrant.count(
                collection_name=LEGAL_CORPUS_COLLECTION,
                count_filter=Filter(
                    must=[
                        FieldCondition(
                            key="regulation_code",
                            match=MatchValue(value=regulation_code),
                        )
                    ]
                ),
            )
            return result.count
        except Exception as e:
            logger.warning(f"Could not count chunks for {regulation_code}: {e}")
            return 0

    async def ingest_regulation_robust(self, regulation: Regulation, resume: bool = True) -> int:
        """
        Ingest a regulation with robust error handling.

        Resume assumes chunking is deterministic across runs and that the
        stored chunks are exactly indices 0..existing_count-1. If the text
        extraction changes between runs, re-ingest with resume=False.

        Args:
            regulation: The regulation to ingest
            resume: If True, skip already indexed chunks

        Returns:
            Number of contiguous chunks (from index 0) stored for the
            regulation after this run. Ingestion stops at the first chunk
            that cannot be embedded.
        """
        logger.info(f"=== Starting robust ingestion for {regulation.code} ===")

        # Check existing chunks
        existing_count = self.get_existing_chunk_count(regulation.code)
        logger.info(f"Existing chunks for {regulation.code}: {existing_count}")

        # Fetch document
        text = await self._fetch_document_text(regulation)
        if not text or len(text) < 100:
            logger.warning(f"No text found for {regulation.code}")
            return 0

        # Chunk the text
        chunks = self._chunk_text_semantic(text)
        total_chunks = len(chunks)
        logger.info(f"Total chunks to process: {total_chunks}")

        if resume and existing_count >= total_chunks:
            logger.info(f"{regulation.code} already fully indexed")
            return existing_count

        # Determine starting point
        start_idx = existing_count if resume else 0
        logger.info(f"Starting from chunk {start_idx}")

        indexed = 0
        for idx, (chunk_text, position) in enumerate(chunks[start_idx:], start=start_idx):
            # Progress logging
            if idx % 10 == 0:
                logger.info(f"Progress: {idx}/{total_chunks} chunks ({idx * 100 // total_chunks}%)")

            # Generate embedding
            embedding = await self._generate_single_embedding(chunk_text)
            if embedding is None:
                logger.error(f"Failed to embed chunk {idx}, stopping")
                break

            # Deterministic id: re-ingesting the same chunk overwrites it.
            point_id = hashlib.md5(f"{regulation.code}-{idx}".encode()).hexdigest()
            article_info = self._extract_article_info(chunk_text)

            point = PointStruct(
                id=point_id,
                vector=embedding,
                payload={
                    "text": chunk_text,
                    "regulation_code": regulation.code,
                    "regulation_name": regulation.name,
                    "regulation_full_name": regulation.full_name,
                    "regulation_type": regulation.regulation_type,
                    "source_url": regulation.source_url,
                    "chunk_index": idx,
                    "chunk_position": position,
                    "article": article_info.get("article") if article_info else None,
                    "paragraph": article_info.get("paragraph") if article_info else None,
                    "language": regulation.language,
                    "indexed_at": datetime.now(timezone.utc).isoformat(),
                    "training_allowed": False,
                },
            )

            # Upsert single point so progress survives an interruption.
            self.qdrant.upsert(
                collection_name=LEGAL_CORPUS_COLLECTION,
                points=[point],
            )
            indexed += 1

            # Delay between embeddings
            await asyncio.sleep(DELAY_BETWEEN_EMBEDDINGS)

        logger.info(f"=== Completed {regulation.code}: {indexed} new chunks indexed ===")
        # Without resume the same ids are overwritten, so existing_count must
        # not be added on top (that would double-count).
        return start_idx + indexed

    def get_status(self) -> Dict:
        """Get ingestion status for all robust regulations.

        A regulation is reported "complete" as soon as it has at least one
        stored chunk; partial ingestion is not distinguished.
        """
        status = {
            "collection": LEGAL_CORPUS_COLLECTION,
            "regulations": {},
        }

        for reg in ROBUST_REGULATIONS:
            count = self.get_existing_chunk_count(reg.code)
            status["regulations"][reg.code] = {
                "name": reg.name,
                "chunks": count,
                "status": "complete" if count > 0 else "missing",
            }

        return status

    async def close(self):
        """Close the HTTP client (safe to call more than once)."""
        await self._reset_http_client()


async def main():
    """CLI entry point."""
    import argparse

    parser = argparse.ArgumentParser(description="Robust Legal Corpus Ingestion")
    parser.add_argument("--ingest", nargs="+", metavar="CODE", help="Ingest specific regulations")
    parser.add_argument("--ingest-all-missing", action="store_true", help="Ingest all missing regulations")
    parser.add_argument("--status", action="store_true", help="Show status")
    parser.add_argument("--no-resume", action="store_true", help="Don't resume from existing chunks")

    args = parser.parse_args()

    ingestion = RobustLegalCorpusIngestion()

    try:
        if args.status:
            status = ingestion.get_status()
            print(json.dumps(status, indent=2))

        elif args.ingest_all_missing:
            print("Ingesting all missing regulations...")
            for reg in ROBUST_REGULATIONS:
                # Only regulations with zero stored chunks count as "missing".
                if ingestion.get_existing_chunk_count(reg.code) == 0:
                    count = await ingestion.ingest_regulation_robust(reg, resume=not args.no_resume)
                    print(f"{reg.code}: {count} chunks")

        elif args.ingest:
            for code in args.ingest:
                reg = next((r for r in ROBUST_REGULATIONS if r.code == code), None)
                if not reg:
                    print(f"Unknown regulation: {code}")
                    continue
                count = await ingestion.ingest_regulation_robust(reg, resume=not args.no_resume)
                print(f"{code}: {count} chunks")

        else:
            parser.print_help()

    finally:
        await ingestion.close()


if __name__ == "__main__":
    asyncio.run(main())