diff --git a/consent-tester/requirements.txt b/consent-tester/requirements.txt
index a9f52869..ad46da7f 100644
--- a/consent-tester/requirements.txt
+++ b/consent-tester/requirements.txt
@@ -3,3 +3,5 @@ uvicorn==0.34.2
 playwright==1.52.0
 playwright-stealth==1.0.6
 pydantic>=2.0
+httpx>=0.27
+redis>=5.0
diff --git a/consent-tester/services/cmp_llm_fallback.py b/consent-tester/services/cmp_llm_fallback.py
new file mode 100644
index 00000000..70ca48a0
--- /dev/null
+++ b/consent-tester/services/cmp_llm_fallback.py
@@ -0,0 +1,314 @@
+"""
+LLM-based Cookie-Policy Fallback (Phase C+D of the cascade).
+
+When the named CMP library (Phase B) AND the generic JSON heuristic (Phase A)
+both come up empty, we send the page's DOM snapshot + network log to an LLM
+and ask it to find the policy. Cascade order:
+
+  Qwen (local Ollama on Mac Mini)  ← Phase C, fast + private
+    ↓ if invalid response
+  OVH (managed 120B model)         ← Phase D, more capable backup
+
+The LLM returns one of:
+  {"strategy": "url", "value": "FULL_JSON_URL"}
+  {"strategy": "selector", "value": "CSS_SELECTOR"}
+  {"strategy": "text", "value": "POLICY_TEXT"}
+
+Per-domain results are cached in Valkey (7-day TTL) so we only pay the LLM
+cost once per site.
+
+Wiring: dsi_discovery.py calls LLMCascade.analyze(...) when self_wc < 300
+AND cmp_capture.payloads is empty.
+"""
+
+from __future__ import annotations
+
+import json
+import logging
+import os
+from typing import Any, Literal
+
+import httpx
+
+logger = logging.getLogger(__name__)
+
+
+_SYSTEM_PROMPT = (
+    "Du analysierst eine Website-Inspektion, um die Cookie-Richtlinie zu "
+    "finden. Wir haben einen DOM-Auszug und ein Netzwerk-Log mit JSON-"
+    "Responses geladen. Gib genau EIN JSON-Objekt zurueck:\n"
+    '{"strategy": "url"|"selector"|"text", "value": "..."}\n\n'
+    "- strategy=url: vollstaendige JSON-URL, die die strukturierte Cookie-"
+    "Policy enthaelt (z.B. CMP-Endpoints von OneTrust, Cookiebot, etc.)\n"
+    "- strategy=selector: CSS-Selector im aktuellen DOM, der den eigentlichen "
+    "Policy-Text enthaelt (kein Sub-iframe).\n"
+    "- strategy=text: nur wenn der Policy-Text bereits im DOM steht und du "
+    "ihn extrahieren kannst (max 4000 Worte).\n\n"
+    "Antworte AUSSCHLIESSLICH mit JSON. Keine Erklaerung. Wenn nichts "
+    "gefunden: {\"strategy\":\"none\",\"value\":\"\"}"
+)
+
+
+class LLMCookieExtractor:
+    """One LLM endpoint (Ollama or OpenAI-compatible)."""
+
+    def __init__(
+        self,
+        kind: Literal["ollama", "openai"],
+        base_url: str,
+        model: str,
+        api_key: str = "",
+        timeout: float = 60.0,
+    ) -> None:
+        self.kind = kind
+        self.base_url = base_url.rstrip("/")
+        self.model = model
+        self.api_key = api_key
+        self.timeout = timeout
+
+    async def analyze(
+        self,
+        target_url: str,
+        dom_snapshot: str,
+        network_log: list[dict],
+    ) -> dict:
+        """Send DOM + network info to the LLM, parse the JSON response.
+
+        Returns the parsed JSON object, or {} when the request fails or the
+        reply contains no JSON object.
+        """
+        user_prompt = _build_user_prompt(target_url, dom_snapshot, network_log)
+        try:
+            content = await self._chat(_SYSTEM_PROMPT, user_prompt)
+        except Exception as e:
+            # Best-effort boundary: a failing tier must not abort the cascade.
+            logger.warning("%s LLM call failed: %s", self.kind, e)
+            return {}
+        return _parse_json_response(content)
+
+    async def _chat(self, system: str, user: str) -> str:
+        """Dispatch to the backend selected by ``self.kind``."""
+        if self.kind == "ollama":
+            return await self._chat_ollama(system, user)
+        return await self._chat_openai(system, user)
+
+    async def _chat_ollama(self, system: str, user: str) -> str:
+        """POST to Ollama's /api/chat and return the message content."""
+        payload = {
+            "model": self.model,
+            "messages": [
+                {"role": "system", "content": system},
+                {"role": "user", "content": user},
+            ],
+            "stream": False,
+            "format": "json",
+            "options": {"temperature": 0.1, "num_predict": 2000},
+        }
+        async with httpx.AsyncClient(timeout=self.timeout) as client:
+            resp = await client.post(f"{self.base_url}/api/chat", json=payload)
+            resp.raise_for_status()
+            data = resp.json()
+        # "or ''" keeps the declared str return type when content is null.
+        return (data.get("message") or {}).get("content", "") or ""
+
+    async def _chat_openai(self, system: str, user: str) -> str:
+        """POST to /v1/chat/completions and return the first choice's content."""
+        headers = {"Content-Type": "application/json"}
+        if self.api_key:
+            headers["Authorization"] = f"Bearer {self.api_key}"
+        payload = {
+            "model": self.model,
+            "messages": [
+                {"role": "system", "content": system},
+                {"role": "user", "content": user},
+            ],
+            "temperature": 0.1,
+            "max_tokens": 2000,
+            "response_format": {"type": "json_object"},
+        }
+        async with httpx.AsyncClient(timeout=self.timeout) as client:
+            resp = await client.post(
+                f"{self.base_url}/v1/chat/completions",
+                json=payload, headers=headers,
+            )
+            resp.raise_for_status()
+            data = resp.json()
+        choice = (data.get("choices") or [{}])[0]
+        return (choice.get("message") or {}).get("content", "") or ""
+
+
+class LLMCascade:
+    """Try Qwen first (local, fast). Fall through to OVH on invalid result."""
+
+    def __init__(
+        self,
+        qwen: LLMCookieExtractor | None = None,
+        ovh: LLMCookieExtractor | None = None,
+    ) -> None:
+        self.qwen = qwen
+        self.ovh = ovh
+
+    @classmethod
+    def from_env(cls) -> "LLMCascade":
+        """Build the cascade from OLLAMA_URL / CMP_LLM_MODEL / OVH_LLM_* env vars."""
+        qwen = None
+        ovh = None
+
+        ollama_url = os.getenv("OLLAMA_URL", "http://bp-core-ollama:11434")
+        qwen_model = os.getenv("CMP_LLM_MODEL", "qwen3:30b-a3b")
+        if ollama_url and qwen_model:
+            qwen = LLMCookieExtractor(
+                kind="ollama", base_url=ollama_url, model=qwen_model, timeout=90.0,
+            )
+
+        ovh_url = os.getenv("OVH_LLM_URL", "").strip()
+        ovh_key = os.getenv("OVH_LLM_KEY", "").strip()
+        ovh_model = os.getenv("OVH_LLM_MODEL", "").strip()
+        if ovh_url and ovh_model:
+            ovh = LLMCookieExtractor(
+                kind="openai", base_url=ovh_url, model=ovh_model,
+                api_key=ovh_key, timeout=60.0,
+            )
+
+        if not qwen and not ovh:
+            logger.warning("LLMCascade: neither Qwen nor OVH configured")
+        return cls(qwen=qwen, ovh=ovh)
+
+    async def analyze(
+        self,
+        target_url: str,
+        dom_snapshot: str,
+        network_log: list[dict],
+    ) -> dict:
+        """Return the first valid LLM analysis, or {} if all fail."""
+        for tier_name, ex in (("qwen", self.qwen), ("ovh", self.ovh)):
+            if ex is None:
+                continue
+            res = await ex.analyze(target_url, dom_snapshot, network_log)
+            strategy = res.get("strategy")
+            value = res.get("value")
+            # Callers word-count / evaluate the value as text, so anything
+            # but a non-empty string is treated as an unusable answer.
+            if (
+                strategy in ("url", "selector", "text")
+                and isinstance(value, str)
+                and value.strip()
+            ):
+                logger.info("LLM (%s) suggested strategy=%s", tier_name, strategy)
+                res["_tier"] = tier_name
+                return res
+            logger.info("LLM (%s) returned no usable strategy", tier_name)
+        return {}
+
+
+def _build_user_prompt(
+    target_url: str, dom_snapshot: str, network_log: list[dict],
+) -> str:
+    """Render the user prompt from a truncated DOM text and network log."""
+    # Truncate to keep prompt < ~16K chars
+    dom = dom_snapshot[:5000]
+    log_lines = []
+    for entry in network_log[:60]:
+        # "or" (not a .get default) so explicit None values cannot break slicing.
+        content_type = str(entry.get("content_type") or "?")
+        entry_url = str(entry.get("url") or "")
+        log_lines.append(
+            f"- {entry.get('status', '?')} "
+            f"{content_type[:30]} "
+            f"{entry.get('size', '?')}B "
+            f"{entry_url[:150]}"
+        )
+    log_text = "\n".join(log_lines)
+    return (
+        f"Ziel-URL: {target_url}\n\n"
+        f"=== DOM-Auszug (body.innerText, gekuerzt) ===\n{dom}\n\n"
+        f"=== Netzwerk-Log (JSON-Responses dieser Seite) ===\n{log_text}"
+    )
+
+
+def _parse_json_response(content: str) -> dict:
+    """LLMs sometimes wrap JSON in code-fences or add prose. Be lenient."""
+    if not content:
+        return {}
+    # Try the raw reply first, then progressively more lenient extractions.
+    for candidate in (content, _strip_code_fence(content), _find_json_block(content)):
+        if not candidate:
+            continue
+        try:
+            obj = json.loads(candidate)
+            if isinstance(obj, dict):
+                return obj
+        except Exception:
+            continue
+    return {}
+
+
+def _strip_code_fence(s: str) -> str:
+    """Drop a leading ``` fence line (and the trailing one, if present)."""
+    s = s.strip()
+    if s.startswith("```"):
+        lines = s.split("\n")
+        return "\n".join(lines[1:-1]) if lines[-1].strip().startswith("```") else "\n".join(lines[1:])
+    return s
+
+
+def _find_json_block(s: str) -> str:
+    """Return the span from the first "{" to the last "}", or "" if none."""
+    start = s.find("{")
+    end = s.rfind("}")
+    if start >= 0 and end > start:
+        return s[start:end + 1]
+    return ""
+
+
+# ── Valkey cache ────────────────────────────────────────────────────
+
+async def cache_get(netloc: str) -> dict | None:
+    """Read a cached LLM hint for this netloc from Valkey, if any."""
+    client = _valkey_client()
+    if not client:
+        return None
+    try:
+        raw = await client.get(_cache_key(netloc))
+        hint = json.loads(raw) if raw else None
+    except Exception as e:
+        logger.debug("cache_get failed: %s", e)
+        return None
+    # Anything but a JSON object is not a hint we wrote; treat it as a miss.
+    return hint if isinstance(hint, dict) else None
+
+
+async def cache_set(netloc: str, hint: dict, ttl: int = 604800) -> None:
+    """Cache an LLM result for 7 days (default)."""
+    client = _valkey_client()
+    if not client:
+        return
+    try:
+        await client.set(_cache_key(netloc), json.dumps(hint), ex=ttl)
+    except Exception as e:
+        logger.debug("cache_set failed: %s", e)
+
+
+def _cache_key(netloc: str) -> str:
+    """Valkey key for a netloc (lower-cased)."""
+    return f"cmp:hint:{netloc.lower()}"
+
+
+_valkey_singleton: Any = None
+
+
+def _valkey_client() -> Any:
+    """Lazy-init a redis.asyncio client (Valkey-compatible). Returns None if unavailable."""
+    global _valkey_singleton
+    if _valkey_singleton is not None:
+        return _valkey_singleton
+    try:
+        import redis.asyncio as redis  # type: ignore[import-not-found]
+        url = os.getenv("VALKEY_URL", "redis://bp-core-valkey:6379")
+        _valkey_singleton = redis.from_url(
+            url, decode_responses=True, socket_connect_timeout=2.0,
+        )
+        return _valkey_singleton
+    except Exception as e:
+        logger.debug("valkey client init failed: %s", e)
+        return None
diff --git a/consent-tester/services/dsi_discovery.py b/consent-tester/services/dsi_discovery.py
index 8b971e72..dffb87c9 100644
--- a/consent-tester/services/dsi_discovery.py
+++ b/consent-tester/services/dsi_discovery.py
@@ -227,6 +227,26 @@ async def discover_dsi_documents(
         cmp_capture = CMPCapture()
         cmp_capture.attach(page)
 
+        # Also collect a generic JSON response log for the LLM fallback (Phase C+D)
+        # if everything else fails. Keep it small (header info only, not bodies).
+        network_log: list[dict] = []
+
+        async def _on_response_log(response):
+            try:
+                ct = (response.headers.get("content-type") or "").lower()
+                if "json" not in ct:
+                    return
+                network_log.append({
+                    "url": response.url,
+                    "status": response.status,
+                    "content_type": ct,
+                    "size": int(response.headers.get("content-length") or 0),
+                })
+            except Exception as e:
+                logger.debug("network log entry skipped: %s", e)
+
+        page.on("response", _on_response_log)
+
         try:
             # Step 1: Load the page (with networkidle → domcontentloaded fallback)
             await goto_resilient(page, url, timeout=60000)
@@ -334,6 +354,22 @@ async def discover_dsi_documents(
                     self_text = cmp_text
                     self_wc = cmp_wc
 
+                # Phase C/D: LLM cascade fallback. Triggers only when both
+                # named CMPs (Phase B) and the generic heuristic (Phase A)
+                # produced nothing AND the DOM is too thin to be a real policy.
+                if self_wc < 300 and not cmp_capture.payloads:
+                    llm_text, llm_wc = await _try_llm_cascade(
+                        page, url, network_log,
+                    )
+                    if llm_wc > self_wc:
+                        logger.info(
+                            "Self-extraction via LLM cascade for %s: %d words "
+                            "(replacing %d-word DOM)",
+                            url, llm_wc, self_wc,
+                        )
+                        self_text = llm_text
+                        self_wc = llm_wc
+
                 if self_wc >= 100:
                     page_title = await page.title() or url
                     result.documents.append(DiscoveredDSI(
@@ -751,3 +787,112 @@ async def _extract_text_from_iframes(page: Page) -> str:
     except Exception as e:
         logger.debug("Iframe extraction failed: %s", e)
     return ""
+
+
+async def _try_llm_cascade(
+    page: Page, target_url: str, network_log: list[dict],
+) -> tuple[str, int]:
+    """Phase C/D fallback: ask Qwen (then OVH) where the cookie policy is.
+
+    Returns (text, word_count). On failure or no LLM configured: ("", 0).
+
+    Caches the LLM's suggestion in Valkey per netloc (7d TTL) so subsequent
+    runs against the same domain skip the LLM call.
+    """
+    from urllib.parse import urlparse
+    from services.cmp_llm_fallback import (
+        LLMCascade, cache_get, cache_set,
+    )
+
+    netloc = urlparse(target_url).netloc.lower()
+    if not netloc:
+        return "", 0
+
+    # Cache hit: apply hint directly
+    cached = await cache_get(netloc)
+    if cached:
+        text = await _apply_llm_hint(page, cached)
+        wc = len(text.split()) if text else 0
+        if wc >= 300:
+            logger.info("LLM cache hit for %s: %d words", netloc, wc)
+            return text, wc
+        # Cached hint stale — fall through to fresh LLM call
+
+    # DOM snapshot for the LLM prompt
+    try:
+        dom_snapshot = await page.evaluate(
+            "() => (document.body && document.body.innerText || '').slice(0, 5000)"
+        ) or ""
+    except Exception:
+        dom_snapshot = ""
+
+    cascade = LLMCascade.from_env()
+    hint = await cascade.analyze(target_url, dom_snapshot, network_log)
+    if not hint:
+        return "", 0
+
+    text = await _apply_llm_hint(page, hint)
+    wc = len(text.split()) if text else 0
+    if wc >= 300:
+        await cache_set(netloc, hint)
+        logger.info("LLM cached for %s (%s): %d words", netloc, hint.get("_tier"), wc)
+    return text, wc
+
+
+async def _apply_llm_hint(page: Page, hint: dict) -> str:
+    """Execute the LLM's suggested strategy and return extracted text.
+
+    ``hint`` is an LLM answer or a cached copy of one, so it is validated
+    before use. Any failure yields "".
+    """
+    strategy = hint.get("strategy")
+    value = hint.get("value", "")
+    # A non-string value (malformed LLM answer or cache entry) would break
+    # the caller's word count, so treat it like a missing value.
+    if not isinstance(value, str) or not value:
+        return ""
+
+    if strategy == "text":
+        return value
+
+    if strategy == "selector":
+        try:
+            return await page.evaluate(
+                "(sel) => { const e = document.querySelector(sel); "
+                "return e ? (e.innerText || e.textContent || '').trim() : ''; }",
+                value,
+            ) or ""
+        except Exception as e:
+            logger.debug("LLM selector failed (%s): %s", value, e)
+            return ""
+
+    if strategy == "url":
+        from urllib.parse import urlparse
+
+        try:
+            # The URL is model output derived from page content: fetch only
+            # plain web URLs, not file:, data: or other schemes.
+            if urlparse(value).scheme not in ("http", "https"):
+                logger.debug("LLM url rejected (scheme): %s", value[:80])
+                return ""
+            resp = await page.context.request.get(value, timeout=30000)
+            if resp.status != 200:
+                return ""
+            ct = (resp.headers.get("content-type") or "").lower()
+            if "json" in ct:
+                from services.cmp_heuristic import reconstruct_generic
+                # The generic walker is applied whether or not the payload
+                # passes the cookie-policy heuristic, so that check is omitted.
+                return reconstruct_generic(await resp.json())
+            text = await resp.text()
+            # Strip HTML if HTML response
+            if "html" in ct:
+                import re as _re
+                text = _re.sub(r"<[^>]+>", " ", text)
+                text = _re.sub(r"\s+", " ", text).strip()
+            return text
+        except Exception as e:
+            logger.debug("LLM url fetch failed (%s): %s", value[:80], e)
+            return ""
+
+    return ""
diff --git a/docker-compose.yml b/docker-compose.yml
index 3af0dc3e..f721ec26 100644
--- a/docker-compose.yml
+++ b/docker-compose.yml
@@ -12,6 +12,7 @@ networks:
 
 volumes:
   dsms_data:
+  cmp-data:    # consent-tester: CMP discovery log + auto-promoted modules
 
 services:
 
@@ -256,6 +257,18 @@ services:
     depends_on:
       core-health-check:
         condition: service_completed_successfully
+    environment:
+      # LLM fallback for cookie-policy extraction (Phase C+D of cascade)
+      OLLAMA_URL: "${OLLAMA_URL:-http://bp-core-ollama:11434}"
+      CMP_LLM_MODEL: "${CMP_LLM_MODEL:-qwen3:30b-a3b}"
+      OVH_LLM_URL: "${OVH_LLM_URL:-}"
+      OVH_LLM_KEY: "${OVH_LLM_KEY:-}"
+      OVH_LLM_MODEL: "${OVH_LLM_MODEL:-}"
+      VALKEY_URL: "${VALKEY_URL:-redis://bp-core-valkey:6379}"
+      # CMP discovery log (Phase E auto-promote)
+      CMP_DISCOVERY_DB: "/data/cmp_discoveries.db"
+    volumes:
+      - cmp-data:/data
     healthcheck:
       test: ["CMD", "curl", "-f", "http://127.0.0.1:8094/health"]
       interval: 30s