Files
breakpilot-compliance/backend-compliance/compliance/services/vendor_extractor.py
T
Benjamin Admin fab1e35847 feat(vvt): recipient-type classification + 3-section VVT table
Per user request. BMW (and other site owners) list their own services AND
external vendors in the same cookie-policy widget. The VVT table now groups
these entries by Art. 30(1)(d) DSGVO recipient category so the DSB can act
on the right buckets:

  - INTERNAL      — owner processing for itself ('BMW AG — XYZ')
  - GROUP_COMPANY — same brand family, different legal entity ('BMW Bank')
  - PROCESSOR     — Auftragsverarbeiter, AVV-pflichtig (Adobe, Akamai)
  - CONTROLLER    — independent / joint controller (Meta Pixel, Google
                    Ads, LinkedIn — they run their own profiles)
  - AUTHORITY     — government bodies (rare in cookies)
  - OTHER         — fallback

New module vendor_classifier.py:
- owner_from_url(url) — derive site-owner token (bmw.de -> 'BMW',
  mercedes-benz.de -> 'Mercedes-Benz')
- classify(name, category, owner) — strict 5-tier heuristic:
  * INTERNAL: vendor name first-token is '<Owner>' / '<Owner> AG' /
    '<Owner> SE' / '<Owner> GmbH' / '<Owner> AG & Co. KG'
  * GROUP_COMPANY: starts with '<Owner> ' but isn't '<Owner> AG'
  * CONTROLLER: matches a known joint-controller list (Meta, Google
    Ads, YouTube, LinkedIn Insight, TikTok, Pinterest, Taboola,
    Outbrain, Criteo, Twitter, Reddit, ...)
  * PROCESSOR: legal-form suffix in name (GmbH, AG, Inc., A/S,
    B.V., S.A., Ltd., LLC, ...)
  * OTHER: anything else

vendor_extractor.extract_vendors_from_payloads now takes owner_name:
- Passes it through to classify() for every extracted vendor record
- The route derives owner_name via _company_name_from_url(doc_entries)
- LLM-extracted vendors are classified the same way (so V3 fallback
  also produces tagged records)

agent_doc_check_extras.build_vvt_table_html rewritten:
- Buckets vendors by recipient_type
- Renders one section per non-empty bucket, in canonical order
  (RECIPIENT_TYPE_SECTIONS), each with section header + count + bad
  count + nested table
- Within each section: sorted by compliance_score ascending
- Response JSON cmp_vendors includes recipient_type so the frontend
  can later import per-category into the VVT module

Expected BMW result: ~60 INTERNAL rows (BMW AG own services),
~25 PROCESSOR rows (Adobe, Adform, Akamai, AWS, ...), ~5 CONTROLLER
rows (Meta Pixel, Google, LinkedIn, Pinterest, Outbrain, Taboola).
2026-05-17 12:31:49 +02:00

371 lines
16 KiB
Python

"""
Vendor record extraction from captured CMP payloads.
Mirrors the per-CMP `extract_vendors()` functions in consent-tester's
cmp_library/ — duplicated here because the backend cannot import the
consent-tester package (different containers). Schemas are stable per CMP
vendor, so this is acceptable. When a new CMP is added in consent-tester,
add the matching extractor here.
Returned vendor record schema:
{
"name": str, # e.g. "Adobe Systems Software Ireland Limited"
"country": str, # ISO 2-letter (DE/US/...) when known
"purpose": str, # short description of what they do
"category": str, # marketing/analytics/functional/necessary
"opt_out_url": str, # link to opt out (Art. 7(3) DSGVO)
"privacy_policy_url": str, # link to vendor's privacy policy
"persistence": str, # human-readable retention text
"cookies": [ # cookies this vendor sets
{"name": str, "purpose": str, "expiry": str, "is_third_party": bool}
],
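# Recipient classification (set by extract_vendors_from_payloads())
"recipient_type": str, # Art. 30(1)(d) DSGVO category from vendor_classifier.classify()
# Compliance scoring (filled after vendor_compliance.evaluate())
"compliance_score": int, # 0-100
"compliance_flags": list[str], # e.g. ["no_opt_out", "broken_opt_out"]
}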
"""
from __future__ import annotations
import logging
import re
logger = logging.getLogger(__name__)

# A tag-like span: "<", one or more characters other than ">", then ">".
_TAG_RE = re.compile(r"<[^>]+>")
# A run of one or more whitespace characters.
_WS_RE = re.compile(r"\s+")


def _clean(s: object) -> str:
    """Return *s* as single-line text with tag-like markup removed.

    ``None`` yields ``""``; any other value is converted with ``str()``.
    Every tag-like span is replaced by a space (so adjacent words do not
    fuse), whitespace runs are collapsed to a single space, and the result
    is stripped at both ends.
    """
    if s is None:
        return ""
    without_markup = _TAG_RE.sub(" ", str(s))
    collapsed = _WS_RE.sub(" ", without_markup)
    return collapsed.strip()
def _extract_for_kind(kind: str, data: dict) -> list[dict]:
    """Run the extractor registered for CMP *kind* on *data*.

    Unknown kinds go to the generic best-effort extractor.
    """
    if kind == "epaas":
        return _extract_epaas(data)
    if kind == "onetrust":
        return _extract_onetrust(data)
    if kind == "cookiebot":
        return _extract_cookiebot(data)
    if kind == "usercentrics":
        return _extract_usercentrics(data)
    if kind == "didomi":
        return _extract_didomi(data)
    if kind == "trustarc":
        return _extract_trustarc(data)
    # Generic fallback: walk data for vendor-like dicts
    return _extract_generic(data)


def _merge_vendor_record(existing: dict, incoming: dict) -> None:
    """Fold *incoming* into *existing* (same vendor name), in place.

    Scalar fields that are empty on *existing* are filled from *incoming*;
    the cookie lists of both records are concatenated.
    """
    for key, value in incoming.items():
        if key == "cookies":
            # Handled separately below. Copying the list object here and
            # then extending it would extend the list with itself and
            # duplicate every cookie.
            continue
        if value and not existing.get(key):
            existing[key] = value
    combined = list(existing.get("cookies") or [])
    combined.extend(incoming.get("cookies") or [])
    existing["cookies"] = combined


def extract_vendors_from_payloads(
    payloads: list[dict],
    owner_name: str = "",
) -> list[dict]:
    """Extract, classify and de-duplicate vendors from captured CMP payloads.

    Each payload is read as a dict with a ``kind`` (CMP identifier used to
    pick the extractor) and a ``data`` dict (the captured CMP document).
    Payloads that are not dicts, or whose ``data`` is not a dict, are
    skipped. If an extractor raises, a warning is logged and that payload
    is skipped; the remaining payloads are still processed.

    Args:
        payloads: Captured CMP payloads; ``None`` or empty yields ``[]``.
        owner_name: Site-owner token forwarded to ``classify()`` so it can
            recognise the owner's own processing.

    Returns:
        One record per distinct (stripped) vendor name, in first-seen
        order. Each record carries ``recipient_type`` as returned by
        ``classify()``. When a name occurs more than once, the first
        record is kept, its empty fields are filled from later records,
        and the cookie lists are concatenated.
    """
    # Phase 1: run the per-CMP extractors.
    extracted: list[dict] = []
    for payload in payloads or []:
        if not isinstance(payload, dict):
            continue
        data = payload.get("data", {})
        if not isinstance(data, dict):
            continue
        kind = payload.get("kind", "")
        try:
            extracted.extend(_extract_for_kind(kind, data))
        except Exception as e:
            # Best effort: one malformed payload must not lose the others.
            logger.warning("vendor extractor failed for %s: %s", kind, e)
    if not extracted:
        return []

    # Function-scope import, as before; only needed once there is at least
    # one vendor record to classify.
    from compliance.services.vendor_classifier import classify

    # Phase 2: classify and merge by name.
    merged: dict[str, dict] = {}
    for record in extracted:
        name = str(record.get("name") or "").strip()
        if not name:
            continue
        # Store the same normalised name that is used as the de-dup key.
        record["name"] = name
        record["recipient_type"] = classify(
            vendor_name=name,
            category=record.get("category", ""),
            owner_name=owner_name,
        )
        known = merged.get(name)
        if known is None:
            merged[name] = record
        else:
            _merge_vendor_record(known, record)
    return list(merged.values())
# ── ePaaS (BMW Group) ───────────────────────────────────────────────
# ePaaS categoryId -> canonical category used by the VVT scorer, written
# grouped by canonical category: (canonical, (raw categoryId, ...)).
_EPAAS_CATEGORY_MAP = {
    raw_id: canonical
    for canonical, raw_ids in (
        ("marketing", ("advertising", "marketing")),
        ("necessary", ("strictlyNecessary", "necessary")),
        ("statistics", ("statistics",)),
        ("functional", ("functional",)),
    )
    for raw_id in raw_ids
}
def _extract_epaas(d: dict) -> list[dict]:
    """Convert an ePaaS payload into one row per *processing* (not provider).

    ePaaS schema (BMW):
        providers[].processings[].persistences[]
        provider:    {id, name, description}
        processing:  {id, name, description, categoryId, optOutLink,
                      privacyPolicyLink, persistences}
        persistence: {id, name, domain, type, expiry, description}

    Each processing is emitted as its own vendor record. The provider name
    is used as a prefix ("<provider> — <processing>") so the entity behind
    the processing stays visible; when the processing has no name of its
    own, or it equals the provider name, the provider name alone is used.

    Args:
        d: The captured ePaaS document.

    Returns:
        Vendor records; ``country`` and ``persistence`` are always empty
        and every cookie is marked ``is_third_party=True``. Entries that
        are not dicts are skipped instead of aborting the whole payload.
    """
    out: list[dict] = []
    for provider in d.get("providers") or []:
        if not isinstance(provider, dict):
            continue
        provider_name = provider.get("name") or provider.get("id") or ""
        provider_desc = _clean(provider.get("description"))
        for processing in provider.get("processings") or []:
            if not isinstance(processing, dict):
                continue
            name = (processing.get("name") or processing.get("id")
                    or provider_name)
            purpose = _clean(processing.get("description")
                             or processing.get("name") or provider_desc)
            cat_raw = processing.get("categoryId", "")
            # Unknown category ids are passed through unchanged.
            category = _EPAAS_CATEGORY_MAP.get(cat_raw, cat_raw or "")
            cookies = [
                {
                    "name": c.get("name") or c.get("id") or "",
                    "purpose": _clean(c.get("description")),
                    "expiry": _clean(c.get("expiry")),
                    "is_third_party": True,
                }
                for c in processing.get("persistences") or []
                if isinstance(c, dict)
            ]
            if provider_name and name and name != provider_name:
                # Explicit separator: without it provider and processing
                # fuse into one token ("…LimitedAdobe Analytics").
                display_name = f"{provider_name} — {name}"
            else:
                display_name = provider_name or name
            out.append({
                "name": display_name,
                "country": "",  # ePaaS doesn't surface vendor country
                "purpose": purpose,
                "category": category,
                "opt_out_url": (processing.get("optOutLink") or "").strip(),
                "privacy_policy_url": (processing.get("privacyPolicyLink")
                                       or "").strip(),
                "persistence": "",
                "cookies": cookies,
            })
    return out
# ── OneTrust ────────────────────────────────────────────────────────
def _extract_onetrust(d: dict) -> list[dict]:
    """Build one vendor record per cookie provider from a OneTrust payload.

    Reads ``Groups[*].Cookies[*]`` (lower-case key variants are accepted).
    The provider is taken from the cookie's ``Provider`` field, falling
    back to ``Host``; cookies without either are skipped. The first cookie
    seen for a provider creates the record (category = group name, purpose
    = group description or the cookie description); later cookies of the
    same provider are appended to its ``cookies`` list.

    Args:
        d: The captured OneTrust document.

    Returns:
        Vendor records in first-seen order. Groups or cookies that are not
        dicts are skipped instead of aborting the whole payload.
    """
    out_by_name: dict[str, dict] = {}
    for g in d.get("Groups") or d.get("groups") or []:
        if not isinstance(g, dict):
            continue
        category = g.get("GroupName") or g.get("name") or ""
        for c in g.get("Cookies") or g.get("cookies") or []:
            if not isinstance(c, dict):
                continue
            provider = (c.get("Provider") or c.get("provider")
                        or c.get("Host") or c.get("host") or "").strip()
            if not provider:
                continue
            # Resolved once so the cookie row and the vendor purpose accept
            # the same key spellings.
            cookie_description = c.get("description") or c.get("Description")
            cookie_entry = {
                "name": c.get("Name") or c.get("name") or "",
                "purpose": _clean(cookie_description),
                "expiry": _clean(c.get("Length") or c.get("expires")),
                "is_third_party": bool(c.get("IsThirdParty")
                                       or c.get("isThirdParty")),
            }
            known = out_by_name.get(provider)
            if known is not None:
                known["cookies"].append(cookie_entry)
                continue
            out_by_name[provider] = {
                "name": provider,
                "country": "",
                "purpose": _clean(g.get("GroupDescription")
                                  or cookie_description),
                "category": category,
                "opt_out_url": "",
                "privacy_policy_url": (c.get("PolicyUrl")
                                       or c.get("policyUrl") or "").strip(),
                "persistence": "",
                "cookies": [cookie_entry],
            }
    return list(out_by_name.values())
# ── Cookiebot ───────────────────────────────────────────────────────
def _extract_cookiebot(d: dict) -> list[dict]:
    """Build one vendor record per cookie provider from a Cookiebot payload.

    Reads ``Categories[*].Cookies[*]`` (lower-case key variants are
    accepted). The provider is the cookie's ``Vendor`` field, falling back
    to ``Host``; cookies without either are skipped. The first cookie seen
    for a provider creates the record (category = category name, purpose =
    cookie purpose or the category name); later cookies of the same
    provider are appended to its ``cookies`` list.

    Args:
        d: The captured Cookiebot document.

    Returns:
        Vendor records in first-seen order. Categories or cookies that are
        not dicts are skipped instead of aborting the whole payload.
    """
    out: dict[str, dict] = {}
    for cat in d.get("Categories") or d.get("categories") or []:
        if not isinstance(cat, dict):
            continue
        category = cat.get("Name") or cat.get("name") or ""
        for c in cat.get("Cookies") or cat.get("cookies") or []:
            if not isinstance(c, dict):
                continue
            provider = (c.get("Vendor") or c.get("vendor")
                        or c.get("Host") or c.get("host") or "").strip()
            if not provider:
                continue
            # Resolved once so cookie row and vendor purpose accept the
            # same key spellings.
            cookie_purpose = c.get("Purpose") or c.get("purpose")
            cookie = {
                "name": c.get("Name") or c.get("name") or "",
                "purpose": _clean(cookie_purpose),
                "expiry": _clean(c.get("Expires") or c.get("expires")),
                "is_third_party": bool(c.get("IsThirdParty")
                                       or c.get("isThirdParty")),
            }
            known = out.get(provider)
            if known is not None:
                known["cookies"].append(cookie)
                continue
            out[provider] = {
                "name": provider,
                "country": "",
                "purpose": _clean(cookie_purpose or category),
                "category": category,
                "opt_out_url": "",
                "privacy_policy_url": (c.get("PrivacyPolicyUrl")
                                       or c.get("policyUrl") or "").strip(),
                "persistence": "",
                "cookies": [cookie],
            }
    return list(out.values())
# ── Usercentrics ────────────────────────────────────────────────────
def _extract_usercentrics(d: dict) -> list[dict]:
    """Build one vendor record per service from a Usercentrics payload.

    The service list is taken from the first non-empty of ``services``,
    ``dataProcessingServices`` and ``settings.services``. Services without
    a ``name`` / ``dataProcessor`` are skipped.

    Args:
        d: The captured Usercentrics document.

    Returns:
        Vendor records with an empty ``cookies`` list. ``persistence`` is
        "<days> Tage" derived from ``cookieMaxAgeSeconds`` when that is a
        positive int, otherwise the cleaned ``retentionPeriodDescription``.
        Service entries that are not dicts are skipped instead of aborting
        the whole payload.
    """
    out: list[dict] = []
    settings = d.get("settings")
    if not isinstance(settings, dict):
        settings = {}
    services = (d.get("services") or d.get("dataProcessingServices")
                or settings.get("services") or [])
    for s in services:
        if not isinstance(s, dict):
            continue
        name = s.get("name") or s.get("dataProcessor") or ""
        if not name:
            continue
        max_age = s.get("cookieMaxAgeSeconds")
        persistence = ""
        if isinstance(max_age, int) and max_age > 0:
            # NOTE(review): lifetimes under one day render as "0 Tage";
            # confirm with the VVT consumers how these should be shown.
            persistence = f"{max_age // 86400} Tage"
        # "urls" may be present but null, so .get("urls", {}) is not enough.
        urls = s.get("urls")
        if not isinstance(urls, dict):
            urls = {}
        out.append({
            "name": name,
            "country": (s.get("processingCompanyCountry")
                        or s.get("country") or "").strip(),
            "purpose": _clean(s.get("dataPurpose") or s.get("description")),
            "category": (s.get("categorySlug") or s.get("category") or "").strip(),
            "opt_out_url": (s.get("optOutUrl") or "").strip(),
            "privacy_policy_url": (s.get("policyOfProcessorUrl")
                                   or urls.get("privacyPolicy", "")
                                   or "").strip(),
            "persistence": persistence or _clean(s.get("retentionPeriodDescription")),
            "cookies": [],
        })
    return out
# ── Didomi ──────────────────────────────────────────────────────────
def _extract_didomi(d: dict) -> list[dict]:
    """Build one vendor record per entry of a Didomi vendor list.

    The list is read from ``app.vendors`` and, when that is missing or
    empty, from the top-level ``vendors`` key. Entries without a ``name``
    are skipped.

    Args:
        d: The captured Didomi document.

    Returns:
        Vendor records with empty ``persistence`` and ``cookies``. A
        non-dict ``app``, a ``vendors`` value that is not a list, and
        entries that are not dicts yield no records instead of raising
        (which would drop the whole payload in the caller).
    """
    out: list[dict] = []
    app = d.get("app", d)
    if not isinstance(app, dict):
        app = {}
    vendors = app.get("vendors") or d.get("vendors") or []
    if not isinstance(vendors, (list, tuple)):
        # e.g. a mapping: iterating it would yield its string keys.
        return out
    for v in vendors:
        if not isinstance(v, dict):
            continue
        name = v.get("name") or ""
        if not name:
            continue
        out.append({
            "name": name,
            "country": (v.get("country") or "").strip(),
            "purpose": _clean(v.get("description") or v.get("purpose")),
            "category": (v.get("category") or "").strip(),
            "opt_out_url": (v.get("optOutUrl") or "").strip(),
            "privacy_policy_url": (v.get("policyUrl") or v.get("policy_url")
                                   or "").strip(),
            "persistence": "",
            "cookies": [],
        })
    return out
# ── TrustArc ────────────────────────────────────────────────────────
def _extract_trustarc(d: dict) -> list[dict]:
    """Build vendor records from a TrustArc payload.

    Two sources are combined, keyed by stripped vendor name:

    1. ``vendors[]`` — one record per named vendor (a repeated name
       replaces the earlier record).
    2. ``categories[*].cookies[*]`` — each cookie with a ``provider`` is
       appended to that vendor's ``cookies``; an unknown provider gets a
       minimal record whose category is the cookie category name.

    Capitalised key variants are accepted for both sources.

    Args:
        d: The captured TrustArc document.

    Returns:
        Vendor records in first-seen order; every cookie is marked
        ``is_third_party=True``. Entries that are not dicts are skipped
        instead of aborting the whole payload.
    """
    out_by_name: dict[str, dict] = {}
    # vendors
    for v in d.get("vendors") or d.get("Vendors") or []:
        if not isinstance(v, dict):
            continue
        # Stripped so cookie providers below match despite stray whitespace.
        name = (v.get("name") or v.get("Name") or "").strip()
        if not name:
            continue
        out_by_name[name] = {
            "name": name,
            "country": (v.get("country") or "").strip(),
            "purpose": _clean(v.get("description") or v.get("Description")),
            "category": (v.get("category") or "").strip(),
            "opt_out_url": (v.get("optOutUrl") or "").strip(),
            "privacy_policy_url": (v.get("policyUrl") or "").strip(),
            "persistence": "",
            "cookies": [],
        }
    # cookies per category
    for cat in d.get("categories") or d.get("Categories") or []:
        if not isinstance(cat, dict):
            continue
        cat_name = cat.get("name") or cat.get("Name") or ""
        for c in cat.get("cookies") or cat.get("Cookies") or []:
            if not isinstance(c, dict):
                continue
            provider = (c.get("provider") or c.get("Provider") or "").strip()
            if not provider:
                continue
            cookie = {
                "name": c.get("name") or c.get("Name") or "",
                "purpose": _clean(c.get("purpose") or c.get("Purpose")),
                "expiry": _clean(c.get("expires") or c.get("Expires")),
                "is_third_party": True,
            }
            known = out_by_name.get(provider)
            if known is not None:
                known["cookies"].append(cookie)
                continue
            out_by_name[provider] = {
                "name": provider, "country": "", "purpose": "",
                "category": cat_name, "opt_out_url": "",
                "privacy_policy_url": "", "persistence": "",
                "cookies": [cookie],
            }
    return list(out_by_name.values())
# ── Generic fallback (other CMPs / heuristic captures) ──────────────
def _first_text(*candidates: object) -> str:
    """Return the first candidate that is a non-blank string, stripped.

    Non-string candidates (numbers, dicts, None, ...) are ignored; ``""``
    is returned when nothing qualifies.
    """
    for candidate in candidates:
        if isinstance(candidate, str):
            stripped = candidate.strip()
            if stripped:
                return stripped
    return ""


def _extract_generic(d: dict) -> list[dict]:
    """Best-effort walk for unknown CMP shapes.

    Looks at the top-level keys 'vendors', 'providers', 'services',
    'dataProcessingServices', 'Vendors' and 'Providers'. For every one
    that holds a list, each dict entry with a usable name (``name``,
    ``vendor`` or ``dataProcessor``) becomes a vendor record.

    Args:
        d: The captured document of an unrecognised CMP.

    Returns:
        Vendor records with an empty ``cookies`` list. Because the shape
        is unknown, fields that are not strings are treated as absent
        rather than raising, so one odd entry cannot abort the walk. The
        same vendor may appear more than once when it is listed under
        several keys.
    """
    out: list[dict] = []
    for key in ("vendors", "providers", "services", "dataProcessingServices",
                "Vendors", "Providers"):
        lst = d.get(key)
        if not isinstance(lst, list):
            continue
        for entry in lst:
            if not isinstance(entry, dict):
                continue
            name = _first_text(entry.get("name"), entry.get("vendor"),
                               entry.get("dataProcessor"))
            if not name:
                continue
            out.append({
                "name": name,
                "country": _first_text(entry.get("country")),
                "purpose": _clean(entry.get("purpose") or entry.get("description")
                                  or entry.get("dataPurpose")),
                "category": _first_text(entry.get("category")),
                "opt_out_url": _first_text(entry.get("optOutUrl"),
                                           entry.get("opt_out_url")),
                "privacy_policy_url": _first_text(
                    entry.get("policyUrl"),
                    entry.get("privacyPolicyUrl"),
                    entry.get("privacy_policy_url"),
                ),
                "persistence": _clean(entry.get("retentionPeriodDescription")),
                "cookies": [],
            })
    return out