breakpilot-compliance/backend-compliance/compliance/services/cookie_link_validator.py
Benjamin Admin 662327e8b4
feat(compliance-check): MC classification + embedding + vendor redundancy + action recipes + Borlabs features
Major update based on the BMW test iterations (v1→v9):

Core Compliance-Check
- Sonnet check_type classification: text/process/review for all 1874 MCs
  in compliance.doc_check_controls (script + sidecar /data/mc_classification.db).
  rag_document_checker filters on check_type='text' for doc_check.
  Plus fits_doc_type audit (v2) + ui_only audit for DSA/e-commerce MCs
  filed under the wrong doc_type.
- scope_requires filter: biometric/ai_decision/child_targeting MCs are
  filtered per business_profile (FRT skipped for BMW etc.).
- Embedding match (BGE-M3) as phase 3 after the regex match:
  per-doc_type threshold override (impressum 0.50, dse/cookie 0.60),
  short-field rescue (15-word chunks) for mandatory fields in the Impressum.
  Title + check_question as embedding input for more context.
- Cookie-text routing: consent-tester returns cmp_cookie_text from the
  CMP reconstruct; the backend prefers it over DOM extraction when it is
  richer (BMW: 1824 vs 600 words).
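
The per-doc_type threshold override can be pictured with a minimal sketch. Only the three threshold values come from the commit message; the names `DOC_TYPE_THRESHOLDS`, `DEFAULT_THRESHOLD`, `cosine` and `embedding_match` are hypothetical, and the fallback value is an assumption. Real BGE-M3 vectors are replaced by tiny hand-written lists.

```python
import math

# Thresholds quoted in the commit message.
DOC_TYPE_THRESHOLDS = {"impressum": 0.50, "dse": 0.60, "cookie": 0.60}
# Assumption: fallback for doc_types without an override (not from the source).
DEFAULT_THRESHOLD = 0.60


def cosine(a: list[float], b: list[float]) -> float:
    """Plain cosine similarity; returns 0.0 for zero-length vectors."""
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(y * y for y in b))
    return dot / (norm_a * norm_b) if norm_a and norm_b else 0.0


def embedding_match(
    doc_type: str,
    control_vec: list[float],
    chunk_vecs: list[list[float]],
) -> tuple[bool, float]:
    """Return (matched, best_similarity) using the doc_type's threshold."""
    threshold = DOC_TYPE_THRESHOLDS.get(doc_type, DEFAULT_THRESHOLD)
    best = max((cosine(control_vec, c) for c in chunk_vecs), default=0.0)
    return best >= threshold, best


if __name__ == "__main__":
    control = [1.0, 0.0]
    chunks = [[0.55, 0.8352]]  # similarity of roughly 0.55 to the control
    print(embedding_match("impressum", control, chunks)[0])
    print(embedding_match("dse", control, chunks)[0])
```

The same chunk passes for an Impressum and fails for a DSE, which is the intended effect of a lower threshold on short, formulaic documents.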

Vendor redundancy + EU alternatives + cost saving
- vendor_redundancy.analyze(): functional categorization of the CMP vendors,
  detection of multiple providers per category, EU-alternative lookup
  (Matomo, IONOS, HERE, Friendly Captcha, Smart AdServer, ...).
- vendor_cost_estimator: tier inference from the cookie footprint (cookie
  count + premium-feature cookies + third-party share → starter/professional/
  enterprise/premier).
- Self-service advertising (Google/Meta/Pinterest/...) = 0 license cost
  (media spend only, tracked separately). DSP platforms keep a narrow range.
- Tier-aware saving range: for enterprise/premier we use the upper
  40-100% band of the list prices, not starter→premier.
- Multi-function tools (Matomo Pro, SAP CX, IONOS Cloud, Userlike, Smart
  AdServer, HERE Maps, Vimeo Pro, LamaPoll): one tool replaces several
  categories at once.

Cookie knowledge DB + functional classification
- cookie_knowledge_db: 50 curated top cookies (Google/Meta/Adobe/MS/...)
  with vendor, exact_purpose, data_collected, IAB TCF IDs, reid_risk,
  schrems_ii_status, CJEU rulings, EU alternative.
- cookie_function_classifier: functional role per cookie (tracking_id,
  ad_pixel, session_id, ab_test, csrf, ...) + blocking_impact.

Country inference from legal form
- cookie_link_validator: the country field is derived from the vendor name
  (A/S=DK, GmbH=DE, Inc=US, B.V.=NL, ...) plus a vendor lookup table.
  Reduces false-positive no_country flags for unambiguously EU vendors
  (Adform DK, Pinterest IE).
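
A reduced sketch of this lookup order, assuming a handful of entries only. The names `KNOWN_VENDORS`, `SUFFIXES` and `infer_country` are hypothetical; the module's own tables and `_country_from_name` further down are the actual implementation and cover far more cases.

```python
import re

# Small illustrative subsets; the real tables in the module are much larger.
KNOWN_VENDORS = {"pinterest europe ltd": "IE"}
SUFFIXES = [
    (r"\bA/S(?!\w)", "DK"),      # Aktieselskab
    (r"\bGmbH\b", "DE"),
    (r"\bB\.V\.(?!\w)", "NL"),   # Besloten Vennootschap
    (r"\bInc\.?(?!\w)", "US"),
    (r"\bLtd\.?(?!\w)", "GB"),   # default for Limited
]


def infer_country(vendor_name: str) -> str:
    """Return an ISO-2 code, or "" when nothing can be inferred."""
    if not vendor_name:
        return ""
    lowered = vendor_name.lower()
    # 1) A known vendor wins over the generic legal-form default.
    for key, code in KNOWN_VENDORS.items():
        if key in lowered:
            return code
    # 2) Otherwise fall back to the legal-form suffix.
    for pattern, code in SUFFIXES:
        if re.search(pattern, vendor_name):
            return code
    return ""


if __name__ == "__main__":
    for name in ("Adform A/S", "Pinterest Europe Ltd.", "Salesforce Inc."):
        print(name, "->", infer_country(name))
```

Checking the vendor table first matters for cases like Pinterest Europe Ltd., where the suffix alone would default to GB.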

Action recipes + doc anchor locator
- finding_action_recipes: for each finding type (no_cookies_listed, no_country,
  broken_opt_out, "Auftragsverarbeiter erwaehnen", "Art. 22 Profiling",
  ...) a structured instruction with what/why/fix_text/where/example.
  Intended for 1:1 insertion into customer documents.
- doc_anchor_locator: embedding-based (BGE-M3 cosine); finds the matching
  paragraph in the existing customer document for each finding.
  Per-run thread-local cache. Fallback: keyword match.
- Email rendering integrates recipe + anchor for each failed document check,
  plus a vendor flag list with a collapsible action list.
- Score explanation per vendor row (3/5 subtitle + tooltip).

Migration pipeline (Compliance-Check -> Customer Banner/Documents)
- migration_to_banner.py: vendor list -> CookieBannerConfig with
  4 categories + review flags.
- migration_to_document.py: vendor list -> cookie policy + VVT register
  + privacy-policy pre-fills.
- agent_migration_routes: 3 preview endpoints (banner-preview,
  document-preview, summary). The cmp_vendors are persisted in the
  check_payloads table of /data/compliance_audits.db.

Borlabs-parity cookie-banner features
- Consent history in the banner: window.bpShowConsentHistory() + localStorage.
- Content blocker: cookie-banner-content-blocker.ts, YouTube/Maps/video
  placeholders until consent is given.
- Google Consent Mode v2 extended: wait_for_update + region=EEA/CH/GB.
- Consent-log export (CSV/JSON) via einwilligungen_export_routes.

Bug fixes
- canonical_control_routes: _jsonish helper for string-typed jsonb,
  similar-controls endpoint with _has_embedding_col() cache (no more 500s).
- Control-library frontend: defensive .map coercer in 2 detail views.
- Embedding-service batching (batches of 32 instead of 165 in one call).
- KeyError 'control_id' in MC result aggregation (defensive .get).
- Master-controls click-through from /sdk/master-controls to
  /sdk/control-library?control=<id> with URL-param auto-open.
- Dockerfile: /data pre-chowned to appuser (audit-DB write permission).
- Cookie-text routing bug (cmp_reconstructed > DOM extraction).
- doc_type-aware MC filter (instead of all text MCs).
- Master-contract dedup (60 BMW-internal entries = 1 Adobe contract).
- A3-v2 audit reclassified 24 UI-language MCs as 'process'.
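
The batching fix in the list above can be sketched as follows. Only the batch size of 32 and the figure of 165 inputs come from the commit message; `embed_in_batches` and the `embed_fn` callable are hypothetical stand-ins for the real embedding-service client.

```python
from typing import Callable, Sequence

BATCH_SIZE = 32  # batch size named in the commit message


def embed_in_batches(
    texts: Sequence[str],
    embed_fn: Callable[[Sequence[str]], list[list[float]]],
    batch_size: int = BATCH_SIZE,
) -> list[list[float]]:
    """Call embed_fn on slices of at most batch_size texts, keeping order."""
    if batch_size <= 0:
        raise ValueError("batch_size must be positive")
    vectors: list[list[float]] = []
    for start in range(0, len(texts), batch_size):
        vectors.extend(embed_fn(texts[start:start + batch_size]))
    return vectors


if __name__ == "__main__":
    seen_sizes: list[int] = []

    def fake_embed(batch: Sequence[str]) -> list[list[float]]:
        # Stand-in for the embedding service: one dummy vector per text.
        seen_sizes.append(len(batch))
        return [[float(len(text))] for text in batch]

    result = embed_in_batches([f"mc-{i}" for i in range(165)], fake_embed)
    print(len(result), seen_sizes)
```

With 165 inputs this yields five full batches and one final batch of five texts.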

Tests
- test_migration_mappers.py (9 Tests)
- test_migration_endpoints.py (4 Tests)

Scripts (one-shot)
- classify_mc_check_type.py (v1) + _v2 (PK=control_id,doc_type)
- audit_mc_doctype_fit.py (v1 fits) + _v2 (ui_only + scope_requires)

BMW run results, v1 (broken) -> v9 (all fixes):
  DSE       7.5% -> 81-83%
  Impressum 4%   -> 100% (all 6 real MCs fulfilled)
  Cookie    0%   -> 79-83% (CMP text routing + embedding)
  Plus: 10 consolidation categories, estimated savings of 200k-3M per year
  Plus: action recipes + doc anchors for every fail

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-18 18:30:08 +02:00

480 lines
18 KiB
Python

"""
Cookie-Richtlinie Opt-Out and Privacy-Policy link validator.
Art. 7(3) DSGVO: "Der Widerruf der Einwilligung muss so einfach wie die
Erteilung sein". Per third-party provider in the cookie policy there must
be a working opt-out mechanism. A missing or broken link makes that
provider entry legally non-compliant.
This module extracts the URLs from the cookie-policy text and tests each
one via async HTTP (HEAD first, GET fallback). Returns structured findings
the route layer turns into CheckItems for the email + frontend report.
"""
from __future__ import annotations
import asyncio
import logging
import re
from typing import TypedDict
import httpx
logger = logging.getLogger(__name__)
# URL extraction patterns. Each captures the URL that follows the keyword.
_URL_RE = r"https?://[\w\-./?#&=:%~+@]+"
_OPTOUT_PATTERN = re.compile(
    rf"opt[\-\s]?out[\-\s]?(?:link)?\s*[:\|]?\s*({_URL_RE})",
    re.IGNORECASE,
)
_PRIVACY_PATTERN = re.compile(
    rf"(?:link\s+zur?\s+(?:privacy[\-\s]?policy|datenschutz\w*)|privacy[\-\s]?policy)\s*[:\|]?\s*({_URL_RE})",
    re.IGNORECASE,
)

# Concurrency + timeout budget: 10 parallel requests, 8s per request,
# whole batch capped at 60s. This keeps the cookie check inside the existing
# 120s backend → consent-tester budget.
_MAX_CONCURRENT = 10
_PER_URL_TIMEOUT = 8.0
_BATCH_TIMEOUT = 60.0


class LinkCheck(TypedDict, total=False):
    url: str
    kind: str  # "opt-out" | "privacy-policy"
    status: int  # 0 = unreachable
    final_url: str
    error: str
    reachable: bool


def extract_links(text: str) -> list[LinkCheck]:
    """Pull all Opt-Out + Privacy-Policy URLs from a cookie-policy text.

    Deduplicates by URL+kind. Strips trailing punctuation/quotes commonly
    captured by greedy URL regex.
    """
    found: dict[tuple[str, str], LinkCheck] = {}
    for kind, pattern in (("opt-out", _OPTOUT_PATTERN),
                          ("privacy-policy", _PRIVACY_PATTERN)):
        for match in pattern.finditer(text):
            url = match.group(1).rstrip(".,;:\"')(]").strip()
            if not url.startswith(("http://", "https://")):
                continue
            key = (url, kind)
            if key not in found:
                found[key] = LinkCheck(url=url, kind=kind)
    return list(found.values())


async def validate_links(links: list[LinkCheck]) -> list[LinkCheck]:
    """HTTP-probe each link concurrently. Adds status + reachable flag.

    Uses HEAD first (fast), falls back to GET for servers that reject HEAD.
    Accepts any 2xx/3xx as reachable; 4xx/5xx and timeouts as broken.
    """
    if not links:
        return []
    sem = asyncio.Semaphore(_MAX_CONCURRENT)
    async with httpx.AsyncClient(
        timeout=_PER_URL_TIMEOUT,
        follow_redirects=True,
        headers={"User-Agent": "BreakPilot-LinkChecker/1.0"},
    ) as client:
        async def probe(link: LinkCheck) -> LinkCheck:
            async with sem:
                try:
                    resp = await client.head(link["url"])
                    if resp.status_code in (405, 403):
                        # Some servers reject HEAD; try GET
                        resp = await client.get(link["url"])
                    link["status"] = resp.status_code
                    link["final_url"] = str(resp.url)
                    link["reachable"] = 200 <= resp.status_code < 400
                except httpx.TimeoutException:
                    link["status"] = 0
                    link["error"] = "timeout"
                    link["reachable"] = False
                except Exception as e:
                    link["status"] = 0
                    link["error"] = str(e)[:80]
                    link["reachable"] = False
                return link

        try:
            results = await asyncio.wait_for(
                asyncio.gather(*[probe(link) for link in links]),
                timeout=_BATCH_TIMEOUT,
            )
            return list(results)
        except asyncio.TimeoutError:
            logger.warning(
                "Cookie-link batch timeout after %.0fs — %d urls",
                _BATCH_TIMEOUT, len(links),
            )
            # Best-effort: probes mutate the LinkCheck dicts in place, so
            # return the input list with whatever links got updated.
            return links


# ── Per-vendor link validation ──────────────────────────────────────
async def validate_vendor_urls(vendors: list[dict]) -> list[dict]:
    """Probe opt-out and privacy URLs of each vendor. Mutates each vendor:

        vendor["opt_out_status"] = int (0 = unreachable, 2xx/3xx = ok)
        vendor["opt_out_ok"] = bool
        vendor["privacy_status"] = int
        vendor["privacy_ok"] = bool
    """
    if not vendors:
        return vendors
    # Flatten into one list of probes (with back-reference to vendor)
    probes: list[tuple[dict, str, str]] = []  # (vendor, url, kind)
    for v in vendors:
        if v.get("opt_out_url"):
            probes.append((v, v["opt_out_url"], "opt_out"))
        if v.get("privacy_policy_url"):
            probes.append((v, v["privacy_policy_url"], "privacy"))
    if not probes:
        return vendors
    sem = asyncio.Semaphore(_MAX_CONCURRENT)
    async with httpx.AsyncClient(
        timeout=_PER_URL_TIMEOUT,
        follow_redirects=True,
        headers={"User-Agent": "BreakPilot-LinkChecker/1.0"},
    ) as client:
        async def probe(vendor: dict, url: str, kind: str) -> None:
            async with sem:
                try:
                    resp = await client.head(url)
                    if resp.status_code in (405, 403):
                        # Some servers reject HEAD; try GET
                        resp = await client.get(url)
                    vendor[f"{kind}_status"] = resp.status_code
                    vendor[f"{kind}_ok"] = 200 <= resp.status_code < 400
                except Exception as e:
                    vendor[f"{kind}_status"] = 0
                    vendor[f"{kind}_ok"] = False
                    vendor[f"{kind}_error"] = str(e)[:60]

        try:
            await asyncio.wait_for(
                asyncio.gather(*[probe(v, u, k) for v, u, k in probes]),
                timeout=_BATCH_TIMEOUT,
            )
        except asyncio.TimeoutError:
            logger.warning("vendor-link batch timeout (%d probes)", len(probes))
    return vendors


def score_vendors(vendors: list[dict]) -> list[dict]:
    """Compute per-vendor compliance score (0-100) and flags.

    Scoring is recipient-type AND category aware. Two orthogonal axes
    influence which fields are required:

    recipient_type == INTERNAL / GROUP_COMPANY
        Own processing: the user's consent + main DSI cover privacy +
        opt-out for ALL of these. Per-row opt-out / privacy URLs are
        NOT a compliance gap. What matters: VVT-relevant fields
        (purpose, cookies with names + expiry).

    category == 'necessary' (§25 Abs. 2 TDDDG)
        Technically necessary cookies don't need consent → no opt-out
        required even for external processors.

    Fields that do not apply to a vendor are left out of both the score
    and the maximum, so they never produce a penalty flag and the
    resulting percentage is not reduced by them.
    """
    for v in vendors:
        rtype = (v.get("recipient_type") or "OTHER").upper()
        is_own = rtype in ("INTERNAL", "GROUP_COMPANY")
        is_necessary = (v.get("category") or "").lower() in (
            "necessary", "strictlynecessary",
        )
        opt_out_required = not is_own and not is_necessary
        privacy_required = not is_own
        country_required = not is_own
        score = 0
        max_score = 0
        flags: list[str] = []

        # Name (always required): 20
        max_score += 20
        if v.get("name"):
            score += 20
        else:
            flags.append("no_name")

        # Purpose: 20
        max_score += 20
        if v.get("purpose"):
            score += 20
        else:
            flags.append("no_purpose")

        # Country: only for external processors / controllers.
        # If country is empty, infer it from the legal-form suffix in the name.
        if country_required:
            max_score += 10
            if v.get("country"):
                score += 10
            elif inferred := _country_from_name(v.get("name", "")):
                v["country"] = inferred
                v["country_inferred"] = True
                score += 10
            else:
                flags.append("no_country")

        # Opt-out URL: only when consent-based AND external
        if opt_out_required:
            max_score += 25
            if not v.get("opt_out_url"):
                flags.append("no_opt_out_url")
            elif v.get("opt_out_ok") is False:
                flags.append("broken_opt_out")
                score += 5
            else:
                score += 25

        # Privacy policy URL: required for external (own = via main DSI)
        if privacy_required:
            weight = 10 if is_necessary else 15
            max_score += weight
            if not v.get("privacy_policy_url"):
                flags.append("no_privacy_url")
            elif v.get("privacy_ok") is False:
                flags.append("broken_privacy_url")
                score += weight // 3
            else:
                score += weight

        # Cookies disclosed (names + expiry): required for ALL types
        # (own processing too: BMW must list its own cookies for the VVT)
        weight = 50 if is_own or is_necessary else 15
        max_score += weight
        cookies = v.get("cookies") or []
        if cookies:
            named = sum(1 for c in cookies if c.get("name"))
            with_expiry = sum(1 for c in cookies if c.get("expiry"))
            if named >= 1 and with_expiry >= 1:
                score += weight
            elif named >= 1:
                score += weight // 2
                flags.append("cookies_no_expiry")
            else:
                flags.append("cookies_no_names")
        else:
            flags.append("no_cookies_listed")

        v["compliance_score"] = round(score / max_score * 100) if max_score else 0
        v["compliance_flags"] = flags
    return vendors


# ── CheckItem rendering ──────────────────────────────────────────────
def build_check_items(validated: list[LinkCheck]) -> list[dict]:
    """Turn validator results into compliance-check items (one per kind).

    Returns at most 2 items (opt-out + privacy-policy). A kind is omitted
    entirely when no links of that kind were extracted.
    """
    items: list[dict] = []
    for kind, label in (
        ("opt-out", "Opt-Out-Links der Drittanbieter erreichbar"),
        ("privacy-policy", "Privacy-Policy-Links der Drittanbieter erreichbar"),
    ):
        of_kind = [link for link in validated if link.get("kind") == kind]
        if not of_kind:
            continue
        total = len(of_kind)
        ok = sum(1 for link in of_kind if link.get("reachable"))
        broken = [link for link in of_kind if not link.get("reachable")]
        all_pass = ok == total
        hint = ""
        matched = ""
        if all_pass:
            matched = f"{ok}/{total} Links erreichbar (HTTP 2xx/3xx)"
        else:
            # Show at most five broken links, each with status or error.
            broken_summary = ", ".join(
                f"{link['url'][:60]} ({link.get('status') or link.get('error', '?')})"
                for link in broken[:5]
            )
            hint = (
                f"{len(broken)}/{total} Links sind defekt. Defekte "
                f"Provider-Eintraege erfuellen Art. 7(3) DSGVO nicht — der "
                f"Widerruf der Einwilligung ist fuer diese Anbieter unmoeglich. "
                f"Beispiele: {broken_summary}"
            )
        items.append({
            "id": f"cookie_links_{kind.replace('-', '_')}",
            "label": label,
            "passed": all_pass,
            "severity": "MEDIUM" if kind == "opt-out" else "LOW",
            "matched_text": matched,
            "level": 2,
            "parent": "opt_out",
            "skipped": False,
            "hint": hint,
        })
    return items


# ─── Country inference from the legal-form suffix ──────────────────
#
# When a vendor's "country" field is empty, it can often be derived
# from the company suffix:
#   Adform A/S → DK (Denmark, Aktieselskab)
#   Pinterest Europe Ltd. → IE (Ireland, Limited)
#   Salesforce Inc. → US (Incorporated)
#   Adobe ... Ireland Limited → IE
#   Genesys ... B.V. → NL (Netherlands, Besloten Vennootschap)
#   Equativ S.A. → FR (Société Anonyme)
#   SAP SE → DE (Societas Europaea, mostly registered in DE)
#
# Combined strategy (_country_from_name applies these most specific
# first, i.e. 3, then 2, then 1):
#   1) Suffix pattern
#   2) Country name in the company name ('Ireland', 'Deutschland')
#   3) Specific vendor (Google Inc / Meta Platforms Ireland Ltd → vendor-specific)
import re as _re
_SUFFIX_COUNTRY: list[tuple[str, str]] = [
    # Pattern (at the end of the name or before further tokens) → ISO code.
    # Patterns ending in a literal dot use (?!\w) instead of \b, because \b
    # cannot match between "." and whitespace or the end of the string.
    # More specific patterns are listed before the generic ones they contain.
    (r"\bA/S\b", "DK"),  # Aktieselskab
    (r"\bApS\b", "DK"),  # Anpartsselskab
    (r"\bAB\b", "SE"),  # Aktiebolag
    (r"\bAS\b(?!\w)", "NO"),  # Aksjeselskap
    (r"\bOy\b", "FI"),  # Osakeyhtiö
    (r"\bAG\b(?!\w)", "DE"),  # CH/AT also possible, default DE
    (r"\bGmbH\b", "DE"),
    (r"\bUG\b", "DE"),
    (r"\beG\b", "DE"),
    (r"\bKG\b", "DE"),
    (r"\bOHG\b", "DE"),
    (r"\bSE\b", "DE"),  # Societas Europaea; verify for SAP SE etc.
    (r"\bS\.A\.\sde C\.V\.(?!\w)", "MX"),  # must precede the plain S.A. pattern
    (r"\bS\.A\.(?!\w)", "FR"),  # France / SE / ES
    (r"\bSAS\b", "FR"),
    (r"\bS\.A\.S\.(?!\w)", "FR"),
    (r"\bSARL\b", "FR"),
    (r"\bS\.r\.l\.(?!\w)", "IT"),
    (r"\bS\.p\.A\.(?!\w)", "IT"),
    (r"\bSpA\b", "IT"),
    (r"\bB\.V\.(?!\w)", "NL"),
    (r"\bN\.V\.(?!\w)", "NL"),
    (r"\bSL\b", "ES"),
    (r"\bd\.o\.o\.(?!\w)", "SI"),  # Slovenia
    (r"\bd\.d\.(?!\w)", "HR"),  # Croatia
    (r"\bz\s?o\.o\.(?!\w)", "PL"),
    (r"\bInc\.?\b", "US"),
    (r"\bIncorporated\b", "US"),
    (r"\bCorp\.?\b", "US"),
    (r"\bCorporation\b", "US"),
    (r"\bLLC\b", "US"),
    (r"\bL\.L\.C\.(?!\w)", "US"),
    (r"\bPte\.?\sLtd\.?(?!\w)", "SG"),  # must precede the plain Ltd pattern
    (r"\bLtd\.?\b", "GB"),  # UK Limited, default
    (r"\bLimited\b", "GB"),
    (r"\bPLC\b", "GB"),
    (r"\bPty\b", "AU"),
    (r"\bK\.K\.(?!\w)", "JP"),  # Kabushiki-Kaisha
]
# Country names inside the company name (e.g. "Adobe Systems Software Ireland Limited")
_COUNTRY_NAME_TOKENS: list[tuple[str, str]] = [
    ("ireland", "IE"),
    ("deutschland", "DE"),
    ("germany", "DE"),
    ("netherlands", "NL"),
    ("france", "FR"),
    ("united kingdom", "GB"),
    ("uk", "GB"),
    ("usa", "US"),
    ("united states", "US"),
    ("austria", "AT"),
    ("oesterreich", "AT"),
    ("schweiz", "CH"),
    ("switzerland", "CH"),
    ("luxembourg", "LU"),
    ("luxemburg", "LU"),
    ("denmark", "DK"),
    ("daenemark", "DK"),
    ("sweden", "SE"),
    ("schweden", "SE"),
    ("norway", "NO"),
    ("norwegen", "NO"),
    ("finland", "FI"),
    ("finnland", "FI"),
]
# Known vendors with an unambiguous registered seat (override)
_KNOWN_VENDOR_COUNTRY: dict[str, str] = {
    "google inc": "US",
    "google llc": "US",
    "google ireland": "IE",
    "meta platforms ireland": "IE",
    "facebook ireland": "IE",
    "amazon.com inc": "US",
    "amazon web services": "US",
    "amazon web services inc": "US",
    "linkedin inc": "US",
    "salesforce inc": "US",
    "salesforce.com": "US",
    "outbrain inc": "US",
    "taboola inc": "US",
    "pinterest europe ltd": "IE",
    "intuition machines inc": "US",
    "akamai technologies inc": "US",
    "criteo s.a": "FR",
    "criteo sa": "FR",
    "adform a/s": "DK",
    "speedcurve limited": "GB",
    "longtail ad solutions": "US",
    "genesys cloud services b.v": "NL",
    "qualtrics": "US",
    "teads sa": "FR",
    "teads s.a": "FR",
    "salesviewer gmbh": "DE",
    "baqend gmbh": "DE",
    "zenweshare sas": "FR",
    "nayoki gmbh": "DE",
    "psyma": "DE",
    "matomo": "NZ",  # InnoCraft NZ, but can be hosted in the EU
    "adobe systems software ireland": "IE",
    "microsoft corporation": "US",
    "microsoft corp": "US",
}


def _country_from_name(vendor_name: str) -> str:
    """Best-effort: derive the ISO-2 country code from the vendor name."""
    if not vendor_name:
        return ""
    # Vendor names often have the form "<company> <dash> <tool>"; only the
    # company part is inspected. The separator is assumed to be an em dash
    # (U+2014); str.split("") with an empty separator raises ValueError.
    firm = vendor_name.split("\u2014")[0].strip()
    firm_l = firm.lower()
    # 1) Known vendor lookup (most specific)
    for k, v in _KNOWN_VENDOR_COUNTRY.items():
        if k in firm_l:
            return v
    # 2) Country name inside the company name
    for token, code in _COUNTRY_NAME_TOKENS:
        if token in firm_l:
            return code
    # 3) Legal-form suffix
    for pattern, code in _SUFFIX_COUNTRY:
        if _re.search(pattern, firm):
            return code
    return ""
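

The weighting in `score_vendors` is easiest to follow with a worked case. The sketch below re-derives the arithmetic for one situation only, an external vendor in a consent-based (non-necessary) category, using the weights from the function above. The helper name `external_consent_score` and its string-valued parameters are hypothetical simplifications, not part of the module.

```python
def external_consent_score(
    *,
    name: bool,
    purpose: bool,
    country: bool,
    opt_out: str,   # "ok" | "broken" | "missing"
    privacy: str,   # "ok" | "broken" | "missing"
    cookies: str,   # "full" | "names_only" | "none"
) -> int:
    """Percentage score for an external, consent-based vendor."""
    # All six fields apply in this case: 20 + 20 + 10 + 25 + 15 + 15 = 105.
    maximum = 20 + 20 + 10 + 25 + 15 + 15
    earned = 0
    earned += 20 if name else 0
    earned += 20 if purpose else 0
    earned += 10 if country else 0
    earned += {"ok": 25, "broken": 5, "missing": 0}[opt_out]
    earned += {"ok": 15, "broken": 15 // 3, "missing": 0}[privacy]
    earned += {"full": 15, "names_only": 15 // 2, "none": 0}[cookies]
    return round(earned / maximum * 100)


if __name__ == "__main__":
    complete = dict(name=True, purpose=True, country=True,
                    privacy="ok", cookies="full")
    print(external_consent_score(opt_out="ok", **complete))       # 105/105
    print(external_consent_score(opt_out="broken", **complete))   # 85/105
    print(external_consent_score(opt_out="missing", **complete))  # 80/105
```

Because the maximum is 105 rather than 100 in this case, a broken opt-out link costs 20 raw points but about 19 percentage points after normalization.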