diff --git a/admin-compliance/app/api/sdk/v1/agent/banner-check/route.ts b/admin-compliance/app/api/sdk/v1/agent/banner-check/route.ts index 3923956..84cfc40 100644 --- a/admin-compliance/app/api/sdk/v1/agent/banner-check/route.ts +++ b/admin-compliance/app/api/sdk/v1/agent/banner-check/route.ts @@ -11,7 +11,7 @@ const BACKEND_URL = process.env.BACKEND_API_URL || 'http://backend-compliance:80 export async function POST(request: NextRequest) { try { const body = await request.json() - const { url } = body + const { url, categories = [] } = body if (!url) { return NextResponse.json({ error: 'URL erforderlich' }, { status: 400 }) @@ -21,7 +21,7 @@ export async function POST(request: NextRequest) { const response = await fetch(`${BACKEND_URL}/api/compliance/agent/banner-check`, { method: 'POST', headers: { 'Content-Type': 'application/json' }, - body: JSON.stringify({ url }), + body: JSON.stringify({ url, categories }), signal: AbortSignal.timeout(120000), // 2 min for Playwright }) diff --git a/admin-compliance/app/sdk/agent/_components/BannerCheckTab.tsx b/admin-compliance/app/sdk/agent/_components/BannerCheckTab.tsx index 9cf7d45..d037095 100644 --- a/admin-compliance/app/sdk/agent/_components/BannerCheckTab.tsx +++ b/admin-compliance/app/sdk/agent/_components/BannerCheckTab.tsx @@ -33,12 +33,34 @@ interface BannerResult { } } +const CATEGORIES = [ + { id: 'all', label: 'Alle Kategorien' }, + { id: 'necessary', label: 'Notwendig' }, + { id: 'statistics', label: 'Statistik' }, + { id: 'marketing', label: 'Marketing' }, + { id: 'functional', label: 'Funktional' }, + { id: 'preferences', label: 'Praeferenzen' }, +] + export function BannerCheckTab() { const [url, setUrl] = useState('') const [loading, setLoading] = useState(false) const [progress, setProgress] = useState('') const [error, setError] = useState(null) const [result, setResult] = useState(null) + const [categories, setCategories] = useState(['all']) + + const toggleCategory = (id: string) => { + if (id === 'all') { + setCategories(['all']) + return + } + setCategories(prev => { + const without = prev.filter(c => c !== 'all' && c !== id) + const next = prev.includes(id) ? without : [...without, id] + return next.length === 0 ? ['all'] : next + }) + } const handleScan = async (e: React.FormEvent) => { e.preventDefault() @@ -49,11 +71,16 @@ export function BannerCheckTab() { setResult(null) setProgress('Cookie-Banner wird analysiert...') + // 'all' selected = empty array (test everything) + const selectedCategories = categories.includes('all') + ? [] + : categories + try { const res = await fetch('/api/sdk/v1/agent/banner-check', { method: 'POST', headers: { 'Content-Type': 'application/json' }, - body: JSON.stringify({ url: url.trim() }), + body: JSON.stringify({ url: url.trim(), categories: selectedCategories }), }) if (!res.ok) throw new Error(`Fehler: ${res.status}`) const data = await res.json() @@ -94,22 +121,55 @@ export function BannerCheckTab() {

-
- setUrl(e.target.value)} - placeholder="https://www.example.com/" - className="flex-1 px-4 py-3 border border-gray-300 rounded-lg focus:ring-2 focus:ring-purple-500 focus:border-transparent text-sm" - disabled={loading} required - /> - + +
+ setUrl(e.target.value)} + placeholder="https://www.example.com/" + className="flex-1 px-4 py-3 border border-gray-300 rounded-lg focus:ring-2 focus:ring-purple-500 focus:border-transparent text-sm" + disabled={loading} required + /> + +
+ +
+ {CATEGORIES.map(cat => ( + + ))} +
{progress && ( diff --git a/backend-compliance/compliance/api/agent_doc_check_routes.py b/backend-compliance/compliance/api/agent_doc_check_routes.py index 9d79188..3639a05 100644 --- a/backend-compliance/compliance/api/agent_doc_check_routes.py +++ b/backend-compliance/compliance/api/agent_doc_check_routes.py @@ -94,6 +94,7 @@ class DocCheckStatusResponse(BaseModel): class BannerCheckRequest(BaseModel): url: str + categories: list[str] = [] # empty = test all categories @router.post("/banner-check") @@ -103,7 +104,11 @@ async def run_banner_check(req: BannerCheckRequest): async with httpx.AsyncClient(timeout=120.0) as client: resp = await client.post( f"{CONSENT_TESTER_URL}/scan", - json={"url": req.url, "timeout_per_phase": 10}, + json={ + "url": req.url, + "timeout_per_phase": 10, + "categories": req.categories, + }, ) if resp.status_code == 200: return resp.json() diff --git a/consent-tester/main.py b/consent-tester/main.py index 93cbc8e..91c3772 100644 --- a/consent-tester/main.py +++ b/consent-tester/main.py @@ -34,6 +34,7 @@ app.add_middleware( class ScanRequest(BaseModel): url: str timeout_per_phase: int = 10 # seconds to wait after page load + categories: list[str] = [] # empty = test all categories class ScanResponse(BaseModel): @@ -59,7 +60,7 @@ async def health(): async def scan_consent(req: ScanRequest): """Run 3-phase consent test on a URL.""" logger.info("Starting consent test for %s", req.url) - result = await run_consent_test(req.url, req.timeout_per_phase) + result = await run_consent_test(req.url, req.timeout_per_phase, req.categories) # Build raw response dict for structured check mapping phases = { diff --git a/consent-tester/requirements.txt b/consent-tester/requirements.txt index 84af6db..a9f5286 100644 --- a/consent-tester/requirements.txt +++ b/consent-tester/requirements.txt @@ -1,4 +1,5 @@ fastapi==0.115.12 uvicorn==0.34.2 playwright==1.52.0 +playwright-stealth==1.0.6 pydantic>=2.0 diff --git a/consent-tester/services/banner_detector.py b/consent-tester/services/banner_detector.py index 396c5dd..2f6ba5c 100644 --- a/consent-tester/services/banner_detector.py +++ b/consent-tester/services/banner_detector.py @@ -1,12 +1,12 @@ """ Banner Detector — identifies Consent Management Platforms and their buttons. -Supports 10+ CMPs with specific selectors + generic fallback. +Supports 30 CMPs with specific selectors + generic fallback + Shadow DOM. """ from dataclasses import dataclass -from playwright.async_api import Page, Locator +from playwright.async_api import Page @dataclass @@ -79,6 +79,127 @@ CMP_SELECTORS = [ "accept": "#tarteaucitronPersonalize2", "reject": "#tarteaucitronAllDenied2", }, + # --- 20 additional CMPs --- + { + "name": "Sourcepoint", + "detect": "div[id^='sp_message']", + "accept": ".sp_choice_type_11", + "reject": ".sp_choice_type_13", + }, + { + "name": "Axeptio", + "detect": "#axeptio_widget", + "accept": "[data-ax='accept']", + "reject": "[data-ax='decline']", + }, + { + "name": "Iubenda", + "detect": "#iubenda-cs-banner", + "accept": ".iubenda-cs-accept-btn", + "reject": ".iubenda-cs-reject-btn", + }, + { + "name": "Termly", + "detect": "#termly-code-snippet-support", + "accept": "[data-tid='banner-accept']", + "reject": "[data-tid='banner-decline']", + }, + { + "name": "CookieFirst", + "detect": "#cookiefirst-root", + "accept": "[data-cookiefirst-action='accept']", + "reject": "[data-cookiefirst-action='reject']", + }, + { + "name": "Complianz", + "detect": "#cmplz-cookiebanner-container", + "accept": ".cmplz-accept", + "reject": ".cmplz-deny", + }, + { + "name": "CookieScript", + "detect": "#cookiescript_injected", + "accept": "#cookiescript_accept", + "reject": "#cookiescript_reject", + }, + { + "name": "HubSpot", + "detect": "#hs-eu-cookie-confirmation", + "accept": "#hs-eu-confirmation-button", + "reject": "#hs-eu-decline-button", + }, + { + "name": "Civic UK", + "detect": "#ccc, .ccc-content", + "accept": "#ccc-recommended-settings", + "reject": "#ccc-reject-settings", + }, + { + "name": "GDPR Cookie Compliance", + "detect": "#moove_gdpr_cookie_modal", + "accept": ".moove-gdpr-modal-allow-all", + "reject": ".moove-gdpr-modal-save-settings", + }, + { + "name": "CookieHub", + "detect": "#ch2-container", + "accept": "#ch2-btn-accept", + "reject": "#ch2-btn-decline", + }, + { + "name": "Osano", + "detect": ".osano-cm-dialog", + "accept": ".osano-cm-accept-all", + "reject": ".osano-cm-deny", + }, + { + "name": "Ketch", + "detect": "#ketch-consent", + "accept": "[data-testid='accept-button']", + "reject": "[data-testid='decline-button']", + }, + { + "name": "Piwik PRO", + "detect": "#ppms_cm_popup_overlay", + "accept": "#ppms_cm_agree-to-all", + "reject": "#ppms_cm_reject-all", + }, + { + "name": "Cookie Consent (Insites)", + "detect": ".cc-window", + "accept": ".cc-btn.cc-allow", + "reject": ".cc-btn.cc-deny", + }, + { + "name": "Admiral", + "detect": "[id^='admiral-']", + "accept": "[class*='admiral-accept']", + "reject": "[class*='admiral-reject']", + }, + { + "name": "Sibbo", + "detect": "#sibbo-cmp-layout", + "accept": "#sibbo-cmp-accept-all", + "reject": "#sibbo-cmp-reject-all", + }, + { + "name": "Evidon", + "detect": "#_evidon_banner", + "accept": "#_evidon-accept-button", + "reject": "#_evidon-decline-button", + }, + { + "name": "LiveRamp", + "detect": "#_lr-cookie-consent", + "accept": "#_lr-accept-all", + "reject": "#_lr-reject-all", + }, + { + "name": "Adsimple", + "detect": "#adconsent-usp-banner", + "accept": ".adconsent-accept-all", + "reject": ".adconsent-reject-all", + }, ] # Generic fallback patterns (text-based) @@ -94,45 +215,245 @@ GENERIC_REJECT_TEXTS = [ "Decline", "Nein", "Nicht einverstanden", ] +# Attribute-based generic selectors for consent buttons +_GENERIC_ATTR_ACCEPT = [ + "[data-consent='accept']", "[data-cookie='accept']", "[data-gdpr='accept']", + "[data-consent-accept]", "[data-cookie-accept]", +] +_GENERIC_ATTR_REJECT = [ + "[data-consent='reject']", "[data-cookie='reject']", "[data-gdpr='reject']", + "[data-consent-reject]", "[data-cookie-reject]", +] + +# Dialog / aria selectors to find consent containers +_DIALOG_SELECTORS = [ + "[role='dialog']", + "[aria-label*='cookie' i]", "[aria-label*='consent' i]", + "[aria-label*='datenschutz' i]", "[aria-label*='Cookie' i]", +] + +# JavaScript for recursive Shadow DOM search +_SHADOW_DETECT_JS = """ +() => { + const KEYWORDS = /cookie|consent|datenschutz|privacy/i; + const results = []; + function walk(root) { + for (const el of root.querySelectorAll('*')) { + if (el.shadowRoot) { + const shadow = el.shadowRoot; + const text = shadow.innerHTML || ''; + if (KEYWORDS.test(text)) { + const buttons = []; + for (const btn of shadow.querySelectorAll( + 'button, a[role="button"], [role="button"]' + )) { + const t = (btn.textContent || '').trim(); + if (t.length > 0 && t.length < 80) { + buttons.push(t); + } + } + if (buttons.length > 0) { + const tag = el.tagName.toLowerCase(); + const id = el.id ? '#' + el.id : ''; + results.push({ + host: tag + id, + buttons: buttons, + preview: text.substring(0, 200) + }); + } + } + walk(shadow); + } + } + } + walk(document); + return results.length > 0 ? results[0] : null; +} +""" + +_SHADOW_CLICK_JS = """ +(textPattern) => { + const regex = new RegExp(textPattern, 'i'); + function walk(root) { + for (const el of root.querySelectorAll('*')) { + if (el.shadowRoot) { + const btns = el.shadowRoot.querySelectorAll( + 'button, a[role="button"], [role="button"]' + ); + for (const btn of btns) { + if (regex.test(btn.textContent || '')) { + btn.click(); + return true; + } + } + const found = walk(el.shadowRoot); + if (found) return true; + } + } + return false; + } + return walk(document); +} +""" + + +async def _detect_in_shadow_dom(page: Page) -> BannerInfo | None: + """Search Shadow DOM roots for consent banners as last-resort fallback.""" + try: + result = await page.evaluate(_SHADOW_DETECT_JS) + if not result: + return None + buttons = result.get("buttons", []) + host = result.get("host", "") + accept_pat = "" + reject_pat = "" + accept_kw = ("accept", "akzeptieren", "zustimmen", "agree", "allow", + "einverstanden", "alle") + reject_kw = ("reject", "ablehnen", "deny", "decline", "refuse", + "notwendig", "necessary", "essential") + for text in buttons: + low = text.lower() + if not accept_pat and any(k in low for k in accept_kw): + accept_pat = text + elif not reject_pat and any(k in low for k in reject_kw): + reject_pat = text + if not accept_pat and not reject_pat: + return None + return BannerInfo( + detected=True, + provider=f"ShadowDOM({host})", + accept_selector=f"shadow-click:{accept_pat}" if accept_pat else "", + reject_selector=f"shadow-click:{reject_pat}" if reject_pat else "", + ) + except Exception: + return None + + +async def _click_in_shadow_dom(page: Page, text_pattern: str) -> bool: + """Click a button inside a Shadow DOM root matching the text pattern.""" + try: + return await page.evaluate(_SHADOW_CLICK_JS, text_pattern) + except Exception: + return False + + +async def _detect_generic_dialog(page: Page) -> BannerInfo | None: + """Detect consent banners in dialog/aria containers.""" + consent_kw = ("cookie", "consent", "datenschutz", "privacy") + for sel in _DIALOG_SELECTORS: + try: + containers = page.locator(sel) + count = await containers.count() + if count == 0: + continue + container = containers.first + text = (await container.inner_text(timeout=2000)).lower() + if not any(kw in text for kw in consent_kw): + continue + # Found a consent dialog — look for accept/reject buttons + accept = "" + reject = "" + for asel in _GENERIC_ATTR_ACCEPT: + if await container.locator(asel).count() > 0: + accept = f"{sel} {asel}" + break + for rsel in _GENERIC_ATTR_REJECT: + if await container.locator(rsel).count() > 0: + reject = f"{sel} {rsel}" + break + if not accept: + for t in GENERIC_ACCEPT_TEXTS: + if await container.get_by_text(t, exact=False).count() > 0: + accept = f'{sel} button:has-text("{t}")' + break + if not reject: + for t in GENERIC_REJECT_TEXTS: + if await container.get_by_text(t, exact=False).count() > 0: + reject = f'{sel} button:has-text("{t}")' + break + if accept or reject: + return BannerInfo( + detected=True, provider="Generic (dialog)", + accept_selector=accept, reject_selector=reject, + ) + except Exception: + continue + return None + + +async def _detect_generic_attr(page: Page) -> BannerInfo | None: + """Detect consent buttons by data-consent/data-cookie/data-gdpr attributes.""" + accept = "" + reject = "" + for sel in _GENERIC_ATTR_ACCEPT: + try: + if await page.locator(sel).count() > 0: + accept = sel + break + except Exception: + continue + for sel in _GENERIC_ATTR_REJECT: + try: + if await page.locator(sel).count() > 0: + reject = sel + break + except Exception: + continue + if accept or reject: + return BannerInfo( + detected=True, provider="Generic (attr)", + accept_selector=accept, reject_selector=reject, + ) + return None + async def detect_banner(page: Page) -> BannerInfo: """Detect which CMP is used and return button selectors.""" - # Try CMP-specific selectors first + # 1. Try CMP-specific selectors for cmp in CMP_SELECTORS: try: - count = await page.locator(cmp["detect"]).count() - if count > 0: + if await page.locator(cmp["detect"]).count() > 0: return BannerInfo( - detected=True, - provider=cmp["name"], + detected=True, provider=cmp["name"], accept_selector=cmp["accept"], reject_selector=cmp["reject"], ) except Exception: continue - # Generic fallback — search for buttons by text + # 2. Generic fallback — search buttons by text for text in GENERIC_ACCEPT_TEXTS: try: btn = page.get_by_text(text, exact=False) if await btn.count() > 0: accept = f'button:has-text("{text}")' - # Try to find reject button nearby reject = "" for rtext in GENERIC_REJECT_TEXTS: - rbtn = page.get_by_text(rtext, exact=False) - if await rbtn.count() > 0: + if await page.get_by_text(rtext, exact=False).count() > 0: reject = f'button:has-text("{rtext}")' break return BannerInfo( - detected=True, - provider="Generic", - accept_selector=accept, - reject_selector=reject, + detected=True, provider="Generic", + accept_selector=accept, reject_selector=reject, ) except Exception: continue + # 3. Generic fallback — dialog/aria containers with consent keywords + dialog_result = await _detect_generic_dialog(page) + if dialog_result: + return dialog_result + + # 4. Generic fallback — data-consent/data-cookie/data-gdpr attributes + attr_result = await _detect_generic_attr(page) + if attr_result: + return attr_result + + # 5. Shadow DOM fallback — search inside shadow roots + shadow_result = await _detect_in_shadow_dom(page) + if shadow_result: + return shadow_result + return BannerInfo(detected=False, provider="", accept_selector="", reject_selector="") @@ -140,10 +461,21 @@ async def click_button(page: Page, selector: str, timeout: int = 5000) -> bool: """Try to click a consent button. Returns True if clicked successfully.""" if not selector: return False + + # Handle Shadow DOM selectors + if selector.startswith("shadow-click:"): + text_pattern = selector[len("shadow-click:"):] + return await _click_in_shadow_dom(page, text_pattern) + try: locator = page.locator(selector).first await locator.wait_for(state="visible", timeout=timeout) await locator.click() return True except Exception: + # Fallback: try Shadow DOM click with selector text + # Extract button text from selector like 'button:has-text("Accept all")' + if ':has-text("' in selector: + text = selector.split(':has-text("')[1].rstrip('")') + return await _click_in_shadow_dom(page, text) return False diff --git a/consent-tester/services/consent_scanner.py b/consent-tester/services/consent_scanner.py index 2890bb6..4b50b7a 100644 --- a/consent-tester/services/consent_scanner.py +++ b/consent-tester/services/consent_scanner.py @@ -11,6 +11,12 @@ from dataclasses import dataclass, field from playwright.async_api import async_playwright, Page, BrowserContext +try: + from playwright_stealth import stealth_async + HAS_STEALTH = True +except ImportError: + HAS_STEALTH = False + from services.banner_detector import detect_banner, click_button, BannerInfo from services.script_analyzer import ( classify_scripts, find_tracking_services, @@ -53,22 +59,43 @@ class ConsentTestResult: banner_has_dse_link: bool = False -async def run_consent_test(url: str, wait_secs: int = 10) -> ConsentTestResult: - """Run 3-phase consent test on a URL.""" +async def run_consent_test( + url: str, wait_secs: int = 10, categories: list[str] | None = None, +) -> ConsentTestResult: + """Run 3-phase consent test on a URL. + + Args: + url: Website URL to test. + wait_secs: Seconds to wait per phase. + categories: Optional list of category names to test (empty = test all). + """ result = ConsentTestResult() wait_ms = wait_secs * 1000 + filter_cats = categories or [] async with async_playwright() as p: browser = await p.chromium.launch( headless=True, - args=["--no-sandbox", "--disable-dev-shm-usage"], + args=[ + "--no-sandbox", + "--disable-dev-shm-usage", + "--disable-blink-features=AutomationControlled", + "--window-size=1920,1080", + ], ) try: # ── Phase A: Before consent ────────────────────────── logger.info("Phase A: First visit (no interaction)") - ctx_a = await browser.new_context(user_agent=USER_AGENT) + ctx_a = await browser.new_context( + user_agent=USER_AGENT, + viewport={"width": 1920, "height": 1080}, + locale="de-DE", + timezone_id="Europe/Berlin", + ) page_a = await ctx_a.new_page() + if HAS_STEALTH: + await stealth_async(page_a) scripts_a = [] page_a.on("request", lambda req: _collect_script(req, scripts_a)) @@ -101,8 +128,15 @@ async def run_consent_test(url: str, wait_secs: int = 10) -> ConsentTestResult: # ── Phase B: After rejecting ───────────────────────── logger.info("Phase B: Reject consent (%s)", banner.provider) - ctx_b = await browser.new_context(user_agent=USER_AGENT) + ctx_b = await browser.new_context( + user_agent=USER_AGENT, + viewport={"width": 1920, "height": 1080}, + locale="de-DE", + timezone_id="Europe/Berlin", + ) page_b = await ctx_b.new_page() + if HAS_STEALTH: + await stealth_async(page_b) scripts_b = [] page_b.on("request", lambda req: _collect_script(req, scripts_b)) @@ -128,8 +162,15 @@ async def run_consent_test(url: str, wait_secs: int = 10) -> ConsentTestResult: # ── Phase C: After accepting ───────────────────────── logger.info("Phase C: Accept consent (%s)", banner.provider) - ctx_c = await browser.new_context(user_agent=USER_AGENT) + ctx_c = await browser.new_context( + user_agent=USER_AGENT, + viewport={"width": 1920, "height": 1080}, + locale="de-DE", + timezone_id="Europe/Berlin", + ) page_c = await ctx_c.new_page() + if HAS_STEALTH: + await stealth_async(page_c) scripts_c = [] page_c.on("request", lambda req: _collect_script(req, scripts_c)) @@ -154,18 +195,40 @@ async def run_consent_test(url: str, wait_secs: int = 10) -> ConsentTestResult: try: from services.category_tester import detect_categories, test_single_category - ctx_cat = await browser.new_context(user_agent=USER_AGENT) + ctx_cat = await browser.new_context( + user_agent=USER_AGENT, + viewport={"width": 1920, "height": 1080}, + locale="de-DE", + timezone_id="Europe/Berlin", + ) page_cat = await ctx_cat.new_page() + if HAS_STEALTH: + await stealth_async(page_cat) await page_cat.goto(url, wait_until="networkidle", timeout=20000) await page_cat.wait_for_timeout(2000) - categories = await detect_categories(page_cat, banner) + detected_cats = await detect_categories(page_cat, banner) await page_cat.close() - if categories: - logger.info("Testing %d categories individually", len(categories)) - for cat in categories: - cat_ctx = await browser.new_context(user_agent=USER_AGENT) + # Filter to requested categories if specified + if filter_cats and detected_cats: + detected_cats = [ + c for c in detected_cats if c.name in filter_cats + ] + logger.info( + "Filtered to %d categories (requested: %s)", + len(detected_cats), filter_cats, + ) + + if detected_cats: + logger.info("Testing %d categories individually", len(detected_cats)) + for cat in detected_cats: + cat_ctx = await browser.new_context( + user_agent=USER_AGENT, + viewport={"width": 1920, "height": 1080}, + locale="de-DE", + timezone_id="Europe/Berlin", + ) cat_result = await test_single_category(cat_ctx, url, cat, banner, wait_ms) result.category_tests.append(cat_result) await cat_ctx.close()