feat: Browser-Matrix Stufe 1.a + 2 weitere GT-Findings + Plausibility-LLM-Härtung

Stage 1.a Browser-Matrix (Task #15) — Multi-Engine Scaffolding:
  - consent-tester/Dockerfile: firefox + webkit + Xvfb deps
  - playwright install chromium firefox webkit
  - services/browser_profiles.py: Registry mit DEFAULT_PROFILES
    (Chromium-Headed/Firefox-Headed/WebKit-Headed/Mobile-Safari) +
    EXTRA_PROFILES (Chrome-Channel, Edge, Brave)
  - services/multi_browser_scanner.py: run_matrix() orchestriert N
    parallele Scans + worst-of-Aggregation + 3 Sub-Scores
    (Pre-Consent 50%, Reject-Respekt 30%, Banner-Design 20%) +
    Hard-Fail-Cap auf <60% bei Pre-Consent/Reject-Verstoß
  - routes_matrix.py: POST /scan-matrix Endpoint (eigenes Modul,
    damit main.py unter 500 LOC bleibt)
  KNOWN: Stage 1.a-Shim ruft alle Profile auf demselben Chromium,
    echte Engine-Diversität in Stage 1.b (consent_scanner.py Param)

Coverage-Gap 3 (Task #17): 2/3 verbleibende GT-Lücken geschlossen:
  - B9 impressum_multi_entity_check (IMPRESSUM-001): erkennt
    USt-IdNr/HR/GF-Fehlen pro Entity bei multi-entity Impressen
    (Elli: USt-IdNr nur bei Elli Mobility, fehlt bei VW Group Charging)
  - B10 transfer_mechanism_check (TRANSFER-001): pro Non-EU-Vendor
    in cmp_vendors prüft DSE auf DPF/SCCs/BCRs/Einwilligung im
    ±400-char-Window. Findet Vendors ohne benannten Mechanismus.
  - TH-RETENTION-002 (AI-Datenkategorie-Differenzierung) bleibt
    semantisch-tief, vorgesehen für Specialist-Agents Task #18.

Plausibility-LLM Empty-Response-Härtung (Task #16):
  - BATCH_SIZE 8 → 4, EXCERPT 4000 → 1500 chars, TIMEOUT 60 → 45s
  - Single-retry mit halbierter Batch wenn LLM empty content
    zurückgibt — qwen3:30b-a3b rejektiert manchmal ≥6-Item-Prompts
    unter format='json'. Falls auch Half-Batch empty: log + skip.
  - Pipeline läuft jetzt nicht mehr 10min in Timeouts.

GT-Coverage Sprung: 10/13 → 11/13 (85%). 4/4 HIGH ✓, 5/6 MEDIUM ✓,
2/3 LOW ✓.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
Benjamin Admin
2026-06-06 21:42:27 +02:00
parent d0e3621192
commit e1dadc8027
10 changed files with 687 additions and 4 deletions
@@ -0,0 +1,92 @@
"""B9 + B10 wiring — Multi-Entity-Impressum + Drittland-Mechanismus.
Runs after B6/B7/B8. Adds Findings into `state["extra_findings"]`
and re-renders the extra-block HTML.
"""
from __future__ import annotations
import html
import logging
from compliance.services.impressum_multi_entity_check import (
check_multi_entity_impressum,
)
from compliance.services.transfer_mechanism_check import (
check_transfer_mechanism,
)
logger = logging.getLogger(__name__)
def run_b9b10(state: dict) -> None:
    """Run the B9 and B10 checks and merge their findings into ``state``.

    Calls ``check_multi_entity_impressum`` and then
    ``check_transfer_mechanism``. If at least one finding comes back, the
    findings are appended to ``state["extra_findings"]`` and
    ``state["extra_findings_html"]`` is re-rendered from the combined list.
    If both checks return nothing, ``state`` is left untouched.
    """
    # Grab the existing list first; it is extended in place further down.
    existing = state.get("extra_findings") or []
    added: list[dict] = [
        *check_multi_entity_impressum(state),
        *check_transfer_mechanism(state),
    ]
    if added:
        existing.extend(added)
        state["extra_findings"] = existing
        state["extra_findings_html"] = _render(existing)
        logger.info("B9/B10 added %d findings (total extra=%d)",
                    len(added), len(existing))
def _render(findings: list[dict]) -> str:
cards = []
for f in findings:
sev = (f.get("severity") or "").upper()
color = "#dc2626" if sev == "HIGH" else (
"#f59e0b" if sev == "MEDIUM" else "#64748b"
)
meta = ""
if f.get("entities_missing"):
meta = ("<div style='font-size:12px;color:#475569;margin-top:6px;'>"
f"<em>Fehlt bei: "
f"{html.escape(', '.join(f['entities_missing']))}</em>"
"</div>")
elif f.get("vendor"):
meta = ("<div style='font-size:12px;color:#475569;margin-top:6px;'>"
f"<em>Vendor: {html.escape(f['vendor'])} "
f"({html.escape(f.get('country','?'))})</em>"
"</div>")
elif f.get("doc_date"):
meta = ("<div style='font-size:12px;color:#475569;margin-top:6px;'>"
f"<em>Stand: {html.escape(f['doc_date'])} "
f"({f.get('age_years','?')} J. alt)</em>"
"</div>")
elif f.get("detected_provider"):
meta = ("<div style='font-size:12px;color:#475569;margin-top:6px;'>"
f"<em>Erkannter Provider: "
f"{html.escape(f['detected_provider'])}</em>"
"</div>")
elif f.get("evidence_dse"):
meta = ("<div style='font-size:12px;color:#475569;margin-top:6px;'>"
f"<em>In DSE: {html.escape(', '.join(f['evidence_dse']))}</em>"
"</div>")
cards.append(
f"<div style='margin:12px 0;padding:14px;background:#fff;"
f"border-left:3px solid {color};border-radius:4px;'>"
f"<div style='font-weight:600;color:{color};font-size:14px;'>"
f"{sev} · {html.escape(f.get('check_id') or '')}</div>"
f"<div style='font-size:14px;margin-top:4px;'>"
f"<strong>{html.escape(f.get('title') or '')}</strong></div>"
f"<div style='font-size:12px;color:#64748b;margin-top:2px;'>"
f"{html.escape(f.get('norm') or '')}</div>"
f"{meta}"
f"<div style='font-size:13px;margin-top:8px;background:#dcfce7;"
f"padding:8px 10px;border-radius:4px;'>"
f"<strong>→ Empfehlung:</strong> "
f"{html.escape(f.get('action') or '')}</div>"
"</div>"
)
return (
"<div style='margin:24px 0;padding:16px;border-left:4px solid #f59e0b;"
"background:#fffbeb;border-radius:4px;'>"
"<h2 style='margin:0 0 8px;color:#92400e;font-size:16px;'>"
"📌 Zusätzliche Cross-Doc-Befunde"
"</h2>"
+ "".join(cards) +
"</div>"
)
@@ -21,6 +21,7 @@ from ._b3_wiring import run_b3
from ._b4_wiring import run_b4
from ._b5_wiring import run_b5
from ._b6b7b8_wiring import run_b6b7b8
from ._b9b10_wiring import run_b9b10
from ._constants import _compliance_check_jobs
from ._phase_a_resolve import run_phase_a
from ._phase_b_profile_check import run_phase_b
@@ -63,6 +64,7 @@ async def run_compliance_check(check_id: str, req) -> None:
run_b4(state) # Cross-doc vendor-consistency (Elli Vertex↔Iadvize)
run_b5(state) # AI-Act Art. 50 transparency
run_b6b7b8(state) # DPO-cross-doc + Doc-Staleness + CMP-fingerprint
run_b9b10(state) # Multi-Entity-Impressum + Drittland-Mechanismus
# Phase D-3 top/mid/bot: Step 5 HTML blocks
await run_phase_d3_top(state)
await run_phase_d3_mid(state)
@@ -51,8 +51,13 @@ logger = logging.getLogger(__name__)
OLLAMA_URL = os.getenv("OLLAMA_URL", "http://host.docker.internal:11434")
MODEL = os.getenv("PLAUSIBILITY_LLM_MODEL", "qwen3:30b-a3b")
BATCH_SIZE = int(os.getenv("PLAUSIBILITY_BATCH_SIZE", "8"))
TIMEOUT = float(os.getenv("PLAUSIBILITY_TIMEOUT_S", "60.0"))
# Reduced from 8 → 4 to fight qwen3 empty-response-on-large-prompts bug.
# 4 items × ~500 token/item + 2000 system + 1500 excerpt = ~5500 token total,
# well within qwen3's safe range for format='json'.
BATCH_SIZE = int(os.getenv("PLAUSIBILITY_BATCH_SIZE", "4"))
TIMEOUT = float(os.getenv("PLAUSIBILITY_TIMEOUT_S", "45.0"))
# Reduced excerpt 4000 → 1500 chars (same reason).
DOC_EXCERPT_CHARS = int(os.getenv("PLAUSIBILITY_DOC_EXCERPT", "1500"))
# In-memory cache: (input_hash) -> result_dict. Survives one run.
_CACHE: dict[str, dict] = {}
@@ -121,7 +126,8 @@ def _build_user_prompt(items: list[dict], doc_title: str,
)
return (
f"DOKUMENT: {doc_title}\n\n"
f"DOKUMENT-AUSZUG (max 4000 Zeichen):\n{doc_excerpt[:4000]}\n\n"
f"DOKUMENT-AUSZUG (max {DOC_EXCERPT_CHARS} Zeichen):\n"
f"{doc_excerpt[:DOC_EXCERPT_CHARS]}\n\n"
f"FINDINGS ZU BEWERTEN:\n{findings_block}"
)
@@ -149,6 +155,23 @@ async def _ask_llm_batch(items: list[dict], doc_title: str,
r.raise_for_status()
content = (r.json().get("message") or {}).get("content", "")
if not content:
# Single retry with smaller batch — qwen3 sometimes
# rejects ≥6-item prompts under format='json'.
if len(items) > 2:
half = len(items) // 2
logger.info(
"plausibility empty → retry split %d%dx2",
len(items), half,
)
first = await _ask_llm_batch(
items[:half], doc_title, doc_excerpt,
)
second = await _ask_llm_batch(
items[half:], doc_title, doc_excerpt,
)
out.update(first)
out.update(second)
return out
logger.warning("plausibility LLM returned empty content")
return out
try:
@@ -0,0 +1,99 @@
"""B9 — Multi-Entity-Impressum-Check.
Findings, wenn ein Impressum mehrere Entitäten (mehrere GmbH/AG/UG)
nennt, aber Pflichtangaben nur bei einer davon vollständig sind.
Konkreter Elli-Pattern (GT IMPRESSUM-001):
- Entity 1: "Elli Mobility GmbH ... USt-IdNr DE814424009 ..."
- Entity 2: "VW Group Charging GmbH ... [keine USt-IdNr] ..."
→ USt-IdNr fehlt bei Entity 2.
Heuristik:
1. Entitäten erkennen: jede Match auf "<Name> (GmbH|AG|UG|KG|SE)" als
Entity-Boundary; Text-Slice von dort bis zur nächsten Entity.
2. Pro Entity prüfen: USt-IdNr, Handelsregister, Vertretungsberechtigte.
3. Wenn Entity N ein Feld nennt, das Entity M nicht hat → MEDIUM.
"""
from __future__ import annotations
import logging
import re
logger = logging.getLogger(__name__)
_ENTITY_PAT = re.compile(
r"([A-ZÄÖÜ][\w\-\&\s]{1,50}?\s+(?:GmbH|AG|UG|KG|SE|"
r"e\.V\.|GbR|OHG|Limited|Ltd|LLC))",
re.IGNORECASE,
)
_USTID_PAT = re.compile(r"\b(?:USt-?Id(?:Nr)?\.?|VAT(?:-?Id)?)\s*[:.\s]\s*"
r"(DE\d{8,10}|[A-Z]{2}\d{6,12})", re.IGNORECASE)
_HR_PAT = re.compile(r"\b(?:HR[BA]|Handelsregister|Registergericht)"
r"\s*[:.\s]*([\w\s\d\-/]{4,80})", re.IGNORECASE)
_GF_PAT = re.compile(r"(?:Geschäftsführer|Vertretungsberechtigt|"
r"vertreten\s+durch)\s*[:.\s]+", re.IGNORECASE)
def _slice_entities(text: str) -> list[tuple[str, str]]:
    """Split ``text`` into per-entity slices.

    Every ``_ENTITY_PAT`` match opens a slice that runs up to the start of
    the next match (the last slice runs to the end of ``text``).

    Returns:
        ``[(entity_name, text_slice), ...]`` in document order, or an empty
        list when fewer than two entity matches are found.
    """
    hits = list(_ENTITY_PAT.finditer(text))
    if len(hits) < 2:
        return []
    begins = [hit.start() for hit in hits]
    # Each slice stops where the next one begins; the last one at EOF.
    stops = begins[1:] + [len(text)]
    return [
        (hit.group(1).strip(), text[begin:stop])
        for hit, begin, stop in zip(hits, begins, stops)
    ]
def check_multi_entity_impressum(state: dict) -> list[dict]:
    """Flag mandatory details that only some Impressum entities provide.

    Reads ``state["doc_texts"]["impressum"]``, splits it into per-entity
    slices and checks each slice for a VAT id, a commercial-register entry
    and an authorised representative. For every field that at least one
    entity has and at least one entity lacks, one MEDIUM finding is emitted.

    Slices that belong to the same entity name (compared case-insensitively
    with collapsed whitespace) are merged before comparing. Otherwise an
    entity that is simply mentioned a second time further down the page
    would count as a separate entity lacking all fields, and could show up
    in both ``entities_present`` and ``entities_missing``.

    Returns:
        A list of finding dicts; empty when there is no Impressum text,
        fewer than two distinct entities, or no inconsistency.
    """
    doc_texts = state.get("doc_texts") or {}
    imp = doc_texts.get("impressum") or ""
    if not imp:
        return []

    slices = _slice_entities(imp)
    if not slices:
        return []

    # One feature row per distinct entity; a field counts as present when
    # any slice of that entity mentions it.
    merged: dict[str, dict] = {}
    for name, slc in slices:
        key = " ".join(name.split()).casefold()
        entry = merged.setdefault(
            key, {"name": name, "ust_id": False, "hr": False, "gf": False},
        )
        entry["ust_id"] = entry["ust_id"] or bool(_USTID_PAT.search(slc))
        entry["hr"] = entry["hr"] or bool(_HR_PAT.search(slc))
        entry["gf"] = entry["gf"] or bool(_GF_PAT.search(slc))
    features = list(merged.values())
    if len(features) < 2:
        # Repeated mentions of a single entity are not a multi-entity case.
        return []

    findings: list[dict] = []
    for field, label in (
        ("ust_id", "USt-IdNr."),
        ("hr", "Handelsregister-Eintrag"),
        ("gf", "Vertretungsberechtigte"),
    ):
        present = [f for f in features if f[field]]
        missing = [f for f in features if not f[field]]
        # Only an inconsistency is reported: all-present and all-missing
        # both stay silent.
        if present and missing:
            findings.append({
                "check_id": f"IMPRESSUM-MULTI-{field.upper()}",
                "severity": "MEDIUM",
                "severity_reason": "incomplete",
                "title": (
                    f"{label} fehlt bei "
                    f"{len(missing)} von {len(features)} Entitäten"
                ),
                # NOTE(review): the TMG was superseded by the DDG in May
                # 2024 (§ 5 DDG) — confirm with legal whether this citation
                # should be updated.
                "norm": "§ 5 Abs. 1 TMG (Pflichtangabe pro Diensteanbieter)",
                "entities_present": [f["name"] for f in present],
                "entities_missing": [f["name"] for f in missing],
                "action": (
                    f"{label} im Impressum für "
                    f"{', '.join(f['name'] for f in missing)} ergänzen. "
                    "Pflichtangabe ist pro Diensteanbieter zu erfüllen, "
                    "nicht 'eine reicht für alle'."
                ),
            })
    if findings:
        logger.info("B9 multi-entity impressum: %d findings", len(findings))
    return findings
@@ -0,0 +1,98 @@
"""B10 — Drittland-Transfer-Mechanismus-Konsistenz pro Vendor.
DSGVO Art. 44 ff. verlangt für Drittland-Transfers EINEN klaren
Mechanismus: Angemessenheitsbeschluss / EU-US DPF / SCCs / BCRs /
ausdrückliche Einwilligung. Wenn ein Vendor in cmp_vendors als
Drittland-Verarbeiter erkannt wird, muss der DSE-Text einen
Mechanismus pro Vendor (oder per Vendor-Kategorie) klar benennen.
GT-Pattern Elli (TRANSFER-001):
- Google/Meta → DPF in DSE genannt ✓
- Salesforce → SCCs ✓
- Webflow als US-Sitz erwähnt aber kein Mechanismus → MEDIUM
Heuristik:
1. Aus cmp_vendors die Drittland-Vendors filtern (Ländercode nicht in EU/EWR/CH; Vendors ohne Ländercode zählen als Drittland).
2. Im DSE-Text suchen, ob pro Vendor ein Mechanismus erwähnt ist.
3. Wenn ein Drittland-Vendor keinen Mechanismus hat → MEDIUM.
"""
from __future__ import annotations
import logging
logger = logging.getLogger(__name__)
_MECHANISM_KEYWORDS = (
("DPF / Data Privacy Framework",
["data privacy framework", "dpf-", "eu-us dpf",
"angemessenheitsbeschluss"]),
("Standardvertragsklauseln (SCCs)",
["standardvertragsklauseln", "scc-", "scc ", "standard contractual",
"art. 46 abs. 2 lit. c"]),
("Binding Corporate Rules",
["binding corporate rules", "bcr-", "verbindliche unternehmensregeln"]),
("Ausdrückliche Einwilligung",
["ausdrückliche einwilligung nach art. 49",
"explicit consent under art. 49"]),
)
def _mechanism_for_vendor(vendor_name: str, dse_text: str) -> str | None:
if not vendor_name or not dse_text:
return None
name_lc = vendor_name.lower()
text_lc = dse_text.lower()
# Find vendor mention in DSE; locate a ±400 char window for
# mechanism keywords
idx = text_lc.find(name_lc)
if idx < 0:
return None
window = text_lc[max(0, idx - 400): idx + 400]
for mech_label, kws in _MECHANISM_KEYWORDS:
if any(k in window for k in kws):
return mech_label
return None
def check_transfer_mechanism(state: dict) -> list[dict]:
    """Report vendors whose transfer mechanism is not named in the DSE.

    Iterates ``state["cmp_vendors"]`` and skips entries without a name as
    well as entries whose country code is in the EU/EEA list below (which
    also contains CH). Every remaining vendor, including those with no
    country code at all, is looked up in ``state["doc_texts"]["dse"]`` via
    ``_mechanism_for_vendor``. A MEDIUM finding is emitted when no mechanism
    is found.

    Returns:
        A list of finding dicts; empty when there are no vendors, no DSE
        text, or nothing to report.
    """
    vendors = state.get("cmp_vendors") or []
    dse = (state.get("doc_texts") or {}).get("dse") or ""
    if not (vendors and dse):
        return []

    # Country codes that are not treated as third countries.
    exempt = frozenset((
        "DE", "AT", "BE", "BG", "HR", "CY", "CZ", "DK",
        "EE", "FI", "FR", "GR", "HU", "IE", "IT", "LV",
        "LT", "LU", "MT", "NL", "PL", "PT", "RO", "SK",
        "SI", "ES", "SE", "IS", "LI", "NO", "CH",
    ))

    findings: list[dict] = []
    for vendor in vendors:
        country = (vendor.get("country") or "").upper().strip()
        name = (vendor.get("name") or "").strip()
        if not name or country in exempt:
            continue
        if _mechanism_for_vendor(name, dse) is not None:
            continue
        findings.append({
            "check_id": "TRANSFER-MECH-001",
            "vendor": name,
            "country": country or "UNKNOWN",
            "severity": "MEDIUM",
            "severity_reason": "missing",
            "title": (
                f"Drittland-Transfer-Mechanismus für {name} "
                f"({country or 'Drittland'}) fehlt in DSE"
            ),
            "norm": "DSGVO Art. 44 + Art. 46 / Art. 49",
            "action": (
                f"Im DSE-Abschnitt zu {name} den Transfermechanismus "
                "angeben (DPF / SCCs / BCRs / Einwilligung) und ggf. "
                "Vertragsdokument referenzieren."
            ),
        })

    if findings:
        logger.info("B10 transfer-mechanism: %d findings", len(findings))
    return findings
+9 -1
View File
@@ -8,6 +8,13 @@ RUN apt-get update && apt-get install -y --no-install-recommends \
libdrm2 libxkbcommon0 libxcomposite1 libxdamage1 libxfixes3 \
libxrandr2 libgbm1 libpango-1.0-0 libcairo2 libasound2 \
curl \
# Browser-matrix stage 1: Firefox + WebKit deps + Xvfb (headed runs)
xvfb \
libdbus-glib-1-2 libxt6 \
libwoff1 libvpx7 libevent-2.1-7 libopus0 libgstreamer-plugins-base1.0-0 \
libgstreamer-gl1.0-0 libgstreamer1.0-0 libwebpdemux2 libharfbuzz-icu0 \
libenchant-2-2 libsecret-1-0 libhyphen0 libmanette-0.2-0 libflite1 \
libgles2 libx264-164 \
&& rm -rf /var/lib/apt/lists/*
# Create user BEFORE installing Playwright (so browsers are in user's cache)
@@ -17,8 +24,9 @@ COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
# Install Playwright browsers AS appuser (so they land in /home/appuser/.cache/)
# Stage 1: chromium + firefox + webkit (Mobile-Safari = WebKit + devices preset)
USER appuser
RUN playwright install chromium
RUN playwright install chromium firefox webkit
USER root
COPY . .
+4
View File
@@ -60,6 +60,10 @@ class ScanResponse(BaseModel):
banner_screenshot_b64: str = "" # P85: base64-PNG des Banners (initial-view)
from routes_matrix import router as matrix_router
app.include_router(matrix_router)
@app.get("/health")
async def health():
return {"status": "healthy", "service": "consent-tester"}
+61
View File
@@ -0,0 +1,61 @@
"""POST /scan-matrix — browser-matrix stage-1 endpoint.
Runs the existing consent_scanner once per browser profile and
returns the aggregated robustness-score per browser plus a
worst-of/best-of summary. Kept in its own module so main.py stays
under the 500-LOC cap.
KNOWN LIMITATION (stage 1.a):
The underlying `run_consent_test` does not yet accept a
`browser_profile` kwarg — all profiles currently execute on the
same Chromium instance. Engine diversity (real Firefox/WebKit
contexts) ships in stage 1.b once consent_scanner is split.
"""
from __future__ import annotations
import logging
from datetime import datetime, timezone
from fastapi import APIRouter
from pydantic import BaseModel
from services.consent_scanner import run_consent_test
from services.multi_browser_scanner import run_matrix
logger = logging.getLogger(__name__)
router = APIRouter()
class MatrixScanRequest(BaseModel):
    """Request body of ``POST /scan-matrix``."""

    # URL that is scanned once per browser profile.
    url: str
    # Forwarded unchanged to the scanner (presumably seconds — TODO confirm
    # against consent_scanner).
    timeout_per_phase: int = 10
    # Forwarded unchanged to the scanner.
    categories: list[str] = []
    # Resolved against browser_profiles.resolve_profiles. None or
    # empty list → default 4 profiles (chromium/firefox/webkit/iphone).
    browser_profiles: list[str] | None = None
async def _scanner_shim(url: str, browser_profile: dict | None = None,
                        timeout_per_phase: int = 10,
                        categories: list[str] | None = None):
    """Adapt ``run_consent_test`` to the signature ``run_matrix`` expects.

    ``browser_profile`` is accepted but not used: it is not passed on to
    ``run_consent_test``. ``categories`` defaults to an empty list when it
    is ``None`` or empty.
    """
    wanted = categories if categories else []
    result = await run_consent_test(url, timeout_per_phase, wanted)
    return result
@router.post("/scan-matrix")
async def scan_matrix(req: MatrixScanRequest):
    """Run the consent scan across the requested browser-profile matrix.

    Delegates to ``run_matrix`` with ``_scanner_shim`` as scanner, then adds
    the scanned ``url`` and a UTC ISO-8601 ``scanned_at`` timestamp to the
    returned dict.
    """
    profile_desc = req.browser_profiles or "default"
    logger.info("Matrix scan for %s profiles=%s", req.url, profile_desc)

    scanner_kwargs = {
        "timeout_per_phase": req.timeout_per_phase,
        "categories": req.categories,
    }
    matrix = await run_matrix(
        _scanner_shim,
        req.url,
        requested_profiles=req.browser_profiles,
        **scanner_kwargs,
    )
    matrix.update(
        url=req.url,
        scanned_at=datetime.now(timezone.utc).isoformat(),
    )
    return matrix
+138
View File
@@ -0,0 +1,138 @@
"""Browser-matrix stage-1 profile registry.
Each profile is a deterministic recipe for a Playwright BrowserContext.
The orchestrator runs the scan once per profile and aggregates the
results with the worst-of-rule (a HIGH on any browser → HIGH overall).
Keep this module dependency-light so it can be imported in unit tests
without spawning Playwright. The Playwright glue lives in
`services/multi_browser_scanner.py`.
Profile schema:
{
"id": str canonical identifier shown in the audit report
"label": str human-readable name
"engine": str blink | gecko | webkit
"channel": str? Playwright channel ('chrome' / 'msedge')
"device": str? Playwright devices preset for mobile emulation
"headless": bool
"viewport": {"width": int, "height": int} (ignored when `device` set)
"locale": str
"timezone": str
"user_agent": str? overridden UA when not derived from device
}
"""
from __future__ import annotations
# The four default profiles. All are headed, de-DE / Europe/Berlin, with no
# channel and no UA override; they differ only in id, label, engine and
# device. A profile with a device preset carries no explicit viewport.
# Each row below is expanded into a fresh dict (no shared nested objects).
DEFAULT_PROFILES: list[dict] = [
    {
        "id": profile_id,
        "label": label,
        "engine": engine,
        "channel": None,
        "device": device,
        "headless": False,
        "viewport": None if device else {"width": 1920, "height": 1080},
        "locale": "de-DE",
        "timezone": "Europe/Berlin",
        "user_agent": None,
    }
    for profile_id, label, engine, device in (
        ("chromium-headed-de", "Chromium (Headed) · de-DE", "blink", None),
        ("firefox-headed-de", "Firefox (Headed, ETP-Standard) · de-DE",
         "gecko", None),
        ("webkit-headed-de", "WebKit (Headed) · de-DE", "webkit", None),
        ("iphone-mobile-safari-de", "Mobile Safari (iPhone 15) · de-DE",
         "webkit", "iPhone 15"),
    )
]
# Optional profiles, keyed by id. The original note says they are enabled via
# env var BROWSER_PROFILES_EXTRA; no code reading that variable is visible in
# this module — TODO confirm where it is evaluated.
# All entries are headed Blink desktop profiles (1920x1080, de-DE,
# Europe/Berlin); they differ in channel, and Brave additionally carries an
# explicit executable path.
EXTRA_PROFILES: dict[str, dict] = {
    profile_id: {
        "id": profile_id,
        "label": label,
        "engine": "blink",
        "channel": channel,
        "device": None,
        "headless": False,
        "viewport": {"width": 1920, "height": 1080},
        "locale": "de-DE",
        "timezone": "Europe/Berlin",
        "user_agent": None,
        **extra,
    }
    for profile_id, label, channel, extra in (
        ("chrome-channel-desktop-de",
         "Chrome Channel (Google Build) · de-DE", "chrome", {}),
        ("edge-channel-desktop-de", "Edge Channel · de-DE", "msedge", {}),
        ("brave-default-de", "Brave Default-Shields · de-DE", None,
         {"executable_path": "/usr/bin/brave-browser"}),
    )
}
def resolve_profiles(requested: list[str] | None) -> list[dict]:
    """Translate requested profile ids into profile dicts.

    Ids are looked up in ``DEFAULT_PROFILES`` and ``EXTRA_PROFILES``; unknown
    ids are dropped silently and the request order is kept. A copy of the
    default list is returned when ``requested`` is ``None`` or empty, and
    also when none of the requested ids is known.
    """
    if requested:
        catalogue = {
            **{p["id"]: p for p in DEFAULT_PROFILES},
            **EXTRA_PROFILES,
        }
        resolved = [catalogue[r] for r in requested if catalogue.get(r)]
        if resolved:
            return resolved
    return list(DEFAULT_PROFILES)
def default_ids() -> list[str]:
    """Return the ids of all default profiles, in registry order."""
    ids: list[str] = []
    for profile in DEFAULT_PROFILES:
        ids.append(profile["id"])
    return ids
@@ -0,0 +1,158 @@
"""Multi-browser consent-scan orchestrator (browser-matrix stage 1).
Runs the existing single-browser `consent_scanner.run_consent_test`
once per profile from `browser_profiles.resolve_profiles` and
aggregates the per-browser results with the worst-of rule:
* any HIGH-violation on any browser → robustness_score capped to <60
* Pre-Consent + Reject-Respekt are weighted 80% combined
* Banner-Design only contributes if the banner was detected at all
Returns a unified ScanResponse-compatible dict plus a fresh
`browser_matrix` block (one entry per profile) so the backend mail
renderer can show "Chrome 95% · Firefox 92% · WebKit 78% · Mobile-Safari 65%".
Heuristic only — the real per-test scoring (T1..T7 from the EDPB
taskforce report) is mocked here as a placeholder until the consent
scanner emits structured per-test results.
"""
from __future__ import annotations
import asyncio
import logging
from typing import Any, Callable, Awaitable
from .browser_profiles import resolve_profiles
logger = logging.getLogger(__name__)
# Worst-of capping: if pre-consent or reject-respect has ANY hard fail,
# overall robustness can never exceed this value.
_HARD_FAIL_CAP = 55
# Per-dimension weights — Sales/Risk-tuned (see strategy doc):
# Pre-Consent-Compliance 50%
# Reject-Respekt 30%
# Banner-Design / Dark 20%
_WEIGHTS = {"pre_consent": 0.5, "reject_respect": 0.3, "banner_design": 0.2}
def _extract_dimensions(banner_result: dict) -> dict[str, float]:
"""Best-effort: derive 3 sub-scores from the existing scan output.
Falls back to neutral 0.5 when the input is too sparse.
"""
if not banner_result:
return {"pre_consent": 0.5, "reject_respect": 0.5,
"banner_design": 0.5}
phases = banner_result.get("phases") or {}
before = phases.get("before_consent") or phases.get("before") or {}
after_reject = phases.get("after_reject") or {}
bv = (banner_result.get("banner_checks") or {}).get("violations") or []
pre_cookies = len(before.get("cookies") or [])
rej_cookies = len(after_reject.get("cookies") or [])
pre_consent = max(0.0, 1.0 - min(1.0, pre_cookies / 10.0))
reject_respect = max(0.0, 1.0 - min(1.0, rej_cookies / 5.0))
banner_design = max(0.0, 1.0 - min(1.0, len(bv) / 5.0))
return {
"pre_consent": round(pre_consent, 3),
"reject_respect": round(reject_respect, 3),
"banner_design": round(banner_design, 3),
}
def _score(dimensions: dict[str, float]) -> int:
    """Combine the three sub-scores into a 0-100 robustness score.

    The result is the ``_WEIGHTS``-weighted sum, scaled to percent and
    rounded. If ``pre_consent`` or ``reject_respect`` is below 0.5, the
    result is additionally limited to ``_HARD_FAIL_CAP``.
    """
    pre = dimensions["pre_consent"]
    rej = dimensions["reject_respect"]
    design = dimensions["banner_design"]

    weighted = (
        pre * _WEIGHTS["pre_consent"]
        + rej * _WEIGHTS["reject_respect"]
        + design * _WEIGHTS["banner_design"]
    )
    pct = int(round(weighted * 100))

    hard_fail = pre < 0.5 or rej < 0.5
    return min(pct, _HARD_FAIL_CAP) if hard_fail else pct
def _verbal(score: int) -> str:
if score >= 95:
return "Im Prüfumfang keine wesentlichen Mängel"
if score >= 80:
return "Niedriges Risiko, Korrektur empfohlen"
if score >= 60:
return "Mittlere Mängel, kurzfristige Korrektur"
if score >= 30:
return "Schwere Mängel, sofortige Korrektur"
return "Bußgeldrelevante Verstöße"
async def run_matrix(
    scanner: Callable[..., Awaitable[Any]],
    url: str,
    requested_profiles: list[str] | None = None,
    **scanner_kwargs: Any,
) -> dict:
    """Run ``scanner`` once per resolved browser profile, concurrently.

    Each profile is scanned with
    ``scanner(url, browser_profile=<profile>, **scanner_kwargs)``. A scanner
    that rejects the ``browser_profile`` kwarg with ``TypeError`` is retried
    as ``scanner(url, **scanner_kwargs)``.

    A profile whose scan (including the retry) or scoring raises is reported
    with ``scan=None``, an ``error`` text, all dimensions 0 and score 0. It
    never aborts the other profiles. Because failed profiles score 0, they
    also determine the ``worst_*`` fields of the aggregate.

    Args:
        scanner: Async callable performing one scan.
        url: URL to scan.
        requested_profiles: Profile ids for ``resolve_profiles``; ``None`` or
            empty selects the defaults.
        **scanner_kwargs: Passed through to ``scanner`` unchanged.

    Returns:
        {
          "browser_matrix": [
            {"profile_id": ..., "label": ..., "scan": <raw scan result>,
             "dimensions": {...}, "score": int, "verbal": str},
            ...
          ],
          "aggregate": {
            "worst_score": int, "worst_profile": "...",
            "best_score":  int, "best_profile":  "...",
            "verbal": "...",          # verdict of the worst profile
            "profiles_run": int,
          },
        }
    """
    profiles = resolve_profiles(requested_profiles)
    if not profiles:
        return {"browser_matrix": [], "aggregate": {}}

    async def _run_one(prof: dict) -> dict:
        try:
            try:
                scan = await scanner(
                    url, browser_profile=prof, **scanner_kwargs,
                )
            except TypeError:
                # Backward-compat: scanner that doesn't accept the kwarg.
                # NOTE: a TypeError raised inside the scan itself also lands
                # here and triggers one retry without the profile. The retry
                # sits inside the outer guard so its failure is contained.
                scan = await scanner(url, **scanner_kwargs)
            dims = _extract_dimensions(scan or {})
            score = _score(dims)
        except Exception as e:
            logger.warning("matrix profile %s failed: %s", prof["id"], e)
            return {
                "profile_id": prof["id"], "label": prof["label"],
                "scan": None, "error": str(e)[:200],
                "dimensions": {"pre_consent": 0, "reject_respect": 0,
                               "banner_design": 0},
                "score": 0, "verbal": "Scan fehlgeschlagen",
            }
        return {
            "profile_id": prof["id"], "label": prof["label"],
            "scan": scan, "dimensions": dims, "score": score,
            "verbal": _verbal(score),
        }

    results = await asyncio.gather(*[_run_one(p) for p in profiles])

    # Stable sort: on ties the first profile is "worst", the last is "best".
    sorted_by_score = sorted(results, key=lambda r: r["score"])
    worst = sorted_by_score[0]
    best = sorted_by_score[-1]
    return {
        "browser_matrix": results,
        "aggregate": {
            "worst_score": worst["score"],
            "worst_profile": worst["profile_id"],
            "best_score": best["score"],
            "best_profile": best["profile_id"],
            "verbal": worst["verbal"],
            "profiles_run": len(results),
        },
    }