feat(cross-doc): search all texts for all doc_types + misplacement finding

Cross-Document Intelligence: When a doc_type row is empty, searches
ALL other loaded documents for that content. If found (e.g. Widerruf
in AGB), extracts the section, runs the check, AND creates a finding:
"Widerrufsbelehrung in falschem Dokument gefunden — schwer auffindbar"

Keywords for: widerruf, cookie, social_media, impressum, agb, dsb.
Integrated as Step 1c in compliance check pipeline.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
Benjamin Admin
2026-05-14 23:19:39 +02:00
parent 29fbd03c79
commit 4e9043f26d
2 changed files with 176 additions and 2 deletions
@@ -178,11 +178,16 @@ async def _run_compliance_check(check_id: str, req: ComplianceCheckRequest):
# 1. Same URL used for multiple doc_types → split by heading
# 2. DSI text contains Cookie/Social-Media sections → auto-fill empty rows
from compliance.services.section_splitter import (
split_shared_texts, auto_fill_from_dsi,
split_shared_texts, auto_fill_from_dsi, cross_search_documents,
)
split_shared_texts(doc_entries, url_text_cache)
auto_fill_from_dsi(doc_entries)
# Refresh doc_texts after splitting
# Step 1c: Cross-document search — find doc_types in wrong documents
_update(check_id, "Dokumente werden uebergreifend durchsucht...")
placement_findings = cross_search_documents(doc_entries)
# Refresh doc_texts after all splitting/searching
for entry in doc_entries:
if entry.get("text"):
doc_texts[entry["doc_type"]] = entry["text"]
@@ -232,6 +237,13 @@ async def _run_compliance_check(check_id: str, req: ComplianceCheckRequest):
# Apply profile context filter
result = _apply_profile_filter(result, profile, doc_type)
# Add placement findings (doc found in wrong location)
for pf in placement_findings:
if pf.get("doc_type") == doc_type:
result.checks.insert(0, CheckItem(**{
k: v for k, v in pf.items() if k != "doc_type"
}))
results.append(result)
total_findings += result.findings_count
@@ -213,3 +213,165 @@ def auto_fill_from_dsi(doc_entries: list[dict]) -> None:
"Auto-filled %d empty rows from DSI sections: %s",
len(filled), ", ".join(filled),
)
# ── Cross-Document Search ────────────────────────────────────────────

# Keywords that indicate a doc_type is present in a text.
#
# Matching is a plain substring test against the lower-cased text, so every
# keyword must itself be lower-case. The number of *distinct* keywords found
# is used as the match score, therefore no keyword in a list may be a
# substring of another keyword in the same list: a single word in the text
# would otherwise be counted twice and satisfy the two-hit threshold alone.
# Where a term contains an umlaut, both the transliterated and the umlaut
# spelling are listed because source texts use either form.
_DOC_TYPE_KEYWORDS: dict[str, list[str]] = {
    "widerruf": [
        "widerrufsrecht", "widerrufsbelehrung", "widerrufsfrist",
        "binnen 14 tagen", "widerruf erklaeren", "widerruf erklären",
        "muster-widerrufsformular",
    ],
    "cookie": [
        "cookie-richtlinie", "cookie-tabelle", "cookiebot", "consent-tool",
        "arten der cookies", "session-cookie", "tracking-cookie",
    ],
    "social_media": [
        "gemeinsam verantwortlich", "art. 26 dsgvo", "fanpage",
        "social media plugin", "facebook-seite", "instagram-profil",
    ],
    "impressum": [
        "angaben gemaess", "angaben gemäß", "§ 5 tmg", "§5 tmg",
        "telemediengesetz", "impressum",
    ],
    "agb": [
        "allgemeine geschaeftsbedingungen", "allgemeine geschäftsbedingungen",
        "geltungsbereich", "vertragsschluss", "§305 bgb",
    ],
    "dsb": [
        # The stem also matches the inflected forms ("...beauftragten",
        # "...beauftragter"), so those are intentionally not listed.
        "datenschutzbeauftragte", "dsb@", "dpo@",
    ],
}
def cross_search_documents(doc_entries: list[dict]) -> list[dict]:
    """Fill empty doc_type rows from other documents and report misplacement.

    Every entry whose text is missing or has at most 50 words is treated as
    empty. For such an entry the texts of all *other* doc_types are scanned
    for the keywords registered for its doc_type in ``_DOC_TYPE_KEYWORDS``.
    The source with the highest number of distinct keyword hits (at least 2)
    from which a section of at least 30 words can be extracted wins; on a
    tie the earlier source is kept.

    When a source is found, the entry is updated in place (``text``,
    ``word_count`` and ``url``) and a placement finding is recorded.

    Args:
        doc_entries: Document rows. Each dict is read for ``doc_type``,
            ``text`` and ``url``; rows that get filled are mutated in place.

    Returns:
        One finding dict per filled row (empty list if nothing was filled).
        Each finding carries the target ``doc_type`` so the caller can
        attach it to the matching result.
    """
    findings: list[dict] = []

    # Snapshot of searchable sources, taken before any row is filled, so a
    # section assigned to one row is never used as a source for another.
    # Only texts longer than 100 characters are considered.
    all_texts: list[tuple[str, str, str]] = [
        (entry["doc_type"], entry.get("url", ""), entry["text"])
        for entry in doc_entries
        if entry.get("text") and len(entry["text"]) > 100
    ]
    if not all_texts:
        return findings

    for entry in doc_entries:
        if entry.get("text") and len(entry["text"].split()) > 50:
            continue  # Row already has content of its own.

        target_type = entry["doc_type"]
        keywords = _DOC_TYPE_KEYWORDS.get(target_type, [])
        if not keywords:
            continue  # No keywords registered for this doc_type.

        best_match: dict | None = None
        best_score = 0
        for source_type, source_url, source_text in all_texts:
            if source_type == target_type:
                continue  # Never search a doc_type in its own document.

            text_lower = source_text.lower()
            score = sum(1 for kw in keywords if kw in text_lower)
            if score < 2 or score <= best_score:
                continue

            section = _extract_section_by_keywords(source_text, keywords)
            if not section or len(section.split()) < 30:
                # No usable section: leave best_score untouched so that this
                # candidate cannot shadow a later source that scores the
                # same and does yield a section.
                continue

            best_score = score
            best_match = {
                "source_type": source_type,
                "source_url": source_url,
                "section_text": section,
                "keyword_hits": score,
            }

        if best_match is None:
            continue

        entry["text"] = best_match["section_text"]
        entry["word_count"] = len(best_match["section_text"].split())
        source_label = best_match["source_type"].upper()
        entry["url"] = f"(gefunden in {source_label})"

        label = _type_label(target_type)
        # NOTE(review): the statutory reference in the hint is emitted for
        # every doc_type — confirm it is appropriate for types other than
        # the Widerrufsbelehrung.
        findings.append({
            "id": f"placement-{target_type}",
            "label": f"{label} in falschem Dokument",
            "passed": False,
            "severity": "MEDIUM",
            "level": 1,
            "parent": None,
            "skipped": False,
            "matched_text": "",
            "hint": (
                f"Die {label} wurde nicht als eigenes "
                f"Dokument gefunden, sondern in der/den {source_label} "
                f"({best_match['source_url']}). Gemaess Art. 246a EGBGB / "
                f"§312d BGB muss die {label} leicht "
                f"auffindbar und klar erkennbar sein. Empfehlung: Als "
                f"eigenen Link im Footer oder als separates Dokument "
                f"bereitstellen."
            ),
            "source": "cross_document_search",
            "doc_type": target_type,
        })
        logger.info(
            "Cross-doc: Found %s in %s (%d keywords, %d words)",
            target_type, best_match["source_type"],
            best_match["keyword_hits"],
            entry["word_count"],
        )

    return findings
def _extract_section_by_keywords(
text: str, keywords: list[str],
) -> str | None:
"""Extract the section of text around the keyword matches."""
text_lower = text.lower()
lines = text.split("\n")
# Find first and last line containing any keyword
first_line = len(lines)
last_line = 0
for i, line in enumerate(lines):
line_lower = line.lower()
if any(kw in line_lower for kw in keywords):
first_line = min(first_line, i)
last_line = max(last_line, i)
if first_line >= last_line:
return None
# Expand to include context (5 lines before first, 10 after last)
start = max(0, first_line - 5)
end = min(len(lines), last_line + 10)
section = "\n".join(lines[start:end])
return section if len(section.split()) >= 30 else None
def _type_label(doc_type: str) -> str:
    """Return the display label for *doc_type*.

    Unknown doc_types are returned unchanged.
    """
    known = dict(
        widerruf="Widerrufsbelehrung",
        cookie="Cookie-Richtlinie",
        social_media="Social-Media-Datenschutz",
        impressum="Impressum",
        agb="AGB",
        dsb="DSB-Kontakt",
        dse="Datenschutzerklaerung",
    )
    if doc_type in known:
        return known[doc_type]
    return doc_type