""" Parst Cookie-Tabellen die der User direkt ins Frontend kopiert. Typische Quellen: * Browser-Copy aus VW/BMW/Mercedes Cookie-Richtlinie (Tab-getrennt) * Excel-Export aus Borlabs / OneTrust / Cookiebot Admin (CSV / Pipe) * Markdown-Tabelle aus interner Doku Erkennt 4 Spalten-Layouts (heuristisch): 1. [Name, Kategorie, Beschreibung, Speicherdauer, Provider] 2. [Name, Provider, Zweck, Speicherdauer] 3. [Name, Beschreibung, Speicherdauer] 4. nur [Name, Speicherdauer] Output: gleiche Vendor-Record-Struktur wie vendor_extractor / LLM — damit der Rest der Pipeline (VVT-Tabelle, Library-Mismatch-Check) ohne Aenderung weiterlaeuft. """ from __future__ import annotations import logging import re logger = logging.getLogger(__name__) _CATEGORY_LABELS = ( "notwendig", "essential", "funktional", "tracking", "marketing", "statistik", "analyse", "analytics", "performance", "werbung", "advertising", "targeting", "preferences", "social_media", "strictly necessary", "personalisierung", ) def _looks_like_separator(line: str) -> str | None: """Detect the column-separator of a tabular line.""" if "\t" in line and line.count("\t") >= 2: return "\t" if " | " in line and line.count(" | ") >= 2: return " | " if ";" in line and line.count(";") >= 2 and "," not in line[:20]: return ";" if "," in line and line.count(",") >= 3: return "," return None def _normalize_category(s: str) -> str: sl = s.lower().strip() for cat in _CATEGORY_LABELS: if cat in sl: if cat in ("notwendig", "essential", "strictly necessary"): return "essential" if cat in ("tracking", "marketing", "werbung", "advertising", "targeting"): return "marketing" if cat in ("statistik", "analyse", "analytics", "performance"): return "statistics" if cat == "funktional": return "functional" if cat == "social_media": return "social_media" return sl[:30] def _parse_persistence(s: str) -> str: """Extracts 'Speicherdauer' notation.""" m = re.search( r"(\d+\s*(sekunde|minute|stunde|tag|woche|monat|jahr|day|month|year)[^\s,;|]{0,5})", s, re.I, ) if m: return m.group(1).strip()[:80] if re.search(r"\bsession\b", s, re.I): return "Session" if re.search(r"permanent", s, re.I): return "Permanent" return "" def parse_cookie_table(text: str) -> list[dict]: """Returns vendor-records aus einer copy-pasted Cookie-Tabelle. Bei nicht-tabellarischem Text: return []. """ if not text or len(text) < 100: return [] lines = [ln.strip() for ln in text.splitlines() if ln.strip()] if not lines: return [] # Sample 30 lines to detect separator sample = lines[:60] sep_counts: dict[str, int] = {} for ln in sample: sep = _looks_like_separator(ln) if sep: sep_counts[sep] = sep_counts.get(sep, 0) + 1 if not sep_counts or max(sep_counts.values()) < 3: return [] sep = max(sep_counts, key=sep_counts.get) logger.info("cookies_table_parser: detected separator '%s' (%d hits)", sep, sep_counts[sep]) # Parse rows rows: list[list[str]] = [] for ln in lines: if sep in ln: parts = [p.strip().strip('"') for p in ln.split(sep)] if len(parts) >= 2 and parts[0]: rows.append(parts) if len(rows) < 3: return [] # Detect column layout from header (first row) or by content header_row = [c.lower() for c in rows[0]] has_header = any(h in " ".join(header_row) for h in ("cookie", "name", "anbieter", "provider", "zweck", "kategorie", "speicherdauer", "dauer")) data_rows = rows[1:] if has_header else rows # Map columns by header keyword or by position col_idx = {"name": 0, "provider": -1, "category": -1, "purpose": -1, "persistence": -1} if has_header: for i, h in enumerate(header_row): if "name" in h or "cookie" in h: col_idx["name"] = i elif "anbieter" in h or "provider" in h or "domain" in h: col_idx["provider"] = i elif "kategorie" in h or "type" in h or "art" in h: col_idx["category"] = i elif "zweck" in h or "purpose" in h or "beschreib" in h: col_idx["purpose"] = i elif "speicher" in h or "dauer" in h or "lebens" in h or "expir" in h: col_idx["persistence"] = i # Aggregate by vendor (or by name if no vendor column) by_vendor: dict[str, dict] = {} for r in data_rows: if len(r) < 2: continue name = r[col_idx["name"]] if col_idx["name"] < len(r) else r[0] name = (name or "").strip() if not name or len(name) > 120 or len(name) < 2: continue provider = "" if col_idx["provider"] >= 0 and col_idx["provider"] < len(r): provider = r[col_idx["provider"]].strip() if not provider: # Heuristik: wenn Spalte 'Anbieter' fehlt, raten aus Cookie-Name provider = _guess_vendor(name) if not provider: provider = "Unbekannter Anbieter" category = "" purpose = "" persistence = "" if col_idx["category"] >= 0 and col_idx["category"] < len(r): category = _normalize_category(r[col_idx["category"]]) if col_idx["purpose"] >= 0 and col_idx["purpose"] < len(r): purpose = r[col_idx["purpose"]][:500] if col_idx["persistence"] >= 0 and col_idx["persistence"] < len(r): persistence = _parse_persistence(r[col_idx["persistence"]]) if not category: # Inferieren aus purpose-Text category = _normalize_category(purpose) entry = by_vendor.setdefault(provider, { "name": provider, "country": "", "purpose": purpose[:300] if purpose else "", "category": category, "opt_out_url": "", "privacy_policy_url": "", "persistence": persistence, "cookies": [], "source": "table_paste", }) entry["cookies"].append({ "name": name, "purpose": purpose[:200], "expiry": persistence, "is_third_party": True, }) out = list(by_vendor.values()) logger.info("cookies_table_parser: %d vendors / %d cookies parsed", len(out), sum(len(v["cookies"]) for v in out)) return out _VENDOR_GUESS = ( ("_ga", "Google"), ("_gid", "Google"), ("_gcl_", "Google"), ("ANID", "Google"), ("AID", "Google"), ("FPGCLDC", "Google"), ("IDE", "Google DoubleClick"), ("DSID", "Google"), ("_fbp", "Meta / Facebook"), ("fr", "Meta / Facebook"), ("_pin_unauth", "Pinterest"), ("_uetsid", "Microsoft Bing"), ("_uetvid", "Microsoft Bing"), ("MUID", "Microsoft"), ("tt_", "TikTok"), ("li_at", "LinkedIn"), ("OptanonConsent", "OneTrust"), ("cookieconsent", "Borlabs / Cookie-CMP"), ("eta_", "etracker"), ("matomo", "Matomo"), ("_hjid", "Hotjar"), ("_hj", "Hotjar"), ("__cf", "Cloudflare"), ("datadome", "DataDome"), ("incap_", "Imperva Incapsula"), ("ajs_", "Segment"), ("amp_", "Amplitude"), ("sat_track", "Adobe Experience Cloud"), ("AMCV_", "Adobe Experience Cloud"), ("s_cc", "Adobe Analytics"), ("s_sq", "Adobe Analytics"), ) def _guess_vendor(cookie_name: str) -> str: nl = cookie_name.lower() for prefix, vendor in _VENDOR_GUESS: if nl.startswith(prefix.lower()) or prefix.lower() in nl: return vendor return ""