Files
breakpilot-compliance/backend-compliance/compliance/api/agent_doc_check_routes.py
T
Benjamin Admin 4bfb438c92
Build + Deploy / build-admin-compliance (push) Successful in 2m17s
Build + Deploy / build-backend-compliance (push) Successful in 3m17s
Build + Deploy / build-ai-sdk (push) Successful in 56s
Build + Deploy / build-developer-portal (push) Successful in 1m37s
Build + Deploy / build-tts (push) Successful in 1m33s
Build + Deploy / build-document-crawler (push) Successful in 42s
Build + Deploy / build-dsms-gateway (push) Successful in 33s
Build + Deploy / build-dsms-node (push) Successful in 16s
CI / branch-name (push) Has been skipped
CI / guardrail-integrity (push) Has been skipped
CI / loc-budget (push) Failing after 25s
CI / secret-scan (push) Has been skipped
CI / go-lint (push) Has been skipped
CI / python-lint (push) Has been skipped
CI / nodejs-lint (push) Has been skipped
CI / nodejs-build (push) Successful in 3m33s
CI / dep-audit (push) Has been skipped
CI / sbom-scan (push) Has been skipped
CI / test-go (push) Failing after 1m18s
CI / test-python-backend (push) Successful in 53s
CI / test-python-document-crawler (push) Successful in 36s
CI / test-python-dsms-gateway (push) Successful in 33s
CI / validate-canonical-controls (push) Successful in 24s
Build + Deploy / trigger-orca (push) Successful in 3m19s
feat: 4 banner check upgrades — 30 CMPs, stealth, Shadow DOM, categories
1. 30 CMP selectors (was 10): Added Sourcepoint, Iubenda, Complianz,
   CookieFirst, HubSpot, Osano, Piwik PRO, Cookie Consent (Insites),
   Axeptio, Termly, CookieScript, Civic UK, GDPR Cookie Compliance,
   CookieHub, Ketch, Admiral, Sibbo, Evidon, LiveRamp, Adsimple.
   Plus improved generic fallback: role=dialog, aria-label, data-* attrs.

2. Playwright stealth mode: playwright-stealth against bot detection.
   Removes WebDriver flag, simulates plugins, realistic viewport/locale.
   Launch args: --disable-blink-features=AutomationControlled.

3. Shadow DOM: Recursive JS-based search through shadowRoot elements
   for consent banners. Fallback click via page.evaluate() when
   normal Playwright selectors can't penetrate Shadow DOM.

4. Category selection UI: User can choose which cookie categories to
   test (Notwendig, Statistik, Marketing, Funktional, Praeferenzen).
   Pill-style checkboxes in BannerCheckTab, forwarded through API chain.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-05-09 08:42:30 +02:00

453 lines
16 KiB
Python

"""
Agent Document Check Routes — Multi-URL document verification.
The user provides explicit URLs + document types. No crawling needed.
Each document is loaded, expanded (accordions/tabs), text extracted,
and checked against its type-specific legal checklist.
POST /api/compliance/agent/doc-check
"""
import asyncio
import logging
import os
import uuid as _uuid
from datetime import datetime, timezone
import httpx
from fastapi import APIRouter
from pydantic import BaseModel
from compliance.services.dsi_document_checker import (
check_document_completeness, classify_document_type,
)
from compliance.services.smtp_sender import send_email
# Module-level logger used by all routes and helpers in this file.
logger = logging.getLogger(__name__)
# All routes below are registered under the /compliance/agent prefix.
router = APIRouter(prefix="/compliance/agent", tags=["agent"])
# Base URL of the consent-tester service that performs page loading
# (/dsi-discovery) and cookie banner scans (/scan).
# NOTE(review): the URL is hard-coded although `os` is imported and not used
# in the visible code — confirm whether an environment override was intended.
CONSENT_TESTER_URL = "http://bp-compliance-consent-tester:8094"
# One document to verify: its declared checklist type, a display label and
# the URL it is loaded from.
class DocCheckEntry(BaseModel):
    doc_type: str  # dse, agb, impressum, cookie, widerruf, other
    label: str  # shown in progress messages and used as the result label
    url: str  # page that is sent to the consent-tester for text extraction
# Request body of POST /doc-check.
class DocCheckRequest(BaseModel):
    entries: list[DocCheckEntry]  # documents to check, processed one after another
    recipient: str = "dsb@breakpilot.local"  # address the HTML report is mailed to
    check_cookie_banner: bool = False  # if True, the URL of the first entry is also banner-scanned
# A single checklist item as reported by the document checker.
class CheckItem(BaseModel):
    id: str  # key used to look the item up in the LLM verification result
    label: str
    passed: bool  # may be flipped to True when the LLM verification overturns a failure
    severity: str  # copied unchanged from the checker output
    matched_text: str = ""  # checker evidence, or "[LLM] <evidence>" after an overturn
    level: int = 1  # level-2 items are the ones counted when correctness is recomputed
    parent: str | None = None  # presumably the id of the parent item — TODO confirm against checker
    skipped: bool = False  # skipped items are excluded from LLM verification and correctness
    hint: str = ""  # only failed items with a non-empty hint are sent to LLM verification
# Result of checking one document or one detected sub-section of it.
class DocCheckResult(BaseModel):
    label: str
    url: str
    doc_type: str
    word_count: int = 0
    completeness_pct: int = 0  # taken from the checker's SCORE finding
    correctness_pct: int = 0  # from the SCORE finding; recomputed after LLM overturns
    checks: list[CheckItem] = []
    findings_count: int = 0  # number of checker findings that are not SCORE entries
    error: str = ""  # non-empty when the page could not be loaded or yielded no text
# Final payload of a finished document check job.
class DocCheckResponse(BaseModel):
    results: list[DocCheckResult]
    cookie_banner_result: dict | None = None  # raw consent-tester JSON, None if not requested or failed
    total_documents: int  # number of entries in `results` (sub-sections count as documents)
    total_findings: int  # sum of findings_count over all results
    email_status: str = ""  # "status" value returned by the mail sender, "failed" if missing
    checked_at: str  # UTC timestamp in ISO 8601 format
# In-memory job store for async processing.
# Maps check_id -> {"status", "progress", "result", "error"}.
# NOTE(review): entries are never removed in the visible code, and the store
# lives in this process only — presumably jobs are lost on restart and are not
# shared between worker processes; confirm this is acceptable for deployment.
_doc_check_jobs: dict[str, dict] = {}
# Returned by POST /doc-check: the id the client uses for polling.
class DocCheckStartResponse(BaseModel):
    check_id: str
    status: str = "running"
# Returned by GET /doc-check/{check_id} for a known job.
class DocCheckStatusResponse(BaseModel):
    check_id: str
    status: str  # the background task sets "running", "completed" or "failed"
    progress: str = ""  # human-readable progress message (German)
    result: DocCheckResponse | None = None  # set once the job completed
    error: str = ""  # truncated exception text when the job failed
# Request body of POST /banner-check.
class BannerCheckRequest(BaseModel):
    url: str  # page whose cookie banner is scanned
    categories: list[str] = []  # empty = test all categories
@router.post("/banner-check")
async def run_banner_check(req: BannerCheckRequest):
    """Run cookie banner compliance check via consent-tester.

    Forwards the URL and the selected categories to the consent-tester's
    ``/scan`` endpoint and returns its JSON body unchanged on HTTP 200.

    Returns:
        The consent-tester JSON on success, otherwise ``{"error": "..."}``.
        Errors are reported in the body; this handler never raises.
    """
    try:
        async with httpx.AsyncClient(timeout=120.0) as client:
            resp = await client.post(
                f"{CONSENT_TESTER_URL}/scan",
                json={
                    "url": req.url,
                    "timeout_per_phase": 10,
                    "categories": req.categories,
                },
            )
        if resp.status_code == 200:
            return resp.json()
        # Log upstream failures so they are visible server-side, not only
        # in the response handed back to the caller.
        logger.warning(
            "Banner check for %s: consent-tester returned HTTP %s",
            req.url, resp.status_code,
        )
        return {"error": f"Consent-Tester: HTTP {resp.status_code}"}
    except Exception as e:
        logger.warning("Banner check for %s failed: %s", req.url, e)
        return {"error": str(e)[:200]}
@router.post("/doc-check")
async def start_doc_check(req: DocCheckRequest):
    """Start async multi-URL document check.

    Registers a new job in the in-memory job store, schedules the background
    task and returns immediately with the id used for polling.

    Returns:
        ``DocCheckStartResponse`` with the 8-character check id.
    """
    check_id = str(_uuid.uuid4())[:8]
    job = {"status": "running", "progress": "Pruefung gestartet...", "result": None, "error": ""}
    _doc_check_jobs[check_id] = job
    # The event loop keeps only weak references to tasks, so a task whose
    # result is discarded can be garbage-collected before it finishes.
    # Holding it in the job record keeps it alive for the job's lifetime.
    job["task"] = asyncio.create_task(_run_doc_check(check_id, req))
    return DocCheckStartResponse(check_id=check_id, status="running")
@router.get("/doc-check/{check_id}")
async def get_doc_check_status(check_id: str):
    """Report the current state of a previously started document check.

    Unknown ids yield a plain dict with status ``"not_found"``; known ids
    yield a ``DocCheckStatusResponse`` built from the stored job record.
    """
    record = _doc_check_jobs.get(check_id)
    if record:
        return DocCheckStatusResponse(
            check_id=check_id,
            status=record["status"],
            progress=record.get("progress", ""),
            result=record.get("result"),
            error=record.get("error", ""),
        )
    return {"check_id": check_id, "status": "not_found"}
async def _run_doc_check(check_id: str, req: DocCheckRequest):
    """Background task: check each document and publish the outcome.

    Progress, the final ``DocCheckResponse`` and any error text are written
    into the job record stored under ``check_id`` in ``_doc_check_jobs``.

    Args:
        check_id: Key of the job record created by the start endpoint.
        req: The original request (entries, recipient, banner flag).
    """
    job = _doc_check_jobs[check_id]
    try:
        results: list[DocCheckResult] = []
        total_findings = 0
        entry_count = len(req.entries)
        for position, entry in enumerate(req.entries, start=1):
            job["progress"] = f"Dokument {position}/{entry_count}: {entry.label}..."
            doc_results = await _check_single_document(entry)
            results.extend(doc_results)
            total_findings += sum(r.findings_count for r in doc_results)

        # Optional: Cookie banner check on first URL
        cookie_result = None
        if req.check_cookie_banner and req.entries:
            job["progress"] = "Cookie-Banner wird geprueft..."
            cookie_result = await _check_cookie_banner(req.entries[0].url)

        # Build email report
        job["progress"] = "Report wird erstellt..."
        summary = _build_report(results, cookie_result)

        # A mail failure must not discard the finished check results, so it
        # is reported through email_status instead of failing the whole job.
        # NOTE(review): send_email is called synchronously here — if it does
        # blocking SMTP I/O it stalls the event loop; confirm.
        try:
            email_result = send_email(
                recipient=req.recipient,
                subject=f"[DOKUMENTEN-PRUEFUNG] {len(results)} Dokumente geprueft",
                body_html=summary,
            )
            email_status = email_result.get("status", "failed")
        except Exception:
            logger.exception("Doc check %s: report e-mail could not be sent", check_id)
            email_status = "failed"

        job["result"] = DocCheckResponse(
            results=results,
            cookie_banner_result=cookie_result,
            total_documents=len(results),
            total_findings=total_findings,
            email_status=email_status,
            checked_at=datetime.now(timezone.utc).isoformat(),
        )
        job["progress"] = "Fertig"
        job["status"] = "completed"
    except Exception as e:
        # logger.exception keeps the traceback, which logger.error dropped.
        logger.exception("Doc check %s failed: %s", check_id, e)
        job["status"] = "failed"
        job["error"] = str(e)[:500]
async def _check_single_document(entry: DocCheckEntry) -> list[DocCheckResult]:
    """Fetch one URL through the consent-tester and run all applicable checklists.

    The full text is checked against the entry's own document type. Every
    sub-section detected by ``_split_into_sections`` with at least 100 words
    is checked against its own type in addition, so one page can yield
    several results (e.g. a cookie section inside a privacy policy).

    Any failure is reported as a single result carrying an ``error`` text.
    """

    def _failure(message: str) -> list[DocCheckResult]:
        # Single error result that mirrors the entry's identifying fields.
        return [DocCheckResult(
            label=entry.label, url=entry.url, doc_type=entry.doc_type,
            error=message,
        )]

    try:
        async with httpx.AsyncClient(timeout=90.0) as http:
            discovery = await http.post(
                f"{CONSENT_TESTER_URL}/dsi-discovery",
                json={"url": entry.url, "max_documents": 1},
            )
        if discovery.status_code != 200:
            return _failure(f"Seite nicht erreichbar (HTTP {discovery.status_code})")

        documents = discovery.json().get("documents", [])
        body = ""
        words = 0
        if documents:
            first = documents[0]
            body = first.get("full_text", "") or first.get("text_preview", "")
            words = first.get("word_count", 0)
        if not body or len(body) < 50:
            return _failure("Kein Text extrahierbar")

        # Split text into sections and check each
        subsections = _split_into_sections(body, entry.label, entry.url)

        # Main document check (full text against primary type)
        # Control Library deep check — DISABLED until doc-check-specific
        # Master Controls with binary pass/fail criteria are available.
        # See: zeroclaw/INSTRUCTION-master-controls-for-doc-check.md
        # Code: compliance/services/rag_document_checker.py (ready to re-enable)
        checked: list[DocCheckResult] = [
            await _run_checklist(body, entry.doc_type, entry.label, entry.url, words),
        ]

        # Sub-section checks (auto-detected from headings); the full text is
        # handed over as context for the LLM verification fallback.
        for part in subsections:
            if part["word_count"] >= 100:
                checked.append(await _run_checklist(
                    part["text"], part["doc_type"],
                    part["title"], entry.url,
                    part["word_count"],
                    full_text=body,
                ))
        return checked
    except Exception as exc:
        logger.warning("Doc check failed for %s: %s", entry.url, exc)
        return _failure(str(exc)[:200])
async def _run_checklist(
    text: str, doc_type: str, label: str, url: str,
    word_count: int = 0, full_text: str = "",
) -> DocCheckResult:
    """Run checklist against text, then LLM-verify failed checks.

    Args:
        text: Text handed to ``check_document_completeness``.
        doc_type: Checklist type passed to the checker and echoed in the result.
        label: Display label; also passed to the checker and the LLM verifier.
        url: Source URL, passed to the checker and echoed in the result.
        word_count: Pre-computed word count; when 0 it is derived from ``text``.
        full_text: Optional full document text for LLM verification.
            If empty, uses `text` (the section fragment).

    Returns:
        A ``DocCheckResult`` holding the check items, both percentages and
        the number of findings that are not SCORE entries.
    """
    findings = check_document_completeness(text, doc_type, label, url)
    all_checks: list[CheckItem] = []
    completeness = 0
    correctness = 0
    # Findings whose code contains "SCORE" carry the per-item check list and
    # the two percentages; all other findings are only counted further below.
    for f in findings:
        if "SCORE" in f.get("code", ""):
            for c in f.get("all_checks", []):
                all_checks.append(CheckItem(
                    id=c["id"], label=c["label"], passed=c["passed"],
                    severity=c["severity"], matched_text=c.get("matched_text", ""),
                    level=c.get("level", 1),
                    parent=c.get("parent"),
                    skipped=c.get("skipped", False),
                    hint=c.get("hint", ""),
                ))
            completeness = f.get("completeness_pct", 0)
            correctness = f.get("correctness_pct", 0)
    # LLM verification: re-check regex FAILs to eliminate false positives
    # Only failed, non-skipped items that carry a hint are re-checked.
    failed = [c for c in all_checks if not c.passed and not c.skipped and c.hint]
    if failed:
        try:
            # Imported only when there is something to verify.
            from compliance.services.doc_checks.llm_verify import verify_failed_checks
            overturns = await verify_failed_checks(
                full_text or text,
                [{"id": c.id, "label": c.label, "hint": c.hint} for c in failed],
                label,
            )
            for c in all_checks:
                if c.id in overturns and overturns[c.id]["overturned"]:
                    c.passed = True
                    c.matched_text = f"[LLM] {overturns[c.id]['evidence']}"
                    logger.info("LLM overturned: %s in %s", c.label, label)
            # Recompute correctness after overturns
            # NOTE(review): completeness is not recomputed here — confirm
            # whether overturned items should also raise completeness_pct.
            l2_active = [c for c in all_checks if c.level == 2 and not c.skipped]
            l2_passed = sum(1 for c in l2_active if c.passed)
            if l2_active:
                correctness = round(l2_passed / len(l2_active) * 100)
        except Exception as e:
            # Best effort: on any verifier error the checker results are kept.
            logger.warning("LLM verification skipped: %s", e)
    non_score = [f for f in findings if "SCORE" not in f.get("code", "")]
    return DocCheckResult(
        label=label, url=url, doc_type=doc_type,
        word_count=word_count or len(text.split()),
        completeness_pct=completeness,
        correctness_pct=correctness,
        checks=all_checks, findings_count=len(non_score),
    )
# Section heading patterns → document type mapping
# ONLY sections that are genuinely separate document types with their own checklists.
# Everything else (Social Media, Betroffenenrechte, Dienste von Drittanbietern)
# is part of the parent DSI and inherits its checks.
# NOTE(review): the list below does map exact "Social Media" headings to
# "social_media", which seems to contradict the sentence above — confirm
# which of the two is intended.
# Patterns are applied with re.search to the lower-cased heading after its
# leading numbering was stripped; the first matching entry wins, so the
# order of this list is significant.
SECTION_TYPE_MAP = [
    (r"^cookie", "cookie"),
    (r"widerrufsrecht|widerrufsbelehrung", "widerruf"),
    (r"^impressum$", "impressum"),
    (r"^(?:agb|allgemeine geschäftsbedingungen|nutzungsbedingungen)$", "agb"),
    # DSFA MUST be checked BEFORE social_media (both can contain "Social Media")
    (r"datenschutzfolge|dsfa|risikoanalyse", "dsfa"),
    (r"^social\s*media$|^soziale\s+(?:medien|netzwerke)$", "social_media"),
    (r"datenschutzerkl(?:ae|ä)rung.*social|datenschutz\s+f(?:ue|ü)r\s+social", "social_media"),
    (r"(?:verordnung|regulation)\s*\(?eu\)?\s*2018\s*/?\s*1725", "eu_institution"),
]
def _split_into_sections(text: str, parent_label: str, url: str) -> list[dict]:
"""Split document text at major headings into sub-sections.
Detects sections like 'Cookies', 'Social Media', 'Dienste von Drittanbietern'
and classifies each by document type for separate checking.
Deduplicates: if the same doc_type appears twice, texts are merged.
"""
import re as _re
sections: list[dict] = []
seen_types: dict[str, int] = {} # doc_type -> index in sections
lines = text.split("\n")
current_heading = ""
current_text: list[str] = []
def _save_section(heading: str, text_lines: list[str]) -> None:
sec_text = "\n".join(text_lines)
if len(sec_text.split()) < 100:
return
sec_type = _classify_section(heading)
if not sec_type:
return
# Merge duplicate doc_types (e.g. two "Social Media" headings)
if sec_type in seen_types:
idx = seen_types[sec_type]
sections[idx]["text"] += "\n\n" + sec_text
sections[idx]["word_count"] = len(sections[idx]["text"].split())
else:
seen_types[sec_type] = len(sections)
sections.append({
"title": f"{parent_label} > {heading}",
"text": sec_text,
"doc_type": sec_type,
"word_count": len(sec_text.split()),
})
for line in lines:
stripped = line.strip()
# Only split at headings that classify as a known document type.
# This prevents table content ("Funktionale Cookies", "Typen")
# from triggering section splits.
is_heading = (
5 < len(stripped) < 80
and not stripped.endswith(".")
and not stripped.endswith(",")
and (stripped[0].isupper() or stripped[0].isdigit())
)
classified = _classify_section(stripped) if is_heading else None
is_real_heading = is_heading and classified is not None
is_skip = is_real_heading and stripped.lower().strip() in SKIP_HEADINGS
if is_real_heading and not is_skip and current_heading:
_save_section(current_heading, current_text)
if is_real_heading and not is_skip:
current_heading = stripped
current_text = []
else:
current_text.append(line)
# Last section
if current_heading:
_save_section(current_heading, current_text)
return sections
# Headings to skip — sub-sections of other documents, not standalone.
# Compared against the lower-cased heading, so entries must be lower-case.
SKIP_HEADINGS = {
    "nutzungskonzept social media",  # Internal concept, no legal checklist
    "risikoabwägung und datenschutzfolgenabschätzung",  # Sub-section of DSFA
    "risikoabwaegung und datenschutzfolgenabschaetzung",
}
# Track already-seen section types to avoid duplicate sub-documents
# (e.g. two "Social Media" headings on the same page)
# NOTE(review): this set is not referenced anywhere in the visible code;
# _split_into_sections merges duplicates for every type — confirm whether
# it is still needed.
_DEDUP_TYPES = {"social_media", "cookie", "dsfa", "widerruf", "impressum"}
def _classify_section(heading: str) -> str | None:
    """Map a section heading to a document type, or None if it has none.

    The heading is lower-cased and leading numbering/bullets are removed
    ("5. Soziale Medien" → "soziale medien"). Headings listed in
    ``SKIP_HEADINGS`` never classify. Otherwise the type of the first
    matching ``SECTION_TYPE_MAP`` pattern is returned.
    """
    import re as _re

    normalized = _re.sub(r"^[\d\.\)\-]+\s*", "", heading.lower().strip()).strip()
    # Known sub-sections of other documents are never standalone types.
    if normalized in SKIP_HEADINGS:
        return None
    return next(
        (kind for regex, kind in SECTION_TYPE_MAP if _re.search(regex, normalized)),
        None,
    )
async def _check_cookie_banner(url: str) -> dict | None:
    """Run cookie banner consent test on a URL.

    Posts the URL to the consent-tester's ``/scan`` endpoint.

    Returns:
        The consent-tester JSON on HTTP 200, otherwise None. Failures are
        logged and never raised, so the surrounding document check goes on.
    """
    try:
        async with httpx.AsyncClient(timeout=120.0) as client:
            resp = await client.post(
                f"{CONSENT_TESTER_URL}/scan",
                json={"url": url, "timeout_per_phase": 8},
            )
        if resp.status_code == 200:
            return resp.json()
        # A non-200 answer used to vanish silently; log it like exceptions.
        logger.warning("Cookie banner check failed: HTTP %s", resp.status_code)
    except Exception as e:
        logger.warning("Cookie banner check failed: %s", e)
    return None
def _build_report(results: list[DocCheckResult], cookie_result: dict | None) -> str:
    """Render the HTML report by delegating to ``agent_doc_check_report``."""
    from . import agent_doc_check_report as _report_module

    html = _report_module.build_html_report(results, cookie_result)
    return html