Files
breakpilot-compliance/backend-compliance/compliance/services/intake_extractor.py
T
Benjamin Admin 0f3ec9061e fix: false positive findings + restore docs-src + §312k ecommerce filter
1. Intake prompt: "BETREIBER verarbeitet" statt "Text erwaehnt".
   IHK berichtet ueber Gesundheitsdaten → false. Vorher: true.
2. §312k Check: nur bei E-Commerce/Abo-Websites (Warenkorb, Shop, PayPal etc.)
   IHK hat keine Vertraege → kein Kuendigungsbutton noetig.
3. docs-src/ restored from commit 9824304

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-05-02 08:26:59 +02:00

133 lines
6.1 KiB
Python

"""
Intake Extractor — LLM-based extraction of UCCA intake flags from document text.
Replaces simple keyword matching with structured LLM analysis for more
accurate risk scoring.
"""
import json
import logging
import os
import re
import httpx
logger = logging.getLogger(__name__)
OLLAMA_URL = os.environ.get("OLLAMA_URL", "http://host.docker.internal:11434")
OLLAMA_MODEL = os.environ.get("OLLAMA_MODEL", "qwen3.5:35b-a3b")
EXTRACTION_PROMPT = """/no_think
Du analysierst eine Datenschutzerklaerung oder Website. Bestimme ob der
BETREIBER DIESER WEBSITE die folgenden Daten AKTIV VERARBEITET.
WICHTIG: Setze ein Flag NUR auf true wenn der Websitebetreiber diese Daten
SELBST erhebt, speichert oder verarbeitet. NICHT wenn die Website nur
UEBER solche Themen BERICHTET oder informiert.
Beispiel: Eine IHK-Website die UEBER Datenschutz im Gesundheitswesen
berichtet → health_data: false (die IHK verarbeitet keine Gesundheitsdaten)
Flags:
- personal_data: Erhebt der Betreiber personenbezogene Daten (Name, Email, IP)?
- customer_data: Speichert der Betreiber Kundendaten (Registrierung, Konto)?
- payment_data: Verarbeitet der Betreiber Zahlungsdaten (Shop, Buchung)?
- location_data: Erhebt der Betreiber GPS/Standortdaten der Nutzer?
- biometric_data: Verarbeitet der Betreiber biometrische Daten?
- minor_data: Richtet sich die Website gezielt an Kinder/Minderjaehrige?
- health_data: Verarbeitet der Betreiber Gesundheitsdaten seiner Nutzer?
- marketing: Nutzt der Betreiber Nutzerdaten fuer eigene Werbung/Newsletter?
- profiling: Erstellt der Betreiber Nutzerprofile oder Scoring?
- automated_decisions: Trifft der Betreiber automatisierte Einzelentscheidungen?
- third_party_sharing: Gibt der Betreiber Nutzerdaten an Dritte weiter?
- cross_border_transfer: Uebermittelt der Betreiber Daten ausserhalb EU/EWR?
- tracking: Setzt der Betreiber Cookies/Tracking/Analytics ein?
- ai_usage: Setzt der Betreiber KI/Machine Learning ein?
Antworte NUR mit einem JSON-Objekt, keine Erklaerung:
{"personal_data": true, "customer_data": false, ...}
"""
async def extract_intake_flags(text: str) -> dict:
"""Extract structured intake flags from text via LLM."""
try:
async with httpx.AsyncClient(timeout=90.0) as client:
resp = await client.post(f"{OLLAMA_URL}/api/generate", json={
"model": OLLAMA_MODEL,
"prompt": f"{EXTRACTION_PROMPT}\n\nTEXT:\n{text[:2500]}",
"stream": False,
})
raw = resp.json().get("response", "")
raw = re.sub(r"<think>.*?</think>", "", raw, flags=re.DOTALL).strip()
# Extract JSON from response
match = re.search(r"\{[^}]+\}", raw, re.DOTALL)
if match:
flags = json.loads(match.group())
logger.info("Extracted intake flags: %s", {k: v for k, v in flags.items() if v})
return flags
except Exception as e:
logger.warning("Intake extraction failed, using keyword fallback: %s", e)
# Fallback: keyword-based extraction
return _keyword_fallback(text)
def _keyword_fallback(text: str) -> dict:
"""Simple keyword-based fallback when LLM is unavailable."""
t = text.lower()
return {
"personal_data": True, # Always assume for websites
"customer_data": any(w in t for w in ["kunde", "customer", "nutzerkonto", "registrier"]),
"payment_data": any(w in t for w in ["zahlung", "kreditkarte", "paypal", "stripe", "klarna", "iban"]),
"location_data": any(w in t for w in ["standort", "gps", "location", "geo"]),
"biometric_data": any(w in t for w in ["biometrisch", "fingerabdruck", "gesichtserkennung"]),
"minor_data": any(w in t for w in ["kinder", "minderjährig", "under 16", "unter 16"]),
"health_data": any(w in t for w in ["gesundheit", "medizin", "patient", "health"]),
"marketing": any(w in t for w in ["werbung", "marketing", "newsletter", "werbe"]),
"profiling": any(w in t for w in ["profil", "personalis", "scoring", "empfehl"]),
"automated_decisions": any(w in t for w in ["automatisiert", "automated decision", "scoring"]),
"third_party_sharing": any(w in t for w in ["dritte", "partner", "dienstleister", "third part"]),
"cross_border_transfer": any(w in t for w in ["usa", "drittland", "drittst", "third countr"]),
"tracking": any(w in t for w in ["cookie", "tracking", "analytics", "pixel"]),
"ai_usage": any(w in t for w in ["künstliche intelligenz", "machine learning", "ki-", "ai-powered"]),
}
def flags_to_ucca_intake(flags: dict) -> dict:
"""Convert extracted flags to UCCA intake format."""
return {
"data_types": {
"personal_data": flags.get("personal_data", False),
"customer_data": flags.get("customer_data", False),
"location_data": flags.get("location_data", False),
"biometric_data": flags.get("biometric_data", False),
"minor_data": flags.get("minor_data", False),
"images": False,
"audio": False,
"financial_data": flags.get("payment_data", False),
"employee_data": False,
"article_9_data": flags.get("health_data", False) or flags.get("biometric_data", False),
},
"purpose": {
"marketing": flags.get("marketing", False),
"analytics": flags.get("tracking", False),
"profiling": flags.get("profiling", False),
"automation": flags.get("ai_usage", False),
"customer_support": False,
"evaluation_scoring": flags.get("automated_decisions", False),
"decision_making": flags.get("automated_decisions", False),
},
"automation": "fully_automated" if flags.get("automated_decisions") else
"partially_automated" if flags.get("ai_usage") else "manual",
"outputs": {
"recommendations_to_users": flags.get("profiling", False),
"data_export": flags.get("cross_border_transfer", False),
"legal_effects": flags.get("automated_decisions", False),
},
"hosting": {
"region": "non_eu" if flags.get("cross_border_transfer") else "eu",
},
}