Files
breakpilot-compliance/backend-compliance/compliance/services/cookies_table_parser.py
T
Benjamin Admin 4434e3827b
CI / branch-name (push) Has been skipped
CI / guardrail-integrity (push) Has been skipped
CI / secret-scan (push) Has been skipped
CI / detect-changes (push) Successful in 10s
CI / dep-audit (push) Has been skipped
CI / sbom-scan (push) Has been skipped
CI / validate-canonical-controls (push) Successful in 17s
CI / loc-budget (push) Failing after 17s
CI / go-lint (push) Has been skipped
CI / python-lint (push) Has been skipped
CI / nodejs-lint (push) Has been skipped
CI / nodejs-build (push) Has been skipped
CI / test-go (push) Has been skipped
CI / iace-gt-coverage (push) Has been skipped
CI / test-python-backend (push) Successful in 40s
CI / test-python-document-crawler (push) Has been skipped
CI / test-python-dsms-gateway (push) Has been skipped
fix(audit): parse_flat_cookie_text — Anchor-Pattern fuer VW-textContent
VW Cookie-Doc-textContent verkettet HTML-Tabellen-Zellen OHNE Whitespace:
'Permanent/Protokoll_fbcTracking Cookies (Marketing)...'

Neues Pattern hat 2 Anker:
* Davor: typisches End-Token einer vorherigen Zelle (Permanent/Protokoll,
  Session Cookie, Persistent Cookie, TagePersistent, ...)
* Danach: Kategorie-Token (Tracking Cookies, Funktionscookie, Marketing,
  Analytics, Necessary)
Dazwischen: Cookie-Name (3-50 Zeichen, alphanum/_/-)

VW-Test (snapshot 4a465783): findet jetzt 40 unique Cookie-Namen,
aggregiert zu 6 Vendors (Google, DoubleClick, Cloudflare, Borlabs,
Meta, Unbekannter Anbieter mit 22 VW-internen Cookies).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-21 21:33:58 +02:00

285 lines
10 KiB
Python

"""
Parst Cookie-Tabellen die der User direkt ins Frontend kopiert.
Typische Quellen:
* Browser-Copy aus VW/BMW/Mercedes Cookie-Richtlinie (Tab-getrennt)
* Excel-Export aus Borlabs / OneTrust / Cookiebot Admin (CSV / Pipe)
* Markdown-Tabelle aus interner Doku
Erkennt 4 Spalten-Layouts (heuristisch):
1. [Name, Kategorie, Beschreibung, Speicherdauer, Provider]
2. [Name, Provider, Zweck, Speicherdauer]
3. [Name, Beschreibung, Speicherdauer]
4. nur [Name, Speicherdauer]
Output: gleiche Vendor-Record-Struktur wie vendor_extractor / LLM —
damit der Rest der Pipeline (VVT-Tabelle, Library-Mismatch-Check) ohne
Aenderung weiterlaeuft.
"""
from __future__ import annotations
import logging
import re
logger = logging.getLogger(__name__)
_CATEGORY_LABELS = (
"notwendig", "essential", "funktional", "tracking", "marketing",
"statistik", "analyse", "analytics", "performance", "werbung",
"advertising", "targeting", "preferences", "social_media",
"strictly necessary", "personalisierung",
)
def _looks_like_separator(line: str) -> str | None:
"""Detect the column-separator of a tabular line."""
if "\t" in line and line.count("\t") >= 2:
return "\t"
if " | " in line and line.count(" | ") >= 2:
return " | "
if ";" in line and line.count(";") >= 2 and "," not in line[:20]:
return ";"
if "," in line and line.count(",") >= 3:
return ","
return None
def _normalize_category(s: str) -> str:
sl = s.lower().strip()
for cat in _CATEGORY_LABELS:
if cat in sl:
if cat in ("notwendig", "essential", "strictly necessary"):
return "essential"
if cat in ("tracking", "marketing", "werbung",
"advertising", "targeting"):
return "marketing"
if cat in ("statistik", "analyse", "analytics", "performance"):
return "statistics"
if cat == "funktional":
return "functional"
if cat == "social_media":
return "social_media"
return sl[:30]
def _parse_persistence(s: str) -> str:
"""Extracts 'Speicherdauer' notation."""
m = re.search(
r"(\d+\s*(sekunde|minute|stunde|tag|woche|monat|jahr|day|month|year)[^\s,;|]{0,5})",
s, re.I,
)
if m:
return m.group(1).strip()[:80]
if re.search(r"\bsession\b", s, re.I):
return "Session"
if re.search(r"permanent", s, re.I):
return "Permanent"
return ""
def parse_cookie_table(text: str) -> list[dict]:
"""Returns vendor-records aus einer copy-pasted Cookie-Tabelle.
Bei nicht-tabellarischem Text: return [].
"""
if not text or len(text) < 100:
return []
lines = [ln.strip() for ln in text.splitlines() if ln.strip()]
if not lines:
return []
# Sample 30 lines to detect separator
sample = lines[:60]
sep_counts: dict[str, int] = {}
for ln in sample:
sep = _looks_like_separator(ln)
if sep:
sep_counts[sep] = sep_counts.get(sep, 0) + 1
if not sep_counts or max(sep_counts.values()) < 3:
return []
sep = max(sep_counts, key=sep_counts.get)
logger.info("cookies_table_parser: detected separator '%s' (%d hits)",
sep, sep_counts[sep])
# Parse rows
rows: list[list[str]] = []
for ln in lines:
if sep in ln:
parts = [p.strip().strip('"') for p in ln.split(sep)]
if len(parts) >= 2 and parts[0]:
rows.append(parts)
if len(rows) < 3:
return []
# Detect column layout from header (first row) or by content
header_row = [c.lower() for c in rows[0]]
has_header = any(h in " ".join(header_row) for h in
("cookie", "name", "anbieter", "provider", "zweck",
"kategorie", "speicherdauer", "dauer"))
data_rows = rows[1:] if has_header else rows
# Map columns by header keyword or by position
col_idx = {"name": 0, "provider": -1, "category": -1,
"purpose": -1, "persistence": -1}
if has_header:
for i, h in enumerate(header_row):
if "name" in h or "cookie" in h:
col_idx["name"] = i
elif "anbieter" in h or "provider" in h or "domain" in h:
col_idx["provider"] = i
elif "kategorie" in h or "type" in h or "art" in h:
col_idx["category"] = i
elif "zweck" in h or "purpose" in h or "beschreib" in h:
col_idx["purpose"] = i
elif "speicher" in h or "dauer" in h or "lebens" in h or "expir" in h:
col_idx["persistence"] = i
# Aggregate by vendor (or by name if no vendor column)
by_vendor: dict[str, dict] = {}
for r in data_rows:
if len(r) < 2:
continue
name = r[col_idx["name"]] if col_idx["name"] < len(r) else r[0]
name = (name or "").strip()
if not name or len(name) > 120 or len(name) < 2:
continue
provider = ""
if col_idx["provider"] >= 0 and col_idx["provider"] < len(r):
provider = r[col_idx["provider"]].strip()
if not provider:
# Heuristik: wenn Spalte 'Anbieter' fehlt, raten aus Cookie-Name
provider = _guess_vendor(name)
if not provider:
provider = "Unbekannter Anbieter"
category = ""
purpose = ""
persistence = ""
if col_idx["category"] >= 0 and col_idx["category"] < len(r):
category = _normalize_category(r[col_idx["category"]])
if col_idx["purpose"] >= 0 and col_idx["purpose"] < len(r):
purpose = r[col_idx["purpose"]][:500]
if col_idx["persistence"] >= 0 and col_idx["persistence"] < len(r):
persistence = _parse_persistence(r[col_idx["persistence"]])
if not category:
# Inferieren aus purpose-Text
category = _normalize_category(purpose)
entry = by_vendor.setdefault(provider, {
"name": provider, "country": "",
"purpose": purpose[:300] if purpose else "",
"category": category,
"opt_out_url": "", "privacy_policy_url": "",
"persistence": persistence,
"cookies": [],
"source": "table_paste",
})
entry["cookies"].append({
"name": name, "purpose": purpose[:200],
"expiry": persistence, "is_third_party": True,
})
out = list(by_vendor.values())
logger.info("cookies_table_parser: %d vendors / %d cookies parsed",
len(out), sum(len(v["cookies"]) for v in out))
return out
# textContent-Output von HTML-Tabellen verkettet Zellen ohne Whitespace
# (z.B. VW: "Permanent/Protokoll_fbcTracking Cookies (Marketing)..."). Wir
# erkennen Cookie-Eintraege ueber 2 Anker:
# - Davor: typisches End-Token einer vorherigen Tabellen-Zelle
# (Speicherdauer-Suffix wie Permanent/Protokoll, Session Cookie, ...)
# - Danach: Kategorie-Token (Tracking Cookies, Funktionscookie, ...)
# Dazwischen: der Cookie-Name (3-50 Zeichen, alphanum/underscore/dash).
_FLAT_ROW_RE = re.compile(
r"(?:Permanent/Protokoll|Session Cookie|Persistent Cookie|"
r"TagePersistent|TageSitzungs-Cookie|TageSession Cookie|"
r"MinutenPersistent|MinutenSession Cookie|StundenPersistent|"
r"MonatePersistent|JahrePersistent)"
r"([A-Za-z_][A-Za-z0-9_\-\.]{1,40}?)"
r"(?=Tracking Cookies|Session Cookies|Funktionscookie|Funktional|"
r"Marketing|Analytics|Necessary)",
re.I,
)
def parse_flat_cookie_text(text: str) -> list[dict]:
"""Variante fuer Sites wie VW die ihre Cookie-Tabelle als flachen
Text liefern (textContent-Output ohne Whitespace zwischen Zellen).
Regex anchored auf vorherige Speicherdauer-Suffixe + folgende
Kategorie-Token → extrahiert den Cookie-Namen dazwischen.
"""
if not text or len(text) < 500:
return []
names = _FLAT_ROW_RE.findall(text)
if len(names) < 3:
return []
by_vendor: dict[str, dict] = {}
seen_names: set[str] = set()
for raw in names:
name = raw.strip()
nl = name.lower()
if nl in seen_names:
continue
if nl in ("dieser", "diese", "ein", "der", "die", "das",
"session", "permanent", "funktional", "notwendig",
"marketing", "analytics", "werbung", "anbieter",
"tracking", "cookie", "cookies", "und", "von",
"einer", "ist", "alle", "noch", "auch", "name",
"art", "zweck", "dauer", "test"):
continue
if len(name) < 3 or len(name) > 60:
continue
seen_names.add(nl)
vendor = _guess_vendor(name) or "Unbekannter Anbieter"
entry = by_vendor.setdefault(vendor, {
"name": vendor, "country": "",
"purpose": "", "category": "",
"opt_out_url": "", "privacy_policy_url": "",
"persistence": "",
"cookies": [],
"source": "flat_pattern",
})
entry["cookies"].append({
"name": name, "purpose": "",
"expiry": "", "is_third_party": True,
})
out = list(by_vendor.values())
logger.info("parse_flat_cookie_text: %d vendors / %d cookies",
len(out), sum(len(v["cookies"]) for v in out))
return out
_VENDOR_GUESS = (
("_ga", "Google"), ("_gid", "Google"), ("_gcl_", "Google"),
("ANID", "Google"), ("AID", "Google"), ("FPGCLDC", "Google"),
("IDE", "Google DoubleClick"), ("DSID", "Google"),
("_fbp", "Meta / Facebook"), ("fr", "Meta / Facebook"),
("_pin_unauth", "Pinterest"), ("_uetsid", "Microsoft Bing"),
("_uetvid", "Microsoft Bing"), ("MUID", "Microsoft"),
("tt_", "TikTok"), ("li_at", "LinkedIn"),
("OptanonConsent", "OneTrust"), ("cookieconsent", "Borlabs / Cookie-CMP"),
("eta_", "etracker"), ("matomo", "Matomo"),
("_hjid", "Hotjar"), ("_hj", "Hotjar"),
("__cf", "Cloudflare"), ("datadome", "DataDome"),
("incap_", "Imperva Incapsula"),
("ajs_", "Segment"), ("amp_", "Amplitude"),
("sat_track", "Adobe Experience Cloud"),
("AMCV_", "Adobe Experience Cloud"),
("s_cc", "Adobe Analytics"), ("s_sq", "Adobe Analytics"),
)
def _guess_vendor(cookie_name: str) -> str:
nl = cookie_name.lower()
for prefix, vendor in _VENDOR_GUESS:
if nl.startswith(prefix.lower()) or prefix.lower() in nl:
return vendor
return ""