fix: derive intake flags from DETECTED SERVICES, not from text content

Fundamental architecture fix: data processing happens through APIs/scripts/
cookies — NOT through visible page text. A news site about healthcare does
NOT process health data.

Before: Qwen reads website text → guesses "health_data: true" (WRONG)
After: Google Analytics detected → tracking: true (CORRECT, deterministic)

New flow: detect services from HTML → map service categories to flags →
feed flags into UCCA assessment. No LLM needed for flag extraction.

SERVICE_TO_FLAGS maps categories: tracking→tracking, marketing→marketing+
third_party_sharing, payment→payment_data, heatmap→profiling, etc.
SPECIFIC_SERVICE_FLAGS for Klarna (Art.22), Stripe (US transfer), etc.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
Benjamin Admin
2026-05-02 08:37:51 +02:00
parent 0f3ec9061e
commit c5b22e0c99
2 changed files with 141 additions and 86 deletions
@@ -15,7 +15,7 @@ from fastapi import APIRouter
from pydantic import BaseModel
from compliance.services.smtp_sender import send_email
from compliance.services.intake_extractor import extract_intake_flags, flags_to_ucca_intake
from compliance.services.intake_extractor import extract_intake_flags_from_services, flags_to_ucca_intake
from compliance.services.relevance_filter import filter_controls
from compliance.services.website_compliance_checks import (
check_website_compliance as _check_website_compliance,
@@ -85,10 +85,18 @@ async def analyze_url(req: AnalyzeRequest):
# Step 2: Classify via SDK LLM
classification = await _classify(client, text)
# Step 3: Extract intake flags via LLM (better than keyword matching)
intake_flags = await extract_intake_flags(text)
# Step 3: Detect services from HTML (deterministic, no LLM needed)
from compliance.services.service_registry import SERVICE_REGISTRY
detected_services = []
html_lower = raw_html.lower()
for pattern, meta in SERVICE_REGISTRY.items():
if re.search(pattern, html_lower):
detected_services.append(meta)
# Step 4: Assess via UCCA with LLM-extracted flags
# Step 4: Derive intake flags from DETECTED SERVICES (not from text!)
intake_flags = extract_intake_flags_from_services(detected_services)
# Step 5: Assess via UCCA with service-derived flags
assessment = await _assess(client, text, classification, intake_flags)
# Step 5: Determine role
@@ -1,99 +1,146 @@
"""
Intake Extractor — LLM-based extraction of UCCA intake flags from document text.
Intake Extractor — derives UCCA intake flags from DETECTED SERVICES,
not from website text content.
Replaces simple keyword matching with structured LLM analysis for more
accurate risk scoring.
The actual data processing happens through APIs, scripts, and cookies —
NOT through visible text on the page. A news website reporting about
healthcare does NOT process health data.
Flags are derived deterministically from:
1. Which third-party services are embedded (Google Analytics → tracking)
2. Which payment providers are used (Stripe → payment_data)
3. Which CDN/fonts are loaded (Google Fonts → cross_border_transfer)
"""
import json
import logging
import os
import re
import httpx
logger = logging.getLogger(__name__)

# Ollama endpoint/model configuration. Only the deprecated text-based
# extraction path ever used the LLM; after the rewrite nothing in this
# module calls Ollama, so these appear unused here — TODO confirm no
# external importers rely on them before removing.
OLLAMA_URL = os.environ.get("OLLAMA_URL", "http://host.docker.internal:11434")
OLLAMA_MODEL = os.environ.get("OLLAMA_MODEL", "qwen3.5:35b-a3b")
def _implies(*flag_names: str) -> dict[str, bool]:
    """Build a flag dict with each named intake flag set to True."""
    return {name: True for name in flag_names}


# Maps a detected service's *category* to the intake flags its presence
# implies. This mapping is the single source of truth for what embedding
# a service of a given category means in terms of data processing.
SERVICE_TO_FLAGS: dict[str, dict[str, bool]] = {
    # Analytics/tracking scripts collect identifiers (IP, cookies).
    "tracking": _implies("personal_data", "tracking"),
    # Ad-tech both tracks and shares data with the ad network.
    "marketing": _implies("personal_data", "tracking", "marketing", "third_party_sharing"),
    # Session recording / heatmaps build behavioral profiles.
    "heatmap": _implies("personal_data", "tracking", "profiling"),
    # Payment widgets handle payment data by definition.
    "payment": _implies("personal_data", "payment_data"),
    # Chat widgets receive user-submitted messages.
    "chatbot": _implies("personal_data", "customer_data"),
    # CRM embeds tie visitors to customer records and score them.
    "crm": _implies("personal_data", "customer_data", "profiling"),
    # Loading any CDN asset transmits the visitor's IP to the CDN.
    "cdn": _implies("personal_data"),
}
EXTRACTION_PROMPT = """/no_think
Du analysierst eine Datenschutzerklaerung oder Website. Bestimme ob der
BETREIBER DIESER WEBSITE die folgenden Daten AKTIV VERARBEITET.
WICHTIG: Setze ein Flag NUR auf true wenn der Websitebetreiber diese Daten
SELBST erhebt, speichert oder verarbeitet. NICHT wenn die Website nur
UEBER solche Themen BERICHTET oder informiert.
Beispiel: Eine IHK-Website die UEBER Datenschutz im Gesundheitswesen
berichtet → health_data: false (die IHK verarbeitet keine Gesundheitsdaten)
Flags:
- personal_data: Erhebt der Betreiber personenbezogene Daten (Name, Email, IP)?
- customer_data: Speichert der Betreiber Kundendaten (Registrierung, Konto)?
- payment_data: Verarbeitet der Betreiber Zahlungsdaten (Shop, Buchung)?
- location_data: Erhebt der Betreiber GPS/Standortdaten der Nutzer?
- biometric_data: Verarbeitet der Betreiber biometrische Daten?
- minor_data: Richtet sich die Website gezielt an Kinder/Minderjaehrige?
- health_data: Verarbeitet der Betreiber Gesundheitsdaten seiner Nutzer?
- marketing: Nutzt der Betreiber Nutzerdaten fuer eigene Werbung/Newsletter?
- profiling: Erstellt der Betreiber Nutzerprofile oder Scoring?
- automated_decisions: Trifft der Betreiber automatisierte Einzelentscheidungen?
- third_party_sharing: Gibt der Betreiber Nutzerdaten an Dritte weiter?
- cross_border_transfer: Uebermittelt der Betreiber Daten ausserhalb EU/EWR?
- tracking: Setzt der Betreiber Cookies/Tracking/Analytics ein?
- ai_usage: Setzt der Betreiber KI/Machine Learning ein?
Antworte NUR mit einem JSON-Objekt, keine Erklaerung:
{"personal_data": true, "customer_data": false, ...}
"""
# Flag fragment shared by every US-based provider below: embedding it
# sends visitor data to a non-EU jurisdiction.
_US_TRANSFER = {"cross_border_transfer": True}

# Per-service overrides layered on top of the category mapping: concrete
# providers whose use implies flags beyond their generic category
# (e.g. Klarna's credit decisioning, US-hosted analytics endpoints).
SPECIFIC_SERVICE_FLAGS: dict[str, dict[str, bool]] = {
    "klarna": {"automated_decisions": True, "payment_data": True},
    "paypal": {**_US_TRANSFER, "payment_data": True},
    "stripe": {**_US_TRANSFER, "payment_data": True},
    "google_analytics": {**_US_TRANSFER, "tracking": True},
    "facebook_pixel": {**_US_TRANSFER, "marketing": True, "profiling": True},
    "hotjar": {"profiling": True, "tracking": True},
    "ms_clarity": {**_US_TRANSFER, "profiling": True},
    "tiktok_pixel": {**_US_TRANSFER, "marketing": True},
    "intercom": {**_US_TRANSFER, "ai_usage": True},
}
def extract_intake_flags_from_services(detected_services: list[dict]) -> dict:
    """Derive intake flags from detected third-party services.

    This is deterministic — if Google Analytics is embedded, tracking IS
    happening; no text analysis or LLM guessing is needed. (This span
    previously contained diff-merge residue: the removed LLM-based
    extractor and its keyword fallback were interleaved into this
    function's body, making the module syntactically invalid.)

    Args:
        detected_services: service-registry entries; each dict is expected
            to carry "category", "id" and "eu_adequate" keys — missing
            keys fall back to safe defaults ("other", "", True).

    Returns:
        Dict mapping every known intake-flag name to a bool.
    """
    # Start with the complete flag schema, everything off.
    flags = {
        "personal_data": False,
        "customer_data": False,
        "payment_data": False,
        "location_data": False,
        "biometric_data": False,
        "minor_data": False,
        "health_data": False,
        "marketing": False,
        "profiling": False,
        "automated_decisions": False,
        "third_party_sharing": False,
        "cross_border_transfer": False,
        "tracking": False,
        "ai_usage": False,
    }

    for svc in detected_services:
        category = svc.get("category", "other")
        service_id = svc.get("id", "")
        eu_adequate = svc.get("eu_adequate", True)

        # Category-level flags (e.g. any "heatmap" service implies profiling).
        for key, value in SERVICE_TO_FLAGS.get(category, {}).items():
            if value:
                flags[key] = True

        # Service-specific flags (e.g. Klarna implies automated_decisions).
        for key, value in SPECIFIC_SERVICE_FLAGS.get(service_id, {}).items():
            if value:
                flags[key] = True

        # A service without EU adequacy means data leaves the EU/EEA and
        # is, by definition, shared with a third party.
        if not eu_adequate:
            flags["cross_border_transfer"] = True
            flags["third_party_sharing"] = True

    # Embedding any third-party service transmits at least the visitor's
    # IP address, which is personal data.
    if detected_services:
        flags["personal_data"] = True

    active = {k: v for k, v in flags.items() if v}
    logger.info("Intake flags from %d services: %s", len(detected_services), active)
    return flags
# Keep backward compatibility
async def extract_intake_flags(text: str) -> dict:
    """DEPRECATED — use extract_intake_flags_from_services() instead.

    The old implementation asked an LLM to guess flags from page text;
    text content does not represent actual data processing.

    Args:
        text: ignored; accepted only for signature compatibility.

    Returns:
        A conservative, schema-complete flag dict: every known flag False
        except personal_data (serving any website processes the visitor's
        IP). Previously only 2 of the 14 flag keys were returned, which
        is inconsistent with the schema the service-based extractor
        produces and risks KeyErrors in downstream consumers.
    """
    logger.warning(
        "extract_intake_flags(text) called — DEPRECATED. "
        "Use extract_intake_flags_from_services(detected_services) instead."
    )
    return {
        "personal_data": True,
        "customer_data": False,
        "payment_data": False,
        "location_data": False,
        "biometric_data": False,
        "minor_data": False,
        "health_data": False,
        "marketing": False,
        "profiling": False,
        "automated_decisions": False,
        "third_party_sharing": False,
        "cross_border_transfer": False,
        "tracking": False,
        "ai_usage": False,
    }
def flags_to_ucca_intake(flags: dict) -> dict:
"""Convert extracted flags to UCCA intake format."""