diff --git a/backend-compliance/compliance/api/agent_compliance_check_routes.py b/backend-compliance/compliance/api/agent_compliance_check_routes.py index b9823792..08e6c183 100644 --- a/backend-compliance/compliance/api/agent_compliance_check_routes.py +++ b/backend-compliance/compliance/api/agent_compliance_check_routes.py @@ -171,12 +171,13 @@ async def _run_compliance_check(check_id: str, req: ComplianceCheckRequest): pct = int(1 + (i / n_docs) * 29) _update(check_id, f"Texte laden {i+1}/{n_docs}: {doc.doc_type}...", pct) text = doc.text + cmp_payloads: list[dict] = [] if not text and doc.url: url_key = doc.url.strip().rstrip("/").lower() if url_key in url_text_cache: text = url_text_cache[url_key] else: - text = await _fetch_text(doc.url, doc_type=doc.doc_type) + text, cmp_payloads = await _fetch_text(doc.url, doc_type=doc.doc_type) if text: url_text_cache[url_key] = text if text: @@ -188,6 +189,7 @@ async def _run_compliance_check(check_id: str, req: ComplianceCheckRequest): "word_count": len(text.split()) if text else 0, "auto_discovered": False, "discovery_attempted": False, + "cmp_payloads": cmp_payloads, }) # Step 1a-bis: AUTO-DISCOVERY. For each canonical doc_type the user @@ -367,14 +369,42 @@ async def _run_compliance_check(check_id: str, req: ComplianceCheckRequest): build_scanned_urls_html, build_provider_list_html, ) + from .agent_doc_check_extras import build_vvt_table_html + + # Extract structured vendor records from any CMP payloads captured + # for the cookie doc (BMW ePaaS, OneTrust, etc.), validate their + # opt-out + privacy URLs concurrently, score each entry. + cmp_vendors: list[dict] = [] + try: + from compliance.services.vendor_extractor import ( + extract_vendors_from_payloads, + ) + from compliance.services.cookie_link_validator import ( + validate_vendor_urls, score_vendors, + ) + cookie_payloads = [] + for e in doc_entries: + if e.get("doc_type") == "cookie" and e.get("cmp_payloads"): + cookie_payloads.extend(e["cmp_payloads"]) + if cookie_payloads: + cmp_vendors = extract_vendors_from_payloads(cookie_payloads) + if cmp_vendors: + logger.info("VVT: %d vendors extracted, validating links", + len(cmp_vendors)) + cmp_vendors = await validate_vendor_urls(cmp_vendors) + cmp_vendors = score_vendors(cmp_vendors) + except Exception as e: + logger.warning("VVT vendor extraction skipped: %s", e) + summary_html = build_management_summary(results) scanned_html = build_scanned_urls_html(doc_entries) providers_html = build_provider_list_html(banner_result, vvt_entries) + vvt_html = build_vvt_table_html(cmp_vendors) report_html = build_html_report(results, None) profile_html = _build_profile_html(profile) full_html = ( summary_html + scanned_html + profile_html - + providers_html + report_html + + providers_html + vvt_html + report_html ) # Step 6: Send email — derive site name primarily from entered URL. @@ -404,6 +434,7 @@ async def _run_compliance_check(check_id: str, req: ComplianceCheckRequest): "tcf_vendor_count": len(tcf_vendors), } if banner_result else None, "tcf_vendors": vvt_entries if tcf_vendors else [], + "cmp_vendors": cmp_vendors, "total_documents": len(results), "total_findings": total_findings, "email_status": email_result.get("status", "failed"), @@ -428,15 +459,13 @@ def _update(check_id: str, msg: str, pct: int | None = None): job["progress_pct"] = max(0, min(100, int(pct))) -async def _fetch_text(url: str, doc_type: str = "") -> str: +async def _fetch_text(url: str, doc_type: str = "") -> tuple[str, list[dict]]: """Fetch text from URL via consent-tester, with HTTP fallback. - 1. Try consent-tester (Playwright) — handles JS-heavy SPAs - 2. Fallback: direct HTTP fetch + HTML strip — fast, works for SSR pages - - doc_type controls how aggressively we follow sub-links — cookie/dse - pages prefer self-extract only (CMP capture is authoritative); legal/ - imprint pages need to follow sub-pages (Versicherungsvermittler etc). + Returns (text, cmp_payloads). cmp_payloads is the raw CMP JSON captured + during navigation (ePaaS, OneTrust, …) — empty when no CMP fired or + HTTP fallback was used. Backend turns payloads into structured vendor + records for the VVT table in the email. """ # 1. Consent-tester (Playwright-based, full JS rendering). # max_documents depends on doc_type: @@ -456,7 +485,9 @@ async def _fetch_text(url: str, doc_type: str = "") -> str: timeout=120.0, ) if resp.status_code == 200: - docs = resp.json().get("documents", []) + payload = resp.json() + docs = payload.get("documents", []) + cmp_payloads = payload.get("cmp_payloads") or [] if docs: texts = [] for doc in docs: @@ -468,7 +499,7 @@ async def _fetch_text(url: str, doc_type: str = "") -> str: if len(texts) > 1: logger.info("Merged %d docs from %s (%d words)", len(texts), url, len(merged.split())) - return merged + return merged, cmp_payloads except Exception as e: logger.warning("Consent-tester fetch failed for %s: %s", url, e) @@ -486,11 +517,11 @@ async def _fetch_text(url: str, doc_type: str = "") -> str: text = _re.sub(r"\s+", " ", text).strip() if len(text.split()) > 100: logger.info("HTTP fallback for %s: %d words", url, len(text.split())) - return text + return text, [] except Exception as e: logger.warning("HTTP fallback failed for %s: %s", url, e) - return "" + return "", [] async def _autodiscover_missing( @@ -559,11 +590,15 @@ async def _autodiscover_missing( if resp.status_code != 200: logger.warning("auto-discovery: HTTP %d for %s", resp.status_code, base) discovered: list[dict] = [] + disc_payloads: list[dict] = [] else: - discovered = resp.json().get("documents", []) + disc_body = resp.json() + discovered = disc_body.get("documents", []) + disc_payloads = disc_body.get("cmp_payloads") or [] except Exception as e: logger.warning("auto-discovery failed for %s: %s", base, e) discovered = [] + disc_payloads = [] # Classify each discovered doc into a canonical doc_type by_type: dict[str, dict] = {} @@ -585,6 +620,7 @@ async def _autodiscover_missing( new_entry: dict = { "doc_type": dt, "url": "", "text": "", "word_count": 0, "auto_discovered": False, "discovery_attempted": True, + "cmp_payloads": [], } d = by_type.get(dt) if d: @@ -594,6 +630,11 @@ async def _autodiscover_missing( new_entry["url"] = d.get("url", "") new_entry["word_count"] = len(full.split()) new_entry["auto_discovered"] = True + # Auto-discovery happens on the HOMEPAGE — any CMP payload + # captured at that level likely belongs to the cookie page + # (CMP widget loaded site-wide). Attach to 'cookie' entry. + if dt == "cookie" and disc_payloads: + new_entry["cmp_payloads"] = disc_payloads doc_texts[dt] = full filled += 1 logger.info( diff --git a/backend-compliance/compliance/api/agent_doc_check_extras.py b/backend-compliance/compliance/api/agent_doc_check_extras.py index c9c121af..d8c89c8f 100644 --- a/backend-compliance/compliance/api/agent_doc_check_extras.py +++ b/backend-compliance/compliance/api/agent_doc_check_extras.py @@ -229,4 +229,105 @@ def _category_label(kat: str) -> str: "functional": "Funktional", "statistics": "Statistik", "marketing": "Marketing", + "strictlyNecessary": "Notwendig", + "advertising": "Marketing", }.get(kat, kat or "—") + + +def build_vvt_table_html(vendors: list[dict]) -> str: + """Render the per-vendor VVT-style table for the email report. + + One row per vendor. Columns: Name | Kategorie | Sitz | Cookies | + Opt-Out (Status) | Privacy (Status) | Compliance-Score. + + Vendors are expected to come from vendor_extractor.extract_vendors_from_payloads + and have already been scored by cookie_link_validator.score_vendors. + """ + if not vendors: + return "" + + vendors = sorted(vendors, key=lambda v: v.get("compliance_score", 0)) + rows: list[str] = [] + for v in vendors: + name = v.get("name") or "Unbekannt" + category = _category_label(v.get("category", "")) + country = v.get("country") or "—" + cookies = v.get("cookies") or [] + n_cookies = len(cookies) + score = int(v.get("compliance_score", 0)) + flags = v.get("compliance_flags") or [] + + opt_status = _link_status_badge( + v.get("opt_out_url"), v.get("opt_out_ok"), + v.get("opt_out_status"), + ) + privacy_status = _link_status_badge( + v.get("privacy_policy_url"), v.get("privacy_ok"), + v.get("privacy_status"), + ) + + score_color = ("#16a34a" if score >= 80 else + "#d97706" if score >= 50 else "#dc2626") + flag_str = "" + if flags: + flag_str = ( + f'
' + f'{", ".join(flags[:4])}
' + ) + rows.append( + f'' + f'' + f'{name}{flag_str}' + f'{category}' + f'{country}' + f'' + f'{n_cookies}' + f'{opt_status}' + f'{privacy_status}' + f'{score}%' + f'' + ) + + n_total = len(vendors) + n_critical = sum(1 for v in vendors if v.get("compliance_score", 0) < 50) + summary = ( + f"{n_total} Anbieter erfasst" + + (f", {n_critical} unter 50%" + if n_critical else " — alle ueber 50%") + ) + + return ( + '
' + '

' + 'VVT-Vorschlag: Drittanbieter aus Cookie-Richtlinie

' + f'

{summary}. ' + 'Sortiert nach Compliance-Score (niedrig zuerst — diese Eintraege ' + 'pruefen).

' + '' + '' + '' + '' + '' + '' + '' + '' + '' + '' + + "".join(rows) + + '
NameKategorieSitzCookiesOpt-OutPrivacyScore
' + ) + + +def _link_status_badge(url: str | None, ok: bool | None, status: int | None) -> str: + if not url: + return ('' + '✗') + if ok: + return ('') + status_str = str(status) if status else "?" + return ('✗ ({status_str})') diff --git a/backend-compliance/compliance/services/cookie_link_validator.py b/backend-compliance/compliance/services/cookie_link_validator.py index 603dcd3e..a049d4e4 100644 --- a/backend-compliance/compliance/services/cookie_link_validator.py +++ b/backend-compliance/compliance/services/cookie_link_validator.py @@ -120,6 +120,127 @@ async def validate_links(links: list[LinkCheck]) -> list[LinkCheck]: return links +# ── Per-vendor link validation ────────────────────────────────────── + +async def validate_vendor_urls(vendors: list[dict]) -> list[dict]: + """Probe opt-out and privacy URLs of each vendor. Mutates each vendor: + + vendor["opt_out_status"] = int (0 = unreachable, 2xx/3xx = ok) + vendor["opt_out_ok"] = bool + vendor["privacy_status"] = int + vendor["privacy_ok"] = bool + """ + if not vendors: + return vendors + + # Flatten into one list of LinkCheck (with back-reference to vendor) + probes: list[tuple[dict, str, str]] = [] # (vendor, url, kind) + for v in vendors: + if v.get("opt_out_url"): + probes.append((v, v["opt_out_url"], "opt_out")) + if v.get("privacy_policy_url"): + probes.append((v, v["privacy_policy_url"], "privacy")) + + if not probes: + return vendors + + sem = asyncio.Semaphore(_MAX_CONCURRENT) + async with httpx.AsyncClient( + timeout=_PER_URL_TIMEOUT, + follow_redirects=True, + headers={"User-Agent": "BreakPilot-LinkChecker/1.0"}, + ) as client: + async def probe(vendor: dict, url: str, kind: str) -> None: + async with sem: + try: + resp = await client.head(url) + if resp.status_code in (405, 403): + resp = await client.get(url) + vendor[f"{kind}_status"] = resp.status_code + vendor[f"{kind}_ok"] = 200 <= resp.status_code < 400 + except Exception as e: + vendor[f"{kind}_status"] = 0 + vendor[f"{kind}_ok"] = False + vendor[f"{kind}_error"] = str(e)[:60] + try: + await asyncio.wait_for( + asyncio.gather(*[probe(v, u, k) for v, u, k in probes]), + timeout=_BATCH_TIMEOUT, + ) + except asyncio.TimeoutError: + logger.warning("vendor-link batch timeout (%d probes)", len(probes)) + return vendors + + +def score_vendors(vendors: list[dict]) -> list[dict]: + """Compute per-vendor compliance score (0-100) and flags. Mutates.""" + for v in vendors: + score = 0 + max_score = 0 + flags: list[str] = [] + + # Name (always required) — 20 + max_score += 20 + if v.get("name"): + score += 20 + else: + flags.append("no_name") + + # Purpose — 15 + max_score += 15 + if v.get("purpose"): + score += 15 + else: + flags.append("no_purpose") + + # Country (3rd-country transfer relevance) — 10 + max_score += 10 + if v.get("country"): + score += 10 + else: + flags.append("no_country") + + # Opt-Out URL present + reachable — 25 + max_score += 25 + if not v.get("opt_out_url"): + flags.append("no_opt_out_url") + elif v.get("opt_out_ok") is False: + flags.append("broken_opt_out") + score += 5 # at least they tried + else: + score += 25 + + # Privacy policy URL present + reachable — 15 + max_score += 15 + if not v.get("privacy_policy_url"): + flags.append("no_privacy_url") + elif v.get("privacy_ok") is False: + flags.append("broken_privacy_url") + score += 5 + else: + score += 15 + + # Cookies disclosed (names + expiry) — 15 + max_score += 15 + cookies = v.get("cookies") or [] + if cookies: + named = sum(1 for c in cookies if c.get("name")) + with_expiry = sum(1 for c in cookies if c.get("expiry")) + if named >= 1 and with_expiry >= 1: + score += 15 + elif named >= 1: + score += 8 + flags.append("cookies_no_expiry") + else: + flags.append("cookies_no_names") + else: + flags.append("no_cookies_listed") + + v["compliance_score"] = round(score / max_score * 100) if max_score else 0 + v["compliance_flags"] = flags + return vendors + + # ── CheckItem rendering ────────────────────────────────────────────── def build_check_items(validated: list[LinkCheck]) -> list[dict]: diff --git a/backend-compliance/compliance/services/vendor_extractor.py b/backend-compliance/compliance/services/vendor_extractor.py new file mode 100644 index 00000000..4e020945 --- /dev/null +++ b/backend-compliance/compliance/services/vendor_extractor.py @@ -0,0 +1,190 @@ +""" +Vendor record extraction from captured CMP payloads. + +Mirrors the per-CMP `extract_vendors()` functions in consent-tester's +cmp_library/ — duplicated here because the backend cannot import the +consent-tester package (different containers). Schemas are stable per CMP +vendor, so this is acceptable. When a new CMP is added in consent-tester, +add the matching extractor here. + +Returned vendor record schema: + { + "name": str, # e.g. "Adobe Systems Software Ireland Limited" + "country": str, # ISO 2-letter (DE/US/...) when known + "purpose": str, # short description of what they do + "category": str, # marketing/analytics/functional/necessary + "opt_out_url": str, # link to opt out (Art. 7(3) DSGVO) + "privacy_policy_url": str, # link to vendor's privacy policy + "persistence": str, # human-readable retention text + "cookies": [ # cookies this vendor sets + {"name": str, "purpose": str, "expiry": str, "is_third_party": bool} + ], + # Compliance scoring (filled after vendor_compliance.evaluate()) + "compliance_score": int, # 0-100 + "compliance_flags": list[str], # e.g. ["no_opt_out", "broken_opt_out"] + } +""" + +from __future__ import annotations + +import logging +import re + +logger = logging.getLogger(__name__) + +_TAG_RE = re.compile(r"<[^>]+>") +_WS_RE = re.compile(r"\s+") + + +def _clean(s: object) -> str: + text = "" if s is None else str(s) + no_tags = _TAG_RE.sub(" ", text) + return _WS_RE.sub(" ", no_tags).strip() + + +def extract_vendors_from_payloads(payloads: list[dict]) -> list[dict]: + """Walk every captured CMP payload, dispatch to per-CMP extractor. + + Deduplicates vendors across payloads by name (preserves richer record). + """ + all_vendors: dict[str, dict] = {} + for payload in payloads or []: + kind = payload.get("kind", "") + data = payload.get("data", {}) + if not isinstance(data, dict): + continue + try: + if kind == "epaas": + vendors = _extract_epaas(data) + elif kind == "onetrust": + vendors = _extract_onetrust(data) + else: + # Generic fallback: walk data for vendor-like dicts + vendors = _extract_generic(data) + except Exception as e: + logger.warning("vendor extractor failed for %s: %s", kind, e) + continue + for v in vendors: + name = (v.get("name") or "").strip() + if not name: + continue + existing = all_vendors.get(name) + if existing: + # Merge cookies + fill empty fields + for k, v_val in v.items(): + if not existing.get(k) and v_val: + existing[k] = v_val + existing.setdefault("cookies", []).extend(v.get("cookies", [])) + else: + all_vendors[name] = v + return list(all_vendors.values()) + + +# ── ePaaS (BMW Group) ─────────────────────────────────────────────── + +def _extract_epaas(d: dict) -> list[dict]: + out: list[dict] = [] + providers = d.get("providers", []) or [] + cookies_by_provider: dict[str, list[dict]] = {} + + for c in d.get("cookies", []) or []: + pid = str(c.get("providerId") or c.get("provider") or c.get("vendor") or "") + if pid: + cookies_by_provider.setdefault(pid, []).append({ + "name": c.get("name") or c.get("id") or "", + "purpose": _clean(c.get("purpose") or c.get("description")), + "expiry": _clean(c.get("expiry") or c.get("retention") or c.get("persistence")), + "is_third_party": bool(c.get("isThirdParty") or c.get("third_party")), + }) + + for p in providers: + pid = str(p.get("id") or p.get("vendorId") or p.get("name") or "") + cookies = cookies_by_provider.get(pid, []) or [{ + "name": c.get("name", ""), + "purpose": _clean(c.get("purpose")), + "expiry": _clean(c.get("expiry") or c.get("persistence")), + "is_third_party": True, + } for c in (p.get("cookies", []) or [])] + out.append({ + "name": p.get("name") or pid or "", + "country": (p.get("country") or "").strip(), + "purpose": _clean(p.get("purpose")), + "category": (p.get("category") or "").strip(), + "opt_out_url": (p.get("optOutUrl") or p.get("optoutUrl") + or p.get("opt_out_url") or "").strip(), + "privacy_policy_url": (p.get("policyUrl") or p.get("policy_url") + or p.get("privacyPolicyUrl") or "").strip(), + "persistence": _clean(p.get("persistencePurposeDescription")), + "cookies": cookies, + }) + return out + + +# ── OneTrust ──────────────────────────────────────────────────────── + +def _extract_onetrust(d: dict) -> list[dict]: + out_by_name: dict[str, dict] = {} + for g in d.get("Groups") or d.get("groups") or []: + category = g.get("GroupName") or g.get("name") or "" + for c in g.get("Cookies") or g.get("cookies") or []: + provider = (c.get("Provider") or c.get("provider") + or c.get("Host") or c.get("host") or "").strip() + if not provider: + continue + cookie_entry = { + "name": c.get("Name") or c.get("name") or "", + "purpose": _clean(c.get("description") or c.get("Description")), + "expiry": _clean(c.get("Length") or c.get("expires")), + "is_third_party": bool(c.get("IsThirdParty") or c.get("isThirdParty")), + } + if provider in out_by_name: + out_by_name[provider]["cookies"].append(cookie_entry) + else: + out_by_name[provider] = { + "name": provider, + "country": "", + "purpose": _clean(g.get("GroupDescription") or c.get("description")), + "category": category, + "opt_out_url": "", + "privacy_policy_url": (c.get("PolicyUrl") or c.get("policyUrl") or ""), + "persistence": "", + "cookies": [cookie_entry], + } + return list(out_by_name.values()) + + +# ── Generic fallback (other CMPs / heuristic captures) ────────────── + +def _extract_generic(d: dict) -> list[dict]: + """Best-effort walk for unknown CMP shapes. + + Looks for top-level keys named 'vendors' / 'providers' / 'services' and + extracts name/purpose/country fields from each entry. + """ + out: list[dict] = [] + for key in ("vendors", "providers", "services", "dataProcessingServices", + "Vendors", "Providers"): + lst = d.get(key) + if not isinstance(lst, list): + continue + for entry in lst: + if not isinstance(entry, dict): + continue + name = (entry.get("name") or entry.get("vendor") + or entry.get("dataProcessor") or "").strip() + if not name: + continue + out.append({ + "name": name, + "country": (entry.get("country") or "").strip(), + "purpose": _clean(entry.get("purpose") or entry.get("description") + or entry.get("dataPurpose")), + "category": (entry.get("category") or "").strip(), + "opt_out_url": (entry.get("optOutUrl") or entry.get("opt_out_url") + or "").strip(), + "privacy_policy_url": (entry.get("policyUrl") or entry.get("privacyPolicyUrl") + or entry.get("privacy_policy_url") or "").strip(), + "persistence": _clean(entry.get("retentionPeriodDescription")), + "cookies": [], + }) + return out diff --git a/consent-tester/main.py b/consent-tester/main.py index dd66d008..a04c2c4c 100644 --- a/consent-tester/main.py +++ b/consent-tester/main.py @@ -293,6 +293,9 @@ class DSIDiscoveryResponse(BaseModel): languages_detected: list[str] errors: list[str] scanned_at: str + # Raw CMP payloads captured during navigation (ePaaS, OneTrust, etc.). + # Backend uses these to build the per-vendor compliance table. + cmp_payloads: list[dict] = [] @app.post("/dsi-discovery", response_model=DSIDiscoveryResponse) @@ -343,6 +346,7 @@ async def dsi_discovery(req: DSIDiscoveryRequest): languages_detected=result.languages_detected, errors=result.errors, scanned_at=datetime.now(timezone.utc).isoformat(), + cmp_payloads=result.cmp_payloads, ) diff --git a/consent-tester/services/cmp_library/epaas.py b/consent-tester/services/cmp_library/epaas.py index 4cc8e063..162dbc43 100644 --- a/consent-tester/services/cmp_library/epaas.py +++ b/consent-tester/services/cmp_library/epaas.py @@ -67,3 +67,53 @@ def reconstruct(d: dict) -> str: parts.append(_clean(str(meta["persistencePurposeText"]))) return "\n".join(parts) + + +def extract_vendors(d: dict) -> list[dict]: + """Return structured vendor records from ePaaS policy JSON. + + Schema returned (per vendor): + {name, country, purpose, category, opt_out_url, privacy_policy_url, + persistence, cookies: [{name, purpose, expiry, is_third_party}]} + """ + out: list[dict] = [] + providers = d.get("providers", []) or [] + cookies_by_provider: dict[str, list[dict]] = {} + + # ePaaS sometimes stores cookies in a separate 'cookies' array referenced + # by providerId. If so, group them by provider. + cookies_list = d.get("cookies", []) or [] + for c in cookies_list: + pid = (c.get("providerId") or c.get("provider") + or c.get("vendorId") or c.get("vendor") or "") + if pid: + cookies_by_provider.setdefault(str(pid), []).append({ + "name": c.get("name") or c.get("id") or "", + "purpose": _clean(str(c.get("purpose") or c.get("description") or "")), + "expiry": _clean(str(c.get("expiry") or c.get("retention") + or c.get("persistence") or "")), + "is_third_party": bool(c.get("isThirdParty") + or c.get("third_party")), + }) + + for p in providers: + pid = str(p.get("id") or p.get("vendorId") or p.get("name") or "") + cookies = (cookies_by_provider.get(pid, []) + or [{"name": c.get("name", ""), + "purpose": _clean(str(c.get("purpose", ""))), + "expiry": _clean(str(c.get("expiry") or c.get("persistence") or "")), + "is_third_party": True} + for c in (p.get("cookies", []) or [])]) + out.append({ + "name": p.get("name") or pid or "", + "country": (p.get("country") or "").strip(), + "purpose": _clean(str(p.get("purpose") or "")), + "category": (p.get("category") or "").strip(), + "opt_out_url": (p.get("optOutUrl") or p.get("optoutUrl") + or p.get("opt_out_url") or "").strip(), + "privacy_policy_url": (p.get("policyUrl") or p.get("policy_url") + or p.get("privacyPolicyUrl") or "").strip(), + "persistence": _clean(str(p.get("persistencePurposeDescription") or "")), + "cookies": cookies, + }) + return out diff --git a/consent-tester/services/cmp_library/onetrust.py b/consent-tester/services/cmp_library/onetrust.py index 4b97a684..eba4fd8f 100644 --- a/consent-tester/services/cmp_library/onetrust.py +++ b/consent-tester/services/cmp_library/onetrust.py @@ -54,3 +54,61 @@ def reconstruct(d: dict) -> str: parts.append(line) return "\n".join(parts) + + +def extract_vendors(d: dict) -> list[dict]: + """Return structured vendor records from OneTrust JSON. + + OneTrust groups cookies into 'Groups' (Strictly Necessary, Analytics, + Marketing, etc). Within each group, cookies are listed with Provider, + Host, Length (retention) and optional Privacy/Opt-Out URLs. + """ + out: list[dict] = [] + seen: set[str] = set() + + groups = d.get("Groups") or d.get("groups") or [] + for g in groups: + category = g.get("GroupName") or g.get("name") or "" + for c in g.get("Cookies") or g.get("cookies") or []: + provider = (c.get("Provider") or c.get("provider") + or c.get("Host") or c.get("host") or "").strip() + if not provider: + continue + cookie_entry = { + "name": c.get("Name") or c.get("name") or "", + "purpose": _clean(str(c.get("description") + or c.get("Description") or "")), + "expiry": _clean(str(c.get("Length") or c.get("expires") or "")), + "is_third_party": (c.get("IsThirdParty") + or c.get("isThirdParty") or False), + } + if provider in seen: + # Append cookie to existing vendor + for entry in out: + if entry["name"] == provider: + entry["cookies"].append(cookie_entry) + break + else: + seen.add(provider) + out.append({ + "name": provider, + "country": "", + "purpose": _clean(str(c.get("description") + or g.get("GroupDescription") or "")), + "category": category, + "opt_out_url": "", + "privacy_policy_url": (c.get("PolicyUrl") + or c.get("policyUrl") or ""), + "persistence": "", + "cookies": [cookie_entry], + }) + return out + + +_TAG_RE = __import__("re").compile(r"<[^>]+>") +_WS_RE = __import__("re").compile(r"\s+") + + +def _clean(text: str) -> str: + no_tags = _TAG_RE.sub(" ", text or "") + return _WS_RE.sub(" ", no_tags).strip() diff --git a/consent-tester/services/dsi_discovery.py b/consent-tester/services/dsi_discovery.py index 24c1134c..b06db3fb 100644 --- a/consent-tester/services/dsi_discovery.py +++ b/consent-tester/services/dsi_discovery.py @@ -168,6 +168,10 @@ class DSIDiscoveryResult: total_found: int = 0 languages_detected: list[str] = field(default_factory=list) errors: list[str] = field(default_factory=list) + # Raw CMP payloads captured during navigation (one per matched JSON). + # Schema: [{"kind": str, "url": str, "data": dict}, ...] + # Backend uses these to build vendor records + run per-vendor checks. + cmp_payloads: list[dict] = field(default_factory=list) def _matches_dsi_keyword(text: str) -> tuple[bool, str]: """Check if text contains any DSI keyword. Returns (match, language).""" @@ -270,6 +274,10 @@ async def discover_dsi_documents( logger.info("PDF redirect detected: %s -> %s", url, final_url) # Return early — a PDF redirect means no HTML content to scan result.total_found = len(result.documents) + result.cmp_payloads = [ + {"kind": kind, "data": data} + for kind, data in cmp_capture.payloads + ] return result # Step 1b: Try dismissing cookie consent banners before extraction. @@ -534,8 +542,11 @@ async def discover_dsi_documents( result.languages_detected = list(set( d.language for d in result.documents if d.language )) - logger.info("DSI discovery complete: %d documents found in %s", - result.total_found, result.languages_detected) + result.cmp_payloads = [ + {"kind": kind, "data": data} for kind, data in cmp_capture.payloads + ] + logger.info("DSI discovery complete: %d documents found in %s, %d CMP payloads", + result.total_found, result.languages_detected, len(result.cmp_payloads)) return result # Nav elements, not real documents