diff --git a/backend-compliance/compliance/api/agent_analyze_routes.py b/backend-compliance/compliance/api/agent_analyze_routes.py index ac6ceb5..833fa38 100644 --- a/backend-compliance/compliance/api/agent_analyze_routes.py +++ b/backend-compliance/compliance/api/agent_analyze_routes.py @@ -15,6 +15,8 @@ from fastapi import APIRouter from pydantic import BaseModel from compliance.services.smtp_sender import send_email +from compliance.services.intake_extractor import extract_intake_flags, flags_to_ucca_intake +from compliance.services.relevance_filter import filter_controls logger = logging.getLogger(__name__) @@ -77,21 +79,24 @@ async def analyze_url(req: AnalyzeRequest): # Step 2: Classify via SDK LLM classification = await _classify(client, text) - # Step 3: Assess via UCCA - assessment = await _assess(client, text, classification) + # Step 3: Extract intake flags via LLM (better than keyword matching) + intake_flags = await extract_intake_flags(text) - # Step 4: Determine role + # Step 4: Assess via UCCA with LLM-extracted flags + assessment = await _assess(client, text, classification, intake_flags) + + # Step 5: Determine role esc_level = assessment.get("escalation_level", "E0") role = ESCALATION_ROLES.get(esc_level, ESCALATION_ROLES["E0"]) - # Step 5: Website compliance checks (§312k BGB etc.) + # Step 6: Website compliance checks (§312k BGB etc.) site_findings, follow_ups = await _check_website_compliance(client, req.url, raw_html) - # Step 6: Merge findings + # Step 7: Merge and filter findings/controls findings = assessment.get("triggered_rules", []) controls = assessment.get("required_controls", []) findings_str = _to_string_list(findings) + site_findings - controls_str = _to_string_list(controls) + controls_str = filter_controls(_to_string_list(controls), text, intake_flags) # Escalate if website checks found issues if site_findings and esc_level == "E0": @@ -179,34 +184,24 @@ async def _classify(client: httpx.AsyncClient, text: str) -> str: return "other" -async def _assess(client: httpx.AsyncClient, text: str, classification: str) -> dict: +async def _assess(client: httpx.AsyncClient, text: str, classification: str, intake_flags: dict | None = None) -> dict: """Run UCCA assessment via SDK. Returns flattened result dict.""" try: - # UCCA expects boolean intake flags, not string categories + # Use LLM-extracted flags if available, otherwise minimal defaults + if intake_flags: + ucca_intake = flags_to_ucca_intake(intake_flags) + else: + ucca_intake = { + "data_types": {"personal_data": True}, + "purpose": {}, + "automation": "manual", + "outputs": {}, + } + resp = await client.post(f"{SDK_URL}/sdk/v1/ucca/assess", headers=SDK_HEADERS, json={ "use_case_text": text[:3000], "domain": classification, - "data_types": { - "personal_data": True, - "customer_data": True, - "location_data": "tracking" in text.lower() or "standort" in text.lower(), - "images": False, - "biometric_data": "biometrisch" in text.lower(), - "minor_data": "kinder" in text.lower() or "minderjährig" in text.lower(), - }, - "purpose": { - "marketing": "werbung" in text.lower() or "marketing" in text.lower(), - "analytics": "analyse" in text.lower() or "analytics" in text.lower(), - "profiling": "profil" in text.lower() or "personalis" in text.lower(), - "automation": False, - "customer_support": False, - }, - "automation": "partially_automated", - "outputs": { - "content_generation": False, - "recommendations_to_users": "empfehl" in text.lower(), - "data_export": "export" in text.lower() or "uebertrag" in text.lower(), - }, + **ucca_intake, }) data = resp.json() # Flatten: UCCA wraps result under "assessment" and "result" diff --git a/backend-compliance/compliance/services/intake_extractor.py b/backend-compliance/compliance/services/intake_extractor.py new file mode 100644 index 0000000..4c3fb90 --- /dev/null +++ b/backend-compliance/compliance/services/intake_extractor.py @@ -0,0 +1,125 @@ +""" +Intake Extractor — LLM-based extraction of UCCA intake flags from document text. + +Replaces simple keyword matching with structured LLM analysis for more +accurate risk scoring. +""" + +import json +import logging +import os +import re + +import httpx + +logger = logging.getLogger(__name__) + +OLLAMA_URL = os.environ.get("OLLAMA_URL", "http://host.docker.internal:11434") +OLLAMA_MODEL = os.environ.get("OLLAMA_MODEL", "qwen3.5:35b-a3b") + +EXTRACTION_PROMPT = """/no_think +Analysiere den folgenden Text (Datenschutzerklaerung oder Website-Inhalt) und +bestimme fuer JEDES der folgenden Flags ob es zutrifft (true/false). + +Flags: +- personal_data: Werden personenbezogene Daten verarbeitet? +- customer_data: Werden Kundendaten (Name, Email, Adresse) gespeichert? +- payment_data: Werden Zahlungsdaten (Kreditkarte, IBAN, PayPal) verarbeitet? +- location_data: Werden Standort-/GPS-Daten erhoben? +- biometric_data: Werden biometrische Daten verarbeitet? +- minor_data: Werden Daten von Kindern/Minderjaehrigen verarbeitet? +- health_data: Werden Gesundheitsdaten verarbeitet? +- marketing: Werden Daten fuer Werbung/Marketing/Newsletter genutzt? +- profiling: Findet Profiling, Scoring oder Personalisierung statt? +- automated_decisions: Werden automatisierte Einzelentscheidungen getroffen (Art. 22)? +- third_party_sharing: Werden Daten an Dritte/Partner weitergegeben? +- cross_border_transfer: Findet Datentransfer ausserhalb EU/EWR statt? +- tracking: Werden Cookies/Tracking-Pixel/Analytics eingesetzt? +- ai_usage: Wird KI/Machine Learning/Algorithmen eingesetzt? + +Antworte NUR mit einem JSON-Objekt, keine Erklaerung: +{"personal_data": true, "customer_data": true, ...} +""" + + +async def extract_intake_flags(text: str) -> dict: + """Extract structured intake flags from text via LLM.""" + try: + async with httpx.AsyncClient(timeout=90.0) as client: + resp = await client.post(f"{OLLAMA_URL}/api/generate", json={ + "model": OLLAMA_MODEL, + "prompt": f"{EXTRACTION_PROMPT}\n\nTEXT:\n{text[:2500]}", + "stream": False, + }) + raw = resp.json().get("response", "") + raw = re.sub(r".*?", "", raw, flags=re.DOTALL).strip() + + # Extract JSON from response + match = re.search(r"\{[^}]+\}", raw, re.DOTALL) + if match: + flags = json.loads(match.group()) + logger.info("Extracted intake flags: %s", {k: v for k, v in flags.items() if v}) + return flags + except Exception as e: + logger.warning("Intake extraction failed, using keyword fallback: %s", e) + + # Fallback: keyword-based extraction + return _keyword_fallback(text) + + +def _keyword_fallback(text: str) -> dict: + """Simple keyword-based fallback when LLM is unavailable.""" + t = text.lower() + return { + "personal_data": True, # Always assume for websites + "customer_data": any(w in t for w in ["kunde", "customer", "nutzerkonto", "registrier"]), + "payment_data": any(w in t for w in ["zahlung", "kreditkarte", "paypal", "stripe", "klarna", "iban"]), + "location_data": any(w in t for w in ["standort", "gps", "location", "geo"]), + "biometric_data": any(w in t for w in ["biometrisch", "fingerabdruck", "gesichtserkennung"]), + "minor_data": any(w in t for w in ["kinder", "minderjährig", "under 16", "unter 16"]), + "health_data": any(w in t for w in ["gesundheit", "medizin", "patient", "health"]), + "marketing": any(w in t for w in ["werbung", "marketing", "newsletter", "werbe"]), + "profiling": any(w in t for w in ["profil", "personalis", "scoring", "empfehl"]), + "automated_decisions": any(w in t for w in ["automatisiert", "automated decision", "scoring"]), + "third_party_sharing": any(w in t for w in ["dritte", "partner", "dienstleister", "third part"]), + "cross_border_transfer": any(w in t for w in ["usa", "drittland", "drittst", "third countr"]), + "tracking": any(w in t for w in ["cookie", "tracking", "analytics", "pixel"]), + "ai_usage": any(w in t for w in ["künstliche intelligenz", "machine learning", "ki-", "ai-powered"]), + } + + +def flags_to_ucca_intake(flags: dict) -> dict: + """Convert extracted flags to UCCA intake format.""" + return { + "data_types": { + "personal_data": flags.get("personal_data", False), + "customer_data": flags.get("customer_data", False), + "location_data": flags.get("location_data", False), + "biometric_data": flags.get("biometric_data", False), + "minor_data": flags.get("minor_data", False), + "images": False, + "audio": False, + "financial_data": flags.get("payment_data", False), + "employee_data": False, + "article_9_data": flags.get("health_data", False) or flags.get("biometric_data", False), + }, + "purpose": { + "marketing": flags.get("marketing", False), + "analytics": flags.get("tracking", False), + "profiling": flags.get("profiling", False), + "automation": flags.get("ai_usage", False), + "customer_support": False, + "evaluation_scoring": flags.get("automated_decisions", False), + "decision_making": flags.get("automated_decisions", False), + }, + "automation": "fully_automated" if flags.get("automated_decisions") else + "partially_automated" if flags.get("ai_usage") else "manual", + "outputs": { + "recommendations_to_users": flags.get("profiling", False), + "data_export": flags.get("cross_border_transfer", False), + "legal_effects": flags.get("automated_decisions", False), + }, + "hosting": { + "region": "non_eu" if flags.get("cross_border_transfer") else "eu", + }, + } diff --git a/backend-compliance/compliance/services/relevance_filter.py b/backend-compliance/compliance/services/relevance_filter.py new file mode 100644 index 0000000..413c227 --- /dev/null +++ b/backend-compliance/compliance/services/relevance_filter.py @@ -0,0 +1,152 @@ +""" +Control Relevance Filter — filters out controls that are not relevant +for the analyzed document based on keyword matching. + +Prevents false positives like C_TRANSPARENCY being recommended when +no AI usage is evident. +""" + +import logging +import re + +logger = logging.getLogger(__name__) + +# Top controls with their relevance conditions. +# A control is only relevant if ANY keyword from 'requires_any' matches the text. +# If 'requires_any' is empty, the control is always relevant. +CONTROL_RELEVANCE: dict[str, dict] = { + "C_TRANSPARENCY": { + "description": "KI-Transparenz-Hinweis (Art. 52 AI Act)", + "requires_any": [ + "künstliche intelligenz", "kuenstliche intelligenz", + "artificial intelligence", "machine learning", "maschinelles lernen", + "ki-gestützt", "ki-gestuetzt", "ai-powered", "ai system", + "chatbot", "neural", "deep learning", "algorithmus", "algorithmen", + "automatisierte entscheidung", "automated decision", + ], + "reason": "Nur relevant wenn KI/ML tatsaechlich eingesetzt wird", + }, + "C_DSFA_REQUIRED": { + "description": "Datenschutz-Folgenabschaetzung durchfuehren", + "requires_any": [ + "gesundheit", "biometrisch", "genetisch", "health", "biometric", + "scoring", "profiling", "systematisch", "umfangreich", + "videoüberwachung", "videoueberwachung", "kamera", + "minderjährig", "minderjaehrig", "kinder", + ], + "reason": "Nur bei hohem Risiko (Art. 9 Daten, Profiling, Ueberwachung)", + }, + "C_ART22_INFO": { + "description": "Info ueber automatisierte Einzelentscheidung (Art. 22 DSGVO)", + "requires_any": [ + "automatisierte entscheidung", "automated decision", "scoring", + "bonitaet", "kredit", "rating", "algorithmische entscheidung", + "profiling", "klarna", "ratenzahlung", + ], + "reason": "Nur bei automatisierten Einzelentscheidungen mit Rechtswirkung", + }, + "C_DPO_REQUIRED": { + "description": "Datenschutzbeauftragten bestellen", + "requires_any": [], # Always relevant — empty means no filter + "reason": "Generell relevant fuer Unternehmen", + }, + "C_EXPLICIT_CONSENT": { + "description": "Explizite Einwilligung einholen", + "requires_any": [ + "cookie", "tracking", "analytics", "pixel", "marketing", + "werbung", "newsletter", "remarketing", "retargeting", + "einwilligung", "consent", "opt-in", + ], + "reason": "Nur bei Tracking/Marketing das Einwilligung erfordert", + }, + "C_CHILD_PROTECTION": { + "description": "Besonderer Schutz fuer Minderdjaehrige", + "requires_any": [ + "kinder", "minderjährig", "minderjaehrig", "jugend", + "under 16", "unter 16", "schüler", "schueler", "child", + ], + "reason": "Nur wenn Daten von Minderjaehrigen verarbeitet werden", + }, + "C_THIRD_COUNTRY_SAFEGUARDS": { + "description": "Drittlandtransfer absichern (Art. 44-49 DSGVO)", + "requires_any": [ + "usa", "united states", "drittland", "drittst", "third countr", + "standardvertragsklausel", "sccs", "binding corporate", + "angemessenheitsbeschluss", "adequacy", + "google", "meta", "facebook", "amazon", "microsoft", "apple", + "cloudflare", "stripe", "paypal", + ], + "reason": "Nur bei Datentransfer in Drittlaender", + }, +} + + +def filter_controls( + controls: list[str], + source_text: str, + intake_flags: dict | None = None, +) -> list[str]: + """Filter controls based on relevance to the analyzed text. + + Returns only controls that are relevant (keyword match or no filter defined). + """ + if not controls: + return controls + + text_lower = source_text.lower() + filtered = [] + removed = [] + + for control in controls: + # Extract control ID from string like "[C_TRANSPARENCY] Nutzer informieren..." + control_id = _extract_control_id(control) + + if control_id and control_id in CONTROL_RELEVANCE: + rules = CONTROL_RELEVANCE[control_id] + keywords = rules["requires_any"] + + if not keywords: + # No filter = always relevant + filtered.append(control) + continue + + # Check if any keyword matches + if any(kw in text_lower for kw in keywords): + filtered.append(control) + else: + # Also check intake flags as fallback + if intake_flags and _check_flags(control_id, intake_flags): + filtered.append(control) + else: + removed.append((control_id, rules["reason"])) + else: + # Unknown control — keep it (don't filter what we don't understand) + filtered.append(control) + + if removed: + logger.info( + "Relevance filter removed %d controls: %s", + len(removed), + ", ".join(f"{cid} ({reason})" for cid, reason in removed), + ) + + return filtered + + +def _extract_control_id(control: str) -> str | None: + """Extract control ID from '[C_XXX] description' format.""" + match = re.match(r"\[([A-Z_0-9]+)\]", control) + return match.group(1) if match else None + + +def _check_flags(control_id: str, flags: dict) -> bool: + """Check if intake flags make a control relevant.""" + flag_map = { + "C_TRANSPARENCY": flags.get("ai_usage", False), + "C_DSFA_REQUIRED": flags.get("health_data", False) or flags.get("biometric_data", False), + "C_ART22_INFO": flags.get("automated_decisions", False), + "C_EXPLICIT_CONSENT": flags.get("tracking", False) or flags.get("marketing", False), + "C_CHILD_PROTECTION": flags.get("minor_data", False), + "C_THIRD_COUNTRY_SAFEGUARDS": flags.get("cross_border_transfer", False), + } + return flag_map.get(control_id, False)