Files
breakpilot-compliance/backend-compliance/compliance/api/agent_compliance_check_routes.py
T
Benjamin Admin b6ad958b69
Build + Deploy / build-admin-compliance (push) Successful in 1m57s
Build + Deploy / build-backend-compliance (push) Successful in 3m20s
Build + Deploy / build-ai-sdk (push) Successful in 48s
Build + Deploy / build-developer-portal (push) Successful in 1m6s
Build + Deploy / build-tts (push) Successful in 1m43s
Build + Deploy / build-document-crawler (push) Successful in 44s
Build + Deploy / build-dsms-gateway (push) Successful in 31s
Build + Deploy / build-dsms-node (push) Successful in 18s
CI / branch-name (push) Has been skipped
CI / guardrail-integrity (push) Has been skipped
CI / loc-budget (push) Failing after 16s
CI / secret-scan (push) Has been skipped
CI / go-lint (push) Has been skipped
CI / python-lint (push) Has been skipped
CI / nodejs-lint (push) Has been skipped
CI / nodejs-build (push) Successful in 2m40s
CI / dep-audit (push) Has been skipped
CI / sbom-scan (push) Has been skipped
CI / test-go (push) Successful in 47s
CI / test-python-backend (push) Successful in 38s
CI / test-python-document-crawler (push) Successful in 28s
CI / test-python-dsms-gateway (push) Successful in 20s
CI / validate-canonical-controls (push) Successful in 14s
Build + Deploy / trigger-orca (push) Successful in 3m26s
feat(compliance-check): integrate banner cross-check + extract to module
Add automatic banner check (Step 3b) and banner-vs-cookie cross-check
(Step 3c) to unified compliance check. Extract cross-check logic to
banner_cookie_cross_check.py to keep routes under 500 LOC.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-05-12 00:08:47 +02:00

487 lines
18 KiB
Python

"""
Unified Compliance Check Routes — check all documents in one request.
POST /compliance/agent/extract-text — extract text from a URL
POST /compliance/agent/compliance-check — unified check for all documents
GET /compliance/agent/compliance-check/{check_id} — poll status
"""
import asyncio
import logging
import os
import uuid as _uuid
from dataclasses import asdict
from datetime import datetime, timezone
import httpx
from fastapi import APIRouter
from pydantic import BaseModel
from compliance.services.smtp_sender import send_email
logger = logging.getLogger(__name__)
router = APIRouter(prefix="/compliance/agent", tags=["agent"])
CONSENT_TESTER_URL = "http://bp-compliance-consent-tester:8094"
# In-memory job store (same pattern as doc-check)
_compliance_check_jobs: dict[str, dict] = {}
# ── Models ───────────────────────────────────────────────────────────
class ExtractTextRequest(BaseModel):
url: str
class DocumentInput(BaseModel):
doc_type: str # dse, agb, impressum, cookie, widerruf, avv, loeschkonzept, etc.
url: str = ""
text: str = "" # text has priority over URL
class ComplianceCheckRequest(BaseModel):
documents: list[DocumentInput]
use_agent: bool = False
recipient: str = "dsb@breakpilot.local"
class ComplianceCheckStartResponse(BaseModel):
check_id: str
status: str = "running"
class ComplianceCheckStatusResponse(BaseModel):
check_id: str
status: str
progress: str = ""
result: dict | None = None
error: str = ""
# ── Extract text endpoint ────────────────────────────────────────────
@router.post("/extract-text")
async def extract_text(req: ExtractTextRequest):
"""Extract text from a URL via consent-tester DSI discovery."""
try:
async with httpx.AsyncClient(timeout=90.0) as client:
resp = await client.post(
f"{CONSENT_TESTER_URL}/dsi-discovery",
json={"url": req.url, "max_documents": 1},
)
if resp.status_code != 200:
return {
"text": "", "word_count": 0, "title": "",
"error": f"HTTP {resp.status_code} von Consent-Tester",
}
data = resp.json()
docs = data.get("documents", [])
if not docs:
return {
"text": "", "word_count": 0, "title": "",
"error": "Kein Text extrahierbar",
}
doc = docs[0]
text = doc.get("full_text", "") or doc.get("text_preview", "") or doc.get("text", "")
title = doc.get("title", "") or doc.get("doc_type", "")
word_count = doc.get("word_count", 0) or len(text.split())
return {
"text": text,
"word_count": word_count,
"title": title,
"error": "",
}
except Exception as e:
logger.warning("extract-text failed for %s: %s", req.url, e)
return {
"text": "", "word_count": 0, "title": "",
"error": str(e)[:200],
}
# ── Unified compliance check ────────────────────────────────────────
@router.post("/compliance-check")
async def start_compliance_check(req: ComplianceCheckRequest):
"""Start async compliance check for all documents."""
check_id = str(_uuid.uuid4())[:8]
_compliance_check_jobs[check_id] = {
"status": "running",
"progress": "Pruefung gestartet...",
"result": None,
"error": "",
}
asyncio.create_task(_run_compliance_check(check_id, req))
return ComplianceCheckStartResponse(check_id=check_id, status="running")
@router.get("/compliance-check/{check_id}")
async def get_compliance_check_status(check_id: str):
"""Poll compliance check status."""
job = _compliance_check_jobs.get(check_id)
if not job:
return {"check_id": check_id, "status": "not_found"}
return ComplianceCheckStatusResponse(
check_id=check_id,
status=job["status"],
progress=job.get("progress", ""),
result=job.get("result"),
error=job.get("error", ""),
)
async def _run_compliance_check(check_id: str, req: ComplianceCheckRequest):
"""Background task: check all documents with business-profile context."""
try:
from compliance.services.business_profiler import detect_business_profile
from compliance.services.doc_checks.runner import check_document_completeness
from compliance.services.rag_document_checker import check_document_with_controls
from .agent_doc_check_routes import CheckItem, DocCheckResult
from .agent_doc_check_report import build_html_report
# Step 1: Resolve texts (fetch from URL if needed)
_update(check_id, "Texte werden geladen...")
doc_texts: dict[str, str] = {}
doc_entries: list[dict] = []
for i, doc in enumerate(req.documents):
_update(check_id, f"Dokument {i+1}/{len(req.documents)}: {doc.doc_type}...")
text = doc.text
if not text and doc.url:
text = await _fetch_text(doc.url)
if text:
doc_texts[doc.doc_type] = text
doc_entries.append({
"doc_type": doc.doc_type,
"url": doc.url,
"text": text,
"word_count": len(text.split()) if text else 0,
})
# Step 2: Detect business profile
_update(check_id, "Geschaeftsmodell wird erkannt...")
profile = await detect_business_profile(doc_texts)
profile_dict = asdict(profile)
# Step 3: Check each document
results: list[DocCheckResult] = []
total_findings = 0
use_agent_flag = req.use_agent or os.getenv(
"COMPLIANCE_USE_AGENT", "false"
).lower() == "true"
for i, entry in enumerate(doc_entries):
text = entry["text"]
doc_type = entry["doc_type"]
label = _doc_type_label(doc_type)
url = entry["url"]
_update(check_id, f"Pruefe {label} ({i+1}/{len(doc_entries)})...")
if not text or len(text) < 50:
results.append(DocCheckResult(
label=label, url=url, doc_type=doc_type,
error="Kein Text vorhanden oder zu kurz",
))
continue
result = await _check_single(
text, doc_type, label, url,
entry["word_count"], use_agent_flag,
)
# Apply profile context filter
result = _apply_profile_filter(result, profile, doc_type)
results.append(result)
total_findings += result.findings_count
# Step 3b: Banner-Check (automatic, uses first URL or homepage)
banner_result = None
banner_url = req.documents[0].url if req.documents and req.documents[0].url else ""
# Use the homepage (strip path) for banner check
if banner_url:
from urllib.parse import urlparse
parsed = urlparse(banner_url)
banner_url = f"{parsed.scheme}://{parsed.netloc}"
if banner_url:
_update(check_id, "Cookie-Banner wird geprueft...")
try:
async with httpx.AsyncClient(timeout=120.0) as client:
resp = await client.post(
f"{CONSENT_TESTER_URL}/scan",
json={"url": banner_url, "timeout_per_phase": 10},
)
if resp.status_code == 200:
banner_result = resp.json()
except Exception as e:
logger.warning("Banner check failed: %s", e)
# Step 3c: Cross-check Banner vs Cookie-Richtlinie
if banner_result and "cookie" in doc_texts:
_update(check_id, "Banner vs. Cookie-Richtlinie abgleichen...")
cross_findings = _cross_check_banner_vs_cookie(
banner_result, doc_texts["cookie"],
)
if cross_findings:
# Add cross-check findings to cookie results
for r in results:
if r.doc_type == "cookie":
for cf in cross_findings:
r.checks.append(CheckItem(**cf))
# Recompute
l2 = [c for c in r.checks if c.level == 2 and not c.skipped]
l2p = sum(1 for c in l2 if c.passed)
r.correctness_pct = round(l2p / len(l2) * 100) if l2 else 0
# Step 4: Build report
_update(check_id, "Report wird erstellt...")
report_html = build_html_report(results, None)
# Prepend profile summary to report
profile_html = _build_profile_html(profile)
full_html = profile_html + report_html
# Step 5: Send email
doc_count = len([r for r in results if not r.error])
email_result = send_email(
recipient=req.recipient,
subject=f"[COMPLIANCE-CHECK] {doc_count} Dokumente geprueft",
body_html=full_html,
)
# Step 6: Store result
response = {
"results": [_result_to_dict(r) for r in results],
"business_profile": profile_dict,
"banner_result": {
"detected": banner_result.get("banner_detected", False) if banner_result else False,
"provider": banner_result.get("banner_provider", "") if banner_result else "",
"violations": len(banner_result.get("banner_checks", {}).get("violations", [])) if banner_result else 0,
} if banner_result else None,
"total_documents": len(results),
"total_findings": total_findings,
"email_status": email_result.get("status", "failed"),
"checked_at": datetime.now(timezone.utc).isoformat(),
}
_compliance_check_jobs[check_id]["status"] = "completed"
_compliance_check_jobs[check_id]["result"] = response
_compliance_check_jobs[check_id]["progress"] = "Fertig"
except Exception as e:
logger.error("Compliance check %s failed: %s", check_id, e, exc_info=True)
_compliance_check_jobs[check_id]["status"] = "failed"
_compliance_check_jobs[check_id]["error"] = str(e)[:500]
def _update(check_id: str, msg: str):
_compliance_check_jobs[check_id]["progress"] = msg
async def _fetch_text(url: str) -> str:
"""Fetch text from URL via consent-tester."""
try:
async with httpx.AsyncClient(timeout=90.0) as client:
resp = await client.post(
f"{CONSENT_TESTER_URL}/dsi-discovery",
json={"url": url, "max_documents": 1},
)
if resp.status_code != 200:
return ""
docs = resp.json().get("documents", [])
if not docs:
return ""
doc = docs[0]
return doc.get("full_text", "") or doc.get("text_preview", "") or ""
except Exception as e:
logger.warning("Text fetch failed for %s: %s", url, e)
return ""
async def _check_single(
text: str, doc_type: str, label: str, url: str,
word_count: int, use_agent: bool,
):
"""Run regex + MC checks on a single document."""
from compliance.services.doc_checks.runner import check_document_completeness
from compliance.services.rag_document_checker import check_document_with_controls
from .agent_doc_check_routes import CheckItem, DocCheckResult
# Regex checklist
findings = check_document_completeness(text, doc_type, label, url)
all_checks: list[CheckItem] = []
completeness = 0
correctness = 0
for f in findings:
if "SCORE" in f.get("code", ""):
for c in f.get("all_checks", []):
all_checks.append(CheckItem(
id=c["id"], label=c["label"], passed=c["passed"],
severity=c["severity"], matched_text=c.get("matched_text", ""),
level=c.get("level", 1), parent=c.get("parent"),
skipped=c.get("skipped", False), hint=c.get("hint", ""),
))
completeness = f.get("completeness_pct", 0)
correctness = f.get("correctness_pct", 0)
# Master Control checks
try:
mc_results = await check_document_with_controls(
text, doc_type, label, max_controls=0, use_agent=use_agent,
)
if mc_results:
for mc in mc_results:
all_checks.append(CheckItem(**mc))
l2 = [c for c in all_checks if c.level == 2 and not c.skipped]
l2_passed = sum(1 for c in l2 if c.passed)
correctness = round(l2_passed / len(l2) * 100) if l2 else 0
except Exception as e:
logger.warning("MC check skipped for %s: %s", label, e)
# LLM verification of regex fails
failed = [c for c in all_checks if not c.passed and not c.skipped and c.hint]
if failed:
try:
from compliance.services.doc_checks.llm_verify import verify_failed_checks
overturns = await verify_failed_checks(
text,
[{"id": c.id, "label": c.label, "hint": c.hint} for c in failed],
label,
)
for c in all_checks:
if c.id in overturns and overturns[c.id]["overturned"]:
c.passed = True
c.matched_text = f"[LLM] {overturns[c.id]['evidence']}"
l2_active = [c for c in all_checks if c.level == 2 and not c.skipped]
l2_passed = sum(1 for c in l2_active if c.passed)
if l2_active:
correctness = round(l2_passed / len(l2_active) * 100)
except Exception as e:
logger.warning("LLM verification skipped: %s", e)
non_score = [f for f in findings if "SCORE" not in f.get("code", "")]
return DocCheckResult(
label=label, url=url, doc_type=doc_type,
word_count=word_count or len(text.split()),
completeness_pct=completeness, correctness_pct=correctness,
checks=all_checks, findings_count=len(non_score),
)
def _apply_profile_filter(result, profile, doc_type: str):
"""Adjust INFO-level checks based on business profile context.
For example: ODR check only relevant for B2C online shops.
"""
from .agent_doc_check_routes import CheckItem
for check in result.checks:
cid = check.id.lower()
# ODR/OS-Link only relevant for B2C online shops
if "odr" in cid or "os-link" in cid or "streitbeilegung" in check.label.lower():
if not profile.needs_odr:
check.skipped = True
check.hint = "Nicht relevant (kein B2C Online-Shop)"
# Widerruf only relevant for B2C
if doc_type == "widerruf" and profile.business_type not in ("b2c", "unknown"):
if check.severity == "INFO":
check.skipped = True
# Regulated profession: check for Kammer info
if "kammer" in cid or "berufsordnung" in check.label.lower():
if not profile.is_regulated_profession:
check.skipped = True
check.hint = "Nicht relevant (kein regulierter Beruf)"
return result
# ── Helpers ──────────────────────────────────────────────────────────
_DOC_TYPE_LABELS = {
"dse": "Datenschutzerklaerung",
"datenschutz": "Datenschutzerklaerung",
"privacy": "Datenschutzerklaerung",
"impressum": "Impressum",
"agb": "AGB",
"widerruf": "Widerrufsbelehrung",
"cookie": "Cookie-Richtlinie",
"avv": "Auftragsverarbeitung",
"loeschkonzept": "Loeschkonzept",
"dsfa": "Datenschutz-Folgenabschaetzung",
"social_media": "Social Media Datenschutz",
}
def _doc_type_label(doc_type: str) -> str:
return _DOC_TYPE_LABELS.get(doc_type, doc_type.upper())
def _result_to_dict(r) -> dict:
"""Convert DocCheckResult to JSON-serializable dict."""
return {
"label": r.label, "url": r.url, "doc_type": r.doc_type,
"word_count": r.word_count, "completeness_pct": r.completeness_pct,
"correctness_pct": r.correctness_pct,
"checks": [
{
"id": c.id, "label": c.label, "passed": c.passed,
"severity": c.severity, "matched_text": c.matched_text,
"level": c.level, "parent": c.parent,
"skipped": c.skipped, "hint": c.hint,
}
for c in r.checks
],
"findings_count": r.findings_count, "error": r.error,
}
def _build_profile_html(profile) -> str:
"""Build a small HTML block summarizing the detected business profile."""
service_tags = ", ".join(profile.detected_services[:10]) or "keine erkannt"
flags = []
if profile.has_online_shop:
flags.append("Online-Shop")
if profile.has_editorial_content:
flags.append("Redaktionelle Inhalte")
if profile.is_regulated_profession:
flags.append(f"Regulierter Beruf ({profile.regulated_profession_type})")
if profile.needs_odr:
flags.append("ODR-pflichtig")
flags_str = ", ".join(flags) or "keine"
return (
'<div style="font-family:-apple-system,BlinkMacSystemFont,sans-serif;'
'max-width:700px;margin:0 auto 16px;padding:12px 16px;'
'background:#f0f9ff;border:1px solid #bae6fd;border-radius:8px">'
'<h3 style="margin:0 0 8px;font-size:14px;color:#0369a1">'
'Erkanntes Geschaeftsmodell</h3>'
'<table style="font-size:13px;color:#374151">'
f'<tr><td style="padding:2px 12px 2px 0;color:#6b7280">Typ:</td>'
f'<td><strong>{profile.business_type.upper()}</strong>'
f' ({profile.industry})</td></tr>'
f'<tr><td style="padding:2px 12px 2px 0;color:#6b7280">Merkmale:</td>'
f'<td>{flags_str}</td></tr>'
f'<tr><td style="padding:2px 12px 2px 0;color:#6b7280">Dienste:</td>'
f'<td>{service_tags}</td></tr>'
f'<tr><td style="padding:2px 12px 2px 0;color:#6b7280">Konfidenz:</td>'
f'<td>{int(profile.confidence * 100)}%</td></tr>'
'</table></div>'
)
# Cross-check extracted to compliance.services.banner_cookie_cross_check
from compliance.services.banner_cookie_cross_check import cross_check_banner_vs_cookie as _cross_check_banner_vs_cookie