From 4e9043f26d15bc2ab6378c2a2750e9cf8d513120 Mon Sep 17 00:00:00 2001
From: Benjamin Admin
Date: Thu, 14 May 2026 23:19:39 +0200
Subject: [PATCH] feat(cross-doc): search all texts for all doc_types + misplacement finding
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

Cross-Document Intelligence: When a doc_type row is empty, searches ALL
other loaded documents for that content. If found (e.g. Widerruf in AGB),
extracts the section, runs the check, AND creates a finding:
"Widerrufsbelehrung in falschem Dokument gefunden — schwer auffindbar"

Keywords for: widerruf, cookie, social_media, impressum, agb, dsb.
Integrated as Step 1c in compliance check pipeline.

Co-Authored-By: Claude Opus 4.6 (1M context)
---
Review changes (section_splitter.py only):
- _extract_section_by_keywords: a match whose keyword hits all sit on one
  line was rejected as "no match"; only the no-hit case returns None now.
- _extract_section_by_keywords: really keep 10 lines after the last hit
  (the exclusive slice end kept 9); drop the unused text_lower local.
- cross_search_documents: raise best_score only after a section was
  extracted, so a failed extraction no longer shadows later sources.
- cross_search_documents: cite Art. 246a EGBGB / §312d BGB only in the
  widerruf hint (text unchanged there); other doc_types get neutral text.
- The index lines are carried over from the first version of this patch.
- This patch was rebuilt from a whitespace-collapsed copy. The indentation
  of unchanged context lines is a best guess; use
  `git apply --ignore-whitespace` if it does not apply cleanly.

 .../api/agent_compliance_check_routes.py      |  16 +-
 .../compliance/services/section_splitter.py   | 214 ++++++++++++++++++
 2 files changed, 228 insertions(+), 2 deletions(-)

diff --git a/backend-compliance/compliance/api/agent_compliance_check_routes.py b/backend-compliance/compliance/api/agent_compliance_check_routes.py
index e46bd371..874ab48e 100644
--- a/backend-compliance/compliance/api/agent_compliance_check_routes.py
+++ b/backend-compliance/compliance/api/agent_compliance_check_routes.py
@@ -178,11 +178,16 @@ async def _run_compliance_check(check_id: str, req: ComplianceCheckRequest):
         # 1. Same URL used for multiple doc_types → split by heading
         # 2. DSI text contains Cookie/Social-Media sections → auto-fill empty rows
         from compliance.services.section_splitter import (
-            split_shared_texts, auto_fill_from_dsi,
+            split_shared_texts, auto_fill_from_dsi, cross_search_documents,
         )
         split_shared_texts(doc_entries, url_text_cache)
         auto_fill_from_dsi(doc_entries)
-        # Refresh doc_texts after splitting
+
+        # Step 1c: Cross-document search — find doc_types in wrong documents
+        _update(check_id, "Dokumente werden uebergreifend durchsucht...")
+        placement_findings = cross_search_documents(doc_entries)
+
+        # Refresh doc_texts after all splitting/searching
         for entry in doc_entries:
             if entry.get("text"):
                 doc_texts[entry["doc_type"]] = entry["text"]
@@ -232,6 +237,13 @@ async def _run_compliance_check(check_id: str, req: ComplianceCheckRequest):
             # Apply profile context filter
             result = _apply_profile_filter(result, profile, doc_type)
 
+            # Add placement findings (doc found in wrong location)
+            for pf in placement_findings:
+                if pf.get("doc_type") == doc_type:
+                    result.checks.insert(0, CheckItem(**{
+                        k: v for k, v in pf.items() if k != "doc_type"
+                    }))
+
             results.append(result)
             total_findings += result.findings_count
 
diff --git a/backend-compliance/compliance/services/section_splitter.py b/backend-compliance/compliance/services/section_splitter.py
index a94c4f4a..b7d31cdf 100644
--- a/backend-compliance/compliance/services/section_splitter.py
+++ b/backend-compliance/compliance/services/section_splitter.py
@@ -213,3 +213,217 @@ def auto_fill_from_dsi(doc_entries: list[dict]) -> None:
             "Auto-filled %d empty rows from DSI sections: %s",
             len(filled), ", ".join(filled),
         )
+
+
+# ── Cross-Document Search ────────────────────────────────────────────
+
+# Keywords that indicate a doc_type is present in text. They are matched
+# against lowercased text, so every keyword must be lowercase.
+_DOC_TYPE_KEYWORDS = {
+    "widerruf": [
+        "widerrufsrecht", "widerrufsbelehrung", "widerrufsfrist",
+        "binnen 14 tagen", "widerruf erklaeren", "muster-widerrufsformular",
+    ],
+    "cookie": [
+        "cookie-richtlinie", "cookie-tabelle", "cookiebot", "consent-tool",
+        "arten der cookies", "session-cookie", "tracking-cookie",
+    ],
+    "social_media": [
+        "gemeinsam verantwortlich", "art. 26 dsgvo", "fanpage",
+        "social media plugin", "facebook-seite", "instagram-profil",
+    ],
+    "impressum": [
+        "angaben gemaess", "angaben gemäß", "§ 5 tmg", "§5 tmg",
+        "telemediengesetz", "impressum",
+    ],
+    "agb": [
+        "allgemeine geschaeftsbedingungen", "allgemeine geschäftsbedingungen",
+        "geltungsbereich", "vertragsschluss", "§305 bgb",
+    ],
+    # NOTE(review): "datenschutzbeauftragten" contains
+    # "datenschutzbeauftragte", so one occurrence counts as two keyword
+    # hits and meets the 2-hit threshold alone - confirm this is intended.
+    "dsb": [
+        "datenschutzbeauftragte", "dsb@", "dpo@",
+        "datenschutzbeauftragten",
+    ],
+}
+
+# Lines of context kept before the first / after the last keyword line.
+_SECTION_LINES_BEFORE = 5
+_SECTION_LINES_AFTER = 10
+# Extracted sections with fewer words than this are discarded.
+_SECTION_MIN_WORDS = 30
+
+
+def cross_search_documents(doc_entries: list[dict]) -> list[dict]:
+    """Search ALL texts for ALL doc_types and fill missing entries.
+
+    For each empty doc_type row (no text, or at most 50 words), search
+    the texts of all other doc_types for that row's keywords. The source
+    with the most distinct keyword hits (at least 2) that also yields a
+    usable section wins. The row is filled in place ("text",
+    "word_count" and "url" are overwritten) and a finding about the
+    incorrect placement is created.
+
+    Args:
+        doc_entries: Document rows with a "doc_type" key and optional
+            "text" / "url" keys. Rows are mutated in place.
+
+    Returns:
+        List of findings (misplacement warnings), one per filled row.
+    """
+    findings: list[dict] = []
+
+    # Snapshot the source texts up front so that sections filled in below
+    # are never used as a search source themselves.
+    all_texts: list[tuple[str, str, str]] = [  # (doc_type, url, text)
+        (entry["doc_type"], entry.get("url", ""), entry["text"])
+        for entry in doc_entries
+        if entry.get("text") and len(entry["text"]) > 100
+    ]
+    if not all_texts:
+        return findings
+
+    # For each empty or short entry, search all other texts
+    for entry in doc_entries:
+        if entry.get("text") and len(entry["text"].split()) > 50:
+            continue  # Already has content
+
+        target_type = entry["doc_type"]
+        keywords = _DOC_TYPE_KEYWORDS.get(target_type, [])
+        if not keywords:
+            continue
+
+        best_match: dict | None = None
+        best_score = 0
+
+        for source_type, source_url, source_text in all_texts:
+            if source_type == target_type:
+                continue  # Don't search in the same doc_type
+
+            text_lower = source_text.lower()
+            score = sum(1 for kw in keywords if kw in text_lower)
+            if score < 2 or score <= best_score:
+                continue
+
+            # Returns None unless the section has enough words
+            section = _extract_section_by_keywords(source_text, keywords)
+            if not section:
+                continue
+
+            # Raise the bar only after a successful extraction; otherwise
+            # a high-scoring source without a usable section would shadow
+            # later sources that do contain one.
+            best_score = score
+            best_match = {
+                "source_type": source_type,
+                "source_url": source_url,
+                "section_text": section,
+                "keyword_hits": score,
+            }
+
+        if not best_match:
+            continue
+
+        entry["text"] = best_match["section_text"]
+        entry["word_count"] = len(best_match["section_text"].split())
+        label = _type_label(target_type)
+        source_label = best_match["source_type"].upper()
+        entry["url"] = f"(gefunden in {source_label})"
+
+        # The cited provisions are distance-selling information duties and
+        # fit the withdrawal notice; the other doc_types get a neutral
+        # wording instead of a legal basis that does not apply to them.
+        # NOTE(review): confirm with legal; add per-type references if wanted.
+        if target_type == "widerruf":
+            requirement = (
+                f"Gemaess Art. 246a EGBGB / §312d BGB muss die {label} "
+                f"leicht auffindbar und klar erkennbar sein."
+            )
+        else:
+            requirement = (
+                "Der Inhalt sollte leicht auffindbar und klar erkennbar sein."
+            )
+
+        findings.append({
+            "id": f"placement-{target_type}",
+            "label": f"{label} in falschem Dokument",
+            "passed": False,
+            "severity": "MEDIUM",
+            "level": 1,
+            "parent": None,
+            "skipped": False,
+            "matched_text": "",
+            "hint": (
+                f"Die {label} wurde nicht als eigenes "
+                f"Dokument gefunden, sondern in der/den {source_label} "
+                f"({best_match['source_url']}). {requirement} "
+                f"Empfehlung: Als eigenen Link im Footer oder als "
+                f"separates Dokument bereitstellen."
+            ),
+            "source": "cross_document_search",
+            "doc_type": target_type,
+        })
+
+        logger.info(
+            "Cross-doc: Found %s in %s (%d keywords, %d words)",
+            target_type, best_match["source_type"],
+            best_match["keyword_hits"],
+            entry["word_count"],
+        )
+
+    return findings
+
+
+def _extract_section_by_keywords(
+    text: str, keywords: list[str],
+) -> str | None:
+    """Extract the section of text around the keyword matches.
+
+    The section runs from the first to the last line containing any
+    keyword, widened by _SECTION_LINES_BEFORE / _SECTION_LINES_AFTER
+    lines of context.
+
+    Args:
+        text: Newline-separated document text.
+        keywords: Lowercase keywords; compared against lowercased lines.
+
+    Returns:
+        The section, or None if no line contains a keyword or the
+        section has fewer than _SECTION_MIN_WORDS words.
+    """
+    lines = text.split("\n")
+
+    # Indices of all lines containing any keyword (ascending)
+    hit_lines: list[int] = []
+    for i, line in enumerate(lines):
+        line_lower = line.lower()
+        if any(kw in line_lower for kw in keywords):
+            hit_lines.append(i)
+
+    # Only "no hit at all" is a failure: all hits sharing a single line
+    # is a valid match (e.g. text scraped without line breaks).
+    if not hit_lines:
+        return None
+
+    start = max(0, hit_lines[0] - _SECTION_LINES_BEFORE)
+    # +1 because the slice end is exclusive
+    end = min(len(lines), hit_lines[-1] + _SECTION_LINES_AFTER + 1)
+
+    section = "\n".join(lines[start:end])
+    return section if len(section.split()) >= _SECTION_MIN_WORDS else None
+
+
+def _type_label(doc_type: str) -> str:
+    """Return the German display label for doc_type, or doc_type itself."""
+    labels = {
+        "widerruf": "Widerrufsbelehrung",
+        "cookie": "Cookie-Richtlinie",
+        "social_media": "Social-Media-Datenschutz",
+        "impressum": "Impressum",
+        "agb": "AGB",
+        "dsb": "DSB-Kontakt",
+        "dse": "Datenschutzerklaerung",
+    }
+    return labels.get(doc_type, doc_type)