feat(vvt): recipient-type classification + 3-section VVT table

Per user request: BMW (and others) put their own services AND external
vendors in the same cookie-policy widget. The VVT-Tabelle now groups
them by Art. 30(1)(d) DSGVO recipient category so the DSB can act on
the right buckets:

  - INTERNAL      — owner processing for itself ('BMW AG — XYZ')
  - GROUP_COMPANY — same brand family, different legal entity ('BMW Bank')
  - PROCESSOR     — Auftragsverarbeiter, AVV-pflichtig (Adobe, Akamai)
  - CONTROLLER    — independent / joint controller (Meta Pixel, Google
                    Ads, LinkedIn — they run their own profiles)
  - AUTHORITY     — government bodies (rare in cookies)
  - OTHER         — fallback

New module vendor_classifier.py:
- owner_from_url(url) — derive site-owner token (bmw.de -> 'BMW',
  mercedes-benz.de -> 'Mercedes-Benz')
- classify(name, category, owner) — strict 5-tier heuristic:
  * INTERNAL: vendor name first-token is '<Owner>' / '<Owner> AG' /
    '<Owner> SE' / '<Owner> GmbH' / '<Owner> AG & Co. KG'
  * GROUP_COMPANY: starts with '<Owner> ' but isn't '<Owner> AG'
  * CONTROLLER: matches a known joint-controller list (Meta, Google
    Ads, YouTube, LinkedIn Insight, TikTok, Pinterest, Taboola,
    Outbrain, Criteo, Twitter, Reddit, ...)
  * PROCESSOR: legal-form suffix in name (GmbH, AG, Inc., A/S,
    B.V., S.A., Ltd., LLC, ...)
  * OTHER: anything else

vendor_extractor.extract_vendors_from_payloads now takes owner_name:
- Passes it through to classify() for every extracted vendor record
- The route derives owner_name via _company_name_from_url(doc_entries)
- LLM-extracted vendors are classified the same way (so V3 fallback
  also produces tagged records)

agent_doc_check_extras.build_vvt_table_html rewritten:
- Buckets vendors by recipient_type
- Renders one section per non-empty bucket, in canonical order
  (RECIPIENT_TYPE_SECTIONS), each with section header + count + bad
  count + nested table
- Within each section: sorted by compliance_score ascending
- Response JSON cmp_vendors includes recipient_type so the frontend
  can later import per-category into the VVT module

Expected BMW result: ~60 INTERNAL rows (BMW AG own services),
~25 PROCESSOR rows (Adobe, Adform, Akamai, AWS, ...), ~5 CONTROLLER
rows (Meta Pixel, Google, LinkedIn, Pinterest, Outbrain, Taboola).
This commit is contained in:
Benjamin Admin
2026-05-17 12:31:49 +02:00
parent 6c7d4c7552
commit fab1e35847
4 changed files with 272 additions and 56 deletions
@@ -390,8 +390,13 @@ async def _run_compliance_check(check_id: str, req: ComplianceCheckRequest):
cookie_payloads.extend(e["cmp_payloads"])
if e.get("text"):
cookie_text = e["text"]
# Site-owner derived from the submitted URLs — drives the
# INTERNAL/GROUP_COMPANY classification of vendor records.
owner_name = _company_name_from_url(doc_entries) or ""
if cookie_payloads:
cmp_vendors = extract_vendors_from_payloads(cookie_payloads)
cmp_vendors = extract_vendors_from_payloads(
cookie_payloads, owner_name=owner_name,
)
# V3 fallback: no named CMP captured but we have substantive
# cookie text → ask Qwen/OVH to extract vendor list from the text.
# Skip on very short text (likely navigation) to save LLM cost.
@@ -399,8 +404,17 @@ async def _run_compliance_check(check_id: str, req: ComplianceCheckRequest):
from compliance.services.vendor_llm_extractor import (
extract_vendors_via_llm,
)
from compliance.services.vendor_classifier import classify
_update(check_id, "Vendor-Liste per LLM extrahieren...", 94)
cmp_vendors = await extract_vendors_via_llm(cookie_text)
# LLM path doesn't run through extract_vendors_from_payloads,
# so classify here.
for v in cmp_vendors:
v["recipient_type"] = classify(
vendor_name=v.get("name", ""),
category=v.get("category", ""),
owner_name=owner_name,
)
if cmp_vendors:
logger.info("VVT: %d vendors extracted, validating links",
len(cmp_vendors))
@@ -237,58 +237,32 @@ def _category_label(kat: str) -> str:
def build_vvt_table_html(vendors: list[dict]) -> str:
"""Render the per-vendor VVT-style table for the email report.
One row per vendor. Columns: Name | Kategorie | Sitz | Cookies |
Opt-Out (Status) | Privacy (Status) | Compliance-Score.
Splits vendors into 3-4 sections by recipient_type (Art. 30(1)(d)
DSGVO):
Vendors are expected to come from vendor_extractor.extract_vendors_from_payloads
and have already been scored by cookie_link_validator.score_vendors.
1. INTERNAL — own departments / own systems
2. GROUP_COMPANY — parent/subsidiary (if any)
3. PROCESSOR — Auftragsverarbeiter (AVV-pflichtig)
4. CONTROLLER — joint/independent controllers (Meta, Google,
LinkedIn — they build own profiles)
5. AUTHORITY / OTHER — rest
Within each section: rows sorted by compliance_score ascending so
the weakest entries surface first.
"""
if not vendors:
return ""
vendors = sorted(vendors, key=lambda v: v.get("compliance_score", 0))
rows: list[str] = []
# Import here to avoid pulling backend service deps at module load
from compliance.services.vendor_classifier import RECIPIENT_TYPE_SECTIONS
# Bucket vendors by recipient_type
by_type: dict[str, list[dict]] = {}
for v in vendors:
name = v.get("name") or "Unbekannt"
category = _category_label(v.get("category", ""))
country = v.get("country") or ""
cookies = v.get("cookies") or []
n_cookies = len(cookies)
score = int(v.get("compliance_score", 0))
flags = v.get("compliance_flags") or []
opt_status = _link_status_badge(
v.get("opt_out_url"), v.get("opt_out_ok"),
v.get("opt_out_status"),
)
privacy_status = _link_status_badge(
v.get("privacy_policy_url"), v.get("privacy_ok"),
v.get("privacy_status"),
)
score_color = ("#16a34a" if score >= 80 else
"#d97706" if score >= 50 else "#dc2626")
flag_str = ""
if flags:
flag_str = (
f'<div style="font-size:10px;color:#94a3b8;margin-top:2px">'
f'{", ".join(flags[:4])}</div>'
)
rows.append(
f'<tr style="border-top:1px solid #e2e8f0">'
f'<td style="padding:6px 8px;color:#1e293b;font-size:11px">'
f'{name}{flag_str}</td>'
f'<td style="padding:6px 8px;color:#475569;font-size:11px">{category}</td>'
f'<td style="padding:6px 8px;color:#475569;font-size:11px">{country}</td>'
f'<td style="padding:6px 8px;text-align:center;color:#475569;font-size:11px">'
f'{n_cookies}</td>'
f'<td style="padding:6px 8px;text-align:center">{opt_status}</td>'
f'<td style="padding:6px 8px;text-align:center">{privacy_status}</td>'
f'<td style="padding:6px 8px;text-align:right;font-weight:600;'
f'color:{score_color};font-size:11px">{score}%</td>'
f'</tr>'
)
rt = (v.get("recipient_type") or "OTHER").upper()
by_type.setdefault(rt, []).append(v)
# Top summary
n_total = len(vendors)
n_critical = sum(1 for v in vendors if v.get("compliance_score", 0) < 50)
summary = (
@@ -297,15 +271,40 @@ def build_vvt_table_html(vendors: list[dict]) -> str:
if n_critical else " — alle ueber 50%")
)
return (
out: list[str] = [
'<div style="font-family:-apple-system,BlinkMacSystemFont,sans-serif;'
'max-width:760px;margin:0 auto 16px;padding:12px 16px;'
'background:#fafafa;border:1px solid #e5e7eb;border-radius:8px">'
'background:#fafafa;border:1px solid #e5e7eb;border-radius:8px">',
'<h3 style="margin:0 0 4px;font-size:14px;color:#334155">'
'VVT-Vorschlag: Drittanbieter aus Cookie-Richtlinie</h3>'
'VVT-Vorschlag: Drittanbieter aus Cookie-Richtlinie</h3>',
f'<p style="margin:0 0 10px;font-size:11px;color:#6b7280">{summary}. '
'Sortiert nach Compliance-Score (niedrig zuerst — diese Eintraege '
'pruefen).</p>'
'Gruppiert nach Empfaengerkategorie (Art. 30(1)(d) DSGVO), innerhalb '
'jeder Gruppe nach Compliance-Score sortiert.</p>',
]
for rtype, section_label in RECIPIENT_TYPE_SECTIONS:
rows = by_type.get(rtype) or []
if not rows:
continue
rows = sorted(rows, key=lambda v: v.get("compliance_score", 0))
n = len(rows)
n_bad = sum(1 for v in rows if v.get("compliance_score", 0) < 50)
bad_hint = (f' <span style="color:#dc2626">({n_bad} unter 50%)</span>'
if n_bad else "")
out.append(
f'<h4 style="margin:14px 0 4px;font-size:12px;color:#1e293b;'
f'border-top:1px solid #e2e8f0;padding-top:8px">'
f'{section_label} <span style="color:#94a3b8;font-weight:400">'
f'({n}){bad_hint}</span></h4>'
)
out.append(_render_vendor_section(rows))
out.append('</div>')
return "".join(out)
def _render_vendor_section(rows: list[dict]) -> str:
body: list[str] = [
'<table style="width:100%;border-collapse:collapse;font-size:11px">'
'<thead><tr style="background:#f1f5f9;color:#475569;text-align:left">'
'<th style="padding:5px 8px">Name</th>'
@@ -315,9 +314,50 @@ def build_vvt_table_html(vendors: list[dict]) -> str:
'<th style="padding:5px 8px;text-align:center">Opt-Out</th>'
'<th style="padding:5px 8px;text-align:center">Privacy</th>'
'<th style="padding:5px 8px;text-align:right">Score</th>'
'</tr></thead><tbody>'
+ "".join(rows)
+ '</tbody></table></div>'
'</tr></thead><tbody>',
]
for v in rows:
body.append(_render_vendor_row_full(v))
body.append('</tbody></table>')
return "".join(body)
def _render_vendor_row_full(v: dict) -> str:
name = v.get("name") or "Unbekannt"
category = _category_label(v.get("category", ""))
country = v.get("country") or ""
cookies = v.get("cookies") or []
n_cookies = len(cookies)
score = int(v.get("compliance_score", 0))
flags = v.get("compliance_flags") or []
opt_status = _link_status_badge(
v.get("opt_out_url"), v.get("opt_out_ok"), v.get("opt_out_status"),
)
privacy_status = _link_status_badge(
v.get("privacy_policy_url"), v.get("privacy_ok"),
v.get("privacy_status"),
)
score_color = ("#16a34a" if score >= 80 else
"#d97706" if score >= 50 else "#dc2626")
flag_str = ""
if flags:
flag_str = (
f'<div style="font-size:10px;color:#94a3b8;margin-top:2px">'
f'{", ".join(flags[:4])}</div>'
)
return (
f'<tr style="border-top:1px solid #e2e8f0">'
f'<td style="padding:6px 8px;color:#1e293b;font-size:11px">'
f'{name}{flag_str}</td>'
f'<td style="padding:6px 8px;color:#475569;font-size:11px">{category}</td>'
f'<td style="padding:6px 8px;color:#475569;font-size:11px">{country}</td>'
f'<td style="padding:6px 8px;text-align:center;color:#475569;font-size:11px">'
f'{n_cookies}</td>'
f'<td style="padding:6px 8px;text-align:center">{opt_status}</td>'
f'<td style="padding:6px 8px;text-align:center">{privacy_status}</td>'
f'<td style="padding:6px 8px;text-align:right;font-weight:600;'
f'color:{score_color};font-size:11px">{score}%</td>'
f'</tr>'
)
@@ -0,0 +1,151 @@
"""
Recipient-type classifier for vendor records (Art. 30(1)(d) DSGVO).
Tags each extracted vendor entry with one of the canonical
RecipientCategoryType values used by the VVT module:
- INTERNAL — owner's own department / own system (BMW AG processing
for itself, e.g. 'BMW AG — Form Validation')
- GROUP_COMPANY — parent/subsidiary/sister of the owner (BMW Bank,
BMW Motorrad, BMW Financial Services)
- PROCESSOR — external Auftragsverarbeiter under AVV (Adobe,
Akamai, AWS, Salesforce — they process on behalf)
- CONTROLLER — independent / joint controller (Meta Pixel, Google
YouTube — they run their own profiles)
- AUTHORITY — government bodies (rare in cookie contexts)
- OTHER — fallback
Heuristic only — does not query Vault or external sources. A site-owner
name is derived from the user-submitted URL (e.g. bmw.de -> 'BMW AG' or
'BMW'). Classification compares the vendor name to that owner name.
"""
from __future__ import annotations
import re
from urllib.parse import urlparse
# Known tracking/advertising platforms that typically act as INDEPENDENT
# or JOINT CONTROLLERS rather than processors. They build their own user
# profiles across many sites; the site owner has limited control over
# what they do with the data once collected.
_JOINT_CONTROLLER_HINTS = {
"meta", # Meta Pixel (Facebook/Instagram)
"facebook",
"instagram",
"google adverti", # Google Advertising
"google ads",
"youtube",
"doubleclick",
"linkedin insight",
"linkedin",
"tiktok",
"pinterest",
"twitter",
"x.com",
"snapchat",
"taboola",
"outbrain",
"criteo",
"amazon adverti", # Amazon Advertising (vs AWS)
"microsoft adverti",
"yandex",
"reddit",
"quora",
"spotify",
}
def owner_from_url(url: str) -> str:
"""Derive a short owner name from a URL.
bmw.de -> 'BMW', mercedes-benz.de -> 'Mercedes-Benz',
deutsche-bahn.de -> 'Deutsche-Bahn'. Used to detect the INTERNAL
case when a vendor record's provider name starts with or contains
this token.
"""
if not url or "://" not in url:
return ""
netloc = urlparse(url).netloc.lower()
if netloc.startswith("www."):
netloc = netloc[4:]
parts = netloc.split(".")
if len(parts) < 2:
return ""
sld = parts[-2] if len(parts) <= 2 else parts[-2] # bmw
# Acronym (<=4 chars, no hyphen) -> uppercase (BMW, ARD, ZDF)
if len(sld) <= 4 and "-" not in sld:
return sld.upper()
return "-".join(p.capitalize() for p in sld.split("-"))
def classify(
vendor_name: str,
category: str,
owner_name: str,
) -> str:
"""Return one of INTERNAL / GROUP_COMPANY / PROCESSOR / CONTROLLER / OTHER.
Args:
vendor_name: the provider/processing name as it appears in the
cookie policy (e.g. 'BMW AG — Form Validation' or 'Adobe Systems
Software Ireland Limited — Adobe Analytics').
category: canonical category ('marketing', 'necessary', 'statistics',
'functional'). Used to distinguish controller vs processor for ad
platforms.
owner_name: short token derived from the site URL ('BMW',
'Mercedes-Benz'). Empty string disables INTERNAL detection.
"""
name = (vendor_name or "").strip()
if not name:
return "OTHER"
lower = name.lower()
# 1. INTERNAL — owner processing for itself.
# Strict: provider must BE the owner's main legal entity:
# '<Owner> AG', '<Owner> SE', '<Owner> GmbH', '<Owner>' alone, or
# '<Owner> AG — <processing>' / '<Owner> SE — <processing>'.
if owner_name:
ow = owner_name.lower()
first_token = lower.split("", 1)[0].strip() # text before ' — '
if (first_token == ow
or first_token == f"{ow} ag"
or first_token == f"{ow} se"
or first_token == f"{ow} gmbh"
or first_token == f"{ow} ag & co. kg"):
return "INTERNAL"
# 2. GROUP_COMPANY — provider is in the owner's brand family but a
# different legal entity (BMW Bank GmbH, BMW Motorrad GmbH,
# BMW Financial Services).
if owner_name:
ow = owner_name.lower()
first_token = lower.split("", 1)[0].strip()
if first_token.startswith(f"{ow} ") and first_token != f"{ow} ag":
return "GROUP_COMPANY"
# 3. CONTROLLER — known tracking/ad platforms
if any(hint in lower for hint in _JOINT_CONTROLLER_HINTS):
return "CONTROLLER"
# 4. PROCESSOR — everything else with a corporate name is most likely
# an Auftragsverarbeiter (hosting/CDN/analytics/chat/captcha/CRM)
if any(suffix in lower for suffix in (
"gmbh", "ag ", " ag", "ag—", "ag ", "se ", "kg", "ohg",
"inc.", "inc ", "ltd", "limited", "llc", "corp", "b.v.",
"a/s", "s.a.", "s.l.", "s.r.l", "oy ", "ab ", "as ",
)):
return "PROCESSOR"
return "OTHER"
# Section ordering + display labels for the VVT email table
RECIPIENT_TYPE_SECTIONS = [
("INTERNAL", "Eigene Verarbeitung"),
("GROUP_COMPANY", "Konzernunternehmen (Mutter/Tochter)"),
("PROCESSOR", "Auftragsverarbeiter (AVV-pflichtig)"),
("CONTROLLER", "Eigenverantwortliche Dritte / Joint Controller"),
("AUTHORITY", "Behoerden"),
("OTHER", "Sonstige Empfaenger"),
]
@@ -42,11 +42,18 @@ def _clean(s: object) -> str:
return _WS_RE.sub(" ", no_tags).strip()
def extract_vendors_from_payloads(payloads: list[dict]) -> list[dict]:
def extract_vendors_from_payloads(
payloads: list[dict],
owner_name: str = "",
) -> list[dict]:
"""Walk every captured CMP payload, dispatch to per-CMP extractor.
Deduplicates vendors across payloads by name (preserves richer record).
Tags each vendor with `recipient_type` (Art. 30(1)(d) DSGVO) using
the owner_name to detect INTERNAL processing.
"""
from compliance.services.vendor_classifier import classify
all_vendors: dict[str, dict] = {}
for payload in payloads or []:
kind = payload.get("kind", "")
@@ -76,9 +83,13 @@ def extract_vendors_from_payloads(payloads: list[dict]) -> list[dict]:
name = (v.get("name") or "").strip()
if not name:
continue
v["recipient_type"] = classify(
vendor_name=name,
category=v.get("category", ""),
owner_name=owner_name,
)
existing = all_vendors.get(name)
if existing:
# Merge cookies + fill empty fields
for k, v_val in v.items():
if not existing.get(k) and v_val:
existing[k] = v_val