662327e8b4
Major update based on the BMW test iterations (v1→v9):

Core compliance check
- Sonnet check_type classification: text/process/review for all 1874 MCs in compliance.doc_check_controls (script + sidecar /data/mc_classification.db). rag_document_checker filters on check_type='text' for doc_check. Plus a fits_doc_type audit (v2) and a ui_only audit for DSA/e-commerce MCs filed under the wrong doc_type.
- scope_requires filter: biometric/ai_decision/child_targeting MCs are filtered per business_profile (FRT skipped for BMW etc.).
- Embedding match (BGE-M3) as phase 3 after the regex match: per-doc_type threshold override (impressum 0.50, dse/cookie 0.60), short-field rescue (15-word chunks) for mandatory fields in the Impressum. Title + check_question as embedding input for more context.
- Cookie-text routing: consent-tester returns cmp_cookie_text from the CMP reconstruct; the backend prefers it over the DOM extraction when it is richer (BMW: 1824 vs 600 words).

Vendor redundancy + EU alternatives + cost saving
- vendor_redundancy.analyze(): functional categorization of the CMP vendors, detection of multiple providers per category, EU-alternative lookup (Matomo, IONOS, HERE, Friendly Captcha, Smart AdServer, ...).
- vendor_cost_estimator: tier inference from the cookie footprint (cookie count + premium-feature cookies + third-party share → starter/professional/enterprise/premier).
- Self-service advertising (Google/Meta/Pinterest/...) = 0 license cost (media spend only, tracked separately). DSP platforms keep a narrow range.
- Tier-aware saving range: for enterprise/premier we use the upper 40-100% band of the list prices, not starter→premier.
- Multi-function tools (Matomo Pro, SAP CX, IONOS Cloud, Userlike, Smart AdServer, HERE Maps, Vimeo Pro, LamaPoll): one tool replaces several categories at once.

Cookie knowledge DB + functional classification
- cookie_knowledge_db: 50 curated top cookies (Google/Meta/Adobe/MS/...) with vendor, exact_purpose, data_collected, IAB TCF IDs, reid_risk, schrems_ii_status, CJEU rulings, EU alternative.
- cookie_function_classifier: functional role per cookie (tracking_id, ad_pixel, session_id, ab_test, csrf, ...) + blocking_impact.

Country inference from legal form
- cookie_link_validator: the country field is derived from the vendor name (A/S=DK, GmbH=DE, Inc=US, B.V.=NL, ...) plus a vendor lookup table. Reduces false-positive no_country flags for clearly EU-based vendors (Adform DK, Pinterest IE).

Action recipes + doc-anchor locator
- finding_action_recipes: per finding type (no_cookies_listed, no_country, broken_opt_out, "Auftragsverarbeiter erwaehnen", "Art. 22 Profiling", ...) one structured instruction with what/why/fix_text/where/example, for pasting 1:1 into customer documents.
- doc_anchor_locator: embedding-based (BGE-M3 cosine); finds the matching paragraph in the existing customer document for each finding. Per-run thread-local cache. Fallback: keyword match.
- Email rendering integrates recipe + anchor per failed document check, plus a vendor flag list with a collapsible action list.
- Score explanation per vendor row (3/5 subtitle + tooltip).

Migration pipeline (compliance check -> customer banner/documents)
- migration_to_banner.py: vendor list -> CookieBannerConfig with 4 categories + review flags.
- migration_to_document.py: vendor list -> cookie policy + VVT register + privacy-policy pre-fills.
- agent_migration_routes: 3 preview endpoints (banner-preview, document-preview, summary). cmp_vendors are persisted in the check_payloads table of /data/compliance_audits.db.

Borlabs-parity cookie-banner features
- Consent history in the banner: window.bpShowConsentHistory() + localStorage.
- Content blocker: cookie-banner-content-blocker.ts, YouTube/Maps/video placeholder until consent is given.
- Google Consent Mode v2 extended: wait_for_update + region=EEA/CH/GB.
- Consent-log export (CSV/JSON) via einwilligungen_export_routes.

Bug fixes
- canonical_control_routes: _jsonish helper for string-typed jsonb, similar-controls endpoint with _has_embedding_col() cache (no more 500).
- Control-library frontend: defensive .map coercer in 2 detail views.
- Embedding-service batching (batches of 32 instead of 165 in one call).
- KeyError 'control_id' in MC result aggregation (defensive .get).
- Master-controls click-through from /sdk/master-controls to /sdk/control-library?control=<id> with URL-param auto-open.
- Dockerfile: /data pre-chowned to appuser (write access for the audit DB).
- Cookie-text routing bug (cmp_reconstructed > DOM extraction).
- doc_type-aware MC filter (instead of all text MCs).
- Master-contract dedup (60 BMW-internal entries = 1 Adobe contract).
- The A3 v2 audit reclassified 24 UI-language MCs as 'process'.

Tests
- test_migration_mappers.py (9 tests)
- test_migration_endpoints.py (4 tests)

Scripts (one-shot)
- classify_mc_check_type.py (v1) + _v2 (PK=control_id,doc_type)
- audit_mc_doctype_fit.py (v1 fits) + _v2 (ui_only + scope_requires)

BMW run summary, v1 (broken) -> v9 (all fixes):
  DSE        7.5% -> 81-83%
  Impressum  4% -> 100% (all 6 real MCs fulfilled)
  Cookie     0% -> 79-83% (CMP-text routing + embedding)
  Plus: 10 consolidation categories, estimated saving 200k-3M / year
  Plus: action recipes + doc anchors for every fail

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
480 lines
18 KiB
Python
"""
Cookie-policy opt-out and privacy-policy link validator.

Art. 7(3) GDPR: "It shall be as easy to withdraw as to give consent."
Every third-party provider listed in the cookie policy must therefore have
a working opt-out mechanism. A missing or broken link makes that provider
entry legally non-compliant.

This module extracts the URLs from the cookie-policy text and tests each
one via async HTTP (HEAD first, GET fallback). It returns structured
findings that the route layer turns into CheckItems for the email +
frontend report.
"""

from __future__ import annotations

import asyncio
import logging
import re
from typing import TypedDict

import httpx

logger = logging.getLogger(__name__)


# URL extraction patterns. Each captures the URL that follows the keyword.
_URL_RE = r"https?://[\w\-./?#&=:%~+@]+"
_OPTOUT_PATTERN = re.compile(
    rf"opt[\-\s]?out[\-\s]?(?:link)?\s*[:\|]?\s*({_URL_RE})",
    re.IGNORECASE,
)
_PRIVACY_PATTERN = re.compile(
    rf"(?:link\s+zur?\s+(?:privacy[\-\s]?policy|datenschutz\w*)|privacy[\-\s]?policy)\s*[:\|]?\s*({_URL_RE})",
    re.IGNORECASE,
)

# Concurrency + timeout budget: 10 parallel requests, 8s per request,
# whole batch capped at 60s. This keeps the cookie check inside the existing
# 120s backend → consent-tester budget.
_MAX_CONCURRENT = 10
_PER_URL_TIMEOUT = 8.0
_BATCH_TIMEOUT = 60.0


class LinkCheck(TypedDict, total=False):
    url: str
    kind: str  # "opt-out" | "privacy-policy"
    status: int  # 0 = unreachable
    final_url: str
    error: str
    reachable: bool


def extract_links(text: str) -> list[LinkCheck]:
    """Pull all Opt-Out + Privacy-Policy URLs from a cookie-policy text.

    Deduplicates by URL+kind. Strips trailing punctuation/quotes commonly
    captured by the greedy URL regex.
    """
    found: dict[tuple[str, str], LinkCheck] = {}
    for kind, pattern in (("opt-out", _OPTOUT_PATTERN),
                          ("privacy-policy", _PRIVACY_PATTERN)):
        for match in pattern.finditer(text):
            url = match.group(1).rstrip(".,;:\"')(]").strip()
            if not url.startswith(("http://", "https://")):
                continue
            key = (url, kind)
            if key not in found:
                found[key] = LinkCheck(url=url, kind=kind)
    return list(found.values())


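# Usage sketch (illustrative, not part of the module): the opt-out pattern
# applied to a made-up policy snippet. The sample text, the example.com /
# example.org URLs and the DEMO_* names are assumptions; the regex itself is
# copied from _OPTOUT_PATTERN. It shows that a sentence-final "." is captured
# by the URL character class and then removed by rstrip(), and that repeated
# URLs collapse into one entry.

```python
import re

DEMO_URL_RE = r"https?://[\w\-./?#&=:%~+@]+"
DEMO_OPTOUT = re.compile(
    rf"opt[\-\s]?out[\-\s]?(?:link)?\s*[:\|]?\s*({DEMO_URL_RE})",
    re.IGNORECASE,
)

sample = (
    "Anbieter A. Opt-Out-Link: https://example.com/optout. "
    "Anbieter A (again). Opt-out link | https://example.com/optout, "
    "Anbieter B. Opt-Out: https://example.org/privacy/opt-out"
)

seen = {}  # dict keeps insertion order, so it doubles as an ordered set
for match in DEMO_OPTOUT.finditer(sample):
    raw = match.group(1)                       # may end in "." or similar
    url = raw.rstrip(".,;:\"')(]").strip()     # same cleanup as extract_links
    seen.setdefault(url, None)

demo_urls = list(seen)
print(demo_urls)
```

# Three matches are found, but only two distinct URLs remain after cleanup.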
async def validate_links(links: list[LinkCheck]) -> list[LinkCheck]:
    """HTTP-probe each link concurrently. Adds status + reachable flag.

    Uses HEAD first (fast), falls back to GET for servers that reject HEAD.
    Accepts any 2xx/3xx as reachable; 4xx/5xx and timeouts as broken.
    """
    if not links:
        return []
    sem = asyncio.Semaphore(_MAX_CONCURRENT)

    async with httpx.AsyncClient(
        timeout=_PER_URL_TIMEOUT,
        follow_redirects=True,
        headers={"User-Agent": "BreakPilot-LinkChecker/1.0"},
    ) as client:
        async def probe(link: LinkCheck) -> LinkCheck:
            async with sem:
                try:
                    resp = await client.head(link["url"])
                    if resp.status_code in (405, 403):
                        # Some servers reject HEAD; try GET
                        resp = await client.get(link["url"])
                    link["status"] = resp.status_code
                    link["final_url"] = str(resp.url)
                    link["reachable"] = 200 <= resp.status_code < 400
                except httpx.TimeoutException:
                    link["status"] = 0
                    link["error"] = "timeout"
                    link["reachable"] = False
                except Exception as e:
                    link["status"] = 0
                    link["error"] = str(e)[:80]
                    link["reachable"] = False
                return link

        try:
            results = await asyncio.wait_for(
                asyncio.gather(*[probe(link) for link in links]),
                timeout=_BATCH_TIMEOUT,
            )
            return list(results)
        except asyncio.TimeoutError:
            logger.warning(
                "Cookie-link batch timeout after %.0fs — %d urls",
                _BATCH_TIMEOUT, len(links),
            )
            # Best-effort: return whatever links got updated
            return links


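# Sketch of the concurrency pattern used above (Semaphore cap + one overall
# wait_for deadline), reduced to the standard library so it runs without a
# network. run_bounded and its fake probes are hypothetical helpers for
# illustration only; asyncio.sleep stands in for the HTTP request.

```python
import asyncio


async def run_bounded(delays, max_concurrent, batch_timeout):
    """Run fake probes with a concurrency cap and an overall deadline."""
    sem = asyncio.Semaphore(max_concurrent)
    state = {"active": 0, "peak": 0}
    finished = {}

    async def probe(index, delay):
        async with sem:  # at most max_concurrent probes pass this point
            state["active"] += 1
            state["peak"] = max(state["peak"], state["active"])
            try:
                await asyncio.sleep(delay)  # stands in for the HTTP request
                finished[index] = "ok"
            finally:
                state["active"] -= 1

    try:
        await asyncio.wait_for(
            asyncio.gather(*(probe(i, d) for i, d in enumerate(delays))),
            timeout=batch_timeout,
        )
    except asyncio.TimeoutError:
        pass  # deadline hit: keep only what finished in time
    return finished, state["peak"]


all_done, peak = asyncio.run(run_bounded([0.01] * 6, 2, 1.0))
partial, _ = asyncio.run(run_bounded([0.01, 5.0], 2, 0.2))
print(len(all_done), peak, sorted(partial))
```

# In the second run the slow probe is cancelled at the deadline, so only the
# fast one is recorded. This mirrors the best-effort return in validate_links,
# where unfinished links simply lack a "reachable" key.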
# ── Per-vendor link validation ──────────────────────────────────────

async def validate_vendor_urls(vendors: list[dict]) -> list[dict]:
    """Probe opt-out and privacy URLs of each vendor. Mutates each vendor:

    vendor["opt_out_status"] = int (0 = unreachable, 2xx/3xx = ok)
    vendor["opt_out_ok"] = bool
    vendor["privacy_status"] = int
    vendor["privacy_ok"] = bool

    On a request error, vendor["opt_out_error"] / vendor["privacy_error"]
    holds the truncated exception text.
    """
    if not vendors:
        return vendors

    # Flatten into one list of (vendor, url, kind) probes; the vendor dict
    # is the back-reference that each probe writes its result into.
    probes: list[tuple[dict, str, str]] = []
    for v in vendors:
        if v.get("opt_out_url"):
            probes.append((v, v["opt_out_url"], "opt_out"))
        if v.get("privacy_policy_url"):
            probes.append((v, v["privacy_policy_url"], "privacy"))

    if not probes:
        return vendors

    sem = asyncio.Semaphore(_MAX_CONCURRENT)
    async with httpx.AsyncClient(
        timeout=_PER_URL_TIMEOUT,
        follow_redirects=True,
        headers={"User-Agent": "BreakPilot-LinkChecker/1.0"},
    ) as client:
        async def probe(vendor: dict, url: str, kind: str) -> None:
            async with sem:
                try:
                    resp = await client.head(url)
                    if resp.status_code in (405, 403):
                        # Some servers reject HEAD; try GET
                        resp = await client.get(url)
                    vendor[f"{kind}_status"] = resp.status_code
                    vendor[f"{kind}_ok"] = 200 <= resp.status_code < 400
                except Exception as e:
                    vendor[f"{kind}_status"] = 0
                    vendor[f"{kind}_ok"] = False
                    vendor[f"{kind}_error"] = str(e)[:60]
        try:
            await asyncio.wait_for(
                asyncio.gather(*[probe(v, u, k) for v, u, k in probes]),
                timeout=_BATCH_TIMEOUT,
            )
        except asyncio.TimeoutError:
            logger.warning("vendor-link batch timeout (%d probes)", len(probes))
    return vendors


def score_vendors(vendors: list[dict]) -> list[dict]:
    """Compute per-vendor compliance score (0-100) and flags.

    Scoring is recipient-type AND category aware. Two orthogonal axes
    influence which fields are required:

    recipient_type == INTERNAL / GROUP_COMPANY
        Own processing: the user's consent + main DSI cover privacy +
        opt-out for ALL of these. Per-row opt-out / privacy URLs are
        NOT a compliance gap. What matters: the VVT-relevant fields
        (purpose, cookies with names + expiry).

    category == 'necessary' (§25 Abs. 2 TDDDG)
        Technically necessary cookies don't need consent → no opt-out
        required even for external processors.

    Fields that do not apply to a vendor are left out of both the score and
    the maximum, so they neither earn points nor produce a penalty flag.
    """
    for v in vendors:
        rtype = (v.get("recipient_type") or "OTHER").upper()
        is_own = rtype in ("INTERNAL", "GROUP_COMPANY")
        is_necessary = (v.get("category") or "").lower() in (
            "necessary", "strictlynecessary",
        )
        opt_out_required = not is_own and not is_necessary
        privacy_required = not is_own
        country_required = not is_own

        score = 0
        max_score = 0
        flags: list[str] = []

        # Name (always required): 20
        max_score += 20
        if v.get("name"):
            score += 20
        else:
            flags.append("no_name")

        # Purpose: 20
        max_score += 20
        if v.get("purpose"):
            score += 20
        else:
            flags.append("no_purpose")

        # Country: only for external processors / controllers.
        # If country is empty, derive it from the legal-form suffix in the name.
        if country_required:
            max_score += 10
            if v.get("country"):
                score += 10
            else:
                inferred = _country_from_name(v.get("name", ""))
                if inferred:
                    v["country"] = inferred
                    v["country_inferred"] = True
                    score += 10
                else:
                    flags.append("no_country")

        # Opt-Out URL: only when consent-based AND external
        if opt_out_required:
            max_score += 25
            if not v.get("opt_out_url"):
                flags.append("no_opt_out_url")
            elif v.get("opt_out_ok") is False:
                flags.append("broken_opt_out")
                score += 5
            else:
                score += 25

        # Privacy policy URL: required for external (own = via main DSI)
        if privacy_required:
            weight = 10 if is_necessary else 15
            max_score += weight
            if not v.get("privacy_policy_url"):
                flags.append("no_privacy_url")
            elif v.get("privacy_ok") is False:
                flags.append("broken_privacy_url")
                score += weight // 3
            else:
                score += weight

        # Cookies disclosed (names + expiry): required for ALL types
        # (own processing too: BMW must list its own cookies for the VVT)
        weight = 50 if is_own or is_necessary else 15
        max_score += weight
        cookies = v.get("cookies") or []
        if cookies:
            named = sum(1 for c in cookies if c.get("name"))
            with_expiry = sum(1 for c in cookies if c.get("expiry"))
            if named >= 1 and with_expiry >= 1:
                score += weight
            elif named >= 1:
                score += weight // 2
                flags.append("cookies_no_expiry")
            else:
                flags.append("cookies_no_names")
        else:
            flags.append("no_cookies_listed")

        v["compliance_score"] = round(score / max_score * 100) if max_score else 0
        v["compliance_flags"] = flags
    return vendors


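# Worked example of the normalization in score_vendors: the score is the sum
# of earned points divided by the sum of the weights that apply to this
# vendor. normalized_score is a hypothetical helper for illustration; the two
# vendors are made up, the weights are the ones used above.

```python
def normalized_score(parts):
    """parts: (earned, weight) pairs for the applicable fields only."""
    earned = sum(e for e, _ in parts)
    max_score = sum(w for _, w in parts)
    return round(earned / max_score * 100) if max_score else 0


# External, consent-based vendor whose opt-out URL is listed but broken.
external_broken_opt_out = [
    (20, 20),  # name
    (20, 20),  # purpose
    (10, 10),  # country
    (5, 25),   # opt-out URL present but unreachable
    (15, 15),  # privacy-policy URL
    (15, 15),  # cookies with names + expiry
]

# INTERNAL vendor: only name, purpose and cookies count; cookies lack expiry.
internal_no_expiry = [
    (20, 20),  # name
    (20, 20),  # purpose
    (25, 50),  # cookies named, no expiry: half of the 50-point weight
]

print(normalized_score(external_broken_opt_out))  # 85 / 105 -> 81
print(normalized_score(internal_no_expiry))       # 65 / 90  -> 72
```

# Because non-applicable fields drop out of the denominator, the same cookie
# gap costs an internal vendor far more than an external one.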
# ── CheckItem rendering ──────────────────────────────────────────────

def build_check_items(validated: list[LinkCheck]) -> list[dict]:
    """Turn validator results into compliance-check items (one per kind).

    Returns at most 2 items (opt-out + privacy-policy). A kind for which no
    links were extracted produces no item at all.
    """
    items: list[dict] = []
    for kind, label in (
        ("opt-out", "Opt-Out-Links der Drittanbieter erreichbar"),
        ("privacy-policy", "Privacy-Policy-Links der Drittanbieter erreichbar"),
    ):
        of_kind = [link for link in validated if link.get("kind") == kind]
        if not of_kind:
            continue
        total = len(of_kind)
        ok = sum(1 for link in of_kind if link.get("reachable"))
        broken = [link for link in of_kind if not link.get("reachable")]
        all_pass = ok == total

        hint = ""
        matched = ""
        if all_pass:
            matched = f"{ok}/{total} Links erreichbar (HTTP 2xx/3xx)"
        else:
            # Show at most 5 broken links, each with its status or error.
            broken_summary = ", ".join(
                f"{link['url'][:60]} ({link.get('status') or link.get('error', '?')})"
                for link in broken[:5]
            )
            hint = (
                f"{len(broken)}/{total} Links sind defekt. Defekte "
                "Provider-Eintraege erfuellen Art. 7(3) DSGVO nicht — der "
                "Widerruf der Einwilligung ist fuer diese Anbieter unmoeglich. "
                f"Beispiele: {broken_summary}"
            )
        items.append({
            "id": f"cookie_links_{kind.replace('-', '_')}",
            "label": label,
            "passed": all_pass,
            "severity": "MEDIUM" if kind == "opt-out" else "LOW",
            "matched_text": matched,
            "level": 2,
            "parent": "opt_out",
            "skipped": False,
            "hint": hint,
        })
    return items


# ─── Country inference from the legal-form suffix ──────────────────
#
# When a vendor's "country" field is empty, it can often be derived from
# the company suffix:
#   Adform A/S                → DK (Denmark, Aktieselskab)
#   Pinterest Europe Ltd.     → IE (Ireland, Limited)
#   Salesforce Inc.           → US (Incorporated)
#   Adobe ... Ireland Limited → IE
#   Genesys ... B.V.          → NL (Netherlands, Besloten Vennootschap)
#   Equativ S.A.              → FR (Société Anonyme)
#   SAP SE                    → DE (Societas Europaea, mostly registered in DE)
#
# Combined strategy (the lookup applies 3, then 2, then 1):
#   1) Suffix pattern
#   2) Country name in the company name ('Ireland', 'Deutschland')
#   3) Specific vendor (Google Inc / Meta Platforms Ireland Ltd → vendor-specific)

import re as _re

_SUFFIX_COUNTRY: list[tuple[str, str]] = [
    # Pattern (at the end of the name or before further tokens) → ISO code.
    # The first match wins, so multi-token suffixes come before the generic
    # suffixes they contain. Patterns that end in a literal dot use (?!\w)
    # instead of \b, because \b cannot match between "." and a space or the
    # end of the string.
    (r"\bS\.A\.\sde C\.V\.(?!\w)", "MX"),
    (r"\bPte\.?\sLtd\.?\b", "SG"),
    (r"\bA/S\b", "DK"),           # Aktieselskab
    (r"\bApS\b", "DK"),           # Anpartsselskab
    (r"\bAB\b", "SE"),            # Aktiebolag
    (r"\bAS\b(?!\w)", "NO"),      # Aksjeselskap
    (r"\bOy\b", "FI"),            # Osakeyhtiö
    (r"\bAG\b(?!\w)", "DE"),      # CH/AT also possible, default DE
    (r"\bGmbH\b", "DE"),
    (r"\bUG\b", "DE"),
    (r"\beG\b", "DE"),
    (r"\bKG\b", "DE"),
    (r"\bOHG\b", "DE"),
    (r"\bSE\b", "DE"),            # Societas Europaea; verify for SAP SE etc.
    (r"\bS\.A\.(?!\w)", "FR"),    # default FR; S.A. is also used elsewhere (e.g. ES)
    (r"\bSAS\b", "FR"),
    (r"\bS\.A\.S\.(?!\w)", "FR"),
    (r"\bSARL\b", "FR"),
    (r"\bS\.r\.l\.(?!\w)", "IT"),
    (r"\bS\.p\.A\.(?!\w)", "IT"),
    (r"\bSpA\b", "IT"),
    (r"\bB\.V\.(?!\w)", "NL"),
    (r"\bN\.V\.(?!\w)", "NL"),
    (r"\bSL\b", "ES"),
    (r"\bd\.o\.o\.(?!\w)", "SI"),  # Slovenia
    (r"\bd\.d\.(?!\w)", "HR"),     # Croatia
    (r"\bz\s?o\.o\.(?!\w)", "PL"),
    (r"\bInc\.?\b", "US"),
    (r"\bIncorporated\b", "US"),
    (r"\bCorp\.?\b", "US"),
    (r"\bCorporation\b", "US"),
    (r"\bLLC\b", "US"),
    (r"\bL\.L\.C\.(?!\w)", "US"),
    (r"\bLtd\.?\b", "GB"),         # UK Limited, default
    (r"\bLimited\b", "GB"),
    (r"\bPLC\b", "GB"),
    (r"\bPty\b", "AU"),
    (r"\bK\.K\.(?!\w)", "JP"),     # Kabushiki-Kaisha
]

# Country names inside the company name (e.g. "Adobe Systems Software Ireland Limited")
_COUNTRY_NAME_TOKENS: list[tuple[str, str]] = [
    ("ireland", "IE"),
    ("deutschland", "DE"),
    ("germany", "DE"),
    ("netherlands", "NL"),
    ("france", "FR"),
    ("united kingdom", "GB"),
    ("uk", "GB"),
    ("usa", "US"),
    ("united states", "US"),
    ("austria", "AT"),
    ("oesterreich", "AT"),
    ("schweiz", "CH"),
    ("switzerland", "CH"),
    ("luxembourg", "LU"),
    ("luxemburg", "LU"),
    ("denmark", "DK"),
    ("daenemark", "DK"),
    ("sweden", "SE"),
    ("schweden", "SE"),
    ("norway", "NO"),
    ("norwegen", "NO"),
    ("finland", "FI"),
    ("finnland", "FI"),
]

# Known vendors with an unambiguous registered seat (override)
_KNOWN_VENDOR_COUNTRY: dict[str, str] = {
    "google inc": "US",
    "google llc": "US",
    "google ireland": "IE",
    "meta platforms ireland": "IE",
    "facebook ireland": "IE",
    "amazon.com inc": "US",
    "amazon web services": "US",
    "amazon web services inc": "US",
    "linkedin inc": "US",
    "salesforce inc": "US",
    "salesforce.com": "US",
    "outbrain inc": "US",
    "taboola inc": "US",
    "pinterest europe ltd": "IE",
    "intuition machines inc": "US",
    "akamai technologies inc": "US",
    "criteo s.a": "FR",
    "criteo sa": "FR",
    "adform a/s": "DK",
    "speedcurve limited": "GB",
    "longtail ad solutions": "US",
    "genesys cloud services b.v": "NL",
    "qualtrics": "US",
    "teads sa": "FR",
    "teads s.a": "FR",
    "salesviewer gmbh": "DE",
    "baqend gmbh": "DE",
    "zenweshare sas": "FR",
    "nayoki gmbh": "DE",
    "psyma": "DE",
    "matomo": "NZ",  # InnoCraft (NZ), but can be hosted in the EU
    "adobe systems software ireland": "IE",
    "microsoft corporation": "US",
    "microsoft corp": "US",
}


def _country_from_name(vendor_name: str) -> str:
    """Best effort: derive the ISO-2 country code from the vendor name."""
    if not vendor_name:
        return ""
    # Vendor names often look like "<company> — <tool>"; only inspect the
    # company part.
    firm = vendor_name.split(" — ")[0].strip()
    firm_l = firm.lower()

    # 1) Known vendor lookup (most specific)
    for k, v in _KNOWN_VENDOR_COUNTRY.items():
        if k in firm_l:
            return v
    # 2) Country name inside the company name. Match whole words only, so
    #    short tokens such as "uk" or "usa" do not fire inside longer words.
    for token, code in _COUNTRY_NAME_TOKENS:
        if _re.search(rf"\b{_re.escape(token)}\b", firm_l):
            return code
    # 3) Legal-form suffix
    for pattern, code in _SUFFIX_COUNTRY:
        if _re.search(pattern, firm):
            return code
    return ""
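
# Sketch of the three-stage lookup (known vendor, country word, legal-form
# suffix) on deliberately tiny tables. infer_country and the DEMO_* tables are
# hypothetical stand-ins, not the module's data. Two regex details are
# assumptions worth checking against real vendor names: a suffix that ends in
# a literal dot needs a lookahead such as (?!\w), because \b does not match
# between "." and the end of the string, and country words are matched as
# whole words so that "usa" does not fire inside "Usabilla".

```python
import re

DEMO_KNOWN = {"pinterest europe ltd": "IE"}
DEMO_TOKENS = [("ireland", "IE"), ("usa", "US")]
DEMO_SUFFIXES = [
    (r"\bA/S(?!\w)", "DK"),
    (r"\bB\.V\.(?!\w)", "NL"),
    (r"\bLtd\.?(?!\w)", "GB"),
]


def infer_country(name: str) -> str:
    """Return an ISO-2 code, or "" when nothing matches."""
    firm = name.strip()
    firm_l = firm.lower()
    for key, code in DEMO_KNOWN.items():      # 1) most specific: known vendor
        if key in firm_l:
            return code
    for token, code in DEMO_TOKENS:           # 2) country word, whole-word match
        if re.search(rf"\b{re.escape(token)}\b", firm_l):
            return code
    for pattern, code in DEMO_SUFFIXES:       # 3) legal-form suffix
        if re.search(pattern, firm):
            return code
    return ""


# The \b variant never matches a trailing "B.V." at the end of the name.
naive_hit = re.search(r"\bB\.V\.\b", "Genesys Cloud Services B.V.")

for demo_name in ("Pinterest Europe Ltd.", "Genesys Cloud Services B.V.",
                  "Adform A/S", "Usabilla"):
    print(demo_name, "->", infer_country(demo_name) or "(unknown)")
```

# The known-vendor stage runs first on purpose: "Pinterest Europe Ltd." would
# otherwise fall through to the generic Ltd suffix and be labelled GB.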