Files
breakpilot-compliance/backend-compliance/compliance/api/agent_doc_check_routes.py
T
Benjamin Admin 7be34552bb
Build + Deploy / build-admin-compliance (push) Successful in 15s
Build + Deploy / build-backend-compliance (push) Successful in 21s
Build + Deploy / build-ai-sdk (push) Successful in 46s
Build + Deploy / build-developer-portal (push) Successful in 12s
Build + Deploy / build-tts (push) Successful in 13s
Build + Deploy / build-document-crawler (push) Successful in 11s
Build + Deploy / build-dsms-gateway (push) Successful in 11s
Build + Deploy / build-dsms-node (push) Successful in 14s
CI / branch-name (push) Has been skipped
CI / guardrail-integrity (push) Has been skipped
CI / loc-budget (push) Failing after 17s
CI / secret-scan (push) Has been skipped
CI / go-lint (push) Has been skipped
CI / python-lint (push) Has been skipped
CI / nodejs-lint (push) Has been skipped
CI / nodejs-build (push) Successful in 2m46s
CI / dep-audit (push) Has been skipped
CI / sbom-scan (push) Has been skipped
CI / test-go (push) Successful in 47s
CI / test-python-backend (push) Successful in 39s
CI / test-python-document-crawler (push) Successful in 27s
CI / test-python-dsms-gateway (push) Successful in 22s
CI / validate-canonical-controls (push) Successful in 16s
Build + Deploy / trigger-orca (push) Successful in 2m29s
feat(compliance-check): profile extraction + scenario classification
- New profile_extractor.py: extracts Company Profile fields (name,
  legal form, address, DPO, USt-IdNr) and Compliance Scope hints
  (Art. 9 data, third country, profiling) from document texts
- Scenario per document: regenerate (<30%), fix (30-95%), import (>95%)
- Widerruf for B2B: no longer skipped; instead, all checks are flagged as
  INFO with a "not needed for B2B" hint
- Move _build_profile_html to report builder module
- DocCheckResult gets scenario field

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-05-12 17:34:33 +02:00

524 lines
20 KiB
Python

"""
Agent Document Check Routes — Multi-URL document verification.
The user provides explicit URLs + document types. No crawling needed.
Each document is loaded, expanded (accordions/tabs), text extracted,
and checked against its type-specific legal checklist.
POST /api/compliance/agent/doc-check
"""
import asyncio
import logging
import os
import uuid as _uuid
from datetime import datetime, timezone
import httpx
from fastapi import APIRouter
from pydantic import BaseModel
from compliance.services.dsi_document_checker import (
check_document_completeness, classify_document_type,
)
from compliance.services.smtp_sender import send_email
logger = logging.getLogger(__name__)
router = APIRouter(prefix="/compliance/agent", tags=["agent"])
# Internal Docker-network URL of the consent-tester service used for both
# banner scans (/scan) and document discovery (/dsi-discovery).
CONSENT_TESTER_URL = "http://bp-compliance-consent-tester:8094"
class DocCheckEntry(BaseModel):
    """One document the user wants checked: its type, display label and URL."""
    doc_type: str  # dse, agb, impressum, cookie, widerruf, other
    label: str
    url: str
class DocCheckRequest(BaseModel):
    """Request body for POST /doc-check: a list of explicit document URLs."""
    entries: list[DocCheckEntry]
    recipient: str = "dsb@breakpilot.local"  # email address the HTML report is sent to
    check_cookie_banner: bool = False  # also run a consent-tester scan on the first URL
    use_agent: bool = False  # forwarded to the Master Control deep check
class CheckItem(BaseModel):
    """Result of a single checklist item for one document."""
    id: str
    label: str
    passed: bool
    severity: str
    matched_text: str = ""  # matching snippet, or "[LLM] ..." evidence after an overturn
    level: int = 1  # level-2 items feed the correctness percentage (see _run_checklist)
    parent: str | None = None  # id of the parent check, if any
    skipped: bool = False  # skipped items are excluded from percentage computations
    hint: str = ""  # remediation hint; presence also enables LLM re-verification of failures
class DocCheckResult(BaseModel):
    """Aggregated check outcome for one document (or one detected sub-section)."""
    label: str
    url: str
    doc_type: str
    word_count: int = 0
    completeness_pct: int = 0  # taken from the checker's SCORE finding
    correctness_pct: int = 0  # share of non-skipped level-2 checks passed
    checks: list[CheckItem] = []
    findings_count: int = 0  # number of non-SCORE findings
    error: str = ""  # non-empty when the document could not be loaded or checked
    scenario: str = ""  # regenerate | fix | import | skip
class DocCheckResponse(BaseModel):
    """Final payload of a completed multi-document check."""
    results: list[DocCheckResult]
    cookie_banner_result: dict | None = None  # raw consent-tester scan, if requested
    total_documents: int
    total_findings: int
    email_status: str = ""  # status reported by smtp_sender.send_email
    checked_at: str  # ISO-8601 UTC timestamp
# In-memory job store for async processing, keyed by short check_id.
# NOTE(review): entries are never evicted, so this grows unboundedly and is
# lost on restart — acceptable for low-volume internal use, but consider a
# TTL or external store if jobs accumulate.
_doc_check_jobs: dict[str, dict] = {}
class DocCheckStartResponse(BaseModel):
    """Returned immediately by POST /doc-check; poll GET /doc-check/{check_id}."""
    check_id: str  # short job id (first 8 chars of a uuid4)
    status: str = "running"
class DocCheckStatusResponse(BaseModel):
    """Polling response: job state plus the result once completed."""
    check_id: str
    status: str  # running | completed | failed
    progress: str = ""  # human-readable progress message (German)
    result: DocCheckResponse | None = None  # set when status == "completed"
    error: str = ""  # set when status == "failed"
class BannerCheckRequest(BaseModel):
    """Request body for POST /banner-check."""
    url: str
    categories: list[str] = []  # empty = test all categories
@router.post("/banner-check")
async def run_banner_check(req: BannerCheckRequest):
    """Run a cookie banner compliance check via the consent-tester service.

    Proxies the scan to the consent-tester, builds an HTML summary of
    violations and passed checks, and emails it to the DSB mailbox.

    Returns:
        The consent-tester result dict, extended with ``email_status``,
        or ``{"error": ...}`` on any failure (this handler never raises).
    """
    try:
        async with httpx.AsyncClient(timeout=120.0) as client:
            resp = await client.post(
                f"{CONSENT_TESTER_URL}/scan",
                json={
                    "url": req.url,
                    "timeout_per_phase": 10,
                    "categories": req.categories,
                },
            )
        if resp.status_code != 200:
            return {"error": f"Consent-Tester: HTTP {resp.status_code}"}
        result = resp.json()

        # Build and send the email report.
        checks = result.get("structured_checks", [])
        violations = [c for c in checks if not c.get("passed") and not c.get("skipped")]
        passes = [c for c in checks if c.get("passed")]
        provider = result.get("banner_provider", "Unbekannt")
        comp_pct = result.get("completeness_pct", 0)
        html = [
            '<div style="font-family:-apple-system,sans-serif;max-width:700px;margin:0 auto">',
            f'<h2>Banner-Check: {req.url}</h2>',
            f'<p>Banner: {provider} | Vollstaendigkeit: {comp_pct}%</p>',
        ]
        if violations:
            html.append(f'<h3 style="color:#dc2626">{len(violations)} Verstoesse</h3>')
            for v in violations:
                html.append(
                    f'<div style="padding:4px 0">'
                    f'<span style="color:#dc2626;font-weight:bold">&#10007;</span> '
                    f'{v.get("label","")}'
                )
                if v.get("hint"):
                    html.append(
                        f'<div style="font-size:11px;color:#dc2626;margin:2px 0 4px 20px;'
                        f'padding:4px 8px;background:#fef2f2;border-radius:4px;'
                        f'border-left:3px solid #fca5a5">{v["hint"]}</div>'
                    )
                html.append('</div>')
        if passes:
            html.append(f'<h3 style="color:#22c55e">{len(passes)} Bestanden</h3>')
            for p in passes:
                html.append(f'<div style="padding:2px 0;color:#6b7280">'
                            f'<span style="color:#22c55e">&#10003;</span> {p.get("label","")}</div>')
        html.append('</div>')  # close the outer container div
        email_result = send_email(
            recipient="dsb@breakpilot.local",
            # Fixed: provider and URL were concatenated without a separator.
            subject=f"[BANNER-CHECK] {provider} {req.url}",
            body_html="\n".join(html),
        )
        result["email_status"] = email_result.get("status", "failed")
        return result
    except Exception as e:
        # Route boundary: report the error in-band rather than as a 500.
        return {"error": str(e)[:200]}
@router.post("/doc-check")
async def start_doc_check(req: DocCheckRequest):
    """Start async multi-URL document check."""
    # Short job id: the first 8 hex digits of a random UUID.
    job_id = _uuid.uuid4().hex[:8]
    _doc_check_jobs[job_id] = {
        "status": "running",
        "progress": "Pruefung gestartet...",
        "result": None,
        "error": "",
    }
    # Fire-and-forget: the background task updates the job store as it runs.
    asyncio.create_task(_run_doc_check(job_id, req))
    return DocCheckStartResponse(check_id=job_id, status="running")
@router.get("/doc-check/{check_id}")
async def get_doc_check_status(check_id: str):
    """Poll document check status."""
    try:
        job = _doc_check_jobs[check_id]
    except KeyError:
        # Unknown id: plain dict (no pydantic validation for the error case).
        return {"check_id": check_id, "status": "not_found"}
    return DocCheckStatusResponse(
        check_id=check_id,
        status=job["status"],
        progress=job.get("progress", ""),
        result=job.get("result"),
        error=job.get("error", ""),
    )
async def _run_doc_check(check_id: str, req: DocCheckRequest):
    """Background task: check each document, optionally the cookie banner,
    email an HTML report, and store the outcome in ``_doc_check_jobs``.

    Never raises: any failure is recorded on the job entry as
    status="failed" with a truncated error message.
    """
    try:
        results: list[DocCheckResult] = []
        total_findings = 0
        for i, entry in enumerate(req.entries):
            _doc_check_jobs[check_id]["progress"] = (
                f"Dokument {i+1}/{len(req.entries)}: {entry.label}..."
            )
            # One entry may yield several results (auto-detected sub-sections).
            doc_results = await _check_single_document(entry, use_agent=req.use_agent)
            results.extend(doc_results)
            total_findings += sum(r.findings_count for r in doc_results)
        # Optional: Cookie banner check on first URL
        cookie_result = None
        if req.check_cookie_banner and req.entries:
            _doc_check_jobs[check_id]["progress"] = "Cookie-Banner wird geprueft..."
            cookie_result = await _check_cookie_banner(req.entries[0].url)
        # Build email report
        _doc_check_jobs[check_id]["progress"] = "Report wird erstellt..."
        summary = _build_report(results, cookie_result)
        email_result = send_email(
            recipient=req.recipient,
            subject=f"[DOKUMENTEN-PRUEFUNG] {len(results)} Dokumente geprueft",
            body_html=summary,
        )
        response = DocCheckResponse(
            results=results,
            cookie_banner_result=cookie_result,
            total_documents=len(results),
            total_findings=total_findings,
            email_status=email_result.get("status", "failed"),
            checked_at=datetime.now(timezone.utc).isoformat(),
        )
        # Publish result before flipping progress so pollers never see
        # "Fertig" without a result attached.
        _doc_check_jobs[check_id]["status"] = "completed"
        _doc_check_jobs[check_id]["result"] = response
        _doc_check_jobs[check_id]["progress"] = "Fertig"
    except Exception as e:
        logger.error("Doc check %s failed: %s", check_id, e)
        _doc_check_jobs[check_id]["status"] = "failed"
        _doc_check_jobs[check_id]["error"] = str(e)[:500]
async def _check_single_document(entry: DocCheckEntry, use_agent: bool = False) -> list[DocCheckResult]:
    """Load a single URL, expand content, extract text, split into sections,
    and check each section against its type-specific checklist.

    Returns multiple results if the page contains sub-documents
    (e.g. Cookies section, Social Media section on a DSI page).
    On any failure, returns a single-element list with ``error`` set.
    """
    try:
        async with httpx.AsyncClient(timeout=90.0) as client:
            resp = await client.post(
                f"{CONSENT_TESTER_URL}/dsi-discovery",
                json={"url": entry.url, "max_documents": 5},
            )
        if resp.status_code != 200:
            return [DocCheckResult(
                label=entry.label, url=entry.url, doc_type=entry.doc_type,
                error=f"Seite nicht erreichbar (HTTP {resp.status_code})",
            )]
        data = resp.json()
        docs = data.get("documents", [])
        # For non-DSE doc types (impressum, agb, widerruf), prefer the
        # self-extracted document (html_full_page) which is the text of
        # the URL the user provided — not a linked document found by
        # the discovery crawler.
        doc_text = ""
        word_count = 0
        if entry.doc_type not in ("dse", "datenschutz", "privacy"):
            # Prefer html_full_page (self-extracted from the actual URL)
            for d in docs:
                if d.get("doc_type") == "html_full_page":
                    doc_text = d.get("full_text", "") or d.get("text", "")
                    word_count = d.get("word_count", 0)
                    break
        # Fallback (and DSE path): take the first discovered document.
        if not doc_text and docs:
            doc_text = docs[0].get("full_text", "") or docs[0].get("text_preview", "")
            word_count = docs[0].get("word_count", 0)
        if not doc_text or len(doc_text) < 50:
            return [DocCheckResult(
                label=entry.label, url=entry.url, doc_type=entry.doc_type,
                error="Kein Text extrahierbar",
            )]
        # Split text into sections and check each
        sections = _split_into_sections(doc_text, entry.label, entry.url)
        all_results: list[DocCheckResult] = []
        # Main document check (full text against primary type)
        main_result = await _run_checklist(doc_text, entry.doc_type, entry.label, entry.url, word_count)
        # Master Control deep check — 1,874 doc_check_controls with
        # binary pass/fail criteria verified by LLM (Qwen)
        try:
            from compliance.services.rag_document_checker import check_document_with_controls
            # Env var COMPLIANCE_USE_AGENT can force agent mode globally.
            use_agent_flag = use_agent or os.getenv("COMPLIANCE_USE_AGENT", "false").lower() == "true"
            mc_results = await check_document_with_controls(
                doc_text, entry.doc_type, entry.label,
                max_controls=0, use_agent=use_agent_flag,
            )
            if mc_results:
                # Add MC results as additional checks to the main result
                for mc in mc_results:
                    main_result.checks.append(CheckItem(**mc))
                # Recompute correctness with MC results
                l2 = [c for c in main_result.checks if c.level == 2 and not c.skipped]
                l2_passed = sum(1 for c in l2 if c.passed)
                main_result.correctness_pct = round(l2_passed / len(l2) * 100) if l2 else 0
        except Exception as e:
            # Deep check is best-effort; keep the regex checklist results.
            logger.warning("MC check skipped: %s", e)
        all_results.append(main_result)
        # Sub-section checks (auto-detected from headings)
        # Pass full doc_text for LLM verification fallback
        for section in sections:
            if section["word_count"] < 100:
                continue
            sub_result = await _run_checklist(
                section["text"], section["doc_type"],
                section["title"], entry.url,
                section["word_count"],
                full_text=doc_text,
            )
            all_results.append(sub_result)
        return all_results
    except Exception as e:
        logger.warning("Doc check failed for %s: %s", entry.url, e)
        return [DocCheckResult(
            label=entry.label, url=entry.url, doc_type=entry.doc_type,
            error=str(e)[:200],
        )]
async def _run_checklist(
    text: str, doc_type: str, label: str, url: str,
    word_count: int = 0, full_text: str = "",
) -> DocCheckResult:
    """Run checklist against text, then LLM-verify failed checks.

    Args:
        text: Section or document text the regex checklist runs on.
        doc_type: Checklist key (e.g. "dse", "impressum", "cookie").
        label: Human-readable document title used in results and logs.
        url: Source URL, stored on the result.
        word_count: Pre-computed word count; falls back to len(text.split()).
        full_text: Optional full document text for LLM verification.
            If empty, uses `text` (the section fragment).
    """
    findings = check_document_completeness(text, doc_type, label, url)
    all_checks: list[CheckItem] = []
    completeness = 0
    correctness = 0
    for f in findings:
        # Findings whose code contains "SCORE" carry the full check list plus
        # the percentage scores; all other findings are individual issues.
        if "SCORE" in f.get("code", ""):
            for c in f.get("all_checks", []):
                all_checks.append(CheckItem(
                    id=c["id"], label=c["label"], passed=c["passed"],
                    severity=c["severity"], matched_text=c.get("matched_text", ""),
                    level=c.get("level", 1),
                    parent=c.get("parent"),
                    skipped=c.get("skipped", False),
                    hint=c.get("hint", ""),
                ))
            completeness = f.get("completeness_pct", 0)
            correctness = f.get("correctness_pct", 0)
    # LLM verification: re-check regex FAILs to eliminate false positives.
    # Only checks that carry a hint are eligible.
    failed = [c for c in all_checks if not c.passed and not c.skipped and c.hint]
    if failed:
        try:
            from compliance.services.doc_checks.llm_verify import verify_failed_checks
            overturns = await verify_failed_checks(
                full_text or text,
                [{"id": c.id, "label": c.label, "hint": c.hint} for c in failed],
                label,
            )
            for c in all_checks:
                if c.id in overturns and overturns[c.id]["overturned"]:
                    c.passed = True
                    c.matched_text = f"[LLM] {overturns[c.id]['evidence']}"
                    logger.info("LLM overturned: %s in %s", c.label, label)
            # Recompute correctness after overturns
            l2_active = [c for c in all_checks if c.level == 2 and not c.skipped]
            l2_passed = sum(1 for c in l2_active if c.passed)
            if l2_active:
                correctness = round(l2_passed / len(l2_active) * 100)
        except Exception as e:
            # Best-effort: keep the regex results if the LLM step fails.
            logger.warning("LLM verification skipped: %s", e)
    non_score = [f for f in findings if "SCORE" not in f.get("code", "")]
    return DocCheckResult(
        label=label, url=url, doc_type=doc_type,
        word_count=word_count or len(text.split()),
        completeness_pct=completeness,
        correctness_pct=correctness,
        checks=all_checks, findings_count=len(non_score),
    )
# Section heading patterns → document type mapping, checked in order by
# _classify_section (first match wins).
# ONLY sections that are genuinely separate document types with their own checklists.
# Everything else (Social Media, Betroffenenrechte, Dienste von Drittanbietern)
# is part of the parent DSI and inherits its checks.
SECTION_TYPE_MAP = [
    (r"^cookie", "cookie"),
    (r"widerrufsrecht|widerrufsbelehrung", "widerruf"),
    (r"^impressum$", "impressum"),
    (r"^(?:agb|allgemeine geschäftsbedingungen|nutzungsbedingungen)$", "agb"),
    # DSFA MUST be checked BEFORE social_media (both can contain "Social Media")
    (r"datenschutzfolge|dsfa|risikoanalyse", "dsfa"),
    (r"^social\s*media$|^soziale\s+(?:medien|netzwerke)$", "social_media"),
    (r"datenschutzerkl(?:ae|ä)rung.*social|datenschutz\s+f(?:ue|ü)r\s+social", "social_media"),
    # Regulation (EU) 2018/1725 = data protection rules for EU institutions.
    (r"(?:verordnung|regulation)\s*\(?eu\)?\s*2018\s*/?\s*1725", "eu_institution"),
]
def _split_into_sections(text: str, parent_label: str, url: str) -> list[dict]:
    """Split document text at major headings into sub-sections.

    Detects sections like 'Cookies' or 'Widerrufsrecht', classifies each via
    ``_classify_section``, and drops sections shorter than 100 words.
    Deduplicates: if the same doc_type appears twice, texts are merged.

    Args:
        text: Full extracted document text (newline-separated).
        parent_label: Parent document label; section titles become
            "<parent_label> > <heading>".
        url: Unused here; kept for interface stability with callers.

    Returns:
        List of dicts with keys: title, text, doc_type, word_count.
    """
    # Fixed: removed an unused `import re as _re` — this function does no
    # regex work itself (classification is delegated to _classify_section).
    sections: list[dict] = []
    seen_types: dict[str, int] = {}  # doc_type -> index in `sections`

    def _save_section(heading: str, text_lines: list[str]) -> None:
        # Persist the accumulated section if it is long enough and its
        # heading classifies as a known standalone document type.
        sec_text = "\n".join(text_lines)
        if len(sec_text.split()) < 100:
            return
        sec_type = _classify_section(heading)
        if not sec_type:
            return
        # Merge duplicate doc_types (e.g. two "Social Media" headings)
        if sec_type in seen_types:
            idx = seen_types[sec_type]
            sections[idx]["text"] += "\n\n" + sec_text
            sections[idx]["word_count"] = len(sections[idx]["text"].split())
        else:
            seen_types[sec_type] = len(sections)
            sections.append({
                "title": f"{parent_label} > {heading}",
                "text": sec_text,
                "doc_type": sec_type,
                "word_count": len(sec_text.split()),
            })

    current_heading = ""
    current_text: list[str] = []
    for line in text.split("\n"):
        stripped = line.strip()
        # Only split at headings that classify as a known document type.
        # This prevents table content ("Funktionale Cookies", "Typen")
        # from triggering section splits.
        is_heading = (
            5 < len(stripped) < 80
            and not stripped.endswith(".")
            and not stripped.endswith(",")
            and (stripped[0].isupper() or stripped[0].isdigit())
        )
        classified = _classify_section(stripped) if is_heading else None
        is_real_heading = is_heading and classified is not None
        # NOTE(review): effectively always False — _classify_section already
        # returns None for SKIP_HEADINGS entries; kept as a safety net.
        is_skip = is_real_heading and stripped.lower().strip() in SKIP_HEADINGS
        if is_real_heading and not is_skip and current_heading:
            _save_section(current_heading, current_text)
        if is_real_heading and not is_skip:
            current_heading = stripped
            current_text = []
        else:
            current_text.append(line)
    # Flush the last open section.
    if current_heading:
        _save_section(current_heading, current_text)
    return sections
# Headings to skip — sub-sections of other documents, not standalone types.
# Matched case-insensitively after number-stripping in _classify_section.
SKIP_HEADINGS = {
    "nutzungskonzept social media",  # internal concept, no legal checklist
    "risikoabwägung und datenschutzfolgenabschätzung",  # sub-section of DSFA
    "risikoabwaegung und datenschutzfolgenabschaetzung",  # ASCII variant
}
# Track already-seen section types to avoid duplicate sub-documents
# (e.g. two "Social Media" headings on the same page).
# NOTE(review): appears unused in this file — deduplication is handled by the
# local `seen_types` dict in _split_into_sections; confirm before removing.
_DEDUP_TYPES = {"social_media", "cookie", "dsfa", "widerruf", "impressum"}
def _classify_section(heading: str) -> str | None:
    """Map a section heading to its document type, or None if unrecognized."""
    import re as _re
    normalized = heading.lower().strip()
    # Drop leading enumeration such as "5. " or "3) " before matching.
    normalized = _re.sub(r"^[\d\.\)\-]+\s*", "", normalized).strip()
    # Known sub-section headings never count as standalone documents.
    if normalized in SKIP_HEADINGS:
        return None
    return next(
        (
            doc_type
            for pattern, doc_type in SECTION_TYPE_MAP
            if _re.search(pattern, normalized)
        ),
        None,
    )
async def _check_cookie_banner(url: str) -> dict | None:
    """Run a cookie banner consent test on *url*; None on any failure."""
    try:
        async with httpx.AsyncClient(timeout=120.0) as http:
            scan = await http.post(
                f"{CONSENT_TESTER_URL}/scan",
                json={"url": url, "timeout_per_phase": 8},
            )
            if scan.status_code == 200:
                return scan.json()
    except Exception as exc:
        # Banner check is optional; log and fall through to None.
        logger.warning("Cookie banner check failed: %s", exc)
    return None
def _build_report(results: list[DocCheckResult], cookie_result: dict | None) -> str:
    """Render the email HTML for the doc-check results (delegated to the report module)."""
    # Imported lazily to keep report-building out of this module's import time.
    from .agent_doc_check_report import build_html_report

    html = build_html_report(results, cookie_result)
    return html