diff --git a/backend-compliance/compliance/api/agent_check/_b9b10_wiring.py b/backend-compliance/compliance/api/agent_check/_b9b10_wiring.py new file mode 100644 index 00000000..d7ee8a7f --- /dev/null +++ b/backend-compliance/compliance/api/agent_check/_b9b10_wiring.py @@ -0,0 +1,92 @@ +"""B9 + B10 wiring — Multi-Entity-Impressum + Drittland-Mechanismus. + +Runs after B6/B7/B8. Adds Findings into `state["extra_findings"]` +and re-renders the extra-block HTML. +""" + +from __future__ import annotations + +import html +import logging + +from compliance.services.impressum_multi_entity_check import ( + check_multi_entity_impressum, +) +from compliance.services.transfer_mechanism_check import ( + check_transfer_mechanism, +) + +logger = logging.getLogger(__name__) + + +def run_b9b10(state: dict) -> None: + extras = state.get("extra_findings") or [] + new: list[dict] = [] + new.extend(check_multi_entity_impressum(state)) + new.extend(check_transfer_mechanism(state)) + if not new: + return + extras.extend(new) + state["extra_findings"] = extras + state["extra_findings_html"] = _render(extras) + logger.info("B9/B10 added %d findings (total extra=%d)", + len(new), len(extras)) + + +def _render(findings: list[dict]) -> str: + cards = [] + for f in findings: + sev = (f.get("severity") or "").upper() + color = "#dc2626" if sev == "HIGH" else ( + "#f59e0b" if sev == "MEDIUM" else "#64748b" + ) + meta = "" + if f.get("entities_missing"): + meta = ("
" + f"Fehlt bei: " + f"{html.escape(', '.join(f['entities_missing']))}" + "
") + elif f.get("vendor"): + meta = ("
" + f"Vendor: {html.escape(f['vendor'])} " + f"({html.escape(f.get('country','?'))})" + "
") + elif f.get("doc_date"): + meta = ("
" + f"Stand: {html.escape(f['doc_date'])} " + f"({f.get('age_years','?')} J. alt)" + "
") + elif f.get("detected_provider"): + meta = ("
" + f"Erkannter Provider: " + f"{html.escape(f['detected_provider'])}" + "
") + elif f.get("evidence_dse"): + meta = ("
" + f"In DSE: {html.escape(', '.join(f['evidence_dse']))}" + "
") + cards.append( + f"
" + f"
" + f"{sev} · {html.escape(f.get('check_id') or '')}
" + f"
" + f"{html.escape(f.get('title') or '')}
" + f"
" + f"{html.escape(f.get('norm') or '')}
" + f"{meta}" + f"
" + f"→ Empfehlung: " + f"{html.escape(f.get('action') or '')}
" + "
" + ) + return ( + "
" + "

" + "📌 Zusätzliche Cross-Doc-Befunde" + "

" + + "".join(cards) + + "
" + ) diff --git a/backend-compliance/compliance/api/agent_check/_orchestrator.py b/backend-compliance/compliance/api/agent_check/_orchestrator.py index 49e3f0af..c3252f40 100644 --- a/backend-compliance/compliance/api/agent_check/_orchestrator.py +++ b/backend-compliance/compliance/api/agent_check/_orchestrator.py @@ -21,6 +21,7 @@ from ._b3_wiring import run_b3 from ._b4_wiring import run_b4 from ._b5_wiring import run_b5 from ._b6b7b8_wiring import run_b6b7b8 +from ._b9b10_wiring import run_b9b10 from ._constants import _compliance_check_jobs from ._phase_a_resolve import run_phase_a from ._phase_b_profile_check import run_phase_b @@ -63,6 +64,7 @@ async def run_compliance_check(check_id: str, req) -> None: run_b4(state) # Cross-doc vendor-consistency (Elli Vertex↔Iadvize) run_b5(state) # AI-Act Art. 50 transparency run_b6b7b8(state) # DPO-cross-doc + Doc-Staleness + CMP-fingerprint + run_b9b10(state) # Multi-Entity-Impressum + Drittland-Mechanismus # Phase D-3 top/mid/bot: Step 5 HTML blocks await run_phase_d3_top(state) await run_phase_d3_mid(state) diff --git a/backend-compliance/compliance/services/finding_plausibility_check.py b/backend-compliance/compliance/services/finding_plausibility_check.py index 66693e0f..daecc07d 100644 --- a/backend-compliance/compliance/services/finding_plausibility_check.py +++ b/backend-compliance/compliance/services/finding_plausibility_check.py @@ -51,8 +51,13 @@ logger = logging.getLogger(__name__) OLLAMA_URL = os.getenv("OLLAMA_URL", "http://host.docker.internal:11434") MODEL = os.getenv("PLAUSIBILITY_LLM_MODEL", "qwen3:30b-a3b") -BATCH_SIZE = int(os.getenv("PLAUSIBILITY_BATCH_SIZE", "8")) -TIMEOUT = float(os.getenv("PLAUSIBILITY_TIMEOUT_S", "60.0")) +# Reduced from 8 → 4 to fight qwen3 empty-response-on-large-prompts bug. +# 4 items × ~500 token/item + 2000 system + 1500 excerpt = ~5500 token total, +# well within qwen3's safe range for format='json'. 
+BATCH_SIZE = int(os.getenv("PLAUSIBILITY_BATCH_SIZE", "4")) +TIMEOUT = float(os.getenv("PLAUSIBILITY_TIMEOUT_S", "45.0")) +# Reduced excerpt 4000 → 1500 chars (same reason). +DOC_EXCERPT_CHARS = int(os.getenv("PLAUSIBILITY_DOC_EXCERPT", "1500")) # In-memory cache: (input_hash) -> result_dict. Survives one run. _CACHE: dict[str, dict] = {} @@ -121,7 +126,8 @@ def _build_user_prompt(items: list[dict], doc_title: str, ) return ( f"DOKUMENT: {doc_title}\n\n" - f"DOKUMENT-AUSZUG (max 4000 Zeichen):\n{doc_excerpt[:4000]}\n\n" + f"DOKUMENT-AUSZUG (max {DOC_EXCERPT_CHARS} Zeichen):\n" + f"{doc_excerpt[:DOC_EXCERPT_CHARS]}\n\n" f"FINDINGS ZU BEWERTEN:\n{findings_block}" ) @@ -149,6 +155,23 @@ async def _ask_llm_batch(items: list[dict], doc_title: str, r.raise_for_status() content = (r.json().get("message") or {}).get("content", "") if not content: + # Single retry with smaller batch — qwen3 sometimes + # rejects ≥6-item prompts under format='json'. + if len(items) > 2: + half = len(items) // 2 + logger.info( + "plausibility empty → retry split %d → %dx2", + len(items), half, + ) + first = await _ask_llm_batch( + items[:half], doc_title, doc_excerpt, + ) + second = await _ask_llm_batch( + items[half:], doc_title, doc_excerpt, + ) + out.update(first) + out.update(second) + return out logger.warning("plausibility LLM returned empty content") return out try: diff --git a/backend-compliance/compliance/services/impressum_multi_entity_check.py b/backend-compliance/compliance/services/impressum_multi_entity_check.py new file mode 100644 index 00000000..00d41530 --- /dev/null +++ b/backend-compliance/compliance/services/impressum_multi_entity_check.py @@ -0,0 +1,99 @@ +"""B9 — Multi-Entity-Impressum-Check. + +Findings, wenn ein Impressum mehrere Entitäten (mehrere GmbH/AG/UG) +nennt, aber Pflichtangaben nur bei einer davon vollständig sind. + +Konkreter Elli-Pattern (GT IMPRESSUM-001): + - Entity 1: "Elli Mobility GmbH ... USt-IdNr DE814424009 ..." 
logger = logging.getLogger(__name__)

# Heuristic entity matcher: a name starting with a letter, followed by
# whitespace and a legal-form suffix. The trailing ``(?!\w)`` stops the
# suffix from matching the beginning of an ordinary word; because the
# pattern is case-insensitive, "SE" would otherwise match "Seite" and
# "AG" would match "agieren", creating phantom entities.
_ENTITY_PAT = re.compile(
    r"([A-ZÄÖÜ][\w\-\&\s]{1,50}?\s+(?:GmbH|AG|UG|KG|SE|"
    r"e\.V\.|GbR|OHG|Limited|Ltd|LLC))(?!\w)",
    re.IGNORECASE,
)

# VAT-ID label followed by a DE number or a generic two-letter-prefix number.
_USTID_PAT = re.compile(r"\b(?:USt-?Id(?:Nr)?\.?|VAT(?:-?Id)?)\s*[:.\s]\s*"
                        r"(DE\d{8,10}|[A-Z]{2}\d{6,12})", re.IGNORECASE)
# Commercial-register label followed by 4-80 characters of register text.
_HR_PAT = re.compile(r"\b(?:HR[BA]|Handelsregister|Registergericht)"
                     r"\s*[:.\s]*([\w\s\d\-/]{4,80})", re.IGNORECASE)
# Label introducing the authorised representatives.
_GF_PAT = re.compile(r"(?:Geschäftsführer|Vertretungsberechtigt|"
                     r"vertreten\s+durch)\s*[:.\s]+", re.IGNORECASE)


def _slice_entities(text: str) -> list[tuple[str, str]]:
    """Return ``[(entity_name, text_slice)]`` for each detected entity.

    Every match of ``_ENTITY_PAT`` is treated as an entity boundary; the
    slice for an entity runs from its match to the start of the next match
    (or to the end of the text). Returns an empty list when fewer than two
    matches exist, because a single-entity imprint has nothing to compare.
    """
    matches = list(_ENTITY_PAT.finditer(text))
    if len(matches) < 2:
        return []
    slices: list[tuple[str, str]] = []
    for i, m in enumerate(matches):
        start = m.start()
        end = matches[i + 1].start() if i + 1 < len(matches) else len(text)
        slices.append((m.group(1).strip(), text[start:end]))
    return slices


def _merge_entity_features(slices: list[tuple[str, str]]) -> list[dict]:
    """Collapse repeated mentions of one entity into a single feature row.

    A field counts as present for an entity when ANY of its slices contains
    it. Names are compared case-insensitively with whitespace normalised;
    the first-seen spelling is kept for display. Order of first appearance
    is preserved.
    """
    merged: dict[str, dict] = {}
    for name, slc in slices:
        key = " ".join(name.split()).casefold()
        row = merged.setdefault(
            key, {"name": name, "ust_id": False, "hr": False, "gf": False},
        )
        row["ust_id"] = row["ust_id"] or bool(_USTID_PAT.search(slc))
        row["hr"] = row["hr"] or bool(_HR_PAT.search(slc))
        row["gf"] = row["gf"] or bool(_GF_PAT.search(slc))
    return list(merged.values())


def check_multi_entity_impressum(state: dict) -> list[dict]:
    """Flag mandatory imprint fields that only some listed entities provide.

    Reads ``state["doc_texts"]["impressum"]``. For each of VAT ID,
    commercial-register entry and authorised representatives, a MEDIUM
    finding is produced when at least one detected entity states the field
    and at least one other does not.

    Args:
        state: Pipeline state dict; missing or empty keys yield no findings.

    Returns:
        A list of finding dicts (possibly empty). Each carries ``check_id``,
        ``severity``, ``severity_reason``, ``title``, ``norm``,
        ``entities_present``, ``entities_missing`` and ``action``.
    """
    doc_texts = state.get("doc_texts") or {}
    imp = doc_texts.get("impressum") or ""
    if not imp:
        return []
    features = _merge_entity_features(_slice_entities(imp))
    # After merging repeated mentions there must still be two distinct
    # entities, otherwise there is nothing to compare.
    if len(features) < 2:
        return []
    findings: list[dict] = []
    for field, label in (
        ("ust_id", "USt-IdNr."),
        ("hr", "Handelsregister-Eintrag"),
        ("gf", "Vertretungsberechtigte"),
    ):
        present = [f for f in features if f[field]]
        missing = [f for f in features if not f[field]]
        # Only an inconsistency is reported: nobody stating the field, or
        # everybody stating it, is out of scope for this check.
        if not (present and missing):
            continue
        findings.append({
            "check_id": f"IMPRESSUM-MULTI-{field.upper()}",
            "severity": "MEDIUM",
            "severity_reason": "incomplete",
            "title": (
                f"{label} fehlt bei "
                f"{len(missing)} von {len(features)} Entitäten"
            ),
            "norm": "§ 5 Abs. 1 TMG (Pflichtangabe pro Diensteanbieter)",
            "entities_present": [f["name"] for f in present],
            "entities_missing": [f["name"] for f in missing],
            "action": (
                f"{label} im Impressum für "
                f"{', '.join(f['name'] for f in missing)} ergänzen. "
                "Pflichtangabe ist pro Diensteanbieter zu erfüllen, "
                "nicht 'eine reicht für alle'."
            ),
        })
    if findings:
        logger.info("B9 multi-entity impressum: %d findings", len(findings))
    return findings
Wenn ein Drittland-Vendor keinen Mechanismus hat → MEDIUM. +""" + +from __future__ import annotations + +import logging + +logger = logging.getLogger(__name__) + +_MECHANISM_KEYWORDS = ( + ("DPF / Data Privacy Framework", + ["data privacy framework", "dpf-", "eu-us dpf", + "angemessenheitsbeschluss"]), + ("Standardvertragsklauseln (SCCs)", + ["standardvertragsklauseln", "scc-", "scc ", "standard contractual", + "art. 46 abs. 2 lit. c"]), + ("Binding Corporate Rules", + ["binding corporate rules", "bcr-", "verbindliche unternehmensregeln"]), + ("Ausdrückliche Einwilligung", + ["ausdrückliche einwilligung nach art. 49", + "explicit consent under art. 49"]), +) + + +def _mechanism_for_vendor(vendor_name: str, dse_text: str) -> str | None: + if not vendor_name or not dse_text: + return None + name_lc = vendor_name.lower() + text_lc = dse_text.lower() + # Find vendor mention in DSE; locate a ±400 char window for + # mechanism keywords + idx = text_lc.find(name_lc) + if idx < 0: + return None + window = text_lc[max(0, idx - 400): idx + 400] + for mech_label, kws in _MECHANISM_KEYWORDS: + if any(k in window for k in kws): + return mech_label + return None + + +def check_transfer_mechanism(state: dict) -> list[dict]: + cmp_vendors = state.get("cmp_vendors") or [] + doc_texts = state.get("doc_texts") or {} + dse = doc_texts.get("dse") or "" + if not cmp_vendors or not dse: + return [] + findings: list[dict] = [] + for v in cmp_vendors: + country = (v.get("country") or "").upper().strip() + name = (v.get("name") or "").strip() + if not name: + continue + # Skip EU/EEA + if country in ("DE", "AT", "BE", "BG", "HR", "CY", "CZ", "DK", + "EE", "FI", "FR", "GR", "HU", "IE", "IT", "LV", + "LT", "LU", "MT", "NL", "PL", "PT", "RO", "SK", + "SI", "ES", "SE", "IS", "LI", "NO", "CH"): + continue + # Either flagged as third_country OR country not in EU + mech = _mechanism_for_vendor(name, dse) + if mech is None: + findings.append({ + "check_id": "TRANSFER-MECH-001", + "vendor": name, + 
"country": country or "UNKNOWN", + "severity": "MEDIUM", + "severity_reason": "missing", + "title": ( + f"Drittland-Transfer-Mechanismus für {name} " + f"({country or 'Drittland'}) fehlt in DSE" + ), + "norm": "DSGVO Art. 44 + Art. 46 / Art. 49", + "action": ( + f"Im DSE-Abschnitt zu {name} den Transfermechanismus " + "angeben (DPF / SCCs / BCRs / Einwilligung) und ggf. " + "Vertragsdokument referenzieren." + ), + }) + if findings: + logger.info("B10 transfer-mechanism: %d findings", len(findings)) + return findings diff --git a/consent-tester/Dockerfile b/consent-tester/Dockerfile index 8977fa32..68d7134a 100644 --- a/consent-tester/Dockerfile +++ b/consent-tester/Dockerfile @@ -8,6 +8,13 @@ RUN apt-get update && apt-get install -y --no-install-recommends \ libdrm2 libxkbcommon0 libxcomposite1 libxdamage1 libxfixes3 \ libxrandr2 libgbm1 libpango-1.0-0 libcairo2 libasound2 \ curl \ + # Browser-matrix stage 1: Firefox + WebKit deps + Xvfb (headed runs) + xvfb \ + libdbus-glib-1-2 libxt6 \ + libwoff1 libvpx7 libevent-2.1-7 libopus0 libgstreamer-plugins-base1.0-0 \ + libgstreamer-gl1.0-0 libgstreamer1.0-0 libwebpdemux2 libharfbuzz-icu0 \ + libenchant-2-2 libsecret-1-0 libhyphen0 libmanette-0.2-0 libflite1 \ + libgles2 libx264-164 \ && rm -rf /var/lib/apt/lists/* # Create user BEFORE installing Playwright (so browsers are in user's cache) @@ -17,8 +24,9 @@ COPY requirements.txt . RUN pip install --no-cache-dir -r requirements.txt # Install Playwright browsers AS appuser (so they land in /home/appuser/.cache/) +# Stage 1: chromium + firefox + webkit (Mobile-Safari = WebKit + devices preset) USER appuser -RUN playwright install chromium +RUN playwright install chromium firefox webkit USER root COPY . . 
logger = logging.getLogger(__name__)
router = APIRouter()


class MatrixScanRequest(BaseModel):
    # Request payload for POST /scan-matrix.
    url: str
    timeout_per_phase: int = 10
    categories: list[str] = []
    # Profile ids, resolved by browser_profiles.resolve_profiles. None or an
    # empty list selects the default profiles.
    browser_profiles: list[str] | None = None


async def _scanner_shim(url: str, browser_profile: dict | None = None,
                        timeout_per_phase: int = 10,
                        categories: list[str] | None = None):
    """Adapter around ``run_consent_test`` that accepts ``browser_profile``.

    The profile is accepted so the matrix runner can pass it, but it is not
    forwarded: ``run_consent_test`` is called with url, timeout and
    categories only.
    """
    wanted_categories = categories if categories else []
    return await run_consent_test(url, timeout_per_phase, wanted_categories)


@router.post("/scan-matrix")
async def scan_matrix(req: MatrixScanRequest):
    """Run consent-scan across the resolved browser-profile matrix."""
    profile_note = req.browser_profiles or "default"
    logger.info("Matrix scan for %s profiles=%s", req.url, profile_note)
    result = await run_matrix(
        _scanner_shim,
        req.url,
        requested_profiles=req.browser_profiles,
        timeout_per_phase=req.timeout_per_phase,
        categories=req.categories,
    )
    # Stamp the response with the scanned URL and a UTC timestamp.
    result.update(
        url=req.url,
        scanned_at=datetime.now(timezone.utc).isoformat(),
    )
    return result
overridden UA when not derived from device + } +""" + +from __future__ import annotations + +DEFAULT_PROFILES: list[dict] = [ + { + "id": "chromium-headed-de", + "label": "Chromium (Headed) · de-DE", + "engine": "blink", + "channel": None, + "device": None, + "headless": False, + "viewport": {"width": 1920, "height": 1080}, + "locale": "de-DE", + "timezone": "Europe/Berlin", + "user_agent": None, + }, + { + "id": "firefox-headed-de", + "label": "Firefox (Headed, ETP-Standard) · de-DE", + "engine": "gecko", + "channel": None, + "device": None, + "headless": False, + "viewport": {"width": 1920, "height": 1080}, + "locale": "de-DE", + "timezone": "Europe/Berlin", + "user_agent": None, + }, + { + "id": "webkit-headed-de", + "label": "WebKit (Headed) · de-DE", + "engine": "webkit", + "channel": None, + "device": None, + "headless": False, + "viewport": {"width": 1920, "height": 1080}, + "locale": "de-DE", + "timezone": "Europe/Berlin", + "user_agent": None, + }, + { + "id": "iphone-mobile-safari-de", + "label": "Mobile Safari (iPhone 15) · de-DE", + "engine": "webkit", + "channel": None, + "device": "iPhone 15", + "headless": False, + "viewport": None, + "locale": "de-DE", + "timezone": "Europe/Berlin", + "user_agent": None, + }, +] + + +# Optional profiles enabled via env var BROWSER_PROFILES_EXTRA +EXTRA_PROFILES: dict[str, dict] = { + "chrome-channel-desktop-de": { + "id": "chrome-channel-desktop-de", + "label": "Chrome Channel (Google Build) · de-DE", + "engine": "blink", + "channel": "chrome", + "device": None, + "headless": False, + "viewport": {"width": 1920, "height": 1080}, + "locale": "de-DE", + "timezone": "Europe/Berlin", + "user_agent": None, + }, + "edge-channel-desktop-de": { + "id": "edge-channel-desktop-de", + "label": "Edge Channel · de-DE", + "engine": "blink", + "channel": "msedge", + "device": None, + "headless": False, + "viewport": {"width": 1920, "height": 1080}, + "locale": "de-DE", + "timezone": "Europe/Berlin", + "user_agent": None, + }, + 
"brave-default-de": { + "id": "brave-default-de", + "label": "Brave Default-Shields · de-DE", + "engine": "blink", + "channel": None, + "device": None, + "headless": False, + "viewport": {"width": 1920, "height": 1080}, + "locale": "de-DE", + "timezone": "Europe/Berlin", + "user_agent": None, + "executable_path": "/usr/bin/brave-browser", + }, +} + + +def resolve_profiles(requested: list[str] | None) -> list[dict]: + """Map requested ids to profile dicts. Falls back to all defaults + when `requested` is None or empty.""" + if not requested: + return list(DEFAULT_PROFILES) + by_id = {p["id"]: p for p in DEFAULT_PROFILES} + by_id.update(EXTRA_PROFILES) + out: list[dict] = [] + for r in requested: + prof = by_id.get(r) + if prof: + out.append(prof) + return out or list(DEFAULT_PROFILES) + + +def default_ids() -> list[str]: + return [p["id"] for p in DEFAULT_PROFILES] diff --git a/consent-tester/services/multi_browser_scanner.py b/consent-tester/services/multi_browser_scanner.py new file mode 100644 index 00000000..61fb8059 --- /dev/null +++ b/consent-tester/services/multi_browser_scanner.py @@ -0,0 +1,158 @@ +"""Multi-browser consent-scan orchestrator (browser-matrix stage 1). + +Runs the existing single-browser `consent_scanner.run_consent_test` +once per profile from `browser_profiles.resolve_profiles` and +aggregates the per-browser results with the worst-of rule: + + * any HIGH-violation on any browser → robustness_score capped to <60 + * Pre-Consent + Reject-Respekt are weighted 80% combined + * Banner-Design only contributes if the banner was detected at all + +Returns a unified ScanResponse-compatible dict plus a fresh +`browser_matrix` block (one entry per profile) so the backend mail +renderer can show "Chrome 95% · Firefox 92% · WebKit 78% · Mobile-Safari 65%". + +Heuristic only — the real per-test scoring (T1..T7 from the EDPB +taskforce report) is mocked here as a placeholder until the consent +scanner emits structured per-test results. 
logger = logging.getLogger(__name__)

# If the pre-consent or reject-respect sub-score is below 0.5, the overall
# score is capped at this value.
_HARD_FAIL_CAP = 55

# Per-dimension weights; pre-consent and reject-respect together make up 80%.
_WEIGHTS = {"pre_consent": 0.5, "reject_respect": 0.3, "banner_design": 0.2}


def _extract_dimensions(banner_result: dict) -> dict[str, float]:
    """Derive three sub-scores in [0, 1] from a scan result.

    * ``pre_consent``: 1.0 minus (cookies before consent / 10), floored at 0.
    * ``reject_respect``: 1.0 minus (cookies after reject / 5), floored at 0.
    * ``banner_design``: 1.0 minus (banner violations / 5), floored at 0.

    An empty result yields a neutral 0.5 for every dimension.

    NOTE(review): a non-empty result that lacks ``phases`` or
    ``banner_checks`` counts as zero cookies / zero violations and therefore
    scores 1.0, not the neutral 0.5 — confirm that this is intended.
    """
    if not banner_result:
        return {"pre_consent": 0.5, "reject_respect": 0.5,
                "banner_design": 0.5}
    phases = banner_result.get("phases") or {}
    before = phases.get("before_consent") or phases.get("before") or {}
    after_reject = phases.get("after_reject") or {}
    bv = (banner_result.get("banner_checks") or {}).get("violations") or []
    pre_cookies = len(before.get("cookies") or [])
    rej_cookies = len(after_reject.get("cookies") or [])
    pre_consent = max(0.0, 1.0 - min(1.0, pre_cookies / 10.0))
    reject_respect = max(0.0, 1.0 - min(1.0, rej_cookies / 5.0))
    banner_design = max(0.0, 1.0 - min(1.0, len(bv) / 5.0))
    return {
        "pre_consent": round(pre_consent, 3),
        "reject_respect": round(reject_respect, 3),
        "banner_design": round(banner_design, 3),
    }


def _score(dimensions: dict[str, float]) -> int:
    """Combine the sub-scores into a 0-100 score using ``_WEIGHTS``.

    The result is capped at ``_HARD_FAIL_CAP`` when ``pre_consent`` or
    ``reject_respect`` is below 0.5.
    """
    base = (
        dimensions["pre_consent"] * _WEIGHTS["pre_consent"]
        + dimensions["reject_respect"] * _WEIGHTS["reject_respect"]
        + dimensions["banner_design"] * _WEIGHTS["banner_design"]
    )
    pct = int(round(base * 100))
    if (dimensions["pre_consent"] < 0.5
            or dimensions["reject_respect"] < 0.5):
        pct = min(pct, _HARD_FAIL_CAP)
    return pct


def _verbal(score: int) -> str:
    """Translate a 0-100 score into the (German) verdict shown to users."""
    if score >= 95:
        return "Im Prüfumfang keine wesentlichen Mängel"
    if score >= 80:
        return "Niedriges Risiko, Korrektur empfohlen"
    if score >= 60:
        return "Mittlere Mängel, kurzfristige Korrektur"
    if score >= 30:
        return "Schwere Mängel, sofortige Korrektur"
    return "Bußgeldrelevante Verstöße"


def _accepts_browser_profile(scanner: Callable[..., Awaitable[Any]]) -> bool:
    """Tell whether ``scanner`` can be called with ``browser_profile=...``.

    True when the signature declares a ``browser_profile`` parameter that
    may be passed by keyword, or a ``**kwargs`` catch-all. When the
    signature cannot be inspected the kwarg is assumed to be supported.
    """
    import inspect

    try:
        params = inspect.signature(scanner).parameters.values()
    except (TypeError, ValueError):
        return True
    for param in params:
        if param.kind is inspect.Parameter.VAR_KEYWORD:
            return True
        if (param.name == "browser_profile"
                and param.kind is not inspect.Parameter.POSITIONAL_ONLY):
            return True
    return False


async def run_matrix(
    scanner: Callable[..., Awaitable[Any]],
    url: str,
    requested_profiles: list[str] | None = None,
    **scanner_kwargs: Any,
) -> dict:
    """Run ``scanner(url, browser_profile=…, **kw)`` once per profile.

    All profiles resolved from ``requested_profiles`` are scanned
    concurrently. A scanner that does not accept a ``browser_profile``
    keyword is called without it. Whether the keyword is supported is
    decided from the scanner's signature, so a ``TypeError`` raised inside
    the scanner is reported as a failed scan instead of triggering a second,
    profile-less run.

    A profile whose scan raises is reported with ``scan=None``, an
    ``error`` message (truncated to 200 characters), score 0 and the
    verdict "Scan fehlgeschlagen"; it does not abort the other profiles.

    Returns:
        {
          "browser_matrix": [
            {"profile_id": ..., "label": ..., "scan": ...,
             "dimensions": {...}, "score": int, "verbal": str},
            ...
          ],
          "aggregate": {
            "worst_score": int, "worst_profile": "...",
            "best_score": int,  "best_profile": "...",
            "verbal": "...",    "profiles_run": int,
          },
        }
        ``aggregate["verbal"]`` is the verdict of the worst-scoring profile.
        Both values are empty when no profile could be resolved.
    """
    profiles = resolve_profiles(requested_profiles)
    if not profiles:
        return {"browser_matrix": [], "aggregate": {}}

    pass_profile = _accepts_browser_profile(scanner)

    async def _run_one(prof: dict) -> dict:
        call_kwargs = dict(scanner_kwargs)
        if pass_profile:
            call_kwargs["browser_profile"] = prof
        try:
            scan = await scanner(url, **call_kwargs)
        except Exception as e:
            # Boundary handler: one broken profile must not take down the
            # whole matrix, so the failure becomes a result row.
            logger.warning("matrix profile %s failed: %s", prof["id"], e)
            return {
                "profile_id": prof["id"], "label": prof["label"],
                "scan": None, "error": str(e)[:200],
                "dimensions": {"pre_consent": 0, "reject_respect": 0,
                               "banner_design": 0},
                "score": 0, "verbal": "Scan fehlgeschlagen",
            }
        dims = _extract_dimensions(scan or {})
        score = _score(dims)
        return {
            "profile_id": prof["id"], "label": prof["label"],
            "scan": scan, "dimensions": dims, "score": score,
            "verbal": _verbal(score),
        }

    results = await asyncio.gather(*[_run_one(p) for p in profiles])
    sorted_by_score = sorted(results, key=lambda r: r["score"])
    worst = sorted_by_score[0]
    best = sorted_by_score[-1]
    return {
        "browser_matrix": results,
        "aggregate": {
            "worst_score": worst["score"],
            "worst_profile": worst["profile_id"],
            "best_score": best["score"],
            "best_profile": best["profile_id"],
            "verbal": worst["verbal"],
            "profiles_run": len(results),
        },
    }