diff --git a/backend-compliance/compliance/api/agent_check/_b12_wiring.py b/backend-compliance/compliance/api/agent_check/_b12_wiring.py new file mode 100644 index 00000000..42e746db --- /dev/null +++ b/backend-compliance/compliance/api/agent_check/_b12_wiring.py @@ -0,0 +1,73 @@ +"""B12 wiring — Chatbot-Cookie-Klassifikation. + +Hängt sich an `state["extra_findings"]` mit ähnlichem Render-Pattern wie +B9/B10. Wird vom Orchestrator nach B11 (run_b9b10) aufgerufen. +""" + +from __future__ import annotations + +import html +import logging + +from compliance.services.chatbot_cookie_classification_check import ( + check_chatbot_cookie_classification, +) + +logger = logging.getLogger(__name__) + + +def run_b12(state: dict) -> None: + new = check_chatbot_cookie_classification(state) + if not new: + return + extras = state.get("extra_findings") or [] + extras.extend(new) + state["extra_findings"] = extras + state["chatbot_cookie_html"] = _render(new) + logger.info("B12 chatbot-cookies: %d findings", len(new)) + + +def _render(findings: list[dict]) -> str: + cards = [] + for f in findings: + sev = (f.get("severity") or "").upper() + color = "#dc2626" if sev == "HIGH" else ( + "#f59e0b" if sev == "MEDIUM" else "#64748b" + ) + meta = ( + "
" + f"Provider: {html.escape(f.get('provider') or '?')} · " + f"Cookie: {html.escape(f.get('cookie_name') or '?')}" + "
" + ) + evidence = "" + if f.get("evidence"): + evidence = ( + "
" + f"{html.escape(f['evidence'])}
" + ) + cards.append( + f"
" + f"
" + f"{sev} · {html.escape(f.get('check_id') or '')}
" + f"
" + f"{html.escape(f.get('title') or '')}
" + f"
" + f"{html.escape(f.get('norm') or '')}
" + f"{meta}{evidence}" + f"
" + f"→ Empfehlung: " + f"{html.escape(f.get('action') or '')}
" + "
" + ) + return ( + "
" + "

" + "💬 Chatbot-Cookie-Klassifikation (KB-basiert)" + "

" + + "".join(cards) + + "
" + ) diff --git a/backend-compliance/compliance/api/agent_check/_orchestrator.py b/backend-compliance/compliance/api/agent_check/_orchestrator.py index 5d174030..6d0673a9 100644 --- a/backend-compliance/compliance/api/agent_check/_orchestrator.py +++ b/backend-compliance/compliance/api/agent_check/_orchestrator.py @@ -67,6 +67,7 @@ async def run_compliance_check(check_id: str, req) -> None: run_b5(state) # AI-Act Art. 50 transparency run_b6b7b8(state) # DPO-cross-doc + Doc-Staleness + CMP-fingerprint run_b9b10(state) # Multi-Entity-Impressum + Drittland-Mechanismus + run_b12(state) # Chatbot-Cookie-Klassifikation (B11 ist in B9B10) # Phase D-3 top/mid/bot: Step 5 HTML blocks await run_phase_d3_top(state) await run_phase_d3_mid(state) diff --git a/backend-compliance/compliance/services/chatbot_cookie_classification_check.py b/backend-compliance/compliance/services/chatbot_cookie_classification_check.py new file mode 100644 index 00000000..a2720bd3 --- /dev/null +++ b/backend-compliance/compliance/services/chatbot_cookie_classification_check.py @@ -0,0 +1,249 @@ +"""B12 — Chatbot-Cookie-Klassifikations-Check. + +Erkennt Chatbot-Cookies anhand der KB-Pattern und prüft 4 typische +Fehler in der DSGVO/TDDDG-Klassifikation: + + CHAT-COOKIE-CLASS-001 Cookie als "technisch notwendig" deklariert, + obwohl in derselben Tabelle Targeting/A-B/ + Analytics-Funktionen erwähnt werden. Falsche + Rechtsgrundlage → MEDIUM + CHAT-COOKIE-CLASS-002 Chatbot-Cookie mit nur EINER Klassifikation, + obwohl der Provider mehrere Funktionen + bietet (tn UND cp) → MEDIUM + CHAT-COOKIE-PURPOSE-001 Zweck-Beschreibung zu generisch ("Statistik", + "Cookie") — Art. 13 DSGVO verlangt konkreten + Verarbeitungszweck → LOW + CHAT-COOKIE-RETENTION-001 Deklarierte Retention <90 Tage, KB-typische + Retention >365 Tage — vermutlich unterdeklariert + → HIGH (verlinkt B3) + +KB-Quelle: specialist_agents/_kb/chat_providers.json +""" + +from __future__ import annotations + +import json +import logging +import os +import re + +logger = logging.getLogger(__name__) + +_KB_PATH = os.path.join( + os.path.dirname(__file__), + "specialist_agents", "_kb", "chat_providers.json", +) + + +def _load_kb() -> dict: + try: + with open(_KB_PATH, encoding="utf-8") as f: + return json.load(f) + except Exception as e: + logger.warning("chatbot KB load failed: %s", e) + return {"providers": {}} + + +_KB = _load_kb() + + +def _detect_provider(cookie_name: str) -> tuple[str, dict] | None: + """Match a cookie name against KB patterns. Returns (provider_id, pattern_meta).""" + if not cookie_name: + return None + providers = _KB.get("providers") or {} + for prov_id, prov in providers.items(): + for pat in prov.get("patterns") or []: + try: + if re.match(pat["regex"], cookie_name): + return prov_id, pat + except re.error: + continue + return None + + +_TARGETING_HINTS = ( + "targeting", "engagement", "a/b", "ab-test", "ab test", + "analytics", "tracking", "marketing", "lead", "scoring", + "personalisierung", "personalization", "remarketing", + "retargeting", +) + + +_GENERIC_PURPOSES = { + "cookie", "statistik", "marketing", "tracking", "analyse", + "performance", "session", "essential", "essenziell", + "notwendig", "—", "?", "", +} + + +def _looks_targeting(text: str) -> bool: + if not text: + return False + t = text.lower() + return any(k in t for k in _TARGETING_HINTS) + + +def _is_generic_purpose(purpose: str) -> bool: + if not purpose: + return True + cleaned = re.sub(r"[\s\.,;:!?]+", " ", purpose.lower()).strip() + if cleaned in _GENERIC_PURPOSES: + return True + return len(cleaned.split()) < 4 # weniger als 4 Wörter = zu kurz + + +def check_chatbot_cookie_classification(state: dict) -> list[dict]: + """Iterate cmp_vendors + cookies, emit findings for chatbot-cookie + classification problems.""" + cmp_vendors = state.get("cmp_vendors") or [] + if not cmp_vendors: + return [] + findings: list[dict] = [] + for v in cmp_vendors: + vendor_name = (v.get("name") or "").strip() + vendor_purpose = (v.get("purpose") or "").strip() + vendor_category = (v.get("category") or "").strip().lower() + for c in (v.get("cookies") or []): + cname = (c.get("name") or "").strip() + if not cname: + continue + match = _detect_provider(cname) + if not match: + continue + prov_id, pat = match + prov = _KB["providers"][prov_id] + c_class = (c.get("category") or "").strip().lower() + c_purpose = (c.get("purpose") or pat.get("purpose") + or "").strip() + + # CLASS-001: TN deklariert + Targeting-Hint im Vendor-Purpose + tn_words = ("technisch notwendig", "essenziell", "essential", + "necessary", "strictly necessary") + declared_tn = any(t in (c_class + " " + c_purpose).lower() + for t in tn_words) + if declared_tn and _looks_targeting(vendor_purpose): + findings.append({ + "check_id": "CHAT-COOKIE-CLASS-001", + "severity": "MEDIUM", + "severity_reason": "misclassified", + "provider": prov.get("company") or prov_id, + "cookie_name": cname, + "title": ( + f"Chatbot-Cookie '{cname}' ({prov.get('company')}) " + "als technisch notwendig deklariert, Tabellen-Beschreibung " + "erwähnt Targeting/Analytics" + ), + "norm": "DSGVO Art. 6 Abs. 1 lit. a + § 25 TDDDG", + "evidence": ( + f"Vendor-Purpose: '{vendor_purpose[:120]}' — " + f"Klassifikation: '{c_class}'" + ), + "action": ( + "Rechtsgrundlage korrigieren: bei Targeting/Analytics/" + "A-B-Tests ist Einwilligung erforderlich. " + "Cookie aus 'technisch notwendig' herausnehmen ODER " + "die Tracking-Funktionen vom Chat-Kern trennen." + ), + }) + + # CLASS-002: nur EINE Klassifikation obwohl Provider hat tn UND cp + has_tn = bool(prov.get("tn_functions")) + has_cp = bool(prov.get("cp_functions")) + if has_tn and has_cp: + # Single-class declaration ohne Aufschlüsselung? + # Heuristik: vendor.purpose enthält weder "auch" / "sowie" / + # "und" zwischen tn und cp Begriffen + purp_lc = vendor_purpose.lower() + mentions_tn = any( + f.replace("-", " ") in purp_lc + or f.replace("-", "") in purp_lc + for f in prov["tn_functions"] + ) + mentions_cp = any( + f.replace("-", " ") in purp_lc + or f.replace("-", "") in purp_lc + for f in prov["cp_functions"] + ) + if mentions_tn != mentions_cp: + # nennt nur eine Seite + missing_side = "Targeting/Analytics" if mentions_tn else ( + "Chat-Kontext (technisch notwendig)" + ) + findings.append({ + "check_id": "CHAT-COOKIE-CLASS-002", + "severity": "MEDIUM", + "severity_reason": "incomplete", + "provider": prov.get("company") or prov_id, + "cookie_name": cname, + "title": ( + f"Chatbot-Cookie '{cname}' ({prov.get('company')}) " + "ohne Funktions-Differenzierung — fehlende Seite: " + f"{missing_side}" + ), + "norm": "DSGVO Art. 13 Abs. 1 lit. c + d", + "action": ( + f"In der Cookie-Tabelle für '{cname}' sowohl die " + "tn-Funktionen (Chat-Kontext) als auch die " + "cp-Funktionen (Targeting/Analytics) getrennt " + "ausweisen — sonst kann der Nutzer Consent nicht " + "informiert geben." + ), + }) + + # PURPOSE-001: zu generischer Zweck + if _is_generic_purpose(c_purpose): + findings.append({ + "check_id": "CHAT-COOKIE-PURPOSE-001", + "severity": "LOW", + "severity_reason": "incomplete", + "provider": prov.get("company") or prov_id, + "cookie_name": cname, + "title": ( + f"Chatbot-Cookie '{cname}' mit zu generischem Zweck" + ), + "norm": "DSGVO Art. 13 Abs. 1 lit. c", + "evidence": f"Zweck-Text: '{c_purpose}'", + "action": ( + f"Konkreten Verarbeitungszweck angeben — z.B. statt " + f"'{c_purpose or 'Cookie'}' " + f"'{pat.get('purpose')}' nach KB-Empfehlung." + ), + }) + + # RETENTION-001: deklariert <90d, KB sagt >365d + from .retention_comparator import parse_duration_to_days + declared_str = ( + c.get("duration") or c.get("persistence") + or c.get("expiry") or "" + ) + declared_days, _kind = parse_duration_to_days(declared_str) + typical = prov.get("typical_retention_days") or 0 + if declared_days is not None and typical: + if declared_days < 90 and typical >= 250: + findings.append({ + "check_id": "CHAT-COOKIE-RETENTION-001", + "severity": "HIGH", + "severity_reason": "factually_wrong", + "provider": prov.get("company") or prov_id, + "cookie_name": cname, + "title": ( + f"Chatbot-Cookie '{cname}' Speicherdauer " + f"vermutlich unterdeklariert" + ), + "norm": "DSGVO Art. 13 Abs. 2 lit. a", + "evidence": ( + f"Deklariert: {int(declared_days)} Tage — " + f"KB-typisch für {prov.get('company')}: " + f"{typical} Tage" + ), + "action": ( + f"Tatsächliche Cookie-Lifetime im Browser prüfen " + f"und mit '{declared_str}' abgleichen. " + f"Vermutung: real ~{typical} Tage statt deklariert " + f"{int(declared_days)}." + ), + }) + if findings: + logger.info("B12 chatbot-classification: %d findings", len(findings)) + return findings diff --git a/backend-compliance/compliance/services/mail_render_v2/_compose.py b/backend-compliance/compliance/services/mail_render_v2/_compose.py index 430c1f61..3afe3c4a 100644 --- a/backend-compliance/compliance/services/mail_render_v2/_compose.py +++ b/backend-compliance/compliance/services/mail_render_v2/_compose.py @@ -46,6 +46,8 @@ def compose_v2(state: dict) -> str: state.get("ai_act_html", ""), # B6/B7/B8/B9/B10 — DPO + Staleness + CMP + MultiEntity + Transfer state.get("extra_findings_html", ""), + # B12 Chatbot-Cookie-Klassifikation + state.get("chatbot_cookie_html", ""), # Browser-Matrix (Stage 1.c) state.get("browser_matrix_html", ""), # All legacy build_*_html() wrapped in V2 sections — preserves diff --git a/backend-compliance/compliance/services/mail_render_v2/_cookie_inventory.py b/backend-compliance/compliance/services/mail_render_v2/_cookie_inventory.py index e8e9a17c..ceca7eee 100644 --- a/backend-compliance/compliance/services/mail_render_v2/_cookie_inventory.py +++ b/backend-compliance/compliance/services/mail_render_v2/_cookie_inventory.py @@ -77,6 +77,22 @@ def _country_third(country: str | None) -> tuple[str, bool, str | None]: return (code, True, tag) +def _vendor_type_tag(cookie_name: str) -> str: + """Lookup the cookie in the chatbot-KB and return a [Chat]/[Chat+AI] tag.""" + try: + from ..chatbot_cookie_classification_check import _detect_provider, _KB + match = _detect_provider(cookie_name) + if not match: + return "" + prov_id, _pat = match + prov = (_KB.get("providers") or {}).get(prov_id) or {} + if prov.get("ai_capable"): + return ' Chat+AI' + return ' Chat' + except Exception: + return "" + + def _src_chip(in_dse: bool, in_table: bool, in_browser: bool, in_ocr: bool) -> str: parts: list[str] = [] @@ -248,7 +264,7 @@ def render_inventory_rows(rows: list[dict]) -> list[list[str]]: f'font-weight:700;">[{tag}]' ) out.append([ - f'{h(r["name"])}', + f'{h(r["name"])}{_vendor_type_tag(r["name"])}', h(r["vendor"]) if r["vendor"] else '', _x_or(r["category"]), diff --git a/backend-compliance/compliance/services/specialist_agents/_kb/chat_providers.json b/backend-compliance/compliance/services/specialist_agents/_kb/chat_providers.json new file mode 100644 index 00000000..14df01f6 --- /dev/null +++ b/backend-compliance/compliance/services/specialist_agents/_kb/chat_providers.json @@ -0,0 +1,158 @@ +{ + "_schema_version": "1.0", + "_last_updated": "2026-06-06", + "_notes": "Anonymisierte Cookie-Pattern + Funktions-Klassifizierung pro Chat-Provider. Quelle: Anbieter-Dokumentation + EDPB-Cookie-Sweep + § 25 TDDDG. Kein Roh-Mandantendatum.", + "providers": { + "iadvize": { + "company": "iAdvize SAS", + "country": "FR", + "type": "Chat & Conversational Platform", + "ai_capable": true, + "patterns": [ + {"regex": "^iadvize-\\d+-vuid$", "purpose": "Visitor-ID + Chat-Verlauf-Wiedererkennung", "default_class": "consent_required"}, + {"regex": "^iadvize-\\d+-consent$", "purpose": "Consent-State für iAdvize", "default_class": "technically_necessary"}, + {"regex": "^iadvize_test_cookie_top_domain$", "purpose": "Tech-Probe für Root-Domain-Detektion", "default_class": "technically_necessary"} + ], + "typical_retention_days": 390, + "tn_functions": ["chat-continuation", "session-context", "logged-in-chat", "consent-state"], + "cp_functions": ["visitor-targeting", "engagement-rules", "ab-tests", "chat-analytics"] + }, + "intercom": { + "company": "Intercom Inc", + "country": "US", + "type": "Chat & Customer-Messaging-Platform", + "ai_capable": true, + "patterns": [ + {"regex": "^intercom-id-[\\w-]+$", "purpose": "Identifier-Cookie für Wiedererkennung", "default_class": "consent_required"}, + {"regex": "^intercom-session-[\\w-]+$", "purpose": "Aktuelle Chat-Session", "default_class": "technically_necessary"}, + {"regex": "^intercom-device-id-[\\w-]+$", "purpose": "Device-Fingerprint", "default_class": "consent_required"} + ], + "typical_retention_days": 270, + "tn_functions": ["session-context"], + "cp_functions": ["device-tracking", "user-recognition-across-sites", "marketing-attribution"] + }, + "tidio": { + "company": "Tidio LLC", + "country": "US", + "type": "Chat-Widget + Chatbot", + "ai_capable": true, + "patterns": [ + {"regex": "^TidioStore_[\\w-]+$", "purpose": "Chat-Konfiguration + Verlauf", "default_class": "consent_required"}, + {"regex": "^tidio[_-]?identify[_-].*$", "purpose": "Visitor-Identifikation", "default_class": "consent_required"} + ], + "typical_retention_days": 365, + "tn_functions": ["chat-continuation"], + "cp_functions": ["visitor-tracking", "lead-scoring", "marketing-automation"] + }, + "drift": { + "company": "Drift.com Inc", + "country": "US", + "type": "Conversational-Marketing-Platform", + "ai_capable": true, + "patterns": [ + {"regex": "^driftt_aid$", "purpose": "Anonymous Visitor-ID", "default_class": "consent_required"}, + {"regex": "^driftt_uid$", "purpose": "Logged-in User-ID", "default_class": "technically_necessary"}, + {"regex": "^drift_eid$", "purpose": "Email-Address-Identifier", "default_class": "consent_required"} + ], + "typical_retention_days": 365, + "tn_functions": ["logged-in-chat", "session-context"], + "cp_functions": ["lead-generation", "conversational-marketing", "ab-testing"] + }, + "userlike": { + "company": "Userlike UG", + "country": "DE", + "type": "Chat-Widget + Chatbot", + "ai_capable": true, + "patterns": [ + {"regex": "^userlike-cookie-banner[\\w-]*$", "purpose": "Consent-State für Userlike", "default_class": "technically_necessary"}, + {"regex": "^userlike-[\\w-]+-id$", "purpose": "Visitor-Identifier", "default_class": "consent_required"} + ], + "typical_retention_days": 365, + "tn_functions": ["chat-continuation", "consent-state"], + "cp_functions": ["visitor-tracking"] + }, + "zendesk_chat": { + "company": "Zendesk Inc", + "country": "US", + "type": "Chat & Customer-Support", + "ai_capable": true, + "patterns": [ + {"regex": "^__zlcmid$", "purpose": "Live-Chat-Identifier", "default_class": "technically_necessary"}, + {"regex": "^_zendesk_[\\w-]+$", "purpose": "Session-/Tracking-Cookie", "default_class": "consent_required"} + ], + "typical_retention_days": 365, + "tn_functions": ["live-chat-session"], + "cp_functions": ["analytics", "marketing-tracking"] + }, + "liveperson": { + "company": "LivePerson Inc", + "country": "US", + "type": "Conversational-AI-Platform", + "ai_capable": true, + "patterns": [ + {"regex": "^LP_[\\w-]+$", "purpose": "LivePerson-Visitor-ID", "default_class": "consent_required"}, + {"regex": "^liveperson-[\\w-]+$", "purpose": "Session/Engagement", "default_class": "consent_required"} + ], + "typical_retention_days": 365, + "tn_functions": ["chat-session"], + "cp_functions": ["visitor-tracking", "engagement-engine", "ai-chat-analytics"] + }, + "hubspot_chat": { + "company": "HubSpot Inc", + "country": "US", + "type": "Chat + CRM-Integration", + "ai_capable": true, + "patterns": [ + {"regex": "^hubspotutk$", "purpose": "HubSpot Visitor-Token", "default_class": "consent_required"}, + {"regex": "^__hssc$", "purpose": "Session-Tracking", "default_class": "consent_required"}, + {"regex": "^__hssrc$", "purpose": "Browser-Restart-Detection", "default_class": "consent_required"}, + {"regex": "^__hstc$", "purpose": "Visitor-Tracking", "default_class": "consent_required"}, + {"regex": "^messagesUtk$", "purpose": "Chat-Conversation-Token", "default_class": "technically_necessary"} + ], + "typical_retention_days": 390, + "tn_functions": ["chat-conversation"], + "cp_functions": ["crm-integration", "marketing-attribution", "lead-scoring"] + }, + "vertex_ai_chatbot": { + "company": "Google Cloud (Vertex AI)", + "country": "US (EU-Hosting möglich)", + "type": "AI-Chatbot (LLM-basiert)", + "ai_capable": true, + "patterns": [ + {"regex": "^_GRECAPTCHA$", "purpose": "reCAPTCHA-Protection für Vertex-AI-Frontend", "default_class": "technically_necessary"}, + {"regex": "^GOOGLE_AUTH.*$", "purpose": "Google-Auth-Token (wenn embedded)", "default_class": "technically_necessary"} + ], + "typical_retention_days": 180, + "tn_functions": ["bot-protection", "auth-token"], + "cp_functions": ["chat-analytics", "improvement-feedback"], + "ai_act_disclosure_required": true + }, + "openai_chatbot": { + "company": "OpenAI LLC", + "country": "US", + "type": "AI-Chatbot (GPT-Modelle)", + "ai_capable": true, + "patterns": [ + {"regex": "^__cf_bm$", "purpose": "Cloudflare-Bot-Schutz", "default_class": "technically_necessary"}, + {"regex": "^_cfuvid$", "purpose": "Cloudflare-Visitor-ID", "default_class": "consent_required"} + ], + "typical_retention_days": 365, + "tn_functions": ["bot-protection"], + "cp_functions": ["visitor-tracking", "ai-conversation-analytics"], + "ai_act_disclosure_required": true + }, + "anthropic_claude": { + "company": "Anthropic PBC", + "country": "US", + "type": "AI-Chatbot (Claude-Modelle)", + "ai_capable": true, + "patterns": [ + {"regex": "^cf_clearance$", "purpose": "Cloudflare-Anti-Bot", "default_class": "technically_necessary"} + ], + "typical_retention_days": 30, + "tn_functions": ["bot-protection"], + "cp_functions": ["chat-analytics"], + "ai_act_disclosure_required": true + } + } +} diff --git a/consent-tester/main.py b/consent-tester/main.py index 216ebabe..d7c972b4 100644 --- a/consent-tester/main.py +++ b/consent-tester/main.py @@ -62,8 +62,10 @@ class ScanResponse(BaseModel): from routes_matrix import router as matrix_router from routes_mobile import router as mobile_router +from routes_cookie_matrix import router as cookie_matrix_router app.include_router(matrix_router) app.include_router(mobile_router) +app.include_router(cookie_matrix_router) @app.get("/health") diff --git a/consent-tester/routes_cookie_matrix.py b/consent-tester/routes_cookie_matrix.py new file mode 100644 index 00000000..f1008080 --- /dev/null +++ b/consent-tester/routes_cookie_matrix.py @@ -0,0 +1,28 @@ +"""POST /scan-cookie-matrix — fokussierter Multi-Browser Cookie-Test.""" + +from __future__ import annotations + +import logging +from datetime import datetime, timezone + +from fastapi import APIRouter +from pydantic import BaseModel + +from services.cookie_behavior_per_browser import run_cookie_matrix + +logger = logging.getLogger(__name__) +router = APIRouter() + + +class CookieMatrixReq(BaseModel): + url: str + browser_profiles: list[str] | None = None + + +@router.post("/scan-cookie-matrix") +async def scan_cookie_matrix(req: CookieMatrixReq): + logger.info("Cookie-matrix scan %s profiles=%s", + req.url, req.browser_profiles or "default") + res = await run_cookie_matrix(req.url, req.browser_profiles) + res["scanned_at"] = datetime.now(timezone.utc).isoformat() + return res diff --git a/consent-tester/services/cookie_behavior_per_browser.py b/consent-tester/services/cookie_behavior_per_browser.py new file mode 100644 index 00000000..659d5434 --- /dev/null +++ b/consent-tester/services/cookie_behavior_per_browser.py @@ -0,0 +1,209 @@ +"""Cookie behavior per browser — fokussierter Multi-Engine Cookie-Test. + +Stage 1.b ohne consent_scanner-Edit: + - Eigener kleiner Playwright-basierter Cookie-Scanner + - Pro Browser-Profile: cookies VOR Banner / NACH "Alle ablehnen" / + NACH "Alle akzeptieren" + - Echte Engine-Diversität: chromium / firefox / webkit / + iphone-mobile-safari nutzen jeweils `p.chromium` / `p.firefox` / + `p.webkit.launch()` + - Output: Cookie-Delta pro Phase pro Browser → Tabelle zeigt ob + Banner-Reject in allen Browsern gleich wirkt +""" + +from __future__ import annotations + +import logging +from typing import Any + +from .browser_profiles import resolve_profiles + +logger = logging.getLogger(__name__) + + +_ACCEPT_TEXTS = ( + "alle akzeptieren", "alles akzeptieren", "akzeptieren", + "zustimmen", "agree", "accept all", "accept", + "i agree", "ok", "got it", +) +_REJECT_TEXTS = ( + "alle ablehnen", "ablehnen", "nur essenzielle", + "nur notwendige", "reject all", "decline", "deny", + "only necessary", "essential only", +) + + +async def _try_click(page, texts: tuple[str, ...]) -> bool: + """Try clicking the first visible button/link matching any of the texts.""" + for txt in texts: + try: + loc = page.get_by_role("button", + name=__import__("re").compile(txt, 2)) + if await loc.count() > 0: + await loc.first.click(timeout=4000) + await page.wait_for_timeout(1500) + return True + except Exception: + pass + # fallback by text + try: + loc = page.locator(f"text=/{txt}/i").first + if await loc.count() > 0: + await loc.click(timeout=4000) + await page.wait_for_timeout(1500) + return True + except Exception: + continue + return False + + +def _cookie_summary(cookies: list[dict]) -> dict: + """Compact summary: count + sample names + by-domain.""" + names = [c.get("name", "") for c in cookies] + domains: dict[str, int] = {} + for c in cookies: + d = c.get("domain", "") + domains[d] = domains.get(d, 0) + 1 + return { + "count": len(cookies), + "names": names, + "by_domain": sorted(domains.items(), key=lambda x: -x[1])[:8], + } + + +async def _scan_one(p, url: str, profile: dict) -> dict[str, Any]: + engine = profile["engine"] + if engine == "blink": + bt = p.chromium + elif engine == "gecko": + bt = p.firefox + elif engine == "webkit": + bt = p.webkit + else: + return {"profile_id": profile["id"], "error": f"unknown engine {engine}"} + launch_kw: dict[str, Any] = {"headless": True} + if profile.get("channel"): + launch_kw["channel"] = profile["channel"] + if profile.get("executable_path"): + launch_kw["executable_path"] = profile["executable_path"] + try: + browser = await bt.launch(**launch_kw) + except Exception as e: + return {"profile_id": profile["id"], "error": f"launch: {e}"[:200]} + try: + ctx_kw: dict[str, Any] = { + "locale": profile.get("locale", "de-DE"), + "timezone_id": profile.get("timezone", "Europe/Berlin"), + } + if profile.get("device"): + preset = p.devices.get(profile["device"]) or {} + ctx_kw.update(preset) + elif profile.get("viewport"): + ctx_kw["viewport"] = profile["viewport"] + context = await browser.new_context(**ctx_kw) + page = await context.new_page() + try: + await page.goto(url, wait_until="domcontentloaded", timeout=30000) + except Exception as e: + await browser.close() + return {"profile_id": profile["id"], + "error": f"goto: {e}"[:200]} + await page.wait_for_timeout(2500) + + before = await context.cookies() + + # Reject branch (fresh context) + reject_clicked = await _try_click(page, _REJECT_TEXTS) + await page.wait_for_timeout(1500) + after_reject = await context.cookies() + + # Accept branch (fresh context to isolate) + accept_clicked = False + after_accept: list[dict] = [] + try: + context2 = await browser.new_context(**ctx_kw) + page2 = await context2.new_page() + try: + await page2.goto(url, wait_until="domcontentloaded", + timeout=30000) + except Exception: + pass + try: + await page2.wait_for_timeout(2500) + except Exception: + pass + try: + accept_clicked = await _try_click(page2, _ACCEPT_TEXTS) + except Exception: + pass + try: + await page2.wait_for_timeout(1500) + except Exception: + pass + try: + after_accept = await context2.cookies() + except Exception: + pass + except Exception as e: + logger.info("accept branch failed for %s: %s", + profile["id"], e) + + return { + "profile_id": profile["id"], + "label": profile["label"], + "engine": engine, + "reject_clicked": reject_clicked, + "accept_clicked": accept_clicked, + "before": _cookie_summary(before), + "after_reject": _cookie_summary(after_reject), + "after_accept": _cookie_summary(after_accept), + "reject_minus_before_count": ( + len(after_reject) - len(before) + ), + "accept_minus_before_count": ( + len(after_accept) - len(before) + ), + } + finally: + try: + await browser.close() + except Exception: + pass + + +async def run_cookie_matrix( + url: str, requested_profiles: list[str] | None = None, +) -> dict: + """Run focused cookie behavior scan across all default profiles.""" + from playwright.async_api import async_playwright + profiles = resolve_profiles(requested_profiles) + results: list[dict] = [] + async with async_playwright() as p: + # Sequential to avoid resource contention on the Mac Mini + # (4 browsers in parallel sometimes hits target-closed races). + for prof in profiles: + try: + r = await _scan_one(p, url, prof) + except Exception as e: + logger.warning("scan_one %s crashed: %s", prof["id"], e) + r = {"profile_id": prof["id"], "error": f"crash: {e}"[:200]} + results.append(r) + # Aggregate: cross-browser inconsistency detection + after_reject_counts = { + r["profile_id"]: r.get("after_reject", {}).get("count", 0) + for r in results if "error" not in r + } + inconsistent = False + if after_reject_counts: + cmin = min(after_reject_counts.values()) + cmax = max(after_reject_counts.values()) + inconsistent = (cmax - cmin) >= 2 + return { + "url": url, + "profile_count": len(profiles), + "results": results, + "aggregate": { + "reject_cookie_counts": after_reject_counts, + "inconsistent_reject": inconsistent, + }, + }