""" Agent Document Check Routes — Multi-URL document verification. The user provides explicit URLs + document types. No crawling needed. Each document is loaded, expanded (accordions/tabs), text extracted, and checked against its type-specific legal checklist. POST /api/compliance/agent/doc-check """ import asyncio import logging import os import uuid as _uuid from datetime import datetime, timezone import httpx from fastapi import APIRouter from pydantic import BaseModel from compliance.services.dsi_document_checker import ( check_document_completeness, classify_document_type, ) from compliance.services.smtp_sender import send_email logger = logging.getLogger(__name__) router = APIRouter(prefix="/compliance/agent", tags=["agent"]) CONSENT_TESTER_URL = "http://bp-compliance-consent-tester:8094" class DocCheckEntry(BaseModel): doc_type: str # dse, agb, impressum, cookie, widerruf, other label: str url: str class DocCheckRequest(BaseModel): entries: list[DocCheckEntry] recipient: str = "dsb@breakpilot.local" check_cookie_banner: bool = False class CheckItem(BaseModel): id: str label: str passed: bool severity: str matched_text: str = "" level: int = 1 parent: str | None = None skipped: bool = False hint: str = "" class DocCheckResult(BaseModel): label: str url: str doc_type: str word_count: int = 0 completeness_pct: int = 0 correctness_pct: int = 0 checks: list[CheckItem] = [] findings_count: int = 0 error: str = "" class DocCheckResponse(BaseModel): results: list[DocCheckResult] cookie_banner_result: dict | None = None total_documents: int total_findings: int email_status: str = "" checked_at: str # In-memory job store for async processing _doc_check_jobs: dict[str, dict] = {} class DocCheckStartResponse(BaseModel): check_id: str status: str = "running" class DocCheckStatusResponse(BaseModel): check_id: str status: str progress: str = "" result: DocCheckResponse | None = None error: str = "" @router.post("/doc-check") async def start_doc_check(req: DocCheckRequest): """Start async multi-URL document check.""" check_id = str(_uuid.uuid4())[:8] _doc_check_jobs[check_id] = {"status": "running", "progress": "Pruefung gestartet...", "result": None, "error": ""} asyncio.create_task(_run_doc_check(check_id, req)) return DocCheckStartResponse(check_id=check_id, status="running") @router.get("/doc-check/{check_id}") async def get_doc_check_status(check_id: str): """Poll document check status.""" job = _doc_check_jobs.get(check_id) if not job: return {"check_id": check_id, "status": "not_found"} return DocCheckStatusResponse( check_id=check_id, status=job["status"], progress=job.get("progress", ""), result=job.get("result"), error=job.get("error", ""), ) async def _run_doc_check(check_id: str, req: DocCheckRequest): """Background task: check each document.""" try: results: list[DocCheckResult] = [] total_findings = 0 for i, entry in enumerate(req.entries): _doc_check_jobs[check_id]["progress"] = ( f"Dokument {i+1}/{len(req.entries)}: {entry.label}..." ) doc_results = await _check_single_document(entry) results.extend(doc_results) total_findings += sum(r.findings_count for r in doc_results) # Optional: Cookie banner check on first URL cookie_result = None if req.check_cookie_banner and req.entries: _doc_check_jobs[check_id]["progress"] = "Cookie-Banner wird geprueft..." cookie_result = await _check_cookie_banner(req.entries[0].url) # Build email report _doc_check_jobs[check_id]["progress"] = "Report wird erstellt..." 


async def _run_doc_check(check_id: str, req: DocCheckRequest):
    """Background task: check each document."""
    try:
        results: list[DocCheckResult] = []
        total_findings = 0

        for i, entry in enumerate(req.entries):
            _doc_check_jobs[check_id]["progress"] = (
                f"Dokument {i+1}/{len(req.entries)}: {entry.label}..."
            )
            doc_results = await _check_single_document(entry)
            results.extend(doc_results)
            total_findings += sum(r.findings_count for r in doc_results)

        # Optional: cookie banner check on the first URL
        cookie_result = None
        if req.check_cookie_banner and req.entries:
            _doc_check_jobs[check_id]["progress"] = "Cookie-Banner wird geprueft..."
            cookie_result = await _check_cookie_banner(req.entries[0].url)

        # Build and send the email report
        _doc_check_jobs[check_id]["progress"] = "Report wird erstellt..."
        summary = _build_report(results, cookie_result)
        email_result = send_email(
            recipient=req.recipient,
            subject=f"[DOKUMENTEN-PRUEFUNG] {len(results)} Dokumente geprueft",
            body_html=summary,
        )

        response = DocCheckResponse(
            results=results,
            cookie_banner_result=cookie_result,
            total_documents=len(results),
            total_findings=total_findings,
            email_status=email_result.get("status", "failed"),
            checked_at=datetime.now(timezone.utc).isoformat(),
        )
        _doc_check_jobs[check_id]["status"] = "completed"
        _doc_check_jobs[check_id]["result"] = response
        _doc_check_jobs[check_id]["progress"] = "Fertig"
    except Exception as e:
        logger.error("Doc check %s failed: %s", check_id, e)
        _doc_check_jobs[check_id]["status"] = "failed"
        _doc_check_jobs[check_id]["error"] = str(e)[:500]


async def _check_single_document(entry: DocCheckEntry) -> list[DocCheckResult]:
    """Load a single URL, expand content, extract text, split into sections,
    and check each section against its type-specific checklist.

    Returns multiple results if the page contains sub-documents
    (e.g. a Cookies section or a Social Media section on a DSI page).
    """
    try:
        async with httpx.AsyncClient(timeout=90.0) as client:
            resp = await client.post(
                f"{CONSENT_TESTER_URL}/dsi-discovery",
                json={"url": entry.url, "max_documents": 1},
            )
        if resp.status_code != 200:
            return [DocCheckResult(
                label=entry.label,
                url=entry.url,
                doc_type=entry.doc_type,
                error=f"Seite nicht erreichbar (HTTP {resp.status_code})",
            )]

        data = resp.json()
        docs = data.get("documents", [])
        doc_text = ""
        word_count = 0
        if docs:
            doc_text = docs[0].get("full_text", "") or docs[0].get("text_preview", "")
            word_count = docs[0].get("word_count", 0)

        if not doc_text or len(doc_text) < 50:
            return [DocCheckResult(
                label=entry.label,
                url=entry.url,
                doc_type=entry.doc_type,
                error="Kein Text extrahierbar",
            )]

        # Split the text into sections and check each
        sections = _split_into_sections(doc_text, entry.label, entry.url)
        all_results: list[DocCheckResult] = []

        # Main document check (full text against the primary type)
        main_result = _run_checklist(
            doc_text, entry.doc_type, entry.label, entry.url, word_count
        )
        # Control Library deep check — DISABLED until doc-check-specific
        # Master Controls with binary pass/fail criteria are available.
        # See: zeroclaw/INSTRUCTION-master-controls-for-doc-check.md
        # Code: compliance/services/rag_document_checker.py (ready to re-enable)
        all_results.append(main_result)

        # Sub-section checks (auto-detected from headings)
        for section in sections:
            if section["word_count"] < 100:
                continue
            sub_result = _run_checklist(
                section["text"],
                section["doc_type"],
                section["title"],
                entry.url,
                section["word_count"],
            )
            all_results.append(sub_result)

        return all_results
    except Exception as e:
        logger.warning("Doc check failed for %s: %s", entry.url, e)
        return [DocCheckResult(
            label=entry.label,
            url=entry.url,
            doc_type=entry.doc_type,
            error=str(e)[:200],
        )]
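

# Documentation sketch: the consent-tester /dsi-discovery response shape that
# _check_single_document relies on, written out as TypedDicts. Derived from
# the fields read above; the actual service may return additional fields.
from typing import TypedDict


class _DiscoveredDocument(TypedDict, total=False):
    full_text: str     # preferred text source
    text_preview: str  # fallback when full_text is empty
    word_count: int


class _DiscoveryResponse(TypedDict, total=False):
    documents: list[_DiscoveredDocument]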


def _run_checklist(text: str, doc_type: str, label: str, url: str,
                   word_count: int = 0) -> DocCheckResult:
    """Run the type-specific checklist against the text and return a structured result."""
    findings = check_document_completeness(text, doc_type, label, url)

    all_checks: list[CheckItem] = []
    completeness = 0
    correctness = 0
    for f in findings:
        if "SCORE" in f.get("code", ""):
            for c in f.get("all_checks", []):
                all_checks.append(CheckItem(
                    id=c["id"],
                    label=c["label"],
                    passed=c["passed"],
                    severity=c["severity"],
                    matched_text=c.get("matched_text", ""),
                    level=c.get("level", 1),
                    parent=c.get("parent"),
                    skipped=c.get("skipped", False),
                    hint=c.get("hint", ""),
                ))
            completeness = f.get("completeness_pct", 0)
            correctness = f.get("correctness_pct", 0)

    non_score = [f for f in findings if "SCORE" not in f.get("code", "")]

    return DocCheckResult(
        label=label,
        url=url,
        doc_type=doc_type,
        word_count=word_count or len(text.split()),
        completeness_pct=completeness,
        correctness_pct=correctness,
        checks=all_checks,
        findings_count=len(non_score),
    )


# Section heading patterns → document type mapping.
# ONLY sections that are genuinely separate document types with their own
# checklists belong here. Everything else (Social Media, Betroffenenrechte,
# Dienste von Drittanbietern) is part of the parent DSI and inherits its checks.
SECTION_TYPE_MAP = [
    (r"^cookie", "cookie"),
    (r"widerrufsrecht|widerrufsbelehrung", "widerruf"),
    (r"^impressum$", "impressum"),
    (r"^(?:agb|allgemeine geschäftsbedingungen|nutzungsbedingungen)$", "agb"),
    # DSFA MUST be checked BEFORE social_media (both can contain "Social Media")
    (r"datenschutzfolge|dsfa|risikoanalyse", "dsfa"),
    (r"^social\s*media$", "social_media"),  # Standalone heading "Social Media" = DSE
    (r"datenschutzerkl(?:ae|ä)rung.*social|datenschutz\s+f(?:ue|ü)r\s+social", "social_media"),
]
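
# A few illustrative headings and the type the patterns above resolve them to
# (first matching entry in list order wins):
#   "Cookies"                                -> cookie
#   "Widerrufsbelehrung"                     -> widerruf
#   "Datenschutzfolgenabschätzung"           -> dsfa
#   "Social Media"                           -> social_media
#   "Datenschutzerklärung für Social Media"  -> social_media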
two "Social Media" headings) if sec_type in seen_types: idx = seen_types[sec_type] sections[idx]["text"] += "\n\n" + sec_text sections[idx]["word_count"] = len(sections[idx]["text"].split()) else: seen_types[sec_type] = len(sections) sections.append({ "title": f"{parent_label} > {heading}", "text": sec_text, "doc_type": sec_type, "word_count": len(sec_text.split()), }) for line in lines: stripped = line.strip() is_heading = ( 5 < len(stripped) < 80 and not stripped.endswith(".") and not stripped.endswith(",") and stripped[0].isupper() ) is_skip = is_heading and stripped.lower().strip() in SKIP_HEADINGS if is_heading and not is_skip and current_heading: _save_section(current_heading, current_text) if is_heading and not is_skip: current_heading = stripped current_text = [] else: current_text.append(line) # Last section if current_heading: _save_section(current_heading, current_text) return sections # Headings to skip — sub-sections of other documents, not standalone SKIP_HEADINGS = { "nutzungskonzept social media", # Internal concept, no legal checklist "risikoabwägung und datenschutzfolgenabschätzung", # Sub-section of DSFA "risikoabwaegung und datenschutzfolgenabschaetzung", } # Track already-seen section types to avoid duplicate sub-documents # (e.g. two "Social Media" headings on the same page) _DEDUP_TYPES = {"social_media", "cookie", "dsfa", "widerruf", "impressum"} def _classify_section(heading: str) -> str | None: """Classify a section heading into a document type.""" import re as _re heading_lower = heading.lower().strip() # Skip known sub-sections if heading_lower in SKIP_HEADINGS: return None for pattern, doc_type in SECTION_TYPE_MAP: if _re.search(pattern, heading_lower): return doc_type return None async def _check_cookie_banner(url: str) -> dict | None: """Run cookie banner consent test on a URL.""" try: async with httpx.AsyncClient(timeout=120.0) as client: resp = await client.post( f"{CONSENT_TESTER_URL}/scan", json={"url": url, "timeout_per_phase": 8}, ) if resp.status_code == 200: return resp.json() except Exception as e: logger.warning("Cookie banner check failed: %s", e) return None def _build_report(results: list[DocCheckResult], cookie_result: dict | None) -> str: from .agent_doc_check_report import build_html_report return build_html_report(results, cookie_result)