Files
breakpilot-compliance/backend-compliance/compliance/services/cookie_link_validator.py
T
Benjamin Admin c9c0fb5965 feat(cookie-check): enhanced patterns + active opt-out link validator
cookie_checks.py:
- cookie_names_listed: now also matches CMP placeholder notation
  (BMW: 'Adfpc###', 'CT###') and 'Diese Datenverarbeitung verwendet die
  folgenden Cookies oder ähnliche Technologien' as list-shape signal.
  Cryptic vendor names like 'audience', 'adformfrpid' are accepted via
  the surrounding markup, not by hard-coding each one.
- cookie_providers_named: new pattern 'Gesetzt von: <Firma>' (BMW/ePaaS
  per-cookie vendor naming) + recognition of full legal-form names
  (Adform A/S, BMW AG, Adobe Systems Software Ireland Limited).
- cookie_duration_values: now matches 'Ablauf: 1 Jahr' / 'Speicherdauer:
  30 Tage' (BMW format) in addition to the legacy '<n> <unit>'.

New L1 + L2 checks for controller in cookie-policy:
- cookie_controller (L1): the cookie policy must name Verantwortlich(er)
- cookie_controller_address (L2): PLZ + Ort or address keywords
- cookie_controller_contact_or_link (L2): email/phone OR link back to
  Datenschutzerklärung (the practical equivalent — BMW does this)

New L2 checks (parented under opt_out):
- cookie_optout_links: detects per-provider opt-out URLs in the text
- cookie_privacy_policy_links: per-provider privacy-policy URLs

New service: cookie_link_validator.py
- extract_links(text): pulls all https?://… URLs that follow 'Opt-Out
  Link:' / 'Link zur Privacy Policy:' (deduped)
- validate_links(links): probes every URL concurrently (HEAD first, GET
  fallback for 405/403). 10 parallel, 8s per request, 60s batch cap.
  Returns reachable=True/False + status + final_url.
- build_check_items(): renders 2 CheckItems (opt-out + privacy-policy),
  each pass if ALL links 2xx/3xx, fail with up-to-5 broken-link examples.

Hook in _check_single: doc_type=='cookie' triggers the validator after
regex+MC checks. Recomputes correctness with the new L2 items.

This addresses two concrete BMW observations:
1. BMW's per-cookie structure (Name + Zweck + Ablauf, Gesetzt von: …,
   Opt-Out Link: …) now recognised → 'Konkrete Cookie-Namen aufgelistet'
   and 'Konkrete Speicherdauern' should pass.
2. Defective opt-out URLs surface as compliance findings rather than
   silently passing — Art. 7(3) DSGVO requires a working withdrawal
   path per provider.
2026-05-17 09:38:32 +02:00

171 lines
6.1 KiB
Python

"""
Cookie-Richtlinie Opt-Out and Privacy-Policy link validator.
Art. 7(3) DSGVO: "Der Widerruf der Einwilligung muss so einfach wie die
Erteilung der Einwilligung sein" (withdrawing consent must be as easy as
giving it). For each third-party provider in the cookie policy there must
be a working opt-out mechanism. A missing or broken link makes that
provider entry legally non-compliant.
This module extracts the URLs from the cookie-policy text and tests each
one via async HTTP (HEAD first, GET fallback). Returns structured findings
the route layer turns into CheckItems for the email + frontend report.
"""
from __future__ import annotations
import asyncio
import logging
import re
from typing import TypedDict
import httpx
logger = logging.getLogger(__name__)
# URL extraction patterns. Each captures the URL that follows the keyword.
# The URL is always capture group 1; both patterns match case-insensitively.
#
# _URL_RE: "http://" or "https://" followed by one or more characters from a
# fixed set (word characters plus - . / ? # & = : % ~ + @). Matching stops at
# the first character outside that set (whitespace, quotes, brackets, commas).
_URL_RE = r"https?://[\w\-./?#&=:%~+@]+"
# Matches "opt-out" / "opt out" / "optout", an optional "link", an optional
# ":" or "|" separator, then the URL.
_OPTOUT_PATTERN = re.compile(
    rf"opt[\-\s]?out[\-\s]?(?:link)?\s*[:\|]?\s*({_URL_RE})",
    re.IGNORECASE,
)
# Matches "link zu"/"link zur" followed by "privacy policy" or a word starting
# with "datenschutz", or a bare "privacy policy"; then an optional ":" or "|"
# separator and the URL.
_PRIVACY_PATTERN = re.compile(
    rf"(?:link\s+zur?\s+(?:privacy[\-\s]?policy|datenschutz\w*)|privacy[\-\s]?policy)\s*[:\|]?\s*({_URL_RE})",
    re.IGNORECASE,
)
# Concurrency + timeout budget. 10 parallel requests, 8s per request,
# whole batch capped at 60s — keeps the cookie check inside the existing
# 120s backend → consent-tester budget.
_MAX_CONCURRENT = 10  # size of the semaphore that gates simultaneous probes
_PER_URL_TIMEOUT = 8.0  # seconds; passed as the httpx client timeout
_BATCH_TIMEOUT = 60.0  # seconds; asyncio.wait_for cap on the whole batch
class LinkCheck(TypedDict, total=False):
    """One link extracted from a cookie policy, plus its probe result.

    ``total=False`` makes every key optional: ``extract_links`` fills in
    only ``url`` and ``kind``; ``validate_links`` adds the remaining keys
    once the link has been probed.
    """

    url: str  # URL as extracted from the policy text
    kind: str  # "opt-out" | "privacy-policy"
    status: int  # HTTP status of the final response; 0 = unreachable
    final_url: str  # URL of the final response (redirects are followed)
    error: str  # short failure description, e.g. "timeout"; set only on failure
    reachable: bool  # True when the final status is in the 200-399 range
def extract_links(text: str) -> list[LinkCheck]:
    """Pull all Opt-Out + Privacy-Policy URLs from a cookie-policy text.

    Scans ``text`` with ``_OPTOUT_PATTERN`` and ``_PRIVACY_PATTERN`` and
    returns one ``LinkCheck`` (only ``url`` and ``kind`` set) per distinct
    (URL, kind) pair. Opt-out links come first, each group in order of
    first appearance.

    Trailing punctuation/quotes swallowed by the greedy URL regex are
    stripped. Matches that are not http(s) URLs, or that consist of a bare
    scheme with nothing after ``://``, are dropped.

    Args:
        text: Plain text of the cookie policy. Empty input yields ``[]``.

    Returns:
        Deduplicated list of links, ready for ``validate_links``.
    """
    if not text:
        return []

    found: dict[tuple[str, str], LinkCheck] = {}
    for kind, pattern in (
        ("opt-out", _OPTOUT_PATTERN),
        ("privacy-policy", _PRIVACY_PATTERN),
    ):
        for match in pattern.finditer(text):
            url = match.group(1).rstrip(".,;:\"')(]").strip()
            scheme, sep, rest = url.partition("://")
            # The patterns are compiled with IGNORECASE, so "HTTPS://…" can
            # reach this point. URI schemes are case-insensitive, so compare
            # in lower case instead of silently dropping such links.
            if not sep or scheme.lower() not in ("http", "https"):
                continue
            # "http://." strips down to a bare scheme; probing it could only
            # ever produce a bogus "broken link" finding.
            if not rest:
                continue
            found.setdefault((url, kind), LinkCheck(url=url, kind=kind))
    return list(found.values())
async def validate_links(links: list[LinkCheck]) -> list[LinkCheck]:
    """HTTP-probe each link concurrently. Adds status + reachable flag.

    Uses HEAD first (fast) and falls back to GET when the server rejects
    the HEAD request (403, 405 or 501). Any final 2xx/3xx status counts as
    reachable; 4xx/5xx, timeouts and connection errors count as broken.

    The passed-in dicts are updated in place and also returned. After the
    call every link carries ``status`` and ``reachable``; successful probes
    add ``final_url``, failed ones add ``error``.

    Args:
        links: Links as produced by ``extract_links``.

    Returns:
        The same link dicts, in input order. An empty input returns ``[]``
        without opening an HTTP client.
    """
    if not links:
        return []

    sem = asyncio.Semaphore(_MAX_CONCURRENT)
    async with httpx.AsyncClient(
        timeout=_PER_URL_TIMEOUT,
        follow_redirects=True,
        headers={"User-Agent": "BreakPilot-LinkChecker/1.0"},
    ) as client:

        async def probe(link: LinkCheck) -> LinkCheck:
            """Probe one link and record the outcome on the dict itself."""
            async with sem:
                try:
                    resp = await client.head(link["url"])
                    # Some servers reject HEAD (403/405) or do not implement
                    # it at all (501); only GET tells whether the page exists.
                    if resp.status_code in (403, 405, 501):
                        resp = await client.get(link["url"])
                except httpx.TimeoutException:
                    link["status"] = 0
                    link["error"] = "timeout"
                    link["reachable"] = False
                except Exception as e:
                    # Deliberate best-effort: one bad URL must not abort the
                    # whole batch. Some exceptions have an empty message, so
                    # fall back to the exception type name.
                    link["status"] = 0
                    link["error"] = str(e)[:80] or type(e).__name__
                    link["reachable"] = False
                else:
                    link["status"] = resp.status_code
                    link["final_url"] = str(resp.url)
                    link["reachable"] = 200 <= resp.status_code < 400
            return link

        try:
            results = await asyncio.wait_for(
                asyncio.gather(*(probe(link) for link in links)),
                timeout=_BATCH_TIMEOUT,
            )
            return list(results)
        except asyncio.TimeoutError:
            logger.warning(
                "Cookie-link batch timeout after %.0fs — %d urls",
                _BATCH_TIMEOUT, len(links),
            )
            # Best-effort: links that finished keep their result. Probes that
            # were cancelled never wrote anything, so mark them explicitly
            # instead of returning entries without status/reachable.
            for link in links:
                if "reachable" not in link:
                    link["status"] = 0
                    link["error"] = "batch timeout"
                    link["reachable"] = False
            return links
# ── CheckItem rendering ──────────────────────────────────────────────
def build_check_items(validated: list[LinkCheck]) -> list[dict]:
    """Turn validator results into compliance-check items (one per kind).

    Emits at most two items, in fixed order: opt-out first, then
    privacy-policy. A kind for which no links were extracted produces no
    item at all, so the result may hold 0, 1 or 2 entries.

    An item passes only if every link of its kind is reachable. Otherwise
    ``hint`` names how many links are broken and lists up to five examples
    (URL cut to 60 characters, plus HTTP status or error text).

    Args:
        validated: Links as returned by ``validate_links``. Entries without
            a ``reachable`` key are counted as broken.

    Returns:
        Check-item dicts with the keys ``id``, ``label``, ``passed``,
        ``severity``, ``matched_text``, ``level``, ``parent``, ``skipped``
        and ``hint``.
    """
    items: list[dict] = []
    for kind, label in (
        ("opt-out", "Opt-Out-Links der Drittanbieter erreichbar"),
        ("privacy-policy", "Privacy-Policy-Links der Drittanbieter erreichbar"),
    ):
        of_kind = [link for link in validated if link.get("kind") == kind]
        if not of_kind:
            continue

        total = len(of_kind)
        broken = [link for link in of_kind if not link.get("reachable")]
        all_pass = not broken

        hint = ""
        matched = ""
        if all_pass:
            matched = f"{total}/{total} Links erreichbar (HTTP 2xx/3xx)"
        else:
            broken_summary = ", ".join(
                f"{link['url'][:60]} "
                f"({link.get('status') or link.get('error', '?')})"
                for link in broken[:5]
            )
            # Only a dead opt-out link blocks the withdrawal of consent; a
            # dead privacy-policy link must not claim an Art. 7(3) violation.
            if kind == "opt-out":
                consequence = (
                    "Defekte Provider-Eintraege erfuellen Art. 7(3) DSGVO "
                    "nicht — der Widerruf der Einwilligung ist fuer diese "
                    "Anbieter unmoeglich. "
                )
            else:
                consequence = (
                    "Die Datenschutzhinweise dieser Anbieter sind ueber die "
                    "angegebenen Links nicht abrufbar. "
                )
            hint = (
                f"{len(broken)}/{total} Links sind defekt. "
                f"{consequence}"
                f"Beispiele: {broken_summary}"
            )

        items.append({
            "id": f"cookie_links_{kind.replace('-', '_')}",
            "label": label,
            "passed": all_pass,
            "severity": "MEDIUM" if kind == "opt-out" else "LOW",
            "matched_text": matched,
            "level": 2,
            "parent": "opt_out",
            "skipped": False,
            "hint": hint,
        })
    return items