""" Cookie-Richtlinie Opt-Out and Privacy-Policy link validator. Art. 7(3) DSGVO: "Der Widerruf der Einwilligung muss so einfach wie die Erteilung sein". Per third-party provider in the cookie policy there must be a working opt-out mechanism. A missing or broken link makes that provider entry legally non-compliant. This module extracts the URLs from the cookie-policy text and tests each one via async HTTP (HEAD first, GET fallback). Returns structured findings the route layer turns into CheckItems for the email + frontend report. """ from __future__ import annotations import asyncio import logging import re from typing import TypedDict import httpx logger = logging.getLogger(__name__) # URL extraction patterns. Each captures the URL that follows the keyword. _URL_RE = r"https?://[\w\-./?#&=:%~+@]+" _OPTOUT_PATTERN = re.compile( rf"opt[\-\s]?out[\-\s]?(?:link)?\s*[:\|]?\s*({_URL_RE})", re.IGNORECASE, ) _PRIVACY_PATTERN = re.compile( rf"(?:link\s+zur?\s+(?:privacy[\-\s]?policy|datenschutz\w*)|privacy[\-\s]?policy)\s*[:\|]?\s*({_URL_RE})", re.IGNORECASE, ) # Concurrency + timeout budget. 10 parallel requests, 8s per request, # whole batch capped at 60s — keeps the cookie check inside the existing # 120s backend → consent-tester budget. _MAX_CONCURRENT = 10 _PER_URL_TIMEOUT = 8.0 _BATCH_TIMEOUT = 60.0 class LinkCheck(TypedDict, total=False): url: str kind: str # "opt-out" | "privacy-policy" status: int # 0 = unreachable final_url: str error: str reachable: bool def extract_links(text: str) -> list[LinkCheck]: """Pull all Opt-Out + Privacy-Policy URLs from a cookie-policy text. Deduplicates by URL+kind. Strips trailing punctuation/quotes commonly captured by greedy URL regex. 
""" found: dict[tuple[str, str], LinkCheck] = {} for kind, pattern in (("opt-out", _OPTOUT_PATTERN), ("privacy-policy", _PRIVACY_PATTERN)): for match in pattern.finditer(text): url = match.group(1).rstrip(".,;:\"')(]").strip() if not url.startswith(("http://", "https://")): continue key = (url, kind) if key not in found: found[key] = LinkCheck(url=url, kind=kind) return list(found.values()) async def validate_links(links: list[LinkCheck]) -> list[LinkCheck]: """HTTP-probe each link concurrently. Adds status + reachable flag. Uses HEAD first (fast), falls back to GET for servers that reject HEAD. Accepts any 2xx/3xx as reachable; 4xx/5xx and timeouts as broken. """ if not links: return [] sem = asyncio.Semaphore(_MAX_CONCURRENT) async with httpx.AsyncClient( timeout=_PER_URL_TIMEOUT, follow_redirects=True, headers={"User-Agent": "BreakPilot-LinkChecker/1.0"}, ) as client: async def probe(link: LinkCheck) -> LinkCheck: async with sem: try: resp = await client.head(link["url"]) if resp.status_code in (405, 403): # Some servers reject HEAD; try GET resp = await client.get(link["url"]) link["status"] = resp.status_code link["final_url"] = str(resp.url) link["reachable"] = 200 <= resp.status_code < 400 except httpx.TimeoutException: link["status"] = 0 link["error"] = "timeout" link["reachable"] = False except Exception as e: link["status"] = 0 link["error"] = str(e)[:80] link["reachable"] = False return link try: results = await asyncio.wait_for( asyncio.gather(*[probe(link) for link in links]), timeout=_BATCH_TIMEOUT, ) return list(results) except asyncio.TimeoutError: logger.warning( "Cookie-link batch timeout after %.0fs — %d urls", _BATCH_TIMEOUT, len(links), ) # Best-effort: return whatever links got updated return links # ── Per-vendor link validation ────────────────────────────────────── async def validate_vendor_urls(vendors: list[dict]) -> list[dict]: """Probe opt-out and privacy URLs of each vendor. 
Mutates each vendor: vendor["opt_out_status"] = int (0 = unreachable, 2xx/3xx = ok) vendor["opt_out_ok"] = bool vendor["privacy_status"] = int vendor["privacy_ok"] = bool """ if not vendors: return vendors # Flatten into one list of LinkCheck (with back-reference to vendor) probes: list[tuple[dict, str, str]] = [] # (vendor, url, kind) for v in vendors: if v.get("opt_out_url"): probes.append((v, v["opt_out_url"], "opt_out")) if v.get("privacy_policy_url"): probes.append((v, v["privacy_policy_url"], "privacy")) if not probes: return vendors sem = asyncio.Semaphore(_MAX_CONCURRENT) async with httpx.AsyncClient( timeout=_PER_URL_TIMEOUT, follow_redirects=True, headers={"User-Agent": "BreakPilot-LinkChecker/1.0"}, ) as client: async def probe(vendor: dict, url: str, kind: str) -> None: async with sem: try: resp = await client.head(url) if resp.status_code in (405, 403): resp = await client.get(url) vendor[f"{kind}_status"] = resp.status_code vendor[f"{kind}_ok"] = 200 <= resp.status_code < 400 except Exception as e: vendor[f"{kind}_status"] = 0 vendor[f"{kind}_ok"] = False vendor[f"{kind}_error"] = str(e)[:60] try: await asyncio.wait_for( asyncio.gather(*[probe(v, u, k) for v, u, k in probes]), timeout=_BATCH_TIMEOUT, ) except asyncio.TimeoutError: logger.warning("vendor-link batch timeout (%d probes)", len(probes)) return vendors def score_vendors(vendors: list[dict]) -> list[dict]: """Compute per-vendor compliance score (0-100) and flags. Scoring is recipient-type AND category aware. Two orthogonal axes influence which fields are required: recipient_type == INTERNAL / GROUP_COMPANY Own processing — the user's consent + main DSI cover privacy + opt-out for ALL of these. Per-row opt-out / privacy URLs are NOT a compliance gap. What matters: VVT-relevante Fields (purpose, cookies with names + expiry). category == 'necessary' (§25 Abs. 2 TDDDG) Technically necessary cookies don't need consent → no opt-out required even for external processors. 
For each non-applicable field we set flag '_n_a' instead of a penalty flag, so the report can render it neutrally. """ for v in vendors: rtype = (v.get("recipient_type") or "OTHER").upper() is_own = rtype in ("INTERNAL", "GROUP_COMPANY") is_necessary = (v.get("category") or "").lower() in ( "necessary", "strictlynecessary", ) opt_out_required = not is_own and not is_necessary privacy_required = not is_own country_required = not is_own score = 0 max_score = 0 flags: list[str] = [] # Name (always required) — 20 max_score += 20 if v.get("name"): score += 20 else: flags.append("no_name") # Purpose — 20 max_score += 20 if v.get("purpose"): score += 20 else: flags.append("no_purpose") # Country — only for external processors / controllers # Falls country leer ist, ableiten aus Rechtsform-Suffix im Namen. if country_required: max_score += 10 if v.get("country"): score += 10 elif _country_from_name(v.get("name", "")): inferred = _country_from_name(v.get("name", "")) v["country"] = inferred v["country_inferred"] = True score += 10 else: flags.append("no_country") # Opt-Out URL — only when consent-based AND external if opt_out_required: max_score += 25 if not v.get("opt_out_url"): flags.append("no_opt_out_url") elif v.get("opt_out_ok") is False: flags.append("broken_opt_out") score += 5 else: score += 25 # Privacy policy URL — required for external (own = via main DSI) if privacy_required: weight = 10 if is_necessary else 15 max_score += weight if not v.get("privacy_policy_url"): flags.append("no_privacy_url") elif v.get("privacy_ok") is False: flags.append("broken_privacy_url") score += weight // 3 else: score += weight # Cookies disclosed (names + expiry) — required for ALL types # (own processing too: BMW must list its own cookies for the VVT) weight = 50 if is_own or is_necessary else 15 max_score += weight cookies = v.get("cookies") or [] if cookies: named = sum(1 for c in cookies if c.get("name")) with_expiry = sum(1 for c in cookies if c.get("expiry")) if named >= 1 
and with_expiry >= 1: score += weight elif named >= 1: score += weight // 2 flags.append("cookies_no_expiry") else: flags.append("cookies_no_names") else: flags.append("no_cookies_listed") v["compliance_score"] = round(score / max_score * 100) if max_score else 0 v["compliance_flags"] = flags return vendors # ── CheckItem rendering ────────────────────────────────────────────── def build_check_items(validated: list[LinkCheck]) -> list[dict]: """Turn validator results into compliance-check items (one per kind). Always returns 2 items (opt-out + privacy-policy) so the report layout is stable. Skipped if no links of that kind were extracted. """ items: list[dict] = [] for kind, label in ( ("opt-out", "Opt-Out-Links der Drittanbieter erreichbar"), ("privacy-policy", "Privacy-Policy-Links der Drittanbieter erreichbar"), ): of_kind = [l for l in validated if l.get("kind") == kind] if not of_kind: continue total = len(of_kind) ok = sum(1 for l in of_kind if l.get("reachable")) broken = [l for l in of_kind if not l.get("reachable")] all_pass = ok == total hint = "" matched = "" if all_pass: matched = f"{ok}/{total} Links erreichbar (HTTP 2xx/3xx)" else: broken_summary = ", ".join( f"{l['url'][:60]} ({l.get('status') or l.get('error', '?')})" for l in broken[:5] ) hint = ( f"{len(broken)}/{total} Links sind defekt. Defekte " f"Provider-Eintraege erfuellen Art. 7(3) DSGVO nicht — der " f"Widerruf der Einwilligung ist fuer diese Anbieter unmoeglich. " f"Beispiele: {broken_summary}" ) items.append({ "id": f"cookie_links_{kind.replace('-', '_')}", "label": label, "passed": all_pass, "severity": "MEDIUM" if kind == "opt-out" else "LOW", "matched_text": matched, "level": 2, "parent": "opt_out", "skipped": False, "hint": hint, }) return items # ─── Country-Inferenz aus Rechtsform-Suffix ──────────────────────── # # Wenn ein Vendor das "country"-Feld leer hat, koennen wir es oft aus # dem Firmen-Suffix ableiten: # Adform A/S → DK (Dänemark, Aktieselskab) # Pinterest Europe Ltd. 
#                               → IE (Ireland, Limited)
#   Salesforce Inc.             → US (Incorporated)
#   Adobe ... Ireland Limited   → IE
#   Genesys ... B.V.            → NL (Netherlands, Besloten Vennootschap)
#   Equativ S.A.                → FR (Société Anonyme)
#   SAP SE                      → DE (Societas Europaea — mostly registered in DE)
#
# Combined strategy, in the order _country_from_name applies it:
#   1) Known vendor with an unambiguous seat (Google Inc, Meta Platforms
#      Ireland Ltd, ...) — most specific, wins over everything else
#   2) Country name inside the company name ('Ireland', 'Deutschland')
#   3) Legal-form suffix pattern

import re as _re

# (regex, ISO code) pairs, evaluated top to bottom; first match wins.
# Matching is case-sensitive on purpose ("AS", "SE", "AB" are ordinary
# words in lower case).
#
# Two rules keep this table correct:
#   * Forms ending in a dot are closed with (?!\w), not \b: a \b after
#     "." only matches when a word character FOLLOWS, so "Foo B.V." at the
#     end of a name would never match.
#   * More specific forms come before the generic form they contain
#     ("S.A. de C.V." before "S.A.", "Pte. Ltd." / "Pty" before "Ltd").
_SUFFIX_COUNTRY: list[tuple[str, str]] = [
    (r"\bA/S\b", "DK"),          # Aktieselskab
    (r"\bApS\b", "DK"),          # Anpartsselskab
    (r"\bAB\b", "SE"),           # Aktiebolag
    (r"\bAS\b", "NO"),           # Aksjeselskap
    (r"\bOy\b", "FI"),           # Osakeyhtiö
    (r"\bAG\b", "DE"),           # CH/AT also possible, default DE
    (r"\bGmbH\b", "DE"),
    (r"\bUG\b", "DE"),
    (r"\beG\b", "DE"),
    (r"\bKG\b", "DE"),
    (r"\bOHG\b", "DE"),
    (r"\bSE\b", "DE"),           # Societas Europaea — e.g. SAP SE
    (r"\bS\.A\.\sde\sC\.V\.(?!\w)", "MX"),   # must precede plain S.A.
    (r"\bS\.A\.S\.(?!\w)", "FR"),
    (r"\bS\.A\.(?!\w)", "FR"),   # also used in ES and others, default FR
    (r"\bSAS\b", "FR"),
    (r"\bSARL\b", "FR"),
    (r"\bS\.r\.l\.(?!\w)", "IT"),
    (r"\bS\.p\.A\.(?!\w)", "IT"),
    (r"\bSpA\b", "IT"),
    (r"\bB\.V\.(?!\w)", "NL"),
    (r"\bN\.V\.(?!\w)", "NL"),
    (r"\bSL\b", "ES"),
    (r"\bd\.o\.o\.(?!\w)", "SI"),            # Slovenia
    (r"\bd\.d\.(?!\w)", "HR"),               # Croatia
    (r"\bz\s?o\.o\.(?!\w)", "PL"),
    (r"\bInc\.?\b", "US"),
    (r"\bIncorporated\b", "US"),
    (r"\bCorp\.?\b", "US"),
    (r"\bCorporation\b", "US"),
    (r"\bLLC\b", "US"),
    (r"\bL\.L\.C\.(?!\w)", "US"),
    (r"\bPte\.?\sLtd\.?\b", "SG"),           # must precede plain Ltd
    (r"\bPty\b", "AU"),                      # "Pty Ltd" — must precede plain Ltd
    (r"\bLtd\.?\b", "GB"),       # UK Limited, default
    (r"\bLimited\b", "GB"),
    (r"\bPLC\b", "GB"),
    (r"\bK\.K\.(?!\w)", "JP"),   # Kabushiki-Kaisha
]

# Country names inside the company name
# (e.g. "Adobe Systems Software Ireland Limited"). Lower-case tokens.
_COUNTRY_NAME_TOKENS: list[tuple[str, str]] = [
    ("ireland", "IE"),
    ("deutschland", "DE"),
    ("germany", "DE"),
    ("netherlands", "NL"),
    ("france", "FR"),
    ("united kingdom", "GB"),
    ("uk", "GB"),
    ("usa", "US"),
    ("united states", "US"),
    ("austria", "AT"),
    ("oesterreich", "AT"),
    ("schweiz", "CH"),
    ("switzerland", "CH"),
    ("luxembourg", "LU"),
    ("luxemburg", "LU"),
    ("denmark", "DK"),
    ("daenemark", "DK"),
    ("sweden", "SE"),
    ("schweden", "SE"),
    ("norway", "NO"),
    ("norwegen", "NO"),
    ("finland", "FI"),
    ("finnland", "FI"),
]

# Tokens up to this length ("uk", "usa") are abbreviations that occur inside
# unrelated words ("Suzuki", "Medusa"), so they must match as whole words.
_SHORT_TOKEN_MAX_LEN = 3

# Known vendors with an unambiguous seat (override). Keys are lower-case
# substrings of the company name.
_KNOWN_VENDOR_COUNTRY: dict[str, str] = {
    "google inc": "US",
    "google llc": "US",
    "google ireland": "IE",
    "meta platforms ireland": "IE",
    "facebook ireland": "IE",
    "amazon.com inc": "US",
    "amazon web services": "US",
    "amazon web services inc": "US",
    "linkedin inc": "US",
    "salesforce inc": "US",
    "salesforce.com": "US",
    "outbrain inc": "US",
    "taboola inc": "US",
    "pinterest europe ltd": "IE",
    "intuition machines inc": "US",
    "akamai technologies inc": "US",
    "criteo s.a": "FR",
    "criteo sa": "FR",
    "adform a/s": "DK",
    "speedcurve limited": "GB",
    "longtail ad solutions": "US",
    "genesys cloud services b.v": "NL",
    "qualtrics": "US",
    "teads sa": "FR",
    "teads s.a": "FR",
    "salesviewer gmbh": "DE",
    "baqend gmbh": "DE",
    "zenweshare sas": "FR",
    "nayoki gmbh": "DE",
    "psyma": "DE",
    "matomo": "NZ",  # InnoCraft NZ, but can be hosted in the EU
    "adobe systems software ireland": "IE",
    "microsoft corporation": "US",
    "microsoft corp": "US",
}


def _name_has_country_token(name_lower: str, token: str) -> bool:
    """True if *token* occurs in *name_lower* (whole-word for short tokens)."""
    if len(token) > _SHORT_TOKEN_MAX_LEN:
        return token in name_lower
    pattern = rf"(?<!\w){_re.escape(token)}(?!\w)"
    return _re.search(pattern, name_lower) is not None


def _country_from_name(vendor_name: str) -> str:
    """Best-effort: derive an ISO-2 country code from the vendor name.

    Lookup order: known-vendor table, then country name contained in the
    company name, then legal-form suffix. Returns "" when nothing matches
    or the name is empty.
    """
    if not vendor_name:
        return ""

    # Only the part before " — " is considered.
    # NOTE(review): presumably names look like "<company> — <description>";
    # the example in the original comment was lost — confirm with callers.
    firm = vendor_name.split(" — ")[0].strip()
    firm_l = firm.lower()

    # 1) Known vendor lookup (most specific)
    for known, code in _KNOWN_VENDOR_COUNTRY.items():
        if known in firm_l:
            return code

    # 2) Country name inside the company name
    for token, code in _COUNTRY_NAME_TOKENS:
        if _name_has_country_token(firm_l, token):
            return code

    # 3) Legal-form suffix (case-sensitive, hence the un-lowered name)
    for pattern, code in _SUFFIX_COUNTRY:
        if _re.search(pattern, firm):
            return code

    return ""