From 539bc824fd7e4b6317fc3e0c26f29f96f9ac3803 Mon Sep 17 00:00:00 2001
From: Benjamin Admin
Date: Wed, 6 May 2026 10:44:42 +0200
Subject: [PATCH] feat: Auto-detect sub-sections within a page and check each separately
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

When a single URL contains multiple document sections (e.g. an IHK DSI page
with Cookies, Social Media, and Dienste von Drittanbietern), the system now:

1. Extracts the full page text (main document check as before)
2. Splits the text at heading boundaries (short capitalized lines)
3. Classifies each section: Cookie → cookie checklist, Social Media → DSI, etc.
4. Runs the type-specific checklist per section
5. Returns all results: main doc + sub-sections

Section type detection via SECTION_TYPE_MAP patterns (sketched in the note
below the --- separator):
- 'Cookie*' → §25 TDDDG checklist
- 'Dienste von Drittanbietern' → DSI checklist
- 'Social Media' → DSI checklist (Art. 26 joint controllership)
- 'Widerrufsrecht' → §355 BGB checklist
- 'Impressum' → §5 DDG checklist (formerly §5 TMG)

Co-Authored-By: Claude Opus 4.6 (1M context)
---
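
A minimal sketch of the classification step (illustrative only; a subset of
the pattern table is inlined here, the full SECTION_TYPE_MAP is in the diff):

    import re

    # Subset of SECTION_TYPE_MAP, inlined for illustration
    PATTERNS = [
        (r"cookie", "cookie"),
        (r"dienste?\s+von\s+drittanbieter", "dse"),
        (r"social\s+media", "dse"),
        (r"widerrufsrecht|widerruf", "widerruf"),
        (r"impressum", "impressum"),
    ]

    def classify(heading: str) -> str | None:
        """Return the doc type of the first matching pattern, else None."""
        for pattern, doc_type in PATTERNS:
            if re.search(pattern, heading.lower()):
                return doc_type
        return None

    for h in ("Cookies", "Dienste von Drittanbietern", "Social Media", "Kontakt"):
        print(h, "->", classify(h))
    # Cookies -> cookie
    # Dienste von Drittanbietern -> dse
    # Social Media -> dse
    # Kontakt -> None
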
+ """ try: async with httpx.AsyncClient(timeout=90.0) as client: resp = await client.post( @@ -167,15 +172,14 @@ async def _check_single_document(entry: DocCheckEntry) -> DocCheckResult: json={"url": entry.url, "max_documents": 1}, ) if resp.status_code != 200: - return DocCheckResult( + return [DocCheckResult( label=entry.label, url=entry.url, doc_type=entry.doc_type, error=f"Seite nicht erreichbar (HTTP {resp.status_code})", - ) + )] data = resp.json() docs = data.get("documents", []) - # Use the first document found, or fall back to any text doc_text = "" word_count = 0 if docs: @@ -183,50 +187,148 @@ async def _check_single_document(entry: DocCheckEntry) -> DocCheckResult: word_count = docs[0].get("word_count", 0) if not doc_text or len(doc_text) < 50: - return DocCheckResult( + return [DocCheckResult( label=entry.label, url=entry.url, doc_type=entry.doc_type, error="Kein Text extrahierbar", + )] + + # Split text into sections and check each + sections = _split_into_sections(doc_text, entry.label, entry.url) + all_results: list[DocCheckResult] = [] + + # Main document check (full text against primary type) + main_result = _run_checklist(doc_text, entry.doc_type, entry.label, entry.url, word_count) + all_results.append(main_result) + + # Sub-section checks (auto-detected from headings) + for section in sections: + if section["word_count"] < 100: + continue + sub_result = _run_checklist( + section["text"], section["doc_type"], + section["title"], entry.url, + section["word_count"], ) + all_results.append(sub_result) - # Run checklist - findings = check_document_completeness( - doc_text, entry.doc_type, entry.label, entry.url, - ) - - # Extract all_checks from SCORE finding - all_checks: list[CheckItem] = [] - completeness = 0 - for f in findings: - if "SCORE" in f.get("code", ""): - checks_data = f.get("all_checks", []) - all_checks = [ - CheckItem( - id=c["id"], label=c["label"], passed=c["passed"], - severity=c["severity"], matched_text=c.get("matched_text", ""), - ) - for c in checks_data - ] - # Extract percentage - import re - pct_match = re.search(r"(\d+)%", f.get("text", "")) - if pct_match: - completeness = int(pct_match.group(1)) - - non_score = [f for f in findings if "SCORE" not in f.get("code", "")] - - return DocCheckResult( - label=entry.label, url=entry.url, doc_type=entry.doc_type, - word_count=word_count, completeness_pct=completeness, - checks=all_checks, findings_count=len(non_score), - ) + return all_results except Exception as e: logger.warning("Doc check failed for %s: %s", entry.url, e) - return DocCheckResult( + return [DocCheckResult( label=entry.label, url=entry.url, doc_type=entry.doc_type, error=str(e)[:200], + )] + + +def _run_checklist(text: str, doc_type: str, label: str, url: str, word_count: int = 0) -> DocCheckResult: + """Run checklist against text and return structured result.""" + import re as _re + findings = check_document_completeness(text, doc_type, label, url) + + all_checks: list[CheckItem] = [] + completeness = 0 + for f in findings: + if "SCORE" in f.get("code", ""): + for c in f.get("all_checks", []): + all_checks.append(CheckItem( + id=c["id"], label=c["label"], passed=c["passed"], + severity=c["severity"], matched_text=c.get("matched_text", ""), + )) + pct_match = _re.search(r"(\d+)%", f.get("text", "")) + if pct_match: + completeness = int(pct_match.group(1)) + + non_score = [f for f in findings if "SCORE" not in f.get("code", "")] + return DocCheckResult( + label=label, url=url, doc_type=doc_type, + word_count=word_count or 
+
+
+def _split_into_sections(text: str, parent_label: str, url: str) -> list[dict]:
+    """Split document text at major headings into sub-sections.
+
+    Detects sections like 'Cookies', 'Social Media', 'Dienste von Drittanbietern'
+    and classifies each by document type for separate checking.
+    """
+    sections = []
+
+    # Split at lines that look like headings (short, followed by longer content)
+    lines = text.split("\n")
+    current_heading = ""
+    current_text = []
+
+    for line in lines:
+        stripped = line.strip()
+        # Heading heuristic: 6-79 chars, no trailing "." or ",",
+        # first character uppercase
+        is_heading = (
+            5 < len(stripped) < 80
+            and not stripped.endswith(".")
+            and not stripped.endswith(",")
+            and stripped[0].isupper()
+        )
+
+        if is_heading and current_heading and len("\n".join(current_text)) > 200:
+            # Save previous section
+            sec_text = "\n".join(current_text)
+            sec_type = _classify_section(current_heading)
+            if sec_type and sec_type != "skip":
+                sections.append({
+                    "title": f"{parent_label} > {current_heading}",
+                    "text": sec_text,
+                    "doc_type": sec_type,
+                    "word_count": len(sec_text.split()),
+                })
+
+        if is_heading:
+            current_heading = stripped
+            current_text = []
+        else:
+            current_text.append(line)
+
+    # Last section
+    if current_heading and len("\n".join(current_text)) > 200:
+        sec_text = "\n".join(current_text)
+        sec_type = _classify_section(current_heading)
+        if sec_type and sec_type != "skip":
+            sections.append({
+                "title": f"{parent_label} > {current_heading}",
+                "text": sec_text,
+                "doc_type": sec_type,
+                "word_count": len(sec_text.split()),
+            })
+
+    return sections
+
+
+def _classify_section(heading: str) -> str | None:
+    """Classify a section heading into a document type."""
+    import re as _re
+    heading_lower = heading.lower()
+    for pattern, doc_type in SECTION_TYPE_MAP:
+        if _re.search(pattern, heading_lower):
+            return doc_type
+    return None
+

 async def _check_cookie_banner(url: str) -> dict | None:
     """Run cookie banner consent test on a URL."""
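
--

A minimal pytest sketch of the splitter (illustrative, not part of the patch;
assumes the module is importable as compliance.api.agent_doc_check_routes and
leans on the heading heuristic above: short capitalized lines, section bodies
over 200 characters):

    from compliance.api.agent_doc_check_routes import _split_into_sections

    def test_split_detects_cookie_and_social_media_sections():
        text = (
            "Datenschutzinformation\n"
            + "Hier steht die allgemeine Einleitung. " * 20 + "\n"
            + "Cookies\n"
            + "Wir setzen technisch notwendige Cookies ein. " * 20 + "\n"
            + "Social Media\n"
            + "Wir betreiben dort eine Unternehmensseite. " * 20
        )
        sections = _split_into_sections(text, "DSI", "https://example.org/dsi")
        # "Datenschutzinformation" matches no pattern, so only the Cookies
        # and Social Media sections are returned
        assert [s["doc_type"] for s in sections] == ["cookie", "dse"]
        assert sections[0]["title"] == "DSI > Cookies"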