""" Legal Corpus Ingestion for UCCA RAG Integration. Indexes all 19 regulations from the Compliance Hub into Qdrant for semantic search during UCCA assessments and explanations. Collections: - bp_legal_corpus: All regulation texts (GDPR, AI Act, CRA, BSI, etc.) Usage: python legal_corpus_ingestion.py --ingest-all python legal_corpus_ingestion.py --ingest GDPR AIACT python legal_corpus_ingestion.py --status """ import asyncio import hashlib import json import logging import os import re from dataclasses import dataclass, field from datetime import datetime from pathlib import Path from typing import Dict, List, Optional, Tuple from urllib.parse import urlparse import httpx from qdrant_client import QdrantClient from qdrant_client.models import ( Distance, FieldCondition, Filter, MatchValue, PointStruct, VectorParams, ) # Configure logging logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) # Configuration - Support both QDRANT_URL and QDRANT_HOST/PORT _qdrant_url = os.getenv("QDRANT_URL", "") if _qdrant_url: # Parse URL: http://qdrant:6333 -> host=qdrant, port=6333 from urllib.parse import urlparse _parsed = urlparse(_qdrant_url) QDRANT_HOST = _parsed.hostname or "localhost" QDRANT_PORT = _parsed.port or 6333 else: QDRANT_HOST = os.getenv("QDRANT_HOST", "localhost") QDRANT_PORT = int(os.getenv("QDRANT_PORT", "6333")) EMBEDDING_SERVICE_URL = os.getenv("EMBEDDING_SERVICE_URL", "http://localhost:8087") LEGAL_CORPUS_COLLECTION = "bp_legal_corpus" VECTOR_SIZE = 1024 # BGE-M3 dimension # Chunking configuration - matched to NIBIS settings for semantic chunking CHUNK_SIZE = int(os.getenv("LEGAL_CHUNK_SIZE", "1000")) CHUNK_OVERLAP = int(os.getenv("LEGAL_CHUNK_OVERLAP", "200")) # Base path for local PDF/HTML files # In Docker: /app/docs/legal_corpus (mounted volume) # Local dev: relative to script location _default_docs_path = Path(__file__).parent.parent / "docs" / "legal_corpus" LEGAL_DOCS_PATH = Path(os.getenv("LEGAL_DOCS_PATH", str(_default_docs_path))) # Docker-specific override: if /app/docs exists, use it if Path("/app/docs/legal_corpus").exists(): LEGAL_DOCS_PATH = Path("/app/docs/legal_corpus") @dataclass class Regulation: """Regulation metadata.""" code: str name: str full_name: str regulation_type: str source_url: str description: str celex: Optional[str] = None # CELEX number for EUR-Lex direct access local_path: Optional[str] = None language: str = "de" requirement_count: int = 0 # All 19 regulations from Compliance Hub REGULATIONS: List[Regulation] = [ Regulation( code="GDPR", name="DSGVO", full_name="Verordnung (EU) 2016/679 - Datenschutz-Grundverordnung", regulation_type="eu_regulation", source_url="https://eur-lex.europa.eu/eli/reg/2016/679/oj/deu", description="Grundverordnung zum Schutz natuerlicher Personen bei der Verarbeitung personenbezogener Daten.", celex="32016R0679", requirement_count=99, ), Regulation( code="EPRIVACY", name="ePrivacy-Richtlinie", full_name="Richtlinie 2002/58/EG", regulation_type="eu_directive", source_url="https://eur-lex.europa.eu/eli/dir/2002/58/oj/deu", description="Datenschutz in der elektronischen Kommunikation, Cookies und Tracking.", celex="32002L0058", requirement_count=25, ), Regulation( code="TDDDG", name="TDDDG", full_name="Telekommunikation-Digitale-Dienste-Datenschutz-Gesetz", regulation_type="de_law", source_url="https://www.gesetze-im-internet.de/ttdsg/TDDDG.pdf", description="Deutsche Umsetzung der ePrivacy-Richtlinie (30 Paragraphen).", requirement_count=30, ), Regulation( code="SCC", name="Standardvertragsklauseln", full_name="Durchfuehrungsbeschluss (EU) 2021/914", regulation_type="eu_regulation", source_url="https://eur-lex.europa.eu/eli/dec_impl/2021/914/oj/deu", description="Standardvertragsklauseln fuer Drittlandtransfers.", celex="32021D0914", requirement_count=18, ), Regulation( code="DPF", name="EU-US Data Privacy Framework", full_name="Durchfuehrungsbeschluss (EU) 2023/1795", regulation_type="eu_regulation", source_url="https://eur-lex.europa.eu/eli/dec_impl/2023/1795/oj", description="Angemessenheitsbeschluss fuer USA-Transfers.", celex="32023D1795", requirement_count=12, ), Regulation( code="AIACT", name="EU AI Act", full_name="Verordnung (EU) 2024/1689 - KI-Verordnung", regulation_type="eu_regulation", source_url="https://eur-lex.europa.eu/eli/reg/2024/1689/oj/deu", description="EU-Verordnung zur Regulierung von KI-Systemen nach Risikostufen.", celex="32024R1689", requirement_count=85, ), Regulation( code="CRA", name="Cyber Resilience Act", full_name="Verordnung (EU) 2024/2847", regulation_type="eu_regulation", source_url="https://eur-lex.europa.eu/eli/reg/2024/2847/oj/deu", description="Cybersicherheitsanforderungen, SBOM-Pflicht.", celex="32024R2847", requirement_count=45, ), Regulation( code="NIS2", name="NIS2-Richtlinie", full_name="Richtlinie (EU) 2022/2555", regulation_type="eu_directive", source_url="https://eur-lex.europa.eu/eli/dir/2022/2555/oj/deu", description="Cybersicherheit fuer wesentliche Einrichtungen.", celex="32022L2555", requirement_count=46, ), Regulation( code="EUCSA", name="EU Cybersecurity Act", full_name="Verordnung (EU) 2019/881", regulation_type="eu_regulation", source_url="https://eur-lex.europa.eu/eli/reg/2019/881/oj/deu", description="ENISA und Cybersicherheitszertifizierung.", celex="32019R0881", requirement_count=35, ), Regulation( code="DATAACT", name="Data Act", full_name="Verordnung (EU) 2023/2854", regulation_type="eu_regulation", source_url="https://eur-lex.europa.eu/eli/reg/2023/2854/oj/deu", description="Fairer Datenzugang, IoT-Daten, Cloud-Wechsel.", celex="32023R2854", requirement_count=42, ), Regulation( code="DGA", name="Data Governance Act", full_name="Verordnung (EU) 2022/868", regulation_type="eu_regulation", source_url="https://eur-lex.europa.eu/eli/reg/2022/868/oj/deu", description="Weiterverwendung oeffentlicher Daten.", celex="32022R0868", requirement_count=35, ), Regulation( code="DSA", name="Digital Services Act", full_name="Verordnung (EU) 2022/2065", regulation_type="eu_regulation", source_url="https://eur-lex.europa.eu/eli/reg/2022/2065/oj/deu", description="Digitale Dienste, Transparenzpflichten.", celex="32022R2065", requirement_count=93, ), Regulation( code="EAA", name="European Accessibility Act", full_name="Richtlinie (EU) 2019/882", regulation_type="eu_directive", source_url="https://eur-lex.europa.eu/eli/dir/2019/882/oj/deu", description="Barrierefreiheit digitaler Produkte.", celex="32019L0882", requirement_count=25, ), Regulation( code="DSM", name="DSM-Urheberrechtsrichtlinie", full_name="Richtlinie (EU) 2019/790", regulation_type="eu_directive", source_url="https://eur-lex.europa.eu/eli/dir/2019/790/oj/deu", description="Urheberrecht, Text- und Data-Mining.", celex="32019L0790", requirement_count=22, ), Regulation( code="PLD", name="Produkthaftungsrichtlinie", full_name="Richtlinie (EU) 2024/2853", regulation_type="eu_directive", source_url="https://eur-lex.europa.eu/eli/dir/2024/2853/oj/deu", description="Produkthaftung inkl. Software und KI.", celex="32024L2853", requirement_count=18, ), Regulation( code="GPSR", name="General Product Safety", full_name="Verordnung (EU) 2023/988", regulation_type="eu_regulation", source_url="https://eur-lex.europa.eu/eli/reg/2023/988/oj/deu", description="Allgemeine Produktsicherheit.", celex="32023R0988", requirement_count=30, ), Regulation( code="BSI-TR-03161-1", name="BSI-TR-03161 Teil 1", full_name="BSI Technische Richtlinie - Allgemeine Anforderungen", regulation_type="bsi_standard", source_url="https://www.bsi.bund.de/SharedDocs/Downloads/DE/BSI/Publikationen/TechnischeRichtlinien/TR03161/BSI-TR-03161-1.pdf?__blob=publicationFile&v=6", description="Allgemeine Sicherheitsanforderungen (45 Pruefaspekte).", requirement_count=45, ), Regulation( code="BSI-TR-03161-2", name="BSI-TR-03161 Teil 2", full_name="BSI Technische Richtlinie - Web-Anwendungen", regulation_type="bsi_standard", source_url="https://www.bsi.bund.de/SharedDocs/Downloads/DE/BSI/Publikationen/TechnischeRichtlinien/TR03161/BSI-TR-03161-2.pdf?__blob=publicationFile&v=5", description="Web-Sicherheit (40 Pruefaspekte).", requirement_count=40, ), Regulation( code="BSI-TR-03161-3", name="BSI-TR-03161 Teil 3", full_name="BSI Technische Richtlinie - Hintergrundsysteme", regulation_type="bsi_standard", source_url="https://www.bsi.bund.de/SharedDocs/Downloads/DE/BSI/Publikationen/TechnischeRichtlinien/TR03161/BSI-TR-03161-3.pdf?__blob=publicationFile&v=5", description="Backend-Sicherheit (35 Pruefaspekte).", requirement_count=35, ), # Additional regulations for financial sector and health Regulation( code="DORA", name="DORA", full_name="Verordnung (EU) 2022/2554 - Digital Operational Resilience Act", regulation_type="eu_regulation", source_url="https://eur-lex.europa.eu/eli/reg/2022/2554/oj/deu", description="Digitale operationale Resilienz fuer den Finanzsektor. IKT-Risikomanagement, Vorfallmeldung, Resilienz-Tests.", celex="32022R2554", requirement_count=64, ), Regulation( code="PSD2", name="PSD2", full_name="Richtlinie (EU) 2015/2366 - Zahlungsdiensterichtlinie", regulation_type="eu_directive", source_url="https://eur-lex.europa.eu/eli/dir/2015/2366/oj/deu", description="Zahlungsdienste im Binnenmarkt. Starke Kundenauthentifizierung, Open Banking APIs.", celex="32015L2366", requirement_count=117, ), Regulation( code="AMLR", name="AML-Verordnung", full_name="Verordnung (EU) 2024/1624 - Geldwaeschebekaempfung", regulation_type="eu_regulation", source_url="https://eur-lex.europa.eu/eli/reg/2024/1624/oj/deu", description="Verhinderung der Nutzung des Finanzsystems zur Geldwaesche und Terrorismusfinanzierung.", celex="32024R1624", requirement_count=89, ), Regulation( code="EHDS", name="EHDS", full_name="Verordnung (EU) 2025/327 - Europaeischer Gesundheitsdatenraum", regulation_type="eu_regulation", source_url="https://eur-lex.europa.eu/eli/reg/2025/327/oj/deu", description="Europaeischer Raum fuer Gesundheitsdaten. Primaer- und Sekundaernutzung von Gesundheitsdaten.", celex="32025R0327", requirement_count=95, ), Regulation( code="MiCA", name="MiCA", full_name="Verordnung (EU) 2023/1114 - Markets in Crypto-Assets", regulation_type="eu_regulation", source_url="https://eur-lex.europa.eu/eli/reg/2023/1114/oj/deu", description="Regulierung von Kryptowerten, Stablecoins und Crypto-Asset-Dienstleistern.", celex="32023R1114", requirement_count=149, ), ] class LegalCorpusIngestion: """Handles ingestion of legal documents into Qdrant.""" def __init__(self): self.qdrant = QdrantClient(host=QDRANT_HOST, port=QDRANT_PORT) self.http_client = httpx.AsyncClient(timeout=60.0) self._ensure_collection() def _ensure_collection(self): """Create the legal corpus collection if it doesn't exist.""" collections = self.qdrant.get_collections().collections collection_names = [c.name for c in collections] if LEGAL_CORPUS_COLLECTION not in collection_names: logger.info(f"Creating collection: {LEGAL_CORPUS_COLLECTION}") self.qdrant.create_collection( collection_name=LEGAL_CORPUS_COLLECTION, vectors_config=VectorParams( size=VECTOR_SIZE, distance=Distance.COSINE, ), ) logger.info(f"Collection {LEGAL_CORPUS_COLLECTION} created") else: logger.info(f"Collection {LEGAL_CORPUS_COLLECTION} already exists") async def _generate_embeddings(self, texts: List[str]) -> List[List[float]]: """Generate embeddings via the embedding service.""" try: response = await self.http_client.post( f"{EMBEDDING_SERVICE_URL}/embed", json={"texts": texts}, timeout=120.0, ) response.raise_for_status() data = response.json() return data["embeddings"] except Exception as e: logger.error(f"Embedding generation failed: {e}") raise # German abbreviations that don't end sentences GERMAN_ABBREVIATIONS = { 'bzw', 'ca', 'chr', 'd.h', 'dr', 'etc', 'evtl', 'ggf', 'inkl', 'max', 'min', 'mio', 'mrd', 'nr', 'prof', 's', 'sog', 'u.a', 'u.ä', 'usw', 'v.a', 'vgl', 'vs', 'z.b', 'z.t', 'zzgl', 'abs', 'art', 'aufl', 'bd', 'betr', 'bzgl', 'dgl', 'ebd', 'hrsg', 'jg', 'kap', 'lt', 'rdnr', 'rn', 'std', 'str', 'tel', 'ua', 'uvm', 'va', 'zb', 'bsi', 'tr', 'owasp', 'iso', 'iec', 'din', 'en' } def _split_into_sentences(self, text: str) -> List[str]: """Split text into sentences with German language support.""" if not text: return [] text = re.sub(r'\s+', ' ', text).strip() # Protect abbreviations protected_text = text for abbrev in self.GERMAN_ABBREVIATIONS: pattern = re.compile(r'\b' + re.escape(abbrev) + r'\.', re.IGNORECASE) protected_text = pattern.sub(abbrev.replace('.', '') + '', protected_text) # Protect decimal/ordinal numbers and requirement IDs (e.g., "O.Data_1") protected_text = re.sub(r'(\d)\.(\d)', r'\1\2', protected_text) protected_text = re.sub(r'(\d+)\.(\s)', r'\1\2', protected_text) protected_text = re.sub(r'([A-Z])\.([A-Z])', r'\1\2', protected_text) # O.Data_1 # Split on sentence endings sentence_pattern = r'(?<=[.!?])\s+(?=[A-ZÄÖÜ0-9])|(?<=[.!?])$' raw_sentences = re.split(sentence_pattern, protected_text) # Restore protected characters sentences = [] for s in raw_sentences: s = s.replace('', '.').replace('', '.').replace('', '.').replace('', '.').replace('', '.') s = s.strip() if s: sentences.append(s) return sentences def _split_into_paragraphs(self, text: str) -> List[str]: """Split text into paragraphs.""" if not text: return [] raw_paragraphs = re.split(r'\n\s*\n', text) return [para.strip() for para in raw_paragraphs if para.strip()] def _chunk_text_semantic(self, text: str, chunk_size: int = CHUNK_SIZE, overlap: int = CHUNK_OVERLAP) -> List[Tuple[str, int]]: """ Semantic chunking that respects paragraph and sentence boundaries. Matches NIBIS chunking strategy for consistency. Returns list of (chunk_text, start_position) tuples. """ if not text: return [] if len(text) <= chunk_size: return [(text.strip(), 0)] paragraphs = self._split_into_paragraphs(text) overlap_sentences = max(1, overlap // 100) # Convert char overlap to sentence overlap chunks = [] current_chunk_parts = [] current_chunk_length = 0 chunk_start = 0 position = 0 for para in paragraphs: if len(para) > chunk_size: # Large paragraph: split into sentences sentences = self._split_into_sentences(para) for sentence in sentences: sentence_len = len(sentence) if sentence_len > chunk_size: # Very long sentence: save current chunk first if current_chunk_parts: chunk_text = ' '.join(current_chunk_parts) chunks.append((chunk_text, chunk_start)) overlap_buffer = current_chunk_parts[-overlap_sentences:] if overlap_sentences > 0 else [] current_chunk_parts = list(overlap_buffer) current_chunk_length = sum(len(s) + 1 for s in current_chunk_parts) # Add long sentence as its own chunk chunks.append((sentence, position)) current_chunk_parts = [sentence] current_chunk_length = len(sentence) + 1 position += sentence_len + 1 continue if current_chunk_length + sentence_len + 1 > chunk_size and current_chunk_parts: # Current chunk is full, save it chunk_text = ' '.join(current_chunk_parts) chunks.append((chunk_text, chunk_start)) overlap_buffer = current_chunk_parts[-overlap_sentences:] if overlap_sentences > 0 else [] current_chunk_parts = list(overlap_buffer) current_chunk_length = sum(len(s) + 1 for s in current_chunk_parts) chunk_start = position - current_chunk_length current_chunk_parts.append(sentence) current_chunk_length += sentence_len + 1 position += sentence_len + 1 else: # Small paragraph: try to keep together para_len = len(para) if current_chunk_length + para_len + 2 > chunk_size and current_chunk_parts: chunk_text = ' '.join(current_chunk_parts) chunks.append((chunk_text, chunk_start)) last_para_sentences = self._split_into_sentences(current_chunk_parts[-1] if current_chunk_parts else "") overlap_buffer = last_para_sentences[-overlap_sentences:] if overlap_sentences > 0 and last_para_sentences else [] current_chunk_parts = list(overlap_buffer) current_chunk_length = sum(len(s) + 1 for s in current_chunk_parts) chunk_start = position - current_chunk_length if current_chunk_parts: current_chunk_parts.append(para) current_chunk_length += para_len + 2 else: current_chunk_parts = [para] current_chunk_length = para_len chunk_start = position position += para_len + 2 # Don't forget the last chunk if current_chunk_parts: chunk_text = ' '.join(current_chunk_parts) chunks.append((chunk_text, chunk_start)) # Clean up whitespace return [(re.sub(r'\s+', ' ', c).strip(), pos) for c, pos in chunks if c.strip()] def _extract_article_info(self, text: str) -> Optional[Dict]: """Extract article number and paragraph from text.""" # Pattern for "Artikel X" or "Art. X" article_match = re.search(r'(?:Artikel|Art\.?)\s+(\d+)', text) paragraph_match = re.search(r'(?:Absatz|Abs\.?)\s+(\d+)', text) if article_match: return { "article": article_match.group(1), "paragraph": paragraph_match.group(1) if paragraph_match else None, } return None async def _fetch_document_text(self, regulation: Regulation) -> Optional[str]: """ Fetch document text from local file or URL. Priority: 1. Local file in docs/legal_corpus/ (.txt or .pdf) 2. EUR-Lex via CELEX URL (for EU regulations/directives) 3. Fallback to original source URL """ # Check for local file first local_file = LEGAL_DOCS_PATH / f"{regulation.code}.txt" if local_file.exists(): logger.info(f"Loading {regulation.code} from local file: {local_file}") return local_file.read_text(encoding="utf-8") local_pdf = LEGAL_DOCS_PATH / f"{regulation.code}.pdf" if local_pdf.exists(): logger.info(f"Extracting text from PDF: {local_pdf}") try: # Use embedding service for PDF extraction response = await self.http_client.post( f"{EMBEDDING_SERVICE_URL}/extract-pdf", files={"file": open(local_pdf, "rb")}, timeout=120.0, ) response.raise_for_status() data = response.json() return data.get("text", "") except Exception as e: logger.error(f"PDF extraction failed for {regulation.code}: {e}") # Try EUR-Lex CELEX URL if available (bypasses JavaScript CAPTCHA) if regulation.celex: celex_url = f"https://eur-lex.europa.eu/legal-content/DE/TXT/HTML/?uri=CELEX:{regulation.celex}" logger.info(f"Fetching {regulation.code} from EUR-Lex CELEX: {celex_url}") try: response = await self.http_client.get( celex_url, follow_redirects=True, headers={ "Accept": "text/html,application/xhtml+xml", "Accept-Language": "de-DE,de;q=0.9", "User-Agent": "Mozilla/5.0 (compatible; LegalCorpusIndexer/1.0)", }, timeout=120.0, ) response.raise_for_status() html_content = response.text # Check if we got actual content, not a CAPTCHA page if "verify that you're not a robot" not in html_content and len(html_content) > 10000: text = self._html_to_text(html_content) if text and len(text) > 1000: logger.info(f"Successfully fetched {regulation.code} via CELEX ({len(text)} chars)") return text else: logger.warning(f"CELEX response too short for {regulation.code}, trying fallback") else: logger.warning(f"CELEX returned CAPTCHA for {regulation.code}, trying fallback") except Exception as e: logger.warning(f"CELEX fetch failed for {regulation.code}: {e}, trying fallback") # Fallback to original source URL logger.info(f"Fetching {regulation.code} from: {regulation.source_url}") try: # Check if source URL is a PDF (handle URLs with query parameters) parsed_url = urlparse(regulation.source_url) is_pdf_url = parsed_url.path.lower().endswith('.pdf') if is_pdf_url: logger.info(f"Downloading PDF from URL for {regulation.code}") response = await self.http_client.get( regulation.source_url, follow_redirects=True, headers={ "Accept": "application/pdf", "User-Agent": "Mozilla/5.0 (compatible; LegalCorpusIndexer/1.0)", }, timeout=180.0, ) response.raise_for_status() # Extract text from PDF via embedding service pdf_content = response.content extract_response = await self.http_client.post( f"{EMBEDDING_SERVICE_URL}/extract-pdf", files={"file": ("document.pdf", pdf_content, "application/pdf")}, timeout=180.0, ) extract_response.raise_for_status() data = extract_response.json() text = data.get("text", "") if text: logger.info(f"Successfully extracted PDF text for {regulation.code} ({len(text)} chars)") return text else: logger.warning(f"PDF extraction returned empty text for {regulation.code}") return None else: # Regular HTML fetch response = await self.http_client.get( regulation.source_url, follow_redirects=True, headers={ "Accept": "text/html,application/xhtml+xml", "Accept-Language": "de-DE,de;q=0.9", "User-Agent": "Mozilla/5.0 (compatible; LegalCorpusIndexer/1.0)", }, timeout=120.0, ) response.raise_for_status() text = self._html_to_text(response.text) return text except Exception as e: logger.error(f"Failed to fetch {regulation.code}: {e}") return None def _html_to_text(self, html_content: str) -> str: """Convert HTML to clean text.""" # Remove script and style tags html_content = re.sub(r']*>.*?', '', html_content, flags=re.DOTALL) html_content = re.sub(r']*>.*?', '', html_content, flags=re.DOTALL) # Remove comments html_content = re.sub(r'', '', html_content, flags=re.DOTALL) # Replace common HTML entities html_content = html_content.replace(' ', ' ') html_content = html_content.replace('&', '&') html_content = html_content.replace('<', '<') html_content = html_content.replace('>', '>') html_content = html_content.replace('"', '"') # Convert breaks and paragraphs to newlines for better chunking html_content = re.sub(r'', '\n', html_content, flags=re.IGNORECASE) html_content = re.sub(r'

', '\n\n', html_content, flags=re.IGNORECASE) html_content = re.sub(r'', '\n', html_content, flags=re.IGNORECASE) html_content = re.sub(r'', '\n\n', html_content, flags=re.IGNORECASE) # Remove remaining HTML tags text = re.sub(r'<[^>]+>', ' ', html_content) # Clean up whitespace (but preserve paragraph breaks) text = re.sub(r'[ \t]+', ' ', text) text = re.sub(r'\n[ \t]+', '\n', text) text = re.sub(r'[ \t]+\n', '\n', text) text = re.sub(r'\n{3,}', '\n\n', text) return text.strip() async def ingest_regulation(self, regulation: Regulation) -> int: """ Ingest a single regulation into Qdrant. Returns number of chunks indexed. """ logger.info(f"Ingesting {regulation.code}: {regulation.name}") # Fetch document text text = await self._fetch_document_text(regulation) if not text or len(text) < 100: logger.warning(f"No text found for {regulation.code}, skipping") return 0 # Chunk the text chunks = self._chunk_text_semantic(text) logger.info(f"Created {len(chunks)} chunks for {regulation.code}") if not chunks: return 0 # Generate embeddings in batches (very small for CPU stability) batch_size = 4 all_points = [] max_retries = 3 for i in range(0, len(chunks), batch_size): batch_chunks = chunks[i:i + batch_size] chunk_texts = [c[0] for c in batch_chunks] # Retry logic for embedding service stability embeddings = None for retry in range(max_retries): try: embeddings = await self._generate_embeddings(chunk_texts) break except Exception as e: logger.warning(f"Embedding attempt {retry+1}/{max_retries} failed for batch {i//batch_size}: {e}") if retry < max_retries - 1: await asyncio.sleep(3 * (retry + 1)) # Longer backoff: 3s, 6s, 9s else: logger.error(f"Embedding failed permanently for batch {i//batch_size}") if embeddings is None: continue # Longer delay between batches for CPU stability await asyncio.sleep(1.5) for j, ((chunk_text, position), embedding) in enumerate(zip(batch_chunks, embeddings)): chunk_idx = i + j point_id = hashlib.md5(f"{regulation.code}-{chunk_idx}".encode()).hexdigest() # Extract article info if present article_info = self._extract_article_info(chunk_text) point = PointStruct( id=point_id, vector=embedding, payload={ "text": chunk_text, "regulation_code": regulation.code, "regulation_name": regulation.name, "regulation_full_name": regulation.full_name, "regulation_type": regulation.regulation_type, "source_url": regulation.source_url, "chunk_index": chunk_idx, "chunk_position": position, "article": article_info.get("article") if article_info else None, "paragraph": article_info.get("paragraph") if article_info else None, "language": regulation.language, "indexed_at": datetime.utcnow().isoformat(), "training_allowed": False, # Legal texts - no training }, ) all_points.append(point) # Upsert to Qdrant if all_points: self.qdrant.upsert( collection_name=LEGAL_CORPUS_COLLECTION, points=all_points, ) logger.info(f"Indexed {len(all_points)} chunks for {regulation.code}") return len(all_points) async def ingest_all(self) -> Dict[str, int]: """Ingest all regulations.""" results = {} total = 0 for regulation in REGULATIONS: try: count = await self.ingest_regulation(regulation) results[regulation.code] = count total += count except Exception as e: logger.error(f"Failed to ingest {regulation.code}: {e}") results[regulation.code] = 0 logger.info(f"Ingestion complete: {total} total chunks indexed") return results async def ingest_selected(self, codes: List[str]) -> Dict[str, int]: """Ingest selected regulations by code.""" results = {} for code in codes: regulation = next((r for r in REGULATIONS if r.code == code), None) if not regulation: logger.warning(f"Unknown regulation code: {code}") results[code] = 0 continue try: count = await self.ingest_regulation(regulation) results[code] = count except Exception as e: logger.error(f"Failed to ingest {code}: {e}") results[code] = 0 return results def get_status(self) -> Dict: """Get collection status and indexed regulations.""" try: collection_info = self.qdrant.get_collection(LEGAL_CORPUS_COLLECTION) # Count points per regulation regulation_counts = {} for reg in REGULATIONS: result = self.qdrant.count( collection_name=LEGAL_CORPUS_COLLECTION, count_filter=Filter( must=[ FieldCondition( key="regulation_code", match=MatchValue(value=reg.code), ) ] ), ) regulation_counts[reg.code] = result.count return { "collection": LEGAL_CORPUS_COLLECTION, "total_points": collection_info.points_count, "vector_size": VECTOR_SIZE, "regulations": regulation_counts, "status": "ready" if collection_info.points_count > 0 else "empty", } except Exception as e: return { "collection": LEGAL_CORPUS_COLLECTION, "error": str(e), "status": "error", } async def search( self, query: str, regulation_codes: Optional[List[str]] = None, top_k: int = 5, ) -> List[Dict]: """ Search the legal corpus for relevant passages. Args: query: Search query text regulation_codes: Optional list of regulation codes to filter top_k: Number of results to return Returns: List of search results with text and metadata """ # Generate query embedding embeddings = await self._generate_embeddings([query]) query_vector = embeddings[0] # Build filter search_filter = None if regulation_codes: search_filter = Filter( should=[ FieldCondition( key="regulation_code", match=MatchValue(value=code), ) for code in regulation_codes ] ) # Search results = self.qdrant.search( collection_name=LEGAL_CORPUS_COLLECTION, query_vector=query_vector, query_filter=search_filter, limit=top_k, ) return [ { "text": hit.payload.get("text"), "regulation_code": hit.payload.get("regulation_code"), "regulation_name": hit.payload.get("regulation_name"), "article": hit.payload.get("article"), "paragraph": hit.payload.get("paragraph"), "source_url": hit.payload.get("source_url"), "score": hit.score, } for hit in results ] async def close(self): """Close HTTP client.""" await self.http_client.aclose() async def main(): """CLI entry point.""" import argparse parser = argparse.ArgumentParser(description="Legal Corpus Ingestion for UCCA") parser.add_argument("--ingest-all", action="store_true", help="Ingest all 19 regulations") parser.add_argument("--ingest", nargs="+", metavar="CODE", help="Ingest specific regulations by code") parser.add_argument("--status", action="store_true", help="Show collection status") parser.add_argument("--search", type=str, help="Test search query") args = parser.parse_args() ingestion = LegalCorpusIngestion() try: if args.status: status = ingestion.get_status() print(json.dumps(status, indent=2)) elif args.ingest_all: print("Ingesting all 19 regulations...") results = await ingestion.ingest_all() print("\nResults:") for code, count in results.items(): print(f" {code}: {count} chunks") print(f"\nTotal: {sum(results.values())} chunks") elif args.ingest: print(f"Ingesting: {', '.join(args.ingest)}") results = await ingestion.ingest_selected(args.ingest) print("\nResults:") for code, count in results.items(): print(f" {code}: {count} chunks") elif args.search: print(f"Searching: {args.search}") results = await ingestion.search(args.search) print(f"\nFound {len(results)} results:") for i, result in enumerate(results, 1): print(f"\n{i}. [{result['regulation_code']}] Score: {result['score']:.3f}") if result.get('article'): print(f" Art. {result['article']}" + (f" Abs. {result['paragraph']}" if result.get('paragraph') else "")) print(f" {result['text'][:200]}...") else: parser.print_help() finally: await ingestion.close() if __name__ == "__main__": asyncio.run(main())