""" Vendor record extraction from captured CMP payloads. Mirrors the per-CMP `extract_vendors()` functions in consent-tester's cmp_library/ — duplicated here because the backend cannot import the consent-tester package (different containers). Schemas are stable per CMP vendor, so this is acceptable. When a new CMP is added in consent-tester, add the matching extractor here. Returned vendor record schema: { "name": str, # e.g. "Adobe Systems Software Ireland Limited" "country": str, # ISO 2-letter (DE/US/...) when known "purpose": str, # short description of what they do "category": str, # marketing/analytics/functional/necessary "opt_out_url": str, # link to opt out (Art. 7(3) DSGVO) "privacy_policy_url": str, # link to vendor's privacy policy "persistence": str, # human-readable retention text "cookies": [ # cookies this vendor sets {"name": str, "purpose": str, "expiry": str, "is_third_party": bool} ], # Compliance scoring (filled after vendor_compliance.evaluate()) "compliance_score": int, # 0-100 "compliance_flags": list[str], # e.g. ["no_opt_out", "broken_opt_out"] } """ from __future__ import annotations import logging import re logger = logging.getLogger(__name__) _TAG_RE = re.compile(r"<[^>]+>") _WS_RE = re.compile(r"\s+") def _clean(s: object) -> str: text = "" if s is None else str(s) no_tags = _TAG_RE.sub(" ", text) return _WS_RE.sub(" ", no_tags).strip() def extract_vendors_from_payloads(payloads: list[dict]) -> list[dict]: """Walk every captured CMP payload, dispatch to per-CMP extractor. Deduplicates vendors across payloads by name (preserves richer record). """ all_vendors: dict[str, dict] = {} for payload in payloads or []: kind = payload.get("kind", "") data = payload.get("data", {}) if not isinstance(data, dict): continue try: if kind == "epaas": vendors = _extract_epaas(data) elif kind == "onetrust": vendors = _extract_onetrust(data) elif kind == "cookiebot": vendors = _extract_cookiebot(data) elif kind == "usercentrics": vendors = _extract_usercentrics(data) elif kind == "didomi": vendors = _extract_didomi(data) elif kind == "trustarc": vendors = _extract_trustarc(data) else: # Generic fallback: walk data for vendor-like dicts vendors = _extract_generic(data) except Exception as e: logger.warning("vendor extractor failed for %s: %s", kind, e) continue for v in vendors: name = (v.get("name") or "").strip() if not name: continue existing = all_vendors.get(name) if existing: # Merge cookies + fill empty fields for k, v_val in v.items(): if not existing.get(k) and v_val: existing[k] = v_val existing.setdefault("cookies", []).extend(v.get("cookies", [])) else: all_vendors[name] = v return list(all_vendors.values()) # ── ePaaS (BMW Group) ─────────────────────────────────────────────── # Maps ePaaS categoryId -> canonical category used by the VVT scorer. _EPAAS_CATEGORY_MAP = { "advertising": "marketing", "marketing": "marketing", "strictlyNecessary": "necessary", "necessary": "necessary", "statistics": "statistics", "functional": "functional", } def _extract_epaas(d: dict) -> list[dict]: """Convert ePaaS payload into one row per *processing* (not provider). ePaaS schema (BMW): providers[].processings[].persistences[] provider: {id, name, description} processing: {id, name, description, categoryId, optOutLink, privacyPolicyLink, persistences} persistence: {id, name, domain, type, expiry, description} Each processing is a separate displayable unit in the cookie widget (Adobe Analytics, Adobe Campaign, Adobe Target Personalisation, …) — matching the website layout one-to-one in the VVT table. Provider name becomes the prefix so the data-controller entity is visible. """ out: list[dict] = [] for provider in d.get("providers", []) or []: provider_name = provider.get("name") or provider.get("id") or "" provider_desc = _clean(provider.get("description")) for processing in provider.get("processings", []) or []: name = (processing.get("name") or processing.get("id") or provider_name) purpose = _clean(processing.get("description") or processing.get("name") or provider_desc) cat_raw = processing.get("categoryId", "") category = _EPAAS_CATEGORY_MAP.get(cat_raw, cat_raw or "") cookies: list[dict] = [] for c in processing.get("persistences", []) or []: cookies.append({ "name": c.get("name") or c.get("id") or "", "purpose": _clean(c.get("description")), "expiry": _clean(c.get("expiry")), "is_third_party": True, }) display_name = (f"{provider_name} — {name}" if name and name != provider_name else (provider_name or name)) out.append({ "name": display_name, "country": "", # ePaaS doesn't surface vendor country "purpose": purpose, "category": category, "opt_out_url": (processing.get("optOutLink") or "").strip(), "privacy_policy_url": (processing.get("privacyPolicyLink") or "").strip(), "persistence": "", "cookies": cookies, }) return out # ── OneTrust ──────────────────────────────────────────────────────── def _extract_onetrust(d: dict) -> list[dict]: out_by_name: dict[str, dict] = {} for g in d.get("Groups") or d.get("groups") or []: category = g.get("GroupName") or g.get("name") or "" for c in g.get("Cookies") or g.get("cookies") or []: provider = (c.get("Provider") or c.get("provider") or c.get("Host") or c.get("host") or "").strip() if not provider: continue cookie_entry = { "name": c.get("Name") or c.get("name") or "", "purpose": _clean(c.get("description") or c.get("Description")), "expiry": _clean(c.get("Length") or c.get("expires")), "is_third_party": bool(c.get("IsThirdParty") or c.get("isThirdParty")), } if provider in out_by_name: out_by_name[provider]["cookies"].append(cookie_entry) else: out_by_name[provider] = { "name": provider, "country": "", "purpose": _clean(g.get("GroupDescription") or c.get("description")), "category": category, "opt_out_url": "", "privacy_policy_url": (c.get("PolicyUrl") or c.get("policyUrl") or ""), "persistence": "", "cookies": [cookie_entry], } return list(out_by_name.values()) # ── Cookiebot ─────────────────────────────────────────────────────── def _extract_cookiebot(d: dict) -> list[dict]: """Cookiebot stores 'Categories[*].Cookies[*]' with Vendor/Host.""" out: dict[str, dict] = {} for cat in d.get("Categories") or d.get("categories") or []: category = cat.get("Name") or cat.get("name") or "" for c in cat.get("Cookies") or cat.get("cookies") or []: provider = (c.get("Vendor") or c.get("vendor") or c.get("Host") or c.get("host") or "").strip() if not provider: continue cookie = { "name": c.get("Name") or c.get("name") or "", "purpose": _clean(c.get("Purpose") or c.get("purpose")), "expiry": _clean(c.get("Expires") or c.get("expires")), "is_third_party": bool(c.get("IsThirdParty")), } if provider in out: out[provider]["cookies"].append(cookie) else: out[provider] = { "name": provider, "country": "", "purpose": _clean(c.get("Purpose") or category), "category": category, "opt_out_url": "", "privacy_policy_url": (c.get("PrivacyPolicyUrl") or c.get("policyUrl") or ""), "persistence": "", "cookies": [cookie], } return list(out.values()) # ── Usercentrics ──────────────────────────────────────────────────── def _extract_usercentrics(d: dict) -> list[dict]: """Usercentrics 'services' / 'dataProcessingServices' shape.""" out: list[dict] = [] services = (d.get("services") or d.get("dataProcessingServices") or (d.get("settings") or {}).get("services") or []) for s in services: name = s.get("name") or s.get("dataProcessor") or "" if not name: continue max_age = s.get("cookieMaxAgeSeconds") persistence = "" if isinstance(max_age, int) and max_age > 0: persistence = f"{max_age // 86400} Tage" out.append({ "name": name, "country": (s.get("processingCompanyCountry") or s.get("country") or "").strip(), "purpose": _clean(s.get("dataPurpose") or s.get("description")), "category": (s.get("categorySlug") or s.get("category") or "").strip(), "opt_out_url": (s.get("optOutUrl") or "").strip(), "privacy_policy_url": (s.get("policyOfProcessorUrl") or s.get("urls", {}).get("privacyPolicy", "") or "").strip(), "persistence": persistence or _clean(s.get("retentionPeriodDescription")), "cookies": [], }) return out # ── Didomi ────────────────────────────────────────────────────────── def _extract_didomi(d: dict) -> list[dict]: """Didomi 'app.vendors[]' with name, country, policyUrl.""" out: list[dict] = [] app = d.get("app", d) or {} for v in app.get("vendors") or d.get("vendors") or []: name = v.get("name") or "" if not name: continue out.append({ "name": name, "country": (v.get("country") or "").strip(), "purpose": _clean(v.get("description") or v.get("purpose")), "category": (v.get("category") or "").strip(), "opt_out_url": (v.get("optOutUrl") or "").strip(), "privacy_policy_url": (v.get("policyUrl") or v.get("policy_url") or "").strip(), "persistence": "", "cookies": [], }) return out # ── TrustArc ──────────────────────────────────────────────────────── def _extract_trustarc(d: dict) -> list[dict]: """TrustArc 'vendors[]' or per-category 'Cookies' with provider.""" out_by_name: dict[str, dict] = {} # vendors for v in d.get("vendors") or d.get("Vendors") or []: name = v.get("name") or v.get("Name") or "" if not name: continue out_by_name[name] = { "name": name, "country": (v.get("country") or "").strip(), "purpose": _clean(v.get("description") or v.get("Description")), "category": (v.get("category") or "").strip(), "opt_out_url": (v.get("optOutUrl") or "").strip(), "privacy_policy_url": (v.get("policyUrl") or "").strip(), "persistence": "", "cookies": [], } # cookies per category for cat in d.get("categories") or d.get("Categories") or []: cat_name = cat.get("name") or cat.get("Name") or "" for c in cat.get("cookies") or cat.get("Cookies") or []: provider = c.get("provider") or c.get("Provider") or "" if not provider: continue cookie = { "name": c.get("name") or c.get("Name") or "", "purpose": _clean(c.get("purpose") or c.get("Purpose")), "expiry": _clean(c.get("expires") or c.get("Expires")), "is_third_party": True, } if provider in out_by_name: out_by_name[provider]["cookies"].append(cookie) else: out_by_name[provider] = { "name": provider, "country": "", "purpose": "", "category": cat_name, "opt_out_url": "", "privacy_policy_url": "", "persistence": "", "cookies": [cookie], } return list(out_by_name.values()) # ── Generic fallback (other CMPs / heuristic captures) ────────────── def _extract_generic(d: dict) -> list[dict]: """Best-effort walk for unknown CMP shapes. Looks for top-level keys named 'vendors' / 'providers' / 'services' and extracts name/purpose/country fields from each entry. """ out: list[dict] = [] for key in ("vendors", "providers", "services", "dataProcessingServices", "Vendors", "Providers"): lst = d.get(key) if not isinstance(lst, list): continue for entry in lst: if not isinstance(entry, dict): continue name = (entry.get("name") or entry.get("vendor") or entry.get("dataProcessor") or "").strip() if not name: continue out.append({ "name": name, "country": (entry.get("country") or "").strip(), "purpose": _clean(entry.get("purpose") or entry.get("description") or entry.get("dataPurpose")), "category": (entry.get("category") or "").strip(), "opt_out_url": (entry.get("optOutUrl") or entry.get("opt_out_url") or "").strip(), "privacy_policy_url": (entry.get("policyUrl") or entry.get("privacyPolicyUrl") or entry.get("privacy_policy_url") or "").strip(), "persistence": _clean(entry.get("retentionPeriodDescription")), "cookies": [], }) return out