"""Multi-browser consent-scan orchestrator (browser-matrix stage 1). Runs the existing single-browser `consent_scanner.run_consent_test` once per profile from `browser_profiles.resolve_profiles` and aggregates the per-browser results with the worst-of rule: * any HIGH-violation on any browser → robustness_score capped to <60 * Pre-Consent + Reject-Respekt are weighted 80% combined * Banner-Design only contributes if the banner was detected at all Returns a unified ScanResponse-compatible dict plus a fresh `browser_matrix` block (one entry per profile) so the backend mail renderer can show "Chrome 95% · Firefox 92% · WebKit 78% · Mobile-Safari 65%". Heuristic only — the real per-test scoring (T1..T7 from the EDPB taskforce report) is mocked here as a placeholder until the consent scanner emits structured per-test results. """ from __future__ import annotations import asyncio import logging from typing import Any, Callable, Awaitable from .browser_profiles import resolve_profiles logger = logging.getLogger(__name__) # Worst-of capping: if pre-consent or reject-respect has ANY hard fail, # overall robustness can never exceed this value. _HARD_FAIL_CAP = 55 # Per-dimension weights — Sales/Risk-tuned (see strategy doc): # Pre-Consent-Compliance 50% # Reject-Respekt 30% # Banner-Design / Dark 20% _WEIGHTS = {"pre_consent": 0.5, "reject_respect": 0.3, "banner_design": 0.2} def _extract_dimensions(banner_result: dict) -> dict[str, float]: """Best-effort: derive 3 sub-scores from the existing scan output. Falls back to neutral 0.5 when the input is too sparse. """ if not banner_result: return {"pre_consent": 0.5, "reject_respect": 0.5, "banner_design": 0.5} phases = banner_result.get("phases") or {} before = phases.get("before_consent") or phases.get("before") or {} after_reject = phases.get("after_reject") or {} bv = (banner_result.get("banner_checks") or {}).get("violations") or [] pre_cookies = len(before.get("cookies") or []) rej_cookies = len(after_reject.get("cookies") or []) pre_consent = max(0.0, 1.0 - min(1.0, pre_cookies / 10.0)) reject_respect = max(0.0, 1.0 - min(1.0, rej_cookies / 5.0)) banner_design = max(0.0, 1.0 - min(1.0, len(bv) / 5.0)) return { "pre_consent": round(pre_consent, 3), "reject_respect": round(reject_respect, 3), "banner_design": round(banner_design, 3), } def _score(dimensions: dict[str, float]) -> int: base = ( dimensions["pre_consent"] * _WEIGHTS["pre_consent"] + dimensions["reject_respect"] * _WEIGHTS["reject_respect"] + dimensions["banner_design"] * _WEIGHTS["banner_design"] ) pct = int(round(base * 100)) if (dimensions["pre_consent"] < 0.5 or dimensions["reject_respect"] < 0.5): pct = min(pct, _HARD_FAIL_CAP) return pct def _verbal(score: int) -> str: if score >= 95: return "Im Prüfumfang keine wesentlichen Mängel" if score >= 80: return "Niedriges Risiko, Korrektur empfohlen" if score >= 60: return "Mittlere Mängel, kurzfristige Korrektur" if score >= 30: return "Schwere Mängel, sofortige Korrektur" return "Bußgeldrelevante Verstöße" async def run_matrix( scanner: Callable[..., Awaitable[Any]], url: str, requested_profiles: list[str] | None = None, **scanner_kwargs: Any, ) -> dict: """Run `scanner(url, profile=…, **kw)` once per profile in parallel. `scanner` must be the existing consent_scanner.run_consent_test or a shim with the same signature; it must accept a `browser_profile` kwarg. Returns: { "browser_matrix": [ {"profile_id": ..., "label": ..., "scan": , "dimensions": {...}, "score": int, "verbal": str}, ... ], "aggregate": { "worst_score": int, "worst_profile": "...", "best_score": int, "best_profile": "...", "verbal": "...", }, } """ profiles = resolve_profiles(requested_profiles) if not profiles: return {"browser_matrix": [], "aggregate": {}} async def _run_one(prof: dict) -> dict: try: scan = await scanner( url, browser_profile=prof, **scanner_kwargs, ) except TypeError: # Backward-compat: scanner that doesn't accept the kwarg scan = await scanner(url, **scanner_kwargs) except Exception as e: logger.warning("matrix profile %s failed: %s", prof["id"], e) return { "profile_id": prof["id"], "label": prof["label"], "scan": None, "error": str(e)[:200], "dimensions": {"pre_consent": 0, "reject_respect": 0, "banner_design": 0}, "score": 0, "verbal": "Scan fehlgeschlagen", } dims = _extract_dimensions(scan or {}) score = _score(dims) return { "profile_id": prof["id"], "label": prof["label"], "scan": scan, "dimensions": dims, "score": score, "verbal": _verbal(score), } results = await asyncio.gather(*[_run_one(p) for p in profiles]) sorted_by_score = sorted(results, key=lambda r: r["score"]) worst = sorted_by_score[0] best = sorted_by_score[-1] return { "browser_matrix": results, "aggregate": { "worst_score": worst["score"], "worst_profile": worst["profile_id"], "best_score": best["score"], "best_profile": best["profile_id"], "verbal": worst["verbal"], "profiles_run": len(results), }, }