feat: Auto-detect sub-sections within a page and check each separately
When a single URL contains multiple document sections (e.g. an IHK DSI page
with Cookies, Social Media, and Dienste von Drittanbietern), the system now:

1. Extracts the full page text (main document check, as before)
2. Splits the text at heading boundaries (short lines that start with a
   capital letter and carry no sentence-final punctuation)
3. Classifies each section: Cookie → cookie checklist, Social Media → DSI, etc.
4. Runs the type-specific checklist on each section
5. Returns all results: main doc + sub-sections

Section type detection via SECTION_TYPE_MAP patterns:

- 'Cookie*' → §25 TDDDG checklist
- 'Dienste von Drittanbietern' → DSI checklist
- 'Social Media' → DSI checklist (Art. 26 joint controllership)
- 'Widerrufsrecht' → §355 BGB checklist
- 'Impressum' → §5 TMG checklist

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
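For illustration, a minimal standalone sketch of the detection and
classification steps (the pattern list is abridged from SECTION_TYPE_MAP in
the diff below; classify, is_heading, and the sample headings are
illustrative and not part of the committed code):

    import re

    # Abridged from SECTION_TYPE_MAP below; the first matching pattern wins.
    PATTERNS = [
        (r"cookie", "cookie"),
        (r"dienste?\s+von\s+drittanbieter", "dse"),
        (r"social\s+media", "dse"),
        (r"widerrufsrecht|widerruf", "widerruf"),
        (r"impressum", "impressum"),
    ]

    def classify(heading: str) -> str | None:
        low = heading.lower()
        for pattern, doc_type in PATTERNS:
            if re.search(pattern, low):
                return doc_type
        return None

    def is_heading(line: str) -> bool:
        # Same heuristic as _split_into_sections in the diff: a short line
        # that starts with a capital letter and does not end in "." or ","
        s = line.strip()
        return 5 < len(s) < 80 and not s.endswith((".", ",")) and s[0].isupper()

    # Hypothetical headings from a DSI page:
    assert is_heading("Dienste von Drittanbietern")
    assert not is_heading("Wir verwenden Cookies, um Inhalte bereitzustellen.")
    assert classify("Cookies und Tracking") == "cookie"
    assert classify("Dienste von Drittanbietern") == "dse"
    assert classify("Kontakt") is None  # unmatched headings get no sub-check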
@@ -120,9 +120,9 @@ async def _run_doc_check(check_id: str, req: DocCheckRequest):
                 f"Dokument {i+1}/{len(req.entries)}: {entry.label}..."
             )
-            result = await _check_single_document(entry)
-            results.append(result)
-            total_findings += result.findings_count
+            doc_results = await _check_single_document(entry)
+            results.extend(doc_results)
+            total_findings += sum(r.findings_count for r in doc_results)
 
         # Optional: Cookie banner check on first URL
         cookie_result = None
@@ -158,8 +158,13 @@ async def _run_doc_check(check_id: str, req: DocCheckRequest):
         _doc_check_jobs[check_id]["error"] = str(e)[:500]
 
 
-async def _check_single_document(entry: DocCheckEntry) -> DocCheckResult:
-    """Load a single URL, expand content, extract text, run checklist."""
+async def _check_single_document(entry: DocCheckEntry) -> list[DocCheckResult]:
+    """Load a single URL, expand content, extract text, split into sections,
+    and check each section against its type-specific checklist.
+
+    Returns multiple results if the page contains sub-documents
+    (e.g. Cookies section, Social Media section on a DSI page).
+    """
     try:
         async with httpx.AsyncClient(timeout=90.0) as client:
             resp = await client.post(
@@ -167,15 +172,14 @@ async def _check_single_document(entry: DocCheckEntry) -> DocCheckResult:
                 json={"url": entry.url, "max_documents": 1},
             )
             if resp.status_code != 200:
-                return DocCheckResult(
+                return [DocCheckResult(
                     label=entry.label, url=entry.url, doc_type=entry.doc_type,
                     error=f"Seite nicht erreichbar (HTTP {resp.status_code})",
-                )
+                )]
 
             data = resp.json()
             docs = data.get("documents", [])
 
-            # Use the first document found, or fall back to any text
             doc_text = ""
             word_count = 0
             if docs:
@@ -183,50 +187,148 @@ async def _check_single_document(entry: DocCheckEntry) -> DocCheckResult:
                 word_count = docs[0].get("word_count", 0)
 
             if not doc_text or len(doc_text) < 50:
-                return DocCheckResult(
+                return [DocCheckResult(
                     label=entry.label, url=entry.url, doc_type=entry.doc_type,
                     error="Kein Text extrahierbar",
-                )
+                )]
 
-            # Run checklist
-            findings = check_document_completeness(
-                doc_text, entry.doc_type, entry.label, entry.url,
-            )
-
-            # Extract all_checks from SCORE finding
-            all_checks: list[CheckItem] = []
-            completeness = 0
-            for f in findings:
-                if "SCORE" in f.get("code", ""):
-                    checks_data = f.get("all_checks", [])
-                    all_checks = [
-                        CheckItem(
-                            id=c["id"], label=c["label"], passed=c["passed"],
-                            severity=c["severity"], matched_text=c.get("matched_text", ""),
-                        )
-                        for c in checks_data
-                    ]
-                    # Extract percentage
-                    import re
-                    pct_match = re.search(r"(\d+)%", f.get("text", ""))
-                    if pct_match:
-                        completeness = int(pct_match.group(1))
-
-            non_score = [f for f in findings if "SCORE" not in f.get("code", "")]
-
-            return DocCheckResult(
-                label=entry.label, url=entry.url, doc_type=entry.doc_type,
-                word_count=word_count, completeness_pct=completeness,
-                checks=all_checks, findings_count=len(non_score),
-            )
+            # Split text into sections and check each
+            sections = _split_into_sections(doc_text, entry.label, entry.url)
+            all_results: list[DocCheckResult] = []
+
+            # Main document check (full text against primary type)
+            main_result = _run_checklist(doc_text, entry.doc_type, entry.label, entry.url, word_count)
+            all_results.append(main_result)
+
+            # Sub-section checks (auto-detected from headings)
+            for section in sections:
+                if section["word_count"] < 100:
+                    continue
+                sub_result = _run_checklist(
+                    section["text"], section["doc_type"],
+                    section["title"], entry.url,
+                    section["word_count"],
+                )
+                all_results.append(sub_result)
+
+            return all_results
 
     except Exception as e:
         logger.warning("Doc check failed for %s: %s", entry.url, e)
-        return DocCheckResult(
+        return [DocCheckResult(
             label=entry.label, url=entry.url, doc_type=entry.doc_type,
             error=str(e)[:200],
-        )
+        )]
+
+
+def _run_checklist(text: str, doc_type: str, label: str, url: str, word_count: int = 0) -> DocCheckResult:
+    """Run checklist against text and return structured result."""
+    import re as _re
+    findings = check_document_completeness(text, doc_type, label, url)
+
+    all_checks: list[CheckItem] = []
+    completeness = 0
+    for f in findings:
+        if "SCORE" in f.get("code", ""):
+            for c in f.get("all_checks", []):
+                all_checks.append(CheckItem(
+                    id=c["id"], label=c["label"], passed=c["passed"],
+                    severity=c["severity"], matched_text=c.get("matched_text", ""),
+                ))
+            pct_match = _re.search(r"(\d+)%", f.get("text", ""))
+            if pct_match:
+                completeness = int(pct_match.group(1))
+
+    non_score = [f for f in findings if "SCORE" not in f.get("code", "")]
+    return DocCheckResult(
+        label=label, url=url, doc_type=doc_type,
+        word_count=word_count or len(text.split()),
+        completeness_pct=completeness,
+        checks=all_checks, findings_count=len(non_score),
+    )
+
+
+# Section heading patterns → document type mapping
+SECTION_TYPE_MAP = [
+    (r"cookie", "cookie"),
+    (r"dienste?\s+von\s+drittanbieter", "dse"),
+    (r"social\s+media", "dse"),
+    (r"datensicherheit", "dse"),
+    (r"betroffenenrecht", "dse"),
+    (r"widerrufsrecht|widerruf", "widerruf"),
+    (r"impressum", "impressum"),
+    (r"nutzungsbedingung|agb|geschaeftsbedingung", "agb"),
+    (r"datenschutz(?:folge|risiko).*(?:analyse|abschaetzung)|dsfa", "dse"),
+    (r"datenschutzerkl(?:ae|ä)rung.*social", "dse"),
+]
+
+
+def _split_into_sections(text: str, parent_label: str, url: str) -> list[dict]:
+    """Split document text at major headings into sub-sections.
+
+    Detects sections like 'Cookies', 'Social Media', 'Dienste von Drittanbietern'
+    and classifies each by document type for separate checking.
+    """
+    import re as _re
+    sections = []
+
+    # Split by lines that look like headings (short, followed by longer content)
+    lines = text.split("\n")
+    current_heading = ""
+    current_text = []
+
+    for line in lines:
+        stripped = line.strip()
+        # Detect heading: short line (< 80 chars), not empty, followed by content
+        is_heading = (
+            5 < len(stripped) < 80
+            and not stripped.endswith(".")
+            and not stripped.endswith(",")
+            and stripped[0].isupper()
+        )
+
+        if is_heading and current_heading and len("\n".join(current_text)) > 200:
+            # Save previous section
+            sec_text = "\n".join(current_text)
+            sec_type = _classify_section(current_heading)
+            if sec_type and sec_type != "skip":
+                sections.append({
+                    "title": f"{parent_label} > {current_heading}",
+                    "text": sec_text,
+                    "doc_type": sec_type,
+                    "word_count": len(sec_text.split()),
+                })
+
+        if is_heading:
+            current_heading = stripped
+            current_text = []
+        else:
+            current_text.append(line)
+
+    # Last section
+    if current_heading and len("\n".join(current_text)) > 200:
+        sec_text = "\n".join(current_text)
+        sec_type = _classify_section(current_heading)
+        if sec_type and sec_type != "skip":
+            sections.append({
+                "title": f"{parent_label} > {current_heading}",
+                "text": sec_text,
+                "doc_type": sec_type,
+                "word_count": len(sec_text.split()),
+            })
+
+    return sections
+
+
+def _classify_section(heading: str) -> str | None:
+    """Classify a section heading into a document type."""
+    import re as _re
+    heading_lower = heading.lower()
+    for pattern, doc_type in SECTION_TYPE_MAP:
+        if _re.search(pattern, heading_lower):
+            return doc_type
+    return None
+
+
 async def _check_cookie_banner(url: str) -> dict | None:
     """Run cookie banner consent test on a URL."""