Files
breakpilot-lehrer/klausur-service/backend/legal_corpus_chunking.py
Benjamin Admin b2a0126f14 [split-required] Split remaining Python monoliths (Phase 1 continued)
klausur-service (7 monoliths):
- grid_editor_helpers.py (1,737 → 5 files: columns, filters, headers, zones)
- cv_cell_grid.py (1,675 → 7 files: build, legacy, streaming, merge, vocab)
- worksheet_editor_api.py (1,305 → 4 files: models, AI, reconstruct, routes)
- legal_corpus_ingestion.py (1,280 → 3 files: registry, chunking, ingestion)
- cv_review.py (1,248 → 4 files: pipeline, spell, LLM, barrel)
- cv_preprocessing.py (1,166 → 3 files: deskew, dewarp, barrel)
- rbac.py, admin_api.py, routes/eh.py remain (next batch)

backend-lehrer (1 monolith):
- classroom_engine/repository.py (1,705 → 7 files by domain)

All re-export barrels preserve backward compatibility.
Zero import errors verified.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-24 22:47:59 +02:00

198 lines
7.9 KiB
Python

"""
Legal Corpus Chunking — Text splitting, semantic chunking, and HTML-to-text conversion.
Provides German-aware sentence splitting, paragraph splitting, semantic chunking
with overlap, and HTML-to-text conversion for legal document ingestion.
"""
import html
import re
from typing import Dict, List, Optional, Tuple
# German abbreviations that don't end sentences
GERMAN_ABBREVIATIONS = {
    'bzw', 'ca', 'chr', 'd.h', 'dr', 'etc', 'evtl', 'ggf', 'inkl', 'max',
    'min', 'mio', 'mrd', 'nr', 'prof', 's', 'sog', 'u.a', 'u.ä', 'usw',
    'v.a', 'vgl', 'vs', 'z.b', 'z.t', 'zzgl', 'abs', 'art', 'aufl',
    'bd', 'betr', 'bzgl', 'dgl', 'ebd', 'hrsg', 'jg', 'kap', 'lt',
    'rdnr', 'rn', 'std', 'str', 'tel', 'ua', 'uvm', 'va', 'zb',
    'bsi', 'tr', 'owasp', 'iso', 'iec', 'din', 'en'
}


def split_into_sentences(text: str) -> List[str]:
    """Split text into sentences with German language support.

    Known German abbreviations, decimal numbers, ordinals ("3. "), and
    dotted requirement IDs (e.g. "O.Data_1") are shielded with placeholder
    tokens before splitting on sentence-ending punctuation followed by an
    uppercase letter or digit, then restored afterwards.

    Args:
        text: Input text; all runs of whitespace are collapsed to single
            spaces before splitting.

    Returns:
        List of non-empty, stripped sentence strings ([] for empty input).
    """
    if not text:
        return []
    text = re.sub(r'\s+', ' ', text).strip()
    # Protect abbreviations. A callable replacement preserves the original
    # casing of the matched text; substituting the lowercase abbreviation
    # constant directly (the previous behavior) would rewrite e.g. "Vgl."
    # as "vgl." because the match is case-insensitive.
    protected_text = text
    for abbrev in GERMAN_ABBREVIATIONS:
        pattern = re.compile(r'\b' + re.escape(abbrev) + r'\.', re.IGNORECASE)
        protected_text = pattern.sub(
            lambda m: m.group(0)[:-1].replace('.', '<DOT>') + '<ABBR>',
            protected_text,
        )
    # Protect decimal/ordinal numbers and requirement IDs (e.g., "O.Data_1")
    protected_text = re.sub(r'(\d)\.(\d)', r'\1<DECIMAL>\2', protected_text)
    protected_text = re.sub(r'(\d+)\.(\s)', r'\1<ORD>\2', protected_text)
    protected_text = re.sub(r'([A-Z])\.([A-Z])', r'\1<REQ>\2', protected_text)  # O.Data_1
    # Split on sentence endings followed by an uppercase letter/digit,
    # or at end of string.
    sentence_pattern = r'(?<=[.!?])\s+(?=[A-ZÄÖÜ0-9])|(?<=[.!?])$'
    raw_sentences = re.split(sentence_pattern, protected_text)
    # Restore protected characters and drop empty fragments
    sentences = []
    for s in raw_sentences:
        s = s.replace('<DOT>', '.').replace('<ABBR>', '.')
        s = s.replace('<DECIMAL>', '.').replace('<ORD>', '.').replace('<REQ>', '.')
        s = s.strip()
        if s:
            sentences.append(s)
    return sentences
def split_into_paragraphs(text: str) -> List[str]:
    """Break text into non-empty paragraphs separated by blank lines.

    Args:
        text: Input text; paragraph boundaries are blank (or
            whitespace-only) lines.

    Returns:
        List of stripped, non-empty paragraph strings ([] for empty input).
    """
    if not text:
        return []
    stripped = (part.strip() for part in re.split(r'\n\s*\n', text))
    return [part for part in stripped if part]
def chunk_text_semantic(
    text: str,
    chunk_size: int = 1000,
    overlap: int = 200,
) -> List[Tuple[str, int]]:
    """
    Semantic chunking that respects paragraph and sentence boundaries.
    Matches NIBIS chunking strategy for consistency.

    Whole paragraphs are packed into chunks up to ``chunk_size``; a
    paragraph longer than ``chunk_size`` is split into sentences, and a
    single sentence longer than ``chunk_size`` becomes its own chunk.
    Consecutive chunks share the last few sentences as overlap.

    Args:
        text: Input text to chunk.
        chunk_size: Target maximum chunk length in characters.
        overlap: Desired overlap in characters; converted to a sentence
            count via ``overlap // 100`` (minimum 1 sentence).

    Returns:
        List of (chunk_text, start_position) tuples.

    NOTE(review): start positions are approximate — ``position`` advances
    by paragraph/sentence length plus separator, which does not account
    for the original inter-paragraph whitespace; ``chunk_start`` can also
    be derived as ``position - current_chunk_length``. Verify before
    using positions for exact offset lookups.
    """
    if not text:
        return []
    # Text that already fits is returned as a single chunk at offset 0.
    if len(text) <= chunk_size:
        return [(text.strip(), 0)]
    paragraphs = split_into_paragraphs(text)
    overlap_sentences = max(1, overlap // 100)  # Convert char overlap to sentence overlap
    chunks = []
    current_chunk_parts: List[str] = []  # pieces (sentences/paragraphs) of the chunk being built
    current_chunk_length = 0             # running char length incl. joining separators
    chunk_start = 0                      # start offset of the chunk being built
    position = 0                         # running offset into the (joined) text
    for para in paragraphs:
        if len(para) > chunk_size:
            # Large paragraph: split into sentences
            sentences = split_into_sentences(para)
            for sentence in sentences:
                sentence_len = len(sentence)
                if sentence_len > chunk_size:
                    # Very long sentence: save current chunk first
                    if current_chunk_parts:
                        chunk_text = ' '.join(current_chunk_parts)
                        chunks.append((chunk_text, chunk_start))
                        # NOTE(review): this overlap buffer is overwritten a few
                        # lines below by ``current_chunk_parts = [sentence]``, so
                        # it never contributes — looks like dead code; confirm
                        # whether overlap was intended here.
                        overlap_buffer = current_chunk_parts[-overlap_sentences:] if overlap_sentences > 0 else []
                        current_chunk_parts = list(overlap_buffer)
                        current_chunk_length = sum(len(s) + 1 for s in current_chunk_parts)
                    # Add long sentence as its own chunk
                    chunks.append((sentence, position))
                    # The oversized sentence also seeds the next chunk, so the
                    # following chunk overlaps with it.
                    current_chunk_parts = [sentence]
                    current_chunk_length = len(sentence) + 1
                    position += sentence_len + 1
                    continue
                if current_chunk_length + sentence_len + 1 > chunk_size and current_chunk_parts:
                    # Current chunk is full, save it
                    chunk_text = ' '.join(current_chunk_parts)
                    chunks.append((chunk_text, chunk_start))
                    # Carry the last N sentences into the next chunk as overlap.
                    overlap_buffer = current_chunk_parts[-overlap_sentences:] if overlap_sentences > 0 else []
                    current_chunk_parts = list(overlap_buffer)
                    current_chunk_length = sum(len(s) + 1 for s in current_chunk_parts)
                    chunk_start = position - current_chunk_length
                current_chunk_parts.append(sentence)
                current_chunk_length += sentence_len + 1
                position += sentence_len + 1
        else:
            # Small paragraph: try to keep together
            para_len = len(para)
            if current_chunk_length + para_len + 2 > chunk_size and current_chunk_parts:
                chunk_text = ' '.join(current_chunk_parts)
                chunks.append((chunk_text, chunk_start))
                # Overlap uses the trailing sentences of the LAST part only,
                # not of the whole chunk.
                last_para_sentences = split_into_sentences(current_chunk_parts[-1] if current_chunk_parts else "")
                overlap_buffer = last_para_sentences[-overlap_sentences:] if overlap_sentences > 0 and last_para_sentences else []
                current_chunk_parts = list(overlap_buffer)
                current_chunk_length = sum(len(s) + 1 for s in current_chunk_parts)
                chunk_start = position - current_chunk_length
            if current_chunk_parts:
                current_chunk_parts.append(para)
                current_chunk_length += para_len + 2
            else:
                # Fresh chunk starts exactly at this paragraph's position.
                current_chunk_parts = [para]
                current_chunk_length = para_len
                chunk_start = position
            position += para_len + 2
    # Don't forget the last chunk
    if current_chunk_parts:
        chunk_text = ' '.join(current_chunk_parts)
        chunks.append((chunk_text, chunk_start))
    # Clean up whitespace and drop whitespace-only chunks
    return [(re.sub(r'\s+', ' ', c).strip(), pos) for c, pos in chunks if c.strip()]
def extract_article_info(text: str) -> Optional[Dict]:
    """Extract an article/paragraph reference from legal text.

    Recognizes "Artikel 5" / "Art. 5" for the article number and
    "Absatz 2" / "Abs. 2" for the paragraph number.

    Returns:
        Dict with keys "article" (str) and "paragraph" (str or None),
        or None when no article reference is found.
    """
    article = re.search(r'(?:Artikel|Art\.?)\s+(\d+)', text)
    if article is None:
        return None
    paragraph = re.search(r'(?:Absatz|Abs\.?)\s+(\d+)', text)
    return {
        "article": article.group(1),
        "paragraph": paragraph.group(1) if paragraph else None,
    }
def html_to_text(html_content: str) -> str:
    """Convert HTML markup to clean plain text.

    Strips script/style blocks and HTML comments, maps structural tags
    (<br>, </p>, </div>, </h1>..</h6>) to newlines so downstream chunking
    sees paragraph breaks, removes all remaining tags, and decodes HTML
    entities.

    Args:
        html_content: Raw HTML string.

    Returns:
        Plain text with paragraph breaks preserved as blank lines.
    """
    # Remove script and style blocks. IGNORECASE is required: <SCRIPT> and
    # <Style> are valid HTML and previously slipped through.
    html_content = re.sub(r'<script[^>]*>.*?</script>', '', html_content,
                          flags=re.DOTALL | re.IGNORECASE)
    html_content = re.sub(r'<style[^>]*>.*?</style>', '', html_content,
                          flags=re.DOTALL | re.IGNORECASE)
    # Remove comments
    html_content = re.sub(r'<!--.*?-->', '', html_content, flags=re.DOTALL)
    # Convert breaks and paragraphs to newlines for better chunking
    html_content = re.sub(r'<br\s*/?>', '\n', html_content, flags=re.IGNORECASE)
    html_content = re.sub(r'</p>', '\n\n', html_content, flags=re.IGNORECASE)
    html_content = re.sub(r'</div>', '\n', html_content, flags=re.IGNORECASE)
    html_content = re.sub(r'</h[1-6]>', '\n\n', html_content, flags=re.IGNORECASE)
    # Remove remaining HTML tags
    text = re.sub(r'<[^>]+>', ' ', html_content)
    # Decode entities AFTER tag removal so escaped markup shown as text
    # (e.g. "&lt;b&gt;") stays literal instead of being stripped as a tag.
    # html.unescape also fixes the double-decoding bug of chained replace
    # calls ("&amp;lt;" must yield "&lt;", not "<") and handles all
    # named/numeric entities, not just five hand-picked ones.
    text = html.unescape(text)
    # &nbsp; decodes to U+00A0; normalize to a plain space so the
    # whitespace cleanup below can collapse it.
    text = text.replace('\xa0', ' ')
    # Clean up whitespace (but preserve paragraph breaks)
    text = re.sub(r'[ \t]+', ' ', text)
    text = re.sub(r'\n[ \t]+', '\n', text)
    text = re.sub(r'[ \t]+\n', '\n', text)
    text = re.sub(r'\n{3,}', '\n\n', text)
    return text.strip()