feat: Unified Compliance-Check — 8 document types in one form

New 3-tab structure: Website-Scan, Compliance-Check, Banner-Check.

Compliance-Check Tab (replaces Dokumenten-Pruefung + Impressum-Check):
- 8 document rows: DSI, Impressum, Social Media, Cookie, AGB,
  Nutzungsbedingungen, Widerruf, DSB-Kontakt
- Each row: URL input + "Text laden" + file upload + manual text
- "Text laden" extracts via consent-tester, shows in editable textarea
- User verifies/corrects text before checking
- Empty fields = "not present" → own finding

Business Profiler (business_profiler.py):
- Detects B2B/B2C/B2G from all documents together
- Recognizes regulated professions, online shops, editorial content
- Context-aware: INFO checks become PASS/FAIL based on profile

Backend: /compliance-check + /extract-text endpoints
Frontend: ComplianceCheckTab.tsx + DocumentRow.tsx
API proxies: compliance-check/route.ts + extract-text/route.ts

Also: Impressum regex fixes (Telefon, AG, Geschaeftsfuehrung)
and INFO severity for context-dependent checks.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
Benjamin Admin
2026-05-11 20:56:10 +02:00
parent b214cbc003
commit 0d0e705117
8 changed files with 1252 additions and 8 deletions
@@ -0,0 +1,439 @@
"""
Unified Compliance Check Routes — check all documents in one request.
POST /compliance/agent/extract-text — extract text from a URL
POST /compliance/agent/compliance-check — unified check for all documents
GET /compliance/agent/compliance-check/{check_id} — poll status
"""
import asyncio
import logging
import os
import uuid as _uuid
from dataclasses import asdict
from datetime import datetime, timezone
import httpx
from fastapi import APIRouter
from pydantic import BaseModel
from compliance.services.smtp_sender import send_email
logger = logging.getLogger(__name__)
router = APIRouter(prefix="/compliance/agent", tags=["agent"])
CONSENT_TESTER_URL = "http://bp-compliance-consent-tester:8094"
# In-memory job store (same pattern as doc-check)
_compliance_check_jobs: dict[str, dict] = {}
# ── Models ───────────────────────────────────────────────────────────
class ExtractTextRequest(BaseModel):
url: str
class DocumentInput(BaseModel):
doc_type: str # dse, agb, impressum, cookie, widerruf, avv, loeschkonzept, etc.
url: str = ""
text: str = "" # text has priority over URL
class ComplianceCheckRequest(BaseModel):
documents: list[DocumentInput]
use_agent: bool = False
recipient: str = "dsb@breakpilot.local"
class ComplianceCheckStartResponse(BaseModel):
check_id: str
status: str = "running"
class ComplianceCheckStatusResponse(BaseModel):
check_id: str
status: str
progress: str = ""
result: dict | None = None
error: str = ""
# ── Extract text endpoint ────────────────────────────────────────────
@router.post("/extract-text")
async def extract_text(req: ExtractTextRequest):
"""Extract text from a URL via consent-tester DSI discovery."""
try:
async with httpx.AsyncClient(timeout=90.0) as client:
resp = await client.post(
f"{CONSENT_TESTER_URL}/dsi-discovery",
json={"url": req.url, "max_documents": 1},
)
if resp.status_code != 200:
return {
"text": "", "word_count": 0, "title": "",
"error": f"HTTP {resp.status_code} von Consent-Tester",
}
data = resp.json()
docs = data.get("documents", [])
if not docs:
return {
"text": "", "word_count": 0, "title": "",
"error": "Kein Text extrahierbar",
}
doc = docs[0]
text = doc.get("full_text", "") or doc.get("text_preview", "") or doc.get("text", "")
title = doc.get("title", "") or doc.get("doc_type", "")
word_count = doc.get("word_count", 0) or len(text.split())
return {
"text": text,
"word_count": word_count,
"title": title,
"error": "",
}
except Exception as e:
logger.warning("extract-text failed for %s: %s", req.url, e)
return {
"text": "", "word_count": 0, "title": "",
"error": str(e)[:200],
}
# ── Unified compliance check ────────────────────────────────────────
@router.post("/compliance-check")
async def start_compliance_check(req: ComplianceCheckRequest):
"""Start async compliance check for all documents."""
check_id = str(_uuid.uuid4())[:8]
_compliance_check_jobs[check_id] = {
"status": "running",
"progress": "Pruefung gestartet...",
"result": None,
"error": "",
}
asyncio.create_task(_run_compliance_check(check_id, req))
return ComplianceCheckStartResponse(check_id=check_id, status="running")
@router.get("/compliance-check/{check_id}")
async def get_compliance_check_status(check_id: str):
"""Poll compliance check status."""
job = _compliance_check_jobs.get(check_id)
if not job:
return {"check_id": check_id, "status": "not_found"}
return ComplianceCheckStatusResponse(
check_id=check_id,
status=job["status"],
progress=job.get("progress", ""),
result=job.get("result"),
error=job.get("error", ""),
)
async def _run_compliance_check(check_id: str, req: ComplianceCheckRequest):
"""Background task: check all documents with business-profile context."""
try:
from compliance.services.business_profiler import detect_business_profile
from compliance.services.doc_checks.runner import check_document_completeness
from compliance.services.rag_document_checker import check_document_with_controls
from .agent_doc_check_routes import CheckItem, DocCheckResult
from .agent_doc_check_report import build_html_report
# Step 1: Resolve texts (fetch from URL if needed)
_update(check_id, "Texte werden geladen...")
doc_texts: dict[str, str] = {}
doc_entries: list[dict] = []
for i, doc in enumerate(req.documents):
_update(check_id, f"Dokument {i+1}/{len(req.documents)}: {doc.doc_type}...")
text = doc.text
if not text and doc.url:
text = await _fetch_text(doc.url)
if text:
doc_texts[doc.doc_type] = text
doc_entries.append({
"doc_type": doc.doc_type,
"url": doc.url,
"text": text,
"word_count": len(text.split()) if text else 0,
})
# Step 2: Detect business profile
_update(check_id, "Geschaeftsmodell wird erkannt...")
profile = await detect_business_profile(doc_texts)
profile_dict = asdict(profile)
# Step 3: Check each document
results: list[DocCheckResult] = []
total_findings = 0
use_agent_flag = req.use_agent or os.getenv(
"COMPLIANCE_USE_AGENT", "false"
).lower() == "true"
for i, entry in enumerate(doc_entries):
text = entry["text"]
doc_type = entry["doc_type"]
label = _doc_type_label(doc_type)
url = entry["url"]
_update(check_id, f"Pruefe {label} ({i+1}/{len(doc_entries)})...")
if not text or len(text) < 50:
results.append(DocCheckResult(
label=label, url=url, doc_type=doc_type,
error="Kein Text vorhanden oder zu kurz",
))
continue
result = await _check_single(
text, doc_type, label, url,
entry["word_count"], use_agent_flag,
)
# Apply profile context filter
result = _apply_profile_filter(result, profile, doc_type)
results.append(result)
total_findings += result.findings_count
# Step 4: Build report
_update(check_id, "Report wird erstellt...")
report_html = build_html_report(results, None)
# Prepend profile summary to report
profile_html = _build_profile_html(profile)
full_html = profile_html + report_html
# Step 5: Send email
doc_count = len([r for r in results if not r.error])
email_result = send_email(
recipient=req.recipient,
subject=f"[COMPLIANCE-CHECK] {doc_count} Dokumente geprueft",
body_html=full_html,
)
# Step 6: Store result
response = {
"results": [_result_to_dict(r) for r in results],
"business_profile": profile_dict,
"total_documents": len(results),
"total_findings": total_findings,
"email_status": email_result.get("status", "failed"),
"checked_at": datetime.now(timezone.utc).isoformat(),
}
_compliance_check_jobs[check_id]["status"] = "completed"
_compliance_check_jobs[check_id]["result"] = response
_compliance_check_jobs[check_id]["progress"] = "Fertig"
except Exception as e:
logger.error("Compliance check %s failed: %s", check_id, e, exc_info=True)
_compliance_check_jobs[check_id]["status"] = "failed"
_compliance_check_jobs[check_id]["error"] = str(e)[:500]
def _update(check_id: str, msg: str):
_compliance_check_jobs[check_id]["progress"] = msg
async def _fetch_text(url: str) -> str:
"""Fetch text from URL via consent-tester."""
try:
async with httpx.AsyncClient(timeout=90.0) as client:
resp = await client.post(
f"{CONSENT_TESTER_URL}/dsi-discovery",
json={"url": url, "max_documents": 1},
)
if resp.status_code != 200:
return ""
docs = resp.json().get("documents", [])
if not docs:
return ""
doc = docs[0]
return doc.get("full_text", "") or doc.get("text_preview", "") or ""
except Exception as e:
logger.warning("Text fetch failed for %s: %s", url, e)
return ""
async def _check_single(
text: str, doc_type: str, label: str, url: str,
word_count: int, use_agent: bool,
):
"""Run regex + MC checks on a single document."""
from compliance.services.doc_checks.runner import check_document_completeness
from compliance.services.rag_document_checker import check_document_with_controls
from .agent_doc_check_routes import CheckItem, DocCheckResult
# Regex checklist
findings = check_document_completeness(text, doc_type, label, url)
all_checks: list[CheckItem] = []
completeness = 0
correctness = 0
for f in findings:
if "SCORE" in f.get("code", ""):
for c in f.get("all_checks", []):
all_checks.append(CheckItem(
id=c["id"], label=c["label"], passed=c["passed"],
severity=c["severity"], matched_text=c.get("matched_text", ""),
level=c.get("level", 1), parent=c.get("parent"),
skipped=c.get("skipped", False), hint=c.get("hint", ""),
))
completeness = f.get("completeness_pct", 0)
correctness = f.get("correctness_pct", 0)
# Master Control checks
try:
mc_results = await check_document_with_controls(
text, doc_type, label, max_controls=0, use_agent=use_agent,
)
if mc_results:
for mc in mc_results:
all_checks.append(CheckItem(**mc))
l2 = [c for c in all_checks if c.level == 2 and not c.skipped]
l2_passed = sum(1 for c in l2 if c.passed)
correctness = round(l2_passed / len(l2) * 100) if l2 else 0
except Exception as e:
logger.warning("MC check skipped for %s: %s", label, e)
# LLM verification of regex fails
failed = [c for c in all_checks if not c.passed and not c.skipped and c.hint]
if failed:
try:
from compliance.services.doc_checks.llm_verify import verify_failed_checks
overturns = await verify_failed_checks(
text,
[{"id": c.id, "label": c.label, "hint": c.hint} for c in failed],
label,
)
for c in all_checks:
if c.id in overturns and overturns[c.id]["overturned"]:
c.passed = True
c.matched_text = f"[LLM] {overturns[c.id]['evidence']}"
l2_active = [c for c in all_checks if c.level == 2 and not c.skipped]
l2_passed = sum(1 for c in l2_active if c.passed)
if l2_active:
correctness = round(l2_passed / len(l2_active) * 100)
except Exception as e:
logger.warning("LLM verification skipped: %s", e)
non_score = [f for f in findings if "SCORE" not in f.get("code", "")]
return DocCheckResult(
label=label, url=url, doc_type=doc_type,
word_count=word_count or len(text.split()),
completeness_pct=completeness, correctness_pct=correctness,
checks=all_checks, findings_count=len(non_score),
)
def _apply_profile_filter(result, profile, doc_type: str):
"""Adjust INFO-level checks based on business profile context.
For example: ODR check only relevant for B2C online shops.
"""
from .agent_doc_check_routes import CheckItem
for check in result.checks:
cid = check.id.lower()
# ODR/OS-Link only relevant for B2C online shops
if "odr" in cid or "os-link" in cid or "streitbeilegung" in check.label.lower():
if not profile.needs_odr:
check.skipped = True
check.hint = "Nicht relevant (kein B2C Online-Shop)"
# Widerruf only relevant for B2C
if doc_type == "widerruf" and profile.business_type not in ("b2c", "unknown"):
if check.severity == "INFO":
check.skipped = True
# Regulated profession: check for Kammer info
if "kammer" in cid or "berufsordnung" in check.label.lower():
if not profile.is_regulated_profession:
check.skipped = True
check.hint = "Nicht relevant (kein regulierter Beruf)"
return result
# ── Helpers ──────────────────────────────────────────────────────────
_DOC_TYPE_LABELS = {
"dse": "Datenschutzerklaerung",
"datenschutz": "Datenschutzerklaerung",
"privacy": "Datenschutzerklaerung",
"impressum": "Impressum",
"agb": "AGB",
"widerruf": "Widerrufsbelehrung",
"cookie": "Cookie-Richtlinie",
"avv": "Auftragsverarbeitung",
"loeschkonzept": "Loeschkonzept",
"dsfa": "Datenschutz-Folgenabschaetzung",
"social_media": "Social Media Datenschutz",
}
def _doc_type_label(doc_type: str) -> str:
return _DOC_TYPE_LABELS.get(doc_type, doc_type.upper())
def _result_to_dict(r) -> dict:
"""Convert DocCheckResult to JSON-serializable dict."""
return {
"label": r.label, "url": r.url, "doc_type": r.doc_type,
"word_count": r.word_count, "completeness_pct": r.completeness_pct,
"correctness_pct": r.correctness_pct,
"checks": [
{
"id": c.id, "label": c.label, "passed": c.passed,
"severity": c.severity, "matched_text": c.matched_text,
"level": c.level, "parent": c.parent,
"skipped": c.skipped, "hint": c.hint,
}
for c in r.checks
],
"findings_count": r.findings_count, "error": r.error,
}
def _build_profile_html(profile) -> str:
"""Build a small HTML block summarizing the detected business profile."""
service_tags = ", ".join(profile.detected_services[:10]) or "keine erkannt"
flags = []
if profile.has_online_shop:
flags.append("Online-Shop")
if profile.has_editorial_content:
flags.append("Redaktionelle Inhalte")
if profile.is_regulated_profession:
flags.append(f"Regulierter Beruf ({profile.regulated_profession_type})")
if profile.needs_odr:
flags.append("ODR-pflichtig")
flags_str = ", ".join(flags) or "keine"
return (
'<div style="font-family:-apple-system,BlinkMacSystemFont,sans-serif;'
'max-width:700px;margin:0 auto 16px;padding:12px 16px;'
'background:#f0f9ff;border:1px solid #bae6fd;border-radius:8px">'
'<h3 style="margin:0 0 8px;font-size:14px;color:#0369a1">'
'Erkanntes Geschaeftsmodell</h3>'
'<table style="font-size:13px;color:#374151">'
f'<tr><td style="padding:2px 12px 2px 0;color:#6b7280">Typ:</td>'
f'<td><strong>{profile.business_type.upper()}</strong>'
f' ({profile.industry})</td></tr>'
f'<tr><td style="padding:2px 12px 2px 0;color:#6b7280">Merkmale:</td>'
f'<td>{flags_str}</td></tr>'
f'<tr><td style="padding:2px 12px 2px 0;color:#6b7280">Dienste:</td>'
f'<td>{service_tags}</td></tr>'
f'<tr><td style="padding:2px 12px 2px 0;color:#6b7280">Konfidenz:</td>'
f'<td>{int(profile.confidence * 100)}%</td></tr>'
'</table></div>'
)
@@ -0,0 +1,223 @@
"""
Business Profiler — detect business model from document texts.
Pure keyword-based detection (deterministic, no LLM). Analyzes
DSE, Impressum, AGB, Widerruf etc. together to build a profile
that drives context-aware compliance checks.
Example:
profile = await detect_business_profile({"dse": "...", "impressum": "..."})
profile.business_type # "b2c"
profile.has_online_shop # True
"""
from __future__ import annotations
import re
from dataclasses import dataclass, field
@dataclass
class BusinessProfile:
business_type: str = "unknown" # b2b, b2c, b2g, nonprofit, unknown
industry: str = "unknown" # it_services, retail, healthcare, legal, craft, public, unknown
has_online_shop: bool = False
has_editorial_content: bool = False
is_regulated_profession: bool = False
regulated_profession_type: str = "" # arzt, anwalt, steuerberater, architekt, ""
needs_odr: bool = False # Online-Streitbeilegung
detected_services: list[str] = field(default_factory=list)
confidence: float = 0.0
# ── Keyword lists ────────────────────────────────────────────────────
_B2C_KEYWORDS = [
"verbraucher", "warenkorb", "bestellung", "lieferung", "widerruf",
"shop", "kaufpreis", "rueckgabe", "rückgabe", "endkunde", "kaeufer",
"käufer", "privatkunde", "zahlungspflichtig bestellen",
]
_B2B_KEYWORDS = [
"unternehmen", "geschaeftskunden", "geschäftskunden", "gewerblich",
"auftrag", "auftraggeber", "auftragnehmer", "geschaeftspartner",
"geschäftspartner", "firmenkunde", "b2b",
]
_B2G_KEYWORDS = [
"behoerde", "behörde", "koerperschaft", "körperschaft", "oeffentlich",
"öffentlich", "gemeinde", "amt", "stadtverwaltung", "landesbehoerde",
"landesbehörde", "kommunal",
]
_NONPROFIT_KEYWORDS = [
"gemeinnuetzig", "gemeinnützig", "verein", "stiftung", "e.v.",
"spende", "ehrenamtlich", "satzung",
]
_REGULATED_PROFESSIONS = {
"rechtsanwalt": "anwalt",
"anwalt": "anwalt",
"anwaeltin": "anwalt",
"anwältin": "anwalt",
"kanzlei": "anwalt",
"rechtsanwaltskammer": "anwalt",
"arzt": "arzt",
"ärztin": "arzt",
"aerztin": "arzt",
"praxis": "arzt",
"aerztekammer": "arzt",
"ärztekammer": "arzt",
"steuerberater": "steuerberater",
"steuerberaterin": "steuerberater",
"steuerberaterkammer": "steuerberater",
"architekt": "architekt",
"architektin": "architekt",
"architektenkammer": "architekt",
"notar": "notar",
"notariat": "notar",
"apotheke": "apotheker",
"apotheker": "apotheker",
}
_ONLINE_SHOP_KEYWORDS = [
"warenkorb", "checkout", "bestellung", "lieferung", "versand",
"paypal", "kreditkarte", "klarna", "sofortueberweisung",
"sofortüberweisung", "zahlungsarten", "versandkosten",
"lieferzeit", "retour", "paketdienst",
]
_EDITORIAL_KEYWORDS = [
"blog", "ratgeber", "news", "redaktion", "artikel", "magazin",
"beitrag", "kommentar", "podcast", "newsletter", "autor",
]
_INDUSTRY_KEYWORDS = {
"it_services": ["software", "saas", "cloud", "hosting", "server", "api", "app"],
"retail": ["shop", "warenkorb", "versand", "lieferung", "einzelhandel"],
"healthcare": ["arzt", "praxis", "patient", "gesundheit", "therapie", "klinik"],
"legal": ["kanzlei", "rechtsanwalt", "mandant", "anwalt"],
"craft": ["handwerk", "meister", "werkstatt", "montage", "gewerk"],
"public": ["behoerde", "behörde", "kommune", "verwaltung", "buerger", "bürger"],
"finance": ["bank", "versicherung", "finanz", "kredit", "anlage"],
"education": ["schule", "bildung", "unterricht", "lehrplan", "schueler", "schüler"],
}
_TRACKING_SERVICES = {
"google analytics": "Google Analytics",
"google tag manager": "Google Tag Manager",
"matomo": "Matomo",
"facebook pixel": "Facebook Pixel",
"meta pixel": "Meta Pixel",
"hotjar": "Hotjar",
"hubspot": "HubSpot",
"mailchimp": "Mailchimp",
"linkedin insight": "LinkedIn Insight",
"google ads": "Google Ads",
"google adsense": "Google AdSense",
"google maps": "Google Maps",
"youtube": "YouTube",
"vimeo": "Vimeo",
"cloudflare": "Cloudflare",
"sentry": "Sentry",
"intercom": "Intercom",
"zendesk": "Zendesk",
"stripe": "Stripe",
"paypal": "PayPal",
}
# ── Detection logic ──────────────────────────────────────────────────
def _count_hits(text: str, keywords: list[str]) -> int:
return sum(1 for kw in keywords if kw in text)
async def detect_business_profile(documents: dict[str, str]) -> BusinessProfile:
"""Analyze all document texts together to detect business model.
Args:
documents: dict mapping doc_type -> text (e.g. {"dse": "...", "impressum": "..."})
"""
profile = BusinessProfile()
if not documents:
return profile
# Merge all texts for keyword search
full_text = "\n".join(documents.values()).lower()
full_text = full_text.replace("\xad", "") # strip soft hyphens
# ── Tracking services ────────────────────────────────────────
for pattern, label in _TRACKING_SERVICES.items():
if pattern in full_text:
profile.detected_services.append(label)
# ── Online shop ──────────────────────────────────────────────
shop_hits = _count_hits(full_text, _ONLINE_SHOP_KEYWORDS)
profile.has_online_shop = shop_hits >= 3
# ── Editorial content ────────────────────────────────────────
editorial_hits = _count_hits(full_text, _EDITORIAL_KEYWORDS)
profile.has_editorial_content = editorial_hits >= 2
# ── Regulated profession ─────────────────────────────────────
for keyword, prof_type in _REGULATED_PROFESSIONS.items():
if keyword in full_text:
profile.is_regulated_profession = True
profile.regulated_profession_type = prof_type
break
# ── Business type ────────────────────────────────────────────
b2c_score = _count_hits(full_text, _B2C_KEYWORDS)
b2b_score = _count_hits(full_text, _B2B_KEYWORDS)
b2g_score = _count_hits(full_text, _B2G_KEYWORDS)
nonprofit_score = _count_hits(full_text, _NONPROFIT_KEYWORDS)
# Missing documents as signal
has_agb = "agb" in documents
has_widerruf = "widerruf" in documents
if not has_agb:
b2c_score -= 1 # No AGB → less likely B2C
if not has_widerruf:
b2c_score -= 1 # No Widerruf → less likely B2C shop
if profile.has_online_shop:
b2c_score += 3 # Strong B2C signal
scores = {
"b2c": b2c_score,
"b2b": b2b_score,
"b2g": b2g_score,
"nonprofit": nonprofit_score,
}
best = max(scores, key=scores.get) # type: ignore[arg-type]
best_val = scores[best]
if best_val >= 2:
profile.business_type = best
total = sum(max(0, v) for v in scores.values())
profile.confidence = round(best_val / total, 2) if total > 0 else 0.5
else:
profile.business_type = "unknown"
profile.confidence = 0.2
# ── ODR (Online-Streitbeilegung) ─────────────────────────────
# Required for B2C with online shop (EU Regulation 524/2013)
profile.needs_odr = (
profile.business_type == "b2c" and profile.has_online_shop
)
# ── Industry ─────────────────────────────────────────────────
industry_scores: dict[str, int] = {}
for industry, keywords in _INDUSTRY_KEYWORDS.items():
hits = _count_hits(full_text, keywords)
if hits >= 2:
industry_scores[industry] = hits
if industry_scores:
profile.industry = max(industry_scores, key=industry_scores.get) # type: ignore[arg-type]
elif profile.is_regulated_profession:
prof_map = {"anwalt": "legal", "arzt": "healthcare",
"steuerberater": "finance", "architekt": "craft"}
profile.industry = prof_map.get(profile.regulated_profession_type, "unknown")
return profile
+4
View File
@@ -48,6 +48,8 @@ from compliance.api.agent_scan_routes import router as agent_scan_router
from compliance.api.agent_history_routes import router as agent_history_router
from compliance.api.agent_recurring_routes import router as agent_recurring_router
from compliance.api.agent_compare_routes import router as agent_compare_router
from compliance.api.agent_doc_check_routes import router as agent_doc_check_router
from compliance.api.agent_compliance_check_routes import router as agent_compliance_check_router
# Middleware
from middleware import (
@@ -150,6 +152,8 @@ app.include_router(agent_scan_router, prefix="/api")
app.include_router(agent_history_router, prefix="/api")
app.include_router(agent_recurring_router, prefix="/api")
app.include_router(agent_compare_router, prefix="/api")
app.include_router(agent_doc_check_router, prefix="/api")
app.include_router(agent_compliance_check_router, prefix="/api")
if __name__ == "__main__":