- website_scanner.py: multi-page crawl, 20+ service patterns (tracking, CDN, chatbots, payment, fonts, captcha, video), AI text detection - dse_service_extractor.py: LLM extracts services from privacy policy text - agent_scan_routes.py: POST /agent/scan — combines scan + DSE comparison, generates findings (undocumented, outdated, third-country transfer), auto-corrections via Qwen in pre-launch mode Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
128 lines
4.5 KiB
Python
"""
|
|
DSE Service Extractor — extracts mentioned third-party services from
|
|
a privacy policy text using LLM (Qwen) and compares against detected services.
|
|
|
|
Produces SOLL/IST comparison: what's in the DSE vs. what's on the website.
|
|
"""
|
|
|
|
import json
import logging
import os
import re

import httpx
logger = logging.getLogger(__name__)

# Base URL of the internal AI SDK gateway (LLM chat endpoint lives under it).
SDK_URL = os.environ.get("AI_SDK_URL", "http://bp-compliance-ai-sdk:8090")

# Service-account identity sent with every SDK call. Env-overridable so a
# deployment is not pinned to the hardcoded defaults; the defaults keep
# existing behavior unchanged when the variables are unset.
TENANT_ID = os.environ.get("AI_SDK_TENANT_ID", "9282a473-5c95-4b3a-bf78-0ecc0ec71d3e")
USER_ID = os.environ.get("AI_SDK_USER_ID", "00000000-0000-0000-0000-000000000001")

# Common headers for all SDK requests.
SDK_HEADERS = {
    "Content-Type": "application/json",
    "X-Tenant-ID": TENANT_ID,
    "X-User-ID": USER_ID,
}
async def extract_dse_services(dse_text: str) -> list[dict]:
    """Extract third-party services mentioned in a privacy policy via LLM.

    Sends the first 3500 characters of *dse_text* to the SDK chat endpoint
    and parses the model's JSON-array answer.

    Args:
        dse_text: raw privacy-policy (Datenschutzerklaerung) text.

    Returns:
        List of dicts (expected keys: name, purpose, country, legal_basis).
        Returns [] on any failure — extraction is best-effort by design.
    """
    prompt = (
        "/no_think\n"
        "Extrahiere aus dieser Datenschutzerklaerung ALLE erwaehnten Dienstleister, "
        "Tools und externen Dienste. Fuer jeden nenne:\n"
        "- name: Name des Dienstes (z.B. 'Google Analytics')\n"
        "- purpose: Zweck (z.B. 'Webanalyse')\n"
        "- country: Land/Sitz (z.B. 'USA')\n"
        "- legal_basis: Genannte Rechtsgrundlage (z.B. 'Einwilligung')\n\n"
        "Antworte als JSON-Array. Wenn keine Dienstleister erwaehnt werden, "
        "antworte mit [].\n"
        "Beispiel: [{\"name\": \"Google Analytics\", \"purpose\": \"Webanalyse\", "
        "\"country\": \"USA\", \"legal_basis\": \"Einwilligung\"}]"
    )
    try:
        async with httpx.AsyncClient(timeout=60.0) as client:
            resp = await client.post(
                f"{SDK_URL}/sdk/v1/llm/chat",
                headers=SDK_HEADERS,
                json={
                    "messages": [
                        {"role": "system", "content": prompt},
                        # Truncate to keep the prompt within the model's
                        # context budget.
                        {"role": "user", "content": dse_text[:3500]},
                    ],
                },
            )
            # Fail fast on HTTP errors instead of parsing an error body as
            # if it were a model answer; the outer except logs and returns [].
            resp.raise_for_status()
            data = resp.json()
        # The SDK has returned the answer under either "response" or
        # "message.content" — accept both shapes.
        raw = (
            data.get("response", "")
            or (data.get("message", {}) or {}).get("content", "")
            or ""
        ).strip()
        # Strip Qwen "<think>...</think>" reasoning blocks if present.
        raw = re.sub(r"<think>.*?</think>", "", raw, flags=re.DOTALL).strip()
        # Pull the first JSON array out of any surrounding chatter.
        match = re.search(r"\[.*\]", raw, re.DOTALL)
        if match:
            parsed = json.loads(match.group())
            # Guard against a non-array answer or non-object items, so the
            # declared list[dict] return type actually holds.
            if isinstance(parsed, list):
                return [item for item in parsed if isinstance(item, dict)]
    except Exception as e:
        logger.warning("DSE service extraction failed: %s", e)
    return []
def compare_services(
    detected: list[dict], dse_services: list[dict],
) -> dict:
    """Compare detected website services against DSE-mentioned services.

    Args:
        detected: services found on the website; dicts with at least "name",
            optionally "category"/"id".
        dse_services: services extracted from the privacy policy (LLM output,
            so individual items may be malformed).

    Returns dict with three categories:
    - undocumented: on website but NOT in DSE (Art. 13 violation)
    - outdated: in DSE but NOT on website (cleanup)
    - documented: on website AND in DSE (OK, check details)
    """
    def normalize(name: str) -> str:
        # Lowercase and strip punctuation/whitespace so e.g.
        # "Google-Analytics" and "google analytics" compare equal.
        return re.sub(r"[^a-z0-9]", "", name.lower())

    # Drop entries without a usable name: LLM output can be malformed, and an
    # empty name would both raise KeyError here and substring-match every
    # other name in the fuzzy comparison below.
    detected_names = {normalize(d["name"]): d for d in detected if d.get("name")}
    dse_names = {normalize(d["name"]): d for d in dse_services if d.get("name")}

    undocumented = []
    documented = []
    outdated = []

    for key, svc in detected_names.items():
        # Skip CMP — consent managers don't need DSE mention.
        if svc.get("category") == "other" and svc.get("id") == "cmp":
            continue
        matched = False
        for dse_key, dse_svc in dse_names.items():
            if key == dse_key or _fuzzy_match(svc["name"], dse_svc["name"]):
                documented.append({"detected": svc, "dse": dse_svc, "status": "ok"})
                matched = True
                break
        if not matched:
            undocumented.append(svc)

    # Anything mentioned in the DSE with no counterpart on the website is
    # stale documentation.
    for key, dse_svc in dse_names.items():
        if not any(
            key == det_key
            or _fuzzy_match(dse_svc["name"], detected_names[det_key]["name"])
            for det_key in detected_names
        ):
            outdated.append(dse_svc)

    return {
        "undocumented": undocumented,
        "documented": documented,
        "outdated": outdated,
    }
|
|
|
|
|
|
def _fuzzy_match(a: str, b: str) -> bool:
|
|
"""Simple fuzzy matching — checks if one name contains the core of the other."""
|
|
a_lower = a.lower()
|
|
b_lower = b.lower()
|
|
# Direct substring
|
|
if a_lower in b_lower or b_lower in a_lower:
|
|
return True
|
|
# Core word match (e.g., "Google" in "Google Analytics" and "Google Ireland")
|
|
a_words = set(re.findall(r"\w{4,}", a_lower))
|
|
b_words = set(re.findall(r"\w{4,}", b_lower))
|
|
return bool(a_words & b_words)
|