""" Section splitter for shared URLs in unified compliance checks. When the same URL is used for multiple document types (e.g. /datenschutz used for DSI + Cookie + DSB), this module splits the text at headings and assigns the best-matching section to each doc_type. """ import logging import re logger = logging.getLogger(__name__) # Heading keyword → doc_type mapping _HEADING_TYPE_MAP = [ ("cookie", "cookie"), ("datenschutzbeauftragte", "dsb"), ("widerruf", "widerruf"), ("impressum", "impressum"), ("agb", "agb"), ("nutzungsbedingung", "agb"), ("social media", "social_media"), ("soziale medien", "social_media"), ("soziale netzwerke", "social_media"), ("google analytics", "cookie"), ("tracking", "cookie"), ("verwendung von cookies", "cookie"), ("nutzung von google", "cookie"), ("webanalyse", "cookie"), ] def split_shared_texts( doc_entries: list[dict], url_cache: dict[str, str], ) -> None: """When the same URL is used for multiple doc_types, split text into sections and assign the best-matching section to each doc_type. Mutates doc_entries in place. """ # Group entries by normalized URL url_groups: dict[str, list[int]] = {} for i, entry in enumerate(doc_entries): if not entry.get("url"): continue key = entry["url"].strip().rstrip("/").lower() url_groups.setdefault(key, []).append(i) for url_key, indices in url_groups.items(): if len(indices) < 2: continue full_text = doc_entries[indices[0]].get("text", "") if not full_text or len(full_text) < 200: continue sections = _split_at_headings(full_text) if not sections: continue for idx in indices: doc_type = doc_entries[idx]["doc_type"] best = _find_section_for_type(sections, doc_type) if best: doc_entries[idx]["text"] = best doc_entries[idx]["word_count"] = len(best.split()) typed = [s for s in sections if s.get("type")] logger.info( "Split shared URL into %d typed sections for %d doc_types: %s", len(typed), len(indices), ", ".join(f"{s['type']}({len(s['text'].split())}w)" for s in typed), ) def _split_at_headings(text: str) -> list[dict]: """Split text at classified headings into typed sections.""" lines = text.split("\n") sections: list[dict] = [] current_type: str | None = None current_heading = "" current_lines: list[str] = [] preamble_lines: list[str] = [] for line in lines: stripped = line.strip() classified = _classify_heading(stripped) if classified: # Save previous section if current_type and current_lines: _add_section(sections, current_heading, current_type, current_lines) elif not current_type and current_lines: preamble_lines.extend(current_lines) current_type = classified current_heading = stripped current_lines = [] else: current_lines.append(line) # Save last section if current_type and current_lines: _add_section(sections, current_heading, current_type, current_lines) elif current_lines: preamble_lines.extend(current_lines) # Add preamble as untyped section (main DSI text) if preamble_lines: preamble_text = "\n".join(preamble_lines) if len(preamble_text.split()) >= 30: sections.insert(0, { "heading": "(Haupttext)", "text": preamble_text, "type": "dse", }) return sections def _add_section( sections: list[dict], heading: str, sec_type: str, lines: list[str], ) -> None: """Add a section, merging with existing same-type sections.""" text = "\n".join(lines) if len(text.split()) < 20: return # Merge if same type already exists for s in sections: if s["type"] == sec_type: s["text"] += "\n\n" + text return sections.append({"heading": heading, "text": text, "type": sec_type}) def _classify_heading(line: str) -> str | None: """Classify a line as a section heading. Returns doc_type or None.""" if not line or len(line) < 5 or len(line) > 80: return None if line.endswith(".") or line.endswith(","): return None if len(line.split()) > 10: return None if not (line[0].isupper() or line[0].isdigit()): return None heading_lower = line.lower().strip() heading_lower = re.sub(r"^[\d\.\)\-]+\s*", "", heading_lower).strip() for keyword, doc_type in _HEADING_TYPE_MAP: if keyword in heading_lower: return doc_type return None def _find_section_for_type(sections: list[dict], doc_type: str) -> str | None: """Find the best text section for a given doc_type. DSI always gets the full text (main document). Other types get their matching section if found. """ if doc_type in ("dse", "datenschutz", "privacy"): return None # Keep full text for DSI for section in sections: if section.get("type") == doc_type and section.get("text"): return section["text"] return None # No match → keep full text def auto_fill_from_dsi(doc_entries: list[dict]) -> None: """Auto-fill empty document rows from sections found in the DSI text. If the user only entered the DSI URL but left Cookie/Social-Media empty, and the DSI text contains those sections, auto-fill them. """ # Find the DSI entry dsi_entry = None for entry in doc_entries: if entry["doc_type"] in ("dse", "datenschutz", "privacy") and entry.get("text"): dsi_entry = entry break if not dsi_entry: return dsi_text = dsi_entry["text"] if len(dsi_text) < 300: return # Split DSI into sections sections = _split_at_headings(dsi_text) if not sections: return # Find empty entries that could be filled from DSI sections filled = [] for entry in doc_entries: if entry.get("text") or entry.get("url"): continue # Already has content doc_type = entry["doc_type"] section_text = _find_section_for_type(sections, doc_type) if section_text and len(section_text.split()) >= 30: entry["text"] = section_text entry["word_count"] = len(section_text.split()) entry["url"] = f"{dsi_entry.get('url', '')} (Abschnitt)" filled.append(doc_type) if filled: logger.info( "Auto-filled %d empty rows from DSI sections: %s", len(filled), ", ".join(filled), )