""" Section splitter for shared URLs in unified compliance checks. When the same URL is used for multiple document types (e.g. /datenschutz used for DSI + Cookie + DSB), this module splits the text at headings and assigns the best-matching section to each doc_type. """ import logging import re logger = logging.getLogger(__name__) # Heading keyword → doc_type mapping _HEADING_TYPE_MAP = [ ("cookie", "cookie"), ("datenschutzbeauftragte", "dsb"), ("widerruf", "widerruf"), ("impressum", "impressum"), ("agb", "agb"), ("nutzungsbedingung", "agb"), ("social media", "social_media"), ("soziale medien", "social_media"), ("soziale netzwerke", "social_media"), ("google analytics", "cookie"), ("tracking", "cookie"), ("verwendung von cookies", "cookie"), ("nutzung von google", "cookie"), ("webanalyse", "cookie"), ] def split_shared_texts( doc_entries: list[dict], url_cache: dict[str, str], ) -> None: """When the same URL is used for multiple doc_types, split text into sections and assign the best-matching section to each doc_type. Mutates doc_entries in place. """ # Group entries by normalized URL url_groups: dict[str, list[int]] = {} for i, entry in enumerate(doc_entries): if not entry.get("url"): continue key = entry["url"].strip().rstrip("/").lower() url_groups.setdefault(key, []).append(i) for url_key, indices in url_groups.items(): if len(indices) < 2: continue full_text = doc_entries[indices[0]].get("text", "") if not full_text or len(full_text) < 200: continue sections = _split_at_headings(full_text) if not sections: continue for idx in indices: doc_type = doc_entries[idx]["doc_type"] best = _find_section_for_type(sections, doc_type) if best: doc_entries[idx]["text"] = best doc_entries[idx]["word_count"] = len(best.split()) typed = [s for s in sections if s.get("type")] logger.info( "Split shared URL into %d typed sections for %d doc_types: %s", len(typed), len(indices), ", ".join(f"{s['type']}({len(s['text'].split())}w)" for s in typed), ) def _split_at_headings(text: str) -> list[dict]: """Split text at classified headings into typed sections.""" lines = text.split("\n") sections: list[dict] = [] current_type: str | None = None current_heading = "" current_lines: list[str] = [] preamble_lines: list[str] = [] for line in lines: stripped = line.strip() classified = _classify_heading(stripped) if classified: # Save previous section if current_type and current_lines: _add_section(sections, current_heading, current_type, current_lines) elif not current_type and current_lines: preamble_lines.extend(current_lines) current_type = classified current_heading = stripped current_lines = [] else: current_lines.append(line) # Save last section if current_type and current_lines: _add_section(sections, current_heading, current_type, current_lines) elif current_lines: preamble_lines.extend(current_lines) # Add preamble as untyped section (main DSI text) if preamble_lines: preamble_text = "\n".join(preamble_lines) if len(preamble_text.split()) >= 30: sections.insert(0, { "heading": "(Haupttext)", "text": preamble_text, "type": "dse", }) return sections def _add_section( sections: list[dict], heading: str, sec_type: str, lines: list[str], ) -> None: """Add a section, merging with existing same-type sections.""" text = "\n".join(lines) if len(text.split()) < 20: return # Merge if same type already exists for s in sections: if s["type"] == sec_type: s["text"] += "\n\n" + text return sections.append({"heading": heading, "text": text, "type": sec_type}) def _classify_heading(line: str) -> str | None: """Classify a line as a section heading. Returns doc_type or None.""" if not line or len(line) < 5 or len(line) > 80: return None if line.endswith(".") or line.endswith(","): return None if len(line.split()) > 10: return None if not (line[0].isupper() or line[0].isdigit()): return None heading_lower = line.lower().strip() heading_lower = re.sub(r"^[\d\.\)\-]+\s*", "", heading_lower).strip() for keyword, doc_type in _HEADING_TYPE_MAP: if keyword in heading_lower: return doc_type return None def _find_section_for_type(sections: list[dict], doc_type: str) -> str | None: """Find the best text section for a given doc_type. DSI always gets the full text (main document). Other types get their matching section if found. """ if doc_type in ("dse", "datenschutz", "privacy"): return None # Keep full text for DSI for section in sections: if section.get("type") == doc_type and section.get("text"): return section["text"] return None # No match → keep full text def auto_fill_from_dsi(doc_entries: list[dict]) -> None: """Auto-fill empty document rows from sections found in the DSI text. If the user only entered the DSI URL but left Cookie/Social-Media empty, and the DSI text contains those sections, auto-fill them. """ # Find the DSI entry dsi_entry = None for entry in doc_entries: if entry["doc_type"] in ("dse", "datenschutz", "privacy") and entry.get("text"): dsi_entry = entry break if not dsi_entry: return dsi_text = dsi_entry["text"] if len(dsi_text) < 300: return # Split DSI into sections sections = _split_at_headings(dsi_text) if not sections: return # Find empty entries that could be filled from DSI sections filled = [] for entry in doc_entries: if entry.get("text") or entry.get("url"): continue # Already has content # Auto-discovery already tried + decided: skip. Don't override its # 'NICHT GEFUNDEN' verdict with a pseudo-match from DSI sections # (which produces false MANGELHAFT findings for genuinely missing # docs like BMW's AGB). if entry.get("discovery_attempted") and not entry.get("auto_discovered"): continue doc_type = entry["doc_type"] section_text = _find_section_for_type(sections, doc_type) if section_text and len(section_text.split()) >= 30: entry["text"] = section_text entry["word_count"] = len(section_text.split()) entry["url"] = f"{dsi_entry.get('url', '')} (Abschnitt)" filled.append(doc_type) if filled: logger.info( "Auto-filled %d empty rows from DSI sections: %s", len(filled), ", ".join(filled), ) # ── Cross-Document Search ──────────────────────────────────────────── # Keywords that indicate a doc_type is present in text (case-insensitive) _DOC_TYPE_KEYWORDS = { "widerruf": [ "widerrufsrecht", "widerrufsbelehrung", "widerrufsfrist", "binnen 14 tagen", "widerruf erklaeren", "muster-widerrufsformular", ], "cookie": [ "cookie-richtlinie", "cookie-tabelle", "cookiebot", "consent-tool", "arten der cookies", "session-cookie", "tracking-cookie", ], "social_media": [ "gemeinsam verantwortlich", "art. 26 dsgvo", "fanpage", "social media plugin", "facebook-seite", "instagram-profil", ], "impressum": [ "angaben gemaess", "angaben gemäß", "§ 5 tmg", "§5 tmg", "telemediengesetz", "impressum", ], "agb": [ "allgemeine geschaeftsbedingungen", "allgemeine geschäftsbedingungen", "geltungsbereich", "vertragsschluss", "§305 bgb", ], "dsb": [ "datenschutzbeauftragte", "dsb@", "dpo@", "datenschutzbeauftragten", ], } def cross_search_documents(doc_entries: list[dict]) -> list[dict]: """Search ALL texts for ALL doc_types and fill missing entries. For each empty doc_type row, search through all other documents' texts to find the content. If found in the wrong document, extract it, assign it, and create a finding about incorrect placement. Returns list of findings (misplacement warnings). """ findings: list[dict] = [] # Collect all available texts with their source doc_type all_texts: list[tuple[str, str, str]] = [] # (doc_type, url, text) for entry in doc_entries: if entry.get("text") and len(entry["text"]) > 100: all_texts.append((entry["doc_type"], entry.get("url", ""), entry["text"])) if not all_texts: return findings # For each entry, check if: # a) It has text but the text doesn't match the doc_type → search other texts # (Empty entries from auto-discovery 'not found' are NOT pseudo-filled # from other docs — that would silently revive a 'NICHT GEFUNDEN' verdict # as a misleading MANGELHAFT row.) for entry in doc_entries: target_type = entry["doc_type"] keywords = _DOC_TYPE_KEYWORDS.get(target_type, []) if not keywords: continue has_text = entry.get("text") and len(entry["text"].split()) > 50 text_matches = False if has_text: entry_lower = entry["text"].lower() match_score = sum(1 for kw in keywords if kw in entry_lower) text_matches = match_score >= 2 if has_text and text_matches: continue # Text present AND matches doc_type → skip # Skip empty entries the auto-discovery has already ruled on. if not has_text and entry.get("discovery_attempted") and not entry.get("auto_discovered"): continue # Search all other texts for this doc_type's keywords best_match: dict | None = None best_score = 0 for source_type, source_url, source_text in all_texts: if source_type == target_type: continue text_lower = source_text.lower() score = sum(1 for kw in keywords if kw in text_lower) if score >= 2 and score > best_score: best_score = score # Extract the relevant section section = _extract_section_by_keywords(source_text, keywords) if section and len(section.split()) >= 30: best_match = { "source_type": source_type, "source_url": source_url, "section_text": section, "keyword_hits": score, } if best_match: entry["text"] = best_match["section_text"] entry["word_count"] = len(best_match["section_text"].split()) source_label = best_match["source_type"].upper() entry["url"] = f"(gefunden in {source_label})" findings.append({ "id": f"placement-{target_type}", "label": f"{_type_label(target_type)} in falschem Dokument", "passed": False, "severity": "MEDIUM", "level": 1, "parent": None, "skipped": False, "matched_text": "", "hint": ( f"Die {_type_label(target_type)} wurde nicht als eigenes " f"Dokument gefunden, sondern in der/den {source_label} " f"({best_match['source_url']}). Gemaess Art. 246a EGBGB / " f"§312d BGB muss die {_type_label(target_type)} leicht " f"auffindbar und klar erkennbar sein. Empfehlung: Als " f"eigenen Link im Footer oder als separates Dokument " f"bereitstellen." ), "source": "cross_document_search", "doc_type": target_type, }) logger.info( "Cross-doc: Found %s in %s (%d keywords, %d words)", target_type, best_match["source_type"], best_match["keyword_hits"], entry["word_count"], ) elif has_text and not text_matches: # Text present but doesn't match — wrong text assigned findings.append({ "id": f"wrong-text-{target_type}", "label": f"{_type_label(target_type)} nicht im eingereichten Text", "passed": False, "severity": "HIGH", "level": 1, "parent": None, "skipped": False, "matched_text": "", "hint": ( f"Der eingereichte Text enthaelt keine " f"{_type_label(target_type)}. Moeglicherweise wurde " f"die falsche URL eingegeben. Das System konnte die " f"{_type_label(target_type)} auch in keinem anderen " f"eingereichten Dokument finden." ), "source": "cross_document_search", "doc_type": target_type, }) logger.info("Cross-doc: %s text doesn't match doc_type, not found elsewhere", target_type) return findings def _extract_section_by_keywords( text: str, keywords: list[str], ) -> str | None: """Extract the section of text around the keyword matches.""" text_lower = text.lower() lines = text.split("\n") # Find first and last line containing any keyword first_line = len(lines) last_line = 0 for i, line in enumerate(lines): line_lower = line.lower() if any(kw in line_lower for kw in keywords): first_line = min(first_line, i) last_line = max(last_line, i) if first_line >= last_line: return None # Expand to include context (5 lines before first, 10 after last) start = max(0, first_line - 5) end = min(len(lines), last_line + 10) section = "\n".join(lines[start:end]) return section if len(section.split()) >= 30 else None def _type_label(doc_type: str) -> str: labels = { "widerruf": "Widerrufsbelehrung", "cookie": "Cookie-Richtlinie", "social_media": "Social-Media-Datenschutz", "impressum": "Impressum", "agb": "AGB", "dsb": "DSB-Kontakt", "dse": "Datenschutzerklaerung", } return labels.get(doc_type, doc_type)