881e9c28de
- _scanner_run reicht browser_profile an run_consent_test durch (statt Single-Chromium-Shim) - neue scan_matrix_summary.matrix_scan_dict: ConsentTestResult -> schlanke Matrix-dict-Form (phases fuer _extract_dimensions + kompakter `summary`: cookies_before_consent/after_reject, reject_respected-Heuristik [keine Verstoesse UND kein neuer Tracker], surface, screenshot) - multi_browser_scanner._run_one hebt summary + engine + is_mobile an die Zeile, verwirft die vollen Cookie-Listen (JSONB-Persistenz schlank) - consent_scanner: _ctx_base mit Mobile-Device-Emulation (iPhone-Profil -> echtes Mobile-Viewport/Touch), alle 5 new_context auf **_ctx_base - Tests: test_scan_matrix_summary (6) inkl. _extract_dimensions-Vertrag Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
167 lines
6.2 KiB
Python
167 lines
6.2 KiB
Python
"""Multi-browser consent-scan orchestrator (browser-matrix stage 1).
|
||
|
||
Runs the existing single-browser `consent_scanner.run_consent_test`
|
||
once per profile from `browser_profiles.resolve_profiles` and
|
||
aggregates the per-browser results with the worst-of rule:
|
||
|
||
* any HIGH-violation on any browser → robustness_score capped to <60
|
||
* Pre-Consent + Reject-Respekt are weighted 80% combined
|
||
* Banner-Design only contributes if the banner was detected at all
|
||
|
||
Returns a unified ScanResponse-compatible dict plus a fresh
|
||
`browser_matrix` block (one entry per profile) so the backend mail
|
||
renderer can show "Chrome 95% · Firefox 92% · WebKit 78% · Mobile-Safari 65%".
|
||
|
||
Heuristic only — the real per-test scoring (T1..T7 from the EDPB
|
||
taskforce report) is mocked here as a placeholder until the consent
|
||
scanner emits structured per-test results.
|
||
"""
|
||
|
||
from __future__ import annotations
|
||
|
||
import asyncio
|
||
import logging
|
||
from typing import Any, Callable, Awaitable
|
||
|
||
from .browser_profiles import resolve_profiles
|
||
|
||
logger = logging.getLogger(__name__)
|
||
|
||
# Worst-of capping: if the pre-consent or reject-respect sub-score is a
# hard fail (below 0.5, see `_score`), the overall robustness percentage
# can never exceed this value.
_HARD_FAIL_CAP = 55

# Per-dimension weights — Sales/Risk-tuned (see strategy doc):
#   Pre-Consent-Compliance  50%
#   Reject-Respekt          30%
#   Banner-Design / Dark    20%
# The keys are the sub-score names returned by `_extract_dimensions` and
# looked up by `_score`; the three weights add up to 1.0.
_WEIGHTS = {"pre_consent": 0.5, "reject_respect": 0.3, "banner_design": 0.2}
|
||
|
||
|
||
def _extract_dimensions(banner_result: dict) -> dict[str, float]:
    """Derive the three sub-scores from a single-browser scan result.

    Best-effort heuristic that never raises on malformed input. Each
    sub-score is ``1.0`` minus a penalty that grows linearly with a simple
    count and saturates at ``1.0``:

    * ``pre_consent``    – cookies in the ``before_consent`` phase
      (``before`` is accepted as an alias); 10 or more cookies → 0.0
    * ``reject_respect`` – cookies in the ``after_reject`` phase;
      5 or more cookies → 0.0
    * ``banner_design``  – entries in ``banner_checks["violations"]``;
      5 or more entries → 0.0

    Args:
        banner_result: Scan result; only the ``phases`` and
            ``banner_checks`` keys are read.

    Returns:
        Dict with the keys ``pre_consent``, ``reject_respect`` and
        ``banner_design``, each a float in ``[0.0, 1.0]`` rounded to three
        decimals. An empty, ``None`` or non-dict ``banner_result`` yields
        the neutral value 0.5 for all three. Missing or malformed nested
        entries (non-dict phase, unsized cookie value, ...) are treated as
        "nothing found" and therefore score 1.0, exactly like an absent key.
    """
    if not banner_result or not isinstance(banner_result, dict):
        return {"pre_consent": 0.5, "reject_respect": 0.5,
                "banner_design": 0.5}

    def _as_dict(value: object) -> dict:
        # Anything that is not a mapping is handled like a missing key.
        return value if isinstance(value, dict) else {}

    def _size(value: object) -> int:
        # len() of whatever sized container was stored; unsized values
        # (e.g. a bare int) count as empty instead of raising.
        try:
            return len(value or ())
        except TypeError:
            return 0

    def _sub_score(count: int, saturation: float) -> float:
        # Linear penalty, clamped so the result stays within [0.0, 1.0].
        return round(1.0 - min(1.0, count / saturation), 3)

    phases = _as_dict(banner_result.get("phases"))
    before = (_as_dict(phases.get("before_consent"))
              or _as_dict(phases.get("before")))
    after_reject = _as_dict(phases.get("after_reject"))
    banner_checks = _as_dict(banner_result.get("banner_checks"))

    return {
        "pre_consent": _sub_score(_size(before.get("cookies")), 10.0),
        "reject_respect": _sub_score(_size(after_reject.get("cookies")), 5.0),
        "banner_design": _sub_score(_size(banner_checks.get("violations")), 5.0),
    }
|
||
|
||
|
||
def _score(dimensions: dict[str, float]) -> int:
    """Collapse the three sub-scores into one integer percentage.

    The sub-scores are combined as a weighted sum using ``_WEIGHTS``,
    scaled to 0–100 and rounded to the nearest integer. If either the
    ``pre_consent`` or the ``reject_respect`` sub-score is below 0.5, the
    result is additionally clamped to at most ``_HARD_FAIL_CAP``.

    Args:
        dimensions: Mapping with the keys ``pre_consent``,
            ``reject_respect`` and ``banner_design``.

    Returns:
        The (possibly capped) percentage as an ``int``.

    Raises:
        KeyError: If one of the three keys is missing.
    """
    # Accumulate strictly left to right so the float result matches a
    # plain ``a + b + c`` expression.
    weighted = 0.0
    for key in ("pre_consent", "reject_respect", "banner_design"):
        weighted += dimensions[key] * _WEIGHTS[key]
    percent = int(round(weighted * 100))

    hard_fail = any(
        dimensions[key] < 0.5 for key in ("pre_consent", "reject_respect")
    )
    return min(percent, _HARD_FAIL_CAP) if hard_fail else percent
|
||
|
||
|
||
def _verbal(score: int) -> str:
    """Translate a percentage score into its (German) verbal rating.

    Bands are checked from the highest lower bound downwards; the first
    band whose lower bound is met wins. Scores below 30 get the most
    severe rating.

    Args:
        score: Percentage score as produced by ``_score``.

    Returns:
        The user-facing rating text for the band ``score`` falls into.
    """
    bands = (
        (95, "Im Prüfumfang keine wesentlichen Mängel"),
        (80, "Niedriges Risiko, Korrektur empfohlen"),
        (60, "Mittlere Mängel, kurzfristige Korrektur"),
        (30, "Schwere Mängel, sofortige Korrektur"),
    )
    return next(
        (label for lower_bound, label in bands if score >= lower_bound),
        "Bußgeldrelevante Verstöße",
    )
|
||
|
||
|
||
async def run_matrix(
    scanner: Callable[..., Awaitable[Any]],
    url: str,
    requested_profiles: list[str] | None = None,
    **scanner_kwargs: Any,
) -> dict:
    """Run ``scanner(url, browser_profile=…, **kw)`` once per profile, concurrently.

    Args:
        scanner: Async callable with the signature of
            ``consent_scanner.run_consent_test`` (or a compatible shim).
            It is called with ``url`` as the only positional argument and
            the profile dict as the ``browser_profile`` keyword. If that
            call raises ``TypeError`` it is retried once without the
            keyword (backward compatibility with older scanners).
        url: Target URL, forwarded unchanged to ``scanner``.
        requested_profiles: Passed to ``resolve_profiles`` to select the
            profiles to run.
        **scanner_kwargs: Forwarded unchanged to every ``scanner`` call.

    Returns:
        A dict of the form::

            {
                "browser_matrix": [
                    {"profile_id": ..., "label": ..., "engine": ...,
                     "is_mobile": bool, "summary": <dict or None>,
                     "dimensions": {...}, "score": int, "verbal": str},
                    ...
                ],
                "aggregate": {
                    "worst_score": int, "worst_profile": "...",
                    "best_score": int, "best_profile": "...",
                    "verbal": "...", "profiles_run": int,
                },
            }

        A profile whose scan or scoring raised gets ``score`` 0, zeroed
        ``dimensions``, ``summary`` ``None`` and an extra ``"error"`` key
        holding the first 200 characters of the exception text. The
        aggregate ``verbal`` is the one of the worst-scoring profile. When
        ``resolve_profiles`` yields nothing, both values are empty.
    """
    profiles = resolve_profiles(requested_profiles)
    if not profiles:
        return {"browser_matrix": [], "aggregate": {}}

    async def _run_one(prof: dict) -> dict:
        # One outer try covers the primary call, the backward-compat retry
        # AND the scoring. An exception raised inside an `except` handler
        # is not seen by sibling `except` clauses, so without the nesting a
        # failing retry would escape and abort the whole gather().
        try:
            try:
                scan = await scanner(
                    url, browser_profile=prof, **scanner_kwargs,
                )
            except TypeError:
                # Backward-compat: scanner that doesn't accept the kwarg.
                # NOTE(review): a TypeError raised inside the scanner body
                # also lands here and triggers a second, profile-less run.
                scan = await scanner(url, **scanner_kwargs)

            if scan is not None and not isinstance(scan, dict):
                logger.warning(
                    "matrix profile %s returned %s instead of a dict; "
                    "scoring as neutral",
                    prof["id"], type(scan).__name__,
                )
            # Non-dict results are scored like an empty scan instead of
            # crashing on `.get`.
            scan_dict = scan if isinstance(scan, dict) else {}
            dims = _extract_dimensions(scan_dict)
            score = _score(dims)
        except Exception as e:
            logger.warning("matrix profile %s failed: %s", prof["id"], e)
            return {
                "profile_id": prof["id"], "label": prof["label"],
                "engine": prof.get("engine"),
                "is_mobile": bool(prof.get("device")),
                "summary": None, "error": str(e)[:200],
                "dimensions": {"pre_consent": 0, "reject_respect": 0,
                               "banner_design": 0},
                "score": 0, "verbal": "Scan fehlgeschlagen",
            }

        # Lift only the compact `summary` onto the row — the full
        # phases/cookie lists are consumed for scoring and then dropped
        # (otherwise several full cookie lists would bloat the JSONB
        # persistence).
        return {
            "profile_id": prof["id"], "label": prof["label"],
            "engine": prof.get("engine"),
            "is_mobile": bool(prof.get("device")),
            "summary": scan_dict.get("summary"), "dimensions": dims,
            "score": score, "verbal": _verbal(score),
        }

    results = await asyncio.gather(*[_run_one(p) for p in profiles])

    # Stable sort: on ties the first profile wins "worst" and the last
    # profile wins "best".
    sorted_by_score = sorted(results, key=lambda r: r["score"])
    worst = sorted_by_score[0]
    best = sorted_by_score[-1]
    return {
        "browser_matrix": results,
        "aggregate": {
            "worst_score": worst["score"],
            "worst_profile": worst["profile_id"],
            "best_score": best["score"],
            "best_profile": best["profile_id"],
            "verbal": worst["verbal"],
            "profiles_run": len(results),
        },
    }
|