"""
Robust Legal Corpus Ingestion for UCCA RAG Integration.
This version handles large documents and unstable embedding services by:
- Processing one text at a time
- Health checks before each embedding
- Automatic retry with exponential backoff
- Progress tracking for resume capability
- Longer delays to prevent service overload
Usage:
python legal_corpus_robust.py --ingest DPF
python legal_corpus_robust.py --ingest-all-missing
python legal_corpus_robust.py --status
"""
import asyncio
import hashlib
import json
import logging
import os
import re
import sys
from dataclasses import dataclass
from datetime import datetime
from pathlib import Path
from typing import Dict, List, Optional, Tuple
import httpx
from qdrant_client import QdrantClient
from qdrant_client.models import (
Distance,
FieldCondition,
Filter,
MatchValue,
PointStruct,
VectorParams,
)
# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s - %(levelname)s - %(message)s"
)
logger = logging.getLogger(__name__)
# Configuration (overridable via environment variables)
QDRANT_HOST = os.getenv("QDRANT_HOST", "localhost")  # Qdrant server host
QDRANT_PORT = int(os.getenv("QDRANT_PORT", "6333"))  # Qdrant REST port
EMBEDDING_SERVICE_URL = os.getenv("EMBEDDING_SERVICE_URL", "http://localhost:8087")  # local embedding HTTP service
LEGAL_CORPUS_COLLECTION = "bp_legal_corpus"  # target Qdrant collection name
VECTOR_SIZE = 1024  # embedding dimensionality expected by the service
CHUNK_SIZE = 800  # max characters per text chunk
CHUNK_OVERLAP = 150  # character overlap carried between consecutive chunks
# Robust settings
MAX_RETRIES = 5  # embedding attempts per chunk before giving up
INITIAL_DELAY = 2.0  # base for exponential backoff (seconds)
DELAY_BETWEEN_EMBEDDINGS = 2.0  # pause after each successful upsert (seconds)
HEALTH_CHECK_INTERVAL = 10  # Check health every N embeddings
@dataclass
class Regulation:
    """Regulation metadata.

    Describes one legal/technical document to ingest; all fields except
    ``language`` are stored verbatim in each chunk's Qdrant payload.
    """
    code: str  # short identifier, used as CLI argument and payload filter key
    name: str  # human-readable short name
    full_name: str  # official document title
    regulation_type: str  # e.g. "eu_regulation" or "bsi_standard"
    source_url: str  # URL the document text is fetched from
    description: str  # one-line summary (German)
    language: str = "de"  # document language code; defaults to German
# Regulations that need robust loading (large documents and/or an unstable
# embedding service; processed one chunk at a time with retries).
ROBUST_REGULATIONS: List[Regulation] = [
    Regulation(
        code="DPF",
        name="EU-US Data Privacy Framework",
        full_name="Durchführungsbeschluss (EU) 2023/1795",
        regulation_type="eu_regulation",
        source_url="https://eur-lex.europa.eu/eli/dec_impl/2023/1795/oj",
        description="Angemessenheitsbeschluss für USA-Transfers.",
    ),
    Regulation(
        code="BSI-TR-03161-1",
        name="BSI-TR-03161 Teil 1",
        full_name="BSI Technische Richtlinie - Allgemeine Anforderungen",
        regulation_type="bsi_standard",
        source_url="https://www.bsi.bund.de/SharedDocs/Downloads/DE/BSI/Publikationen/TechnischeRichtlinien/TR03161/BSI-TR-03161-1.pdf",
        description="Allgemeine Sicherheitsanforderungen (45 Prüfaspekte).",
    ),
    Regulation(
        code="BSI-TR-03161-2",
        name="BSI-TR-03161 Teil 2",
        full_name="BSI Technische Richtlinie - Web-Anwendungen",
        regulation_type="bsi_standard",
        source_url="https://www.bsi.bund.de/SharedDocs/Downloads/DE/BSI/Publikationen/TechnischeRichtlinien/TR03161/BSI-TR-03161-2.pdf",
        description="Web-Sicherheit (40 Prüfaspekte).",
    ),
    Regulation(
        code="BSI-TR-03161-3",
        name="BSI-TR-03161 Teil 3",
        full_name="BSI Technische Richtlinie - Hintergrundsysteme",
        regulation_type="bsi_standard",
        source_url="https://www.bsi.bund.de/SharedDocs/Downloads/DE/BSI/Publikationen/TechnischeRichtlinien/TR03161/BSI-TR-03161-3.pdf",
        description="Backend-Sicherheit (35 Prüfaspekte).",
    ),
]
class RobustLegalCorpusIngestion:
    """Robust ingestion of large legal documents into Qdrant.

    Chunks are embedded one at a time against a local embedding HTTP
    service and upserted individually, so an interrupted run can be
    resumed from the last indexed chunk (see ingest_regulation_robust).
    """

    def __init__(self):
        # Synchronous Qdrant client; embedding/fetch calls are async via httpx.
        self.qdrant = QdrantClient(host=QDRANT_HOST, port=QDRANT_PORT)
        # Lazily created httpx.AsyncClient; recreated after connection errors.
        self.http_client = None
        # Counter driving the periodic health check (HEALTH_CHECK_INTERVAL).
        self.embeddings_since_health_check = 0
        self._ensure_collection()

    def _ensure_collection(self):
        """Create the legal corpus collection if it doesn't exist."""
        collections = self.qdrant.get_collections().collections
        collection_names = [c.name for c in collections]
        if LEGAL_CORPUS_COLLECTION not in collection_names:
            logger.info(f"Creating collection: {LEGAL_CORPUS_COLLECTION}")
            self.qdrant.create_collection(
                collection_name=LEGAL_CORPUS_COLLECTION,
                vectors_config=VectorParams(
                    size=VECTOR_SIZE,
                    distance=Distance.COSINE,
                ),
            )

    async def _get_client(self) -> httpx.AsyncClient:
        """Get or create the shared async HTTP client."""
        if self.http_client is None:
            self.http_client = httpx.AsyncClient(timeout=120.0)
        return self.http_client

    async def _check_embedding_service_health(self) -> bool:
        """Return True if the embedding service answers /health with 200."""
        try:
            client = await self._get_client()
            response = await client.get(f"{EMBEDDING_SERVICE_URL}/health", timeout=10.0)
            return response.status_code == 200
        except Exception as e:
            logger.warning(f"Health check failed: {e}")
            return False

    async def _wait_for_healthy_service(self, max_wait: int = 60) -> bool:
        """Poll the embedding service until it is healthy or *max_wait* seconds pass.

        Returns:
            True if the service became healthy within the window.
        """
        logger.info("Waiting for embedding service to become healthy...")
        start = datetime.now()
        # BUGFIX: timedelta.seconds ignores the .days component; use
        # total_seconds() so the deadline holds regardless of elapsed days.
        while (datetime.now() - start).total_seconds() < max_wait:
            if await self._check_embedding_service_health():
                logger.info("Embedding service is healthy")
                return True
            await asyncio.sleep(5)
        logger.error("Embedding service did not become healthy")
        return False

    async def _generate_single_embedding(self, text: str) -> Optional[List[float]]:
        """Generate an embedding for a single text with retry and backoff.

        Performs a periodic health check, retries up to MAX_RETRIES times
        with exponential backoff, and recreates the HTTP client after
        connection-level failures.

        Returns:
            The embedding vector, or None if all attempts failed.
        """
        for attempt in range(MAX_RETRIES):
            try:
                # Health check periodically (every HEALTH_CHECK_INTERVAL calls).
                self.embeddings_since_health_check += 1
                if self.embeddings_since_health_check >= HEALTH_CHECK_INTERVAL:
                    if not await self._check_embedding_service_health():
                        await self._wait_for_healthy_service()
                    self.embeddings_since_health_check = 0
                client = await self._get_client()
                response = await client.post(
                    f"{EMBEDDING_SERVICE_URL}/embed",
                    json={"texts": [text]},
                    timeout=60.0,
                )
                response.raise_for_status()
                data = response.json()
                return data["embeddings"][0]
            except Exception as e:
                delay = INITIAL_DELAY * (2 ** attempt)  # exponential backoff
                logger.warning(f"Embedding attempt {attempt + 1}/{MAX_RETRIES} failed: {e}")
                logger.info(f"Waiting {delay}s before retry...")
                # Close and recreate client on connection errors so a stale
                # connection pool is not reused against a restarted service.
                if "disconnect" in str(e).lower() or "connection" in str(e).lower():
                    if self.http_client:
                        await self.http_client.aclose()
                    self.http_client = None
                    # Wait for service to recover
                    await asyncio.sleep(delay)
                    if not await self._wait_for_healthy_service():
                        continue
                else:
                    await asyncio.sleep(delay)
        logger.error(f"Failed to generate embedding after {MAX_RETRIES} attempts")
        return None

    def _chunk_text_semantic(self, text: str) -> List[Tuple[str, int]]:
        """Chunk text semantically, respecting German sentence boundaries.

        Splits on sentence endings followed by an uppercase (incl. umlaut)
        letter, packs sentences into chunks of at most CHUNK_SIZE characters,
        and carries up to CHUNK_OVERLAP trailing characters of whole
        sentences into the next chunk.

        Returns:
            List of (chunk_text, approximate character offset) tuples.
        """
        sentence_endings = re.compile(r'(?<=[.!?])\s+(?=[A-ZÄÖÜ])')
        sentences = sentence_endings.split(text)
        chunks = []
        current_chunk = []
        current_length = 0
        chunk_start = 0
        position = 0
        for sentence in sentences:
            sentence = sentence.strip()
            if not sentence:
                continue
            sentence_length = len(sentence)
            if current_length + sentence_length > CHUNK_SIZE and current_chunk:
                chunk_text = " ".join(current_chunk)
                chunks.append((chunk_text, chunk_start))
                # Keep some trailing sentences for overlap with the next chunk.
                overlap_sentences = []
                overlap_length = 0
                for s in reversed(current_chunk):
                    if overlap_length + len(s) > CHUNK_OVERLAP:
                        break
                    overlap_sentences.insert(0, s)
                    overlap_length += len(s)
                current_chunk = overlap_sentences
                current_length = overlap_length
                chunk_start = position - overlap_length
            current_chunk.append(sentence)
            current_length += sentence_length
            position += sentence_length + 1  # +1 for the removed separator
        if current_chunk:
            chunk_text = " ".join(current_chunk)
            chunks.append((chunk_text, chunk_start))
        return chunks

    def _extract_article_info(self, text: str) -> Optional[Dict]:
        """Extract article and paragraph numbers from German legal text.

        Matches "Artikel 5" / "Art. 5" and "Absatz 2" / "Abs. 2" patterns.

        Returns:
            Dict with "article" (str) and "paragraph" (str or None), or
            None when no article reference is present.
        """
        article_match = re.search(r'(?:Artikel|Art\.?)\s+(\d+)', text)
        paragraph_match = re.search(r'(?:Absatz|Abs\.?)\s+(\d+)', text)
        if article_match:
            return {
                "article": article_match.group(1),
                "paragraph": paragraph_match.group(1) if paragraph_match else None,
            }
        return None

    async def _fetch_document_text(self, regulation: Regulation) -> Optional[str]:
        """Fetch the document and return plain text with HTML stripped.

        Returns:
            Whitespace-normalized plain text, or None on fetch failure.
        """
        logger.info(f"Fetching {regulation.code} from: {regulation.source_url}")
        try:
            client = await self._get_client()
            response = await client.get(
                regulation.source_url,
                follow_redirects=True,
                headers={"Accept": "text/html,application/xhtml+xml"},
                timeout=60.0,
            )
            response.raise_for_status()
            html_content = response.text
            # BUGFIX: the previous patterns were empty strings (no-ops), so
            # embedded JS/CSS leaked into the extracted text. Strip whole
            # <script> and <style> blocks before removing remaining tags.
            html_content = re.sub(
                r'<script\b[^>]*>.*?</script>', '', html_content,
                flags=re.DOTALL | re.IGNORECASE,
            )
            html_content = re.sub(
                r'<style\b[^>]*>.*?</style>', '', html_content,
                flags=re.DOTALL | re.IGNORECASE,
            )
            text = re.sub(r'<[^>]+>', ' ', html_content)  # drop remaining tags
            text = re.sub(r'\s+', ' ', text).strip()      # collapse whitespace
            return text
        except Exception as e:
            logger.error(f"Failed to fetch {regulation.code}: {e}")
            return None

    def get_existing_chunk_count(self, regulation_code: str) -> int:
        """Get the count of chunks already stored for a regulation.

        Returns 0 when the count query fails (e.g. collection missing),
        which makes callers treat the regulation as not yet indexed.
        """
        try:
            result = self.qdrant.count(
                collection_name=LEGAL_CORPUS_COLLECTION,
                count_filter=Filter(
                    must=[
                        FieldCondition(
                            key="regulation_code",
                            match=MatchValue(value=regulation_code),
                        )
                    ]
                ),
            )
            return result.count
        except Exception:
            # BUGFIX: was a bare except, which also swallowed
            # KeyboardInterrupt/SystemExit.
            return 0

    async def ingest_regulation_robust(self, regulation: Regulation, resume: bool = True) -> int:
        """
        Ingest a regulation with robust error handling.

        Fetches the source document, chunks it, then embeds and upserts
        one chunk at a time so progress survives service outages.

        Args:
            regulation: The regulation to ingest
            resume: If True, skip already indexed chunks
        Returns:
            Number of chunks indexed (existing + newly indexed)
        """
        logger.info(f"=== Starting robust ingestion for {regulation.code} ===")
        # Check existing chunks
        existing_count = self.get_existing_chunk_count(regulation.code)
        logger.info(f"Existing chunks for {regulation.code}: {existing_count}")
        # Fetch document
        text = await self._fetch_document_text(regulation)
        if not text or len(text) < 100:
            logger.warning(f"No text found for {regulation.code}")
            return 0
        # Chunk the text
        chunks = self._chunk_text_semantic(text)
        total_chunks = len(chunks)
        logger.info(f"Total chunks to process: {total_chunks}")
        if resume and existing_count >= total_chunks:
            logger.info(f"{regulation.code} already fully indexed")
            return existing_count
        # Determine starting point (resume assumes chunks were indexed in order)
        start_idx = existing_count if resume else 0
        logger.info(f"Starting from chunk {start_idx}")
        indexed = 0
        for idx, (chunk_text, position) in enumerate(chunks[start_idx:], start=start_idx):
            # Progress logging
            if idx % 10 == 0:
                logger.info(f"Progress: {idx}/{total_chunks} chunks ({idx*100//total_chunks}%)")
            # Generate embedding; stop (don't skip) on failure so the
            # contiguous-prefix invariant needed by resume is preserved.
            embedding = await self._generate_single_embedding(chunk_text)
            if embedding is None:
                logger.error(f"Failed to embed chunk {idx}, stopping")
                break
            # Deterministic ID: md5 of "<code>-<idx>" parses as a
            # simple-format UUID and stays stable across resumed runs.
            point_id = hashlib.md5(f"{regulation.code}-{idx}".encode()).hexdigest()
            article_info = self._extract_article_info(chunk_text)
            point = PointStruct(
                id=point_id,
                vector=embedding,
                payload={
                    "text": chunk_text,
                    "regulation_code": regulation.code,
                    "regulation_name": regulation.name,
                    "regulation_full_name": regulation.full_name,
                    "regulation_type": regulation.regulation_type,
                    "source_url": regulation.source_url,
                    "chunk_index": idx,
                    "chunk_position": position,
                    "article": article_info.get("article") if article_info else None,
                    "paragraph": article_info.get("paragraph") if article_info else None,
                    "language": regulation.language,
                    # NOTE(review): naive UTC timestamp; utcnow() is deprecated
                    # in Python 3.12 — consider datetime.now(timezone.utc), but
                    # that changes the stored string format (adds "+00:00").
                    "indexed_at": datetime.utcnow().isoformat(),
                    "training_allowed": False,
                },
            )
            # Upsert single point so each chunk is durable immediately.
            self.qdrant.upsert(
                collection_name=LEGAL_CORPUS_COLLECTION,
                points=[point],
            )
            indexed += 1
            # Delay between embeddings to avoid overloading the service.
            await asyncio.sleep(DELAY_BETWEEN_EMBEDDINGS)
        logger.info(f"=== Completed {regulation.code}: {indexed} new chunks indexed ===")
        return existing_count + indexed

    def get_status(self) -> Dict:
        """Get ingestion status for all robust regulations.

        Returns:
            Dict with the collection name and, per regulation code, its
            chunk count and a coarse status ("complete" means at least one
            chunk exists — not that ingestion finished).
        """
        status = {
            "collection": LEGAL_CORPUS_COLLECTION,
            "regulations": {},
        }
        for reg in ROBUST_REGULATIONS:
            count = self.get_existing_chunk_count(reg.code)
            status["regulations"][reg.code] = {
                "name": reg.name,
                "chunks": count,
                "status": "complete" if count > 0 else "missing",
            }
        return status

    async def close(self):
        """Close the shared HTTP client, if one was created."""
        if self.http_client:
            await self.http_client.aclose()
async def main():
    """Command-line entry point for the robust ingestion tool."""
    import argparse

    parser = argparse.ArgumentParser(description="Robust Legal Corpus Ingestion")
    parser.add_argument("--ingest", nargs="+", metavar="CODE", help="Ingest specific regulations")
    parser.add_argument("--ingest-all-missing", action="store_true", help="Ingest all missing regulations")
    parser.add_argument("--status", action="store_true", help="Show status")
    parser.add_argument("--no-resume", action="store_true", help="Don't resume from existing chunks")
    args = parser.parse_args()

    ingestion = RobustLegalCorpusIngestion()
    resume = not args.no_resume
    try:
        if args.status:
            # Status only: no embedding service required.
            print(json.dumps(ingestion.get_status(), indent=2))
        elif args.ingest_all_missing:
            print("Ingesting all missing regulations...")
            for reg in ROBUST_REGULATIONS:
                # Only regulations with zero indexed chunks are ingested.
                if ingestion.get_existing_chunk_count(reg.code) != 0:
                    continue
                total = await ingestion.ingest_regulation_robust(reg, resume=resume)
                print(f"{reg.code}: {total} chunks")
        elif args.ingest:
            known = {r.code: r for r in ROBUST_REGULATIONS}
            for code in args.ingest:
                reg = known.get(code)
                if reg is None:
                    print(f"Unknown regulation: {code}")
                else:
                    count = await ingestion.ingest_regulation_robust(reg, resume=resume)
                    print(f"{code}: {count} chunks")
        else:
            # No action requested: show usage.
            parser.print_help()
    finally:
        # Always release the HTTP client, even on errors/interrupts.
        await ingestion.close()
# Script entry point: run the async CLI under a fresh event loop.
if __name__ == "__main__":
    asyncio.run(main())