feat: Browser-Matrix C2 + B11 AI-Retention + Impressum-Specialist-Agent + B1 Mobile Playwright

Task #15 Stage 1.c-e — Browser-Matrix Backend-Integration:
  - _phase_c2_browser_matrix.py: ruft consent-tester /scan-matrix wenn
    env BROWSER_MATRIX=true, fuellt state["browser_matrix"] +
    state["browser_aggregate"] + state["browser_matrix_html"]
  - V2-Mail-Block: 🌐 Browser-Matrix Tabelle (Profile · Score ·
    Sub-Scores PC/RR/BD · Bewertung) mit Worst-of-Header
  - Orchestrator ruft run_phase_c2 nach run_phase_c
  KNOWN: Stage 1.b (consent_scanner browser_profile-Param) bleibt
    zurueckgestellt (Datei in loc-exception, Hook-Patch verweigert).
    Stage 1.a-Shim laeuft im consent-tester — alle Profile aktuell
    auf Chromium, echte Engine-Diversitaet kommt mit 1.b.

Task #17 TH-RETENTION-002 als B11 ai_retention_granularity_check:
  - Erkennt AI-Provider-Kontext (vertex/openai/anthropic/etc)
  - In +-800-char-Window: prueft ≥2 Datenkategorien aus Standard-Liste
    (Texteingaben/IP/Geraet/Session/Fehlerprotokoll/Zeitstempel)
  - Wenn 1 pauschale Speicherdauer + ≥2 Kategorien aber kein
    per-Kategorie-Differential → LOW
  - Smoke: Elli-Mock-DSE trifft LOW "AI-Speicherdauer pauschal"

Task #18 Specialist-Agents Phase-1-Prototyp:
  - compliance/services/specialist_agents/__init__.py mit Architektur-Doku
  - impressum_agent.py: 9 Pflichtangaben § 5 TMG + § 1 DL-InfoV
    als Pattern-Registry (Name, Email, Telefon, HR, USt-IdNr,
    Vertretungsberechtigt, Aufsichtsbehoerde, Berufsangaben, OS-Link)
  - business_scope-aware (OS-Link nur fuer ecommerce, Aufsichtsbehoerde
    nur fuer regulated_profession/financial/insurance)
  - Phase-1 ist Pattern-Match-only (kein LLM), demonstriert die
    Schnittstelle. Phase 2 ersetzt Pattern durch System-Prompt + KB.
  - Smoke: minimal-Impressum triggert 4 Findings korrekt

Task #7 B1 Playwright Mobile-Verifikation:
  - consent-tester/services/mobile_reachability_scanner.py: echte
    WebKit-launch + p.devices['iPhone 15'] preset + de-DE locale +
    Europe/Berlin timezone
  - Footer-Anchor-Suche via locator("footer >> text=/.../i") fuer
    13 Reopen-Phrasen
  - Tap-Target-Boundingbox-Messung (Apple HIG / WCAG ≥44x44)
  - Click-Behavior: DOM-Modal-Snapshot vor/nach, erkennt CMP-Open
  - Output: has_anchor, anchor_text, tap_target_px, click_opens_cmp,
    engine_meta, screenshot_b64 (Footer-Crop wenn kein Anchor)
  - consent-tester/routes_mobile.py POST /scan-mobile-reachability
  - Backend _b1_wiring erweitert: ruft Mobile-Endpoint zuerst,
    Fallback auf statischen HTTP-Fetch. Mobile-Daten enrichen
    finding.mobile_playwright + Severity-Bump bei
    tap-target<44 / click-doesnt-open-CMP.
  KNOWN: WebKit-System-Libs sind im Dockerfile ergaenzt (Stage 1.a-
    Commit), greifen aber erst nach CI/CD-Rebuild des consent-tester.
    Bis dahin faellt B1 sauber auf statischen Fetch zurueck.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
Benjamin Admin
2026-06-06 22:20:25 +02:00
parent e1dadc8027
commit 37093ff9e3
11 changed files with 702 additions and 7 deletions
@@ -42,6 +42,29 @@ async def run_b1(state: dict) -> None:
return
_update(check_id, "Mobile Consent-Reachability prüfen...", 95)
# Try the new Playwright WebKit + iPhone scan first (Task #7).
# Falls back to static HTTP fetch on error.
mobile = None
try:
from ._constants import CONSENT_TESTER_URL
async with httpx.AsyncClient(timeout=60.0) as c:
r = await c.post(
f"{CONSENT_TESTER_URL}/scan-mobile-reachability",
json={"url": homepage_url},
)
if r.status_code == 200:
mobile = r.json()
logger.info(
"B1 Mobile-Playwright: has_anchor=%s tap=%s click_opens=%s",
mobile.get("has_anchor"),
mobile.get("tap_target_px"),
mobile.get("click_opens_cmp"),
)
except Exception as e:
logger.info("B1 Mobile-Playwright fallback to static fetch: %s", e)
page_html = None
try:
async with httpx.AsyncClient(
timeout=20.0, follow_redirects=True,
@@ -50,21 +73,53 @@ async def run_b1(state: dict) -> None:
"Version/17.5 Mobile/15E148 Safari/604.1"},
) as c:
r = await c.get(homepage_url)
if r.status_code != 200:
logger.info("B1: homepage fetch %s → HTTP %d", homepage_url, r.status_code)
return
page_html = r.text
if r.status_code == 200:
page_html = r.text
except Exception as e:
logger.warning("B1: homepage fetch failed: %s", e)
if not page_html and not mobile:
return
finding = evaluate_reachability(page_html, homepage_url)
finding = evaluate_reachability(page_html or "", homepage_url)
# Enrich finding with mobile-playwright details when available
if mobile and mobile.get("has_anchor"):
finding["mobile_playwright"] = {
"has_anchor": mobile.get("has_anchor"),
"anchor_text": mobile.get("anchor_text"),
"tap_target_px": mobile.get("tap_target_px"),
"click_opens_cmp": mobile.get("click_opens_cmp"),
"engine_meta": mobile.get("engine_meta"),
}
# Tap-target rule (Apple HIG / WCAG 2.5.5): ≥ 44 px each side
tp = mobile.get("tap_target_px") or {}
if tp and (tp.get("w", 0) < 44 or tp.get("h", 0) < 44):
finding["notes"] = (finding.get("notes") or []) + [
f"tap-target nur {tp.get('w')}×{tp.get('h')}px "
"(Apple HIG / WCAG verlangen ≥ 44×44)",
]
if finding.get("passed"):
finding["passed"] = False
finding["severity"] = "MEDIUM"
finding["severity_reason"] = "misclassified"
# If anchor exists in DOM but click doesn't open CMP, bump severity
if mobile.get("has_anchor") and not mobile.get("click_opens_cmp"):
finding["notes"] = (finding.get("notes") or []) + [
"click auf Footer-Link öffnet CMP nicht direkt",
]
if finding.get("severity_reason") != "factually_wrong":
finding["severity"] = "MEDIUM"
finding["severity_reason"] = "misclassified"
finding["passed"] = False
state["reachability_finding"] = finding
state["reachability_html"] = _render_block(finding)
logger.info(
"B1 Reachability: passed=%s severity=%s reason=%s",
"B1 Reachability: passed=%s severity=%s reason=%s mobile=%s",
finding["passed"], finding.get("severity"),
finding.get("severity_reason"),
bool(mobile),
)
@@ -9,6 +9,9 @@ from __future__ import annotations
import html
import logging
from compliance.services.ai_retention_granularity_check import (
check_ai_retention_granularity,
)
from compliance.services.impressum_multi_entity_check import (
check_multi_entity_impressum,
)
@@ -24,6 +27,7 @@ def run_b9b10(state: dict) -> None:
new: list[dict] = []
new.extend(check_multi_entity_impressum(state))
new.extend(check_transfer_mechanism(state))
new.extend(check_ai_retention_granularity(state))
if not new:
return
extras.extend(new)
@@ -51,6 +51,8 @@ async def run_compliance_check(check_id: str, req) -> None:
await run_phase_b(state)
# Phase C: Step 3b-d (banner + cross-check + TCF) + Step 4
await run_phase_c(state)
# Phase C-2: optional browser-matrix scan (env BROWSER_MATRIX=true)
await run_phase_c2(state)
# Phase D-1/D-2: Step 5 vendor extraction + finalize
await run_phase_d1(state)
await run_phase_d2(state)
@@ -0,0 +1,146 @@
"""Phase C-2 — Browser-Matrix Multi-Browser Scan (Stage 1.c).
After the single-browser scan in Phase C, optionally fan out to the
consent-tester /scan-matrix endpoint that runs the same probe on
chromium / firefox / webkit / mobile-safari and returns a worst-of
robustness score per browser.
Activated by env `BROWSER_MATRIX=true`. Default off so existing
runs aren't slowed down 4× while we tune.
The state gets these new keys:
state["browser_matrix"] list[dict] per-profile results
state["browser_aggregate"] dict worst/best score + verbal
state["browser_matrix_html"] str pre-rendered V2 block
"""
from __future__ import annotations
import logging
import os
from html import escape as h
import httpx
from ._constants import CONSENT_TESTER_URL
from ._helpers import _update
logger = logging.getLogger(__name__)
async def run_phase_c2(state: dict) -> None:
if os.environ.get("BROWSER_MATRIX", "false").lower() not in (
"true", "1", "yes", "on",
):
return
check_id = state["check_id"]
req = state["req"]
banner_url = ""
for d in req.documents:
if d.url:
from urllib.parse import urlparse
p = urlparse(d.url)
if p.scheme and p.netloc:
banner_url = f"{p.scheme}://{p.netloc}"
break
if not banner_url:
return
_update(check_id, "Browser-Matrix: Multi-Engine-Scan...", 83)
profiles_env = os.environ.get("BROWSER_MATRIX_PROFILES", "")
profiles = [p.strip() for p in profiles_env.split(",") if p.strip()] or None
try:
async with httpx.AsyncClient(timeout=600.0) as c:
r = await c.post(
f"{CONSENT_TESTER_URL}/scan-matrix",
json={
"url": banner_url,
"timeout_per_phase": 10,
"categories": [],
"browser_profiles": profiles,
},
)
if r.status_code != 200:
logger.warning("browser-matrix scan HTTP %d", r.status_code)
return
data = r.json()
except Exception as e:
logger.warning("browser-matrix scan failed: %s", e)
return
state["browser_matrix"] = data.get("browser_matrix") or []
state["browser_aggregate"] = data.get("aggregate") or {}
state["browser_matrix_html"] = _render(
state["browser_matrix"], state["browser_aggregate"],
)
logger.info(
"browser-matrix: %d profiles, worst=%s@%s%%, best=%s@%s%%",
len(state["browser_matrix"]),
state["browser_aggregate"].get("worst_profile"),
state["browser_aggregate"].get("worst_score"),
state["browser_aggregate"].get("best_profile"),
state["browser_aggregate"].get("best_score"),
)
def _render(rows: list[dict], aggregate: dict) -> str:
if not rows:
return ""
table_rows = []
for r in rows:
sev = ("fail" if r["score"] < 60
else "warn" if r["score"] < 80 else "pass")
color = ("#dc2626" if sev == "fail"
else "#f59e0b" if sev == "warn" else "#15803d")
dims = r.get("dimensions") or {}
dims_str = (
f"PC {int(dims.get('pre_consent',0)*100)}% · "
f"RR {int(dims.get('reject_respect',0)*100)}% · "
f"BD {int(dims.get('banner_design',0)*100)}%"
)
table_rows.append(
"<tr>"
f"<td style='padding:6px 10px;border-bottom:1px solid #e5e7eb;'>"
f"{h(r.get('label') or r.get('profile_id') or '')}</td>"
f"<td style='padding:6px 10px;border-bottom:1px solid #e5e7eb;"
f"color:{color};font-weight:600;'>{r['score']}%</td>"
f"<td style='padding:6px 10px;border-bottom:1px solid #e5e7eb;"
f"color:#475569;font-size:12px;'>{dims_str}</td>"
f"<td style='padding:6px 10px;border-bottom:1px solid #e5e7eb;"
f"color:#475569;font-size:13px;'>{h(r.get('verbal',''))}</td>"
"</tr>"
)
worst = aggregate.get("worst_score", 0)
sev_color = ("#dc2626" if worst < 60
else "#f59e0b" if worst < 80 else "#15803d")
head = (
f"<p style='margin:0 0 8px;font-size:13px;color:#475569;'>"
f"<strong style='color:{sev_color};'>Worst-of {worst}%</strong> "
f"(Profil <code>{aggregate.get('worst_profile','')}</code>) — "
f"Best-of {aggregate.get('best_score','')}% "
f"(<code>{aggregate.get('best_profile','')}</code>). "
"Aggregierter Score nach Worst-of-Regel: ein HIGH-Verstoß "
"auf einem Browser kappt den Gesamt-Score.</p>"
)
return (
"<div style='margin:24px 0;padding:16px;border-left:4px solid "
f"{sev_color};background:#f8fafc;border-radius:4px;'>"
"<h2 style='margin:0 0 8px;color:#1e293b;font-size:16px;'>"
"🌐 Browser-Matrix · Consent-Robustness pro Engine"
"</h2>"
f"{head}"
"<table style='width:100%;border-collapse:collapse;font-size:13px;"
"margin-top:8px;background:#fff;'>"
"<thead><tr style='background:#f1f5f9;'>"
"<th style='text-align:left;padding:6px 10px;'>Browser-Profil</th>"
"<th style='text-align:left;padding:6px 10px;'>Score</th>"
"<th style='text-align:left;padding:6px 10px;'>Pre-Consent · "
"Reject-Respekt · Banner-Design</th>"
"<th style='text-align:left;padding:6px 10px;'>Bewertung</th>"
"</tr></thead>"
f"<tbody>{''.join(table_rows)}</tbody></table>"
"</div>"
)
@@ -0,0 +1,116 @@
"""B11 — AI-Retention-Granularity-Check (TH-RETENTION-002).
DSGVO Art. 13 Abs. 2 lit. a + DSK-Empfehlung: pro Datenkategorie
eine spezifische Speicherdauer. Eine pauschale Angabe wie
"6 Monate für alle Daten" reicht nicht.
GT-Pattern Elli:
Vertex-AI-Chatbot speichert "IT- und pseudonymisierte
Nutzungsdaten" pauschal 6 Monate. Keine Abstufung nach
Datenkategorie (Texteingaben / IP / Geräteinformationen /
Session-ID / Fehlerprotokolle).
Heuristik:
1. AI-Kontext erkennen (vertex ai / openai / claude / etc.)
2. In ±600-char-Window prüfen:
- Existiert eine Speicherdauer-Aussage? (parse_duration_to_days)
- Werden ≥2 Datenkategorien aus AI-Standardliste genannt?
(Texteingaben, IP, Geräteinformationen, Session, Fehlerprotokolle)
- Wenn 1 Speicherdauer + ≥2 Kategorien aber kein
per-Kategorie-Differential → LOW
"""
from __future__ import annotations
import logging
import re
from .retention_comparator import parse_duration_to_days
logger = logging.getLogger(__name__)
_AI_PROVIDERS = (
"vertex ai", "google vertex", "openai", "gpt-3", "gpt-4", "chatgpt",
"anthropic", "claude.ai", "claude-3", "mistral ai",
"ki-assistent", "ki assistent", "ai assistant",
)
_AI_DATA_CATEGORIES = (
"texteingab", # Texteingaben / Texteingabe
"chatverlauf", "chatverläuf",
"ip-adress",
"geräteinform", "geraeteinform", "device-info",
"session-id", "sitzungs-id",
"browserversion", "user-agent",
"fehlerprotokoll",
"zeitstempel",
)
def _per_category_phrases() -> tuple[str, ...]:
"""Patterns indicating per-category retention is mentioned."""
return (
"pro datenkategorie",
"je datenkategorie",
"unterschiedlich je",
"abhängig vom datentyp",
"abhaengig vom datentyp",
"differenziert nach",
"pro kategorie",
)
def check_ai_retention_granularity(state: dict) -> list[dict]:
doc_texts = state.get("doc_texts") or {}
dse = (doc_texts.get("dse") or "").lower()
if not dse:
return []
findings: list[dict] = []
for ai_kw in _AI_PROVIDERS:
idx = dse.find(ai_kw)
if idx < 0:
continue
window = dse[max(0, idx - 800): idx + 800]
if not window:
continue
categories_found = [c for c in _AI_DATA_CATEGORIES if c in window]
if len(categories_found) < 2:
continue
# Per-category retention phrase already present? then OK
if any(p in window for p in _per_category_phrases()):
return []
# Retention-claim in window? parse duration
m = re.search(
r"(\d+(?:[.,]\d+)?\s*(?:tage?|monat\w*|jahre?|"
r"day|month|year))", window,
)
if not m:
continue
days, kind = parse_duration_to_days(m.group(1))
if days is None:
continue
findings.append({
"check_id": "TH-RETENTION-GRANULARITY-001",
"severity": "LOW",
"severity_reason": "incomplete",
"title": (
"AI-Speicherdauer pauschal — pro Datenkategorie "
"differenzieren empfohlen"
),
"norm": "DSGVO Art. 13 Abs. 2 lit. a + DSK-OH AI",
"ai_provider": ai_kw,
"retention_days": int(days),
"categories_detected": categories_found,
"action": (
f"Für '{ai_kw}'-Kontext separate Speicherdauern je "
f"Datenkategorie angeben (Texteingaben / IP / "
f"Geräteinformationen / Session). Aktuell pauschal "
f"{int(days)} Tage."
),
})
break # one per DSE is enough
if findings:
logger.info("B11 AI-retention-granularity: %d findings", len(findings))
return findings
@@ -44,8 +44,10 @@ def compose_v2(state: dict) -> str:
state.get("vendor_consistency_html", ""),
# B5 — AI-Act Art. 50 Transparenzpflicht
state.get("ai_act_html", ""),
# B6/B7/B8 — DPO-cross-doc + Doc-Staleness + CMP-fingerprint
# B6/B7/B8/B9/B10 — DPO + Staleness + CMP + MultiEntity + Transfer
state.get("extra_findings_html", ""),
# Browser-Matrix (Stage 1.c)
state.get("browser_matrix_html", ""),
# All legacy build_*_html() wrapped in V2 sections — preserves
# every information block from the old renderer (Exec Summary,
# Banner-Screenshot, VVT, Redundancy, Solutions, Diff, etc.)
@@ -0,0 +1,17 @@
"""Doc-Type Specialist-Agents — Phase 1 Prototyp.
Architektur:
- Pro Doc-Type ein Spezialist-Agent mit System-Prompt (Domänenwissen)
+ Knowledge-Base (anonymisierte Patterns/Statistiken aus
Multi-Mandanten-Daten)
- Jeder Agent liefert strukturierte Findings → enriched state
- Ein Cross-Doc-Router-Agent prüft ob Absätze falsch zugeordnet sind
("Cookie-Inhalt steht in AGB statt Cookie-Richtlinie")
Phase 1: Impressum-Agent als Prototyp (Pattern-Match-only, ohne LLM).
Phase 2: DSE-Agent + Cross-Doc-Router (LLM-gestützt).
Phase 3+: Weitere Doc-Types + Continuous Learning der KB.
Privacy: KB enthält NIEMALS Roh-Mandantendaten. Anonymisierung +
Aggregation Pflicht (NER-Maskierung vor KB-Speicher).
"""
@@ -0,0 +1,159 @@
"""Impressum-Specialist-Agent Phase-1 Prototyp.
Pattern-Match-only (kein LLM). Demonstriert die Architektur:
- Knowledge-Base mit § 5 TMG/DDG-Pflichtangaben
- Pattern-Library für Erkennung
- strukturierte Findings mit Norm + Action
Phase 2 wird denselben Output produzieren, aber LLM-gestützt mit
Domain-spezifischem System-Prompt + Cross-Customer-KB.
KB-Beispiel-Einträge:
- HR-Format DE: HR[BA] <Nr> <Stadt>
- USt-IdNr-Format DE: DE\\d{9}
- Aufsichtsbehörden-Liste (Branchen)
- DSB-Adressformat
"""
from __future__ import annotations
import logging
import re
logger = logging.getLogger(__name__)
# Pflichtangaben nach § 5 TMG + § 1 DL-InfoV
PFLICHTANGABEN = {
"name_anbieter": {
"label": "Name + Anschrift des Anbieters",
"norm": "§ 5 Abs. 1 Nr. 1 TMG",
"patterns": [
re.compile(r"\b(?:Anbieter|Diensteanbieter|"
r"Verantwortlich(?:er Anbieter)?)\s*[:.\s]",
re.IGNORECASE),
],
"severity_if_missing": "HIGH",
},
"kontakt_email": {
"label": "Email-Adresse",
"norm": "§ 5 Abs. 1 Nr. 2 TMG",
"patterns": [
re.compile(r"\b[\w.+-]+@[\w-]+\.[a-z]{2,}\b", re.IGNORECASE),
],
"severity_if_missing": "HIGH",
},
"kontakt_telefon": {
"label": "Telefon",
"norm": "§ 5 Abs. 1 Nr. 2 TMG",
"patterns": [
re.compile(r"(?:Tel(?:efon)?|Phone)\.?\s*[:.\s]\s*[\+\d][\d\s/\-()]{5,}",
re.IGNORECASE),
],
"severity_if_missing": "MEDIUM",
},
"handelsregister": {
"label": "Handelsregister-Eintrag",
"norm": "§ 5 Abs. 1 Nr. 4 TMG",
"patterns": [
re.compile(r"\bHR[BA]\s+\d", re.IGNORECASE),
re.compile(r"Handelsregister", re.IGNORECASE),
],
"severity_if_missing": "HIGH",
},
"ust_id": {
"label": "USt-IdNr",
"norm": "§ 5 Abs. 1 Nr. 6 TMG",
"patterns": [
re.compile(r"\b(?:USt-?Id(?:Nr)?\.?|VAT(?:-?Id)?)\s*[:.\s]",
re.IGNORECASE),
re.compile(r"\bDE\d{9}\b"),
],
"severity_if_missing": "MEDIUM",
},
"vertretungsberechtigte": {
"label": "Vertretungsberechtigte Person",
"norm": "§ 5 Abs. 1 Nr. 1 TMG (juristische Personen)",
"patterns": [
re.compile(r"(?:Geschäftsführer|Vertretungsberechtigt|"
r"vertreten\s+durch)\s*[:.\s]",
re.IGNORECASE),
],
"severity_if_missing": "HIGH",
},
"aufsichtsbehoerde": {
"label": "Aufsichtsbehörde (regulierte Branchen)",
"norm": "§ 5 Abs. 1 Nr. 3 TMG (Branchen-bedingt)",
"patterns": [
re.compile(r"Aufsichtsbeh(?:ö|oe)rde\s*[:.\s]", re.IGNORECASE),
re.compile(r"\bBAFin\b|\bBNetzA\b|\bLKA\b", re.IGNORECASE),
],
"severity_if_missing": "LOW",
},
"berufsangaben": {
"label": "Berufsbezeichnung + Berufsrechtliche Angaben",
"norm": "§ 5 Abs. 1 Nr. 5 TMG (Kammerberufe)",
"patterns": [
re.compile(r"Berufsbezeichnung|Berufsordnung|Kammer",
re.IGNORECASE),
],
"severity_if_missing": "LOW",
},
"odr_link": {
"label": "OS-Link auf EU-Plattform",
"norm": "Art. 14 EU-VO 524/2013 (B2C-Onlineshops)",
"patterns": [
re.compile(r"ec\.europa\.eu/consumers/odr", re.IGNORECASE),
],
"severity_if_missing": "MEDIUM",
},
}
def evaluate(impressum_text: str,
business_scope: set[str] | None = None) -> list[dict]:
"""Run Impressum-Agent against the doc text.
Returns a list of finding dicts; empty when all Pflichtangaben
present. `business_scope` controls which optional checks run
(e.g. OS-Link only for B2C ecommerce).
"""
if not impressum_text:
return []
business_scope = business_scope or set()
findings: list[dict] = []
for field_id, spec in PFLICHTANGABEN.items():
# Skip context-dependent fields when scope doesn't match
if field_id == "odr_link" and "ecommerce" not in business_scope:
continue
if field_id == "aufsichtsbehoerde" and (
"regulated_profession" not in business_scope
and "financial_services" not in business_scope
and "insurance" not in business_scope
):
continue
if field_id == "berufsangaben" and (
"regulated_profession" not in business_scope
):
continue
found = any(p.search(impressum_text) for p in spec["patterns"])
if found:
continue
findings.append({
"check_id": f"IMPRESSUM-AGENT-{field_id.upper()}",
"agent": "impressum_agent_v1",
"field_id": field_id,
"severity": spec["severity_if_missing"],
"severity_reason": "missing",
"title": f"Pflichtangabe '{spec['label']}' fehlt im Impressum",
"norm": spec["norm"],
"action": (
f"{spec['label']} im Impressum ergänzen "
f"(Pflichtangabe nach {spec['norm']})."
),
})
if findings:
logger.info(
"impressum_agent: %d findings (kein LLM, KB v1)", len(findings),
)
return findings
+2
View File
@@ -61,7 +61,9 @@ class ScanResponse(BaseModel):
from routes_matrix import router as matrix_router
from routes_mobile import router as mobile_router
app.include_router(matrix_router)
app.include_router(mobile_router)
@app.get("/health")
+29
View File
@@ -0,0 +1,29 @@
"""POST /scan-mobile-reachability — B1 Playwright Mobile-Verifikation.
Eigenes Modul-File damit main.py unter 500 LOC bleibt.
"""
from __future__ import annotations
import logging
from datetime import datetime, timezone
from fastapi import APIRouter
from pydantic import BaseModel
from services.mobile_reachability_scanner import scan_mobile_reachability
logger = logging.getLogger(__name__)
router = APIRouter()
class MobileReachReq(BaseModel):
url: str
@router.post("/scan-mobile-reachability")
async def scan_mobile(req: MobileReachReq):
logger.info("Mobile-reachability scan for %s", req.url)
res = await scan_mobile_reachability(req.url)
res["scanned_at"] = datetime.now(timezone.utc).isoformat()
return res
@@ -0,0 +1,163 @@
"""B1 Mobile Reachability — echter Playwright-Scan auf iPhone-Emulation.
Ersetzt den statischen HTTP-Fetch im Backend-B1-Wiring durch eine
echte WebKit-Browser-Session mit `devices['iPhone 15']`-Preset. Misst:
- hat Footer einen Reopen-Anchor (Text/aria-label/onclick)?
- Tap-Target-Größe (boundingBox in px) Apple HIG 44pt = 44 px
- Click-Behavior: öffnet sich der CMP direkt? (DOM-Mutation +
Modal-Detection nach 2s)
Output schema (für Backend-B1 ersetzbar mit statischer Logik):
{
"url": str,
"has_anchor": bool,
"anchor_text": str,
"tap_target_px": {"w": int, "h": int} | None,
"click_opens_cmp": bool,
"modal_selector": str | None,
"screenshot_b64": str (initial Footer-Crop),
"engine_meta": {"engine": "webkit", "device": "iPhone 15",
"user_agent": str, "viewport": str},
}
"""
from __future__ import annotations
import base64
import logging
from typing import Any
logger = logging.getLogger(__name__)
# Phrasen für Footer-Anchor-Suche (mirror des Backend-Service)
_REOPEN_PHRASES = (
"cookie-einstellungen", "cookie einstellungen",
"cookie-präferenzen", "cookie-praeferenzen",
"cookie-einwilligung",
"einwilligung verwalten",
"consent manager", "consent settings", "consent-einstellungen",
"datenschutz-einstellungen", "datenschutzeinstellungen",
"cookies verwalten", "manage cookies", "manage preferences",
"privacy settings", "privacy preferences",
"tracking-einstellungen",
)
async def scan_mobile_reachability(url: str) -> dict[str, Any]:
"""Run Mobile-Safari emulation + footer reachability check."""
try:
from playwright.async_api import async_playwright
except Exception as e:
logger.warning("playwright not available: %s", e)
return {"url": url, "error": "playwright missing"}
async with async_playwright() as p:
device_preset = p.devices.get("iPhone 15") or {}
browser = await p.webkit.launch(headless=True)
try:
context = await browser.new_context(
**device_preset,
locale="de-DE",
timezone_id="Europe/Berlin",
)
page = await context.new_page()
try:
await page.goto(url, wait_until="domcontentloaded",
timeout=30000)
except Exception as e:
return {"url": url, "error": f"goto failed: {e}"[:200]}
try:
await page.wait_for_timeout(1500)
except Exception:
pass
ua = await page.evaluate("() => navigator.userAgent")
viewport = page.viewport_size or {}
engine_meta = {
"engine": "webkit",
"device": "iPhone 15",
"user_agent": ua,
"viewport": f"{viewport.get('width','?')}x{viewport.get('height','?')}",
}
# Find footer reopen anchor by text matching
anchor_loc = None
for phrase in _REOPEN_PHRASES:
try:
candidate = page.locator(
f"footer >> text=/{phrase}/i"
).first
if await candidate.count() > 0:
anchor_loc = candidate
anchor_text = phrase
break
except Exception:
continue
result: dict[str, Any] = {
"url": url,
"has_anchor": False,
"anchor_text": "",
"tap_target_px": None,
"click_opens_cmp": False,
"modal_selector": None,
"engine_meta": engine_meta,
}
if anchor_loc is None:
# Capture footer crop
try:
footer = page.locator("footer").first
if await footer.count() > 0:
png = await footer.screenshot()
result["screenshot_b64"] = base64.b64encode(
png,
).decode("ascii")[:120000]
except Exception:
pass
return result
result["has_anchor"] = True
result["anchor_text"] = anchor_text
try:
box = await anchor_loc.bounding_box()
if box:
result["tap_target_px"] = {
"w": int(box["width"]), "h": int(box["height"]),
}
except Exception:
pass
# DOM-Modal-Snapshot vorher
try:
before_modals = await page.evaluate(
"() => Array.from(document.querySelectorAll("
"'[role=dialog],[aria-modal=true],.cmp-modal,"
".ot-sdk-container,#usercentrics-cmp')).length"
)
except Exception:
before_modals = 0
# Klick + warten
try:
await anchor_loc.click(timeout=5000)
await page.wait_for_timeout(2000)
after_modals = await page.evaluate(
"() => Array.from(document.querySelectorAll("
"'[role=dialog],[aria-modal=true],.cmp-modal,"
".ot-sdk-container,#usercentrics-cmp')).length"
)
if after_modals > before_modals:
result["click_opens_cmp"] = True
result["modal_selector"] = (
"[role=dialog] | [aria-modal=true] | cmp-modal"
)
except Exception as e:
logger.info("anchor click skipped: %s", e)
return result
finally:
await browser.close()