Files
breakpilot-compliance/backend-compliance/compliance/services/cookie_link_validator.py
T
Benjamin Admin 6d29191e9b fix(vvt): score INTERNAL/GROUP without opt-out/privacy penalty
User feedback after BMW test:
- 60 'BMW AG — XYZ' rows were rendered as ✗ for Opt-Out/Privacy and
  scored 38-52%. That's misleading: BMW processing for itself doesn't
  need a separate opt-out URL (cookie-banner is the consent
  mechanism) or a separate privacy policy (main DSI covers it).
- Title 'Anbieter' was wrong for 60 of 90 rows (internal services).

Three orthogonal fixes:

1. score_vendors becomes recipient_type aware:
   - INTERNAL/GROUP_COMPANY: opt_out_url, privacy_policy_url, country
     are NOT required (the user's main DSI + cookie-banner cover them).
     What IS required: name, purpose, cookies disclosed with name +
     expiry. Cookies-disclosure weight raised to 50 (was 15) so the
     VVT-relevant data is the score driver.
   - 'necessary' category: opt-out still skipped (§25 Abs. 2 TDDDG).
   - External (PROCESSOR/CONTROLLER): existing strict scoring stays.

2. _link_status_badge accepts na_label and renders a neutral em-dash
   with explanation tooltip instead of red ✗ when the column doesn't
   apply to that row. _render_vendor_row_full passes na_label based on
   recipient_type:
     - INTERNAL/GROUP -> 'Nicht erforderlich (eigene Verarbeitung)'
     - necessary       -> 'Nicht erforderlich (§25 Abs. 2 TDDDG)'

3. Header + summary clarify the split:
   - h3 changed to 'Verarbeitungstaetigkeiten und Empfaenger aus der
     Cookie-Richtlinie' (was 'Drittanbieter aus Cookie-Richtlinie').
   - Top line: '90 Verarbeitungen erfasst — 60 eigene + 30 externe
     Empfaenger'.
   - Disclaimer below: explains the INTERNAL/GROUP exemption so the
     reader understands why those rows don't show ✗ for missing URLs.
   - Section labels enriched with the relevant DSGVO article:
     'Eigene Verarbeitungstaetigkeiten — fuer das VVT (Art. 30)',
     'Auftragsverarbeiter — AVV erforderlich (Art. 28)',
     'Joint Controller — Vereinbarung pruefen (Art. 26)'.

Expected BMW result after fix: ~85% of the 60 BMW-AG rows jump from
~52% to 90-100% (the real issue, fehlende Cookies-Disclosure, stays
flagged). The only true findings remaining are external links that
return 4xx (e.g. Criteo 403, Teads 404).
2026-05-17 13:15:40 +02:00

324 lines
12 KiB
Python

"""
Cookie-Richtlinie Opt-Out and Privacy-Policy link validator.
Art. 7(3) DSGVO: "Der Widerruf der Einwilligung muss so einfach wie die
Erteilung sein". Per third-party provider in the cookie policy there must
be a working opt-out mechanism. A missing or broken link makes that
provider entry legally non-compliant.
This module extracts the URLs from the cookie-policy text and tests each
one via async HTTP (HEAD first, GET fallback). Returns structured findings
the route layer turns into CheckItems for the email + frontend report.
"""
from __future__ import annotations
import asyncio
import logging
import re
from typing import TypedDict
import httpx
logger = logging.getLogger(__name__)
# URL extraction patterns. Each captures the URL that follows the keyword.
_URL_RE = r"https?://[\w\-./?#&=:%~+@]+"
_OPTOUT_PATTERN = re.compile(
rf"opt[\-\s]?out[\-\s]?(?:link)?\s*[:\|]?\s*({_URL_RE})",
re.IGNORECASE,
)
_PRIVACY_PATTERN = re.compile(
rf"(?:link\s+zur?\s+(?:privacy[\-\s]?policy|datenschutz\w*)|privacy[\-\s]?policy)\s*[:\|]?\s*({_URL_RE})",
re.IGNORECASE,
)
# Concurrency + timeout budget. 10 parallel requests, 8s per request,
# whole batch capped at 60s — keeps the cookie check inside the existing
# 120s backend → consent-tester budget.
_MAX_CONCURRENT = 10
_PER_URL_TIMEOUT = 8.0
_BATCH_TIMEOUT = 60.0
class LinkCheck(TypedDict, total=False):
url: str
kind: str # "opt-out" | "privacy-policy"
status: int # 0 = unreachable
final_url: str
error: str
reachable: bool
def extract_links(text: str) -> list[LinkCheck]:
"""Pull all Opt-Out + Privacy-Policy URLs from a cookie-policy text.
Deduplicates by URL+kind. Strips trailing punctuation/quotes commonly
captured by greedy URL regex.
"""
found: dict[tuple[str, str], LinkCheck] = {}
for kind, pattern in (("opt-out", _OPTOUT_PATTERN),
("privacy-policy", _PRIVACY_PATTERN)):
for match in pattern.finditer(text):
url = match.group(1).rstrip(".,;:\"')(]").strip()
if not url.startswith(("http://", "https://")):
continue
key = (url, kind)
if key not in found:
found[key] = LinkCheck(url=url, kind=kind)
return list(found.values())
async def validate_links(links: list[LinkCheck]) -> list[LinkCheck]:
"""HTTP-probe each link concurrently. Adds status + reachable flag.
Uses HEAD first (fast), falls back to GET for servers that reject HEAD.
Accepts any 2xx/3xx as reachable; 4xx/5xx and timeouts as broken.
"""
if not links:
return []
sem = asyncio.Semaphore(_MAX_CONCURRENT)
async with httpx.AsyncClient(
timeout=_PER_URL_TIMEOUT,
follow_redirects=True,
headers={"User-Agent": "BreakPilot-LinkChecker/1.0"},
) as client:
async def probe(link: LinkCheck) -> LinkCheck:
async with sem:
try:
resp = await client.head(link["url"])
if resp.status_code in (405, 403):
# Some servers reject HEAD; try GET
resp = await client.get(link["url"])
link["status"] = resp.status_code
link["final_url"] = str(resp.url)
link["reachable"] = 200 <= resp.status_code < 400
except httpx.TimeoutException:
link["status"] = 0
link["error"] = "timeout"
link["reachable"] = False
except Exception as e:
link["status"] = 0
link["error"] = str(e)[:80]
link["reachable"] = False
return link
try:
results = await asyncio.wait_for(
asyncio.gather(*[probe(link) for link in links]),
timeout=_BATCH_TIMEOUT,
)
return list(results)
except asyncio.TimeoutError:
logger.warning(
"Cookie-link batch timeout after %.0fs — %d urls",
_BATCH_TIMEOUT, len(links),
)
# Best-effort: return whatever links got updated
return links
# ── Per-vendor link validation ──────────────────────────────────────
async def validate_vendor_urls(vendors: list[dict]) -> list[dict]:
"""Probe opt-out and privacy URLs of each vendor. Mutates each vendor:
vendor["opt_out_status"] = int (0 = unreachable, 2xx/3xx = ok)
vendor["opt_out_ok"] = bool
vendor["privacy_status"] = int
vendor["privacy_ok"] = bool
"""
if not vendors:
return vendors
# Flatten into one list of LinkCheck (with back-reference to vendor)
probes: list[tuple[dict, str, str]] = [] # (vendor, url, kind)
for v in vendors:
if v.get("opt_out_url"):
probes.append((v, v["opt_out_url"], "opt_out"))
if v.get("privacy_policy_url"):
probes.append((v, v["privacy_policy_url"], "privacy"))
if not probes:
return vendors
sem = asyncio.Semaphore(_MAX_CONCURRENT)
async with httpx.AsyncClient(
timeout=_PER_URL_TIMEOUT,
follow_redirects=True,
headers={"User-Agent": "BreakPilot-LinkChecker/1.0"},
) as client:
async def probe(vendor: dict, url: str, kind: str) -> None:
async with sem:
try:
resp = await client.head(url)
if resp.status_code in (405, 403):
resp = await client.get(url)
vendor[f"{kind}_status"] = resp.status_code
vendor[f"{kind}_ok"] = 200 <= resp.status_code < 400
except Exception as e:
vendor[f"{kind}_status"] = 0
vendor[f"{kind}_ok"] = False
vendor[f"{kind}_error"] = str(e)[:60]
try:
await asyncio.wait_for(
asyncio.gather(*[probe(v, u, k) for v, u, k in probes]),
timeout=_BATCH_TIMEOUT,
)
except asyncio.TimeoutError:
logger.warning("vendor-link batch timeout (%d probes)", len(probes))
return vendors
def score_vendors(vendors: list[dict]) -> list[dict]:
"""Compute per-vendor compliance score (0-100) and flags.
Scoring is recipient-type AND category aware. Two orthogonal axes
influence which fields are required:
recipient_type == INTERNAL / GROUP_COMPANY
Own processing — the user's consent + main DSI cover privacy +
opt-out for ALL of these. Per-row opt-out / privacy URLs are
NOT a compliance gap. What matters: VVT-relevante Fields
(purpose, cookies with names + expiry).
category == 'necessary' (§25 Abs. 2 TDDDG)
Technically necessary cookies don't need consent → no opt-out
required even for external processors.
For each non-applicable field we set flag '<field>_n_a' instead of
a penalty flag, so the report can render it neutrally.
"""
for v in vendors:
rtype = (v.get("recipient_type") or "OTHER").upper()
is_own = rtype in ("INTERNAL", "GROUP_COMPANY")
is_necessary = (v.get("category") or "").lower() in (
"necessary", "strictlynecessary",
)
opt_out_required = not is_own and not is_necessary
privacy_required = not is_own
country_required = not is_own
score = 0
max_score = 0
flags: list[str] = []
# Name (always required) — 20
max_score += 20
if v.get("name"):
score += 20
else:
flags.append("no_name")
# Purpose — 20
max_score += 20
if v.get("purpose"):
score += 20
else:
flags.append("no_purpose")
# Country — only for external processors / controllers
if country_required:
max_score += 10
if v.get("country"):
score += 10
else:
flags.append("no_country")
# Opt-Out URL — only when consent-based AND external
if opt_out_required:
max_score += 25
if not v.get("opt_out_url"):
flags.append("no_opt_out_url")
elif v.get("opt_out_ok") is False:
flags.append("broken_opt_out")
score += 5
else:
score += 25
# Privacy policy URL — required for external (own = via main DSI)
if privacy_required:
weight = 10 if is_necessary else 15
max_score += weight
if not v.get("privacy_policy_url"):
flags.append("no_privacy_url")
elif v.get("privacy_ok") is False:
flags.append("broken_privacy_url")
score += weight // 3
else:
score += weight
# Cookies disclosed (names + expiry) — required for ALL types
# (own processing too: BMW must list its own cookies for the VVT)
weight = 50 if is_own or is_necessary else 15
max_score += weight
cookies = v.get("cookies") or []
if cookies:
named = sum(1 for c in cookies if c.get("name"))
with_expiry = sum(1 for c in cookies if c.get("expiry"))
if named >= 1 and with_expiry >= 1:
score += weight
elif named >= 1:
score += weight // 2
flags.append("cookies_no_expiry")
else:
flags.append("cookies_no_names")
else:
flags.append("no_cookies_listed")
v["compliance_score"] = round(score / max_score * 100) if max_score else 0
v["compliance_flags"] = flags
return vendors
# ── CheckItem rendering ──────────────────────────────────────────────
def build_check_items(validated: list[LinkCheck]) -> list[dict]:
"""Turn validator results into compliance-check items (one per kind).
Always returns 2 items (opt-out + privacy-policy) so the report layout
is stable. Skipped if no links of that kind were extracted.
"""
items: list[dict] = []
for kind, label in (
("opt-out", "Opt-Out-Links der Drittanbieter erreichbar"),
("privacy-policy", "Privacy-Policy-Links der Drittanbieter erreichbar"),
):
of_kind = [l for l in validated if l.get("kind") == kind]
if not of_kind:
continue
total = len(of_kind)
ok = sum(1 for l in of_kind if l.get("reachable"))
broken = [l for l in of_kind if not l.get("reachable")]
all_pass = ok == total
hint = ""
matched = ""
if all_pass:
matched = f"{ok}/{total} Links erreichbar (HTTP 2xx/3xx)"
else:
broken_summary = ", ".join(
f"{l['url'][:60]} ({l.get('status') or l.get('error', '?')})"
for l in broken[:5]
)
hint = (
f"{len(broken)}/{total} Links sind defekt. Defekte "
f"Provider-Eintraege erfuellen Art. 7(3) DSGVO nicht — der "
f"Widerruf der Einwilligung ist fuer diese Anbieter unmoeglich. "
f"Beispiele: {broken_summary}"
)
items.append({
"id": f"cookie_links_{kind.replace('-', '_')}",
"label": label,
"passed": all_pass,
"severity": "MEDIUM" if kind == "opt-out" else "LOW",
"matched_text": matched,
"level": 2,
"parent": "opt_out",
"skipped": False,
"hint": hint,
})
return items