diff --git a/backend-compliance/compliance/api/agent_compliance_check_routes.py b/backend-compliance/compliance/api/agent_compliance_check_routes.py index 478a58c7..3a62c3a5 100644 --- a/backend-compliance/compliance/api/agent_compliance_check_routes.py +++ b/backend-compliance/compliance/api/agent_compliance_check_routes.py @@ -390,8 +390,13 @@ async def _run_compliance_check(check_id: str, req: ComplianceCheckRequest): cookie_payloads.extend(e["cmp_payloads"]) if e.get("text"): cookie_text = e["text"] + # Site-owner derived from the submitted URLs — drives the + # INTERNAL/GROUP_COMPANY classification of vendor records. + owner_name = _company_name_from_url(doc_entries) or "" if cookie_payloads: - cmp_vendors = extract_vendors_from_payloads(cookie_payloads) + cmp_vendors = extract_vendors_from_payloads( + cookie_payloads, owner_name=owner_name, + ) # V3 fallback: no named CMP captured but we have substantive # cookie text → ask Qwen/OVH to extract vendor list from the text. # Skip on very short text (likely navigation) to save LLM cost. @@ -399,8 +404,17 @@ async def _run_compliance_check(check_id: str, req: ComplianceCheckRequest): from compliance.services.vendor_llm_extractor import ( extract_vendors_via_llm, ) + from compliance.services.vendor_classifier import classify _update(check_id, "Vendor-Liste per LLM extrahieren...", 94) cmp_vendors = await extract_vendors_via_llm(cookie_text) + # LLM path doesn't run through extract_vendors_from_payloads, + # so classify here. 
+ for v in cmp_vendors: + v["recipient_type"] = classify( + vendor_name=v.get("name", ""), + category=v.get("category", ""), + owner_name=owner_name, + ) if cmp_vendors: logger.info("VVT: %d vendors extracted, validating links", len(cmp_vendors)) diff --git a/backend-compliance/compliance/api/agent_doc_check_extras.py b/backend-compliance/compliance/api/agent_doc_check_extras.py index d8c89c8f..77497f9a 100644 --- a/backend-compliance/compliance/api/agent_doc_check_extras.py +++ b/backend-compliance/compliance/api/agent_doc_check_extras.py @@ -237,58 +237,32 @@ def _category_label(kat: str) -> str: def build_vvt_table_html(vendors: list[dict]) -> str: """Render the per-vendor VVT-style table for the email report. - One row per vendor. Columns: Name | Kategorie | Sitz | Cookies | - Opt-Out (Status) | Privacy (Status) | Compliance-Score. + Splits vendors into 3-4 sections by recipient_type (Art. 30(1)(d) + DSGVO): - Vendors are expected to come from vendor_extractor.extract_vendors_from_payloads - and have already been scored by cookie_link_validator.score_vendors. + 1. INTERNAL — own departments / own systems + 2. GROUP_COMPANY — parent/subsidiary (if any) + 3. PROCESSOR — Auftragsverarbeiter (AVV-pflichtig) + 4. CONTROLLER — joint/independent controllers (Meta, Google, + LinkedIn — they build own profiles) + 5. AUTHORITY / OTHER — rest + + Within each section: rows sorted by compliance_score ascending so + the weakest entries surface first. 
""" if not vendors: return "" - vendors = sorted(vendors, key=lambda v: v.get("compliance_score", 0)) - rows: list[str] = [] + # Import here to avoid pulling backend service deps at module load + from compliance.services.vendor_classifier import RECIPIENT_TYPE_SECTIONS + + # Bucket vendors by recipient_type + by_type: dict[str, list[dict]] = {} for v in vendors: - name = v.get("name") or "Unbekannt" - category = _category_label(v.get("category", "")) - country = v.get("country") or "—" - cookies = v.get("cookies") or [] - n_cookies = len(cookies) - score = int(v.get("compliance_score", 0)) - flags = v.get("compliance_flags") or [] - - opt_status = _link_status_badge( - v.get("opt_out_url"), v.get("opt_out_ok"), - v.get("opt_out_status"), - ) - privacy_status = _link_status_badge( - v.get("privacy_policy_url"), v.get("privacy_ok"), - v.get("privacy_status"), - ) - - score_color = ("#16a34a" if score >= 80 else - "#d97706" if score >= 50 else "#dc2626") - flag_str = "" - if flags: - flag_str = ( - f'
' - f'{", ".join(flags[:4])}
' - ) - rows.append( - f'' - f'' - f'{name}{flag_str}' - f'{category}' - f'{country}' - f'' - f'{n_cookies}' - f'{opt_status}' - f'{privacy_status}' - f'{score}%' - f'' - ) + rt = (v.get("recipient_type") or "OTHER").upper() + by_type.setdefault(rt, []).append(v) + # Top summary n_total = len(vendors) n_critical = sum(1 for v in vendors if v.get("compliance_score", 0) < 50) summary = ( @@ -297,15 +271,40 @@ def build_vvt_table_html(vendors: list[dict]) -> str: if n_critical else " — alle ueber 50%") ) - return ( + out: list[str] = [ '
' + 'background:#fafafa;border:1px solid #e5e7eb;border-radius:8px">', '

' - 'VVT-Vorschlag: Drittanbieter aus Cookie-Richtlinie

' + 'VVT-Vorschlag: Drittanbieter aus Cookie-Richtlinie', f'

{summary}. ' - 'Sortiert nach Compliance-Score (niedrig zuerst — diese Eintraege ' - 'pruefen).

' + 'Gruppiert nach Empfaengerkategorie (Art. 30(1)(d) DSGVO), innerhalb ' + 'jeder Gruppe nach Compliance-Score sortiert.

', + ] + + for rtype, section_label in RECIPIENT_TYPE_SECTIONS: + rows = by_type.get(rtype) or [] + if not rows: + continue + rows = sorted(rows, key=lambda v: v.get("compliance_score", 0)) + n = len(rows) + n_bad = sum(1 for v in rows if v.get("compliance_score", 0) < 50) + bad_hint = (f' ({n_bad} unter 50%)' + if n_bad else "") + out.append( + f'

' + f'{section_label} ' + f'({n}){bad_hint}

' + ) + out.append(_render_vendor_section(rows)) + + out.append('
') + return "".join(out) + + +def _render_vendor_section(rows: list[dict]) -> str: + body: list[str] = [ '' '' '' @@ -315,9 +314,50 @@ def build_vvt_table_html(vendors: list[dict]) -> str: '' '' '' - '' - + "".join(rows) - + '
NameOpt-OutPrivacyScore
' + '', + ] + for v in rows: + body.append(_render_vendor_row_full(v)) + body.append('') + return "".join(body) + + +def _render_vendor_row_full(v: dict) -> str: + name = v.get("name") or "Unbekannt" + category = _category_label(v.get("category", "")) + country = v.get("country") or "—" + cookies = v.get("cookies") or [] + n_cookies = len(cookies) + score = int(v.get("compliance_score", 0)) + flags = v.get("compliance_flags") or [] + opt_status = _link_status_badge( + v.get("opt_out_url"), v.get("opt_out_ok"), v.get("opt_out_status"), + ) + privacy_status = _link_status_badge( + v.get("privacy_policy_url"), v.get("privacy_ok"), + v.get("privacy_status"), + ) + score_color = ("#16a34a" if score >= 80 else + "#d97706" if score >= 50 else "#dc2626") + flag_str = "" + if flags: + flag_str = ( + f'
' + f'{", ".join(flags[:4])}
' + ) + return ( + f'' + f'' + f'{name}{flag_str}' + f'{category}' + f'{country}' + f'' + f'{n_cookies}' + f'{opt_status}' + f'{privacy_status}' + f'{score}%' + f'' ) diff --git a/backend-compliance/compliance/services/vendor_classifier.py b/backend-compliance/compliance/services/vendor_classifier.py new file mode 100644 index 00000000..ad712211 --- /dev/null +++ b/backend-compliance/compliance/services/vendor_classifier.py @@ -0,0 +1,151 @@ +""" +Recipient-type classifier for vendor records (Art. 30(1)(d) DSGVO). + +Tags each extracted vendor entry with one of the canonical +RecipientCategoryType values used by the VVT module: + + - INTERNAL — owner's own department / own system (BMW AG processing + for itself, e.g. 'BMW AG — Form Validation') + - GROUP_COMPANY — parent/subsidiary/sister of the owner (BMW Bank, + BMW Motorrad, BMW Financial Services) + - PROCESSOR — external Auftragsverarbeiter under AVV (Adobe, + Akamai, AWS, Salesforce — they process on behalf) + - CONTROLLER — independent / joint controller (Meta Pixel, Google + YouTube — they run their own profiles) + - AUTHORITY — government bodies (rare in cookie contexts) + - OTHER — fallback + +Heuristic only — does not query Vault or external sources. A site-owner +name is derived from the user-submitted URL (e.g. bmw.de -> 'BMW AG' or +'BMW'). Classification compares the vendor name to that owner name. +""" + +from __future__ import annotations + +import re +from urllib.parse import urlparse + +# Known tracking/advertising platforms that typically act as INDEPENDENT +# or JOINT CONTROLLERS rather than processors. They build their own user +# profiles across many sites; the site owner has limited control over +# what they do with the data once collected. 
# Known tracking/advertising platforms that typically act as INDEPENDENT
# or JOINT CONTROLLERS rather than processors. They build their own user
# profiles across many sites; the site owner has limited control over
# what they do with the data once collected.
#
# All entries are lowercase. Entries ending in "adverti" are deliberate
# word stems (they must hit "Advertising" as well as "Advertisement");
# every other entry is matched as a whole word/phrase.
_JOINT_CONTROLLER_HINTS = {
    "meta",  # Meta Pixel (Facebook/Instagram)
    "facebook",
    "instagram",
    "google adverti",  # Google Advertising
    "google ads",
    "youtube",
    "doubleclick",
    "linkedin insight",
    "linkedin",
    "tiktok",
    "pinterest",
    "twitter",
    "x.com",
    "snapchat",
    "taboola",
    "outbrain",
    "criteo",
    "amazon adverti",  # Amazon Advertising (vs AWS)
    "microsoft adverti",
    "yandex",
    "reddit",
    "quora",
    "spotify",
}

# Suffix that marks a hint as a truncated stem rather than a full word.
_HINT_STEM = "adverti"

# Second-level labels that, under a two-letter country TLD, belong to the
# public suffix rather than to the registrant (bbc.co.uk, example.com.au).
# Heuristic only -- this is not a full public-suffix list.
_SECOND_LEVEL_SUFFIXES = frozenset(
    {"ac", "co", "com", "edu", "gov", "net", "org"}
)

# Legal forms that, appended to the owner token, still denote the owner's
# own main legal entity ('' covers the bare owner name).
_OWNER_LEGAL_FORMS = ("", " ag", " se", " gmbh", " ag & co. kg")

# '<provider> — <purpose>': a dash surrounded by whitespace separates the
# provider from the processing purpose. The surrounding whitespace is
# required so that hyphenated names ('Mercedes-Benz') stay intact.
_PURPOSE_SEPARATOR_RE = re.compile(r"\s+[—–-]\s+")

# Corporate legal forms, matched as whole tokens against the lowercased
# name. Whole-token matching keeps 'kg' from firing on 'background' and
# lets a form at the very end of the name ('Acme SE') match.
_LEGAL_FORM_RE = re.compile(
    r"(?<![^\W_])"
    r"(?:gmbh|ag|se|kg|ohg|inc|ltd|limited|llc|corp|corporation"
    r"|b\.v\.?|a/s|s\.a\.?|s\.l\.?|s\.r\.l|oy|ab|as)"
    r"(?![^\W_])"
)


def _build_hint_pattern(hints):
    """Compile *hints* into one regex that matches them on word boundaries."""
    alternatives = []
    # Longest first (ties broken alphabetically) so the pattern is
    # deterministic and prefers the most specific hint.
    for hint in sorted(hints, key=lambda h: (-len(h), h)):
        tail = r"\w*" if hint.endswith(_HINT_STEM) else ""
        alternatives.append(re.escape(hint) + tail)
    return re.compile(
        r"(?<![^\W_])(?:" + "|".join(alternatives) + r")(?![^\W_])"
    )


# Built once at import time; later mutation of _JOINT_CONTROLLER_HINTS is
# not reflected here.
_CONTROLLER_HINT_RE = _build_hint_pattern(_JOINT_CONTROLLER_HINTS)


def _normalise(text: str) -> str:
    """Lowercase *text* and collapse whitespace/hyphen runs to one space."""
    return re.sub(r"[\s\-]+", " ", text.lower()).strip()


def owner_from_url(url: str) -> str:
    """Derive a short owner name from a URL.

    bmw.de -> 'BMW', mercedes-benz.de -> 'Mercedes-Benz',
    deutsche-bahn.de -> 'Deutsche-Bahn', bbc.co.uk -> 'BBC'.

    Args:
        url: absolute URL including the scheme ('https://www.bmw.de/...').

    Returns:
        The registrable label of the host, upper-cased when it looks like
        an acronym (<= 4 chars, no hyphen), otherwise capitalised per
        hyphen-separated part. Returns '' when no owner can be derived:
        empty input, missing scheme, malformed URL, single-label host or
        an IP address.
    """
    if not url or "://" not in url:
        return ""
    try:
        # .hostname (unlike .netloc) drops userinfo and port and is
        # already lowercased.
        host = urlparse(url).hostname or ""
    except ValueError:
        # urlparse raises on malformed bracketed IPv6 hosts.
        return ""
    host = host.rstrip(".")
    if host.startswith("www."):
        host = host[4:]
    labels = host.split(".")
    if len(labels) < 2 or labels[-1].isdigit():
        # Single-label host (localhost, IPv6 literal) or IPv4 address.
        return ""
    sld = labels[-2]
    if (
        len(labels) >= 3
        and len(labels[-1]) == 2
        and sld in _SECOND_LEVEL_SUFFIXES
    ):
        # 'bbc.co.uk': the registrant label sits one position further left.
        sld = labels[-3]
    if not sld:
        return ""
    # Acronym (<=4 chars, no hyphen) -> uppercase (BMW, ARD, ZDF)
    if len(sld) <= 4 and "-" not in sld:
        return sld.upper()
    return "-".join(part.capitalize() for part in sld.split("-"))


def classify(
    vendor_name: str,
    category: str,
    owner_name: str,
) -> str:
    """Return one of INTERNAL / GROUP_COMPANY / PROCESSOR / CONTROLLER / OTHER.

    Purely name-based heuristic, evaluated in this order:

    1. INTERNAL: the provider part of the name IS the owner's main legal
       entity ('<owner>', '<owner> AG', '<owner> SE', '<owner> GmbH',
       '<owner> AG & Co. KG'), optionally followed by ' — <purpose>'.
    2. GROUP_COMPANY: the provider part starts with '<owner> ' but is a
       different entity (BMW Bank GmbH, BMW Financial Services).
    3. CONTROLLER: the name mentions a known tracking/ad platform.
    4. PROCESSOR: the name carries a corporate legal form.
    5. OTHER: everything else, including an empty name.

    Args:
        vendor_name: the provider/processing name as it appears in the
            cookie policy (e.g. 'BMW AG — Form Validation' or 'Adobe Systems
            Software Ireland Limited — Adobe Analytics').
        category: canonical category ('marketing', 'necessary', 'statistics',
            'functional'). Accepted for interface stability; the current
            heuristic does not consult it.
        owner_name: short token derived from the site URL ('BMW',
            'Mercedes-Benz'). Empty string disables INTERNAL and
            GROUP_COMPANY detection.

    Returns:
        The recipient type as an upper-case string.
    """
    name = (vendor_name or "").strip()
    if not name:
        return "OTHER"
    lower = name.lower()

    # Steps 1 + 2 compare the provider part (text before the purpose
    # separator) against the owner. Both sides are normalised so that a
    # URL-derived 'Deutsche-Bahn' still matches 'Deutsche Bahn AG'.
    owner = _normalise(owner_name or "")
    if owner:
        provider = _normalise(_PURPOSE_SEPARATOR_RE.split(name, maxsplit=1)[0])
        if any(provider == owner + form for form in _OWNER_LEGAL_FORMS):
            return "INTERNAL"
        if provider.startswith(owner + " "):
            return "GROUP_COMPANY"

    # 3. CONTROLLER — known tracking/ad platforms. Word-boundary matching
    #    keeps 'meta' from firing on 'Metabase' and 'x.com' on 'Wix.com'.
    if _CONTROLLER_HINT_RE.search(lower):
        return "CONTROLLER"

    # 4. PROCESSOR — everything else with a corporate name is most likely
    #    an Auftragsverarbeiter (hosting/CDN/analytics/chat/captcha/CRM).
    #    NOTE: short forms such as 'as'/'ab' are also ordinary words, so a
    #    free-text name like 'Cookies as a Service' still lands here.
    if _LEGAL_FORM_RE.search(lower):
        return "PROCESSOR"

    return "OTHER"


# Section ordering + display labels for the VVT email table
RECIPIENT_TYPE_SECTIONS = [
    ("INTERNAL", "Eigene Verarbeitung"),
    ("GROUP_COMPANY", "Konzernunternehmen (Mutter/Tochter)"),
    ("PROCESSOR", "Auftragsverarbeiter (AVV-pflichtig)"),
    ("CONTROLLER", "Eigenverantwortliche Dritte / Joint Controller"),
    ("AUTHORITY", "Behoerden"),
    ("OTHER", "Sonstige Empfaenger"),
]

# NOTE: the patch fragment that follows applies to
# backend-compliance/compliance/services/vendor_extractor.py
# (index cabe83fc..1411c047, hunk @@ -42,11 +42,18 @@).
@@ def _clean(s: object) -> str: return _WS_RE.sub(" ", no_tags).strip() -def extract_vendors_from_payloads(payloads: list[dict]) -> list[dict]: +def extract_vendors_from_payloads( + payloads: list[dict], + owner_name: str = "", +) -> list[dict]: """Walk every captured CMP payload, dispatch to per-CMP extractor. Deduplicates vendors across payloads by name (preserves richer record). + Tags each vendor with `recipient_type` (Art. 30(1)(d) DSGVO) using + the owner_name to detect INTERNAL processing. """ + from compliance.services.vendor_classifier import classify + all_vendors: dict[str, dict] = {} for payload in payloads or []: kind = payload.get("kind", "") @@ -76,9 +83,13 @@ def extract_vendors_from_payloads(payloads: list[dict]) -> list[dict]: name = (v.get("name") or "").strip() if not name: continue + v["recipient_type"] = classify( + vendor_name=name, + category=v.get("category", ""), + owner_name=owner_name, + ) existing = all_vendors.get(name) if existing: - # Merge cookies + fill empty fields for k, v_val in v.items(): if not existing.get(k) and v_val: existing[k] = v_val