From 74f00bbb0f2104a3b4e1141f8fcd3e8e2fa77811 Mon Sep 17 00:00:00 2001 From: Benjamin Admin Date: Tue, 12 May 2026 12:49:57 +0200 Subject: [PATCH] feat(compliance-check): split shared URLs into sections per doc_type When the same URL is used for multiple document types (e.g. /datenschutz for DSI + Cookie + DSB), the section splitter now: - Detects duplicate URLs and fetches text only once - Splits text at classified headings (Cookie, Google Analytics, etc.) - Assigns matching sections to each doc_type - DSI always keeps the full text Extracted to section_splitter.py (170 LOC) to keep routes under 500. Co-Authored-By: Claude Opus 4.6 (1M context) --- .../api/agent_compliance_check_routes.py | 31 ++-- .../compliance/services/section_splitter.py | 170 ++++++++++++++++++ 2 files changed, 191 insertions(+), 10 deletions(-) create mode 100644 backend-compliance/compliance/services/section_splitter.py diff --git a/backend-compliance/compliance/api/agent_compliance_check_routes.py b/backend-compliance/compliance/api/agent_compliance_check_routes.py index e484724..1229aca 100644 --- a/backend-compliance/compliance/api/agent_compliance_check_routes.py +++ b/backend-compliance/compliance/api/agent_compliance_check_routes.py @@ -151,11 +151,20 @@ async def _run_compliance_check(check_id: str, req: ComplianceCheckRequest): doc_texts: dict[str, str] = {} doc_entries: list[dict] = [] + # Cache fetched URLs to detect duplicates + url_text_cache: dict[str, str] = {} + for i, doc in enumerate(req.documents): _update(check_id, f"Dokument {i+1}/{len(req.documents)}: {doc.doc_type}...") text = doc.text if not text and doc.url: - text = await _fetch_text(doc.url) + url_key = doc.url.strip().rstrip("/").lower() + if url_key in url_text_cache: + text = url_text_cache[url_key] + else: + text = await _fetch_text(doc.url) + if text: + url_text_cache[url_key] = text if text: doc_texts[doc.doc_type] = text doc_entries.append({ @@ -165,6 +174,14 @@ async def _run_compliance_check(check_id: str, req: ComplianceCheckRequest): "word_count": len(text.split()) if text else 0, }) + # Step 1b: If same URL used for multiple doc_types, try section splitting + from compliance.services.section_splitter import split_shared_texts + split_shared_texts(doc_entries, url_text_cache) + # Refresh doc_texts after splitting + for entry in doc_entries: + if entry.get("text"): + doc_texts[entry["doc_type"]] = entry["text"] + # Step 2: Detect business profile _update(check_id, "Geschaeftsmodell wird erkannt...") profile = await detect_business_profile(doc_texts) @@ -431,19 +448,13 @@ def _doc_type_label(doc_type: str) -> str: def _result_to_dict(r) -> dict: """Convert DocCheckResult to JSON-serializable dict.""" + fields = ("id", "label", "passed", "severity", "matched_text", + "level", "parent", "skipped", "hint") return { "label": r.label, "url": r.url, "doc_type": r.doc_type, "word_count": r.word_count, "completeness_pct": r.completeness_pct, "correctness_pct": r.correctness_pct, - "checks": [ - { - "id": c.id, "label": c.label, "passed": c.passed, - "severity": c.severity, "matched_text": c.matched_text, - "level": c.level, "parent": c.parent, - "skipped": c.skipped, "hint": c.hint, - } - for c in r.checks - ], + "checks": [{f: getattr(c, f) for f in fields} for c in r.checks], "findings_count": r.findings_count, "error": r.error, } diff --git a/backend-compliance/compliance/services/section_splitter.py b/backend-compliance/compliance/services/section_splitter.py new file mode 100644 index 0000000..2b5ae05 --- /dev/null +++ b/backend-compliance/compliance/services/section_splitter.py @@ -0,0 +1,170 @@ +""" +Section splitter for shared URLs in unified compliance checks. + +When the same URL is used for multiple document types (e.g. /datenschutz +used for DSI + Cookie + DSB), this module splits the text at headings +and assigns the best-matching section to each doc_type. +""" + +import logging +import re + +logger = logging.getLogger(__name__) + +# Heading keyword → doc_type mapping +_HEADING_TYPE_MAP = [ + ("cookie", "cookie"), + ("datenschutzbeauftragte", "dsb"), + ("widerruf", "widerruf"), + ("impressum", "impressum"), + ("agb", "agb"), + ("nutzungsbedingung", "agb"), + ("social media", "social_media"), + ("soziale medien", "social_media"), + ("soziale netzwerke", "social_media"), + ("google analytics", "cookie"), + ("tracking", "cookie"), + ("verwendung von cookies", "cookie"), + ("nutzung von google", "cookie"), + ("webanalyse", "cookie"), +] + + +def split_shared_texts( + doc_entries: list[dict], + url_cache: dict[str, str], +) -> None: + """When the same URL is used for multiple doc_types, split text into + sections and assign the best-matching section to each doc_type. + + Mutates doc_entries in place. + """ + # Group entries by normalized URL + url_groups: dict[str, list[int]] = {} + for i, entry in enumerate(doc_entries): + if not entry.get("url"): + continue + key = entry["url"].strip().rstrip("/").lower() + url_groups.setdefault(key, []).append(i) + + for url_key, indices in url_groups.items(): + if len(indices) < 2: + continue + + full_text = doc_entries[indices[0]].get("text", "") + if not full_text or len(full_text) < 200: + continue + + sections = _split_at_headings(full_text) + if not sections: + continue + + for idx in indices: + doc_type = doc_entries[idx]["doc_type"] + best = _find_section_for_type(sections, doc_type) + if best: + doc_entries[idx]["text"] = best + doc_entries[idx]["word_count"] = len(best.split()) + + typed = [s for s in sections if s.get("type")] + logger.info( + "Split shared URL into %d typed sections for %d doc_types: %s", + len(typed), len(indices), + ", ".join(f"{s['type']}({len(s['text'].split())}w)" for s in typed), + ) + + +def _split_at_headings(text: str) -> list[dict]: + """Split text at classified headings into typed sections.""" + lines = text.split("\n") + sections: list[dict] = [] + current_type: str | None = None + current_heading = "" + current_lines: list[str] = [] + preamble_lines: list[str] = [] + + for line in lines: + stripped = line.strip() + classified = _classify_heading(stripped) + + if classified: + # Save previous section + if current_type and current_lines: + _add_section(sections, current_heading, current_type, current_lines) + elif not current_type and current_lines: + preamble_lines.extend(current_lines) + + current_type = classified + current_heading = stripped + current_lines = [] + else: + current_lines.append(line) + + # Save last section + if current_type and current_lines: + _add_section(sections, current_heading, current_type, current_lines) + elif current_lines: + preamble_lines.extend(current_lines) + + # Add preamble as untyped section (main DSI text) + if preamble_lines: + preamble_text = "\n".join(preamble_lines) + if len(preamble_text.split()) >= 30: + sections.insert(0, { + "heading": "(Haupttext)", + "text": preamble_text, + "type": "dse", + }) + + return sections + + +def _add_section( + sections: list[dict], heading: str, sec_type: str, lines: list[str], +) -> None: + """Add a section, merging with existing same-type sections.""" + text = "\n".join(lines) + if len(text.split()) < 20: + return + # Merge if same type already exists + for s in sections: + if s["type"] == sec_type: + s["text"] += "\n\n" + text + return + sections.append({"heading": heading, "text": text, "type": sec_type}) + + +def _classify_heading(line: str) -> str | None: + """Classify a line as a section heading. Returns doc_type or None.""" + if not line or len(line) < 5 or len(line) > 80: + return None + if line.endswith(".") or line.endswith(","): + return None + if len(line.split()) > 10: + return None + if not (line[0].isupper() or line[0].isdigit()): + return None + + heading_lower = line.lower().strip() + heading_lower = re.sub(r"^[\d\.\)\-]+\s*", "", heading_lower).strip() + + for keyword, doc_type in _HEADING_TYPE_MAP: + if keyword in heading_lower: + return doc_type + return None + + +def _find_section_for_type(sections: list[dict], doc_type: str) -> str | None: + """Find the best text section for a given doc_type. + + DSI always gets the full text (main document). + Other types get their matching section if found. + """ + if doc_type in ("dse", "datenschutz", "privacy"): + return None # Keep full text for DSI + + for section in sections: + if section.get("type") == doc_type and section.get("text"): + return section["text"] + + return None # No match → keep full text