"""
Legal Templates Chunking — text splitting, type inference, and chunk creation.

Extracted from legal_templates_ingestion.py to keep files under 500 LOC.

License: Apache 2.0
"""

import re
from dataclasses import dataclass, field
from datetime import datetime
from typing import List, Optional

from template_sources import SourceConfig
from github_crawler import ExtractedDocument

# Chunking configuration defaults (can be overridden by env vars in ingestion module)
DEFAULT_CHUNK_SIZE = 1000
DEFAULT_CHUNK_OVERLAP = 200

# Stand-in for a "." that must NOT end a sentence (abbreviations, decimals).
# Written as an escape sequence so the character stays visible in the source;
# an empty placeholder here would make the restore step insert "." between
# every character of the text.
_PROTECTED_DOT = "\ue000"

# Abbreviations whose trailing dot does not end a sentence. Matched
# case-insensitively, so only one spelling of each is listed.
_ABBREVIATIONS = (
    'bzw', 'ca', 'd.h', 'etc', 'ggf', 'inkl', 'u.a', 'usw',
    'z.B', 'e.g', 'i.e', 'vs', 'no',
)
_ABBREVIATION_RE = re.compile(
    r'\b(?:' + '|'.join(re.escape(abbr) for abbr in _ABBREVIATIONS) + r')\.',
    re.IGNORECASE,
)
# A dot with a digit on both sides (lookarounds so "1.2.3" protects both dots).
_DECIMAL_DOT_RE = re.compile(r'(?<=\d)\.(?=\d)')
# Whitespace that directly follows sentence-ending punctuation.
_SENTENCE_END_RE = re.compile(r'(?<=[.!?])\s+')

# Template type -> substrings that indicate it. Order matters: the first type
# with any matching indicator wins.
_TEMPLATE_TYPE_INDICATORS = {
    "privacy_policy": ["datenschutz", "privacy", "personal data", "personenbezogen"],
    "terms_of_service": ["nutzungsbedingungen", "terms of service", "terms of use", "agb"],
    "cookie_banner": ["cookie", "cookies", "tracking"],
    "impressum": ["impressum", "legal notice", "imprint"],
    "widerruf": ["widerruf", "cancellation", "withdrawal", "right to cancel"],
    "dpa": ["auftragsverarbeitung", "data processing agreement", "dpa"],
    "sla": ["service level", "availability", "uptime"],
    "nda": ["confidential", "non-disclosure", "geheimhaltung", "vertraulich"],
    "community_guidelines": ["community", "guidelines", "conduct", "verhaltens"],
    "acceptable_use": ["acceptable use", "acceptable usage", "nutzungsrichtlinien"],
}

# Clause category -> substrings that indicate it. Order matters: the first
# category with any matching indicator wins.
_CLAUSE_CATEGORY_INDICATORS = {
    "haftung": ["haftung", "liability", "haftungsausschluss", "limitation"],
    "datenschutz": ["datenschutz", "privacy", "personal data", "personenbezogen"],
    "widerruf": ["widerruf", "cancellation", "withdrawal"],
    "gewaehrleistung": ["gewaehrleistung", "warranty", "garantie"],
    "kuendigung": ["kuendigung", "termination", "beendigung"],
    "zahlung": ["zahlung", "payment", "preis", "price"],
    "gerichtsstand": ["gerichtsstand", "jurisdiction", "governing law"],
    "aenderungen": ["aenderung", "modification", "amendment"],
    "schlussbestimmungen": ["schlussbestimmung", "miscellaneous", "final provisions"],
}


@dataclass
class TemplateChunk:
    """A chunk of template text ready for indexing."""

    # Chunk content and its position within the document's chunk list.
    text: str
    chunk_index: int
    document_title: str
    template_type: str
    clause_category: Optional[str]  # None when no category indicator matched
    language: str
    jurisdiction: str

    # Licensing metadata (create_chunks copies these from source.license_info).
    license_id: str
    license_name: str
    license_url: str
    attribution_required: bool
    share_alike: bool
    no_derivatives: bool
    commercial_use: bool

    # Provenance of the chunk's document.
    source_name: str
    source_url: str
    source_repo: Optional[str]
    source_commit: Optional[str]
    source_file: str
    source_hash: str
    attribution_text: Optional[str]  # set only when the license requires attribution
    copyright_notice: Optional[str]  # create_chunks currently always passes None

    # Document-shape flags derived in create_chunks.
    is_complete_document: bool
    is_modular: bool
    requires_customization: bool
    placeholders: List[str]

    # Usage permissions (create_chunks copies these from source.license_info).
    training_allowed: bool
    output_allowed: bool
    modification_allowed: bool
    distortion_prohibited: bool


@dataclass
class IngestionStatus:
    """Status of a source ingestion."""

    source_name: str
    status: str  # "pending", "running", "completed", "failed"
    documents_found: int = 0
    chunks_created: int = 0
    chunks_indexed: int = 0
    errors: List[str] = field(default_factory=list)
    started_at: Optional[datetime] = None
    completed_at: Optional[datetime] = None


def split_sentences(text: str) -> List[str]:
    """Split text into sentences with basic abbreviation handling.

    A sentence boundary is whitespace that follows ".", "!" or "?". Dots that
    terminate a known abbreviation (e.g. "z.B.", "etc.") or sit between two
    digits are not treated as boundaries.

    Args:
        text: The text to split.

    Returns:
        The non-empty, stripped sentences in their original order, with the
        original characters (including abbreviation casing) preserved.
    """
    if not text:
        return []

    # Hide dots that must not trigger a split. Replacing inside the matched
    # text (rather than substituting the canonical abbreviation) keeps the
    # author's original casing.
    protected = _ABBREVIATION_RE.sub(
        lambda match: match.group(0).replace('.', _PROTECTED_DOT),
        text,
    )
    protected = _DECIMAL_DOT_RE.sub(_PROTECTED_DOT, protected)

    sentences = []
    for piece in _SENTENCE_END_RE.split(protected):
        sentence = piece.replace(_PROTECTED_DOT, '.').strip()
        if sentence:
            sentences.append(sentence)
    return sentences


def _overlap_tail(sentences: List[str], budget: int) -> List[str]:
    """Return the longest proper suffix of *sentences* that fits in *budget*.

    The suffix length is measured as the sentences joined by single spaces.
    The suffix never includes the first sentence, so a flushed chunk is never
    carried over in full (which would duplicate it).
    """
    start = len(sentences)
    used = 0
    for idx in range(len(sentences) - 1, 0, -1):
        cost = len(sentences[idx]) + (1 if used else 0)
        if used + cost > budget:
            break
        used += cost
        start = idx
    return sentences[start:]


def _chunk_sentences(sentences: List[str], chunk_size: int, overlap: int) -> List[str]:
    """Pack sentences into space-joined chunks of at most *chunk_size* chars.

    Consecutive chunks share up to *overlap* characters of trailing sentences.
    A single sentence longer than *chunk_size* is emitted whole.
    """
    chunks: List[str] = []
    current: List[str] = []
    current_len = 0  # always len(' '.join(current))

    for sentence in sentences:
        needed = len(sentence) + (1 if current else 0)
        if current and current_len + needed > chunk_size:
            chunks.append(' '.join(current))
            # Carry over only as much as both the overlap setting and the
            # room left next to the incoming sentence allow.
            room = chunk_size - len(sentence) - 1
            current = _overlap_tail(current, min(overlap, room))
            current_len = len(' '.join(current))
            needed = len(sentence) + (1 if current else 0)
        current.append(sentence)
        current_len += needed

    if current:
        chunks.append(' '.join(current))
    return chunks


def chunk_text(
    text: str,
    chunk_size: int = DEFAULT_CHUNK_SIZE,
    overlap: int = DEFAULT_CHUNK_OVERLAP,
) -> List[str]:
    """Split text into chunks, respecting paragraph and sentence boundaries.

    Paragraphs (separated by a blank line) are packed together, joined by
    blank lines, until the next one would push the chunk past *chunk_size*.
    A paragraph that is itself longer than *chunk_size* is split into
    sentences, which are packed into space-joined chunks.

    Args:
        text: The text to split.
        chunk_size: Target maximum chunk length in characters. A single
            sentence longer than this is still emitted as one chunk.
        overlap: Maximum number of characters of trailing sentences repeated
            at the start of the next chunk. Only applies between chunks of a
            sentence-split paragraph; whole-paragraph chunks do not overlap.
            Negative values are treated as 0.

    Returns:
        The non-empty, stripped chunks in document order.
    """
    if not text:
        return []
    stripped = text.strip()
    if not stripped:
        return []
    if len(stripped) <= chunk_size:
        return [stripped]

    max_overlap = max(0, overlap)
    chunks: List[str] = []
    pending: List[str] = []  # paragraph-level pieces of the chunk being built
    pending_len = 0          # always len('\n\n'.join(pending))

    for raw_para in stripped.split('\n\n'):
        para = raw_para.strip()
        if not para:
            continue

        if len(para) > chunk_size:
            # Oversized paragraph: close the running chunk, then split it.
            if pending:
                chunks.append('\n\n'.join(pending))
            sentence_chunks = _chunk_sentences(
                split_sentences(para), chunk_size, max_overlap
            )
            # The last sentence chunk stays open as ONE piece, so following
            # short paragraphs can fill it without turning its sentence
            # spaces into paragraph breaks.
            chunks.extend(sentence_chunks[:-1])
            pending = sentence_chunks[-1:]
            pending_len = len(pending[0]) if pending else 0
            continue

        added = len(para) + (2 if pending else 0)
        if pending and pending_len + added > chunk_size:
            chunks.append('\n\n'.join(pending))
            pending = []
            pending_len = 0
            added = len(para)
        pending.append(para)
        pending_len += added

    if pending:
        chunks.append('\n\n'.join(pending))

    return [chunk.strip() for chunk in chunks if chunk.strip()]


def infer_template_type(doc: ExtractedDocument, source: SourceConfig) -> str:
    """Infer the template type from document content and metadata.

    Args:
        doc: The extracted document; its ``text`` and ``title`` are searched.
        source: The source configuration; its ``template_types`` provide the
            fallback.

    Returns:
        The first type in the indicator table for which any indicator occurs
        in the lower-cased text or title; otherwise the source's first
        configured template type; otherwise ``"clause"``.
    """
    text_lower = (doc.text or "").lower()
    title_lower = (doc.title or "").lower()

    # NOTE(review): this is plain substring matching, so short indicators
    # such as "agb" or "dpa" also match inside longer words.
    for template_type, indicators in _TEMPLATE_TYPE_INDICATORS.items():
        if any(ind in text_lower or ind in title_lower for ind in indicators):
            return template_type

    # Fall back to source's first template type
    if source.template_types:
        return source.template_types[0]

    return "clause"  # Generic fallback


def infer_clause_category(text: str) -> Optional[str]:
    """Infer the clause category from text content.

    Args:
        text: The chunk text to classify.

    Returns:
        The first category in the indicator table for which any indicator
        occurs (as a substring) in the lower-cased text, or None if none
        matches.
    """
    text_lower = (text or "").lower()

    for category, indicators in _CLAUSE_CATEGORY_INDICATORS.items():
        if any(ind in text_lower for ind in indicators):
            return category

    return None


def create_chunks(
    doc: ExtractedDocument,
    source: SourceConfig,
    chunk_size: int = DEFAULT_CHUNK_SIZE,
    chunk_overlap: int = DEFAULT_CHUNK_OVERLAP,
) -> List[TemplateChunk]:
    """Create template chunks from an extracted document.

    Args:
        doc: The extracted document to chunk.
        source: The source configuration supplying license and provenance
            metadata.
        chunk_size: Passed to chunk_text as the target chunk length.
        chunk_overlap: Passed to chunk_text as the overlap budget.

    Returns:
        One TemplateChunk per text chunk, in document order; an empty list
        when the document yields no text chunks.
    """
    text_chunks = chunk_text(doc.text, chunk_size, chunk_overlap)
    if not text_chunks:
        return []

    license_info = source.license_info
    template_type = infer_template_type(doc, source)

    # Everything below is identical for every chunk of the document, so it is
    # computed once instead of per chunk.
    source_url = doc.source_url or source.get_source_url()
    is_modular = len(doc.sections) > 0 or '##' in doc.text
    requires_customization = len(doc.placeholders) > 0
    # A lone chunk only counts as a complete document if it is reasonably long.
    is_complete = len(text_chunks) == 1 and len(text_chunks[0]) > 500

    attribution_text = None
    if license_info.attribution_required:
        attribution_text = license_info.get_attribution_text(source.name, source_url)

    chunks = []
    for index, chunk_text_str in enumerate(text_chunks):
        chunks.append(
            TemplateChunk(
                text=chunk_text_str,
                chunk_index=index,
                document_title=doc.title,
                template_type=template_type,
                clause_category=infer_clause_category(chunk_text_str),
                language=doc.language,
                jurisdiction=source.jurisdiction,
                license_id=license_info.id.value,
                license_name=license_info.name,
                license_url=license_info.url,
                attribution_required=license_info.attribution_required,
                share_alike=license_info.share_alike,
                no_derivatives=license_info.no_derivatives,
                commercial_use=license_info.commercial_use,
                source_name=source.name,
                source_url=source_url,
                source_repo=source.repo_url,
                source_commit=doc.source_commit,
                source_file=doc.file_path,
                source_hash=doc.source_hash,
                attribution_text=attribution_text,
                copyright_notice=None,
                is_complete_document=is_complete,
                is_modular=is_modular,
                requires_customization=requires_customization,
                # Copy so chunks do not share (and cannot mutate) one list.
                placeholders=list(doc.placeholders),
                training_allowed=license_info.training_allowed,
                output_allowed=license_info.output_allowed,
                modification_allowed=license_info.modification_allowed,
                distortion_prohibited=license_info.distortion_prohibited,
            )
        )

    return chunks