bd2d6976d6
Cross-search now validates if existing text matches the expected doc_type using keyword scoring. If text is present but doesn't match (e.g. Nutzungsbedingungen in Widerruf row), searches other texts and creates a finding explaining the mismatch. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
410 lines
14 KiB
Python
410 lines
14 KiB
Python
"""
|
|
Section splitter for shared URLs in unified compliance checks.
|
|
|
|
When the same URL is used for multiple document types (e.g. /datenschutz
|
|
used for DSI + Cookie + DSB), this module splits the text at headings
|
|
and assigns the best-matching section to each doc_type.
|
|
"""
|
|
|
|
import logging
|
|
import re
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
# Heading keyword → doc_type mapping.
#
# Each pair is (lowercase substring to look for in a heading, doc_type that
# heading introduces). _classify_heading walks this list from top to bottom
# and returns the doc_type of the FIRST keyword contained in the heading, so
# the order of the entries is significant when a heading contains more than
# one keyword.
_HEADING_TYPE_MAP: list[tuple[str, str]] = [
    ("cookie", "cookie"),
    ("datenschutzbeauftragte", "dsb"),
    ("widerruf", "widerruf"),
    ("impressum", "impressum"),
    ("agb", "agb"),
    ("nutzungsbedingung", "agb"),
    ("social media", "social_media"),
    ("soziale medien", "social_media"),
    ("soziale netzwerke", "social_media"),
    ("google analytics", "cookie"),
    ("tracking", "cookie"),
    ("verwendung von cookies", "cookie"),
    ("nutzung von google", "cookie"),
    ("webanalyse", "cookie"),
]
|
|
|
|
|
|
def split_shared_texts(
    doc_entries: list[dict],
    url_cache: dict[str, str],
) -> None:
    """Give each doc_type that shares a URL its own section of the page text.

    Entries are grouped by their URL (whitespace-stripped, trailing slashes
    removed, lower-cased). For every group with at least two entries the text
    of the group's first entry is split at classified headings, and each
    entry whose doc_type has a matching section gets that section as its
    ``text`` together with an updated ``word_count``. Entries without a
    matching section keep their current text.

    Args:
        doc_entries: Document rows; each is read for ``url``, ``text`` and
            ``doc_type``. Mutated in place.
        url_cache: Part of the signature but not read by this function.
    """
    # Positions of all entries, keyed by their normalized URL.
    positions_by_url: dict[str, list[int]] = {}
    for position, candidate in enumerate(doc_entries):
        raw_url = candidate.get("url")
        if raw_url:
            normalized = raw_url.strip().rstrip("/").lower()
            positions_by_url.setdefault(normalized, []).append(position)

    for positions in positions_by_url.values():
        # A URL used by a single row has nothing to split.
        if len(positions) < 2:
            continue

        # The first row of the group supplies the text that gets split.
        shared_text = doc_entries[positions[0]].get("text", "")
        if not shared_text or len(shared_text) < 200:
            continue

        sections = _split_at_headings(shared_text)
        if not sections:
            continue

        for position in positions:
            target = doc_entries[position]
            section_text = _find_section_for_type(sections, target["doc_type"])
            if not section_text:
                continue  # no dedicated section → row keeps its full text
            target["text"] = section_text
            target["word_count"] = len(section_text.split())

        typed_sections = [sec for sec in sections if sec.get("type")]
        summary = ", ".join(
            f"{sec['type']}({len(sec['text'].split())}w)" for sec in typed_sections
        )
        logger.info(
            "Split shared URL into %d typed sections for %d doc_types: %s",
            len(typed_sections), len(positions),
            summary,
        )
|
|
|
|
|
|
def _split_at_headings(text: str) -> list[dict]:
    """Cut ``text`` into typed sections at every line recognised as a heading.

    A line for which ``_classify_heading`` returns a doc_type starts a new
    section of that type; the heading line itself is not part of the section
    text. Lines collected before the first recognised heading form the
    preamble, which is placed at the front of the result as a section of
    type ``"dse"`` when it has at least 30 words. Sections are stored through
    ``_add_section``.

    Returns:
        List of dicts with the keys ``heading``, ``text`` and ``type``.
    """
    sections: list[dict] = []
    preamble: list[str] = []
    active_type: str | None = None
    active_heading = ""
    buffer: list[str] = []

    def flush() -> None:
        # Hand the buffered lines to the open section, or to the preamble
        # while no classified heading has been seen yet.
        if not buffer:
            return
        if active_type:
            _add_section(sections, active_heading, active_type, buffer)
        else:
            preamble.extend(buffer)

    for raw_line in text.split("\n"):
        heading = raw_line.strip()
        heading_type = _classify_heading(heading)
        if not heading_type:
            buffer.append(raw_line)
            continue
        # A new classified heading closes whatever was collected so far.
        flush()
        active_type, active_heading, buffer = heading_type, heading, []

    # Close the section (or preamble) that was still open at end of text.
    flush()

    # The preamble is treated as the main privacy text.
    if preamble:
        preamble_text = "\n".join(preamble)
        if len(preamble_text.split()) >= 30:
            main_section = {
                "heading": "(Haupttext)",
                "text": preamble_text,
                "type": "dse",
            }
            sections.insert(0, main_section)

    return sections
|
|
|
|
|
|
def _add_section(
    sections: list[dict], heading: str, sec_type: str, lines: list[str],
) -> None:
    """Store ``lines`` as a section of ``sec_type`` in ``sections``.

    The lines are joined with newlines. Bodies shorter than 20 words are
    ignored. If ``sections`` already holds a section of the same type, the
    new body is appended to that section's text (separated by a blank line)
    and the existing heading is kept; otherwise a new section dict is
    appended. ``sections`` is modified in place.
    """
    body = "\n".join(lines)
    if len(body.split()) < 20:
        return

    # Only one section per type: look for an earlier one to extend.
    existing = next((sec for sec in sections if sec["type"] == sec_type), None)
    if existing is None:
        sections.append({"heading": heading, "text": body, "type": sec_type})
    else:
        existing["text"] += "\n\n" + body
|
|
|
|
|
|
def _classify_heading(line: str) -> str | None:
    """Return the doc_type a heading line introduces, or None.

    A line only counts as a heading when it is 5 to 80 characters long, does
    not end with a period or comma, has at most 10 whitespace-separated
    words and starts with an uppercase letter or a digit. Leading numbering
    (digits, dots, closing parentheses, hyphens) is removed before the
    lower-cased heading is searched for the keywords of ``_HEADING_TYPE_MAP``;
    the first keyword found decides the doc_type.
    """
    if not line or not 5 <= len(line) <= 80:
        return None
    # Running text usually ends in sentence punctuation; headings do not.
    if line.endswith((".", ",")):
        return None
    if len(line.split()) > 10:
        return None
    first_char = line[0]
    if not first_char.isupper() and not first_char.isdigit():
        return None

    # Drop enumeration prefixes such as "3." or "2.1)" before matching.
    normalized = re.sub(r"^[\d\.\)\-]+\s*", "", line.lower().strip()).strip()

    return next(
        (
            doc_type
            for keyword, doc_type in _HEADING_TYPE_MAP
            if keyword in normalized
        ),
        None,
    )
|
|
|
|
|
|
def _find_section_for_type(sections: list[dict], doc_type: str) -> str | None:
|
|
"""Find the best text section for a given doc_type.
|
|
|
|
DSI always gets the full text (main document).
|
|
Other types get their matching section if found.
|
|
"""
|
|
if doc_type in ("dse", "datenschutz", "privacy"):
|
|
return None # Keep full text for DSI
|
|
|
|
for section in sections:
|
|
if section.get("type") == doc_type and section.get("text"):
|
|
return section["text"]
|
|
|
|
return None # No match → keep full text
|
|
|
|
|
|
def auto_fill_from_dsi(doc_entries: list[dict]) -> None:
    """Auto-fill empty document rows from sections found in the DSI text.

    If the user only supplied the privacy policy (DSI) but left rows such as
    Cookie or Social Media empty, and the DSI text contains sections for
    those doc_types, the sections are copied into the empty rows.

    The first entry with a privacy doc_type ("dse", "datenschutz",
    "privacy") and non-empty text is used as the source; nothing happens if
    there is none, if its text is shorter than 300 characters, or if no
    sections can be split out of it. A row counts as empty when it has
    neither ``text`` nor ``url``. Filled rows receive ``text``,
    ``word_count`` and a ``url`` of the form "<DSI url> (Abschnitt)".

    Args:
        doc_entries: Document rows; mutated in place.
    """
    dsi_types = ("dse", "datenschutz", "privacy")

    # Find the DSI entry that will serve as the source text.
    dsi_entry = next(
        (
            entry
            for entry in doc_entries
            if entry["doc_type"] in dsi_types and entry.get("text")
        ),
        None,
    )
    if dsi_entry is None:
        return

    dsi_text = dsi_entry["text"]
    if len(dsi_text) < 300:
        return

    # Split DSI into sections.
    sections = _split_at_headings(dsi_text)
    if not sections:
        return

    # `or ""` instead of a .get() default: the key may exist with the value
    # None (e.g. text supplied without a URL), which would otherwise be
    # rendered as the literal "None" in the generated URL label.
    dsi_url = dsi_entry.get("url") or ""

    # Fill every empty row for which the DSI contains a usable section.
    filled = []
    for entry in doc_entries:
        if entry.get("text") or entry.get("url"):
            continue  # Already has content

        doc_type = entry["doc_type"]
        section_text = _find_section_for_type(sections, doc_type)
        if not section_text:
            continue
        word_count = len(section_text.split())
        if word_count < 30:
            continue  # too short to be a meaningful section

        entry["text"] = section_text
        entry["word_count"] = word_count
        entry["url"] = f"{dsi_url} (Abschnitt)"
        filled.append(doc_type)

    if filled:
        logger.info(
            "Auto-filled %d empty rows from DSI sections: %s",
            len(filled), ", ".join(filled),
        )
|
|
|
|
|
|
# ── Cross-Document Search ────────────────────────────────────────────
|
|
|
|
# Keywords that indicate a doc_type is present in text (case-insensitive).
#
# All keywords are lowercase because they are matched as substrings against
# lower-cased text. Phrases containing umlauts or a paragraph sign are listed
# in both spellings (ASCII transliteration and umlaut form; with and without
# a space after "§") so that either way of writing them scores.
_DOC_TYPE_KEYWORDS: dict[str, list[str]] = {
    "widerruf": [
        "widerrufsrecht", "widerrufsbelehrung", "widerrufsfrist",
        "binnen 14 tagen", "widerruf erklaeren", "widerruf erklären",
        "muster-widerrufsformular",
    ],
    "cookie": [
        "cookie-richtlinie", "cookie-tabelle", "cookiebot", "consent-tool",
        "arten der cookies", "session-cookie", "tracking-cookie",
    ],
    "social_media": [
        "gemeinsam verantwortlich", "art. 26 dsgvo", "fanpage",
        "social media plugin", "facebook-seite", "instagram-profil",
    ],
    "impressum": [
        "angaben gemaess", "angaben gemäß", "§ 5 tmg", "§5 tmg",
        "telemediengesetz", "impressum",
    ],
    "agb": [
        "allgemeine geschaeftsbedingungen", "allgemeine geschäftsbedingungen",
        "geltungsbereich", "vertragsschluss", "§305 bgb", "§ 305 bgb",
    ],
    "dsb": [
        "datenschutzbeauftragte", "dsb@", "dpo@",
        "datenschutzbeauftragten",
    ],
}
|
|
|
|
|
|
def cross_search_documents(doc_entries: list[dict]) -> list[dict]:
    """Search ALL texts for ALL doc_types and fill missing entries.

    A row is searched for when it is empty (50 words or fewer) or when its
    text contains fewer than two of the keywords listed for its doc_type in
    ``_DOC_TYPE_KEYWORDS``. The texts of all rows with a different doc_type
    are then scored by keyword hits; the best-scoring source (at least two
    hits) from which a section of at least 30 words can be extracted wins.

    * If such a source exists, the row's ``text``, ``word_count`` and ``url``
      are overwritten with the extracted section and a MEDIUM
      "placement-<doc_type>" finding is created.
    * If none exists and the row had non-matching text, a HIGH
      "wrong-text-<doc_type>" finding is created and the row is left as is.
    * Rows whose doc_type has no keyword list are skipped.

    Args:
        doc_entries: Document rows; mutated in place.

    Returns:
        List of finding dicts (misplacement / wrong-text warnings).
    """
    findings: list[dict] = []

    # Snapshot of all usable source texts: (doc_type, url, text). Taken
    # before any row is modified, so searches always see the original texts.
    all_texts: list[tuple[str, str, str]] = [
        (entry["doc_type"], entry.get("url", ""), entry["text"])
        for entry in doc_entries
        if entry.get("text") and len(entry["text"]) > 100
    ]
    if not all_texts:
        return findings

    # For each entry, check if:
    #   a) It's empty → search other texts
    #   b) It has text but the text doesn't match the doc_type → search other texts
    for entry in doc_entries:
        target_type = entry["doc_type"]
        keywords = _DOC_TYPE_KEYWORDS.get(target_type, [])
        if not keywords:
            continue

        has_text = bool(entry.get("text")) and len(entry["text"].split()) > 50
        text_matches = False
        if has_text:
            # Check if the current text actually contains this doc_type's content.
            entry_lower = entry["text"].lower()
            match_score = sum(1 for kw in keywords if kw in entry_lower)
            text_matches = match_score >= 2

        if has_text and text_matches:
            continue  # Text present AND matches doc_type → skip

        # Search all other texts for this doc_type's keywords.
        best_match: dict | None = None
        best_score = 0

        for source_type, source_url, source_text in all_texts:
            if source_type == target_type:
                continue

            text_lower = source_text.lower()
            score = sum(1 for kw in keywords if kw in text_lower)
            if score < 2 or score <= best_score:
                continue

            # Extract the relevant section.
            section = _extract_section_by_keywords(source_text, keywords)
            if not section or len(section.split()) < 30:
                # No usable section: leave best_score untouched so that this
                # source does not shadow later, lower-scoring sources that
                # do yield a section.
                continue

            best_score = score
            best_match = {
                "source_type": source_type,
                "source_url": source_url,
                "section_text": section,
                "keyword_hits": score,
            }

        if best_match:
            entry["text"] = best_match["section_text"]
            entry["word_count"] = len(best_match["section_text"].split())
            source_label = best_match["source_type"].upper()
            entry["url"] = f"(gefunden in {source_label})"

            # NOTE(review): the legal basis quoted in the hint below is the
            # same for every doc_type — confirm it is appropriate for types
            # other than "widerruf".
            findings.append({
                "id": f"placement-{target_type}",
                "label": f"{_type_label(target_type)} in falschem Dokument",
                "passed": False,
                "severity": "MEDIUM",
                "level": 1,
                "parent": None,
                "skipped": False,
                "matched_text": "",
                "hint": (
                    f"Die {_type_label(target_type)} wurde nicht als eigenes "
                    f"Dokument gefunden, sondern in der/den {source_label} "
                    f"({best_match['source_url']}). Gemaess Art. 246a EGBGB / "
                    f"§312d BGB muss die {_type_label(target_type)} leicht "
                    f"auffindbar und klar erkennbar sein. Empfehlung: Als "
                    f"eigenen Link im Footer oder als separates Dokument "
                    f"bereitstellen."
                ),
                "source": "cross_document_search",
                "doc_type": target_type,
            })

            logger.info(
                "Cross-doc: Found %s in %s (%d keywords, %d words)",
                target_type, best_match["source_type"],
                best_match["keyword_hits"],
                entry["word_count"],
            )
        elif has_text and not text_matches:
            # Text present but doesn't match — wrong text assigned, and the
            # expected content was not found in any other document either.
            findings.append({
                "id": f"wrong-text-{target_type}",
                "label": f"{_type_label(target_type)} nicht im eingereichten Text",
                "passed": False,
                "severity": "HIGH",
                "level": 1,
                "parent": None,
                "skipped": False,
                "matched_text": "",
                "hint": (
                    f"Der eingereichte Text enthaelt keine "
                    f"{_type_label(target_type)}. Moeglicherweise wurde "
                    f"die falsche URL eingegeben. Das System konnte die "
                    f"{_type_label(target_type)} auch in keinem anderen "
                    f"eingereichten Dokument finden."
                ),
                "source": "cross_document_search",
                "doc_type": target_type,
            })
            logger.info("Cross-doc: %s text doesn't match doc_type, not found elsewhere", target_type)

    return findings
|
|
|
|
|
|
def _extract_section_by_keywords(
|
|
text: str, keywords: list[str],
|
|
) -> str | None:
|
|
"""Extract the section of text around the keyword matches."""
|
|
text_lower = text.lower()
|
|
lines = text.split("\n")
|
|
|
|
# Find first and last line containing any keyword
|
|
first_line = len(lines)
|
|
last_line = 0
|
|
for i, line in enumerate(lines):
|
|
line_lower = line.lower()
|
|
if any(kw in line_lower for kw in keywords):
|
|
first_line = min(first_line, i)
|
|
last_line = max(last_line, i)
|
|
|
|
if first_line >= last_line:
|
|
return None
|
|
|
|
# Expand to include context (5 lines before first, 10 after last)
|
|
start = max(0, first_line - 5)
|
|
end = min(len(lines), last_line + 10)
|
|
|
|
section = "\n".join(lines[start:end])
|
|
return section if len(section.split()) >= 30 else None
|
|
|
|
|
|
def _type_label(doc_type: str) -> str:
    """Return the German display label for ``doc_type``.

    Unknown doc_types are returned unchanged.
    """
    display_names = dict(
        widerruf="Widerrufsbelehrung",
        cookie="Cookie-Richtlinie",
        social_media="Social-Media-Datenschutz",
        impressum="Impressum",
        agb="AGB",
        dsb="DSB-Kontakt",
        dse="Datenschutzerklaerung",
    )
    try:
        return display_names[doc_type]
    except KeyError:
        # No label defined: fall back to the raw doc_type.
        return doc_type
|