Files
breakpilot-compliance/backend-compliance/compliance/services/section_splitter.py
T
Benjamin Admin c702260ec1
Build + Deploy / build-backend-compliance (push) Successful in 23s
Build + Deploy / build-ai-sdk (push) Successful in 13s
Build + Deploy / build-admin-compliance (push) Successful in 13s
Build + Deploy / build-developer-portal (push) Successful in 14s
Build + Deploy / build-tts (push) Successful in 15s
Build + Deploy / build-document-crawler (push) Successful in 13s
CI / secret-scan (push) Has been skipped
CI / go-lint (push) Has been skipped
CI / python-lint (push) Has been skipped
CI / nodejs-lint (push) Has been skipped
CI / test-go (push) Successful in 39s
Build + Deploy / build-dsms-gateway (push) Successful in 15s
Build + Deploy / build-dsms-node (push) Successful in 14s
CI / branch-name (push) Has been skipped
CI / guardrail-integrity (push) Has been skipped
CI / loc-budget (push) Failing after 15s
CI / nodejs-build (push) Successful in 2m26s
CI / dep-audit (push) Has been skipped
CI / sbom-scan (push) Has been skipped
CI / test-python-backend (push) Successful in 39s
CI / test-python-document-crawler (push) Successful in 25s
CI / test-python-dsms-gateway (push) Successful in 22s
CI / validate-canonical-controls (push) Successful in 15s
Build + Deploy / trigger-orca (push) Successful in 2m28s
fix: 5 regex bugs + text extraction scroll + GT update
Root cause: Spiegel DSI text was truncated (lazy-loading) — the
rights/DSB/complaints sections at the bottom were never extracted.

Fixes:
1. Text extraction: scroll to bottom before innerText (dsi_discovery.py)
2. V.i.S.d.P.: add "verantwortlicher i.s.v." + "§18 Abs. N MStV" pattern
3. USt-IdNr: add "umsatzsteuer-id" + "DE 212 442 423" (with spaces)
4. Profiler: remove generic "anwalt"/"praxis" (false positive on Spiegel
   "Redaktionsanwalt"), keep only "rechtsanwalt", "kanzlei" etc.
5. Section splitter: auto_fill_from_dsi() fills empty Cookie/Social-Media
   rows from sections found in the DSI text

Ground Truth 06-spiegel.md fully rewritten with verified data from
live website — 3 L1 False Negatives identified (DSB, Beschwerderecht,
Betroffenenrechte all present on website but not in extracted text).

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-05-13 01:20:55 +02:00

216 lines
6.7 KiB
Python

"""
Section splitter for shared URLs in unified compliance checks.
When the same URL is used for multiple document types (e.g. /datenschutz
used for DSI + Cookie + DSB), this module splits the text at headings
and assigns the best-matching section to each doc_type.
"""
import logging
import re
logger = logging.getLogger(__name__)
# Heading keyword → doc_type mapping.
# Matching is case-insensitive substring matching on the normalized heading
# (see _classify_heading): the first entry whose keyword occurs in the
# heading wins, so order matters — more specific keywords should come first.
_HEADING_TYPE_MAP = [
    ("cookie", "cookie"),
    ("datenschutzbeauftragte", "dsb"),
    ("widerruf", "widerruf"),
    ("impressum", "impressum"),
    ("agb", "agb"),
    ("nutzungsbedingung", "agb"),
    ("social media", "social_media"),
    ("soziale medien", "social_media"),
    ("soziale netzwerke", "social_media"),
    # Tracking/analytics headings are treated as cookie sections.
    ("google analytics", "cookie"),
    ("tracking", "cookie"),
    ("verwendung von cookies", "cookie"),
    ("nutzung von google", "cookie"),
    ("webanalyse", "cookie"),
]
def split_shared_texts(
    doc_entries: list[dict],
    url_cache: dict[str, str],
) -> None:
    """When the same URL is used for multiple doc_types, split the text into
    sections and assign the best-matching section to each doc_type.

    Mutates ``doc_entries`` in place.

    Args:
        doc_entries: Rows with at least ``doc_type`` and optionally
            ``url`` / ``text``; ``text`` and ``word_count`` may be rewritten.
        url_cache: Accepted for interface compatibility; not consulted here.
    """
    # Group entry indices by normalized URL (whitespace, trailing slash
    # and case are ignored so "/datenschutz" and "/Datenschutz/" collide).
    url_groups: dict[str, list[int]] = {}
    for i, entry in enumerate(doc_entries):
        if not entry.get("url"):
            continue
        key = entry["url"].strip().rstrip("/").lower()
        url_groups.setdefault(key, []).append(i)

    for url_key, indices in url_groups.items():
        if len(indices) < 2:
            continue  # URL not shared — nothing to split
        # Fix: take the longest text in the group instead of blindly using
        # the first entry's text — the first row may have an empty/failed
        # fetch while a sibling row for the same URL carries the content.
        full_text = max(
            (doc_entries[i].get("text", "") or "" for i in indices),
            key=len,
        )
        if not full_text or len(full_text) < 200:
            continue  # too little text to split meaningfully
        sections = _split_at_headings(full_text)
        if not sections:
            continue
        for idx in indices:
            doc_type = doc_entries[idx]["doc_type"]
            best = _find_section_for_type(sections, doc_type)
            if best:
                doc_entries[idx]["text"] = best
                doc_entries[idx]["word_count"] = len(best.split())
        typed = [s for s in sections if s.get("type")]
        if typed:  # only log when the split actually produced typed sections
            logger.info(
                "Split shared URL into %d typed sections for %d doc_types: %s",
                len(typed), len(indices),
                ", ".join(f"{s['type']}({len(s['text'].split())}w)" for s in typed),
            )
def _split_at_headings(text: str) -> list[dict]:
    """Split *text* at classified headings into typed sections.

    Lines before the first recognized heading (plus any trailing lines
    that never got a heading) form an untyped preamble; when that
    preamble holds at least 30 words it is prepended as a "dse" section
    labelled "(Haupttext)".
    """
    sections: list[dict] = []
    preamble: list[str] = []
    sec_type: str | None = None   # doc_type of the section being collected
    sec_heading = ""
    buffer: list[str] = []        # lines accumulated since the last heading

    def _flush() -> None:
        # Commit the buffered lines: into the current typed section, or
        # into the preamble when no heading has been seen yet.
        if not buffer:
            return
        if sec_type:
            _add_section(sections, sec_heading, sec_type, buffer)
        else:
            preamble.extend(buffer)

    for raw in text.split("\n"):
        candidate = raw.strip()
        doc_type = _classify_heading(candidate)
        if doc_type is None:
            buffer.append(raw)
            continue
        _flush()
        sec_type, sec_heading, buffer = doc_type, candidate, []
    _flush()

    if preamble:
        preamble_text = "\n".join(preamble)
        if len(preamble_text.split()) >= 30:
            sections.insert(0, {
                "heading": "(Haupttext)",
                "text": preamble_text,
                "type": "dse",
            })
    return sections
def _add_section(
sections: list[dict], heading: str, sec_type: str, lines: list[str],
) -> None:
"""Add a section, merging with existing same-type sections."""
text = "\n".join(lines)
if len(text.split()) < 20:
return
# Merge if same type already exists
for s in sections:
if s["type"] == sec_type:
s["text"] += "\n\n" + text
return
sections.append({"heading": heading, "text": text, "type": sec_type})
def _classify_heading(line: str) -> str | None:
"""Classify a line as a section heading. Returns doc_type or None."""
if not line or len(line) < 5 or len(line) > 80:
return None
if line.endswith(".") or line.endswith(","):
return None
if len(line.split()) > 10:
return None
if not (line[0].isupper() or line[0].isdigit()):
return None
heading_lower = line.lower().strip()
heading_lower = re.sub(r"^[\d\.\)\-]+\s*", "", heading_lower).strip()
for keyword, doc_type in _HEADING_TYPE_MAP:
if keyword in heading_lower:
return doc_type
return None
def _find_section_for_type(sections: list[dict], doc_type: str) -> str | None:
"""Find the best text section for a given doc_type.
DSI always gets the full text (main document).
Other types get their matching section if found.
"""
if doc_type in ("dse", "datenschutz", "privacy"):
return None # Keep full text for DSI
for section in sections:
if section.get("type") == doc_type and section.get("text"):
return section["text"]
return None # No match → keep full text
def auto_fill_from_dsi(doc_entries: list[dict]) -> None:
    """Auto-fill empty document rows from sections found in the DSI text.

    If the user only entered the DSI URL but left Cookie/Social-Media rows
    empty, and the DSI text contains those sections, fill them in place.
    """
    # Locate the first DSI-like entry that actually carries text.
    dsi = next(
        (
            e for e in doc_entries
            if e["doc_type"] in ("dse", "datenschutz", "privacy") and e.get("text")
        ),
        None,
    )
    if dsi is None:
        return
    dsi_text = dsi["text"]
    if len(dsi_text) < 300:
        return  # too short to contain separable sections

    sections = _split_at_headings(dsi_text)
    if not sections:
        return

    filled: list[str] = []
    for entry in doc_entries:
        # Rows that already hold text or a URL are left alone.
        if entry.get("text") or entry.get("url"):
            continue
        dt = entry["doc_type"]
        match = _find_section_for_type(sections, dt)
        if not match or len(match.split()) < 30:
            continue
        entry["text"] = match
        entry["word_count"] = len(match.split())
        entry["url"] = f"{dsi.get('url', '')} (Abschnitt)"
        filled.append(dt)

    if filled:
        logger.info(
            "Auto-filled %d empty rows from DSI sections: %s",
            len(filled), ", ".join(filled),
        )