""" Extras for the agent doc-check email report. Split out from agent_doc_check_report.py to keep both files under the 500-line hard cap. Contains: - build_scanned_urls_html (list of fetched URLs + cross-domain notice) - build_provider_list_html (cookie banner + TCF vendor table) """ from __future__ import annotations def build_scanned_urls_html(doc_entries: list[dict]) -> str: """Render the list of scanned URLs at the top of the report. Transparent for the GF which sources were actually fetched/analysed. Skips empty URLs (text-only uploads). Adds a cross-domain warning when legal texts are distributed across multiple domains (e.g. BMW spreads across bmw.de, bmwgroup.com, bmwgroup.jobs). """ from urllib.parse import urlparse rows: list[str] = [] seen: set[str] = set() domains: dict[str, list[str]] = {} # netloc -> list of doc_types for entry in doc_entries: url = (entry.get("url") or "").strip() if not url or url in seen: continue seen.add(url) label = _doc_type_label(entry.get("doc_type", "")) words = entry.get("word_count") or 0 auto = entry.get("auto_discovered") try: netloc = urlparse(url).netloc.lower().lstrip("www.") if netloc: domains.setdefault(netloc, []).append(label) except Exception: pass badge = ('' 'auto-entdeckt') if auto else "" rows.append( f'' f'' f'{label}{badge}' f'' f'{url}' f'{words} Woerter' f'' ) if not rows: return "" cross_domain_html = _cross_domain_notice(domains) if len(domains) >= 2 else "" return ( '
' '

' f'Gepruefte Quellen ({len(rows)})

' '' + "".join(rows) + '
' + cross_domain_html + '
' ) def _cross_domain_notice(domains: dict[str, list[str]]) -> str: """Warning box when legal texts are spread across multiple domains. Relevant for big corporate groups (BMW Group: bmw.de / bmwgroup.com / bmwgroup.jobs). Affects findability for data subjects and may indicate incomplete disclosure on the main site. """ items = [] for netloc, labels in sorted(domains.items()): labels_str = ", ".join(sorted(set(labels))) items.append( f'
  • {netloc} ' f'→ {labels_str}
  • ' ) return ( '
    ' 'Hinweis: Rechtstexte verteilt auf ' f'{len(domains)} Domains. ' 'Erschwert die Auffindbarkeit fuer Betroffene (Art. 12 Abs. 1 DSGVO — ' 'transparente Information). Pruefen Sie, ob alle Texte auch von der ' 'Hauptdomain aus klar verlinkt sind.' '
    ' ) def _doc_type_label(doc_type: str) -> str: """Lazy resolver — avoids circular import with agent_compliance_check_routes.""" labels = { "dse": "Datenschutzerklaerung", "datenschutz": "Datenschutzerklaerung", "privacy": "Datenschutzerklaerung", "impressum": "Impressum", "agb": "AGB", "widerruf": "Widerrufsbelehrung", "cookie": "Cookie-Richtlinie", "avv": "Auftragsverarbeitung", "loeschkonzept": "Loeschkonzept", "dsfa": "Datenschutz-Folgenabschaetzung", "social_media": "Social Media Datenschutz", "nutzungsbedingungen": "Nutzungsbedingungen", "dsb": "DSB-Kontakt", } return labels.get(doc_type, doc_type.upper() if doc_type else "Dokument") def build_provider_list_html( banner_result: dict | None, vvt_entries: list[dict] | None, ) -> str: """Render the cookie banner result + TCF vendor table for the email. Sections: 1. Banner summary (provider, violations count) 2. Vendor table: Name | Kategorie | Zweck | Drittland | Rechtsgrundlage """ if not banner_result and not vvt_entries: return "" parts: list[str] = [ '
    ' '

    ' 'Cookie-Banner & Verarbeiter

    ' ] if banner_result: detected = banner_result.get("banner_detected", False) provider = banner_result.get("banner_provider") or "unbekannt" violations = banner_result.get("banner_checks", {}).get("violations", []) n_viol = len(violations) if isinstance(violations, list) else int(violations or 0) status_color = "#16a34a" if detected and n_viol == 0 else ( "#d97706" if detected else "#6b7280" ) parts.append( f'
    ' f'' f'Banner erkannt: {"Ja" if detected else "Nein"}' f'  ·  Anbieter: {provider}' f'  ·  Auffaelligkeiten: {n_viol}' f'
    ' ) vendors = vvt_entries or [] if vendors: parts.append( f'
    ' f'{len(vendors)} TCF-Verarbeiter ueber das Banner eingebunden:' f'
    ' '' '' '' '' '' '' '' '' ) for v in vendors[:50]: parts.append(_render_vendor_row(v)) parts.append('
    NameKategorieZweckDrittlandRechtsgrundlage
    ') if len(vendors) > 50: parts.append( f'
    ' f'... und {len(vendors) - 50} weitere
    ' ) elif banner_result and banner_result.get("banner_detected"): parts.append( '
    ' 'Keine TCF-Verarbeiter erkannt (Banner nutzt kein TCF v2 Framework ' 'oder Vendor-Liste konnte nicht ausgelesen werden).
    ' ) parts.append('
    ') return "".join(parts) def _render_vendor_row(v: dict) -> str: name = v.get("name") or "Unbekannt" kategorie = _category_label(v.get("kategorie", "")) zweck = v.get("zweck_kurz") or ", ".join((v.get("zweck") or [])[:2]) drittland = v.get("drittland") land = v.get("land") or "" if drittland is True: drittland_str = (f'Ja ({land})' if land else 'Ja') elif drittland is False: drittland_str = (f'Nein ({land})' if land else 'Nein') else: drittland_str = 'unbekannt' rg = v.get("rechtsgrundlage", "") rg_short = "Einwilligung" if "Einwilligung" in rg else ( "Berechtigtes Interesse" if "Berechtigtes" in rg else rg[:40] ) return ( f'' f'{name}' f'{kategorie}' f'{zweck}' f'{drittland_str}' f'{rg_short}' f'' ) def _category_label(kat: str) -> str: return { "necessary": "Notwendig", "functional": "Funktional", "statistics": "Statistik", "marketing": "Marketing", "strictlyNecessary": "Notwendig", "advertising": "Marketing", }.get(kat, kat or "—") def build_vvt_table_html(vendors: list[dict]) -> str: """Render the per-vendor VVT-style table for the email report. Splits vendors into 3-4 sections by recipient_type (Art. 30(1)(d) DSGVO): 1. INTERNAL — own departments / own systems 2. GROUP_COMPANY — parent/subsidiary (if any) 3. PROCESSOR — Auftragsverarbeiter (AVV-pflichtig) 4. CONTROLLER — joint/independent controllers (Meta, Google, LinkedIn — they build own profiles) 5. AUTHORITY / OTHER — rest Within each section: rows sorted by compliance_score ascending so the weakest entries surface first. """ if not vendors: return "" # Import here to avoid pulling backend service deps at module load from compliance.services.vendor_classifier import RECIPIENT_TYPE_SECTIONS # Bucket vendors by recipient_type by_type: dict[str, list[dict]] = {} for v in vendors: rt = (v.get("recipient_type") or "OTHER").upper() by_type.setdefault(rt, []).append(v) # Top summary n_total = len(vendors) n_internal = sum(1 for v in vendors if (v.get("recipient_type") or "").upper() in ("INTERNAL", "GROUP_COMPANY")) n_external = n_total - n_internal n_critical = sum(1 for v in vendors if v.get("compliance_score", 0) < 50) summary_parts = [f"{n_total} Verarbeitungen erfasst"] if n_internal and n_external: summary_parts.append( f"— {n_internal} eigene + {n_external} externe Empfaenger" ) if n_critical: summary_parts.append( f', {n_critical} unter 50%' ) else: summary_parts.append("— alle ueber 50%") summary = " ".join(summary_parts) out: list[str] = [ '
    ', '

    ' 'VVT-Vorschlag: Verarbeitungstaetigkeiten und Empfaenger aus der ' 'Cookie-Richtlinie

    ', f'

    {summary}. ' 'Gruppiert nach Empfaengerkategorie (Art. 30(1)(d) DSGVO). Innerhalb ' 'jeder Gruppe nach Compliance-Score sortiert. Bei eigenen ' 'Verarbeitungen (INTERNAL/GROUP) werden Opt-Out und Privacy-Link ' 'NICHT als Pflicht gewertet — der Widerruf erfolgt ueber das ' 'Cookie-Banner, Privacy ist in der Haupt-DSI dokumentiert.

    ', ] for rtype, section_label in RECIPIENT_TYPE_SECTIONS: rows = by_type.get(rtype) or [] if not rows: continue rows = sorted(rows, key=lambda v: v.get("compliance_score", 0)) n = len(rows) n_bad = sum(1 for v in rows if v.get("compliance_score", 0) < 50) bad_hint = (f' ({n_bad} unter 50%)' if n_bad else "") out.append( f'

    ' f'{section_label} ' f'({n}){bad_hint}

    ' ) out.append(_render_vendor_section(rows)) out.append('
    ') return "".join(out) def _render_vendor_section(rows: list[dict]) -> str: body: list[str] = [ '' '' '' '' '' '' '' '' '' '', ] for v in rows: body.append(_render_vendor_row_full(v)) body.append('
    NameKategorieSitzCookiesOpt-OutPrivacyScore
    ') return "".join(body) def _render_vendor_row_full(v: dict) -> str: rtype = (v.get("recipient_type") or "OTHER").upper() is_own = rtype in ("INTERNAL", "GROUP_COMPANY") cat = (v.get("category") or "").lower() is_necessary = cat in ("necessary", "strictlynecessary") name = v.get("name") or "Unbekannt" category = _category_label(v.get("category", "")) country = v.get("country") or ("—" if is_own else "—") cookies = v.get("cookies") or [] n_cookies = len(cookies) score = int(v.get("compliance_score", 0)) flags = v.get("compliance_flags") or [] # Opt-Out: nicht erforderlich fuer eigene Verarbeitung oder # technisch notwendige Cookies (§25 Abs. 2 TDDDG). opt_na_reason = ("Nicht erforderlich (eigene Verarbeitung — " "Widerruf ueber Cookie-Banner)") if is_own else ( "Nicht erforderlich (§25 Abs. 2 TDDDG — technisch notwendig)" if is_necessary else None ) opt_status = _link_status_badge( v.get("opt_out_url"), v.get("opt_out_ok"), v.get("opt_out_status"), na_label=opt_na_reason, ) # Privacy: nicht erforderlich fuer eigene Verarbeitung (Haupt-DSI). privacy_na_reason = ( "Nicht erforderlich (eigene Verarbeitung — durch Haupt-DSI abgedeckt)" if is_own else None ) privacy_status = _link_status_badge( v.get("privacy_policy_url"), v.get("privacy_ok"), v.get("privacy_status"), na_label=privacy_na_reason, ) score_color = ("#16a34a" if score >= 80 else "#d97706" if score >= 50 else "#dc2626") flag_str = "" if flags: flag_str = ( f'
    ' f'{", ".join(flags[:4])}
    ' ) return ( f'' f'' f'{name}{flag_str}' f'{category}' f'{country}' f'' f'{n_cookies}' f'{opt_status}' f'{privacy_status}' f'{score}%' f'' ) def _link_status_badge( url: str | None, ok: bool | None, status: int | None, na_label: str | None = None, ) -> str: """Render the link-status cell. - url + ok -> green check - url + broken -> red cross with status - no url + na_label -> neutral em-dash with explanation tooltip (used for INTERNAL/necessary rows where the field isn't required) - no url + no na_label -> red cross (real gap) """ if not url: if na_label: return ('') return ('') if ok: return ('') status_str = str(status) if status else "?" return ('✗ ({status_str})')