feat(agent): progress_pct + 6 BMW-Run Verbesserungen

Backend (agent_compliance_check_routes.py):
- progress_pct (0-100%) im Job-State, ueber alle Phasen verteilt
  (Laden 0-30, Profil 35-40, Pruefen 40-80, Banner 80-92, Report 95-100)
- Status-Texte vereinheitlicht ("Texte laden X/N", "Pruefen X/N")
- Firmenname fuer Email-Subject jetzt aus URL abgeleitet
  (bmw.de -> "BMW", mercedes-benz.de -> "Mercedes-Benz") statt
  unzuverlaessigem extracted_profile.companyName (matchte oft juris.de)
- E-Mail-Report enthaelt jetzt Banner+TCF-Vendor-Liste (build_provider_list_html)

Backend (agent_doc_check_extras.py — neu):
- build_scanned_urls_html: gepruefte URLs als Tabelle oben im Report
  (transparent fuer GF, welche Quellen wirklich gezogen wurden)
- Cross-Domain-Hinweis bei >1 netloc (BMW: bmw.de / bmwgroup.com /
  bmwgroup.jobs — Auffindbarkeit nach Art. 12 DSGVO)
- build_provider_list_html: Banner-Box + TCF-Vendor-Tabelle mit Spalten
  Name | Kategorie | Zweck | Drittland | Rechtsgrundlage

Backend (business_profiler.py):
- §34d-GewO Versicherungsvermittler-Hinweise zaehlen nicht mehr als
  "finance"-Industrie (BMW wurde dadurch falsch als B2B/finance erkannt)
- Neue Industry "automotive" (Fahrzeug/KFZ/Konfigurator/Modellpalette)
- B2B-Keywords: generische Begriffe wie "unternehmen", "beratung",
  "consulting" entfernt (matchten in jedem Konzerntext)
- B2C-Fallback: bei Verbraucher-Signalen ("widerruf", "kunde",
  redaktioneller Inhalt) tendiert auf b2c statt b2b

Frontend (ComplianceCheckTab.tsx):
- Progress-Balken mit Width-% und XX%-Anzeige rechts
- liest data.progress_pct aus Polling-Response

Consent-Tester (dsi_discovery.py):
- Cookie-Policy-Extraktion kritisch fixt: wait_for_function bis
  body.innerText > 500 chars (BMW SPA-Rendering brauchte mehr Zeit)
- _extract_text_robust: 3-Strategien-Extraktion (Selektoren -> Body-
  Cleanup -> P/LI/TD-Tags)
- _extract_text_from_iframes: liest OneTrust/Sourcepoint/Usercentrics
  Iframe-Inhalte (manche Cookie-Policies leben dort)

Adressiert alle Findings aus dem BMW-Ground-Truth-Vergleich.
This commit is contained in:
Benjamin Admin
2026-05-16 17:53:14 +02:00
parent 4d1e0a7f8e
commit e61e9d9e2a
6 changed files with 515 additions and 53 deletions
@@ -56,6 +56,7 @@ class ComplianceCheckStatusResponse(BaseModel):
check_id: str
status: str
progress: str = ""
progress_pct: int = 0
result: dict | None = None
error: str = ""
@@ -124,6 +125,7 @@ async def start_compliance_check(req: ComplianceCheckRequest):
_compliance_check_jobs[check_id] = {
"status": "running",
"progress": "Pruefung gestartet...",
"progress_pct": 0,
"result": None,
"error": "",
}
@@ -141,6 +143,7 @@ async def get_compliance_check_status(check_id: str):
check_id=check_id,
status=job["status"],
progress=job.get("progress", ""),
progress_pct=job.get("progress_pct", 0),
result=job.get("result"),
error=job.get("error", ""),
)
@@ -155,16 +158,18 @@ async def _run_compliance_check(check_id: str, req: ComplianceCheckRequest):
from .agent_doc_check_routes import CheckItem, DocCheckResult
from .agent_doc_check_report import build_html_report
# Step 1: Resolve texts (fetch from URL if needed)
_update(check_id, "Texte werden geladen...")
# Step 1: Resolve texts (fetch from URL if needed) — 0-30%
_update(check_id, "Texte werden geladen...", 1)
doc_texts: dict[str, str] = {}
doc_entries: list[dict] = []
# Cache fetched URLs to detect duplicates
url_text_cache: dict[str, str] = {}
n_docs = max(1, len(req.documents))
for i, doc in enumerate(req.documents):
_update(check_id, f"Dokument {i+1}/{len(req.documents)}: {doc.doc_type}...")
pct = int(1 + (i / n_docs) * 29)
_update(check_id, f"Texte laden {i+1}/{n_docs}: {doc.doc_type}...", pct)
text = doc.text
if not text and doc.url:
url_key = doc.url.strip().rstrip("/").lower()
@@ -192,8 +197,8 @@ async def _run_compliance_check(check_id: str, req: ComplianceCheckRequest):
split_shared_texts(doc_entries, url_text_cache)
auto_fill_from_dsi(doc_entries)
# Step 1c: Cross-document search — find doc_types in wrong documents
_update(check_id, "Dokumente werden uebergreifend durchsucht...")
# Step 1c: Cross-document search — find doc_types in wrong documents (30-35%)
_update(check_id, "Dokumente werden uebergreifend durchsucht...", 32)
placement_findings = cross_search_documents(doc_entries)
# Refresh doc_texts after all splitting/searching
@@ -201,8 +206,8 @@ async def _run_compliance_check(check_id: str, req: ComplianceCheckRequest):
if entry.get("text"):
doc_texts[entry["doc_type"]] = entry["text"]
# Step 2: Detect business profile
_update(check_id, "Geschaeftsmodell wird erkannt...")
# Step 2: Detect business profile (35-40%)
_update(check_id, "Geschaeftsmodell wird erkannt...", 37)
profile = await detect_business_profile(doc_texts)
profile_dict = asdict(profile)
@@ -216,6 +221,8 @@ async def _run_compliance_check(check_id: str, req: ComplianceCheckRequest):
# Filter out doc_types that don't apply to this business profile
skip_types = _get_skip_types(profile)
# Document checks: 40-80%
n_entries = max(1, len(doc_entries))
for i, entry in enumerate(doc_entries):
text = entry["text"]
doc_type = entry["doc_type"]
@@ -229,7 +236,8 @@ async def _run_compliance_check(check_id: str, req: ComplianceCheckRequest):
))
continue
_update(check_id, f"Pruefe {label} ({i+1}/{len(doc_entries)})...")
pct = int(40 + (i / n_entries) * 40)
_update(check_id, f"Pruefen {i+1}/{n_entries}: {label}...", pct)
if not text or len(text) < 50:
results.append(DocCheckResult(
@@ -268,7 +276,7 @@ async def _run_compliance_check(check_id: str, req: ComplianceCheckRequest):
parsed = urlparse(banner_url)
banner_url = f"{parsed.scheme}://{parsed.netloc}"
if banner_url:
_update(check_id, "Cookie-Banner wird geprueft...")
_update(check_id, "Cookie-Banner wird geprueft...", 82)
try:
async with httpx.AsyncClient(timeout=120.0) as client:
resp = await client.post(
@@ -280,9 +288,9 @@ async def _run_compliance_check(check_id: str, req: ComplianceCheckRequest):
except Exception as e:
logger.warning("Banner check failed: %s", e)
# Step 3c: Cross-check Banner vs Cookie-Richtlinie
# Step 3c: Cross-check Banner vs Cookie-Richtlinie (88-90%)
if banner_result and "cookie" in doc_texts:
_update(check_id, "Banner vs. Cookie-Richtlinie abgleichen...")
_update(check_id, "Banner vs. Cookie-Richtlinie abgleichen...", 89)
cross_findings = _cross_check_banner_vs_cookie(
banner_result, doc_texts["cookie"],
)
@@ -299,7 +307,7 @@ async def _run_compliance_check(check_id: str, req: ComplianceCheckRequest):
tcf_vendors = banner_result.get("tcf_vendors", []) if banner_result else []
vvt_entries: list[dict] = []
if tcf_vendors and "dse" in doc_texts:
_update(check_id, f"{len(tcf_vendors)} TCF-Verarbeiter vs. DSI abgleichen...")
_update(check_id, f"{len(tcf_vendors)} TCF-Verarbeiter vs. DSI abgleichen...", 91)
from compliance.services.banner_cookie_cross_check import cross_check_vendors_vs_dsi
from compliance.services.vendor_vvt_mapper import map_vendors_to_vvt
vendor_findings = cross_check_vendors_vs_dsi(tcf_vendors, doc_texts["dse"])
@@ -310,8 +318,8 @@ async def _run_compliance_check(check_id: str, req: ComplianceCheckRequest):
r.checks.append(CheckItem(**vf))
vvt_entries = map_vendors_to_vvt(tcf_vendors)
# Step 4: Extract profile hints from documents
_update(check_id, "Profil wird aus Dokumenten extrahiert...")
# Step 4: Extract profile hints from documents (92-95%)
_update(check_id, "Profil wird aus Dokumenten extrahiert...", 93)
from compliance.services.profile_extractor import extract_profile_from_documents
extracted_profile = extract_profile_from_documents(doc_texts, profile_dict)
@@ -326,21 +334,32 @@ async def _run_compliance_check(check_id: str, req: ComplianceCheckRequest):
else:
r.scenario = "import"
# Step 5: Build report with management summary
_update(check_id, "Report wird erstellt...")
from .agent_doc_check_report import build_management_summary
# Step 5: Build report with management summary (95-98%)
_update(check_id, "Report wird erstellt...", 96)
from .agent_doc_check_report import (
build_management_summary,
build_scanned_urls_html,
build_provider_list_html,
)
summary_html = build_management_summary(results)
scanned_html = build_scanned_urls_html(doc_entries)
providers_html = build_provider_list_html(banner_result, vvt_entries)
report_html = build_html_report(results, None)
profile_html = _build_profile_html(profile)
full_html = summary_html + profile_html + report_html
# Step 6: Send email — include website/company name in subject
doc_count = len([r for r in results if not r.error])
site_name = (
extracted_profile.get("company_profile", {}).get("companyName")
or _extract_domain(doc_entries)
or "Unbekannt"
full_html = (
summary_html + scanned_html + profile_html
+ providers_html + report_html
)
# Step 6: Send email — derive site name primarily from entered URL.
# The extracted_profile.companyName is often noisy (e.g. picks up
# juris.de from legal references). Domain-derived name is more
# predictable for the GF email subject.
doc_count = len([r for r in results if not r.error])
url_company = _company_name_from_url(doc_entries)
domain = _extract_domain(doc_entries)
site_name = url_company or domain or "Unbekannt"
_update(check_id, "E-Mail wird versendet...", 98)
email_result = send_email(
recipient=req.recipient,
subject=f"[COMPLIANCE-CHECK] {site_name}{doc_count} Dokumente geprueft",
@@ -368,6 +387,7 @@ async def _run_compliance_check(check_id: str, req: ComplianceCheckRequest):
_compliance_check_jobs[check_id]["status"] = "completed"
_compliance_check_jobs[check_id]["result"] = response
_compliance_check_jobs[check_id]["progress"] = "Fertig"
_compliance_check_jobs[check_id]["progress_pct"] = 100
except Exception as e:
logger.error("Compliance check %s failed: %s", check_id, e, exc_info=True)
@@ -375,8 +395,11 @@ async def _run_compliance_check(check_id: str, req: ComplianceCheckRequest):
_compliance_check_jobs[check_id]["error"] = str(e)[:500]
def _update(check_id: str, msg: str):
_compliance_check_jobs[check_id]["progress"] = msg
def _update(check_id: str, msg: str, pct: int | None = None):
job = _compliance_check_jobs[check_id]
job["progress"] = msg
if pct is not None:
job["progress_pct"] = max(0, min(100, int(pct)))
async def _fetch_text(url: str) -> str:
@@ -503,14 +526,59 @@ async def _check_single(
)
_COMPOUND_TLDS = {
"co.uk", "co.jp", "co.nz", "co.kr", "co.za", "co.in",
"com.au", "com.br", "com.mx", "com.tr", "com.sg",
}
def _extract_domain(doc_entries: list[dict]) -> str | None:
"""Extract domain name from first URL for email subject."""
"""Extract base domain (without www) from first URL."""
for entry in doc_entries:
url = entry.get("url", "")
if url and "://" in url:
from urllib.parse import urlparse
host = urlparse(url).netloc
return host.replace("www.", "") if host else None
host = urlparse(url).netloc.lower()
if host.startswith("www."):
host = host[4:]
return host or None
return None
def _company_name_from_url(doc_entries: list[dict]) -> str | None:
"""Derive a display company name from the entered URLs.
Heuristic: take the second-level domain (e.g. "bmw" from "www.bmw.de"),
uppercase short acronyms (<=4 chars, no hyphens), title-case the rest.
Examples:
www.bmw.de -> BMW
mercedes-benz.de -> Mercedes-Benz
shop.example.co.uk -> Example
juris.de -> Juris
"""
from urllib.parse import urlparse
for entry in doc_entries:
url = entry.get("url", "")
if not url or "://" not in url:
continue
host = urlparse(url).netloc.lower()
if host.startswith("www."):
host = host[4:]
parts = host.split(".")
if len(parts) < 2:
continue
# Handle compound TLDs (.co.uk etc.)
if len(parts) >= 3 and ".".join(parts[-2:]) in _COMPOUND_TLDS:
sld = parts[-3]
else:
sld = parts[-2]
if not sld:
continue
if len(sld) <= 4 and "-" not in sld:
return sld.upper()
return "-".join(p.capitalize() for p in sld.split("-"))
return None
@@ -0,0 +1,226 @@
"""
Extras for the agent doc-check email report.
Split out from agent_doc_check_report.py to keep both files under the
500-line hard cap. Contains:
- build_scanned_urls_html (list of fetched URLs + cross-domain notice)
- build_provider_list_html (cookie banner + TCF vendor table)
"""
from __future__ import annotations
def build_scanned_urls_html(doc_entries: list[dict]) -> str:
"""Render the list of scanned URLs at the top of the report.
Transparent for the GF which sources were actually fetched/analysed.
Skips empty URLs (text-only uploads). Adds a cross-domain warning when
legal texts are distributed across multiple domains (e.g. BMW spreads
across bmw.de, bmwgroup.com, bmwgroup.jobs).
"""
from urllib.parse import urlparse
rows: list[str] = []
seen: set[str] = set()
domains: dict[str, list[str]] = {} # netloc -> list of doc_types
for entry in doc_entries:
url = (entry.get("url") or "").strip()
if not url or url in seen:
continue
seen.add(url)
label = _doc_type_label(entry.get("doc_type", ""))
words = entry.get("word_count") or 0
try:
netloc = urlparse(url).netloc.lower().lstrip("www.")
if netloc:
domains.setdefault(netloc, []).append(label)
except Exception:
pass
rows.append(
f'<tr>'
f'<td style="padding:3px 12px 3px 0;color:#475569;font-size:12px">{label}</td>'
f'<td style="padding:3px 12px 3px 0;font-size:12px;'
f'font-family:ui-monospace,monospace;color:#1e293b;word-break:break-all">'
f'<a href="{url}" style="color:#2563eb;text-decoration:none">{url}</a></td>'
f'<td style="padding:3px 0;color:#94a3b8;font-size:11px;text-align:right;'
f'white-space:nowrap">{words} Woerter</td>'
f'</tr>'
)
if not rows:
return ""
cross_domain_html = _cross_domain_notice(domains) if len(domains) >= 2 else ""
return (
'<div style="font-family:-apple-system,BlinkMacSystemFont,sans-serif;'
'max-width:700px;margin:0 auto 16px;padding:12px 16px;'
'background:#fafafa;border:1px solid #e5e7eb;border-radius:8px">'
'<h3 style="margin:0 0 8px;font-size:14px;color:#334155">'
f'Gepruefte Quellen ({len(rows)})</h3>'
'<table style="width:100%;border-collapse:collapse">'
+ "".join(rows)
+ '</table>'
+ cross_domain_html
+ '</div>'
)
def _cross_domain_notice(domains: dict[str, list[str]]) -> str:
"""Warning box when legal texts are spread across multiple domains.
Relevant for big corporate groups (BMW Group: bmw.de / bmwgroup.com /
bmwgroup.jobs). Affects findability for data subjects and may indicate
incomplete disclosure on the main site.
"""
items = []
for netloc, labels in sorted(domains.items()):
labels_str = ", ".join(sorted(set(labels)))
items.append(
f'<li style="margin-bottom:2px"><strong>{netloc}</strong> '
f'<span style="color:#92400e;font-size:11px">&rarr; {labels_str}</span></li>'
)
return (
'<div style="margin-top:12px;padding:10px 12px;background:#fffbeb;'
'border-left:3px solid #f59e0b;border-radius:4px;font-size:12px;'
'color:#78350f">'
'<strong>Hinweis: Rechtstexte verteilt auf '
f'{len(domains)} Domains.</strong> '
'Erschwert die Auffindbarkeit fuer Betroffene (Art. 12 Abs. 1 DSGVO &mdash; '
'transparente Information). Pruefen Sie, ob alle Texte auch von der '
'Hauptdomain aus klar verlinkt sind.'
'<ul style="margin:6px 0 0 16px;padding-left:0">'
+ "".join(items) +
'</ul></div>'
)
def _doc_type_label(doc_type: str) -> str:
"""Lazy resolver — avoids circular import with agent_compliance_check_routes."""
labels = {
"dse": "Datenschutzerklaerung",
"datenschutz": "Datenschutzerklaerung",
"privacy": "Datenschutzerklaerung",
"impressum": "Impressum",
"agb": "AGB",
"widerruf": "Widerrufsbelehrung",
"cookie": "Cookie-Richtlinie",
"avv": "Auftragsverarbeitung",
"loeschkonzept": "Loeschkonzept",
"dsfa": "Datenschutz-Folgenabschaetzung",
"social_media": "Social Media Datenschutz",
"nutzungsbedingungen": "Nutzungsbedingungen",
"dsb": "DSB-Kontakt",
}
return labels.get(doc_type, doc_type.upper() if doc_type else "Dokument")
def build_provider_list_html(
banner_result: dict | None,
vvt_entries: list[dict] | None,
) -> str:
"""Render the cookie banner result + TCF vendor table for the email.
Sections:
1. Banner summary (provider, violations count)
2. Vendor table: Name | Kategorie | Zweck | Drittland | Rechtsgrundlage
"""
if not banner_result and not vvt_entries:
return ""
parts: list[str] = [
'<div style="font-family:-apple-system,BlinkMacSystemFont,sans-serif;'
'max-width:700px;margin:0 auto 16px;padding:12px 16px;'
'background:#fafafa;border:1px solid #e5e7eb;border-radius:8px">'
'<h3 style="margin:0 0 10px;font-size:14px;color:#334155">'
'Cookie-Banner &amp; Verarbeiter</h3>'
]
if banner_result:
detected = banner_result.get("banner_detected", False)
provider = banner_result.get("banner_provider") or "unbekannt"
violations = banner_result.get("banner_checks", {}).get("violations", [])
n_viol = len(violations) if isinstance(violations, list) else int(violations or 0)
status_color = "#16a34a" if detected and n_viol == 0 else (
"#d97706" if detected else "#6b7280"
)
parts.append(
f'<div style="font-size:13px;color:#374151;margin-bottom:10px">'
f'<span style="display:inline-block;width:8px;height:8px;'
f'border-radius:50%;background:{status_color};margin-right:8px"></span>'
f'Banner erkannt: <strong>{"Ja" if detected else "Nein"}</strong>'
f' &nbsp;&middot;&nbsp; Anbieter: <strong>{provider}</strong>'
f' &nbsp;&middot;&nbsp; Auffaelligkeiten: <strong>{n_viol}</strong>'
f'</div>'
)
vendors = vvt_entries or []
if vendors:
parts.append(
f'<div style="font-size:12px;color:#475569;margin:8px 0 6px">'
f'<strong>{len(vendors)} TCF-Verarbeiter ueber das Banner eingebunden:</strong>'
f'</div>'
'<table style="width:100%;border-collapse:collapse;font-size:11px">'
'<thead><tr style="background:#f1f5f9;color:#475569;text-align:left">'
'<th style="padding:5px 8px">Name</th>'
'<th style="padding:5px 8px">Kategorie</th>'
'<th style="padding:5px 8px">Zweck</th>'
'<th style="padding:5px 8px">Drittland</th>'
'<th style="padding:5px 8px">Rechtsgrundlage</th>'
'</tr></thead><tbody>'
)
for v in vendors[:50]:
parts.append(_render_vendor_row(v))
parts.append('</tbody></table>')
if len(vendors) > 50:
parts.append(
f'<div style="font-size:11px;color:#94a3b8;margin-top:4px">'
f'... und {len(vendors) - 50} weitere</div>'
)
elif banner_result and banner_result.get("banner_detected"):
parts.append(
'<div style="font-size:11px;color:#94a3b8">'
'Keine TCF-Verarbeiter erkannt (Banner nutzt kein TCF v2 Framework '
'oder Vendor-Liste konnte nicht ausgelesen werden).</div>'
)
parts.append('</div>')
return "".join(parts)
def _render_vendor_row(v: dict) -> str:
name = v.get("name") or "Unbekannt"
kategorie = _category_label(v.get("kategorie", ""))
zweck = v.get("zweck_kurz") or ", ".join((v.get("zweck") or [])[:2])
drittland = v.get("drittland")
land = v.get("land") or ""
if drittland is True:
drittland_str = (f'<span style="color:#dc2626">Ja ({land})</span>'
if land else '<span style="color:#dc2626">Ja</span>')
elif drittland is False:
drittland_str = (f'<span style="color:#16a34a">Nein ({land})</span>'
if land else '<span style="color:#16a34a">Nein</span>')
else:
drittland_str = '<span style="color:#94a3b8">unbekannt</span>'
rg = v.get("rechtsgrundlage", "")
rg_short = "Einwilligung" if "Einwilligung" in rg else (
"Berechtigtes Interesse" if "Berechtigtes" in rg else rg[:40]
)
return (
f'<tr style="border-top:1px solid #e2e8f0">'
f'<td style="padding:4px 8px;color:#1e293b">{name}</td>'
f'<td style="padding:4px 8px;color:#475569">{kategorie}</td>'
f'<td style="padding:4px 8px;color:#475569">{zweck}</td>'
f'<td style="padding:4px 8px">{drittland_str}</td>'
f'<td style="padding:4px 8px;color:#475569">{rg_short}</td>'
f'</tr>'
)
def _category_label(kat: str) -> str:
return {
"necessary": "Notwendig",
"functional": "Funktional",
"statistics": "Statistik",
"marketing": "Marketing",
}.get(kat, kat or "")
@@ -290,6 +290,15 @@ def _render_cookie_banner(html: list[str], cookie_result: dict) -> None:
html.append('</div>')
# Re-export the helpers extracted to agent_doc_check_extras.py so existing
# callers that did `from .agent_doc_check_report import build_scanned_urls_html`
# keep working.
from .agent_doc_check_extras import ( # noqa: E402,F401
build_provider_list_html,
build_scanned_urls_html,
)
def build_profile_html(profile) -> str:
"""Build a small HTML block summarizing the detected business profile."""
service_tags = ", ".join(profile.detected_services[:10]) or "keine erkannt"