Files
breakpilot-compliance/consent-tester/services/multi_browser_scanner.py
T
Benjamin Admin 3f90e40807 fix(browser-matrix): Tracking-Signal statt Cookie-Rohzahl + Matrix-Schnellpfad
Korrektheit (§ 25 TDDDG): "Cookies vor Consent" ist KEIN Verstoss per se —
technisch notwendige Cookies inkl. des Consent-Cookies (speichert die
Ablehnung) sind nach Abs. 2 erlaubt. Verstoss ist nur nicht-essentielles
TRACKING vor Consent.
- browser_cross_finding: Befund haengt jetzt an violations.before_consent
  (Tracking), nicht an der Cookie-Rohzahl; § 25 Abs. 2-Hinweis im Detail.
  Regressionstest: Cookies-ohne-Tracking → KEIN Befund.
- multi_browser_scanner._extract_dimensions: Score nutzt Tracking-Violations
  + reject_respected-Verdikt statt Rohzahl (Fallback erhalten).
- BrowserBehaviorView: "Cookies vor Consent" nur rot/⚠ bei Tracking,
  "nach Ablehnen" neutral (Verdikt = reject-Spalte); erklaerende Zeile.

Speed: run_consent_test ueberspringt im Matrix-Modus (browser_profile gesetzt)
die teuren Phasen C/D-F/G — nur A+B noetig. Verhindert das 504 beim
Multi-Engine-Scan (BMW 4 Engines lief sonst in den 338s-Gateway-Timeout).

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-06-13 00:10:41 +02:00

194 lines
7.4 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""Multi-browser consent-scan orchestrator (browser-matrix stage 1).
Runs the existing single-browser `consent_scanner.run_consent_test`
once per profile from `browser_profiles.resolve_profiles` and
aggregates the per-browser results with the worst-of rule:
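* a pre-consent or reject-respect sub-score below 0.5 caps that browser's
  score at 55 (`_HARD_FAIL_CAP`), i.e. always below the 60 band
* Pre-Consent (50%) + Reject-Respekt (30%) are weighted 80% combined
* Banner-Design contributes the remaining 20%; it is intended to count only
  if the banner was detected at all (NOTE: not enforced by the scoring yet)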
Returns a unified ScanResponse-compatible dict plus a fresh
`browser_matrix` block (one entry per profile) so the backend mail
renderer can show "Chrome 95% · Firefox 92% · WebKit 78% · Mobile-Safari 65%".
Heuristic only — the real per-test scoring (T1..T7 from the EDPB
taskforce report) is mocked here as a placeholder until the consent
scanner emits structured per-test results.
"""
from __future__ import annotations
import asyncio
import logging
from typing import Any, Callable, Awaitable
from .browser_profiles import resolve_profiles
logger = logging.getLogger(__name__)
# Worst-of capping: if pre-consent or reject-respect has ANY hard fail,
# overall robustness can never exceed this value.
_HARD_FAIL_CAP = 55
# Per-dimension weights — Sales/Risk-tuned (see strategy doc):
# Pre-Consent-Compliance 50%
# Reject-Respekt 30%
# Banner-Design / Dark 20%
_WEIGHTS = {"pre_consent": 0.5, "reject_respect": 0.3, "banner_design": 0.2}
# Cap concurrency: each Playwright browser needs 300-500 MB; with 7
# profiles, launching them all in parallel would blow the container's 2g
# mem_limit (OOM kill). 2 at a time → peak ~1 GB, wall time ~profiles/2.
_MAX_CONCURRENCY = 2
def _extract_dimensions(banner_result: dict) -> dict[str, float]:
"""Best-effort: derive 3 sub-scores from the existing scan output.
Falls back to neutral 0.5 when the input is too sparse.
"""
if not banner_result:
return {"pre_consent": 0.5, "reject_respect": 0.5,
"banner_design": 0.5}
phases = banner_result.get("phases") or {}
before = phases.get("before_consent") or phases.get("before") or {}
after_reject = phases.get("after_reject") or {}
bv = (banner_result.get("banner_checks") or {}).get("violations") or []
summary = banner_result.get("summary") or {}
viol = summary.get("violations") or {}
# Pre-Consent: das rechtliche Signal ist nicht-essentielles TRACKING vor
# Consent, NICHT die Cookie-Rohzahl (essentielle inkl. Consent-Cookie sind
# nach § 25 Abs. 2 erlaubt). Fallback auf Rohzahl nur ohne Summary.
if "before_consent" in viol:
pre_track = viol.get("before_consent") or 0
pre_consent = max(0.0, 1.0 - min(1.0, pre_track / 3.0))
else:
pre_cookies = len(before.get("cookies") or [])
pre_consent = max(0.0, 1.0 - min(1.0, pre_cookies / 10.0))
# Reject: bevorzugt das reject_respected-Verdikt (kein Verstoß UND kein
# neuer Tracker), sonst after_reject-Tracking, sonst Cookie-Rohzahl.
if summary.get("reject_respected") is not None:
reject_respect = 1.0 if summary.get("reject_respected") else 0.2
elif "after_reject" in viol:
reject_respect = max(0.0, 1.0 - min(1.0, (viol.get("after_reject") or 0) / 2.0))
else:
rej_cookies = len(after_reject.get("cookies") or [])
reject_respect = max(0.0, 1.0 - min(1.0, rej_cookies / 5.0))
banner_design = max(0.0, 1.0 - min(1.0, len(bv) / 5.0))
return {
"pre_consent": round(pre_consent, 3),
"reject_respect": round(reject_respect, 3),
"banner_design": round(banner_design, 3),
}
def _score(dimensions: dict[str, float]) -> int:
    """Combine the three sub-scores into one 0-100 robustness percentage.

    Each dimension is multiplied by its entry in ``_WEIGHTS`` and the sum is
    rounded to a whole percent. If the pre-consent or the reject-respect
    sub-score is below 0.5, the result is capped at ``_HARD_FAIL_CAP``.
    """
    pre = dimensions["pre_consent"]
    reject = dimensions["reject_respect"]
    design = dimensions["banner_design"]

    weighted = (
        pre * _WEIGHTS["pre_consent"]
        + reject * _WEIGHTS["reject_respect"]
        + design * _WEIGHTS["banner_design"]
    )
    percent = int(round(weighted * 100))

    # Hard fail on either legally critical dimension limits the total.
    if pre < 0.5 or reject < 0.5:
        return min(percent, _HARD_FAIL_CAP)
    return percent
def _verbal(score: int) -> str:
if score >= 95:
return "Im Prüfumfang keine wesentlichen Mängel"
if score >= 80:
return "Niedriges Risiko, Korrektur empfohlen"
if score >= 60:
return "Mittlere Mängel, kurzfristige Korrektur"
if score >= 30:
return "Schwere Mängel, sofortige Korrektur"
return "Bußgeldrelevante Verstöße"
async def run_matrix(
    scanner: Callable[..., Awaitable[Any]],
    url: str,
    requested_profiles: list[str] | None = None,
    **scanner_kwargs: Any,
) -> dict:
    """Run `scanner(url, browser_profile=…, **kw)` once per browser profile.

    At most `_MAX_CONCURRENCY` scans run at the same time. A failure of one
    profile (scan or scoring) is recorded on that profile's row and never
    aborts the other profiles.

    Args:
        scanner: The existing consent_scanner.run_consent_test or a shim with
            the same signature. It should accept a `browser_profile` kwarg;
            a scanner that raises TypeError on that call is retried once
            without the kwarg (backward compatibility).
        url: Page to scan, passed through to `scanner` unchanged.
        requested_profiles: Profile ids handed to `resolve_profiles`.
        **scanner_kwargs: Extra keyword arguments forwarded to `scanner`.

    Returns:
        {
            "browser_matrix": [
                {"profile_id": ..., "label": ..., "engine": ...,
                 "is_mobile": bool, "summary": <scan summary or None>,
                 "dimensions": {...}, "score": int, "verbal": str},
                ...  # failed rows additionally carry "error" and score 0
            ],
            "aggregate": {
                "worst_score": int, "worst_profile": "...",
                "best_score": int, "best_profile": "...",
                "verbal": "...", "profiles_run": int,
            },
        }
        `aggregate` is an empty dict when no profile could be resolved.
        Failed rows take part in the worst/best ranking with their score 0.
    """
    profiles = resolve_profiles(requested_profiles)
    if not profiles:
        return {"browser_matrix": [], "aggregate": {}}

    async def _call_scanner(prof: dict) -> Any:
        try:
            return await scanner(
                url, browser_profile=prof, **scanner_kwargs,
            )
        except TypeError:
            # Backward-compat: scanner that doesn't accept the kwarg.
            logger.info(
                "matrix profile %s: scanner raised TypeError with "
                "browser_profile, retrying without it", prof["id"],
            )
        # The retry runs OUTSIDE the except block on purpose, so that its
        # own failure reaches the caller's `except Exception` instead of
        # escaping from inside an exception handler.
        return await scanner(url, **scanner_kwargs)

    async def _run_one(prof: dict) -> dict:
        try:
            scan = await _call_scanner(prof)
            # A None or non-dict payload is scored like an empty scan.
            scan_dict = scan if isinstance(scan, dict) else {}
            dims = _extract_dimensions(scan_dict)
            score = _score(dims)
        except Exception as e:
            logger.warning("matrix profile %s failed: %s", prof["id"], e)
            return {
                "profile_id": prof["id"], "label": prof["label"],
                "engine": prof.get("engine"),
                "is_mobile": bool(prof.get("device")),
                "summary": None, "error": str(e)[:200],
                "dimensions": {"pre_consent": 0, "reject_respect": 0,
                               "banner_design": 0},
                "score": 0, "verbal": "Scan fehlgeschlagen",
            }
        # Only the compact `summary` is lifted onto the row. The full
        # phases/cookie lists are consumed for scoring and then dropped
        # (otherwise several full cookie lists bloat the JSONB persistence).
        return {
            "profile_id": prof["id"], "label": prof["label"],
            "engine": prof.get("engine"),
            "is_mobile": bool(prof.get("device")),
            "summary": scan_dict.get("summary"),
            "dimensions": dims, "score": score,
            "verbal": _verbal(score),
        }

    _sem = asyncio.Semaphore(_MAX_CONCURRENCY)

    async def _bounded(prof: dict) -> dict:
        async with _sem:
            return await _run_one(prof)

    # gather() keeps the input order, so rows line up with `profiles`.
    results = await asyncio.gather(*[_bounded(p) for p in profiles])

    # Stable sort: on ties the first profile is "worst", the last is "best".
    sorted_by_score = sorted(results, key=lambda r: r["score"])
    worst = sorted_by_score[0]
    best = sorted_by_score[-1]
    return {
        "browser_matrix": results,
        "aggregate": {
            "worst_score": worst["score"],
            "worst_profile": worst["profile_id"],
            "best_score": best["score"],
            "best_profile": best["profile_id"],
            "verbal": worst["verbal"],
            "profiles_run": len(results),
        },
    }