Files
breakpilot-compliance/backend-compliance/compliance/api/agent_doc_check_routes.py
T
Benjamin Admin 13c5880f51 fix: Restrict sub-section detection to genuinely separate document types
Only Cookie and Widerruf sections are checked as separate documents.
Social Media, DSFA, Betroffenenrechte, Dienste von Drittanbietern are
part of the parent DSI and no longer generate false findings.

Added PLAN-rag-document-check.md for Phase 2:
- RAG-based checks with document-type-specific Controls
- DSFA checklist (Art. 35 + Landes-Listen)
- AVV checklist (Art. 28)
- Reference detection (sub-doc → parent doc)

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-05-06 11:02:36 +02:00

383 lines
13 KiB
Python

"""
Agent Document Check Routes — Multi-URL document verification.
The user provides explicit URLs + document types. No crawling needed.
Each document is loaded, expanded (accordions/tabs), text extracted,
and checked against its type-specific legal checklist.
POST /api/compliance/agent/doc-check
"""
import asyncio
import logging
import os
import uuid as _uuid
from datetime import datetime, timezone
import httpx
from fastapi import APIRouter
from pydantic import BaseModel
from compliance.services.dsi_document_checker import (
check_document_completeness, classify_document_type,
)
from compliance.services.smtp_sender import send_email
logger = logging.getLogger(__name__)
router = APIRouter(prefix="/compliance/agent", tags=["agent"])
CONSENT_TESTER_URL = "http://bp-compliance-consent-tester:8094"
class DocCheckEntry(BaseModel):
doc_type: str # dse, agb, impressum, cookie, widerruf, other
label: str
url: str
class DocCheckRequest(BaseModel):
entries: list[DocCheckEntry]
recipient: str = "dsb@breakpilot.local"
check_cookie_banner: bool = False
class CheckItem(BaseModel):
id: str
label: str
passed: bool
severity: str
matched_text: str = ""
class DocCheckResult(BaseModel):
label: str
url: str
doc_type: str
word_count: int = 0
completeness_pct: int = 0
checks: list[CheckItem] = []
findings_count: int = 0
error: str = ""
class DocCheckResponse(BaseModel):
results: list[DocCheckResult]
cookie_banner_result: dict | None = None
total_documents: int
total_findings: int
email_status: str = ""
checked_at: str
# In-memory job store for async processing
_doc_check_jobs: dict[str, dict] = {}
class DocCheckStartResponse(BaseModel):
check_id: str
status: str = "running"
class DocCheckStatusResponse(BaseModel):
check_id: str
status: str
progress: str = ""
result: DocCheckResponse | None = None
error: str = ""
@router.post("/doc-check")
async def start_doc_check(req: DocCheckRequest):
"""Start async multi-URL document check."""
check_id = str(_uuid.uuid4())[:8]
_doc_check_jobs[check_id] = {"status": "running", "progress": "Pruefung gestartet...", "result": None, "error": ""}
asyncio.create_task(_run_doc_check(check_id, req))
return DocCheckStartResponse(check_id=check_id, status="running")
@router.get("/doc-check/{check_id}")
async def get_doc_check_status(check_id: str):
"""Poll document check status."""
job = _doc_check_jobs.get(check_id)
if not job:
return {"check_id": check_id, "status": "not_found"}
return DocCheckStatusResponse(
check_id=check_id, status=job["status"],
progress=job.get("progress", ""), result=job.get("result"),
error=job.get("error", ""),
)
async def _run_doc_check(check_id: str, req: DocCheckRequest):
"""Background task: check each document."""
try:
results: list[DocCheckResult] = []
total_findings = 0
for i, entry in enumerate(req.entries):
_doc_check_jobs[check_id]["progress"] = (
f"Dokument {i+1}/{len(req.entries)}: {entry.label}..."
)
doc_results = await _check_single_document(entry)
results.extend(doc_results)
total_findings += sum(r.findings_count for r in doc_results)
# Optional: Cookie banner check on first URL
cookie_result = None
if req.check_cookie_banner and req.entries:
_doc_check_jobs[check_id]["progress"] = "Cookie-Banner wird geprueft..."
cookie_result = await _check_cookie_banner(req.entries[0].url)
# Build email report
_doc_check_jobs[check_id]["progress"] = "Report wird erstellt..."
summary = _build_report(results, cookie_result)
email_result = send_email(
recipient=req.recipient,
subject=f"[DOKUMENTEN-PRUEFUNG] {len(results)} Dokumente geprueft",
body_html=f"<pre>{summary}</pre>",
)
response = DocCheckResponse(
results=results,
cookie_banner_result=cookie_result,
total_documents=len(results),
total_findings=total_findings,
email_status=email_result.get("status", "failed"),
checked_at=datetime.now(timezone.utc).isoformat(),
)
_doc_check_jobs[check_id]["status"] = "completed"
_doc_check_jobs[check_id]["result"] = response
_doc_check_jobs[check_id]["progress"] = "Fertig"
except Exception as e:
logger.error("Doc check %s failed: %s", check_id, e)
_doc_check_jobs[check_id]["status"] = "failed"
_doc_check_jobs[check_id]["error"] = str(e)[:500]
async def _check_single_document(entry: DocCheckEntry) -> list[DocCheckResult]:
"""Load a single URL, expand content, extract text, split into sections,
and check each section against its type-specific checklist.
Returns multiple results if the page contains sub-documents
(e.g. Cookies section, Social Media section on a DSI page).
"""
try:
async with httpx.AsyncClient(timeout=90.0) as client:
resp = await client.post(
f"{CONSENT_TESTER_URL}/dsi-discovery",
json={"url": entry.url, "max_documents": 1},
)
if resp.status_code != 200:
return [DocCheckResult(
label=entry.label, url=entry.url, doc_type=entry.doc_type,
error=f"Seite nicht erreichbar (HTTP {resp.status_code})",
)]
data = resp.json()
docs = data.get("documents", [])
doc_text = ""
word_count = 0
if docs:
doc_text = docs[0].get("full_text", "") or docs[0].get("text_preview", "")
word_count = docs[0].get("word_count", 0)
if not doc_text or len(doc_text) < 50:
return [DocCheckResult(
label=entry.label, url=entry.url, doc_type=entry.doc_type,
error="Kein Text extrahierbar",
)]
# Split text into sections and check each
sections = _split_into_sections(doc_text, entry.label, entry.url)
all_results: list[DocCheckResult] = []
# Main document check (full text against primary type)
main_result = _run_checklist(doc_text, entry.doc_type, entry.label, entry.url, word_count)
all_results.append(main_result)
# Sub-section checks (auto-detected from headings)
for section in sections:
if section["word_count"] < 100:
continue
sub_result = _run_checklist(
section["text"], section["doc_type"],
section["title"], entry.url,
section["word_count"],
)
all_results.append(sub_result)
return all_results
except Exception as e:
logger.warning("Doc check failed for %s: %s", entry.url, e)
return [DocCheckResult(
label=entry.label, url=entry.url, doc_type=entry.doc_type,
error=str(e)[:200],
)]
def _run_checklist(text: str, doc_type: str, label: str, url: str, word_count: int = 0) -> DocCheckResult:
"""Run checklist against text and return structured result."""
import re as _re
findings = check_document_completeness(text, doc_type, label, url)
all_checks: list[CheckItem] = []
completeness = 0
for f in findings:
if "SCORE" in f.get("code", ""):
for c in f.get("all_checks", []):
all_checks.append(CheckItem(
id=c["id"], label=c["label"], passed=c["passed"],
severity=c["severity"], matched_text=c.get("matched_text", ""),
))
pct_match = _re.search(r"(\d+)%", f.get("text", ""))
if pct_match:
completeness = int(pct_match.group(1))
non_score = [f for f in findings if "SCORE" not in f.get("code", "")]
return DocCheckResult(
label=label, url=url, doc_type=doc_type,
word_count=word_count or len(text.split()),
completeness_pct=completeness,
checks=all_checks, findings_count=len(non_score),
)
# Section heading patterns → document type mapping
# ONLY sections that are genuinely separate document types with their own checklists.
# Everything else (Social Media, Betroffenenrechte, Dienste von Drittanbietern)
# is part of the parent DSI and inherits its checks.
SECTION_TYPE_MAP = [
(r"^cookie", "cookie"), # Cookie-Richtlinie → §25 TDDDG
(r"widerrufsrecht|widerrufsbelehrung", "widerruf"), # Widerruf → §355 BGB
(r"^impressum$", "impressum"), # Impressum → §5 TMG
(r"^(?:agb|allgemeine geschäftsbedingungen|nutzungsbedingungen)$", "agb"),
# NOTE: Social Media, DSFA, Datensicherheit, Betroffenenrechte are NOT
# separate documents — they are sections within the parent DSI.
# DSFA needs its own checklist (RAG-based) — Phase 2.
]
def _split_into_sections(text: str, parent_label: str, url: str) -> list[dict]:
"""Split document text at major headings into sub-sections.
Detects sections like 'Cookies', 'Social Media', 'Dienste von Drittanbietern'
and classifies each by document type for separate checking.
"""
import re as _re
sections = []
# Split by lines that look like headings (short, followed by longer content)
lines = text.split("\n")
current_heading = ""
current_text = []
for line in lines:
stripped = line.strip()
# Detect heading: short line (< 80 chars), not empty, followed by content
is_heading = (
5 < len(stripped) < 80
and not stripped.endswith(".")
and not stripped.endswith(",")
and stripped[0].isupper()
)
if is_heading and current_heading and len("\n".join(current_text)) > 200:
# Save previous section
sec_text = "\n".join(current_text)
sec_type = _classify_section(current_heading)
if sec_type and sec_type != "skip":
sections.append({
"title": f"{parent_label} > {current_heading}",
"text": sec_text,
"doc_type": sec_type,
"word_count": len(sec_text.split()),
})
if is_heading:
current_heading = stripped
current_text = []
else:
current_text.append(line)
# Last section
if current_heading and len("\n".join(current_text)) > 200:
sec_text = "\n".join(current_text)
sec_type = _classify_section(current_heading)
if sec_type and sec_type != "skip":
sections.append({
"title": f"{parent_label} > {current_heading}",
"text": sec_text,
"doc_type": sec_type,
"word_count": len(sec_text.split()),
})
return sections
def _classify_section(heading: str) -> str | None:
"""Classify a section heading into a document type."""
import re as _re
heading_lower = heading.lower()
for pattern, doc_type in SECTION_TYPE_MAP:
if _re.search(pattern, heading_lower):
return doc_type
return None
async def _check_cookie_banner(url: str) -> dict | None:
"""Run cookie banner consent test on a URL."""
try:
async with httpx.AsyncClient(timeout=120.0) as client:
resp = await client.post(
f"{CONSENT_TESTER_URL}/scan",
json={"url": url, "timeout_per_phase": 8},
)
if resp.status_code == 200:
return resp.json()
except Exception as e:
logger.warning("Cookie banner check failed: %s", e)
return None
def _build_report(results: list[DocCheckResult], cookie_result: dict | None) -> str:
"""Build email report."""
parts = [
"DOKUMENTEN-PRUEFUNG",
f"Dokumente geprueft: {len(results)}",
"",
]
for r in results:
status = "OK" if r.completeness_pct == 100 else "LUECKENHAFT" if r.completeness_pct >= 50 else "MANGELHAFT"
if r.error:
status = "FEHLER"
parts.append(f"[{status}] {r.label} ({r.completeness_pct}%, {r.word_count} Woerter)")
for check in r.checks:
icon = "+" if check.passed else "!!"
parts.append(f" [{icon}] {check.label}")
if r.error:
parts.append(f" FEHLER: {r.error}")
parts.append("")
if cookie_result:
parts.extend([
"Cookie-Banner Pruefung:",
f" Banner erkannt: {cookie_result.get('banner_detected', False)}",
f" Anbieter: {cookie_result.get('banner_provider', 'unbekannt')}",
])
violations = cookie_result.get("banner_checks", {}).get("violations", [])
if violations:
for v in violations[:10]:
parts.append(f" [!!] {v.get('text', '')[:80]}")
else:
parts.append(" Keine Verstoesse erkannt.")
return "\n".join(parts)