feat(consent-tester): Phase C+D — LLM cascade fallback (Qwen → OVH)

New module consent-tester/services/cmp_llm_fallback.py:
- LLMCookieExtractor: single-endpoint adapter (Ollama OR OpenAI-compat)
- LLMCascade: tries Qwen (local Mac Mini Ollama) first; falls through to
  OVH (managed 120B) when Qwen returns no usable strategy
- LLMCascade.from_env(): reads OLLAMA_URL/CMP_LLM_MODEL + OVH_LLM_URL/
  OVH_LLM_KEY/OVH_LLM_MODEL from environment
- LLM returns JSON {strategy: url|selector|text, value: ...}
- Valkey-backed cache per netloc (cmp:hint:<netloc>, 7-day TTL) — the next run
  against the same domain reuses the cached hint and skips the LLM unless the
  hint no longer yields a usable policy text

dsi_discovery.py:
- Wired network_log collector (URL/status/content-type/size of every JSON
  response on the page) — passed to LLM prompt as observation
- After Named CMP (Phase B) + Heuristic (Phase A) both fail AND DOM
  < 300 words: invoke LLMCascade.analyze(...)
- _apply_llm_hint executes the LLM's strategy: refetch URL via Playwright
  request context, query DOM selector, or use text directly
- Cache HIT path: apply cached hint, only fall back to LLM if cache is stale

docker-compose.yml:
- consent-tester gets env vars + cmp-data volume (for Phase E)
- All LLM endpoints configurable via env, sensible defaults

consent-tester/requirements.txt:
- redis>=5.0 (asyncio client, Valkey-compatible)
- httpx>=0.27
This commit is contained in:
Benjamin Admin
2026-05-16 23:06:05 +02:00
parent e9002175ac
commit 2400aa6a9e
4 changed files with 437 additions and 0 deletions
+134
View File
@@ -227,6 +227,26 @@ async def discover_dsi_documents(
cmp_capture = CMPCapture()
cmp_capture.attach(page)
# Also collect a generic JSON response log for the LLM fallback (Phase C+D)
# if everything else fails. Keep it small (header info only, not bodies).
network_log: list[dict] = []
async def _on_response_log(response):
    """Record header-level metadata for each JSON response seen by the page.

    Appends one dict (``url``, ``status``, ``content_type``, ``size``) to the
    enclosing ``network_log`` list for every response whose Content-Type
    contains ``json``. Response bodies are never read here.

    The handler is best-effort: it must never raise into the page's event
    dispatch, so unexpected failures are only logged at debug level.
    """
    try:
        headers = response.headers
        content_type = (headers.get("content-type") or "").lower()
        if "json" not in content_type:
            return
        try:
            size = int(headers.get("content-length") or 0)
        except (TypeError, ValueError):
            # A malformed Content-Length must not cost us the whole entry;
            # the URL and status are still useful observations.
            size = 0
        network_log.append({
            "url": response.url,
            "status": response.status,
            "content_type": content_type,
            "size": size,
        })
    except Exception as exc:
        logger.debug("Skipping response in network log: %s", exc)
page.on("response", _on_response_log)
try:
# Step 1: Load the page (with networkidle → domcontentloaded fallback)
await goto_resilient(page, url, timeout=60000)
@@ -334,6 +354,22 @@ async def discover_dsi_documents(
self_text = cmp_text
self_wc = cmp_wc
# Phase C/D: LLM cascade fallback. Triggers only when both
# named CMPs (Phase B) and the generic heuristic (Phase A)
# produced nothing AND the DOM is too thin to be a real policy.
if self_wc < 300 and not cmp_capture.payloads:
llm_text, llm_wc = await _try_llm_cascade(
page, url, network_log,
)
if llm_wc > self_wc:
logger.info(
"Self-extraction via LLM cascade for %s: %d words "
"(replacing %d-word DOM)",
url, llm_wc, self_wc,
)
self_text = llm_text
self_wc = llm_wc
if self_wc >= 100:
page_title = await page.title() or url
result.documents.append(DiscoveredDSI(
@@ -751,3 +787,101 @@ async def _extract_text_from_iframes(page: Page) -> str:
except Exception as e:
logger.debug("Iframe extraction failed: %s", e)
return ""
async def _try_llm_cascade(
    page: Page, target_url: str, network_log: list[dict],
) -> tuple[str, int]:
    """Phase C/D fallback: ask the LLM cascade where the cookie policy is.

    Flow:
      1. Look up a cached hint for the target's netloc and apply it. If it
         yields at least 300 words, return that text without calling the LLM.
      2. Otherwise take a DOM text snapshot (first 5000 characters of
         ``document.body.innerText``) and call ``LLMCascade.analyze`` with the
         URL, the snapshot and the collected network log.
      3. Apply the returned hint; when it yields at least 300 words, store the
         hint in the cache for later runs against the same netloc.

    Args:
        page: Live browser page the hint is applied against.
        target_url: URL being analysed; its netloc is the cache key.
        network_log: Header-level records of JSON responses seen on the page.

    Returns:
        ``(text, word_count)``. ``("", 0)`` when the URL has no netloc, no
        hint is available, or any cache/LLM step fails. This fallback is
        best-effort and never raises into the primary discovery flow.
    """
    from urllib.parse import urlparse

    from services.cmp_llm_fallback import (
        LLMCascade, cache_get, cache_set,
    )

    # Minimum word count for an extraction to count as a real policy text.
    min_policy_words = 300

    netloc = urlparse(target_url).netloc.lower()
    if not netloc:
        return "", 0

    async def _extract(hint: dict) -> tuple[str, int]:
        """Apply *hint* and return (text, word_count); ("", 0) on any error."""
        try:
            extracted = await _apply_llm_hint(page, hint)
        except Exception as exc:
            logger.debug("Applying LLM hint failed for %s: %s", netloc, exc)
            return "", 0
        # Hints originate from model output; guard against non-string results.
        if not isinstance(extracted, str) or not extracted:
            return "", 0
        return extracted, len(extracted.split())

    # Cache hit: apply the stored hint directly.
    try:
        cached = await cache_get(netloc)
    except Exception as exc:
        logger.warning("LLM hint cache read failed for %s: %s", netloc, exc)
        cached = None
    if cached:
        text, wc = await _extract(cached)
        if wc >= min_policy_words:
            logger.info("LLM cache hit for %s: %d words", netloc, wc)
            return text, wc
        # Cached hint is stale: fall through to a fresh LLM call.

    # DOM snapshot for the LLM prompt.
    try:
        dom_snapshot = await page.evaluate(
            "() => (document.body && document.body.innerText || '').slice(0, 5000)"
        ) or ""
    except Exception:
        dom_snapshot = ""

    try:
        cascade = LLMCascade.from_env()
        hint = await cascade.analyze(target_url, dom_snapshot, network_log)
    except Exception as exc:
        logger.warning("LLM cascade failed for %s: %s", target_url, exc)
        return "", 0
    if not hint:
        return "", 0

    text, wc = await _extract(hint)
    if wc >= min_policy_words:
        try:
            await cache_set(netloc, hint)
        except Exception as exc:
            # A failed cache write must not discard text we already extracted.
            logger.warning("LLM hint cache write failed for %s: %s", netloc, exc)
        else:
            logger.info(
                "LLM cached for %s (%s): %d words",
                netloc, hint.get("_tier"), wc,
            )
    return text, wc
def _html_to_text(markup: str) -> str:
    """Reduce an HTML document to whitespace-normalised visible text."""
    import html as _html
    import re as _re

    # Drop script/style bodies and comments first: a plain tag strip would
    # keep their contents and inflate the word count with code.
    text = _re.sub(
        r"(?is)<(script|style)\b[^>]*>.*?</\1\s*>", " ", markup,
    )
    text = _re.sub(r"(?s)<!--.*?-->", " ", text)
    text = _re.sub(r"<[^>]+>", " ", text)
    # Decode entities after tag removal so "&lt;b&gt;" stays literal text.
    text = _html.unescape(text)
    return _re.sub(r"\s+", " ", text).strip()


async def _apply_llm_hint(page: Page, hint: dict) -> str:
    """Execute the LLM's suggested strategy and return the extracted text.

    Supported ``hint["strategy"]`` values:
      * ``"text"``: ``hint["value"]`` is the text itself.
      * ``"selector"``: ``hint["value"]`` is a CSS selector; returns the
        trimmed ``innerText``/``textContent`` of the first matching element.
      * ``"url"``: ``hint["value"]`` is fetched through the page's browser
        context. JSON responses are run through ``reconstruct_generic``; HTML
        responses are stripped to text; anything else is returned as-is.

    Args:
        page: Live browser page used for DOM queries and HTTP requests.
        hint: Strategy dict produced by the LLM (or read back from the cache).

    Returns:
        The extracted text, or ``""`` when the hint is malformed, the strategy
        is unknown, or extraction fails. Never raises for bad hint content.
    """
    from urllib.parse import urljoin, urlparse

    # The hint is model output (or a cached copy of it): never trust its shape.
    if not isinstance(hint, dict):
        return ""
    strategy = hint.get("strategy")
    value = hint.get("value", "")
    if not isinstance(value, str) or not value:
        return ""

    if strategy == "text":
        return value

    if strategy == "selector":
        try:
            return await page.evaluate(
                "(sel) => { const e = document.querySelector(sel); "
                "return e ? (e.innerText || e.textContent || '').trim() : ''; }",
                value,
            ) or ""
        except Exception as e:
            logger.debug("LLM selector failed (%s): %s", value, e)
            return ""

    if strategy == "url":
        candidate = value.strip()
        if not candidate:
            return ""
        # Models often answer with a path like "/api/cookies"; resolve it
        # against the current page so it becomes fetchable.
        target = urljoin(page.url, candidate)
        if urlparse(target).scheme not in ("http", "https"):
            logger.debug("LLM url rejected (unsupported scheme): %s", target[:80])
            return ""
        # NOTE(review): the URL is derived from untrusted page content via the
        # LLM. If this service can reach internal hosts, consider an
        # allow/deny list here to limit SSRF exposure.
        try:
            resp = await page.context.request.get(target, timeout=30000)
            try:
                if resp.status != 200:
                    return ""
                ct = (resp.headers.get("content-type") or "").lower()
                if "json" in ct:
                    from services.cmp_heuristic import reconstruct_generic

                    # The generic walker is used whether or not the payload
                    # looks like a cookie policy; the caller's word-count
                    # threshold decides if the result is usable.
                    return reconstruct_generic(await resp.json()) or ""
                text = await resp.text()
            finally:
                # Release the buffered body now instead of at context close.
                await resp.dispose()
            if "html" in ct:
                text = _html_to_text(text)
            return text
        except Exception as e:
            logger.debug("LLM url fetch failed (%s): %s", target[:80], e)
            return ""

    return ""