Files
breakpilot-compliance/consent-tester/services/multi_browser_scanner.py
T
Benjamin Admin c7fde93061 feat(backend): On-demand Browser-Verhaltens-Matrix + Snapshot-Persistenz (Phase 2)
- check_snapshot: update_browser_matrix/load_browser_matrix — migrationsfrei
  in banner_result.browser_matrix (JSONB jsonb_set, eigener scanned_at)
- snapshot_check_routes: POST /snapshots/{id}/browser-behavior/run laeuft
  /scan-matrix LIVE (Re-Crawl je Engine, nur live messbar), persistiert das
  Ergebnis; GET /snapshots/{id}/browser-behavior liefert die gespeicherte
  Matrix ohne Re-Crawl. Profil-Set = 4 Default-Engines + Brave/Chrome/Edge.
- consent-tester multi_browser_scanner: Semaphore(2) gegen OOM (7 Browser
  parallel sprengten das 2g-mem_limit)
- Pydantic-Modell mit Optional[List[...]] (nicht `| None`) → Py3.9-sicher
- Tests: _snapshot_scan_url + Request-Defaults (5)

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-06-12 23:03:28 +02:00

178 lines
6.5 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""Multi-browser consent-scan orchestrator (browser-matrix stage 1).
Runs the existing single-browser `consent_scanner.run_consent_test`
once per profile from `browser_profiles.resolve_profiles` and
aggregates the per-browser results with the worst-of rule:
* any HIGH-violation on any browser → robustness_score capped to <60
* Pre-Consent + Reject-Respekt are weighted 80% combined
* Banner-Design only contributes if the banner was detected at all
Returns a unified ScanResponse-compatible dict plus a fresh
`browser_matrix` block (one entry per profile) so the backend mail
renderer can show "Chrome 95% · Firefox 92% · WebKit 78% · Mobile-Safari 65%".
Heuristic only — the real per-test scoring (T1..T7 from the EDPB
taskforce report) is mocked here as a placeholder until the consent
scanner emits structured per-test results.
"""
from __future__ import annotations
import asyncio
import logging
from typing import Any, Callable, Awaitable
from .browser_profiles import resolve_profiles
logger = logging.getLogger(__name__)
# Worst-of capping: if pre-consent or reject-respect has ANY hard fail,
# overall robustness can never exceed this value.
_HARD_FAIL_CAP = 55
# Per-dimension weights — Sales/Risk-tuned (see strategy doc):
# Pre-Consent-Compliance 50%
# Reject-Respekt 30%
# Banner-Design / Dark 20%
_WEIGHTS = {"pre_consent": 0.5, "reject_respect": 0.3, "banner_design": 0.2}
# Nebenlaeufigkeit kappen: jeder Playwright-Browser braucht 300-500 MB; bei 7
# Profilen wuerde paralleles Starten das 2g-mem_limit des Containers sprengen
# (OOM-Kill). 2 gleichzeitig → Peak ~1 GB, Wall-Time ~Profile/2.
_MAX_CONCURRENCY = 2
def _extract_dimensions(banner_result: dict) -> dict[str, float]:
"""Best-effort: derive 3 sub-scores from the existing scan output.
Falls back to neutral 0.5 when the input is too sparse.
"""
if not banner_result:
return {"pre_consent": 0.5, "reject_respect": 0.5,
"banner_design": 0.5}
phases = banner_result.get("phases") or {}
before = phases.get("before_consent") or phases.get("before") or {}
after_reject = phases.get("after_reject") or {}
bv = (banner_result.get("banner_checks") or {}).get("violations") or []
pre_cookies = len(before.get("cookies") or [])
rej_cookies = len(after_reject.get("cookies") or [])
pre_consent = max(0.0, 1.0 - min(1.0, pre_cookies / 10.0))
reject_respect = max(0.0, 1.0 - min(1.0, rej_cookies / 5.0))
banner_design = max(0.0, 1.0 - min(1.0, len(bv) / 5.0))
return {
"pre_consent": round(pre_consent, 3),
"reject_respect": round(reject_respect, 3),
"banner_design": round(banner_design, 3),
}
def _score(dimensions: dict[str, float]) -> int:
base = (
dimensions["pre_consent"] * _WEIGHTS["pre_consent"]
+ dimensions["reject_respect"] * _WEIGHTS["reject_respect"]
+ dimensions["banner_design"] * _WEIGHTS["banner_design"]
)
pct = int(round(base * 100))
if (dimensions["pre_consent"] < 0.5
or dimensions["reject_respect"] < 0.5):
pct = min(pct, _HARD_FAIL_CAP)
return pct
def _verbal(score: int) -> str:
if score >= 95:
return "Im Prüfumfang keine wesentlichen Mängel"
if score >= 80:
return "Niedriges Risiko, Korrektur empfohlen"
if score >= 60:
return "Mittlere Mängel, kurzfristige Korrektur"
if score >= 30:
return "Schwere Mängel, sofortige Korrektur"
return "Bußgeldrelevante Verstöße"
async def run_matrix(
scanner: Callable[..., Awaitable[Any]],
url: str,
requested_profiles: list[str] | None = None,
**scanner_kwargs: Any,
) -> dict:
"""Run `scanner(url, profile=…, **kw)` once per profile in parallel.
`scanner` must be the existing consent_scanner.run_consent_test
or a shim with the same signature; it must accept a `browser_profile`
kwarg. Returns:
{
"browser_matrix": [
{"profile_id": ..., "label": ..., "scan": <raw scan dict>,
"dimensions": {...}, "score": int, "verbal": str},
...
],
"aggregate": {
"worst_score": int, "worst_profile": "...",
"best_score": int, "best_profile": "...",
"verbal": "...",
},
}
"""
profiles = resolve_profiles(requested_profiles)
if not profiles:
return {"browser_matrix": [], "aggregate": {}}
async def _run_one(prof: dict) -> dict:
try:
scan = await scanner(
url, browser_profile=prof, **scanner_kwargs,
)
except TypeError:
# Backward-compat: scanner that doesn't accept the kwarg
scan = await scanner(url, **scanner_kwargs)
except Exception as e:
logger.warning("matrix profile %s failed: %s", prof["id"], e)
return {
"profile_id": prof["id"], "label": prof["label"],
"engine": prof.get("engine"),
"is_mobile": bool(prof.get("device")),
"summary": None, "error": str(e)[:200],
"dimensions": {"pre_consent": 0, "reject_respect": 0,
"banner_design": 0},
"score": 0, "verbal": "Scan fehlgeschlagen",
}
dims = _extract_dimensions(scan or {})
score = _score(dims)
# Nur den kompakten `summary` an die Zeile heben — die vollen
# phases/Cookie-Listen werden für das Scoring konsumiert und dann
# verworfen (sonst bläht 6× volle Cookie-Liste die JSONB-Persistenz).
summary = (scan or {}).get("summary") if isinstance(scan, dict) else None
return {
"profile_id": prof["id"], "label": prof["label"],
"engine": prof.get("engine"),
"is_mobile": bool(prof.get("device")),
"summary": summary, "dimensions": dims, "score": score,
"verbal": _verbal(score),
}
_sem = asyncio.Semaphore(_MAX_CONCURRENCY)
async def _bounded(prof: dict) -> dict:
async with _sem:
return await _run_one(prof)
results = await asyncio.gather(*[_bounded(p) for p in profiles])
sorted_by_score = sorted(results, key=lambda r: r["score"])
worst = sorted_by_score[0]
best = sorted_by_score[-1]
return {
"browser_matrix": results,
"aggregate": {
"worst_score": worst["score"],
"worst_profile": worst["profile_id"],
"best_score": best["score"],
"best_profile": best["profile_id"],
"verbal": worst["verbal"],
"profiles_run": len(results),
},
}