"""
Extras for the agent doc-check email report.
Split out from agent_doc_check_report.py to keep both files under the
500-line hard cap. Contains:
- build_scanned_urls_html (list of fetched URLs + cross-domain notice)
- build_provider_list_html (cookie banner + TCF vendor table)
"""
from __future__ import annotations
def build_scanned_urls_html(doc_entries: list[dict]) -> str:
    """Render the list of scanned URLs shown at the top of the report.

    Makes transparent for the GF which sources were actually
    fetched/analysed. Entries without a URL (text-only uploads) and
    repeated URLs are skipped. When the remaining URLs span two or more
    hosts, the cross-domain notice is appended (e.g. BMW spreads its legal
    texts across bmw.de, bmwgroup.com, bmwgroup.jobs).

    Args:
        doc_entries: One dict per document. Read via ``.get``: ``url``,
            ``doc_type``, ``word_count`` and ``auto_discovered``.

    Returns:
        The rendered block, or ``""`` when no entry carries a URL.
    """
    # NOTE(review): the literals below contain no markup although the
    # function name promises HTML — confirm against the email template.
    from urllib.parse import urlparse

    rows: list[str] = []
    seen: set[str] = set()
    domains: dict[str, list[str]] = {}  # host (without "www.") -> doc labels

    for entry in doc_entries:
        url = (entry.get("url") or "").strip()
        if not url or url in seen:
            continue
        seen.add(url)

        label = _doc_type_label(entry.get("doc_type", ""))
        words = entry.get("word_count") or 0
        auto = entry.get("auto_discovered")

        # urlparse only raises ValueError (e.g. malformed IPv6 literal); a
        # URL we cannot parse simply does not take part in domain grouping.
        try:
            netloc = urlparse(url).netloc.lower()
        except ValueError:
            netloc = ""
        # Strip the "www." *prefix*. str.lstrip("www.") would strip any
        # leading 'w' / '.' characters and turn "wetter.de" into "etter.de".
        if netloc.startswith("www."):
            netloc = netloc[len("www."):]
        if netloc:
            domains.setdefault(netloc, []).append(label)

        badge = "auto-entdeckt" if auto else ""
        rows.append(f"\n| {label}{badge} | {url} | {words} Woerter | \n")

    if not rows:
        return ""

    cross_domain_html = _cross_domain_notice(domains) if len(domains) >= 2 else ""
    return (
        "\n"
        f"Gepruefte Quellen ({len(rows)})\n"
        "\n"
        # Emit the rows themselves — previously only their count was shown.
        + "".join(rows)
        + cross_domain_html
        + "\n"
    )
def _cross_domain_notice(domains: dict[str, list[str]]) -> str:
"""Warning box when legal texts are spread across multiple domains.
Relevant for big corporate groups (BMW Group: bmw.de / bmwgroup.com /
bmwgroup.jobs). Affects findability for data subjects and may indicate
incomplete disclosure on the main site.
"""
items = []
for netloc, labels in sorted(domains.items()):
labels_str = ", ".join(sorted(set(labels)))
items.append(
f'{netloc} '
f'→ {labels_str}'
)
return (
''
'
Hinweis: Rechtstexte verteilt auf '
f'{len(domains)} Domains. '
'Erschwert die Auffindbarkeit fuer Betroffene (Art. 12 Abs. 1 DSGVO — '
'transparente Information). Pruefen Sie, ob alle Texte auch von der '
'Hauptdomain aus klar verlinkt sind.'
'
'
)
def _doc_type_label(doc_type: str) -> str:
"""Lazy resolver — avoids circular import with agent_compliance_check_routes."""
labels = {
"dse": "Datenschutzerklaerung",
"datenschutz": "Datenschutzerklaerung",
"privacy": "Datenschutzerklaerung",
"impressum": "Impressum",
"agb": "AGB",
"widerruf": "Widerrufsbelehrung",
"cookie": "Cookie-Richtlinie",
"avv": "Auftragsverarbeitung",
"loeschkonzept": "Loeschkonzept",
"dsfa": "Datenschutz-Folgenabschaetzung",
"social_media": "Social Media Datenschutz",
"nutzungsbedingungen": "Nutzungsbedingungen",
"dsb": "DSB-Kontakt",
}
return labels.get(doc_type, doc_type.upper() if doc_type else "Dokument")
def build_provider_list_html(
    banner_result: dict | None,
    vvt_entries: list[dict] | None,
) -> str:
    """Render the cookie banner result + TCF vendor table for the email.

    Sections:
      1. Banner summary (detected yes/no, provider, violations count)
      2. Vendor table: Name | Kategorie | Zweck | Drittland | Rechtsgrundlage

    Args:
        banner_result: Banner scan result. Read via ``.get``:
            ``banner_detected``, ``banner_provider`` and
            ``banner_checks["violations"]`` (a list or a count).
        vvt_entries: Vendor dicts, each rendered by ``_render_vendor_row``.
            At most the first 50 are shown; the rest is summarised.

    Returns:
        The rendered block, or ``""`` when both inputs are empty/``None``.
    """
    if not banner_result and not vvt_entries:
        return ""

    max_rows = 50  # cap on rendered vendor rows
    parts: list[str] = ["\nCookie-Banner & Verarbeiter\n"]

    if banner_result:
        detected = banner_result.get("banner_detected", False)
        provider = banner_result.get("banner_provider") or "unbekannt"
        # "banner_checks" may be present with value None; .get(key, {})
        # would then hand back None and the chained .get() would crash.
        checks = banner_result.get("banner_checks") or {}
        violations = checks.get("violations", [])
        n_viol = len(violations) if isinstance(violations, list) else int(violations or 0)
        # NOTE(review): status_color is not interpolated into any literal
        # below — confirm whether the styling markup was lost.
        if detected and n_viol == 0:
            status_color = "#16a34a"
        elif detected:
            status_color = "#d97706"
        else:
            status_color = "#6b7280"
        parts.append(
            "\n"
            f'Banner erkannt: {"Ja" if detected else "Nein"}'
            f" · Anbieter: {provider}"
            f" · Auffaelligkeiten: {n_viol}"
            "\n"
        )

    vendors = vvt_entries or []
    if vendors:
        parts.append(
            "\n"
            f"{len(vendors)} TCF-Verarbeiter ueber das Banner eingebunden:"
            "\n"
            "\n"
            "| Name | "
            "Kategorie | "
            "Zweck | "
            "Drittland | "
            "Rechtsgrundlage | "
            "\n"
        )
        parts.extend(_render_vendor_row(v) for v in vendors[:max_rows])
        parts.append("\n")
        if len(vendors) > max_rows:
            parts.append(
                "\n"
                f"... und {len(vendors) - max_rows} weitere\n"
            )
    elif banner_result and banner_result.get("banner_detected"):
        parts.append(
            "\n"
            "Keine TCF-Verarbeiter erkannt (Banner nutzt kein TCF v2 Framework "
            "oder Vendor-Liste konnte nicht ausgelesen werden).\n"
        )

    parts.append("\n")
    return "".join(parts)
def _render_vendor_row(v: dict) -> str:
    """Render one row of the TCF vendor table.

    Args:
        v: Vendor dict. Read via ``.get``: ``name``, ``kategorie``,
            ``zweck_kurz`` (falling back to the first two entries of
            ``zweck``), ``drittland`` (True / False / anything else means
            unknown), ``land`` and ``rechtsgrundlage``.

    Returns:
        A single row: Name | Kategorie | Zweck | Drittland | Rechtsgrundlage.
    """
    name = v.get("name") or "Unbekannt"
    kategorie = _category_label(v.get("kategorie", ""))
    zweck = v.get("zweck_kurz") or ", ".join((v.get("zweck") or [])[:2])

    drittland = v.get("drittland")
    land = v.get("land") or ""
    # Identity checks on purpose: only a real boolean counts as a known
    # answer; None, "" or other values are reported as unknown.
    if drittland is True:
        drittland_str = f"Ja ({land})" if land else "Ja"
    elif drittland is False:
        drittland_str = f"Nein ({land})" if land else "Nein"
    else:
        drittland_str = "unbekannt"

    # `or ""` also covers an explicit None value, which .get(key, "") does
    # not; the substring tests below would otherwise raise TypeError.
    rg = v.get("rechtsgrundlage") or ""
    if "Einwilligung" in rg:
        rg_short = "Einwilligung"
    elif "Berechtigtes" in rg:
        rg_short = "Berechtigtes Interesse"
    else:
        rg_short = rg[:40]  # keep the cell narrow for free-text values

    return (
        f"| {name} | "
        f"{kategorie} | "
        f"{zweck} | "
        f"{drittland_str} | "
        f"{rg_short} | "
        "\n"
    )
def _category_label(kat: str) -> str:
return {
"necessary": "Notwendig",
"functional": "Funktional",
"statistics": "Statistik",
"marketing": "Marketing",
"strictlyNecessary": "Notwendig",
"advertising": "Marketing",
}.get(kat, kat or "—")
def build_vvt_table_html(vendors: list[dict]) -> str:
    """Render the per-vendor VVT-style table for the email report.

    Vendors are bucketed by ``recipient_type`` (Art. 30(1)(d) DSGVO) and one
    section is rendered per entry of ``RECIPIENT_TYPE_SECTIONS`` that has at
    least one vendor. The sections are expected to be (TODO confirm against
    vendor_classifier):
      1. INTERNAL — own departments / own systems
      2. GROUP_COMPANY — parent/subsidiary (if any)
      3. PROCESSOR — Auftragsverarbeiter (AVV-pflichtig)
      4. CONTROLLER — joint/independent controllers (Meta, Google,
         LinkedIn — they build own profiles)
      5. AUTHORITY / OTHER — rest
    Within each section rows are sorted by compliance_score ascending so
    the weakest entries surface first.

    Args:
        vendors: Vendor dicts. Read here via ``.get``: ``recipient_type``
            (missing/empty counts as ``"OTHER"``) and ``compliance_score``
            (missing or ``None`` counts as 0).

    Returns:
        The rendered block, or ``""`` when ``vendors`` is empty.
    """
    if not vendors:
        return ""
    # Import here to avoid pulling backend service deps at module load
    from compliance.services.vendor_classifier import RECIPIENT_TYPE_SECTIONS

    def score_of(vendor: dict):
        # A present-but-None score must not break `<` comparisons or sorting.
        return vendor.get("compliance_score") or 0

    # Bucket vendors by recipient_type
    by_type: dict[str, list[dict]] = {}
    for v in vendors:
        rt = (v.get("recipient_type") or "OTHER").upper()
        by_type.setdefault(rt, []).append(v)

    # Top summary
    n_total = len(vendors)
    n_internal = sum(
        1 for v in vendors
        if (v.get("recipient_type") or "").upper() in ("INTERNAL", "GROUP_COMPANY")
    )
    n_external = n_total - n_internal
    n_critical = sum(1 for v in vendors if score_of(v) < 50)

    # Built by concatenation: joining the parts with " " used to yield
    # "... erfasst , 3 unter 50%" (stray space before the comma).
    summary = f"{n_total} Verarbeitungen erfasst"
    if n_internal and n_external:
        summary += f" — {n_internal} eigene + {n_external} externe Empfaenger"
    if n_critical:
        summary += f", {n_critical} unter 50%"
    else:
        summary += " — alle ueber 50%"

    out: list[str] = [
        "\n"
        "VVT-Vorschlag: Verarbeitungstaetigkeiten und Empfaenger aus der "
        "Cookie-Richtlinie\n",
        f"\n{summary}. "
        "Gruppiert nach Empfaengerkategorie (Art. 30(1)(d) DSGVO). Innerhalb "
        "jeder Gruppe nach Compliance-Score sortiert. Bei eigenen "
        "Verarbeitungen (INTERNAL/GROUP) werden Opt-Out und Privacy-Link "
        "NICHT als Pflicht gewertet — der Widerruf erfolgt ueber das "
        "Cookie-Banner, Privacy ist in der Haupt-DSI dokumentiert.\n",
    ]

    # NOTE(review): a vendor whose recipient_type is not listed in
    # RECIPIENT_TYPE_SECTIONS is counted in the summary but never rendered.
    for rtype, section_label in RECIPIENT_TYPE_SECTIONS:
        rows = by_type.get(rtype) or []
        if not rows:
            continue
        rows = sorted(rows, key=score_of)
        n = len(rows)
        n_bad = sum(1 for v in rows if score_of(v) < 50)
        bad_hint = f"\n({n_bad} unter 50%)" if n_bad else ""
        out.append(
            "\n"
            f"{section_label} "
            f"({n}){bad_hint}\n"
        )
        out.append(_render_vendor_section(rows))

    out.append("\n")
    return "".join(out)
def _render_vendor_section(rows: list[dict]) -> str:
body: list[str] = [
''
''
'| Name | '
'Kategorie | '
'Sitz | '
'Cookies | '
'Opt-Out | '
'Privacy | '
'Score | '
'
',
]
for v in rows:
body.append(_render_vendor_row_full(v))
body.append('
')
return "".join(body)
def _render_vendor_row_full(v: dict) -> str:
    """Render one vendor row of a VVT section table.

    Args:
        v: Vendor dict. Read via ``.get``: ``recipient_type``, ``category``,
            ``name``, ``country``, ``cookies``, ``compliance_score``,
            ``compliance_flags``, ``opt_out_url`` / ``opt_out_ok`` /
            ``opt_out_status`` and ``privacy_policy_url`` / ``privacy_ok`` /
            ``privacy_status``.

    Returns:
        A single row: Name (plus flags and per-flag action hints) |
        Kategorie | Sitz | Cookies | Opt-Out | Privacy | Score.
    """
    rtype = (v.get("recipient_type") or "OTHER").upper()
    is_own = rtype in ("INTERNAL", "GROUP_COMPANY")
    cat = (v.get("category") or "").lower()
    is_necessary = cat in ("necessary", "strictlynecessary")
    name = v.get("name") or "Unbekannt"
    category = _category_label(v.get("category", ""))
    # The former own/external conditional had identical branches.
    country = v.get("country") or "—"
    cookies = v.get("cookies") or []
    n_cookies = len(cookies)
    # `or 0` also covers an explicit None, on which int() would raise.
    score = int(v.get("compliance_score") or 0)
    flags = v.get("compliance_flags") or []

    # Opt-out: not required for own processing or for technically
    # necessary cookies (§25 Abs. 2 TDDDG).
    if is_own:
        opt_na_reason = ("Nicht erforderlich (eigene Verarbeitung — "
                         "Widerruf ueber Cookie-Banner)")
    elif is_necessary:
        opt_na_reason = "Nicht erforderlich (§25 Abs. 2 TDDDG — technisch notwendig)"
    else:
        opt_na_reason = None
    opt_status = _link_status_badge(
        v.get("opt_out_url"), v.get("opt_out_ok"), v.get("opt_out_status"),
        na_label=opt_na_reason,
    )

    # Privacy link: not required for own processing (covered by the main
    # privacy notice).
    privacy_na_reason = (
        "Nicht erforderlich (eigene Verarbeitung — durch Haupt-DSI abgedeckt)"
        if is_own else None
    )
    privacy_status = _link_status_badge(
        v.get("privacy_policy_url"), v.get("privacy_ok"),
        v.get("privacy_status"), na_label=privacy_na_reason,
    )

    # NOTE(review): score_color and score_tooltip are not interpolated into
    # any literal below — confirm whether the styling markup was lost.
    score_color = ("#16a34a" if score >= 80 else
                   "#d97706" if score >= 50 else "#dc2626")

    # Score explanation: what was evaluated, what is missing.
    # Assumption (from the original comment): score = passed criteria /
    # total criteria * 100. Typically 5 criteria for external recipients:
    # country, cookies, opt_out, privacy, scoring. For INTERNAL/GROUP
    # opt_out + privacy are not evaluated (3 criteria).
    n_criteria = 3 if is_own else 5
    n_failed = len(flags)
    # More flags than assumed criteria must not yield a negative count.
    n_passed = max(0, n_criteria - n_failed)
    score_tooltip = (
        f"{n_passed} von {n_criteria} Kriterien erfuellt"
        + (f" — fehlt: {', '.join(_flag_short(f) for f in flags[:3])}"
           if flags else "")
    )

    # Inline action instructions, one per flag that has a recipe.
    actions_html = ""
    if flags:
        from compliance.services.finding_action_recipes import recipe_for
        action_items = []
        for f in flags:
            rec = recipe_for(f)
            if not rec:
                continue
            # An empty or missing fix_text has no first line;
            # "".splitlines()[0] would raise IndexError.
            fix_lines = (rec.get("fix_text") or "").splitlines()
            first_fix_line = fix_lines[0][:200] if fix_lines else ""
            what = rec.get("what") or ""
            why = (rec.get("why") or "")[:160]
            action_items.append(
                f"{_flag_short(f)}: "
                f"{what}\n"
                "Was tun: "
                f"{first_fix_line}\n"
                "Quelle: "
                f"{why}"
            )
        if action_items:
            n_actions = len(action_items)
            actions_html = (
                "Was muss ich tun? "
                f'({n_actions} Action{"s" if n_actions != 1 else ""})\n'
                + "".join(action_items)
                + "\n"
            )

    flag_str = ""
    if flags:
        flag_str = (
            f'{", ".join(flags[:4])}\n'
            f"{actions_html}"
        )

    return (
        "| "
        f"{name}{flag_str} | "
        f"{category} | "
        f"{country} | "
        f"{n_cookies} | "
        f"{opt_status} | "
        f"{privacy_status} | "
        f"{score}% "
        f"{n_passed}/{n_criteria} | "
        "\n"
    )
def _flag_short(f: str) -> str:
"""Lesbare deutsche Form fuer einen Flag-Token."""
labels = {
"no_cookies_listed": "Cookies fehlen",
"no_country": "Sitzland fehlt",
"no_privacy_url": "Privacy-Link fehlt",
"broken_privacy_url": "Privacy-Link broken",
"no_opt_out_url": "Opt-Out fehlt",
"broken_opt_out": "Opt-Out broken",
}
return labels.get(f, f)
def _link_status_badge(
url: str | None,
ok: bool | None,
status: int | None,
na_label: str | None = None,
) -> str:
"""Render the link-status cell.
- url + ok -> green check
- url + broken -> red cross with status
- no url + na_label -> neutral em-dash with explanation tooltip
(used for INTERNAL/necessary rows where the field isn't required)
- no url + no na_label -> red cross (real gap)
"""
if not url:
if na_label:
return ('—')
return ('✗')
if ok:
return ('✓')
status_str = str(status) if status else "?"
return ('✗ ({status_str})')