Files
breakpilot-compliance/backend-compliance/compliance/services/cookies_table_parser.py
T
Benjamin Admin 1451873194
CI / loc-budget (push) Failing after 19s
CI / go-lint (push) Has been skipped
CI / python-lint (push) Has been skipped
CI / nodejs-lint (push) Has been skipped
CI / nodejs-build (push) Successful in 3m4s
CI / test-go (push) Has been skipped
CI / iace-gt-coverage (push) Has been skipped
CI / test-python-backend (push) Successful in 43s
CI / test-python-document-crawler (push) Has been skipped
CI / test-python-dsms-gateway (push) Has been skipped
CI / detect-changes (push) Successful in 12s
CI / branch-name (push) Has been skipped
CI / guardrail-integrity (push) Has been skipped
CI / secret-scan (push) Has been skipped
CI / dep-audit (push) Has been skipped
CI / sbom-scan (push) Has been skipped
CI / validate-canonical-controls (push) Successful in 19s
fix(audit): parse_flat_cookie_text fuer VW-Style Flat-Tabellen
VW Cookie-Doc liefert die Tabelle als FLACHEN Text ohne Spalten-Trenner:
'IDE Tracking Cookies (Marketing) Beschreibung 13 Monate Permanent
TAID Tracking Cookies (Marketing) ...'

parse_flat_cookie_text matched mit Regex:
  NAME [Tracking|Session|Funktional|...] Cookies ... [13 Monate|Session|Permanent]

Backend faellt bei parse_cookie_table=[] auf parse_flat zurueck. Damit
holen wir aus dem 65k VW Cookie-Doc ~30-50 Cookies + Vendors deterministisch,
auch wenn der HTML-Table-DOM-Extract leer ist (was passiert wenn die
Tabelle aus mehreren append-Code-Pfaden geladen wird).

Bonus: _extract_dom_tables Helper in dsi_discovery.py vorbereitet fuer
spaeteres Einhaengen an allen 7 DiscoveredDSI.append-Stellen.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-21 21:24:14 +02:00

286 lines
10 KiB
Python

"""
Parst Cookie-Tabellen die der User direkt ins Frontend kopiert.
Typische Quellen:
* Browser-Copy aus VW/BMW/Mercedes Cookie-Richtlinie (Tab-getrennt)
* Excel-Export aus Borlabs / OneTrust / Cookiebot Admin (CSV / Pipe)
* Markdown-Tabelle aus interner Doku
Erkennt 4 Spalten-Layouts (heuristisch):
1. [Name, Kategorie, Beschreibung, Speicherdauer, Provider]
2. [Name, Provider, Zweck, Speicherdauer]
3. [Name, Beschreibung, Speicherdauer]
4. nur [Name, Speicherdauer]
Output: gleiche Vendor-Record-Struktur wie vendor_extractor / LLM —
damit der Rest der Pipeline (VVT-Tabelle, Library-Mismatch-Check) ohne
Aenderung weiterlaeuft.
"""
from __future__ import annotations
import logging
import re
# Module-level logger shared by the parsers in this file.
logger = logging.getLogger(__name__)

# Lower-case keywords (German and English) that identify a cookie category.
# The order is significant: lookups walk the tuple front to back and use the
# first keyword they find in the text.
_CATEGORY_LABELS = (
    "notwendig",
    "essential",
    "funktional",
    "tracking",
    "marketing",
    "statistik",
    "analyse",
    "analytics",
    "performance",
    "werbung",
    "advertising",
    "targeting",
    "preferences",
    "social_media",
    "strictly necessary",
    "personalisierung",
)
def _looks_like_separator(line: str) -> str | None:
    """Return the column delimiter ``line`` appears to use, or ``None``.

    Candidates are probed in a fixed order (tab, spaced pipe, semicolon,
    comma) and the first one that occurs often enough wins. A semicolon is
    only accepted when no comma shows up within the first 20 characters of
    the line.
    """
    comma_near_start = "," in line[:20]
    # (delimiter, minimum number of occurrences, extra precondition)
    probes = (
        ("\t", 2, True),
        (" | ", 2, True),
        (";", 2, not comma_near_start),
        (",", 3, True),
    )
    for delimiter, min_hits, permitted in probes:
        if permitted and line.count(delimiter) >= min_hits:
            return delimiter
    return None
def _normalize_category(s: str) -> str:
    """Map a free-text category label onto a canonical category name.

    The text is lower-cased and stripped, then the keywords in
    ``_CATEGORY_LABELS`` are searched in order. The first keyword that both
    occurs in the text and has a canonical mapping decides the result.
    Keywords without a mapping are skipped. When nothing maps, the first
    30 characters of the normalized text are returned unchanged.
    """
    canonical = {
        "notwendig": "essential",
        "essential": "essential",
        "strictly necessary": "essential",
        "tracking": "marketing",
        "marketing": "marketing",
        "werbung": "marketing",
        "advertising": "marketing",
        "targeting": "marketing",
        "statistik": "statistics",
        "analyse": "statistics",
        "analytics": "statistics",
        "performance": "statistics",
        "funktional": "functional",
        "social_media": "social_media",
    }
    text = s.lower().strip()
    for label in _CATEGORY_LABELS:
        if label not in text:
            continue
        mapped = canonical.get(label)
        if mapped is not None:
            return mapped
    return text[:30]
def _parse_persistence(s: str) -> str:
    """Extract a storage-duration ("Speicherdauer") notation from ``s``.

    A numeric duration such as "13 Monate" takes precedence and is returned
    as found in the text (at most 80 characters). Otherwise the keywords
    "session" (as a whole word) and "permanent" are recognized, matched
    case-insensitively, and returned as "Session" / "Permanent". An empty
    string means nothing was recognized.
    """
    duration = re.search(
        r"(\d+\s*(sekunde|minute|stunde|tag|woche|monat|jahr|day|month|year)[^\s,;|]{0,5})",
        s, re.I,
    )
    if duration is not None:
        return duration.group(1).strip()[:80]

    keyword_labels = (
        (r"\bsession\b", "Session"),
        (r"permanent", "Permanent"),
    )
    for pattern, label in keyword_labels:
        if re.search(pattern, s, re.I):
            return label
    return ""
# Header keywords per output column, probed in this order for every header
# cell. The specific columns come before the generic "name"/"cookie" test so
# that a header such as "Cookie-Kategorie" is recognized as the category
# column instead of being mistaken for the cookie-name column.
_HEADER_KEYWORDS = (
    ("provider", ("anbieter", "provider", "domain")),
    ("category", ("kategorie", "type", "art")),
    ("purpose", ("zweck", "purpose", "beschreib")),
    ("persistence", ("speicher", "dauer", "lebens", "expir")),
    ("name", ("name", "cookie")),
)

# Canonical values ``_normalize_category`` can return for a recognized label.
# Anything else it returns is just a truncated copy of its input.
_INFERABLE_CATEGORIES = frozenset(
    ("essential", "marketing", "statistics", "functional", "social_media")
)


def _split_table_line(line: str, sep: str) -> list[str]:
    """Split one table line into cleaned cells (trimmed, outer quotes removed)."""
    if sep == " | ":
        # Markdown rows are framed by pipes ("| a | b |"); drop the frame so
        # it does not end up inside the first and last cell.
        if line.startswith("|"):
            line = line[1:]
        if line.endswith("|"):
            line = line[:-1]
    return [cell.strip().strip('"') for cell in line.split(sep)]


def _is_markdown_divider(cells: list[str]) -> bool:
    """Tell whether ``cells`` form a Markdown header divider (``--- | :---:``)."""
    filled = [cell for cell in cells if cell]
    return bool(filled) and all(set(cell) <= {"-", ":"} for cell in filled)


def _map_header_columns(header_row: list[str]) -> dict[str, int]:
    """Map logical columns to indexes based on lower-cased header cells.

    Each header cell is assigned to the first column in ``_HEADER_KEYWORDS``
    whose keyword it contains, and each column keeps the first header cell
    assigned to it. Unmatched columns stay at -1, except ``name`` which
    defaults to column 0.
    """
    col_idx = {"name": 0, "provider": -1, "category": -1,
               "purpose": -1, "persistence": -1}
    bound: set[str] = set()
    for i, header in enumerate(header_row):
        for column, keywords in _HEADER_KEYWORDS:
            if any(keyword in header for keyword in keywords):
                if column not in bound:
                    col_idx[column] = i
                    bound.add(column)
                break
    return col_idx


def parse_cookie_table(text: str) -> list[dict]:
    """Parse a copy-pasted cookie table into vendor records.

    The column separator (tab, " | ", ";" or ",") is detected from the first
    60 non-empty lines. Columns are mapped through the header row when one is
    recognized, and rows are grouped by provider. When a row has no provider
    cell, the provider is guessed from the cookie name and falls back to
    "Unbekannter Anbieter".

    Args:
        text: Raw pasted text.

    Returns:
        One dict per provider with the keys ``name``, ``country``,
        ``purpose``, ``category``, ``opt_out_url``, ``privacy_policy_url``,
        ``persistence``, ``cookies`` and ``source`` ("table_paste"). The
        vendor-level purpose, category and persistence come from the first
        row seen for that provider. Returns ``[]`` when the text is shorter
        than 100 characters or does not look tabular (fewer than 3 sampled
        lines agree on a separator, or fewer than 3 rows result).
    """
    if not text or len(text) < 100:
        return []
    lines = [ln.strip() for ln in text.splitlines() if ln.strip()]
    if not lines:
        return []

    # Sample the first 60 lines to detect the separator.
    sep_counts: dict[str, int] = {}
    for ln in lines[:60]:
        candidate = _looks_like_separator(ln)
        if candidate:
            sep_counts[candidate] = sep_counts.get(candidate, 0) + 1
    if not sep_counts or max(sep_counts.values()) < 3:
        return []
    sep = max(sep_counts, key=sep_counts.get)
    logger.info("cookies_table_parser: detected separator '%s' (%d hits)",
                sep, sep_counts[sep])

    # Parse rows; lines without the separator are ignored.
    rows: list[list[str]] = []
    for ln in lines:
        if sep not in ln:
            continue
        parts = _split_table_line(ln, sep)
        if sep == " | " and _is_markdown_divider(parts):
            continue
        if len(parts) >= 2 and parts[0]:
            rows.append(parts)
    if len(rows) < 3:
        return []

    # Detect the column layout from the header (first row).
    # NOTE(review): any first row that mentions one of these keywords counts
    # as a header, so a headerless table whose first data row contains e.g.
    # "cookie" loses that row. Confirm whether that is acceptable.
    header_row = [c.lower() for c in rows[0]]
    header_text = " ".join(header_row)
    has_header = any(keyword in header_text for keyword in
                     ("cookie", "name", "anbieter", "provider", "zweck",
                      "kategorie", "speicherdauer", "dauer"))
    data_rows = rows[1:] if has_header else rows

    # Map columns by header keyword, or fall back to position (name = col 0).
    if has_header:
        col_idx = _map_header_columns(header_row)
    else:
        col_idx = {"name": 0, "provider": -1, "category": -1,
                   "purpose": -1, "persistence": -1}

    def cell(row: list[str], column: str) -> str:
        """Return the row's cell for ``column`` or "" if unmapped/missing."""
        idx = col_idx[column]
        return row[idx] if 0 <= idx < len(row) else ""

    # Aggregate by vendor (guessed from the cookie name if no vendor column).
    by_vendor: dict[str, dict] = {}
    for r in data_rows:
        name = r[col_idx["name"]] if col_idx["name"] < len(r) else r[0]
        name = (name or "").strip()
        if not name or len(name) > 120 or len(name) < 2:
            continue

        provider = cell(r, "provider").strip()
        if not provider:
            provider = _guess_vendor(name) or "Unbekannter Anbieter"

        raw_category = cell(r, "category")
        category = _normalize_category(raw_category) if raw_category else ""
        purpose = cell(r, "purpose")[:500]
        raw_persistence = cell(r, "persistence")
        persistence = (_parse_persistence(raw_persistence)
                       if raw_persistence else "")

        if not category:
            # Infer from the purpose text, but only accept a recognized
            # category. Otherwise the normalizer hands back a truncated
            # copy of the purpose, which is not a category.
            inferred = _normalize_category(purpose)
            if inferred in _INFERABLE_CATEGORIES:
                category = inferred

        entry = by_vendor.setdefault(provider, {
            "name": provider, "country": "",
            "purpose": purpose[:300],
            "category": category,
            "opt_out_url": "", "privacy_policy_url": "",
            "persistence": persistence,
            "cookies": [],
            "source": "table_paste",
        })
        entry["cookies"].append({
            "name": name, "purpose": purpose[:200],
            "expiry": persistence, "is_third_party": True,
        })

    out = list(by_vendor.values())
    logger.info("cookies_table_parser: %d vendors / %d cookies parsed",
                len(out), sum(len(v["cookies"]) for v in out))
    return out
# Matches one row of a cookie table that was flattened into running text:
#   NAME <category keyword> ... Cookie(s) <description> <duration>
# Groups:
#   1    cookie name
#   2    category phrase plus the free-text description up to the duration
#   3/4  numeric duration and its unit, including a plural suffix ("Monate")
#   5    "Session" or "Permanent" when no numeric duration is given
# The description is matched with ``.{0,400}?``. A negated class such as
# ``[^A-Z]`` would exclude *all* ASCII letters under ``re.I`` and thereby
# reject every row that carries a textual description.
_FLAT_ROW_RE = re.compile(
    r"\b([A-Za-z_][A-Za-z0-9_\-\.]{1,40})\s+"
    r"((?:Tracking|Session|Funktional|Marketing|Analytics|Performance|"
    r"Notwendig|Strictly\s+Necessary|Statistik|Personalisierung)"
    r"[A-Za-zäöüÄÖÜß \-\(\)]*?Cookies?.{0,400}?)"
    r"(?:(\d+)\s*((?:Sekunde|Minute|Stunde|Tag|Woche|Monat|Jahr|day|month|year)"
    r"[a-z]{0,2})|"
    r"\b(Session|Permanent)\b)",
    re.I | re.S,
)
def parse_flat_cookie_text(text: str) -> list[dict]:
    """Extract vendor records from a cookie table flattened into plain text.

    Intended for sites that deliver their cookie table as one block of text
    (cookie name, category, description and duration in sequence, without
    column separators). Every ``_FLAT_ROW_RE`` match is treated as one table
    row. Rows are de-duplicated by lower-cased cookie name and grouped by
    the vendor guessed from that name.

    Args:
        text: Plain text of the cookie document.

    Returns:
        One dict per vendor, tagged with ``source`` "flat_pattern". Returns
        ``[]`` when the text is shorter than 500 characters or yields fewer
        than 3 pattern matches.
    """
    if not text or len(text) < 500:
        return []

    hits = list(_FLAT_ROW_RE.finditer(text))
    if len(hits) < 3:
        return []

    # Ordinary words and column titles that may sit directly in front of a
    # category keyword and would otherwise be taken for a cookie name.
    noise_words = frozenset((
        "dieser", "diese", "ein", "der", "die", "das",
        "session", "permanent", "funktional", "notwendig",
        "marketing", "analytics", "werbung", "anbieter",
        "tracking", "cookie", "cookies", "und", "von",
        "einer", "ist", "alle", "noch", "auch", "name",
        "art", "zweck", "dauer",
    ))

    vendors: dict[str, dict] = {}
    accepted: set[str] = set()
    for hit in hits:
        raw_name, raw_text, amount, unit, keyword = hit.group(1, 2, 3, 4, 5)
        cookie = raw_name.strip()
        key = cookie.lower()
        if key in accepted or key in noise_words:
            continue
        if not 3 <= len(cookie) <= 60:
            continue
        accepted.add(key)

        description = raw_text or ""
        category = _normalize_category(description)
        if amount:
            lifetime = f"{amount} {unit}"
        elif keyword:
            lifetime = keyword
        else:
            lifetime = ""
        summary = description.strip()[:300]

        vendor = _guess_vendor(cookie) or "Unbekannter Anbieter"
        if vendor not in vendors:
            # The first cookie seen for a vendor supplies the vendor-level
            # purpose, category and persistence.
            vendors[vendor] = {
                "name": vendor, "country": "",
                "purpose": summary, "category": category,
                "opt_out_url": "", "privacy_policy_url": "",
                "persistence": lifetime,
                "cookies": [],
                "source": "flat_pattern",
            }
        vendors[vendor]["cookies"].append({
            "name": cookie, "purpose": summary[:200],
            "expiry": lifetime, "is_third_party": True,
        })

    records = list(vendors.values())
    cookie_total = sum(len(record["cookies"]) for record in records)
    logger.info("parse_flat_cookie_text: %d vendors / %d cookies",
                len(records), cookie_total)
    return records
# (pattern, vendor) pairs used to attribute a cookie to a vendor by its name.
# Patterns are compared case-insensitively and in order, so the first
# matching pair wins. How a pattern is matched depends on its shape; see
# ``_vendor_pattern_matches``.
_VENDOR_GUESS = (
    ("_ga", "Google"), ("_gid", "Google"), ("_gcl_", "Google"),
    ("ANID", "Google"), ("AID", "Google"), ("TAID", "Google"),
    ("FPGCLDC", "Google"),
    ("IDE", "Google DoubleClick"), ("DSID", "Google"),
    ("_fbp", "Meta / Facebook"), ("fr", "Meta / Facebook"),
    ("_pin_unauth", "Pinterest"), ("_uetsid", "Microsoft Bing"),
    ("_uetvid", "Microsoft Bing"), ("MUID", "Microsoft"),
    ("tt_", "TikTok"), ("li_at", "LinkedIn"),
    ("OptanonConsent", "OneTrust"), ("cookieconsent", "Borlabs / Cookie-CMP"),
    ("eta_", "etracker"), ("matomo", "Matomo"),
    ("_hjid", "Hotjar"), ("_hj", "Hotjar"),
    ("__cf", "Cloudflare"), ("datadome", "DataDome"),
    ("incap_", "Imperva Incapsula"),
    ("ajs_", "Segment"), ("amp_", "Amplitude"),
    ("sat_track", "Adobe Experience Cloud"),
    ("AMCV_", "Adobe Experience Cloud"),
    ("s_cc", "Adobe Analytics"), ("s_sq", "Adobe Analytics"),
)

# Purely alphabetic patterns up to this length ("fr", "IDE", "AID") are
# complete cookie names. They occur inside countless unrelated names
# ("csrftoken", "video_quality", "paid_plan"), so they must match exactly.
_EXACT_MATCH_MAX_LEN = 3

# Patterns at least this long are distinctive enough to be accepted anywhere
# in the cookie name (e.g. "borlabs-cookieconsent").
_SUBSTRING_MATCH_MIN_LEN = 6


def _vendor_pattern_matches(name: str, pattern: str) -> bool:
    """Decide whether lower-cased ``name`` matches lower-cased ``pattern``."""
    if pattern.isalpha() and len(pattern) <= _EXACT_MATCH_MAX_LEN:
        return name == pattern
    if len(pattern) >= _SUBSTRING_MATCH_MIN_LEN:
        return pattern in name
    return name.startswith(pattern)


def _guess_vendor(cookie_name: str) -> str:
    """Guess the vendor that sets ``cookie_name``.

    Args:
        cookie_name: Cookie name as found in the table. Case and
            surrounding whitespace are ignored.

    Returns:
        The vendor of the first matching ``_VENDOR_GUESS`` entry, or an
        empty string when no pattern matches.
    """
    name = cookie_name.strip().lower()
    for pattern, vendor in _VENDOR_GUESS:
        if _vendor_pattern_matches(name, pattern.lower()):
            return vendor
    return ""