"""
Legal Corpus Chunking — Text splitting, semantic chunking, and HTML-to-text conversion.

Provides German-aware sentence splitting, paragraph splitting, semantic
chunking with overlap, and HTML-to-text conversion for legal document
ingestion.
"""

import functools
import html
import re
from typing import Dict, FrozenSet, List, Optional, Tuple

# German abbreviations that don't end sentences (stored lower-case, without
# the trailing dot; matching is case-insensitive).
GERMAN_ABBREVIATIONS = {
    'bzw', 'ca', 'chr', 'd.h', 'dr', 'etc', 'evtl', 'ggf', 'inkl', 'max',
    'min', 'mio', 'mrd', 'nr', 'prof', 's', 'sog', 'u.a', 'u.ä', 'usw',
    'v.a', 'vgl', 'vs', 'z.b', 'z.t', 'zzgl', 'abs', 'art', 'aufl', 'bd',
    'betr', 'bzgl', 'dgl', 'ebd', 'hrsg', 'jg', 'kap', 'lt', 'rdnr', 'rn',
    'std', 'str', 'tel', 'ua', 'uvm', 'va', 'zb', 'bsi', 'tr', 'owasp',
    'iso', 'iec', 'din', 'en',
}

# Stand-in for a "." that must not be treated as a sentence terminator.
# A private-use code point is used so it cannot collide with ordinary text
# and is never matched by the [.!?] look-behind of the boundary regex.
_PROTECTED_DOT = "\ue000"

# Dots inside decimals ("3.5"), after ordinals ("1. Januar") and inside
# requirement IDs ("O.Data_1"). Look-arounds are used so that only the dot
# itself is replaced and adjacent matches ("1.2.3") are all protected.
_NON_TERMINAL_DOT_RE = re.compile(r"(?<=\d)\.(?=\d|\s)|(?<=[A-Z])\.(?=[A-Z])")

# A sentence ends at . ! or ? followed by whitespace and an upper-case
# letter or digit, or at the very end of the text.
_SENTENCE_BOUNDARY_RE = re.compile(r"(?<=[.!?])\s+(?=[A-ZÄÖÜ0-9])|(?<=[.!?])$")

_WHITESPACE_RE = re.compile(r"\s+")
_PARAGRAPH_BREAK_RE = re.compile(r"\n\s*\n")

# HTML handling
_SCRIPT_STYLE_RE = re.compile(
    r"<(script|style)\b[^>]*>.*?</\1\s*>", re.DOTALL | re.IGNORECASE
)
_COMMENT_RE = re.compile(r"<!--.*?-->", re.DOTALL)
_LINE_BREAK_RE = re.compile(r"<br\s*/?>|</div\s*>", re.IGNORECASE)
_PARAGRAPH_END_RE = re.compile(r"</p\s*>|</h[1-6]\s*>", re.IGNORECASE)
_TAG_RE = re.compile(r"<[^>]+>")


@functools.lru_cache(maxsize=8)
def _abbreviation_pattern(abbreviations: FrozenSet[str]) -> Optional[re.Pattern]:
    """Build one case-insensitive regex matching any abbreviation plus its dot.

    Cached on the (frozen) abbreviation set so that the pattern is compiled
    once, yet still follows later modifications of GERMAN_ABBREVIATIONS.
    Returns None for an empty set (an empty alternation would match every dot).
    """
    if not abbreviations:
        return None
    # Longest first so that "u.a" wins over a shorter alternative; the
    # secondary alphabetical key keeps the pattern deterministic.
    ordered = sorted(abbreviations, key=lambda a: (-len(a), a))
    alternation = "|".join(re.escape(a) for a in ordered)
    return re.compile(r"\b(?:" + alternation + r")\.", re.IGNORECASE)


def _mask_dots(match: "re.Match") -> str:
    """Replace every dot of the matched abbreviation, keeping its casing."""
    return match.group(0).replace(".", _PROTECTED_DOT)


def split_into_sentences(text: str) -> List[str]:
    """Split text into sentences with German language support.

    Whitespace is collapsed to single spaces first. Dots belonging to known
    abbreviations, decimal/ordinal numbers and requirement IDs are masked
    before splitting and restored afterwards, so they never end a sentence.

    Returns the non-empty, stripped sentences in order; [] for empty input.
    """
    if not text:
        return []

    protected = _WHITESPACE_RE.sub(" ", text).strip()

    # Protect abbreviations (original casing of the text is preserved).
    pattern = _abbreviation_pattern(frozenset(GERMAN_ABBREVIATIONS))
    if pattern is not None:
        protected = pattern.sub(_mask_dots, protected)

    # Protect decimal/ordinal numbers and requirement IDs (e.g., "O.Data_1").
    protected = _NON_TERMINAL_DOT_RE.sub(_PROTECTED_DOT, protected)

    sentences = []
    for raw in _SENTENCE_BOUNDARY_RE.split(protected):
        sentence = raw.replace(_PROTECTED_DOT, ".").strip()
        if sentence:
            sentences.append(sentence)
    return sentences


def split_into_paragraphs(text: str) -> List[str]:
    """Split text into paragraphs.

    Paragraphs are separated by one or more blank (whitespace-only) lines.
    Returns the non-empty, stripped paragraphs; [] for empty input.
    """
    if not text:
        return []
    return [
        para.strip()
        for para in _PARAGRAPH_BREAK_RE.split(text)
        if para.strip()
    ]


def chunk_text_semantic(
    text: str,
    chunk_size: int = 1000,
    overlap: int = 200,
) -> List[Tuple[str, int]]:
    """
    Semantic chunking that respects paragraph and sentence boundaries.

    Matches NIBIS chunking strategy for consistency.

    Args:
        text: The text to chunk.
        chunk_size: Target maximum chunk length in characters. Chunks can
            exceed it when the carried-over overlap plus the next unit is
            larger, or when a single sentence is longer than chunk_size.
        overlap: Desired overlap in characters. It is converted to a number
            of trailing sentences (one per 100 characters, at least one);
            0 or a negative value disables overlap.

    Returns:
        List of (chunk_text, start_position) tuples. start_position is an
        estimate derived from unit lengths plus separators, not an exact
        offset into the original text (whitespace is normalised on the way).
    """
    if not text or not text.strip():
        return []

    if len(text) <= chunk_size:
        return [(text.strip(), 0)]

    paragraphs = split_into_paragraphs(text)
    # Convert char overlap to sentence overlap.
    overlap_sentences = max(1, overlap // 100) if overlap > 0 else 0

    chunks: List[Tuple[str, int]] = []
    current_chunk_parts: List[str] = []
    current_chunk_length = 0
    chunk_start = 0
    position = 0

    for para in paragraphs:
        if len(para) > chunk_size:
            # Large paragraph: split into sentences
            for sentence in split_into_sentences(para):
                sentence_len = len(sentence)

                if sentence_len > chunk_size:
                    # Very long sentence: save current chunk first, then emit
                    # the sentence as its own chunk.
                    if current_chunk_parts:
                        chunks.append((' '.join(current_chunk_parts), chunk_start))
                    chunks.append((sentence, position))
                    position += sentence_len + 1
                    # Start from a clean slate: keeping the oversized sentence
                    # as "overlap" would emit it a second time on the next
                    # flush and make every following chunk oversized.
                    current_chunk_parts = []
                    current_chunk_length = 0
                    chunk_start = position
                    continue

                if current_chunk_length + sentence_len + 1 > chunk_size and current_chunk_parts:
                    # Current chunk is full, save it and carry the overlap.
                    chunks.append((' '.join(current_chunk_parts), chunk_start))
                    if overlap_sentences > 0:
                        current_chunk_parts = current_chunk_parts[-overlap_sentences:]
                    else:
                        current_chunk_parts = []
                    current_chunk_length = sum(len(s) + 1 for s in current_chunk_parts)
                    chunk_start = position - current_chunk_length

                current_chunk_parts.append(sentence)
                current_chunk_length += sentence_len + 1
                position += sentence_len + 1
        else:
            # Small paragraph: try to keep together
            para_len = len(para)

            if current_chunk_length + para_len + 2 > chunk_size and current_chunk_parts:
                chunks.append((' '.join(current_chunk_parts), chunk_start))
                # Overlap is taken from the trailing sentences of the last part.
                if overlap_sentences > 0:
                    last_para_sentences = split_into_sentences(current_chunk_parts[-1])
                    current_chunk_parts = last_para_sentences[-overlap_sentences:]
                else:
                    current_chunk_parts = []
                current_chunk_length = sum(len(s) + 1 for s in current_chunk_parts)
                chunk_start = position - current_chunk_length

            if current_chunk_parts:
                current_chunk_parts.append(para)
                current_chunk_length += para_len + 2
            else:
                current_chunk_parts = [para]
                current_chunk_length = para_len
                chunk_start = position

            position += para_len + 2

    # Don't forget the last chunk
    if current_chunk_parts:
        chunks.append((' '.join(current_chunk_parts), chunk_start))

    # Clean up whitespace
    return [
        (_WHITESPACE_RE.sub(' ', c).strip(), pos)
        for c, pos in chunks
        if c.strip()
    ]


def extract_article_info(text: str) -> Optional[Dict]:
    """Extract article number and paragraph from text.

    Looks for the first "Artikel X" / "Art. X" and the first "Absatz Y" /
    "Abs. Y" reference.

    Returns:
        {"article": "<X>", "paragraph": "<Y>" or None} when an article
        reference is found, otherwise None (even if a paragraph is present).
    """
    # Pattern for "Artikel X" or "Art. X"
    article_match = re.search(r'(?:Artikel|Art\.?)\s+(\d+)', text)
    paragraph_match = re.search(r'(?:Absatz|Abs\.?)\s+(\d+)', text)

    if article_match is None:
        return None

    return {
        "article": article_match.group(1),
        "paragraph": paragraph_match.group(1) if paragraph_match else None,
    }


def html_to_text(html_content: str) -> str:
    """Convert HTML to clean text.

    Script/style elements and comments are dropped, line and paragraph
    breaks become newlines (so paragraph splitting still works downstream),
    all remaining tags are removed and character references are decoded.

    Returns the stripped text; "" for empty input.
    """
    if not html_content:
        return ""

    # Remove script and style elements, then comments.
    text = _SCRIPT_STYLE_RE.sub('', html_content)
    text = _COMMENT_RE.sub('', text)

    # Convert breaks and paragraphs to newlines for better chunking.
    text = _LINE_BREAK_RE.sub('\n', text)
    text = _PARAGRAPH_END_RE.sub('\n\n', text)

    # Remove remaining HTML tags.
    text = _TAG_RE.sub(' ', text)

    # Decode entities only AFTER tag removal; otherwise escaped text such as
    # "a &lt; b &gt; c" would turn into "< b >" and be stripped as a tag.
    # html.unescape also covers umlauts, "&sect;" and numeric references.
    text = html.unescape(text).replace('\xa0', ' ')

    # Clean up whitespace (but preserve paragraph breaks).
    text = re.sub(r'[ \t]+', ' ', text)
    text = re.sub(r'\n[ \t]+', '\n', text)
    text = re.sub(r'[ \t]+\n', '\n', text)
    text = re.sub(r'\n{3,}', '\n\n', text)

    return text.strip()