# Reconstructed from a whitespace-collapsed patch: the definitions below are
# the code that patch adds, restored to conventional formatting.

# --- backend-compliance/compliance/services/cookies_table_parser.py ---
# The names ``re``, ``logger``, ``_normalize_category`` and ``_guess_vendor``
# are used but not defined here; they are expected to come from that module.

# One "row" of a flattened cookie table:
#   group 1  cookie name (2-41 chars, starts with a letter or underscore)
#   group 2  category keyword ... "Cookie(s)" plus trailing filler
#   group 3  numeric duration        group 4  duration unit
#   group 5  literal "Session" / "Permanent" (alternative to groups 3+4)
#
# NOTE(review): because of re.I, ``[^A-Z]`` also rejects lowercase ASCII
# letters, so the filler between "Cookie(s)" and the duration may only hold
# non-letter characters. Confirm this is intended.
# NOTE(review): the unit alternatives have no plural suffix, so "13 Monate"
# is captured as "13" + "Monat".
_FLAT_ROW_RE = re.compile(
    r"\b([A-Za-z_][A-Za-z0-9_\-\.]{1,40})\s+"
    r"((?:Tracking|Session|Funktional|Marketing|Analytics|Performance|"
    r"Notwendig|Strictly\s+Necessary|Statistik|Personalisierung)"
    r"[A-Za-zäöüÄÖÜß \-\(\)]*?Cookies?[^A-Z]{0,400}?)"
    r"(?:(\d+)\s*(Sekunde|Minute|Stunde|Tag|Woche|Monat|Jahr|day|month|year)|"
    r"\b(Session|Permanent)\b)",
    re.I | re.S,
)

# Lower-cased words the name group can capture that are table headers,
# articles or category labels rather than cookie names.
_FLAT_NAME_STOPWORDS = frozenset({
    "dieser", "diese", "ein", "der", "die", "das",
    "session", "permanent", "funktional", "notwendig",
    "marketing", "analytics", "werbung", "anbieter",
    "tracking", "cookie", "cookies", "und", "von",
    "einer", "ist", "alle", "noch", "auch", "name",
    "art", "zweck", "dauer",
})


def parse_flat_cookie_text(text: str) -> list[dict]:
    """Parse a cookie table that was delivered as flat, separator-less text.

    Some sites emit their cookie table as one running block of text: cookie
    name, category, description and lifetime follow each other with no tab
    or pipe separators. Every ``_FLAT_ROW_RE`` match is treated as one table
    row, and rows are grouped by the vendor guessed from the cookie name.

    Call site (same patch): ``_run_compliance_check`` runs
    ``parse_cookie_table`` first and falls back to this function only when
    that parser returns nothing.

    Args:
        text: Crawled cookie text.

    Returns:
        One dict per vendor with the keys ``name``, ``country``,
        ``purpose``, ``category``, ``opt_out_url``, ``privacy_policy_url``,
        ``persistence``, ``cookies`` and ``source`` (always
        ``"flat_pattern"``). The vendor-level ``purpose``, ``category`` and
        ``persistence`` come from the first cookie seen for that vendor.
        Returns an empty list when ``text`` is empty or shorter than 500
        characters, or when fewer than three rows match.
    """
    if not text or len(text) < 500:
        return []
    matches = list(_FLAT_ROW_RE.finditer(text))
    # Fewer than three rows is treated as incidental prose, not a table.
    if len(matches) < 3:
        return []

    by_vendor: dict[str, dict] = {}
    seen_names: set[str] = set()
    for match in matches:
        name = match.group(1).strip()
        key = name.lower()
        if key in seen_names or key in _FLAT_NAME_STOPWORDS:
            continue
        # The pattern caps names at 41 characters, so only the lower bound
        # needs checking.
        if len(name) < 3:
            continue
        seen_names.add(key)

        description = match.group(2) or ""
        category = _normalize_category(description)
        if match.group(3):
            persistence = f"{match.group(3)} {match.group(4)}"
        else:
            persistence = match.group(5) or ""
        purpose = description.strip()[:300]
        vendor = _guess_vendor(name) or "Unbekannter Anbieter"

        entry = by_vendor.get(vendor)
        if entry is None:
            # Build the vendor record only on first sight of the vendor.
            entry = by_vendor[vendor] = {
                "name": vendor, "country": "",
                "purpose": purpose, "category": category,
                "opt_out_url": "", "privacy_policy_url": "",
                "persistence": persistence,
                "cookies": [],
                "source": "flat_pattern",
            }
        entry["cookies"].append({
            "name": name, "purpose": purpose[:200],
            "expiry": persistence, "is_third_party": True,
        })

    out = list(by_vendor.values())
    logger.info("parse_flat_cookie_text: %d vendors / %d cookies",
                len(out), sum(len(v["cookies"]) for v in out))
    return out


# --- consent-tester/services/dsi_discovery.py ---

# Browser-side collector. Per table it keeps rows with at least two cells,
# collapses whitespace inside each cell and joins the cells with a tab.
# Tables with fewer than three such rows are dropped, and at most ten
# tables are returned.
_DOM_TABLES_JS = """
    () => {
        const out = [];
        document.querySelectorAll('table').forEach(t => {
            const rows = [];
            t.querySelectorAll('tr').forEach(tr => {
                const cells = [];
                tr.querySelectorAll('th, td').forEach(c => {
                    cells.push((c.innerText || c.textContent || '').trim().replace(/\\s+/g, ' '));
                });
                if (cells.length >= 2) rows.push(cells.join('\\t'));
            });
            if (rows.length >= 3) out.push(rows);
        });
        return out.slice(0, 10);
    }
"""


async def _extract_dom_tables(page) -> list[list[str]]:
    """Extract all ``<table>`` elements of the current DOM.

    Per the original author's note, this is meant to be called by each
    document-loading function before the page navigates elsewhere, so that
    every discovered document keeps its own tables.

    Args:
        page: Object exposing an awaitable ``evaluate(script)`` method
            (presumably a Playwright page; only ``evaluate`` is used).

    Returns:
        A list of tables, each a list of tab-joined row strings. Returns an
        empty list when the page yields nothing or evaluation fails.
    """
    import logging

    try:
        tables = await page.evaluate(_DOM_TABLES_JS)
    except Exception as exc:
        # Deliberately best-effort: a failed extraction must not abort the
        # caller. Leave a trace instead of failing silently.
        logging.getLogger(__name__).debug(
            "DOM table extraction failed: %s", exc)
        return []
    return tables or []