feat(vvt): V3 — LLM vendor extraction fallback for unknown CMPs
When the cookie text has no captured CMP payload (long-tail sites that
don't use ePaaS/OneTrust/Cookiebot/etc.) we now fall back to a Qwen → OVH
LLM cascade to extract a structured vendor list from the policy text.
New module backend/compliance/services/vendor_llm_extractor.py:
- extract_vendors_via_llm(cookie_text): runs Qwen first (local Ollama),
then OVH if Qwen returns nothing usable.
- System prompt instructs the model to return STRICT JSON only:
{vendors: [{name, country, purpose, category, opt_out_url,
privacy_policy_url, persistence, cookies: [...]}]}
- Lenient JSON parser tolerates code-fences, prose wrappers, dict vs list.
- _normalize() caps array sizes (80 vendors, 30 cookies each), validates
URLs (must be http(s)), trims fields to reasonable lengths.
Route integration (agent_compliance_check_routes.py):
- After named-CMP extract: if cmp_vendors is empty AND the cookie text
has ≥500 words (otherwise it's likely navigation chrome), invoke the
LLM extractor. Progress message 'Vendor-Liste per LLM extrahieren...'.
- Vendors then run through the same validate_vendor_urls + score_vendors
pipeline → VVT table rendered identically regardless of source.
docker-compose.yml: backend-compliance gains OLLAMA_URL, CMP_LLM_MODEL,
OVH_LLM_URL/KEY/MODEL env vars (same names as consent-tester so the
configuration is unified).
This closes the 'every site eventually gets a VVT table' goal:
- Known CMP → V1/V2 structured extraction (fast, exact)
- Unknown CMP → V3 LLM extraction (slow, best-effort)
- No text at all → no vendors, but other compliance checks still run.
This commit is contained in:
@@ -383,16 +383,29 @@ async def _run_compliance_check(check_id: str, req: ComplianceCheckRequest):
|
||||
validate_vendor_urls, score_vendors,
|
||||
)
|
||||
cookie_payloads = []
|
||||
cookie_text = ""
|
||||
for e in doc_entries:
|
||||
if e.get("doc_type") == "cookie" and e.get("cmp_payloads"):
|
||||
cookie_payloads.extend(e["cmp_payloads"])
|
||||
if e.get("doc_type") == "cookie":
|
||||
if e.get("cmp_payloads"):
|
||||
cookie_payloads.extend(e["cmp_payloads"])
|
||||
if e.get("text"):
|
||||
cookie_text = e["text"]
|
||||
if cookie_payloads:
|
||||
cmp_vendors = extract_vendors_from_payloads(cookie_payloads)
|
||||
if cmp_vendors:
|
||||
logger.info("VVT: %d vendors extracted, validating links",
|
||||
len(cmp_vendors))
|
||||
cmp_vendors = await validate_vendor_urls(cmp_vendors)
|
||||
cmp_vendors = score_vendors(cmp_vendors)
|
||||
# V3 fallback: no named CMP captured but we have substantive
|
||||
# cookie text → ask Qwen/OVH to extract vendor list from the text.
|
||||
# Skip on very short text (likely navigation) to save LLM cost.
|
||||
if not cmp_vendors and cookie_text and len(cookie_text.split()) >= 500:
|
||||
from compliance.services.vendor_llm_extractor import (
|
||||
extract_vendors_via_llm,
|
||||
)
|
||||
_update(check_id, "Vendor-Liste per LLM extrahieren...", 94)
|
||||
cmp_vendors = await extract_vendors_via_llm(cookie_text)
|
||||
if cmp_vendors:
|
||||
logger.info("VVT: %d vendors extracted, validating links",
|
||||
len(cmp_vendors))
|
||||
cmp_vendors = await validate_vendor_urls(cmp_vendors)
|
||||
cmp_vendors = score_vendors(cmp_vendors)
|
||||
except Exception as e:
|
||||
logger.warning("VVT vendor extraction skipped: %s", e)
|
||||
|
||||
|
||||
@@ -0,0 +1,209 @@
|
||||
"""
|
||||
LLM-based vendor extraction (V3 fallback).
|
||||
|
||||
When the cookie-policy text does not come from a known CMP (so we have no
|
||||
structured JSON payload) we ask Qwen (local Ollama) → OVH (managed 120B)
|
||||
to extract a vendor list as JSON. Output is then mapped to the same
|
||||
VendorRecord schema used by vendor_extractor.py — so the rest of the
|
||||
pipeline (URL probing, scoring, VVT table) works unchanged.
|
||||
|
||||
This bridges the long tail of cookie-policy implementations where the
|
||||
content sits in DOM accordions rather than a CMP JSON endpoint.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import json
|
||||
import logging
|
||||
import os
|
||||
import re
|
||||
from typing import Optional
|
||||
|
||||
import httpx
|
||||
|
||||
logger = logging.getLogger(__name__)


# System prompt sent unchanged to both LLM backends. It is written in German,
# asks for a single JSON object of the form {"vendors": [...]} and forbids
# prose and code fences. The limits it states (80 vendors, 30 cookies per
# vendor) are the same caps _normalize() applies to the parsed reply.
_SYSTEM_PROMPT = (
    # --- Role and task -----------------------------------------------------
    "Du bist ein Compliance-Tester. Extrahiere aus einer deutschen "
    "Cookie-Richtlinie alle erwaehnten Drittanbieter (Dienste, Vendors, "
    "Cookie-Provider).\n\n"
    # --- Expected JSON shape ------------------------------------------------
    "Gib NUR ein JSON-Objekt zurueck:\n"
    '{"vendors": [\n'
    ' {"name": "<Firmenname>", "country": "<DE|US|IE|...|>", '
    '"purpose": "<Kurz>", "category": "<marketing|analytics|functional|necessary>", '
    '"opt_out_url": "<vollstaendige URL oder \\"\\">", '
    '"privacy_policy_url": "<vollstaendige URL oder \\"\\">", '
    '"persistence": "<Speicherdauer in Worten>", '
    '"cookies": [{"name": "<Cookie-Name>", "purpose": "<Kurz>", '
    '"expiry": "<Dauer>", "is_third_party": true}]\n'
    ' }\n'
    "]}\n\n"
    # --- Rules --------------------------------------------------------------
    "Regeln:\n"
    "- Wenn ein Feld nicht im Text steht: leerer String oder leere Liste.\n"
    "- KEINE Anbieter erfinden oder halluzinieren.\n"
    "- Max 80 Anbieter, max 30 Cookies pro Anbieter.\n"
    "- Nur reines JSON, keine Prosa, keine Code-Fences."
)
|
||||
|
||||
|
||||
async def extract_vendors_via_llm(
    cookie_text: str,
    max_text_chars: int = 12000,
) -> list[dict]:
    """Extract vendor records from cookie-policy text via the Qwen → OVH cascade.

    Empty text, or text shorter than 500 characters, is not sent to any
    backend. Otherwise the first ``max_text_chars`` characters are sent to the
    local Ollama backend; the OVH backend is only queried when that attempt
    yields no vendors.

    Returns:
        The normalized vendor records from the first backend that produced
        any, or an empty list when neither did.
    """
    if not cookie_text or len(cookie_text) < 500:
        return []

    snippet = cookie_text[:max_text_chars]
    user_prompt = f"Cookie-Richtlinie-Text (gekuerzt):\n\n{snippet}"

    # Backends in cascade order, each paired with its success log message.
    stages = (
        (_call_ollama, "LLM vendor extraction (Qwen): %d vendors"),  # local Qwen
        (_call_ovh, "LLM vendor extraction (OVH): %d vendors"),  # OVH backup
    )
    for call_backend, success_message in stages:
        reply = await call_backend(user_prompt)
        found = _parse_vendor_list(reply)
        if found:
            logger.info(success_message, len(found))
            return found

    # Neither backend returned a usable vendor list.
    return []
|
||||
|
||||
|
||||
async def _call_ollama(user_prompt: str) -> str:
    """Send the extraction prompt to the Ollama ``/api/chat`` endpoint.

    The base URL is read from ``OLLAMA_URL``; the model from ``CMP_LLM_MODEL``,
    falling back to ``OLLAMA_MODEL`` and then ``qwen3:30b-a3b``.

    Args:
        user_prompt: The user message; ``_SYSTEM_PROMPT`` is sent as the
            system message.

    Returns:
        The assistant message content, or ``""`` when the request fails, the
        response has no message, or the content is missing/null. Failures are
        logged as warnings and never raised, so the caller can fall back to
        the next backend.
    """
    base = os.getenv("OLLAMA_URL", "http://host.docker.internal:11434")
    model = os.getenv("CMP_LLM_MODEL", os.getenv("OLLAMA_MODEL", "qwen3:30b-a3b"))
    payload = {
        "model": model,
        "messages": [
            {"role": "system", "content": _SYSTEM_PROMPT},
            {"role": "user", "content": user_prompt},
        ],
        "stream": False, "format": "json",
        "options": {"temperature": 0.05, "num_predict": 6000},
    }
    try:
        async with httpx.AsyncClient(timeout=120.0) as client:
            resp = await client.post(f"{base.rstrip('/')}/api/chat", json=payload)
            resp.raise_for_status()
            message = resp.json().get("message") or {}
            # "content" can be present but null; coerce to "" so the declared
            # str return type always holds (same guard as in _call_ovh).
            return message.get("content", "") or ""
    except Exception as e:
        # Best-effort backend: any transport/HTTP/decoding problem simply
        # means "no answer from this stage".
        logger.warning("Qwen vendor-extract failed: %s", e)
        return ""
|
||||
|
||||
|
||||
async def _call_ovh(user_prompt: str) -> str:
    """Send the extraction prompt to the OVH ``/v1/chat/completions`` endpoint.

    Configuration is read from ``OVH_LLM_URL``, ``OVH_LLM_KEY`` and
    ``OVH_LLM_MODEL``. When the URL or the model is unset or blank no request
    is made. A bearer ``Authorization`` header is only sent when a key is set.

    Returns:
        The content of the first choice's message, or ``""`` when the backend
        is not configured, the request fails, or the content is missing/null.
        Failures are logged as warnings and never raised.
    """
    base, key, model = (
        os.getenv(variable, "").strip()
        for variable in ("OVH_LLM_URL", "OVH_LLM_KEY", "OVH_LLM_MODEL")
    )
    if not (base and model):
        return ""

    headers = {"Content-Type": "application/json"}
    if key:
        headers["Authorization"] = f"Bearer {key}"

    request_body = {
        "model": model,
        "messages": [
            {"role": "system", "content": _SYSTEM_PROMPT},
            {"role": "user", "content": user_prompt},
        ],
        "temperature": 0.05, "max_tokens": 6000,
        "response_format": {"type": "json_object"},
    }
    endpoint = f"{base.rstrip('/')}/v1/chat/completions"

    try:
        async with httpx.AsyncClient(timeout=90.0) as client:
            resp = await client.post(endpoint, json=request_body, headers=headers)
            resp.raise_for_status()
            # Tolerate a missing/empty "choices" array and a null message.
            choices = resp.json().get("choices") or [{}]
            message = choices[0].get("message") or {}
            return message.get("content", "") or ""
    except Exception as e:
        logger.warning("OVH vendor-extract failed: %s", e)
        return ""
|
||||
|
||||
|
||||
def _parse_vendor_list(content: str) -> list[dict]:
    """Turn an LLM reply into normalized vendor records.

    Three views of the reply are tried in order: the raw text, the text with a
    leading code fence stripped, and the outermost JSON-looking span. The
    first view that decodes to a bare list, or to a dict holding a list under
    ``"vendors"`` / ``"Vendors"``, is normalized and returned.

    Returns:
        The normalized records, or ``[]`` when the reply is empty or no view
        decodes to a usable shape.
    """
    if not content:
        return []

    views = [content, _strip_fence(content), _grab_json(content)]
    for raw in filter(None, views):  # skip views that came back empty
        try:
            decoded = json.loads(raw)
        except Exception:
            # Not valid JSON in this view; try the next one.
            continue

        if isinstance(decoded, list):
            return _normalize(decoded)
        if isinstance(decoded, dict):
            listed = decoded.get("vendors") or decoded.get("Vendors")
            if isinstance(listed, list):
                return _normalize(listed)

    return []
|
||||
|
||||
|
||||
def _normalize(items: list) -> list[dict]:
    """Coerce raw LLM vendor entries into the vendor-record dict shape.

    The input is decoded JSON produced by a language model, so any field may
    hold a number, ``null``, a list or a dict where a string was requested.
    Such values are coerced instead of raising, so that one malformed entry
    cannot discard the whole vendor list.

    Rules applied:
        * at most the first 80 entries and the first 30 cookies per entry
          are considered;
        * entries that are not dicts, or have no usable ``name``, are dropped;
        * string fields are stripped; ``country`` is cut to 4 characters,
          ``purpose`` to 500 and ``persistence`` to 200;
        * numbers are stringified, every other non-string value becomes ``""``;
        * a ``cookies`` value that is not a list is treated as empty;
        * URL fields go through ``_safe_url``;
        * ``is_third_party`` defaults to ``True`` when absent; textual
          negatives such as ``"false"`` are read as ``False``.

    Returns:
        A list of dicts with the keys ``name``, ``country``, ``purpose``,
        ``category``, ``opt_out_url``, ``privacy_policy_url``, ``persistence``
        and ``cookies``.
    """

    def _text(value: object, limit: Optional[int] = None) -> str:
        # Strings are stripped, plain numbers stringified, everything else
        # (None, bool, list, dict) collapses to "".
        if isinstance(value, str):
            text = value.strip()
        elif isinstance(value, (int, float)) and not isinstance(value, bool):
            text = str(value)
        else:
            text = ""
        return text if limit is None else text[:limit]

    def _flag(value: object) -> bool:
        # bool("false") is True, so textual negatives need explicit handling.
        if isinstance(value, str):
            return value.strip().lower() not in {"", "false", "0", "no", "nein"}
        return bool(value)

    out: list[dict] = []
    for item in items[:80]:
        if not isinstance(item, dict):
            continue
        name = _text(item.get("name"))
        if not name:
            continue

        cookies_raw = item.get("cookies")
        if not isinstance(cookies_raw, list):
            # A dict here would raise on slicing; a string would be iterated
            # character by character. Neither carries usable cookie records.
            cookies_raw = []
        cookies = [
            {
                "name": _text(c.get("name")),
                "purpose": _text(c.get("purpose")),
                "expiry": _text(c.get("expiry")),
                "is_third_party": _flag(c.get("is_third_party", True)),
            }
            for c in cookies_raw[:30]
            if isinstance(c, dict)
        ]

        out.append({
            "name": name,
            "country": _text(item.get("country"), 4),
            "purpose": _text(item.get("purpose"), 500),
            "category": _text(item.get("category")),
            "opt_out_url": _safe_url(item.get("opt_out_url")),
            "privacy_policy_url": _safe_url(item.get("privacy_policy_url")),
            "persistence": _text(item.get("persistence"), 200),
            "cookies": cookies,
        })
    return out
|
||||
|
||||
|
||||
def _safe_url(value: Optional[str]) -> str:
    """Return ``value`` when it is an absolute http(s) URL, otherwise ``""``.

    The value is stripped of surrounding whitespace. It is accepted only if
    its scheme is ``http`` or ``https`` (compared case-insensitively, since
    URL schemes are case-insensitive) and it names a host; a bare
    ``"https://"`` is rejected. Accepted values are cut to 500 characters.

    Args:
        value: Candidate URL; ``None``, empty and non-string values yield ``""``.

    Returns:
        The stripped (and possibly truncated) URL, or ``""``.
    """
    # Local import so the helper does not rely on a new module-level import.
    from urllib.parse import urlsplit

    if not value or not isinstance(value, str):
        return ""
    v = value.strip()
    try:
        parts = urlsplit(v)
    except ValueError:
        # urlsplit raises on malformed authorities such as "http://[".
        return ""
    # urlsplit lower-cases the scheme, which makes the comparison
    # case-insensitive.
    if parts.scheme in ("http", "https") and parts.netloc:
        return v[:500]
    return ""
|
||||
|
||||
|
||||
def _strip_fence(s: str) -> str:
    """Remove a Markdown code fence wrapped around ``s``.

    The text is stripped first. If it does not start with a fence it is
    returned as is. Otherwise the opening fence line (including any language
    tag) is dropped, and everything from the last closing-fence line onwards
    is dropped too, so prose the model appended after the fence does not
    survive. When no closing fence exists, everything after the opening line
    is returned.

    Returns:
        The fenced body, or the stripped input when there is no leading fence.
    """
    s = s.strip()
    if not s.startswith("```"):
        return s

    body = s.split("\n")[1:]  # drop the opening fence line
    # Scan from the end for the closing fence and cut it plus any trailing
    # text; the closing fence is not required to be the very last line.
    for idx in range(len(body) - 1, -1, -1):
        if body[idx].strip().startswith("```"):
            del body[idx:]
            break
    return "\n".join(body)
|
||||
|
||||
|
||||
def _grab_json(s: str) -> str:
    """Return the outermost JSON-looking span embedded in ``s``.

    Two candidate spans are considered: first ``{`` to last ``}``, and first
    ``[`` to last ``]``. They are tried in order of where they start, and the
    first one that decodes as JSON is returned. This lets a prose-wrapped
    top-level array (``Here: [{...}, {...}] done``) be recovered; taking the
    brace span unconditionally would give ``{...}, {...}``, which is not
    valid JSON.

    If neither span decodes, the brace span is returned when it exists,
    otherwise the bracket span, so the caller still gets a best-effort
    candidate.

    Returns:
        The chosen span, or ``""`` when ``s`` contains neither pair.
    """
    found: list[tuple[int, str]] = []
    for opener, closer in (("{", "}"), ("[", "]")):
        start, end = s.find(opener), s.rfind(closer)
        if 0 <= start < end:
            found.append((start, s[start:end + 1]))

    # Prefer the span that opens first, but only if it really is JSON; a
    # stray "[1]" in leading prose must not shadow a valid object after it.
    for _, text in sorted(found):
        try:
            json.loads(text)
        except (ValueError, RecursionError):
            continue
        return text

    return found[0][1] if found else ""
|
||||
|
||||
|
||||
# `re` is part of the standard library and is not used anywhere else in this
# module; the assignment below only keeps the import referenced.
|
||||
_ = re # pragma: no cover
|
||||
@@ -116,6 +116,14 @@ services:
|
||||
SMTP_FROM_NAME: ${SMTP_FROM_NAME:-BreakPilot Compliance}
|
||||
SMTP_FROM_ADDR: ${SMTP_FROM_ADDR:-compliance@breakpilot.app}
|
||||
RAG_SERVICE_URL: http://bp-core-rag-service:8097
|
||||
# LLM cascade for V3 vendor extraction (unknown CMPs).
|
||||
# Reuses the same env vars as the consent-tester so both can be
|
||||
# configured in one place.
|
||||
OLLAMA_URL: ${OLLAMA_URL:-http://host.docker.internal:11434}
|
||||
CMP_LLM_MODEL: ${CMP_LLM_MODEL:-qwen3:30b-a3b}
|
||||
OVH_LLM_URL: ${OVH_LLM_URL:-}
|
||||
OVH_LLM_KEY: ${OVH_LLM_KEY:-}
|
||||
OVH_LLM_MODEL: ${OVH_LLM_MODEL:-}
|
||||
extra_hosts:
|
||||
- "host.docker.internal:host-gateway"
|
||||
depends_on:
|
||||
|
||||
Reference in New Issue
Block a user