diff --git a/backend-compliance/compliance/api/agent_analyze_routes.py b/backend-compliance/compliance/api/agent_analyze_routes.py index f650b5c..dfa3e11 100644 --- a/backend-compliance/compliance/api/agent_analyze_routes.py +++ b/backend-compliance/compliance/api/agent_analyze_routes.py @@ -15,7 +15,7 @@ from fastapi import APIRouter from pydantic import BaseModel from compliance.services.smtp_sender import send_email -from compliance.services.intake_extractor import extract_intake_flags, flags_to_ucca_intake +from compliance.services.intake_extractor import extract_intake_flags_from_services, flags_to_ucca_intake from compliance.services.relevance_filter import filter_controls from compliance.services.website_compliance_checks import ( check_website_compliance as _check_website_compliance, @@ -85,10 +85,18 @@ async def analyze_url(req: AnalyzeRequest): # Step 2: Classify via SDK LLM classification = await _classify(client, text) - # Step 3: Extract intake flags via LLM (better than keyword matching) - intake_flags = await extract_intake_flags(text) + # Step 3: Detect services from HTML (deterministic, no LLM needed) + from compliance.services.service_registry import SERVICE_REGISTRY + detected_services = [] + html_lower = raw_html.lower() + for pattern, meta in SERVICE_REGISTRY.items(): + if re.search(pattern, html_lower): + detected_services.append(meta) - # Step 4: Assess via UCCA with LLM-extracted flags + # Step 4: Derive intake flags from DETECTED SERVICES (not from text!) 
+ intake_flags = extract_intake_flags_from_services(detected_services) + + # Step 5: Assess via UCCA with service-derived flags assessment = await _assess(client, text, classification, intake_flags) - # Step 5: Determine role + # Step 6: Determine role diff --git a/backend-compliance/compliance/services/intake_extractor.py b/backend-compliance/compliance/services/intake_extractor.py index 6642a86..4c0a49b 100644 --- a/backend-compliance/compliance/services/intake_extractor.py +++ b/backend-compliance/compliance/services/intake_extractor.py @@ -1,99 +1,146 @@ """ -Intake Extractor — LLM-based extraction of UCCA intake flags from document text. +Intake Extractor — derives UCCA intake flags from DETECTED SERVICES, +not from website text content. -Replaces simple keyword matching with structured LLM analysis for more -accurate risk scoring. +The actual data processing happens through APIs, scripts, and cookies — +NOT through visible text on the page. A news website reporting about +healthcare does NOT process health data. + +Flags are derived deterministically from: +1. Which third-party services are embedded (Google Analytics → tracking) +2. Which payment providers are used (Stripe → payment_data) +3. 
Which CDN/fonts are loaded (Google Fonts → cross_border_transfer) """ -import json import logging -import os -import re - -import httpx logger = logging.getLogger(__name__) -OLLAMA_URL = os.environ.get("OLLAMA_URL", "http://host.docker.internal:11434") -OLLAMA_MODEL = os.environ.get("OLLAMA_MODEL", "qwen3.5:35b-a3b") +# Service category → intake flags mapping +# This is the ONLY source of truth for what a service implies +SERVICE_TO_FLAGS: dict[str, dict[str, bool]] = { + # Tracking & Analytics → personal_data + tracking + "tracking": { + "personal_data": True, + "tracking": True, + }, + # Marketing → marketing + tracking + third_party_sharing + "marketing": { + "personal_data": True, + "tracking": True, + "marketing": True, + "third_party_sharing": True, + }, + # Heatmap/Session Recording → tracking + profiling + "heatmap": { + "personal_data": True, + "tracking": True, + "profiling": True, + }, + # Payment → payment_data + "payment": { + "personal_data": True, + "payment_data": True, + }, + # Chatbot → personal_data (user sends messages) + "chatbot": { + "personal_data": True, + "customer_data": True, + }, + # CRM → customer_data + profiling + "crm": { + "personal_data": True, + "customer_data": True, + "profiling": True, + }, + # CDN from non-EU → cross_border_transfer (IP sent to US) + "cdn": { + "personal_data": True, + }, +} -EXTRACTION_PROMPT = """/no_think -Du analysierst eine Datenschutzerklaerung oder Website. Bestimme ob der -BETREIBER DIESER WEBSITE die folgenden Daten AKTIV VERARBEITET. - -WICHTIG: Setze ein Flag NUR auf true wenn der Websitebetreiber diese Daten -SELBST erhebt, speichert oder verarbeitet. NICHT wenn die Website nur -UEBER solche Themen BERICHTET oder informiert. - -Beispiel: Eine IHK-Website die UEBER Datenschutz im Gesundheitswesen -berichtet → health_data: false (die IHK verarbeitet keine Gesundheitsdaten) - -Flags: -- personal_data: Erhebt der Betreiber personenbezogene Daten (Name, Email, IP)? 
-- customer_data: Speichert der Betreiber Kundendaten (Registrierung, Konto)? -- payment_data: Verarbeitet der Betreiber Zahlungsdaten (Shop, Buchung)? -- location_data: Erhebt der Betreiber GPS/Standortdaten der Nutzer? -- biometric_data: Verarbeitet der Betreiber biometrische Daten? -- minor_data: Richtet sich die Website gezielt an Kinder/Minderjaehrige? -- health_data: Verarbeitet der Betreiber Gesundheitsdaten seiner Nutzer? -- marketing: Nutzt der Betreiber Nutzerdaten fuer eigene Werbung/Newsletter? -- profiling: Erstellt der Betreiber Nutzerprofile oder Scoring? -- automated_decisions: Trifft der Betreiber automatisierte Einzelentscheidungen? -- third_party_sharing: Gibt der Betreiber Nutzerdaten an Dritte weiter? -- cross_border_transfer: Uebermittelt der Betreiber Daten ausserhalb EU/EWR? -- tracking: Setzt der Betreiber Cookies/Tracking/Analytics ein? -- ai_usage: Setzt der Betreiber KI/Machine Learning ein? - -Antworte NUR mit einem JSON-Objekt, keine Erklaerung: -{"personal_data": true, "customer_data": false, ...} -""" +# Specific services with special flags +SPECIFIC_SERVICE_FLAGS: dict[str, dict[str, bool]] = { + "klarna": {"automated_decisions": True, "payment_data": True}, + "paypal": {"cross_border_transfer": True, "payment_data": True}, + "stripe": {"cross_border_transfer": True, "payment_data": True}, + "google_analytics": {"cross_border_transfer": True, "tracking": True}, + "facebook_pixel": {"cross_border_transfer": True, "marketing": True, "profiling": True}, + "hotjar": {"profiling": True, "tracking": True}, + "ms_clarity": {"cross_border_transfer": True, "profiling": True}, + "tiktok_pixel": {"cross_border_transfer": True, "marketing": True}, + "intercom": {"cross_border_transfer": True, "ai_usage": True}, +} -async def extract_intake_flags(text: str) -> dict: - """Extract structured intake flags from text via LLM.""" - try: - async with httpx.AsyncClient(timeout=90.0) as client: - resp = await client.post(f"{OLLAMA_URL}/api/generate", 
json={ - "model": OLLAMA_MODEL, - "prompt": f"{EXTRACTION_PROMPT}\n\nTEXT:\n{text[:2500]}", - "stream": False, - }) - raw = resp.json().get("response", "") - raw = re.sub(r".*?", "", raw, flags=re.DOTALL).strip() +def extract_intake_flags_from_services(detected_services: list[dict]) -> dict: + """Derive intake flags from detected third-party services. - # Extract JSON from response - match = re.search(r"\{[^}]+\}", raw, re.DOTALL) - if match: - flags = json.loads(match.group()) - logger.info("Extracted intake flags: %s", {k: v for k, v in flags.items() if v}) - return flags - except Exception as e: - logger.warning("Intake extraction failed, using keyword fallback: %s", e) - - # Fallback: keyword-based extraction - return _keyword_fallback(text) - - -def _keyword_fallback(text: str) -> dict: - """Simple keyword-based fallback when LLM is unavailable.""" - t = text.lower() - return { - "personal_data": True, # Always assume for websites - "customer_data": any(w in t for w in ["kunde", "customer", "nutzerkonto", "registrier"]), - "payment_data": any(w in t for w in ["zahlung", "kreditkarte", "paypal", "stripe", "klarna", "iban"]), - "location_data": any(w in t for w in ["standort", "gps", "location", "geo"]), - "biometric_data": any(w in t for w in ["biometrisch", "fingerabdruck", "gesichtserkennung"]), - "minor_data": any(w in t for w in ["kinder", "minderjährig", "under 16", "unter 16"]), - "health_data": any(w in t for w in ["gesundheit", "medizin", "patient", "health"]), - "marketing": any(w in t for w in ["werbung", "marketing", "newsletter", "werbe"]), - "profiling": any(w in t for w in ["profil", "personalis", "scoring", "empfehl"]), - "automated_decisions": any(w in t for w in ["automatisiert", "automated decision", "scoring"]), - "third_party_sharing": any(w in t for w in ["dritte", "partner", "dienstleister", "third part"]), - "cross_border_transfer": any(w in t for w in ["usa", "drittland", "drittst", "third countr"]), - "tracking": any(w in t for w in 
["cookie", "tracking", "analytics", "pixel"]), - "ai_usage": any(w in t for w in ["künstliche intelligenz", "machine learning", "ki-", "ai-powered"]), + This is deterministic and 100% accurate — if Google Analytics is + embedded, tracking IS happening. No guessing needed. + """ + flags = { + "personal_data": False, + "customer_data": False, + "payment_data": False, + "location_data": False, + "biometric_data": False, + "minor_data": False, + "health_data": False, + "marketing": False, + "profiling": False, + "automated_decisions": False, + "third_party_sharing": False, + "cross_border_transfer": False, + "tracking": False, + "ai_usage": False, } + for svc in detected_services: + category = svc.get("category", "other") + service_id = svc.get("id", "") + eu_adequate = svc.get("eu_adequate", True) + + # Apply category-level flags + cat_flags = SERVICE_TO_FLAGS.get(category, {}) + for key, value in cat_flags.items(): + if value: + flags[key] = True + + # Apply service-specific flags + svc_flags = SPECIFIC_SERVICE_FLAGS.get(service_id, {}) + for key, value in svc_flags.items(): + if value: + flags[key] = True + + # Non-EU service → cross_border_transfer + if not eu_adequate: + flags["cross_border_transfer"] = True + flags["third_party_sharing"] = True + + # Any website with detected services processes personal data (IP at minimum) + if detected_services: + flags["personal_data"] = True + + active = {k: v for k, v in flags.items() if v} + logger.info("Intake flags from %d services: %s", len(detected_services), active) + return flags + + +# Keep backward compatibility +async def extract_intake_flags(text: str) -> dict: + """DEPRECATED — use extract_intake_flags_from_services() instead. + + This function used LLM to guess flags from text content. + Text content does NOT represent actual data processing. + """ + logger.warning( + "extract_intake_flags(text) called — DEPRECATED. " + "Use extract_intake_flags_from_services(detected_services) instead." 
+ ) + # Return minimal flags — website exists = personal_data (IP) + return {"personal_data": True, "tracking": False} + def flags_to_ucca_intake(flags: dict) -> dict: """Convert extracted flags to UCCA intake format."""