""" Vendor record extraction from captured CMP payloads. Mirrors the per-CMP `extract_vendors()` functions in consent-tester's cmp_library/ — duplicated here because the backend cannot import the consent-tester package (different containers). Schemas are stable per CMP vendor, so this is acceptable. When a new CMP is added in consent-tester, add the matching extractor here. Returned vendor record schema: { "name": str, # e.g. "Adobe Systems Software Ireland Limited" "country": str, # ISO 2-letter (DE/US/...) when known "purpose": str, # short description of what they do "category": str, # marketing/analytics/functional/necessary "opt_out_url": str, # link to opt out (Art. 7(3) DSGVO) "privacy_policy_url": str, # link to vendor's privacy policy "persistence": str, # human-readable retention text "cookies": [ # cookies this vendor sets {"name": str, "purpose": str, "expiry": str, "is_third_party": bool} ], # Compliance scoring (filled after vendor_compliance.evaluate()) "compliance_score": int, # 0-100 "compliance_flags": list[str], # e.g. ["no_opt_out", "broken_opt_out"] } """ from __future__ import annotations import logging import re logger = logging.getLogger(__name__) _TAG_RE = re.compile(r"<[^>]+>") _WS_RE = re.compile(r"\s+") def _clean(s: object) -> str: text = "" if s is None else str(s) no_tags = _TAG_RE.sub(" ", text) return _WS_RE.sub(" ", no_tags).strip() def extract_vendors_from_payloads(payloads: list[dict]) -> list[dict]: """Walk every captured CMP payload, dispatch to per-CMP extractor. Deduplicates vendors across payloads by name (preserves richer record). """ all_vendors: dict[str, dict] = {} for payload in payloads or []: kind = payload.get("kind", "") data = payload.get("data", {}) if not isinstance(data, dict): continue try: if kind == "epaas": vendors = _extract_epaas(data) elif kind == "onetrust": vendors = _extract_onetrust(data) else: # Generic fallback: walk data for vendor-like dicts vendors = _extract_generic(data) except Exception as e: logger.warning("vendor extractor failed for %s: %s", kind, e) continue for v in vendors: name = (v.get("name") or "").strip() if not name: continue existing = all_vendors.get(name) if existing: # Merge cookies + fill empty fields for k, v_val in v.items(): if not existing.get(k) and v_val: existing[k] = v_val existing.setdefault("cookies", []).extend(v.get("cookies", [])) else: all_vendors[name] = v return list(all_vendors.values()) # ── ePaaS (BMW Group) ─────────────────────────────────────────────── def _extract_epaas(d: dict) -> list[dict]: out: list[dict] = [] providers = d.get("providers", []) or [] cookies_by_provider: dict[str, list[dict]] = {} for c in d.get("cookies", []) or []: pid = str(c.get("providerId") or c.get("provider") or c.get("vendor") or "") if pid: cookies_by_provider.setdefault(pid, []).append({ "name": c.get("name") or c.get("id") or "", "purpose": _clean(c.get("purpose") or c.get("description")), "expiry": _clean(c.get("expiry") or c.get("retention") or c.get("persistence")), "is_third_party": bool(c.get("isThirdParty") or c.get("third_party")), }) for p in providers: pid = str(p.get("id") or p.get("vendorId") or p.get("name") or "") cookies = cookies_by_provider.get(pid, []) or [{ "name": c.get("name", ""), "purpose": _clean(c.get("purpose")), "expiry": _clean(c.get("expiry") or c.get("persistence")), "is_third_party": True, } for c in (p.get("cookies", []) or [])] out.append({ "name": p.get("name") or pid or "", "country": (p.get("country") or "").strip(), "purpose": _clean(p.get("purpose")), "category": (p.get("category") or "").strip(), "opt_out_url": (p.get("optOutUrl") or p.get("optoutUrl") or p.get("opt_out_url") or "").strip(), "privacy_policy_url": (p.get("policyUrl") or p.get("policy_url") or p.get("privacyPolicyUrl") or "").strip(), "persistence": _clean(p.get("persistencePurposeDescription")), "cookies": cookies, }) return out # ── OneTrust ──────────────────────────────────────────────────────── def _extract_onetrust(d: dict) -> list[dict]: out_by_name: dict[str, dict] = {} for g in d.get("Groups") or d.get("groups") or []: category = g.get("GroupName") or g.get("name") or "" for c in g.get("Cookies") or g.get("cookies") or []: provider = (c.get("Provider") or c.get("provider") or c.get("Host") or c.get("host") or "").strip() if not provider: continue cookie_entry = { "name": c.get("Name") or c.get("name") or "", "purpose": _clean(c.get("description") or c.get("Description")), "expiry": _clean(c.get("Length") or c.get("expires")), "is_third_party": bool(c.get("IsThirdParty") or c.get("isThirdParty")), } if provider in out_by_name: out_by_name[provider]["cookies"].append(cookie_entry) else: out_by_name[provider] = { "name": provider, "country": "", "purpose": _clean(g.get("GroupDescription") or c.get("description")), "category": category, "opt_out_url": "", "privacy_policy_url": (c.get("PolicyUrl") or c.get("policyUrl") or ""), "persistence": "", "cookies": [cookie_entry], } return list(out_by_name.values()) # ── Generic fallback (other CMPs / heuristic captures) ────────────── def _extract_generic(d: dict) -> list[dict]: """Best-effort walk for unknown CMP shapes. Looks for top-level keys named 'vendors' / 'providers' / 'services' and extracts name/purpose/country fields from each entry. """ out: list[dict] = [] for key in ("vendors", "providers", "services", "dataProcessingServices", "Vendors", "Providers"): lst = d.get(key) if not isinstance(lst, list): continue for entry in lst: if not isinstance(entry, dict): continue name = (entry.get("name") or entry.get("vendor") or entry.get("dataProcessor") or "").strip() if not name: continue out.append({ "name": name, "country": (entry.get("country") or "").strip(), "purpose": _clean(entry.get("purpose") or entry.get("description") or entry.get("dataPurpose")), "category": (entry.get("category") or "").strip(), "opt_out_url": (entry.get("optOutUrl") or entry.get("opt_out_url") or "").strip(), "privacy_policy_url": (entry.get("policyUrl") or entry.get("privacyPolicyUrl") or entry.get("privacy_policy_url") or "").strip(), "persistence": _clean(entry.get("retentionPeriodDescription")), "cookies": [], }) return out