feat: Multi-URL Document Check with full checklist visibility

New "Dokumenten-Pruefung" tab in Compliance Agent:
- User adds multiple URLs with document type (DSI, AGB, Impressum, Cookie, Widerruf)
- Each document loaded via Playwright, accordions expanded, text extracted
- Checked against type-specific legal checklist
- Optional: Cookie banner check via checkbox

Checklisten-UX (solves "100% looks like nothing was checked"):
- All checks shown per document: green checkmark + matched text excerpt
- Red X for missing fields with legal reference
- Builds user trust: "9 Punkte geprueft, alle bestanden"
- Expandable per document with completeness bar

New checklists:
- Impressum: §5 TMG (6 fields: name, address, contact, register, VAT, representative)
- Cookie-Richtlinie: §25 TDDDG (5 fields: types, purposes, retention, third-party, opt-out)

Backend:
- POST /agent/doc-check — async with polling (same pattern as /scan)
- DocCheckResult includes checks[] with passed/failed + matched_text
- dsi_document_checker returns all_checks in SCORE finding
- Email report shows per-document checklist

Files: agent_doc_check_routes.py (280 LOC), DocCheckTab.tsx (248 LOC),
ChecklistView.tsx (130 LOC), dsi_document_checker.py (+70 LOC)

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
Benjamin Admin
2026-05-06 10:08:40 +02:00
parent 254dbab566
commit 4c68caac4e
7 changed files with 770 additions and 8 deletions
@@ -0,0 +1,280 @@
"""
Agent Document Check Routes — Multi-URL document verification.
The user provides explicit URLs + document types. No crawling needed.
Each document is loaded, expanded (accordions/tabs), text extracted,
and checked against its type-specific legal checklist.
POST /api/compliance/agent/doc-check
"""
import asyncio
import html
import logging
import os
import re
import uuid as _uuid
from datetime import datetime, timezone

import httpx
from fastapi import APIRouter
from pydantic import BaseModel

from compliance.services.dsi_document_checker import (
    check_document_completeness, classify_document_type,
)
from compliance.services.smtp_sender import send_email
logger = logging.getLogger(__name__)
router = APIRouter(prefix="/compliance/agent", tags=["agent"])
# Internal consent-tester service (drives Playwright page loading and scans).
# NOTE(review): hard-coded docker-compose hostname — consider an env override.
CONSENT_TESTER_URL = "http://bp-compliance-consent-tester:8094"
class DocCheckEntry(BaseModel):
    """One document to verify: an explicit URL plus its legal document type."""
    doc_type: str  # one of: dse, agb, impressum, cookie, widerruf, other
    label: str  # human-readable name, shown in progress messages and the report
    url: str  # absolute URL of the document page to load
class DocCheckRequest(BaseModel):
    """Request body for POST /doc-check."""
    entries: list[DocCheckEntry]  # documents to check, in report order
    recipient: str = "dsb@breakpilot.local"  # email address for the summary report
    check_cookie_banner: bool = False  # if True, also scan the first URL's cookie banner
class CheckItem(BaseModel):
    """Result of a single checklist item for one document."""
    id: str  # checklist item id (e.g. "name", "vat")
    label: str  # human-readable requirement label
    passed: bool  # True when one of the item's patterns matched the text
    severity: str  # severity assigned by the checker when the item is missing
    matched_text: str = ""  # text excerpt around the match; empty when not passed
class DocCheckResult(BaseModel):
    """Outcome of checking a single document URL."""
    label: str  # echoed from the request entry
    url: str  # echoed from the request entry
    doc_type: str  # echoed from the request entry
    word_count: int = 0  # words extracted from the page (0 on error)
    completeness_pct: int = 0  # checklist completeness, 0-100
    # NOTE(review): mutable default — pydantic copies field defaults per
    # instance, so this is not the shared-list pitfall of plain classes.
    checks: list[CheckItem] = []  # all checklist items, passed and failed
    findings_count: int = 0  # number of non-SCORE findings from the checker
    error: str = ""  # non-empty when the document could not be loaded/checked
class DocCheckResponse(BaseModel):
    """Final payload stored in the job store and returned by the poll endpoint."""
    results: list[DocCheckResult]  # one entry per requested document
    cookie_banner_result: dict | None = None  # raw consent-tester scan, if requested
    total_documents: int  # == len(results)
    total_findings: int  # sum of findings_count over all results
    email_status: str = ""  # status string reported by the SMTP sender
    checked_at: str  # ISO-8601 UTC timestamp of completion
# In-memory job store for async processing, keyed by short check_id.
# NOTE(review): unbounded and process-local — entries are never evicted and
# are lost on restart; fine for a single-worker deployment, verify otherwise.
_doc_check_jobs: dict[str, dict] = {}
class DocCheckStartResponse(BaseModel):
    """Returned by POST /doc-check: the id to poll for results."""
    check_id: str  # short uuid prefix identifying the background job
    status: str = "running"
class DocCheckStatusResponse(BaseModel):
    """Returned by GET /doc-check/{check_id} while polling."""
    check_id: str
    status: str  # "running" | "completed" | "failed" | "not_found"
    progress: str = ""  # human-readable progress message (German)
    result: DocCheckResponse | None = None  # set once status == "completed"
    error: str = ""  # set once status == "failed"
# Strong references to in-flight background tasks. The event loop keeps only
# a weak reference to tasks, so a task created with asyncio.create_task and
# then dropped can be garbage-collected before it finishes (see the
# asyncio.create_task documentation).
_background_tasks: set[asyncio.Task] = set()


@router.post("/doc-check")
async def start_doc_check(req: DocCheckRequest):
    """Start an async multi-URL document check and return an id to poll."""
    check_id = str(_uuid.uuid4())[:8]
    _doc_check_jobs[check_id] = {
        "status": "running",
        "progress": "Pruefung gestartet...",
        "result": None,
        "error": "",
    }
    # Fix: keep a strong reference until the task completes, then drop it.
    task = asyncio.create_task(_run_doc_check(check_id, req))
    _background_tasks.add(task)
    task.add_done_callback(_background_tasks.discard)
    return DocCheckStartResponse(check_id=check_id, status="running")
@router.get("/doc-check/{check_id}")
async def get_doc_check_status(check_id: str):
    """Poll document check status by id.

    Unknown ids yield status "not_found" rather than a 404, so the frontend
    can poll with a single code path.
    """
    job = _doc_check_jobs.get(check_id)
    if not job:
        # Consistency fix: use the same response model as the found case
        # (previously a bare dict with fewer keys).
        return DocCheckStatusResponse(check_id=check_id, status="not_found")
    return DocCheckStatusResponse(
        check_id=check_id,
        status=job["status"],
        progress=job.get("progress", ""),
        result=job.get("result"),
        error=job.get("error", ""),
    )
async def _run_doc_check(check_id: str, req: DocCheckRequest):
    """Background task: check every requested document, then email a report.

    Progress, final status and the result payload are written into the
    module-level job store so the polling endpoint can report on them.
    All failures are captured into the job entry; this task never raises.
    """
    try:
        results: list[DocCheckResult] = []
        total_findings = 0
        for i, entry in enumerate(req.entries):
            _doc_check_jobs[check_id]["progress"] = (
                f"Dokument {i+1}/{len(req.entries)}: {entry.label}..."
            )
            result = await _check_single_document(entry)
            results.append(result)
            total_findings += result.findings_count
        # Optional: cookie banner check, run against the first URL only.
        cookie_result = None
        if req.check_cookie_banner and req.entries:
            _doc_check_jobs[check_id]["progress"] = "Cookie-Banner wird geprueft..."
            cookie_result = await _check_cookie_banner(req.entries[0].url)
        # Build and send the email report.
        _doc_check_jobs[check_id]["progress"] = "Report wird erstellt..."
        summary = _build_report(results, cookie_result)
        # Fix 1: escape the summary — labels and error strings come from user
        # input and were previously interpolated into the HTML body unescaped.
        # Fix 2: send_email is a synchronous call; run it in a worker thread
        # so the event loop (and any other running checks) is not stalled.
        email_result = await asyncio.to_thread(
            send_email,
            recipient=req.recipient,
            subject=f"[DOKUMENTEN-PRUEFUNG] {len(results)} Dokumente geprueft",
            body_html=f"<pre>{html.escape(summary)}</pre>",
        )
        response = DocCheckResponse(
            results=results,
            cookie_banner_result=cookie_result,
            total_documents=len(results),
            total_findings=total_findings,
            email_status=email_result.get("status", "failed"),
            checked_at=datetime.now(timezone.utc).isoformat(),
        )
        job = _doc_check_jobs[check_id]
        job["status"] = "completed"
        job["result"] = response
        job["progress"] = "Fertig"
    except Exception as e:
        # logger.exception keeps the traceback that logger.error dropped.
        logger.exception("Doc check %s failed: %s", check_id, e)
        _doc_check_jobs[check_id]["status"] = "failed"
        _doc_check_jobs[check_id]["error"] = str(e)[:500]
async def _check_single_document(entry: DocCheckEntry) -> DocCheckResult:
    """Load one URL via the consent-tester, extract text, run its checklist.

    Returns a DocCheckResult; never raises — any failure is reported through
    the result's `error` field.
    """
    try:
        async with httpx.AsyncClient(timeout=90.0) as client:
            resp = await client.post(
                f"{CONSENT_TESTER_URL}/dsi-discovery",
                json={"url": entry.url, "max_documents": 1},
            )
            if resp.status_code != 200:
                return DocCheckResult(
                    label=entry.label, url=entry.url, doc_type=entry.doc_type,
                    error=f"Seite nicht erreichbar (HTTP {resp.status_code})",
                )
            data = resp.json()
            docs = data.get("documents", [])
            # Use the first document found; fall back to the preview text.
            doc_text = ""
            word_count = 0
            if docs:
                doc_text = docs[0].get("full_text", "") or docs[0].get("text_preview", "")
                word_count = docs[0].get("word_count", 0)
            # Fix: `not doc_text` was redundant — the length check covers the
            # empty-string case.
            if len(doc_text) < 50:
                return DocCheckResult(
                    label=entry.label, url=entry.url, doc_type=entry.doc_type,
                    error="Kein Text extrahierbar",
                )
            # Run the type-specific legal checklist.
            findings = check_document_completeness(
                doc_text, entry.doc_type, entry.label, entry.url,
            )
            # The SCORE finding carries the full per-item checklist plus the
            # completeness percentage embedded in its text (e.g. "83%").
            all_checks: list[CheckItem] = []
            completeness = 0
            for finding in findings:
                if "SCORE" not in finding.get("code", ""):
                    continue
                all_checks = [
                    CheckItem(
                        id=c["id"], label=c["label"], passed=c["passed"],
                        severity=c["severity"], matched_text=c.get("matched_text", ""),
                    )
                    for c in finding.get("all_checks", [])
                ]
                # Fix: `import re` previously ran inside this loop body; it is
                # now a module-level import.
                pct_match = re.search(r"(\d+)%", finding.get("text", ""))
                if pct_match:
                    completeness = int(pct_match.group(1))
            non_score = [f for f in findings if "SCORE" not in f.get("code", "")]
            return DocCheckResult(
                label=entry.label, url=entry.url, doc_type=entry.doc_type,
                word_count=word_count, completeness_pct=completeness,
                checks=all_checks, findings_count=len(non_score),
            )
    except Exception as e:
        logger.warning("Doc check failed for %s: %s", entry.url, e)
        return DocCheckResult(
            label=entry.label, url=entry.url, doc_type=entry.doc_type,
            error=str(e)[:200],
        )
async def _check_cookie_banner(url: str) -> dict | None:
    """Run the cookie-banner consent test for *url* via the consent-tester.

    Returns the raw scan payload on HTTP 200, otherwise None; any transport
    error is logged and swallowed (best-effort check).
    """
    payload = {"url": url, "timeout_per_phase": 8}
    try:
        async with httpx.AsyncClient(timeout=120.0) as client:
            response = await client.post(f"{CONSENT_TESTER_URL}/scan", json=payload)
            if response.status_code == 200:
                return response.json()
    except Exception as e:
        logger.warning("Cookie banner check failed: %s", e)
    return None
def _build_report(results: list[DocCheckResult], cookie_result: dict | None) -> str:
    """Render the plain-text email report for a set of document results.

    One section per document (verdict line, per-check lines, optional error
    line), followed by an optional cookie-banner section.
    """
    lines = [
        "DOKUMENTEN-PRUEFUNG",
        f"Dokumente geprueft: {len(results)}",
        "",
    ]
    for res in results:
        # Verdict: errors trump the completeness-based grading.
        if res.error:
            verdict = "FEHLER"
        elif res.completeness_pct == 100:
            verdict = "OK"
        elif res.completeness_pct >= 50:
            verdict = "LUECKENHAFT"
        else:
            verdict = "MANGELHAFT"
        lines.append(f"[{verdict}] {res.label} ({res.completeness_pct}%, {res.word_count} Woerter)")
        for item in res.checks:
            marker = "+" if item.passed else "!!"
            lines.append(f"  [{marker}] {item.label}")
        if res.error:
            lines.append(f"  FEHLER: {res.error}")
        lines.append("")
    if cookie_result:
        lines.append("Cookie-Banner Pruefung:")
        lines.append(f"  Banner erkannt: {cookie_result.get('banner_detected', False)}")
        lines.append(f"  Anbieter: {cookie_result.get('banner_provider', 'unbekannt')}")
        violations = cookie_result.get("banner_checks", {}).get("violations", [])
        if not violations:
            lines.append("  Keine Verstoesse erkannt.")
        else:
            lines.extend(f"  [!!] {v.get('text', '')[:80]}" for v in violations[:10])
    return "\n".join(lines)
@@ -163,6 +163,36 @@ AGB_CHECKLIST = [
"patterns": [r"gerichtsstand", r"anwendbares\s+recht", r"jurisdiction", r"governing\s+law"]},
]
# §5 TMG / §18 MStV Impressum requirements.
# Patterns are regexes applied to the lowercased document text by
# check_document_completeness; an item passes if any pattern matches.
# NOTE(review): §5 TMG was superseded by §5 DDG in 2024 — verify whether the
# user-facing "§5 TMG" label elsewhere should be updated.
IMPRESSUM_CHECKLIST = [
    # Provider name: legal-form suffixes (GmbH, AG, ...) or generic keywords.
    {"id": "name", "label": "Name des Anbieters",
     "patterns": [r"(?:gmbh|ag|e\.v\.|ohg|kg|gbr|ug|mbh|inc|ltd)", r"firma", r"unternehmen"]},
    # Postal address: street + house number, or a German 5-digit postal code.
    {"id": "address", "label": "Anschrift",
     "patterns": [r"(?:str(?:asse|\.)|weg|platz|allee)\s*\d", r"d-\d{5}", r"\d{5}\s+\w+"]},
    # Contact details: an email address and/or a phone number.
    {"id": "contact", "label": "Kontaktdaten (E-Mail + Telefon)",
     "patterns": [r"(?:e-?mail|mail).*@", r"telefon|phone|tel\.", r"\+?\d[\d\s/\-]{8,}"]},
    # Commercial-register entry and registering court.
    {"id": "register", "label": "Handelsregister / Registernummer",
     "patterns": [r"(?:handelsregister|hrb|hra|registergericht|amtsgericht)", r"register.*(?:nr|nummer)"]},
    # VAT identification number (German format: DE + 9 digits).
    {"id": "vat", "label": "USt-IdNr.",
     "patterns": [r"ust.*id", r"umsatzsteuer.*identifikation", r"vat.*id", r"de\s*\d{9}"]},
    # Authorized representative (managing director, board, owner).
    {"id": "representative", "label": "Vertretungsberechtigte",
     "patterns": [r"vertretungsberechtigt", r"geschäftsführ", r"vorstand", r"inhaber"]},
]
# §25 TDDDG Cookie policy requirements.
# Patterns are regexes applied to the lowercased document text by
# check_document_completeness; an item passes if any pattern matches.
COOKIE_CHECKLIST = [
    # Cookie categories (essential, functional, statistics, marketing, ...).
    {"id": "cookie_types", "label": "Arten der Cookies",
     "patterns": [r"(?:notwendig|essentiell|funktional|statistik|marketing|tracking)", r"cookie.*(?:art|typ|kategori)"]},
    # Stated purposes for which cookies are used.
    {"id": "purposes", "label": "Zwecke der Cookies",
     "patterns": [r"zweck.*cookie", r"cookie.*zweck", r"(?:wofuer|wozu|warum).*cookie"]},
    # Retention / lifetime information (durations or session cookies).
    {"id": "retention", "label": "Speicherdauer der Cookies",
     "patterns": [r"(?:speicherdauer|laufzeit|gueltigk|ablauf).*cookie", r"cookie.*(?:\d+\s+(?:tag|monat|jahr)|session)"]},
    # Third-party cookies and named third-party providers.
    {"id": "third_party", "label": "Drittanbieter-Cookies",
     "patterns": [r"drittanbieter", r"third.?party", r"(?:google|facebook|meta|microsoft).*cookie"]},
    # Opt-out / withdrawal instructions (reject, deactivate, delete).
    {"id": "opt_out", "label": "Widerspruchsmoeglichkeit",
     "patterns": [r"(?:widerspruch|opt.?out|ablehnen|deaktivieren).*cookie", r"cookie.*(?:ablehnen|deaktivieren|loeschen)"]},
]
def check_document_completeness(
text: str,
@@ -215,15 +245,36 @@ def check_document_completeness(
elif doc_type in ("agb", "terms", "nutzungsbedingungen"):
checklist = AGB_CHECKLIST
label = "§305ff BGB"
elif doc_type in ("impressum", "imprint"):
checklist = IMPRESSUM_CHECKLIST
label = "§5 TMG / §18 MStV"
elif doc_type in ("cookie",):
checklist = COOKIE_CHECKLIST
label = "§25 TDDDG"
else:
checklist = ART13_CHECKLIST # Default: check as DSE
label = "Art. 13 DSGVO"
present = 0
total = len(checklist)
all_checks: list[dict] = []
for check in checklist:
found = any(re.search(p, text_lower) for p in check["patterns"])
if not found:
match = None
for p in check["patterns"]:
m = re.search(p, text_lower)
if m:
match = m
break
passed = match is not None
matched_text = ""
if match:
start = max(0, match.start() - 30)
end = min(len(text_lower), match.end() + 30)
matched_text = text_lower[start:end].strip()
present += 1
else:
findings.append({
"code": f"DSI-MISSING-{check['id'].upper()}",
"severity": check.get("severity", "MEDIUM"),
@@ -236,8 +287,14 @@ def check_document_completeness(
"doc_type": doc_type,
"check_id": check["id"],
})
else:
present += 1
all_checks.append({
"id": check["id"],
"label": check["label"],
"passed": passed,
"severity": check.get("severity", "MEDIUM"),
"matched_text": matched_text,
})
# Always add summary finding (even at 100% — needed for completeness tracking)
if total > 0:
@@ -252,6 +309,7 @@ def check_document_completeness(
"doc_title": doc_title,
"doc_url": doc_url,
"doc_type": doc_type,
"all_checks": all_checks,
})
return findings