4298ae17ab
Phase 0: Qwen extracts 14 structured intake flags (personal_data, marketing, profiling, ai_usage, etc.) instead of keyword matching. Fallback to keywords if LLM unavailable. Flags feed into UCCA for accurate scoring. Phase 1: Control relevance filter removes false positives. C_TRANSPARENCY only recommended if AI/ML keywords found in text. 7 control rules with keyword lists + intake flag fallback. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
126 lines
5.7 KiB
Python
126 lines
5.7 KiB
Python
"""
|
|
Intake Extractor — LLM-based extraction of UCCA intake flags from document text.
|
|
|
|
Replaces simple keyword matching with structured LLM analysis for more
|
|
accurate risk scoring.
|
|
"""
|
|
|
|
import json
|
|
import logging
|
|
import os
|
|
import re
|
|
|
|
import httpx
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
OLLAMA_URL = os.environ.get("OLLAMA_URL", "http://host.docker.internal:11434")
|
|
OLLAMA_MODEL = os.environ.get("OLLAMA_MODEL", "qwen3.5:35b-a3b")
|
|
|
|
EXTRACTION_PROMPT = """/no_think
|
|
Analysiere den folgenden Text (Datenschutzerklaerung oder Website-Inhalt) und
|
|
bestimme fuer JEDES der folgenden Flags ob es zutrifft (true/false).
|
|
|
|
Flags:
|
|
- personal_data: Werden personenbezogene Daten verarbeitet?
|
|
- customer_data: Werden Kundendaten (Name, Email, Adresse) gespeichert?
|
|
- payment_data: Werden Zahlungsdaten (Kreditkarte, IBAN, PayPal) verarbeitet?
|
|
- location_data: Werden Standort-/GPS-Daten erhoben?
|
|
- biometric_data: Werden biometrische Daten verarbeitet?
|
|
- minor_data: Werden Daten von Kindern/Minderjaehrigen verarbeitet?
|
|
- health_data: Werden Gesundheitsdaten verarbeitet?
|
|
- marketing: Werden Daten fuer Werbung/Marketing/Newsletter genutzt?
|
|
- profiling: Findet Profiling, Scoring oder Personalisierung statt?
|
|
- automated_decisions: Werden automatisierte Einzelentscheidungen getroffen (Art. 22)?
|
|
- third_party_sharing: Werden Daten an Dritte/Partner weitergegeben?
|
|
- cross_border_transfer: Findet Datentransfer ausserhalb EU/EWR statt?
|
|
- tracking: Werden Cookies/Tracking-Pixel/Analytics eingesetzt?
|
|
- ai_usage: Wird KI/Machine Learning/Algorithmen eingesetzt?
|
|
|
|
Antworte NUR mit einem JSON-Objekt, keine Erklaerung:
|
|
{"personal_data": true, "customer_data": true, ...}
|
|
"""
|
|
|
|
|
|
async def extract_intake_flags(text: str) -> dict:
|
|
"""Extract structured intake flags from text via LLM."""
|
|
try:
|
|
async with httpx.AsyncClient(timeout=90.0) as client:
|
|
resp = await client.post(f"{OLLAMA_URL}/api/generate", json={
|
|
"model": OLLAMA_MODEL,
|
|
"prompt": f"{EXTRACTION_PROMPT}\n\nTEXT:\n{text[:2500]}",
|
|
"stream": False,
|
|
})
|
|
raw = resp.json().get("response", "")
|
|
raw = re.sub(r"<think>.*?</think>", "", raw, flags=re.DOTALL).strip()
|
|
|
|
# Extract JSON from response
|
|
match = re.search(r"\{[^}]+\}", raw, re.DOTALL)
|
|
if match:
|
|
flags = json.loads(match.group())
|
|
logger.info("Extracted intake flags: %s", {k: v for k, v in flags.items() if v})
|
|
return flags
|
|
except Exception as e:
|
|
logger.warning("Intake extraction failed, using keyword fallback: %s", e)
|
|
|
|
# Fallback: keyword-based extraction
|
|
return _keyword_fallback(text)
|
|
|
|
|
|
def _keyword_fallback(text: str) -> dict:
|
|
"""Simple keyword-based fallback when LLM is unavailable."""
|
|
t = text.lower()
|
|
return {
|
|
"personal_data": True, # Always assume for websites
|
|
"customer_data": any(w in t for w in ["kunde", "customer", "nutzerkonto", "registrier"]),
|
|
"payment_data": any(w in t for w in ["zahlung", "kreditkarte", "paypal", "stripe", "klarna", "iban"]),
|
|
"location_data": any(w in t for w in ["standort", "gps", "location", "geo"]),
|
|
"biometric_data": any(w in t for w in ["biometrisch", "fingerabdruck", "gesichtserkennung"]),
|
|
"minor_data": any(w in t for w in ["kinder", "minderjährig", "under 16", "unter 16"]),
|
|
"health_data": any(w in t for w in ["gesundheit", "medizin", "patient", "health"]),
|
|
"marketing": any(w in t for w in ["werbung", "marketing", "newsletter", "werbe"]),
|
|
"profiling": any(w in t for w in ["profil", "personalis", "scoring", "empfehl"]),
|
|
"automated_decisions": any(w in t for w in ["automatisiert", "automated decision", "scoring"]),
|
|
"third_party_sharing": any(w in t for w in ["dritte", "partner", "dienstleister", "third part"]),
|
|
"cross_border_transfer": any(w in t for w in ["usa", "drittland", "drittst", "third countr"]),
|
|
"tracking": any(w in t for w in ["cookie", "tracking", "analytics", "pixel"]),
|
|
"ai_usage": any(w in t for w in ["künstliche intelligenz", "machine learning", "ki-", "ai-powered"]),
|
|
}
|
|
|
|
|
|
def flags_to_ucca_intake(flags: dict) -> dict:
|
|
"""Convert extracted flags to UCCA intake format."""
|
|
return {
|
|
"data_types": {
|
|
"personal_data": flags.get("personal_data", False),
|
|
"customer_data": flags.get("customer_data", False),
|
|
"location_data": flags.get("location_data", False),
|
|
"biometric_data": flags.get("biometric_data", False),
|
|
"minor_data": flags.get("minor_data", False),
|
|
"images": False,
|
|
"audio": False,
|
|
"financial_data": flags.get("payment_data", False),
|
|
"employee_data": False,
|
|
"article_9_data": flags.get("health_data", False) or flags.get("biometric_data", False),
|
|
},
|
|
"purpose": {
|
|
"marketing": flags.get("marketing", False),
|
|
"analytics": flags.get("tracking", False),
|
|
"profiling": flags.get("profiling", False),
|
|
"automation": flags.get("ai_usage", False),
|
|
"customer_support": False,
|
|
"evaluation_scoring": flags.get("automated_decisions", False),
|
|
"decision_making": flags.get("automated_decisions", False),
|
|
},
|
|
"automation": "fully_automated" if flags.get("automated_decisions") else
|
|
"partially_automated" if flags.get("ai_usage") else "manual",
|
|
"outputs": {
|
|
"recommendations_to_users": flags.get("profiling", False),
|
|
"data_export": flags.get("cross_border_transfer", False),
|
|
"legal_effects": flags.get("automated_decisions", False),
|
|
},
|
|
"hosting": {
|
|
"region": "non_eu" if flags.get("cross_border_transfer") else "eu",
|
|
},
|
|
}
|