From 081e4f057a3cf4862ceebf3e6b1000ef0a6ae49d Mon Sep 17 00:00:00 2001 From: Benjamin Admin Date: Thu, 21 May 2026 23:36:45 +0200 Subject: [PATCH] feat(audit): Cookie-Compliance-Audit (3-Quellen-Vergleich) + Vendor-Dedup + Block-Parser MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit ZENTRALER USP: cookie_compliance_audit.py vergleicht 3 Quellen * DEKLARIERT in Cookie-Richtlinie (parse_cookie_table + parse_flat) * TATSAECHLICH im Browser geladen (banner_result.phases.after_accept) * LIBRARY-Metadaten (cookie_library lookup) Liefert 3 Listen mit Compliance-Verdict: * compliant (deklariert UND geladen) — gruener Block * undeclared_in_browser (geladen NICHT deklariert) — ROTER HIGH-Block → Art. 13(1)(c) DSGVO + § 25 TDDDG Verstoss * declared_not_loaded (deklariert NICHT geladen) — gelber Hinweis → Tabelle moeglicherweise veraltet parse_cookie_table erweitert um Block-Format (5 Zeilen pro Cookie wie beim User-Copy aus VW). Findet 35+ Cookies aus Copy-Paste statt 0. vendor_normalizer.py: 50+ Aliases (Google-Familie, Adobe-Familie, Trade Desk, AdForm, ...) + Garbage-Filter (URLs, leere Strings, 'click to select', 'Mehrere OEMs'). Mergt cookies-Listen beim Dedup. _guess_vendor erweitert: Adobe-Familie (s_ecid/AMCV/demdex/mbox/...), Trade Desk (TDID/TDCPM/TTDOptOut), AdForm (uid/cid/otsid), Salesforce LiveAgent, etracker, Akamai, EDAA. audit_quality_checks: vendor-thin-Threshold jetzt dynamisch nach Cookie-Doc-Wörter (3k→10 / 6k→20 / 10k→30 / 15k+→40). VW-Test-Fixture: tests/fixtures/cookie_gt/vw_cookie_richtlinie.txt (36-Cookie-Sample fuer Regression-Tests). 
Co-Authored-By: Claude Opus 4.7 (1M context) --- .../api/agent_compliance_check_routes.py | 45 +++- .../services/audit_quality_checks.py | 45 ++-- .../services/cookie_compliance_audit.py | 221 ++++++++++++++++++ .../services/cookies_table_parser.py | 167 ++++++++++++- .../compliance/services/vendor_normalizer.py | 167 +++++++++++++ .../cookie_gt/vw_cookie_richtlinie.txt | 55 +++++ 6 files changed, 678 insertions(+), 22 deletions(-) create mode 100644 backend-compliance/compliance/services/cookie_compliance_audit.py create mode 100644 backend-compliance/compliance/services/vendor_normalizer.py create mode 100644 backend-compliance/tests/fixtures/cookie_gt/vw_cookie_richtlinie.txt diff --git a/backend-compliance/compliance/api/agent_compliance_check_routes.py b/backend-compliance/compliance/api/agent_compliance_check_routes.py index 979d855f..7e91c9d2 100644 --- a/backend-compliance/compliance/api/agent_compliance_check_routes.py +++ b/backend-compliance/compliance/api/agent_compliance_check_routes.py @@ -948,6 +948,15 @@ async def _run_compliance_check(check_id: str, req: ComplianceCheckRequest): except Exception as e: logger.warning("Cookie-Library-Fallback skipped: %s", e) + # Vendor-Normalizer: Dedup (Google-Familie etc) + Garbage-Filter + try: + from compliance.services.vendor_normalizer import ( + normalize_vendors as _norm_v, + ) + cmp_vendors = _norm_v(cmp_vendors) + except Exception as e: + logger.warning("vendor_normalizer skipped: %s", e) + # P50: enrich vendors with per-vendor detail-modal-extracts # (description, opt-out URL, privacy URL, cookies). Detail # comes from Phase G Info-button-click-through in /scan. @@ -1276,6 +1285,38 @@ async def _run_compliance_check(check_id: str, req: ComplianceCheckRequest): except Exception as e: logger.warning("Scope-disclaimer block skipped: %s", e) + # COOKIE-COMPLIANCE-AUDIT (3-Quellen-Vergleich) — das ist der + # zentrale USP: deklariert in Richtlinie vs tatsaechlich im + # Browser geladen vs Library-Match. 
+ cookie_audit = {} + cookie_audit_html = "" + try: + from compliance.services.cookie_compliance_audit import ( + audit_cookie_compliance, build_cookie_audit_block_html, + ) + from database import SessionLocal as _SLca + _ca_db = _SLca() + try: + cookie_audit = audit_cookie_compliance( + _ca_db, doc_texts.get("cookie") or doc_texts.get("dse"), + banner_result, + ) + if cookie_audit and (cookie_audit.get("declared_count") or + cookie_audit.get("browser_count")): + cookie_audit_html = build_cookie_audit_block_html(cookie_audit) + logger.info( + "Cookie-Audit: %d deklariert, %d im Browser, " + "%d undokumentiert, %d compliant", + cookie_audit.get("declared_count"), + cookie_audit.get("browser_count"), + len(cookie_audit.get("undeclared_in_browser") or []), + len(cookie_audit.get("compliant") or []), + ) + finally: + _ca_db.close() + except Exception as e: + logger.warning("cookie-compliance-audit skipped: %s", e) + # P102: Cookie-Klassifikations-Pruefung (deklariert vs Library) library_mismatch_html = "" mismatches: list[dict] = [] @@ -1481,7 +1522,9 @@ async def _run_compliance_check(check_id: str, req: ComplianceCheckRequest): + critical_html + scope_disclaimer_html + exec_summary_html + cookie_arch_html + summary_html + scanned_html + profile_html + scorecard_html + redundancy_html - + providers_html + banner_deep_html + library_mismatch_html + + providers_html + banner_deep_html + + cookie_audit_html + + library_mismatch_html + consistency_html + signals_html + solutions_html + jc_decision_html + vvt_html + report_html diff --git a/backend-compliance/compliance/services/audit_quality_checks.py b/backend-compliance/compliance/services/audit_quality_checks.py index 8470c0f8..57858e55 100644 --- a/backend-compliance/compliance/services/audit_quality_checks.py +++ b/backend-compliance/compliance/services/audit_quality_checks.py @@ -67,33 +67,48 @@ def check_vendor_extract_incomplete( cookie_doc_text: str | None, cmp_vendors: list | None, ) -> dict | None: - """2) 
Cookie-Doc gross aber wenig Vendors → Extract unvollstaendig.""" + """2) Cookie-Doc gross aber wenig Vendors → Extract unvollstaendig. + + Dynamische Schwelle nach Doc-Groesse: + * 3k-6k Wörter → mind. 10 Vendors erwartet + * 6k-10k Wörter → mind. 20 Vendors + * 10k-15k Wörter → mind. 30 Vendors + * 15k+ Wörter → mind. 40 Vendors + """ wc = _word_count(cookie_doc_text) n_vendors = len(cmp_vendors or []) - # Heuristik: Cookie-Doc >= 5000 Wörter (~30k chars) sollte zu mind. 15 - # Vendors fuehren. Wenn weniger → Vendor-Extraktion hat den Text nicht - # vollstaendig verarbeitet. - if wc < 5000 or n_vendors >= 15: + if wc < 3000: + return None + # Erwartete Vendor-Anzahl heuristisch nach Doc-Groesse + if wc >= 15000: + expected = 40 + elif wc >= 10000: + expected = 30 + elif wc >= 6000: + expected = 20 + else: + expected = 10 + if n_vendors >= expected: return None - # Verhaeltniszahl bilden — je groesser das Doc, desto auffaelliger return { "severity": "HIGH" if wc >= 8000 else "MEDIUM", "code": "audit_vendor_extract_thin", "label": ( f"Audit-Vorbehalt: Cookie-Richtlinie hat {wc:,} Wörter, " - f"wir konnten aber nur {n_vendors} Vendor" - f"{'en' if n_vendors != 1 else ''} extrahieren" + f"erwartet ~{expected} Vendors, extrahiert nur {n_vendors}" ).replace(",", "."), "area": "Vendor-Liste / VVT", "owner": "DSB + Marketing", "detail": ( - "Bei dieser Doc-Groesse erwarten wir typischerweise 20-50+ " - "Vendors in einer Cookie-Richtlinie. Die niedrige extrahierte " - "Zahl deutet auf eine Tabelle die unser LLM nicht vollstaendig " - "parsen konnte. Empfehlung: VVT-Tabelle mit DSB / Marketing " - "manuell abgleichen, oder die Cookie-Tabelle im Copy-Paste-Modus " - "neu einreichen — dort parsen wir Spalten deterministisch." - ), + f"Bei einer Cookie-Richtlinie mit {wc:,} Woertern erwarten wir " + f"typischerweise {expected}+ unique Vendors. 
Die extrahierte Zahl " + f"({n_vendors}) ist auffaellig niedrig — entweder hat unser " + "Parser/LLM die Tabelle nicht vollstaendig erfasst oder " + "Vendors wurden zu konservativ erkannt. Empfehlung: Cookie-" + "Tabelle im Copy-Paste-Modus einreichen (Frontend-Toggle " + "'Text einfuegen' pro Cookie-Doc-Zeile) — dort parsen wir " + "Spalten deterministisch." + ).replace(",", "."), "legal_basis": "Art. 13(1)(e) DSGVO — die Empfaengerliste muss " "vollstaendig sein; ein unvollstaendiger Audit darf " "nicht als vollstaendig dargestellt werden.", diff --git a/backend-compliance/compliance/services/cookie_compliance_audit.py b/backend-compliance/compliance/services/cookie_compliance_audit.py new file mode 100644 index 00000000..976f9ea2 --- /dev/null +++ b/backend-compliance/compliance/services/cookie_compliance_audit.py @@ -0,0 +1,221 @@ +""" +Cookie-Compliance-Audit — 3-Quellen-Vergleich. + +DAS ist der eigentliche Mehrwert des Tools: +* A. Was in der Cookie-Richtlinie DEKLARIERT ist (Text-Parse) +* B. Was im Browser TATSAECHLICH GELADEN wurde (after_accept) +* C. Was unsere LIBRARY ueber den Cookie weiss (Vendor, Kategorie) + +Daraus 3 Listen: +1. ✓ deklariert + geladen + library-bekannt → compliant +2. ❌ geladen aber NICHT deklariert → HIGH-Verstoss (Art. 13(1)(c) DSGVO) +3. ⚠️ deklariert aber NICHT geladen → Tabelle veraltet (LOW) +4. 🔍 deklariert + Library-Kategorie weicht ab → Pruefanlass +""" + +from __future__ import annotations + +import logging +import re +from typing import Iterable + +from sqlalchemy import text as sa_text +from sqlalchemy.orm import Session + +logger = logging.getLogger(__name__) + + +def _normalize_cookie_name(name: str) -> str: + """Wildcard-Cookies wie 'AMCV_*', 'pm_sess_NNN' werden auf Prefix + reduziert damit '_ga' und '_ga_GTM-XXX' als ein Cookie zaehlen.""" + if not name: + return "" + s = name.strip() + # AMCV_*, sc_v44, etc. + s = re.sub(r"[<\[].*?[>\]]", "", s) # entferne , [...] 
+ s = s.rstrip("*").rstrip("_") + s = re.sub(r"_NNN$|_\d+$", "", s) + return s.lower() + + +def _extract_declared_cookies(cookie_doc_text: str | None) -> set[str]: + """Liest Cookie-Namen aus dem Cookie-Richtlinien-Text. + Nutzt zuerst parse_cookie_table (Block/Tab-Format), dann + parse_flat_cookie_text (Anchor-Pattern). + """ + if not cookie_doc_text: + return set() + declared: set[str] = set() + try: + from compliance.services.cookies_table_parser import ( + parse_cookie_table, parse_flat_cookie_text, + ) + for v in parse_cookie_table(cookie_doc_text): + for c in (v.get("cookies") or []): + if isinstance(c, dict) and c.get("name"): + declared.add(_normalize_cookie_name(c["name"])) + for v in parse_flat_cookie_text(cookie_doc_text): + for c in (v.get("cookies") or []): + if isinstance(c, dict) and c.get("name"): + declared.add(_normalize_cookie_name(c["name"])) + except Exception as e: + logger.warning("declared-cookie-extract failed: %s", e) + return {n for n in declared if n} + + +def _extract_browser_cookies(banner_result: dict | None) -> set[str]: + """Liest Cookie-Namen aus banner_result.phases.after_accept.cookies.""" + out: set[str] = set() + if not isinstance(banner_result, dict): + return out + phases = banner_result.get("phases") or {} + for ph_name in ("after_accept", "before_consent", "after_reject"): + ph = phases.get(ph_name) or {} + if not isinstance(ph, dict): + continue + for c in (ph.get("cookies") or []): + if isinstance(c, str): + out.add(_normalize_cookie_name(c)) + elif isinstance(c, dict) and c.get("name"): + out.add(_normalize_cookie_name(c["name"])) + return {n for n in out if n} + + +def _lookup_library(db: Session, names: Iterable[str]) -> dict[str, dict]: + """Liefert {normalized_name: {category, vendor}} aus cookie_library.""" + nl = [n for n in names if n] + if not nl: + return {} + try: + rows = db.execute(sa_text( + "SELECT cookie_name, actual_category, vendor_name " + "FROM compliance.cookie_library " + "WHERE LOWER(cookie_name) = 
ANY(:lc)" + ), {"lc": nl}).fetchall() + return {r[0].lower(): {"category": r[1], "vendor": r[2]} for r in rows} + except Exception as e: + logger.warning("library lookup failed: %s", e) + return {} + + +def audit_cookie_compliance( + db: Session | None, + cookie_doc_text: str | None, + banner_result: dict | None, +) -> dict: + """Hauptfunktion: liefert dict mit 4 Listen + counts.""" + declared = _extract_declared_cookies(cookie_doc_text) + browser = _extract_browser_cookies(banner_result) + + all_names = declared | browser + library = _lookup_library(db, all_names) if db else {} + + declared_only = declared - browser + browser_only = browser - declared + both = declared & browser + + return { + "declared_count": len(declared), + "browser_count": len(browser), + "library_count": len(library), + "compliant": sorted(both), + "undeclared_in_browser": sorted(browser_only), + "declared_not_loaded": sorted(declared_only), + "library_metadata": library, + "high_findings": len(browser_only), + "low_findings": len(declared_only), + } + + +def build_cookie_audit_block_html(audit: dict) -> str: + """Rendert den 3-Spalten-Vergleichs-Block in die Mail.""" + if not audit: + return "" + n_dec = audit.get("declared_count", 0) + n_brw = audit.get("browser_count", 0) + n_undecl = len(audit.get("undeclared_in_browser") or []) + n_dec_only = len(audit.get("declared_not_loaded") or []) + n_both = len(audit.get("compliant") or []) + + sev_color = "#dc2626" if n_undecl else "#16a34a" + + undecl_html = "" + if audit.get("undeclared_in_browser"): + undecl_html = ( + '
' + f'❌ {n_undecl} Cookie' + f'{"s" if n_undecl != 1 else ""} im Browser geladen, ' + 'aber NICHT in der Cookie-Richtlinie deklariert:' + '
' + + ", ".join(audit["undeclared_in_browser"][:50]) + + (f' ... +{n_undecl - 50} weitere' + if n_undecl > 50 else '') + + '
' + '
Art. 13(1)(c) DSGVO + § 25 TDDDG — ' + 'die Empfaengerliste muss vollstaendig sein. Diese Cookies ' + 'sind potenziell ungenannte Verarbeitungen.
' + '
' + ) + + dec_only_html = "" + if audit.get("declared_not_loaded"): + dec_only_html = ( + '
' + f'⚠️ {n_dec_only} Cookie' + f'{"s" if n_dec_only != 1 else ""} in der Richtlinie ' + 'deklariert, aber bei diesem Audit NICHT im Browser gesehen:' + '
' + + ", ".join(audit["declared_not_loaded"][:50]) + + (f' ... +{n_dec_only - 50} weitere' + if n_dec_only > 50 else '') + + '
' + '
Kein direkter Verstoss — die Cookies ' + 'koennen nur in bestimmten User-Journeys / Geo-Regionen / ' + 'eingeloggten Zustaenden geladen werden. Empfehlung: ' + 'pruefen ob die Cookie-Richtlinie veraltet ist.
' + '
' + ) + + compliant_html = "" + if audit.get("compliant"): + compliant_html = ( + '
' + f'✓ {n_both} Cookie' + f'{"s" if n_both != 1 else ""} sowohl deklariert als auch geladen ' + '(compliant):' + '
' + + ", ".join(audit["compliant"][:50]) + + (f' ... +{n_both - 50} weitere' + if n_both > 50 else '') + + '
' + '
' + ) + + return ( + '
' + f'
' + 'Cookie-Compliance-Audit — 3-Quellen-Vergleich
' + '

' + f'{n_dec} in Richtlinie · {n_brw} im Browser · ' + f'{n_both} compliant · {n_undecl} undokumentiert · ' + f'{n_dec_only} nicht geladen

' + '

' + 'Wir vergleichen die in der Cookie-Richtlinie genannten Cookies ' + 'mit dem was der Browser nach Akzeptieren tatsaechlich laedt. ' + 'Undokumentierte Cookies im Browser sind ein direkter Verstoss ' + 'gegen die DSGVO-Informationspflicht.' + '

' + + undecl_html + dec_only_html + compliant_html + + '
' + ) diff --git a/backend-compliance/compliance/services/cookies_table_parser.py b/backend-compliance/compliance/services/cookies_table_parser.py index 2d47ec21..0ce67068 100644 --- a/backend-compliance/compliance/services/cookies_table_parser.py +++ b/backend-compliance/compliance/services/cookies_table_parser.py @@ -79,10 +79,116 @@ def _parse_persistence(s: str) -> str: return "" +_CATEGORY_INDICATORS = ( + "funktionscookie", "tracking cookie", "trackingcookie", + "marketing", "analytics", "necessary", "notwendig", + "performance", "session cookie", "persistent cookie", + "permanent cookie", "permanent/protokoll", "sitzungs-cookie", +) + + +def parse_block_format(text: str) -> list[dict]: + """Block-Format (Browser-Copy aus VW/BMW/Mercedes ohne Tab-Trenner): + Pro Cookie 5 Zeilen: Name / Kategorie / Zweck / Speicherdauer / Art. + + Heuristik: gehe ueber alle Zeilen. Wenn eine Zeile NICHT eine + Kategorie/Dauer/Art ist und die naechste eine Kategorie enthaelt + → das ist ein Cookie-Name. Sammle die naechsten 4 Zeilen als + Kategorie/Zweck/Dauer/Art. + """ + if not text or len(text) < 100: + return [] + raw_lines = [ln.strip() for ln in text.splitlines()] + # Aggressive newline-collapse: leere Zeilen entfernen, aber Zeilen + # die Teil eines mehrzeiligen Zwecks sind moegen separat bleiben. + lines = [ln for ln in raw_lines if ln] + if len(lines) < 10: + return [] + + # Drop the header row(s) if present + start = 0 + if lines[0].lower() in ("name des cookies", "cookie name", "name"): + start = 5 if len(lines) > 5 else 1 + + by_vendor: dict[str, dict] = {} + seen_names: set[str] = set() + i = start + while i < len(lines) - 2: + name_line = lines[i] + cat_line = lines[i + 1] if i + 1 < len(lines) else "" + # Verify cat_line is a category indicator (otherwise the + # block is malformed — skip 1 line and try again). 
+ if not any(c in cat_line.lower() for c in _CATEGORY_INDICATORS): + i += 1 + continue + # Cookie-Name validation + nl = name_line.lower().strip() + if (not name_line or len(name_line) > 80 + or len(name_line) < 2 + or any(c in nl for c in _CATEGORY_INDICATORS) + or nl in seen_names + or nl in ("name des cookies", "kategorie", + "verwendungszweck", "speicherdauer", + "art des cookies")): + i += 1 + continue + # Look ahead for the Art-Cookie line (max 8 lines forward) + purpose_parts: list[str] = [] + persistence = "" + art = "" + j = i + 2 + while j < min(i + 12, len(lines)): + ln = lines[j] + ll = ln.lower() + if any(t in ll for t in ( + "permanent/protokoll", "session cookie", + "persistent cookie", "permanent cookie", + "sitzungs-cookie", "permanent/ protokoll", + )): + art = ln + if not persistence and j > i + 2: + persistence = lines[j - 1] + break + purpose_parts.append(ln) + j += 1 + purpose = " ".join(purpose_parts[:-1]) if len(purpose_parts) > 1 else " ".join(purpose_parts) + purpose = purpose[:500].strip() + + seen_names.add(nl) + provider = _guess_vendor(name_line) or "Unbekannter Anbieter (VW-intern)" + # Marketing-Cookies = Drittanbieter + if "marketing" in cat_line.lower() or "tracking" in cat_line.lower(): + if provider == "Unbekannter Anbieter (VW-intern)": + provider = "Unbekannter Drittanbieter (Marketing)" + entry = by_vendor.setdefault(provider, { + "name": provider, "country": "", + "purpose": "", "category": _normalize_category(cat_line), + "opt_out_url": "", "privacy_policy_url": "", + "persistence": "", + "cookies": [], + "source": "block_paste", + }) + entry["cookies"].append({ + "name": name_line, + "purpose": purpose[:300], + "expiry": persistence, + "is_third_party": "tracking" in cat_line.lower() or "marketing" in cat_line.lower(), + }) + i = j + 1 if art else i + 5 + + out = list(by_vendor.values()) + logger.info("parse_block_format: %d vendors / %d cookies", + len(out), sum(len(v["cookies"]) for v in out)) + return out + + def 
parse_cookie_table(text: str) -> list[dict]: """Returns vendor-records aus einer copy-pasted Cookie-Tabelle. - Bei nicht-tabellarischem Text: return []. + Probiert in dieser Reihenfolge: + 1. Tab/Pipe/Komma-getrennt (klassisches Tabellen-Layout) + 2. 5-Zeilen-Block-Format (VW Browser-Copy) + 3. return [] """ if not text or len(text) < 100: return [] @@ -98,6 +204,10 @@ def parse_cookie_table(text: str) -> list[dict]: if sep: sep_counts[sep] = sep_counts.get(sep, 0) + 1 if not sep_counts or max(sep_counts.values()) < 3: + # Kein Separator-Format → versuche Block-Format + block_vendors = parse_block_format(text) + if block_vendors: + return block_vendors return [] sep = max(sep_counts, key=sep_counts.get) @@ -257,22 +367,67 @@ def parse_flat_cookie_text(text: str) -> list[dict]: _VENDOR_GUESS = ( + # Google-Familie (alles unter "Google" zusammenfassen — Dedup kuemmert sich) ("_ga", "Google"), ("_gid", "Google"), ("_gcl_", "Google"), ("ANID", "Google"), ("AID", "Google"), ("FPGCLDC", "Google"), - ("IDE", "Google DoubleClick"), ("DSID", "Google"), - ("_fbp", "Meta / Facebook"), ("fr", "Meta / Facebook"), + ("FPAU", "Google"), ("FLC", "Google"), ("APC", "Google"), + ("IDE", "Google"), ("DSID", "Google"), ("TAID", "Google"), + ("NID", "Google"), ("1P_JAR", "Google"), + # Meta / Facebook + ("_fbp", "Meta / Facebook"), ("_fbc", "Meta / Facebook"), + # fr ist Meta-Cookie, nur wenn keine andere Site-eigene Verwendung + # Microsoft / Bing ("_pin_unauth", "Pinterest"), ("_uetsid", "Microsoft Bing"), ("_uetvid", "Microsoft Bing"), ("MUID", "Microsoft"), + # Soziale Netzwerke ("tt_", "TikTok"), ("li_at", "LinkedIn"), + # CMP ("OptanonConsent", "OneTrust"), ("cookieconsent", "Borlabs / Cookie-CMP"), + ("CookieConsentPolicy", "Borlabs / Cookie-CMP"), + # Analytics ("eta_", "etracker"), ("matomo", "Matomo"), ("_hjid", "Hotjar"), ("_hj", "Hotjar"), - ("__cf", "Cloudflare"), ("datadome", "DataDome"), - ("incap_", "Imperva Incapsula"), ("ajs_", "Segment"), ("amp_", "Amplitude"), + # 
Adobe-Familie ("sat_track", "Adobe Experience Cloud"), - ("AMCV_", "Adobe Experience Cloud"), + ("AMCV", "Adobe Experience Cloud"), + ("AMCVS", "Adobe Experience Cloud"), + ("demdex", "Adobe Experience Cloud"), + ("dextp", "Adobe Experience Cloud"), + ("dpm", "Adobe Experience Cloud"), + ("mbox", "Adobe Target"), + ("smartSignals", "Adobe Experience Cloud"), + ("adbCDP", "Adobe Experience Cloud"), ("s_cc", "Adobe Analytics"), ("s_sq", "Adobe Analytics"), + ("s_ecid", "Adobe Analytics"), ("s_vi", "Adobe Analytics"), + ("s_fid", "Adobe Analytics"), ("s_plt", "Adobe Analytics"), + ("s_pltp", "Adobe Analytics"), ("s_invisit", "Adobe Analytics"), + ("s_vnc365", "Adobe Analytics"), ("s_ivc", "Adobe Analytics"), + ("sc_appvn", "Adobe Analytics"), ("sc_pCmp", "Adobe Analytics"), + ("sc_prevpage", "Adobe Analytics"), ("sc_prop", "Adobe Analytics"), + ("sc_v17", "Adobe Analytics"), ("sc_v44", "Adobe Analytics"), + ("sc_v49", "Adobe Analytics"), + # The Trade Desk + ("TDID", "The Trade Desk"), ("TDCPM", "The Trade Desk"), + ("TTDOptOut", "The Trade Desk"), + # AdForm + ("uid", "AdForm"), ("cid", "AdForm"), ("otsid", "AdForm"), + # everest + ("everest", "Adobe Advertising Cloud (everest)"), + # Infra/CDN + ("__cf", "Cloudflare"), ("datadome", "DataDome"), + ("incap_", "Imperva Incapsula"), ("awsalb", "AWS Load Balancer"), + # Salesforce + ("sfdc-", "Salesforce"), ("X-Salesforce", "Salesforce"), + ("liveagent_", "Salesforce LiveAgent"), + # Inbenta + ("inbenta", "Inbenta"), + # Sonstige Tracker + ("_pk_", "Matomo / Piwik"), + ("hmt_", "Akamai mPulse"), + # EDAA / Industry Self-regulation + ("EDAAT", "EDAA / Online Choices"), + ("Eboptout", "EDAA / Online Choices"), ) diff --git a/backend-compliance/compliance/services/vendor_normalizer.py b/backend-compliance/compliance/services/vendor_normalizer.py new file mode 100644 index 00000000..0d93a51b --- /dev/null +++ b/backend-compliance/compliance/services/vendor_normalizer.py @@ -0,0 +1,167 @@ +""" +Vendor-Deduplizierung und 
Garbage-Filter. + +Normalisiert Vendor-Namen (Google + Google DoubleClick + DoubleClick/Google +Marketing → eine Eintragung) und entfernt Garbage-Eintraege die fälschlich +als Vendor erkannt wurden ('click to select a dealership', 'Mehrere OEMs', +URL-Fragmente, etc.). + +Wird nach allen Vendor-Sources (LLM, Library, Pattern, Phase-G) angewandt +bevor die VVT-Tabelle gerendert wird. +""" + +from __future__ import annotations + +import logging +import re + +logger = logging.getLogger(__name__) + + +# Aliase: alle Schreibweisen → kanonischer Name +_VENDOR_ALIASES: dict[str, str] = { + # Google-Familie + "google": "Google", + "google llc": "Google", + "google inc": "Google", + "google marketing platform": "Google", + "google ads": "Google", + "google adsense": "Google", + "google analytics": "Google Analytics", + "google tag manager": "Google Tag Manager", + "google doubleclick": "Google", + "doubleclick": "Google", + "doubleclick/google marketing": "Google", + "doubleclick by google": "Google", + # Adobe-Familie + "adobe": "Adobe", + "adobe inc": "Adobe", + "adobe systems": "Adobe", + "adobe analytics": "Adobe Analytics", + "adobe audience manager": "Adobe Audience Manager", + "adobe experience cloud": "Adobe Experience Cloud", + "adobe target": "Adobe Target", + "adobe advertising cloud (everest)": "Adobe Advertising Cloud", + # Trade Desk + "the trade desk": "The Trade Desk", + "tradedesk": "The Trade Desk", + "the tradedesk": "The Trade Desk", + "trade desk": "The Trade Desk", + # Meta + "meta": "Meta / Facebook", + "meta platforms": "Meta / Facebook", + "facebook": "Meta / Facebook", + "meta / facebook": "Meta / Facebook", + # AdForm + "adform": "AdForm", + "adform dsp": "AdForm", + # Microsoft + "microsoft": "Microsoft", + "microsoft bing": "Microsoft Bing", + "linkedin": "LinkedIn (Microsoft)", + "linkedin corporation": "LinkedIn (Microsoft)", + # CMP + "onetrust": "OneTrust", + "cookiebot": "Cookiebot", + "usercentrics": "Usercentrics", + "borlabs": "Borlabs", 
+ "borlabs / cookie-cmp": "Borlabs", + # Salesforce + "salesforce": "Salesforce", + "salesforce liveagent": "Salesforce", + "liveagent": "Salesforce", + # Cloudflare + "cloudflare": "Cloudflare", +} + + +# Garbage-Patterns: wenn der Vendor-Name darauf matched → wegfiltern +_GARBAGE_PATTERNS = ( + re.compile(r"^click to ", re.I), + re.compile(r"^mehrere oems", re.I), + re.compile(r"^breakpilot[-_ ]?snapshot", re.I), + re.compile(r"^https?://", re.I), # URLs + re.compile(r"^https?$", re.I), + re.compile(r"^javascript:", re.I), + re.compile(r"^undefined$|^null$|^none$", re.I), + re.compile(r"^[\d\W]+$"), # nur Zahlen/Symbole + re.compile(r"^.{1,2}$"), # Ein-/Zwei-Zeichen-"Namen" + re.compile(r"^(ein|der|die|das|von|und|aber|oder)$", re.I), + re.compile(r"^cookie$|^cookies$", re.I), +) + + +def _is_garbage(name: str) -> bool: + if not name or len(name.strip()) < 2: + return True + if len(name) > 120: + return True + return any(p.search(name) for p in _GARBAGE_PATTERNS) + + +def _canonical_name(name: str) -> str: + nl = name.strip().lower() + if nl in _VENDOR_ALIASES: + return _VENDOR_ALIASES[nl] + # Sub-token-Match: 'doubleclick by google' → enthaelt 'doubleclick' + for alias, canonical in _VENDOR_ALIASES.items(): + if alias in nl and len(alias) >= 6: + return canonical + return name.strip() + + +def normalize_vendors(vendors: list[dict]) -> list[dict]: + """Filtert Garbage + dedupliziert anhand kanonischer Aliase. + + Mergt cookies-Listen wenn der gleiche Vendor mehrfach erscheint + (z.B. aus LLM + Library + Phase-G). Behaelt Metadaten des Eintrags + mit der laengsten cookies-Liste. 
+ """ + if not vendors: + return [] + by_canon: dict[str, dict] = {} + dropped_garbage = 0 + merged = 0 + for v in vendors: + if not isinstance(v, dict): + continue + raw_name = (v.get("name") or "").strip() + if _is_garbage(raw_name): + dropped_garbage += 1 + continue + canon = _canonical_name(raw_name) + if canon in by_canon: + # Merge: cookies vereinen, source-Tags joinen + ex = by_canon[canon] + ex_cookies = ex.get("cookies") or [] + new_cookies = v.get("cookies") or [] + seen_ck = {(c.get("name") or "").lower() for c in ex_cookies if isinstance(c, dict)} + for c in new_cookies: + if isinstance(c, dict): + nm = (c.get("name") or "").strip().lower() + if nm and nm not in seen_ck: + ex_cookies.append(c) + seen_ck.add(nm) + ex["cookies"] = ex_cookies + # Source-Tag merging (semicolon-separated) + ex_src = (ex.get("source") or "").split(";") + new_src = v.get("source") or "" + if new_src and new_src not in ex_src: + ex_src.append(new_src) + ex["source"] = ";".join([s for s in ex_src if s]) + # Bessere Metadaten uebernehmen (falls leer) + for k in ("country", "opt_out_url", "privacy_policy_url", + "purpose", "category", "persistence"): + if not ex.get(k) and v.get(k): + ex[k] = v[k] + merged += 1 + else: + v["name"] = canon + by_canon[canon] = v + if dropped_garbage or merged: + logger.info( + "Vendor-Normalizer: %d garbage dropped, %d duplicate merges, " + "%d unique vendors (input: %d)", + dropped_garbage, merged, len(by_canon), len(vendors), + ) + return list(by_canon.values()) diff --git a/backend-compliance/tests/fixtures/cookie_gt/vw_cookie_richtlinie.txt b/backend-compliance/tests/fixtures/cookie_gt/vw_cookie_richtlinie.txt new file mode 100644 index 00000000..38a11c94 --- /dev/null +++ b/backend-compliance/tests/fixtures/cookie_gt/vw_cookie_richtlinie.txt @@ -0,0 +1,55 @@ +Name des Cookies +Kategorie +Verwendungszweck +Speicherdauer +Art des Cookies +VWD6_ENSIGHTEN_PRIVACY_MODAL_LOADED +Funktionscookie +Dieses Cookie speichert, ob für den User der Cookie 
Manager angezeigt wurde. +1 Jahr +Permanent/Protokoll +VWD6_ENSIGHTEN_PRIVACY_MODAL_VIEWED +Funktionscookie +Dieses Cookie speichert, ob für der User Einstellung im Cookie Manager vorgenommen hat. +1 Jahr +Permanent/Protokoll +VWD6_ENSIGHTEN_PRIVACY_ +Funktionscookie +Dieses Cookie speichert, ob der User sein Einverständnis für die entsprechende Cookie Kategorie gegeben hat. +1 Jahr +Permanent/Protokoll +UZ_TI_dc_value +Funktionscookie +Dieses Cookie verfolgt die Studien-ID oder die Segment-ID in Abhängigkeit vom Wert von UZ_TI_dc_value. +20 Tage +Persistent cookie +awsalb +Funktionscookie +Der Cookie prüft, welcher Load Balancer für die aktuelle Session verwendet wird. +7 Tage +Persistent cookie +UZ_TI_S_ +Funktionscookie +Der Cookie erfasst, ob ein anderer Cookie für jedes Segment verwendet wird. +20 Tage +Persistent cookie +smartSignals2UiD +Trackingcookie (Analytics & Personalisierung) +Dieses Cookie enthält eine eindeutige, zufällig generierte ID für einen Webseiten User. +1 Jahr +Permanent/Protokoll +smartSignals2sUiD +Trackingcookie (Analytics & Personalisierung) +userId verbesserter Mechanismus zur Browser-Tracking-Einschraenkungen +1 Jahr +Permanent/Protokoll +smartSignals2CP +Trackingcookie (Analytics & Personalisierung) +Personalisierte Inhalte angezeigt +30 Minuten +Session Cookie +s_ecid +Trackingcookie (Analytics & Personalisierung) +First-Party-Cookie Besucherkennung +13 Monate nach dem letzten Besuch +Permanent/Protokoll