""" Section splitter for shared URLs in unified compliance checks. When the same URL is used for multiple document types (e.g. /datenschutz used for DSI + Cookie + DSB), this module splits the text at headings and assigns the best-matching section to each doc_type. """ import logging import re logger = logging.getLogger(__name__) # Heading keyword → doc_type mapping _HEADING_TYPE_MAP = [ ("cookie", "cookie"), ("datenschutzbeauftragte", "dsb"), ("widerruf", "widerruf"), ("impressum", "impressum"), ("agb", "agb"), ("nutzungsbedingung", "agb"), ("social media", "social_media"), ("soziale medien", "social_media"), ("soziale netzwerke", "social_media"), ("google analytics", "cookie"), ("tracking", "cookie"), ("verwendung von cookies", "cookie"), ("nutzung von google", "cookie"), ("webanalyse", "cookie"), ] def split_shared_texts( doc_entries: list[dict], url_cache: dict[str, str], ) -> None: """When the same URL is used for multiple doc_types, split text into sections and assign the best-matching section to each doc_type. Mutates doc_entries in place. """ # Group entries by normalized URL url_groups: dict[str, list[int]] = {} for i, entry in enumerate(doc_entries): if not entry.get("url"): continue key = entry["url"].strip().rstrip("/").lower() url_groups.setdefault(key, []).append(i) for url_key, indices in url_groups.items(): if len(indices) < 2: continue full_text = doc_entries[indices[0]].get("text", "") if not full_text or len(full_text) < 200: continue sections = _split_at_headings(full_text) if not sections: continue for idx in indices: doc_type = doc_entries[idx]["doc_type"] best = _find_section_for_type(sections, doc_type) if best: doc_entries[idx]["text"] = best doc_entries[idx]["word_count"] = len(best.split()) typed = [s for s in sections if s.get("type")] logger.info( "Split shared URL into %d typed sections for %d doc_types: %s", len(typed), len(indices), ", ".join(f"{s['type']}({len(s['text'].split())}w)" for s in typed), ) def _split_at_headings(text: str) -> list[dict]: """Split text at classified headings into typed sections.""" lines = text.split("\n") sections: list[dict] = [] current_type: str | None = None current_heading = "" current_lines: list[str] = [] preamble_lines: list[str] = [] for line in lines: stripped = line.strip() classified = _classify_heading(stripped) if classified: # Save previous section if current_type and current_lines: _add_section(sections, current_heading, current_type, current_lines) elif not current_type and current_lines: preamble_lines.extend(current_lines) current_type = classified current_heading = stripped current_lines = [] else: current_lines.append(line) # Save last section if current_type and current_lines: _add_section(sections, current_heading, current_type, current_lines) elif current_lines: preamble_lines.extend(current_lines) # Add preamble as untyped section (main DSI text) if preamble_lines: preamble_text = "\n".join(preamble_lines) if len(preamble_text.split()) >= 30: sections.insert(0, { "heading": "(Haupttext)", "text": preamble_text, "type": "dse", }) return sections def _add_section( sections: list[dict], heading: str, sec_type: str, lines: list[str], ) -> None: """Add a section, merging with existing same-type sections.""" text = "\n".join(lines) if len(text.split()) < 20: return # Merge if same type already exists for s in sections: if s["type"] == sec_type: s["text"] += "\n\n" + text return sections.append({"heading": heading, "text": text, "type": sec_type}) def _classify_heading(line: str) -> str | None: """Classify a line as a section heading. Returns doc_type or None.""" if not line or len(line) < 5 or len(line) > 80: return None if line.endswith(".") or line.endswith(","): return None if len(line.split()) > 10: return None if not (line[0].isupper() or line[0].isdigit()): return None heading_lower = line.lower().strip() heading_lower = re.sub(r"^[\d\.\)\-]+\s*", "", heading_lower).strip() for keyword, doc_type in _HEADING_TYPE_MAP: if keyword in heading_lower: return doc_type return None def _find_section_for_type(sections: list[dict], doc_type: str) -> str | None: """Find the best text section for a given doc_type. DSI always gets the full text (main document). Other types get their matching section if found. """ if doc_type in ("dse", "datenschutz", "privacy"): return None # Keep full text for DSI for section in sections: if section.get("type") == doc_type and section.get("text"): return section["text"] return None # No match → keep full text