diff --git a/backend-compliance/compliance/api/agent_check/_b1_wiring.py b/backend-compliance/compliance/api/agent_check/_b1_wiring.py index 599a893d..75392b78 100644 --- a/backend-compliance/compliance/api/agent_check/_b1_wiring.py +++ b/backend-compliance/compliance/api/agent_check/_b1_wiring.py @@ -42,6 +42,29 @@ async def run_b1(state: dict) -> None: return _update(check_id, "Mobile Consent-Reachability prüfen...", 95) + + # Try the new Playwright WebKit + iPhone scan first (Task #7). + # Falls back to static HTTP fetch on error. + mobile = None + try: + from ._constants import CONSENT_TESTER_URL + async with httpx.AsyncClient(timeout=60.0) as c: + r = await c.post( + f"{CONSENT_TESTER_URL}/scan-mobile-reachability", + json={"url": homepage_url}, + ) + if r.status_code == 200: + mobile = r.json() + logger.info( + "B1 Mobile-Playwright: has_anchor=%s tap=%s click_opens=%s", + mobile.get("has_anchor"), + mobile.get("tap_target_px"), + mobile.get("click_opens_cmp"), + ) + except Exception as e: + logger.info("B1 Mobile-Playwright fallback to static fetch: %s", e) + + page_html = None try: async with httpx.AsyncClient( timeout=20.0, follow_redirects=True, @@ -50,21 +73,53 @@ async def run_b1(state: dict) -> None: "Version/17.5 Mobile/15E148 Safari/604.1"}, ) as c: r = await c.get(homepage_url) - if r.status_code != 200: - logger.info("B1: homepage fetch %s → HTTP %d", homepage_url, r.status_code) - return - page_html = r.text + if r.status_code == 200: + page_html = r.text except Exception as e: logger.warning("B1: homepage fetch failed: %s", e) + + if not page_html and not mobile: return - finding = evaluate_reachability(page_html, homepage_url) + finding = evaluate_reachability(page_html or "", homepage_url) + + # Enrich finding with mobile-playwright details when available + if mobile and mobile.get("has_anchor"): + finding["mobile_playwright"] = { + "has_anchor": mobile.get("has_anchor"), + "anchor_text": mobile.get("anchor_text"), + "tap_target_px": mobile.get("tap_target_px"), + "click_opens_cmp": mobile.get("click_opens_cmp"), + "engine_meta": mobile.get("engine_meta"), + } + # Tap-target rule (Apple HIG / WCAG 2.5.5): ≥ 44 px each side + tp = mobile.get("tap_target_px") or {} + if tp and (tp.get("w", 0) < 44 or tp.get("h", 0) < 44): + finding["notes"] = (finding.get("notes") or []) + [ + f"tap-target nur {tp.get('w')}×{tp.get('h')}px " + "(Apple HIG / WCAG verlangen ≥ 44×44)", + ] + if finding.get("passed"): + finding["passed"] = False + finding["severity"] = "MEDIUM" + finding["severity_reason"] = "misclassified" + # If anchor exists in DOM but click doesn't open CMP, bump severity + if mobile.get("has_anchor") and not mobile.get("click_opens_cmp"): + finding["notes"] = (finding.get("notes") or []) + [ + "click auf Footer-Link öffnet CMP nicht direkt", + ] + if finding.get("severity_reason") != "factually_wrong": + finding["severity"] = "MEDIUM" + finding["severity_reason"] = "misclassified" + finding["passed"] = False + state["reachability_finding"] = finding state["reachability_html"] = _render_block(finding) logger.info( - "B1 Reachability: passed=%s severity=%s reason=%s", + "B1 Reachability: passed=%s severity=%s reason=%s mobile=%s", finding["passed"], finding.get("severity"), finding.get("severity_reason"), + bool(mobile), ) diff --git a/backend-compliance/compliance/api/agent_check/_b9b10_wiring.py b/backend-compliance/compliance/api/agent_check/_b9b10_wiring.py index d7ee8a7f..4b4c313f 100644 --- a/backend-compliance/compliance/api/agent_check/_b9b10_wiring.py +++ b/backend-compliance/compliance/api/agent_check/_b9b10_wiring.py @@ -9,6 +9,9 @@ from __future__ import annotations import html import logging +from compliance.services.ai_retention_granularity_check import ( + check_ai_retention_granularity, +) from compliance.services.impressum_multi_entity_check import ( check_multi_entity_impressum, ) @@ -24,6 +27,7 @@ def run_b9b10(state: dict) -> None: new: list[dict] = [] new.extend(check_multi_entity_impressum(state)) new.extend(check_transfer_mechanism(state)) + new.extend(check_ai_retention_granularity(state)) if not new: return extras.extend(new) diff --git a/backend-compliance/compliance/api/agent_check/_orchestrator.py b/backend-compliance/compliance/api/agent_check/_orchestrator.py index c3252f40..5d174030 100644 --- a/backend-compliance/compliance/api/agent_check/_orchestrator.py +++ b/backend-compliance/compliance/api/agent_check/_orchestrator.py @@ -51,6 +51,8 @@ async def run_compliance_check(check_id: str, req) -> None: await run_phase_b(state) # Phase C: Step 3b-d (banner + cross-check + TCF) + Step 4 await run_phase_c(state) + # Phase C-2: optional browser-matrix scan (env BROWSER_MATRIX=true) + await run_phase_c2(state) # Phase D-1/D-2: Step 5 vendor extraction + finalize await run_phase_d1(state) await run_phase_d2(state) diff --git a/backend-compliance/compliance/api/agent_check/_phase_c2_browser_matrix.py b/backend-compliance/compliance/api/agent_check/_phase_c2_browser_matrix.py new file mode 100644 index 00000000..820aaea3 --- /dev/null +++ b/backend-compliance/compliance/api/agent_check/_phase_c2_browser_matrix.py @@ -0,0 +1,146 @@ +"""Phase C-2 — Browser-Matrix Multi-Browser Scan (Stage 1.c). + +After the single-browser scan in Phase C, optionally fan out to the +consent-tester /scan-matrix endpoint that runs the same probe on +chromium / firefox / webkit / mobile-safari and returns a worst-of +robustness score per browser. + +Activated by env `BROWSER_MATRIX=true`. Default off so existing +runs aren't slowed down 4× while we tune. + +The state gets these new keys: + + state["browser_matrix"] list[dict] per-profile results + state["browser_aggregate"] dict worst/best score + verbal + state["browser_matrix_html"] str pre-rendered V2 block +""" + +from __future__ import annotations + +import logging +import os +from html import escape as h + +import httpx + +from ._constants import CONSENT_TESTER_URL +from ._helpers import _update + +logger = logging.getLogger(__name__) + + +async def run_phase_c2(state: dict) -> None: + if os.environ.get("BROWSER_MATRIX", "false").lower() not in ( + "true", "1", "yes", "on", + ): + return + check_id = state["check_id"] + req = state["req"] + banner_url = "" + for d in req.documents: + if d.url: + from urllib.parse import urlparse + p = urlparse(d.url) + if p.scheme and p.netloc: + banner_url = f"{p.scheme}://{p.netloc}" + break + if not banner_url: + return + + _update(check_id, "Browser-Matrix: Multi-Engine-Scan...", 83) + + profiles_env = os.environ.get("BROWSER_MATRIX_PROFILES", "") + profiles = [p.strip() for p in profiles_env.split(",") if p.strip()] or None + + try: + async with httpx.AsyncClient(timeout=600.0) as c: + r = await c.post( + f"{CONSENT_TESTER_URL}/scan-matrix", + json={ + "url": banner_url, + "timeout_per_phase": 10, + "categories": [], + "browser_profiles": profiles, + }, + ) + if r.status_code != 200: + logger.warning("browser-matrix scan HTTP %d", r.status_code) + return + data = r.json() + except Exception as e: + logger.warning("browser-matrix scan failed: %s", e) + return + + state["browser_matrix"] = data.get("browser_matrix") or [] + state["browser_aggregate"] = data.get("aggregate") or {} + state["browser_matrix_html"] = _render( + state["browser_matrix"], state["browser_aggregate"], + ) + logger.info( + "browser-matrix: %d profiles, worst=%s@%s%%, best=%s@%s%%", + len(state["browser_matrix"]), + state["browser_aggregate"].get("worst_profile"), + state["browser_aggregate"].get("worst_score"), + state["browser_aggregate"].get("best_profile"), + state["browser_aggregate"].get("best_score"), + ) + + +def _render(rows: list[dict], aggregate: dict) -> str: + if not rows: + return "" + table_rows = [] + for r in rows: + sev = ("fail" if r["score"] < 60 + else "warn" if r["score"] < 80 else "pass") + color = ("#dc2626" if sev == "fail" + else "#f59e0b" if sev == "warn" else "#15803d") + dims = r.get("dimensions") or {} + dims_str = ( + f"PC {int(dims.get('pre_consent',0)*100)}% · " + f"RR {int(dims.get('reject_respect',0)*100)}% · " + f"BD {int(dims.get('banner_design',0)*100)}%" + ) + table_rows.append( + "" + f"" + f"{h(r.get('label') or r.get('profile_id') or '—')}" + f"{r['score']}%" + f"{dims_str}" + f"{h(r.get('verbal','—'))}" + "" + ) + worst = aggregate.get("worst_score", 0) + sev_color = ("#dc2626" if worst < 60 + else "#f59e0b" if worst < 80 else "#15803d") + head = ( + f"

" + f"Worst-of {worst}% " + f"(Profil {aggregate.get('worst_profile','—')}) — " + f"Best-of {aggregate.get('best_score','—')}% " + f"({aggregate.get('best_profile','—')}). " + "Aggregierter Score nach Worst-of-Regel: ein HIGH-Verstoß " + "auf einem Browser kappt den Gesamt-Score.

" + ) + return ( + "
" + "

" + "🌐 Browser-Matrix · Consent-Robustness pro Engine" + "

" + f"{head}" + "" + "" + "" + "" + "" + "" + "" + f"{''.join(table_rows)}
Browser-ProfilScorePre-Consent · " + "Reject-Respekt · Banner-DesignBewertung
" + "
" + ) diff --git a/backend-compliance/compliance/services/ai_retention_granularity_check.py b/backend-compliance/compliance/services/ai_retention_granularity_check.py new file mode 100644 index 00000000..fe2ae118 --- /dev/null +++ b/backend-compliance/compliance/services/ai_retention_granularity_check.py @@ -0,0 +1,116 @@ +"""B11 — AI-Retention-Granularity-Check (TH-RETENTION-002). + +DSGVO Art. 13 Abs. 2 lit. a + DSK-Empfehlung: pro Datenkategorie +eine spezifische Speicherdauer. Eine pauschale Angabe wie +"6 Monate für alle Daten" reicht nicht. + +GT-Pattern Elli: + Vertex-AI-Chatbot speichert "IT- und pseudonymisierte + Nutzungsdaten" pauschal 6 Monate. Keine Abstufung nach + Datenkategorie (Texteingaben / IP / Geräteinformationen / + Session-ID / Fehlerprotokolle). + +Heuristik: + 1. AI-Kontext erkennen (vertex ai / openai / claude / etc.) + 2. In ±600-char-Window prüfen: + - Existiert eine Speicherdauer-Aussage? (parse_duration_to_days) + - Werden ≥2 Datenkategorien aus AI-Standardliste genannt? + (Texteingaben, IP, Geräteinformationen, Session, Fehlerprotokolle) + - Wenn 1 Speicherdauer + ≥2 Kategorien aber kein + per-Kategorie-Differential → LOW +""" + +from __future__ import annotations + +import logging +import re + +from .retention_comparator import parse_duration_to_days + +logger = logging.getLogger(__name__) + + +_AI_PROVIDERS = ( + "vertex ai", "google vertex", "openai", "gpt-3", "gpt-4", "chatgpt", + "anthropic", "claude.ai", "claude-3", "mistral ai", + "ki-assistent", "ki assistent", "ai assistant", +) + + +_AI_DATA_CATEGORIES = ( + "texteingab", # Texteingaben / Texteingabe + "chatverlauf", "chatverläuf", + "ip-adress", + "geräteinform", "geraeteinform", "device-info", + "session-id", "sitzungs-id", + "browserversion", "user-agent", + "fehlerprotokoll", + "zeitstempel", +) + + +def _per_category_phrases() -> tuple[str, ...]: + """Patterns indicating per-category retention is mentioned.""" + return ( + "pro datenkategorie", + "je datenkategorie", + "unterschiedlich je", + "abhängig vom datentyp", + "abhaengig vom datentyp", + "differenziert nach", + "pro kategorie", + ) + + +def check_ai_retention_granularity(state: dict) -> list[dict]: + doc_texts = state.get("doc_texts") or {} + dse = (doc_texts.get("dse") or "").lower() + if not dse: + return [] + findings: list[dict] = [] + for ai_kw in _AI_PROVIDERS: + idx = dse.find(ai_kw) + if idx < 0: + continue + window = dse[max(0, idx - 800): idx + 800] + if not window: + continue + categories_found = [c for c in _AI_DATA_CATEGORIES if c in window] + if len(categories_found) < 2: + continue + # Per-category retention phrase already present? then OK + if any(p in window for p in _per_category_phrases()): + return [] + # Retention-claim in window? parse duration + m = re.search( + r"(\d+(?:[.,]\d+)?\s*(?:tage?|monat\w*|jahre?|" + r"day|month|year))", window, + ) + if not m: + continue + days, kind = parse_duration_to_days(m.group(1)) + if days is None: + continue + findings.append({ + "check_id": "TH-RETENTION-GRANULARITY-001", + "severity": "LOW", + "severity_reason": "incomplete", + "title": ( + "AI-Speicherdauer pauschal — pro Datenkategorie " + "differenzieren empfohlen" + ), + "norm": "DSGVO Art. 13 Abs. 2 lit. a + DSK-OH AI", + "ai_provider": ai_kw, + "retention_days": int(days), + "categories_detected": categories_found, + "action": ( + f"Für '{ai_kw}'-Kontext separate Speicherdauern je " + f"Datenkategorie angeben (Texteingaben / IP / " + f"Geräteinformationen / Session). Aktuell pauschal " + f"{int(days)} Tage." + ), + }) + break # one per DSE is enough + if findings: + logger.info("B11 AI-retention-granularity: %d findings", len(findings)) + return findings diff --git a/backend-compliance/compliance/services/mail_render_v2/_compose.py b/backend-compliance/compliance/services/mail_render_v2/_compose.py index 502933bf..430c1f61 100644 --- a/backend-compliance/compliance/services/mail_render_v2/_compose.py +++ b/backend-compliance/compliance/services/mail_render_v2/_compose.py @@ -44,8 +44,10 @@ def compose_v2(state: dict) -> str: state.get("vendor_consistency_html", ""), # B5 — AI-Act Art. 50 Transparenzpflicht state.get("ai_act_html", ""), - # B6/B7/B8 — DPO-cross-doc + Doc-Staleness + CMP-fingerprint + # B6/B7/B8/B9/B10 — DPO + Staleness + CMP + MultiEntity + Transfer state.get("extra_findings_html", ""), + # Browser-Matrix (Stage 1.c) + state.get("browser_matrix_html", ""), # All legacy build_*_html() wrapped in V2 sections — preserves # every information block from the old renderer (Exec Summary, # Banner-Screenshot, VVT, Redundancy, Solutions, Diff, etc.) diff --git a/backend-compliance/compliance/services/specialist_agents/__init__.py b/backend-compliance/compliance/services/specialist_agents/__init__.py new file mode 100644 index 00000000..0c72e364 --- /dev/null +++ b/backend-compliance/compliance/services/specialist_agents/__init__.py @@ -0,0 +1,17 @@ +"""Doc-Type Specialist-Agents — Phase 1 Prototyp. + +Architektur: + - Pro Doc-Type ein Spezialist-Agent mit System-Prompt (Domänenwissen) + + Knowledge-Base (anonymisierte Patterns/Statistiken aus + Multi-Mandanten-Daten) + - Jeder Agent liefert strukturierte Findings → enriched state + - Ein Cross-Doc-Router-Agent prüft ob Absätze falsch zugeordnet sind + ("Cookie-Inhalt steht in AGB statt Cookie-Richtlinie") + +Phase 1: Impressum-Agent als Prototyp (Pattern-Match-only, ohne LLM). +Phase 2: DSE-Agent + Cross-Doc-Router (LLM-gestützt). +Phase 3+: Weitere Doc-Types + Continuous Learning der KB. + +Privacy: KB enthält NIEMALS Roh-Mandantendaten. Anonymisierung + +Aggregation Pflicht (NER-Maskierung vor KB-Speicher). +""" diff --git a/backend-compliance/compliance/services/specialist_agents/impressum_agent.py b/backend-compliance/compliance/services/specialist_agents/impressum_agent.py new file mode 100644 index 00000000..e0adec28 --- /dev/null +++ b/backend-compliance/compliance/services/specialist_agents/impressum_agent.py @@ -0,0 +1,159 @@ +"""Impressum-Specialist-Agent Phase-1 Prototyp. + +Pattern-Match-only (kein LLM). Demonstriert die Architektur: + - Knowledge-Base mit § 5 TMG/DDG-Pflichtangaben + - Pattern-Library für Erkennung + - strukturierte Findings mit Norm + Action + +Phase 2 wird denselben Output produzieren, aber LLM-gestützt mit +Domain-spezifischem System-Prompt + Cross-Customer-KB. + +KB-Beispiel-Einträge: + - HR-Format DE: HR[BA] + - USt-IdNr-Format DE: DE\\d{9} + - Aufsichtsbehörden-Liste (Branchen) + - DSB-Adressformat +""" + +from __future__ import annotations + +import logging +import re + +logger = logging.getLogger(__name__) + + +# Pflichtangaben nach § 5 TMG + § 1 DL-InfoV +PFLICHTANGABEN = { + "name_anbieter": { + "label": "Name + Anschrift des Anbieters", + "norm": "§ 5 Abs. 1 Nr. 1 TMG", + "patterns": [ + re.compile(r"\b(?:Anbieter|Diensteanbieter|" + r"Verantwortlich(?:er Anbieter)?)\s*[:.\s]", + re.IGNORECASE), + ], + "severity_if_missing": "HIGH", + }, + "kontakt_email": { + "label": "Email-Adresse", + "norm": "§ 5 Abs. 1 Nr. 2 TMG", + "patterns": [ + re.compile(r"\b[\w.+-]+@[\w-]+\.[a-z]{2,}\b", re.IGNORECASE), + ], + "severity_if_missing": "HIGH", + }, + "kontakt_telefon": { + "label": "Telefon", + "norm": "§ 5 Abs. 1 Nr. 2 TMG", + "patterns": [ + re.compile(r"(?:Tel(?:efon)?|Phone)\.?\s*[:.\s]\s*[\+\d][\d\s/\-()]{5,}", + re.IGNORECASE), + ], + "severity_if_missing": "MEDIUM", + }, + "handelsregister": { + "label": "Handelsregister-Eintrag", + "norm": "§ 5 Abs. 1 Nr. 4 TMG", + "patterns": [ + re.compile(r"\bHR[BA]\s+\d", re.IGNORECASE), + re.compile(r"Handelsregister", re.IGNORECASE), + ], + "severity_if_missing": "HIGH", + }, + "ust_id": { + "label": "USt-IdNr", + "norm": "§ 5 Abs. 1 Nr. 6 TMG", + "patterns": [ + re.compile(r"\b(?:USt-?Id(?:Nr)?\.?|VAT(?:-?Id)?)\s*[:.\s]", + re.IGNORECASE), + re.compile(r"\bDE\d{9}\b"), + ], + "severity_if_missing": "MEDIUM", + }, + "vertretungsberechtigte": { + "label": "Vertretungsberechtigte Person", + "norm": "§ 5 Abs. 1 Nr. 1 TMG (juristische Personen)", + "patterns": [ + re.compile(r"(?:Geschäftsführer|Vertretungsberechtigt|" + r"vertreten\s+durch)\s*[:.\s]", + re.IGNORECASE), + ], + "severity_if_missing": "HIGH", + }, + "aufsichtsbehoerde": { + "label": "Aufsichtsbehörde (regulierte Branchen)", + "norm": "§ 5 Abs. 1 Nr. 3 TMG (Branchen-bedingt)", + "patterns": [ + re.compile(r"Aufsichtsbeh(?:ö|oe)rde\s*[:.\s]", re.IGNORECASE), + re.compile(r"\bBAFin\b|\bBNetzA\b|\bLKA\b", re.IGNORECASE), + ], + "severity_if_missing": "LOW", + }, + "berufsangaben": { + "label": "Berufsbezeichnung + Berufsrechtliche Angaben", + "norm": "§ 5 Abs. 1 Nr. 5 TMG (Kammerberufe)", + "patterns": [ + re.compile(r"Berufsbezeichnung|Berufsordnung|Kammer", + re.IGNORECASE), + ], + "severity_if_missing": "LOW", + }, + "odr_link": { + "label": "OS-Link auf EU-Plattform", + "norm": "Art. 14 EU-VO 524/2013 (B2C-Onlineshops)", + "patterns": [ + re.compile(r"ec\.europa\.eu/consumers/odr", re.IGNORECASE), + ], + "severity_if_missing": "MEDIUM", + }, +} + + +def evaluate(impressum_text: str, + business_scope: set[str] | None = None) -> list[dict]: + """Run Impressum-Agent against the doc text. + + Returns a list of finding dicts; empty when all Pflichtangaben + present. `business_scope` controls which optional checks run + (e.g. OS-Link only for B2C ecommerce). + """ + if not impressum_text: + return [] + business_scope = business_scope or set() + findings: list[dict] = [] + for field_id, spec in PFLICHTANGABEN.items(): + # Skip context-dependent fields when scope doesn't match + if field_id == "odr_link" and "ecommerce" not in business_scope: + continue + if field_id == "aufsichtsbehoerde" and ( + "regulated_profession" not in business_scope + and "financial_services" not in business_scope + and "insurance" not in business_scope + ): + continue + if field_id == "berufsangaben" and ( + "regulated_profession" not in business_scope + ): + continue + found = any(p.search(impressum_text) for p in spec["patterns"]) + if found: + continue + findings.append({ + "check_id": f"IMPRESSUM-AGENT-{field_id.upper()}", + "agent": "impressum_agent_v1", + "field_id": field_id, + "severity": spec["severity_if_missing"], + "severity_reason": "missing", + "title": f"Pflichtangabe '{spec['label']}' fehlt im Impressum", + "norm": spec["norm"], + "action": ( + f"{spec['label']} im Impressum ergänzen " + f"(Pflichtangabe nach {spec['norm']})." + ), + }) + if findings: + logger.info( + "impressum_agent: %d findings (kein LLM, KB v1)", len(findings), + ) + return findings diff --git a/consent-tester/main.py b/consent-tester/main.py index 401f2b4f..216ebabe 100644 --- a/consent-tester/main.py +++ b/consent-tester/main.py @@ -61,7 +61,9 @@ class ScanResponse(BaseModel): from routes_matrix import router as matrix_router +from routes_mobile import router as mobile_router app.include_router(matrix_router) +app.include_router(mobile_router) @app.get("/health") diff --git a/consent-tester/routes_mobile.py b/consent-tester/routes_mobile.py new file mode 100644 index 00000000..8334a5b0 --- /dev/null +++ b/consent-tester/routes_mobile.py @@ -0,0 +1,29 @@ +"""POST /scan-mobile-reachability — B1 Playwright Mobile-Verifikation. + +Eigenes Modul-File damit main.py unter 500 LOC bleibt. +""" + +from __future__ import annotations + +import logging +from datetime import datetime, timezone + +from fastapi import APIRouter +from pydantic import BaseModel + +from services.mobile_reachability_scanner import scan_mobile_reachability + +logger = logging.getLogger(__name__) +router = APIRouter() + + +class MobileReachReq(BaseModel): + url: str + + +@router.post("/scan-mobile-reachability") +async def scan_mobile(req: MobileReachReq): + logger.info("Mobile-reachability scan for %s", req.url) + res = await scan_mobile_reachability(req.url) + res["scanned_at"] = datetime.now(timezone.utc).isoformat() + return res diff --git a/consent-tester/services/mobile_reachability_scanner.py b/consent-tester/services/mobile_reachability_scanner.py new file mode 100644 index 00000000..7e15c67d --- /dev/null +++ b/consent-tester/services/mobile_reachability_scanner.py @@ -0,0 +1,163 @@ +"""B1 Mobile Reachability — echter Playwright-Scan auf iPhone-Emulation. + +Ersetzt den statischen HTTP-Fetch im Backend-B1-Wiring durch eine +echte WebKit-Browser-Session mit `devices['iPhone 15']`-Preset. Misst: + + - hat Footer einen Reopen-Anchor (Text/aria-label/onclick)? + - Tap-Target-Größe (boundingBox in px) — Apple HIG 44pt = ≥44 px + - Click-Behavior: öffnet sich der CMP direkt? (DOM-Mutation + + Modal-Detection nach 2s) + +Output schema (für Backend-B1 ersetzbar mit statischer Logik): + + { + "url": str, + "has_anchor": bool, + "anchor_text": str, + "tap_target_px": {"w": int, "h": int} | None, + "click_opens_cmp": bool, + "modal_selector": str | None, + "screenshot_b64": str (initial Footer-Crop), + "engine_meta": {"engine": "webkit", "device": "iPhone 15", + "user_agent": str, "viewport": str}, + } +""" + +from __future__ import annotations + +import base64 +import logging +from typing import Any + +logger = logging.getLogger(__name__) + +# Phrasen für Footer-Anchor-Suche (mirror des Backend-Service) +_REOPEN_PHRASES = ( + "cookie-einstellungen", "cookie einstellungen", + "cookie-präferenzen", "cookie-praeferenzen", + "cookie-einwilligung", + "einwilligung verwalten", + "consent manager", "consent settings", "consent-einstellungen", + "datenschutz-einstellungen", "datenschutzeinstellungen", + "cookies verwalten", "manage cookies", "manage preferences", + "privacy settings", "privacy preferences", + "tracking-einstellungen", +) + + +async def scan_mobile_reachability(url: str) -> dict[str, Any]: + """Run Mobile-Safari emulation + footer reachability check.""" + try: + from playwright.async_api import async_playwright + except Exception as e: + logger.warning("playwright not available: %s", e) + return {"url": url, "error": "playwright missing"} + + async with async_playwright() as p: + device_preset = p.devices.get("iPhone 15") or {} + browser = await p.webkit.launch(headless=True) + try: + context = await browser.new_context( + **device_preset, + locale="de-DE", + timezone_id="Europe/Berlin", + ) + page = await context.new_page() + try: + await page.goto(url, wait_until="domcontentloaded", + timeout=30000) + except Exception as e: + return {"url": url, "error": f"goto failed: {e}"[:200]} + try: + await page.wait_for_timeout(1500) + except Exception: + pass + + ua = await page.evaluate("() => navigator.userAgent") + viewport = page.viewport_size or {} + engine_meta = { + "engine": "webkit", + "device": "iPhone 15", + "user_agent": ua, + "viewport": f"{viewport.get('width','?')}x{viewport.get('height','?')}", + } + + # Find footer reopen anchor by text matching + anchor_loc = None + for phrase in _REOPEN_PHRASES: + try: + candidate = page.locator( + f"footer >> text=/{phrase}/i" + ).first + if await candidate.count() > 0: + anchor_loc = candidate + anchor_text = phrase + break + except Exception: + continue + + result: dict[str, Any] = { + "url": url, + "has_anchor": False, + "anchor_text": "", + "tap_target_px": None, + "click_opens_cmp": False, + "modal_selector": None, + "engine_meta": engine_meta, + } + + if anchor_loc is None: + # Capture footer crop + try: + footer = page.locator("footer").first + if await footer.count() > 0: + png = await footer.screenshot() + result["screenshot_b64"] = base64.b64encode( + png, + ).decode("ascii")[:120000] + except Exception: + pass + return result + + result["has_anchor"] = True + result["anchor_text"] = anchor_text + + try: + box = await anchor_loc.bounding_box() + if box: + result["tap_target_px"] = { + "w": int(box["width"]), "h": int(box["height"]), + } + except Exception: + pass + + # DOM-Modal-Snapshot vorher + try: + before_modals = await page.evaluate( + "() => Array.from(document.querySelectorAll(" + "'[role=dialog],[aria-modal=true],.cmp-modal," + ".ot-sdk-container,#usercentrics-cmp')).length" + ) + except Exception: + before_modals = 0 + + # Klick + warten + try: + await anchor_loc.click(timeout=5000) + await page.wait_for_timeout(2000) + after_modals = await page.evaluate( + "() => Array.from(document.querySelectorAll(" + "'[role=dialog],[aria-modal=true],.cmp-modal," + ".ot-sdk-container,#usercentrics-cmp')).length" + ) + if after_modals > before_modals: + result["click_opens_cmp"] = True + result["modal_selector"] = ( + "[role=dialog] | [aria-modal=true] | cmp-modal" + ) + except Exception as e: + logger.info("anchor click skipped: %s", e) + + return result + finally: + await browser.close()