"""Parse cookie tables that the user pastes directly into the frontend.

Typical sources:
  * browser copy from a VW/BMW/Mercedes cookie policy (tab-separated)
  * Excel export from the Borlabs / OneTrust / Cookiebot admin (CSV / pipe)
  * Markdown table from internal documentation

Recognises four column layouts (heuristically):
  1. [name, category, description, retention, provider]
  2. [name, provider, purpose, retention]
  3. [name, description, retention]
  4. only [name, retention]

Output: the same vendor-record structure as vendor_extractor / the LLM, so
that the rest of the pipeline (VVT table, library-mismatch check) keeps
working unchanged.
"""
from __future__ import annotations

import logging
import re

logger = logging.getLogger(__name__)

# Category labels that are searched (as substrings, in this order) in a
# category cell or a purpose text.
_CATEGORY_LABELS = (
    "notwendig", "essential", "funktional", "tracking", "marketing",
    "statistik", "analyse", "analytics", "performance", "werbung",
    "advertising", "targeting", "preferences", "social_media",
    "strictly necessary", "personalisierung",
)

# Normalised category for each label. Labels without an entry
# ("preferences", "personalisierung") have no normalised form.
_CATEGORY_NORMALIZED = {
    "notwendig": "essential",
    "essential": "essential",
    "strictly necessary": "essential",
    "tracking": "marketing",
    "marketing": "marketing",
    "werbung": "marketing",
    "advertising": "marketing",
    "targeting": "marketing",
    "statistik": "statistics",
    "analyse": "statistics",
    "analytics": "statistics",
    "performance": "statistics",
    "funktional": "functional",
    "social_media": "social_media",
}


def _looks_like_separator(line: str) -> str | None:
    """Return the column separator used by a tabular line, or None.

    Candidates are tried in a fixed order: tab, " | ", ";" and ",".
    A semicolon only counts when there is no comma within the first 20
    characters, and a comma needs at least three occurrences.
    """
    if line.count("\t") >= 2:
        return "\t"
    if line.count(" | ") >= 2:
        return " | "
    if line.count(";") >= 2 and "," not in line[:20]:
        return ";"
    if line.count(",") >= 3:
        return ","
    return None


def _match_category(lowered: str) -> str:
    """Return the normalised category of the first known label in *lowered*.

    *lowered* must already be lower-case. Returns "" when no label with a
    normalised form occurs in the text.
    """
    for label in _CATEGORY_LABELS:
        normalized = _CATEGORY_NORMALIZED.get(label)
        if normalized and label in lowered:
            return normalized
    return ""


def _normalize_category(s: str) -> str:
    """Normalise the content of a category cell.

    Known labels map to "essential", "marketing", "statistics",
    "functional" or "social_media". An unknown label is kept as-is
    (lower-cased, stripped, truncated to 30 characters).
    """
    sl = s.lower().strip()
    return _match_category(sl) or sl[:30]


def _infer_category(text: str) -> str:
    """Guess a category from free text such as a purpose description.

    Unlike `_normalize_category` this returns "" when no known label is
    found, so that a slice of the purpose text never ends up as category.
    """
    return _match_category(text.lower())


_DURATION_RE = re.compile(
    r"(\d+\s*(sekunde|minute|stunde|tag|woche|monat|jahr|day|month|year)[^\s,;|]{0,5})",
    re.I,
)


def _parse_persistence(s: str) -> str:
    """Extract the retention ("Speicherdauer") notation from a cell.

    Returns the first "<number> <unit>" match (at most 80 characters),
    otherwise "Session" or "Permanent" when those words occur, otherwise "".
    """
    m = _DURATION_RE.search(s)
    if m:
        return m.group(1).strip()[:80]
    if re.search(r"\bsession\b", s, re.I):
        return "Session"
    if re.search(r"permanent", s, re.I):
        return "Permanent"
    return ""


# Substrings that mark a line of the block format as a category / cookie-type
# line rather than a cookie name.
_CATEGORY_INDICATORS = (
    "funktionscookie", "tracking cookie", "trackingcookie", "marketing",
    "analytics", "necessary", "notwendig", "performance",
    "session cookie", "persistent cookie", "permanent cookie",
    "permanent/protokoll", "sitzungs-cookie",
)

# First line of a pasted block table that announces a header block.
_BLOCK_HEADER_FIRST_LINES = ("name des cookies", "cookie name", "name")

# Header cells that must never be taken for a cookie name.
_BLOCK_HEADER_CELLS = (
    "name des cookies", "kategorie", "verwendungszweck",
    "speicherdauer", "art des cookies",
)

# Substrings that identify the closing "cookie type" line of a block.
_BLOCK_ART_INDICATORS = (
    "permanent/protokoll", "session cookie", "persistent cookie",
    "permanent cookie", "sitzungs-cookie", "permanent/ protokoll",
)

_BLOCK_DEFAULT_PROVIDER = "Unbekannter Anbieter (VW-intern)"
_BLOCK_MARKETING_PROVIDER = "Unbekannter Drittanbieter (Marketing)"


def parse_block_format(text: str) -> list[dict]:
    """Parse the block format (browser copy without tab separators).

    Each cookie occupies about five lines:
    name / category / purpose / retention / cookie type.

    Heuristic: walk over all non-empty lines. A line whose successor
    contains a category indicator, and which is not itself a category,
    header cell or already-seen name, is taken as a cookie name. The lines
    after the category are collected until a cookie-type line is found
    (at most 10 lines are inspected). The line directly before the
    cookie-type line is used as retention; the lines before that form the
    purpose.

    Returns a list of vendor records (one per guessed provider), or [] when
    the text is shorter than 100 characters or has fewer than 10 non-empty
    lines.
    """
    if not text or len(text) < 100:
        return []

    stripped = (ln.strip() for ln in text.splitlines())
    lines = [ln for ln in stripped if ln]
    total = len(lines)
    if total < 10:
        return []

    # Skip the five header cells if the paste starts with a header block
    # (at least 10 lines are guaranteed at this point).
    i = 5 if lines[0].lower() in _BLOCK_HEADER_FIRST_LINES else 0

    by_vendor: dict[str, dict] = {}
    seen_names: set[str] = set()

    while i < total - 2:
        name_line = lines[i]
        cat_line = lines[i + 1]
        cat_lower = cat_line.lower()

        # The line after a cookie name must be a category line; otherwise
        # the block is malformed, so advance by one line and retry.
        if not any(ind in cat_lower for ind in _CATEGORY_INDICATORS):
            i += 1
            continue

        name_lower = name_line.lower().strip()
        name_is_invalid = (
            len(name_line) > 80
            or len(name_line) < 2
            or any(ind in name_lower for ind in _CATEGORY_INDICATORS)
            or name_lower in seen_names
            or name_lower in _BLOCK_HEADER_CELLS
        )
        if name_is_invalid:
            i += 1
            continue

        # Look ahead for the cookie-type line that closes the block.
        collected: list[str] = []
        persistence = ""
        art = ""
        j = i + 2
        limit = min(i + 12, total)
        while j < limit:
            candidate = lines[j]
            candidate_lower = candidate.lower()
            if any(tok in candidate_lower for tok in _BLOCK_ART_INDICATORS):
                art = candidate
                if j > i + 2:
                    # The line right before the cookie type is the retention.
                    persistence = lines[j - 1]
                break
            collected.append(candidate)
            j += 1

        # With more than one collected line the last one is the retention
        # and is therefore left out of the purpose.
        purpose_lines = collected[:-1] if len(collected) > 1 else collected
        purpose = " ".join(purpose_lines)[:500].strip()

        seen_names.add(name_lower)

        is_marketing = "marketing" in cat_lower or "tracking" in cat_lower
        provider = _guess_vendor(name_line) or _BLOCK_DEFAULT_PROVIDER
        # Unrecognised marketing cookies are attributed to a third party.
        if is_marketing and provider == _BLOCK_DEFAULT_PROVIDER:
            provider = _BLOCK_MARKETING_PROVIDER

        entry = by_vendor.setdefault(provider, {
            "name": provider,
            "country": "",
            "purpose": "",
            "category": _normalize_category(cat_line),
            "opt_out_url": "",
            "privacy_policy_url": "",
            "persistence": "",
            "cookies": [],
            "source": "block_paste",
        })
        entry["cookies"].append({
            "name": name_line,
            "purpose": purpose[:300],
            "expiry": persistence,
            "is_third_party": is_marketing,
        })

        # Continue after the cookie-type line, or assume a 5-line block.
        i = j + 1 if art else i + 5

    out = list(by_vendor.values())
    logger.info("parse_block_format: %d vendors / %d cookies",
                len(out), sum(len(v["cookies"]) for v in out))
    return out


# Header keywords per column role, in priority order. The generic keyword
# "cookie" comes last so that headers such as "Art des Cookies" or
# "Lebensdauer des Cookies" are not mistaken for the name column.
_HEADER_ROLE_KEYWORDS = (
    ("name", ("name",)),
    ("provider", ("anbieter", "provider", "domain")),
    ("category", ("kategorie", "type", "art")),
    ("purpose", ("zweck", "purpose", "beschreib")),
    ("persistence", ("speicher", "dauer", "lebens", "expir")),
    ("name", ("cookie",)),
)

_HEADER_HINTS = (
    "cookie", "name", "anbieter", "provider", "zweck",
    "kategorie", "speicherdauer", "dauer",
)

# A Markdown divider cell such as "---", ":---" or ":---:".
_MD_DIVIDER_CELL_RE = re.compile(r":?-+:?")


def _map_header_columns(header_row: list[str]) -> dict[str, int]:
    """Map column roles to indices based on lower-cased header cells.

    Each role is assigned to the first header cell that matches it; a cell
    whose preferred role is already taken falls through to the next
    matching role. Unassigned roles keep -1, except "name" which defaults
    to column 0.
    """
    col_idx = {"name": 0, "provider": -1, "category": -1,
               "purpose": -1, "persistence": -1}
    assigned: set[str] = set()
    for idx, cell in enumerate(header_row):
        for role, keywords in _HEADER_ROLE_KEYWORDS:
            if role not in assigned and any(k in cell for k in keywords):
                col_idx[role] = idx
                assigned.add(role)
                break
    return col_idx


def _split_cells(line: str, sep: str) -> list[str]:
    """Split a table line into stripped cells (surrounding quotes removed).

    For pipe tables the outer pipes of Markdown rows ("| a | b |") are
    removed first so they do not stick to the first and last cell.
    """
    if sep == " | ":
        line = line.strip().strip("|")
    return [cell.strip().strip('"') for cell in line.split(sep)]


def _is_divider_row(cells: list[str]) -> bool:
    """Return True for a Markdown header divider row ("| --- | --- |")."""
    filled = [c for c in cells if c]
    return bool(filled) and all(_MD_DIVIDER_CELL_RE.fullmatch(c) for c in filled)


def _cell(cells: list[str], idx: int) -> str:
    """Return cells[idx], or "" when the column is unmapped or missing."""
    return cells[idx] if 0 <= idx < len(cells) else ""


def parse_cookie_table(text: str) -> list[dict]:
    """Return vendor records from a copy-pasted cookie table.

    Tries, in this order:
      1. tab / pipe / semicolon / comma separated (classic table layout)
      2. 5-line block format (see `parse_block_format`)
      3. return []

    Columns are mapped via the header row when one is detected, otherwise
    the first column is the cookie name. Cookies are grouped by provider;
    when no provider cell is available the provider is guessed from the
    cookie name. Texts shorter than 100 characters yield [].
    """
    if not text or len(text) < 100:
        return []
    lines = [ln.strip() for ln in text.splitlines() if ln.strip()]
    if not lines:
        return []

    # Detect the separator from the first 60 non-empty lines.
    sep_counts: dict[str, int] = {}
    for ln in lines[:60]:
        found = _looks_like_separator(ln)
        if found:
            sep_counts[found] = sep_counts.get(found, 0) + 1

    if not sep_counts or max(sep_counts.values()) < 3:
        # Not a separator-based table: try the block format instead.
        return parse_block_format(text)

    sep = max(sep_counts, key=sep_counts.get)
    logger.info("cookies_table_parser: detected separator '%s' (%d hits)",
                sep, sep_counts[sep])

    rows: list[list[str]] = []
    for ln in lines:
        if sep not in ln:
            continue
        cells = _split_cells(ln, sep)
        if len(cells) >= 2 and cells[0] and not _is_divider_row(cells):
            rows.append(cells)
    if len(rows) < 3:
        return []

    # The first row is a header when it contains a typical header word.
    header_row = [c.lower() for c in rows[0]]
    header_text = " ".join(header_row)
    has_header = any(hint in header_text for hint in _HEADER_HINTS)
    data_rows = rows[1:] if has_header else rows

    if has_header:
        col_idx = _map_header_columns(header_row)
    else:
        col_idx = {"name": 0, "provider": -1, "category": -1,
                   "purpose": -1, "persistence": -1}

    # Aggregate cookies per vendor.
    by_vendor: dict[str, dict] = {}
    for cells in data_rows:
        name_col = col_idx["name"]
        name = cells[name_col] if name_col < len(cells) else cells[0]
        name = (name or "").strip()
        if not name or len(name) > 120 or len(name) < 2:
            continue

        provider = _cell(cells, col_idx["provider"]).strip()
        if not provider:
            # No provider cell: guess from the cookie name.
            provider = _guess_vendor(name)
        if not provider:
            provider = "Unbekannter Anbieter"

        category = _normalize_category(_cell(cells, col_idx["category"]))
        purpose = _cell(cells, col_idx["purpose"])[:500]
        persistence = _parse_persistence(_cell(cells, col_idx["persistence"]))
        if not category:
            # Infer from the purpose text; stays "" when nothing matches.
            category = _infer_category(purpose)

        entry = by_vendor.setdefault(provider, {
            "name": provider,
            "country": "",
            "purpose": purpose[:300] if purpose else "",
            "category": category,
            "opt_out_url": "",
            "privacy_policy_url": "",
            "persistence": persistence,
            "cookies": [],
            "source": "table_paste",
        })
        entry["cookies"].append({
            "name": name,
            "purpose": purpose[:200],
            "expiry": persistence,
            "is_third_party": True,
        })

    out = list(by_vendor.values())
    logger.info("cookies_table_parser: %d vendors / %d cookies parsed",
                len(out), sum(len(v["cookies"]) for v in out))
    return out


# The textContent output of HTML tables concatenates cells without any
# whitespace (e.g. VW: "Permanent/Protokoll_fbcTracking Cookies (Marketing)...").
# Cookie entries are recognised via two anchors:
#   - before: a typical end token of the previous table cell
#     (retention suffix such as Permanent/Protokoll, Session Cookie, ...)
#   - after: a category token (Tracking Cookies, Funktionscookie, ...)
# In between: the cookie name (2-41 characters; a letter or underscore
# followed by alphanumerics, underscore, dash or dot).
# Matches "<end token of previous cell><cookie name>" directly followed by a
# category token; group 1 is the cookie name.
_FLAT_ROW_RE = re.compile(
    r"(?:Permanent/Protokoll|Session Cookie|Persistent Cookie|"
    r"TagePersistent|TageSitzungs-Cookie|TageSession Cookie|"
    r"MinutenPersistent|MinutenSession Cookie|StundenPersistent|"
    r"MonatePersistent|JahrePersistent)"
    r"([A-Za-z_][A-Za-z0-9_\-\.]{1,40}?)"
    r"(?=Tracking Cookies|Session Cookies|Funktionscookie|Funktional|"
    r"Marketing|Analytics|Necessary)",
    re.I,
)

# Ordinary words that the flat-text regex can capture by accident and that
# are never cookie names.
_FLAT_STOPWORDS = frozenset({
    "dieser", "diese", "ein", "der", "die", "das",
    "session", "permanent", "funktional", "notwendig",
    "marketing", "analytics", "werbung", "anbieter",
    "tracking", "cookie", "cookies", "und", "von",
    "einer", "ist", "alle", "noch", "auch",
    "name", "art", "zweck", "dauer", "test",
})


def parse_flat_cookie_text(text: str) -> list[dict]:
    """Parse cookie tables that arrive as flat text.

    Intended for sites whose cookie table is delivered as textContent
    output without whitespace between cells. `_FLAT_ROW_RE` anchors on the
    retention suffix of the previous cell and on the following category
    token and extracts the cookie name in between.

    Returns vendor records grouped by guessed vendor. Returns [] for texts
    shorter than 500 characters or with fewer than three regex matches.
    Names are de-duplicated case-insensitively; stop words and names
    outside 3-60 characters are dropped.
    """
    if not text or len(text) < 500:
        return []
    names = _FLAT_ROW_RE.findall(text)
    if len(names) < 3:
        return []

    by_vendor: dict[str, dict] = {}
    seen_names: set[str] = set()
    for raw in names:
        name = raw.strip()
        lowered = name.lower()
        if lowered in seen_names or lowered in _FLAT_STOPWORDS:
            continue
        if len(name) < 3 or len(name) > 60:
            continue
        seen_names.add(lowered)

        vendor = _guess_vendor(name) or "Unbekannter Anbieter"
        entry = by_vendor.setdefault(vendor, {
            "name": vendor,
            "country": "",
            "purpose": "",
            "category": "",
            "opt_out_url": "",
            "privacy_policy_url": "",
            "persistence": "",
            "cookies": [],
            "source": "flat_pattern",
        })
        entry["cookies"].append({
            "name": name,
            "purpose": "",
            "expiry": "",
            "is_third_party": True,
        })

    out = list(by_vendor.values())
    logger.info("parse_flat_cookie_text: %d vendors / %d cookies",
                len(out), sum(len(v["cookies"]) for v in out))
    return out


# (token, vendor) pairs, checked in order; the first match wins.
_VENDOR_GUESS = (
    # Google family (everything is grouped under "Google"; per the original
    # note, de-duplication takes care of the rest)
    ("_ga", "Google"), ("_gid", "Google"), ("_gcl_", "Google"),
    ("ANID", "Google"), ("AID", "Google"), ("FPGCLDC", "Google"),
    ("FPAU", "Google"), ("FLC", "Google"), ("APC", "Google"),
    ("IDE", "Google"), ("DSID", "Google"), ("TAID", "Google"),
    ("NID", "Google"), ("1P_JAR", "Google"),
    # Meta / Facebook
    ("_fbp", "Meta / Facebook"), ("_fbc", "Meta / Facebook"),
    # "fr" is a Meta cookie only when the site does not use that name
    # itself, so it is not listed here.
    # Pinterest
    ("_pin_unauth", "Pinterest"),
    # Microsoft / Bing
    ("_uetsid", "Microsoft Bing"), ("_uetvid", "Microsoft Bing"),
    ("MUID", "Microsoft"),
    # Social networks
    ("tt_", "TikTok"), ("li_at", "LinkedIn"),
    # CMP
    ("OptanonConsent", "OneTrust"),
    ("cookieconsent", "Borlabs / Cookie-CMP"),
    ("CookieConsentPolicy", "Borlabs / Cookie-CMP"),
    # Analytics
    ("eta_", "etracker"), ("matomo", "Matomo"),
    ("_hjid", "Hotjar"), ("_hj", "Hotjar"),
    ("ajs_", "Segment"), ("amp_", "Amplitude"),
    # Adobe family
    ("sat_track", "Adobe Experience Cloud"),
    ("AMCV", "Adobe Experience Cloud"),
    ("AMCVS", "Adobe Experience Cloud"),
    ("demdex", "Adobe Experience Cloud"),
    ("dextp", "Adobe Experience Cloud"),
    ("dpm", "Adobe Experience Cloud"),
    ("mbox", "Adobe Target"),
    ("smartSignals", "Adobe Experience Cloud"),
    ("adbCDP", "Adobe Experience Cloud"),
    ("s_cc", "Adobe Analytics"), ("s_sq", "Adobe Analytics"),
    ("s_ecid", "Adobe Analytics"), ("s_vi", "Adobe Analytics"),
    ("s_fid", "Adobe Analytics"), ("s_plt", "Adobe Analytics"),
    ("s_pltp", "Adobe Analytics"), ("s_invisit", "Adobe Analytics"),
    ("s_vnc365", "Adobe Analytics"), ("s_ivc", "Adobe Analytics"),
    ("sc_appvn", "Adobe Analytics"), ("sc_pCmp", "Adobe Analytics"),
    ("sc_prevpage", "Adobe Analytics"), ("sc_prop", "Adobe Analytics"),
    ("sc_v17", "Adobe Analytics"), ("sc_v44", "Adobe Analytics"),
    ("sc_v49", "Adobe Analytics"),
    # The Trade Desk
    ("TDID", "The Trade Desk"), ("TDCPM", "The Trade Desk"),
    ("TTDOptOut", "The Trade Desk"),
    # AdForm
    ("uid", "AdForm"), ("cid", "AdForm"), ("otsid", "AdForm"),
    # everest
    ("everest", "Adobe Advertising Cloud (everest)"),
    # Infrastructure / CDN
    ("__cf", "Cloudflare"), ("datadome", "DataDome"),
    ("incap_", "Imperva Incapsula"), ("awsalb", "AWS Load Balancer"),
    # Salesforce
    ("sfdc-", "Salesforce"), ("X-Salesforce", "Salesforce"),
    ("liveagent_", "Salesforce LiveAgent"),
    # Inbenta
    ("inbenta", "Inbenta"),
    # Other trackers
    ("_pk_", "Matomo / Piwik"), ("hmt_", "Akamai mPulse"),
    # EDAA / industry self-regulation
    ("EDAAT", "EDAA / Online Choices"),
    ("Eboptout", "EDAA / Online Choices"),
)

# Tokens up to this length are too unspecific for substring matching
# ("nid" occurs in "JSESSIONID", "ide" in "video", "amp_" in "timestamp_").
_SHORT_TOKEN_MAX_LEN = 4


def _guess_vendor(cookie_name: str) -> str:
    """Guess the vendor of a cookie from its name.

    Matching is case-insensitive and follows the order of `_VENDOR_GUESS`.
    Tokens of at most `_SHORT_TOKEN_MAX_LEN` characters must match the
    beginning of the name; longer tokens may occur anywhere in it.
    Returns "" when no token matches.
    """
    lowered = cookie_name.lower()
    for token, vendor in _VENDOR_GUESS:
        needle = token.lower()
        if len(needle) <= _SHORT_TOKEN_MAX_LEN:
            if lowered.startswith(needle):
                return vendor
        elif needle in lowered:
            return vendor
    return ""