feat: Plan B + A + C — DSE-Versions-MCs + Legacy-URL + Multi-Version
CI / detect-changes (push) Successful in 7s
CI / branch-name (push) Has been skipped
CI / guardrail-integrity (push) Has been skipped
CI / secret-scan (push) Has been skipped
CI / dep-audit (push) Has been skipped
CI / sbom-scan (push) Has been skipped
CI / loc-budget (push) Failing after 11s
CI / python-lint (push) Has been skipped
CI / test-python-backend (push) Successful in 28s
CI / test-python-document-crawler (push) Has been skipped
CI / test-python-dsms-gateway (push) Has been skipped
CI / build-sha-integrity (push) Failing after 4s
CI / validate-canonical-controls (push) Successful in 10s
CI / go-lint (push) Has been skipped
CI / nodejs-build (push) Has been skipped
CI / nodejs-lint (push) Has been skipped
CI / test-go (push) Has been skipped
CI / iace-gt-coverage (push) Has been skipped

Drei verwandte Mechanismen für DSE-Beweisbarkeit + URL-Hygiene.

Plan B + PDF — Versions-Beweisbarkeit-MCs (dse_checks.py):
  - mc-dse_version_date (HIGH) — sichtbares Stand/Versionsdatum
    Pflicht. 12 Regex-Pattern: "Stand: April 2024", ISO-Datum,
    "Letzte Aktualisierung", "Version 3.2", englische
    Varianten ("Last updated", "Effective date as of …").
    Norm: Art. 7 Abs. 1 DSGVO (Nachweisbarkeit Einwilligung).
  - mc-dse_version_proof (MED) — PDF-Download oder
    versionierte Archiv-URL. Reine HTML-DSE ohne Snapshot ist
    juristisch fragil. 8 Pattern: .pdf, Download-Hinweis,
    web.archive.org, /dse-vNNN.html.
    Norm: DSK-Orientierungshilfe 2024.

Plan A — Legacy-URL-Discovery (legacy_url_discovery.py + B20):
  Vier komplementäre Quellen:
    A.1 /sitemap.xml + Sub-Sitemaps parsen, auf compliance-
        relevante Slugs filtern
    A.2 archive.org/wayback/available pro Slug — wenn Wayback
        zeigt ≥18 Monate alten Snapshot UND Seite heute noch
        200 liefert UND nicht im Footer → Legacy-Verdacht
    A.3 Slug-Permutations: 6 doc_types × 6 Slug-Varianten ×
        5 Lang-Prefixe × 4 Brand-Parameter
    A.4 Banner-Modal-Links (über consent-tester Stufe 4 Tour)
  Mail-Block "🗂️ Legacy-URL-Inventar" mit Tabelle: URL · HTTP ·
  Wayback-Alter · Footer · Empfehlung (301/Offline/Behalten).
  Engine entscheidet NICHT was Legacy ist — präsentiert das
  Inventar, Kunde wählt.

  Real-World-Smoke Elli:
    /en/cookies → HTTP 200, Wayback 69 Mo alt, nicht im Footer
                  → "Legacy-Verdacht, 301 setzen"
    /en/impressum → HTTP 302, redirected → "behalten"

Plan C — Multi-Version-DSE-Analyse (multi_version_dse.py):
  Wenn ≥2 DSE-URLs reachable: pro Variante DSB-Name + Datum +
  Wortzahl + SHA-256 extrahieren, Inkonsistenzen flaggen
  (date_divergent, dsb_divergent, no_date_count).
  Mail-Block "📑 Mehrere DSE-Versionen erkannt" mit
  Vergleichstabelle + rotem Hinweis "Nur eine Version kann
  gültig sein". Beispiel Elli: /de/datenschutz (Mollstr-DSB,
  2022) vs /de/datenschutzerklaerung?brand=elli (Proliance,
  ohne Datum).

API-Response erweitert um legacy_url_inventory +
html_blocks.legacy_urls + multi_version_dse_html im V2-Layout.

ENV-Override: LEGACY_URL_DISABLED=1.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
Benjamin Admin
2026-06-08 10:04:14 +02:00
parent 663a1c3e38
commit 5c5d676f01
7 changed files with 715 additions and 0 deletions
@@ -0,0 +1,121 @@
"""B20 wiring — Legacy-URL-Discovery + Mail-Block."""
from __future__ import annotations
import html
import logging
import os
from compliance.services.legacy_url_discovery import discover_legacy_urls
from compliance.services.multi_version_dse import (
analyze_multiple_dse_versions, render_multi_version_block,
)
logger = logging.getLogger(__name__)
_DISABLED = os.environ.get("LEGACY_URL_DISABLED", "").lower() in (
"1", "true", "yes",
)
async def run_b20(state: dict) -> None:
if _DISABLED:
return
try:
result = await discover_legacy_urls(state)
except Exception as e:
logger.warning("legacy-url-discovery failed: %s", e)
return
candidates = result.get("candidates") or []
state["legacy_url_inventory"] = result
if candidates:
state["legacy_url_html"] = _render(result)
logger.info(
"B20 legacy-url: %d candidates of %d probed",
len(candidates), result.get("probed", 0),
)
# Plan C — Multi-Version-DSE-Analyse: falls Legacy-Discovery zusätz-
# liche DSE-URLs liefert UND ≥2 reachable sind, parallele Analyse +
# Vergleichsblock.
try:
mv_info = await analyze_multiple_dse_versions(state)
if mv_info.get("versions") and len(mv_info["versions"]) >= 2:
state["multi_version_dse_info"] = mv_info
state["multi_version_dse_html"] = render_multi_version_block(
mv_info,
)
logger.info(
"B20-C multi-version-dse: %d versions, date_div=%s dsb_div=%s",
len(mv_info["versions"]),
mv_info.get("date_divergent"),
mv_info.get("dsb_divergent"),
)
except Exception as e:
logger.warning("multi-version-dse analysis failed: %s", e)
def _render(result: dict) -> str:
candidates = result.get("candidates") or []
if not candidates:
return ""
rows = []
for c in candidates[:25]:
st = c["status"]
sev_color = (
"#dc2626" if "Legacy-Verdacht" in (c.get("recommendation") or "")
else "#f59e0b" if st in (404, 410) else "#64748b"
)
age = c.get("age_months")
age_disp = f"{age} Mo." if age is not None else ""
rec = c.get("recommendation") or ""
rows.append(
f"<tr>"
f"<td style='padding:5px 8px;font-family:monospace;color:#475569;"
f"font-size:11px;max-width:380px;word-break:break-all;'>"
f"<a href='{html.escape(c['url'])}' "
f"style='color:{sev_color};'>{html.escape(c['url'][:120])}</a>"
f"</td>"
f"<td style='padding:5px 8px;font-size:11px;text-align:center;'>"
f"<strong style='color:{sev_color};'>{st or '?'}</strong></td>"
f"<td style='padding:5px 8px;font-size:11px;text-align:center;'>"
f"{age_disp}</td>"
f"<td style='padding:5px 8px;font-size:11px;text-align:center;'>"
f"{'' if c.get('in_footer') else ''}</td>"
f"<td style='padding:5px 8px;font-size:11px;color:#475569;'>"
f"{html.escape(rec)}</td>"
f"</tr>"
)
rest = ""
if len(candidates) > 25:
rest = (
f"<p style='font-size:12px;color:#64748b;margin-top:6px;'>"
f"<em>… und {len(candidates)-25} weitere — vollständig in "
f"<code>legacy-urls.csv</code> im ZIP-Anhang.</em></p>"
)
return (
"<div style='margin:24px 0;padding:16px;border-left:4px solid #0f766e;"
"background:#f0fdfa;border-radius:4px;'>"
"<h2 style='margin:0 0 8px;color:#134e4a;font-size:16px;'>"
f"🗂️ Legacy-URL-Inventar ({len(candidates)} Kandidaten von "
f"{result.get('probed', '?')} geprüft)"
"</h2>"
"<p style='margin:0 0 8px;font-size:12px;color:#475569;'>"
"Quellen: /sitemap.xml + Wayback-Machine + Slug-Permutations. "
"Wir <strong>entscheiden nicht</strong> ob eine URL Legacy ist — "
"wir präsentieren das Inventar mit Status und Empfehlung. Der "
"Kunde entscheidet."
"</p>"
"<table style='font-size:11px;width:100%;border-collapse:collapse;"
"background:#fff;border-radius:4px;'>"
"<thead><tr style='background:#ccfbf1;'>"
"<th style='padding:6px 8px;text-align:left;'>URL</th>"
"<th style='padding:6px 8px;'>HTTP</th>"
"<th style='padding:6px 8px;'>Wayback-Alter</th>"
"<th style='padding:6px 8px;'>Footer</th>"
"<th style='padding:6px 8px;text-align:left;'>Empfehlung</th>"
"</tr></thead><tbody>" + "".join(rows) + "</tbody></table>"
+ rest +
"</div>"
)
@@ -30,6 +30,7 @@ from ._b16_wiring import run_b16
from ._b17_wiring import run_b17
from ._b18_wiring import run_b18
from ._b19_wiring import run_b19
from ._b20_wiring import run_b20
from ._constants import _compliance_check_jobs
from ._phase_a_resolve import run_phase_a
from ._phase_b_profile_check import run_phase_b
@@ -94,6 +95,7 @@ async def run_compliance_check(check_id: str, req) -> None:
await run_b17(state) # Audit-Walk-Video (Beweis-Aufzeichnung)
await run_b18(state) # Impressum-Specialist-Agent (Pattern+LLM)
run_b19(state) # Cookie-Coherence (Salesforce-as-essential)
await run_b20(state) # Legacy-URL-Discovery (Sitemap+Wayback)
# Phase D-3 top/mid/bot: Step 5 HTML blocks
await run_phase_d3_top(state)
await run_phase_d3_mid(state)
@@ -90,7 +90,9 @@ def run_phase_f(state: dict) -> None:
"ai_act": state.get("ai_act_html", ""),
"impressum_agent": state.get("impressum_agent_html", ""),
"cookie_coherence": state.get("cookie_coherence_html", ""),
"legacy_urls": state.get("legacy_url_html", ""),
},
"legacy_url_inventory": state.get("legacy_url_inventory") or None,
}
_compliance_check_jobs[check_id]["status"] = "completed"
@@ -411,4 +411,75 @@ ART13_CHECKLIST = [
"severity": "LOW",
"hint": "Vollstaendigen Namen, Adresse und Website der Aufsichtsbehoerde angeben. Haeufiger Fehler: 'die zustaendige Aufsichtsbehoerde' ohne Konkretisierung. Korrekt z.B.: 'LfDI BW, Koenigstrasse 10a, 70173 Stuttgart, www.baden-wuerttemberg.datenschutz.de'.",
},
# ── L1: Versionsdatum / Nachweisbarkeit der Einwilligung ─────────
#
# Art. 7 Abs. 1 DSGVO verlangt vom Verantwortlichen, die Einwilligung
# NACHWEISEN zu koennen — inkl. WELCHEM Stand der DSE der Nutzer
# zugestimmt hat. Ohne Datum/Versionsnummer ist das nicht moeglich.
{
"id": "dse_version_date",
"label": "Stand/Versionsdatum der DSE auffindbar",
"level": 1, "parent": None,
"patterns": [
# "Stand: April 2024", "Stand Januar 2026"
r"stand:?\s*(?:januar|februar|m(?:ae|ä)rz|april|mai|juni|juli|"
r"august|september|oktober|november|dezember)\s+\d{4}",
# "Stand: 01.04.2024", "Stand 04/2024", "Stand 2024-04-01"
r"stand:?\s*\d{1,2}[./-]\d{1,2}[./-]\d{2,4}",
r"stand:?\s*\d{4}-\d{2}(?:-\d{2})?",
r"stand:?\s*\d{1,2}/\d{2,4}",
# "Letzte Aktualisierung: …", "Zuletzt geaendert: …"
r"letzte\s+(?:aktualisierung|(?:ae|ä)nderung|anpassung)",
r"zuletzt\s+(?:ge)?(?:ae|ä)ndert",
r"g(?:ue|ü)ltig\s+(?:ab|seit|f(?:ue|ü)r)",
r"version\s+\d+[.\d]*",
r"version:?\s*v?\d+\.\d+",
# Englisch
r"last\s+(?:updated|modified|revised|amended)",
r"effective\s+(?:date|as\s+of)",
r"as\s+of\s+(?:january|february|march|april|may|june|july|"
r"august|september|october|november|december)\s+\d{4}",
],
"severity": "HIGH",
"hint": (
"Art. 7 Abs. 1 DSGVO: Verantwortlicher muss NACHWEISEN koennen, "
"welcher DSE-Version der Nutzer zugestimmt hat. Ohne ein "
"sichtbares Versionsdatum / 'Stand: …' ist die Einwilligung "
"nicht beweisbar — Aufsichtsbehoerden + Verbraucherzentralen "
"stossen genau hier nach. Korrekt: 'Stand: Januar 2026' oder "
"'Version 3.2 — gueltig ab 01.01.2026' sichtbar am Anfang oder "
"Ende der DSE."
),
},
{
"id": "dse_version_proof",
"label": "Versions-eindeutige Beweis-Verankerung (PDF / Download / Archiv-Link)",
"level": 2, "parent": "dse_version_date",
"patterns": [
# PDF-Download verfuegbar
r"\.pdf\b",
r"(?:dse|datenschutz|privacy)[\w\-]*\.pdf",
# Download-Hinweis
r"(?:dse|datenschutzerkl(?:ae|ä)rung|datenschutzhinweis(?:e)?)"
r"[^.]{0,80}herunterladen",
r"als\s+pdf\s+(?:herunterladen|speichern|laden|verf(?:ue|ü)gbar)",
r"download[^.]{0,40}(?:dse|datenschutzerkl|privacy|policy)",
# Konkrete Versions-URL (Wayback / Archiv / versionierte URL)
r"web\.archive\.org",
r"version[\-_]?archive",
r"(?:dse|privacy)-v?\d+[.\d]*\.html?",
],
"severity": "MEDIUM",
"hint": (
"Beste-Praxis nach DSK-Orientierungshilfe 2024: fuer den Beweis "
"der konkreten DSE-Version sollte zusaetzlich zur Web-Version "
"ein PDF-Download oder ein versionierter Archiv-Link verfuegbar "
"sein. Reine HTML-DSE ohne Snapshot ist juristisch fragil — "
"der Anbieter kann die DSE jederzeit aendern und das Original "
"ist nicht mehr nachweisbar. Empfehlung: 'Aktuelle DSE als PDF "
"herunterladen' im Kopfbereich, oder eindeutige Versions-URLs "
"(z.B. /dse/v2026-01.html)."
),
},
]
@@ -0,0 +1,301 @@
"""Legacy-URL-Discovery — systematische Suche nach veralteten DSE-/
Impressum-/Cookie-/AGB-URLs auf einer Domain.
Strategie aus 4 unabhängigen Quellen:
A.1 Sitemap-Parser — /sitemap.xml, /sitemap_index.xml, sitemap-de.xml,
sitemap-legal.xml
A.2 Wayback Machine — archive.org/wayback/available für jeden bekannten
Slug; URLs die vor ≥18 Monaten archiviert wurden
und heute noch 200 liefern = Legacy-Verdacht
A.3 Slug-Permutations — bekannte Slug-Familie × Locale/Brand-Parameter
A.4 Banner-Modal-Links — Playwright öffnet Cookie-Einstellungen-Modal
und sammelt alle Links (Plan A.4 wird via
consent-tester aufgerufen, hier nur Schema)
Output: Liste von Legacy-Kandidaten mit Status, last_modified, found_via,
recommended_action ("Redirect 301", "Offline nehmen", "Belassen — aktuell").
Best-Effort: jede Quelle catched eigene Exceptions — eine ausgefallene
Sitemap blockiert nicht Wayback.
"""
from __future__ import annotations
import asyncio
import logging
import re
from datetime import datetime, timezone
from urllib.parse import urljoin, urlparse
import httpx
logger = logging.getLogger(__name__)
# Kanonische DE/EN Slug-Familie pro Doc-Type. Wir suchen jede dieser
# Pfade auf jeder Origin — auch wenn die Discovery sie schon hat,
# als unabhängige Verifikation.
_SLUG_FAMILY: dict[str, tuple[str, ...]] = {
"dse": (
"datenschutz", "datenschutzerklaerung", "datenschutzerklärung",
"datenschutzhinweise", "datenschutzhinweis",
"privacy", "privacy-policy", "privacy-notice",
"datenschutz-online", "dse",
),
"impressum": (
"impressum", "imprint", "legal-notice", "site-notice",
"anbieterkennzeichnung",
),
"cookie": (
"cookie-richtlinie", "cookies", "cookie-policy",
"cookie-erklaerung", "cookieerklaerung", "cookie-hinweise",
),
"agb": (
"agb", "allgemeine-geschaeftsbedingungen",
"geschaeftsbedingungen", "terms-and-conditions",
"general-terms-of-business",
),
"nutzungsbedingungen": (
"nutzungsbedingungen", "terms-of-use", "terms-of-service",
"nutzungsordnung",
),
"widerruf": (
"widerruf", "widerrufsbelehrung",
"widerrufsbelehrung-privatkunden", "cancellation",
),
}
_LANG_PREFIXES = ("", "/de", "/de_de", "/de-de", "/germany", "/en")
_BRAND_PARAMS = ("", "?brand=", "?lang=de", "?locale=de_DE")
_LEGACY_AGE_MONTHS_THRESHOLD = 18 # ältere = Legacy-Verdacht
async def _fetch_sitemap_urls(origin: str) -> list[str]:
"""A.1 — sitemap.xml + Varianten."""
candidates = (
f"{origin}/sitemap.xml",
f"{origin}/sitemap_index.xml",
f"{origin}/sitemap-de.xml",
f"{origin}/sitemap-legal.xml",
f"{origin}/sitemap-pages.xml",
)
out: set[str] = set()
try:
async with httpx.AsyncClient(timeout=10.0, follow_redirects=True) as c:
for url in candidates:
try:
r = await c.get(url)
if r.status_code != 200:
continue
# Sitemap-Index: weitere Sitemaps verlinkt
locs = re.findall(r"<loc>([^<]+)</loc>", r.text)
for loc in locs:
loc = loc.strip()
if loc.endswith(".xml"):
# nested sitemap — fetch
try:
rr = await c.get(loc)
if rr.status_code == 200:
out.update(
m.strip() for m in
re.findall(r"<loc>([^<]+)</loc>",
rr.text)
if not m.strip().endswith(".xml")
)
except Exception:
continue
else:
out.add(loc)
except Exception:
continue
except Exception as e:
logger.info("sitemap fetch failed for %s: %s", origin, e)
return list(out)
async def _wayback_check(url: str) -> dict | None:
"""A.2 — Wayback-Machine. Return latest archived snapshot info."""
try:
async with httpx.AsyncClient(timeout=10.0) as c:
r = await c.get(
"https://archive.org/wayback/available",
params={"url": url, "timestamp": "20200101"},
)
if r.status_code != 200:
return None
data = r.json() or {}
snap = (data.get("archived_snapshots") or {}).get("closest") or {}
if not snap.get("available"):
return None
ts = snap.get("timestamp", "")
return {
"snapshot_url": snap.get("url"),
"timestamp": ts,
"status": snap.get("status"),
}
except Exception:
return None
def _months_since(timestamp_yyyymmdd: str) -> int | None:
"""Wayback-Timestamp Format: YYYYMMDDHHMMSS."""
if not timestamp_yyyymmdd or len(timestamp_yyyymmdd) < 6:
return None
try:
snap = datetime.strptime(timestamp_yyyymmdd[:6], "%Y%m").replace(
tzinfo=timezone.utc,
)
now = datetime.now(timezone.utc)
delta = (now.year - snap.year) * 12 + (now.month - snap.month)
return max(0, delta)
except Exception:
return None
async def _probe_alive(url: str) -> tuple[int, str]:
"""Return (status_code, last_modified_header)."""
try:
async with httpx.AsyncClient(
timeout=6.0, follow_redirects=False,
) as c:
r = await c.head(url)
if r.status_code == 405:
r = await c.get(url)
return r.status_code, r.headers.get("last-modified", "")
except Exception:
return 0, ""
def _build_slug_candidates(origin: str) -> list[str]:
out: set[str] = set()
for doc_type, slugs in _SLUG_FAMILY.items():
for lang in _LANG_PREFIXES:
for slug in slugs:
base = f"{origin}{lang}/{slug}".replace("//", "/")
base = base.replace("https:/", "https://")
base = base.replace("http:/", "http://")
out.add(base)
for bp in _BRAND_PARAMS:
if bp:
out.add(base + bp)
return list(out)
def _filter_legal_urls(urls: list[str]) -> list[str]:
"""Compliance-relevante Pfade aus Sitemap-Output."""
keywords = []
for slugs in _SLUG_FAMILY.values():
keywords.extend(slugs)
keywords_lc = [k.lower() for k in keywords]
out: list[str] = []
for u in urls:
ul = u.lower()
if any(k in ul for k in keywords_lc):
out.append(u)
return out
def _recommend(status: int, age_months: int | None,
in_sitemap: bool, in_footer: bool) -> str:
if status == 404 or status == 410:
return "URL veraltet (404/410) — Backlinks prüfen, ggf. 301 setzen"
if status == 0:
return "Nicht erreichbar — manuell prüfen"
if status in (301, 302, 303, 307, 308):
return "Bereits redirected — behalten"
if status == 200:
if age_months is None:
return "Erreichbar, kein Wayback-Stand — Inhalt manuell prüfen"
if age_months >= _LEGACY_AGE_MONTHS_THRESHOLD and not in_footer:
return (
f"Legacy-Verdacht ({age_months} Monate altes Wayback, "
"nicht im Footer verlinkt) — 301-Redirect auf aktuelle "
"Version setzen ODER offline nehmen"
)
if age_months >= 36 and in_footer:
return (
f"Reachable + im Footer, aber Wayback {age_months} Monate "
"alt — manuell prüfen ob Inhalt noch aktuell"
)
return "Aktuell, kein Handlungsbedarf"
return f"HTTP {status} — manuell prüfen"
async def discover_legacy_urls(state: dict) -> dict:
"""Run all 4 sources + consolidate. Returns dict for HTML rendering."""
doc_entries = state.get("doc_entries") or []
origins: set[str] = set()
footer_urls: set[str] = set()
for e in doc_entries:
url = (e.get("url") or "").strip()
if url and "://" in url:
p = urlparse(url)
origins.add(f"{p.scheme}://{p.netloc}")
footer_urls.add(url.split("#")[0].split("?")[0])
if not origins:
return {"candidates": [], "skipped": "no_origin"}
candidates: set[str] = set()
# A.1 Sitemap
for o in list(origins)[:2]:
sitemap_urls = await _fetch_sitemap_urls(o)
candidates.update(_filter_legal_urls(sitemap_urls))
# A.3 Slug-Permutations
candidates.update(_build_slug_candidates(o))
# Cap to avoid explosion
cands = list(candidates)[:60]
# Probe alive + Wayback in parallel
async def _check(url: str) -> dict:
status, lm = await _probe_alive(url)
wb = await _wayback_check(url) if status == 200 else None
age = _months_since(wb.get("timestamp", "") if wb else "")
in_footer = url.split("#")[0].split("?")[0] in footer_urls
return {
"url": url,
"status": status,
"last_modified": lm,
"wayback_snapshot": wb.get("snapshot_url") if wb else "",
"wayback_timestamp": wb.get("timestamp", "") if wb else "",
"age_months": age,
"in_footer": in_footer,
"recommendation": _recommend(status, age, False, in_footer),
}
results = await asyncio.gather(
*[_check(u) for u in cands], return_exceptions=True,
)
results = [r for r in results if isinstance(r, dict)]
# Filter: only show interesting ones (≥200 reachable + legacy-relevant)
interesting: list[dict] = []
for r in results:
if r["status"] == 0:
continue # Nicht erreichbar, nicht interessant
# 404/410/redirects nur wenn im footer → broken link
if r["status"] in (404, 410) and not r["in_footer"]:
continue
# 200 + im Footer + recent Wayback → "alles OK" filter
if (r["status"] == 200 and r["in_footer"]
and r["age_months"] is not None
and r["age_months"] < _LEGACY_AGE_MONTHS_THRESHOLD):
continue
interesting.append(r)
# Sort: Legacy-Verdächtige zuerst (200 + alt + nicht im Footer)
interesting.sort(
key=lambda r: (
0 if "Legacy-Verdacht" in r["recommendation"] else
1 if "veraltet" in r["recommendation"] else 2,
-(r.get("age_months") or 0),
),
)
return {
"candidates": interesting,
"probed": len(results),
"filtered_kept": len(interesting),
"origins": list(origins),
}
@@ -71,6 +71,9 @@ def compose_v2(state: dict) -> str:
state.get("impressum_agent_html", ""),
# B19 Cookie-Coherence-Check (Salesforce-as-essential etc.)
state.get("cookie_coherence_html", ""),
# B20 Legacy-URL-Discovery + Multi-Version-DSE-Vergleich
state.get("multi_version_dse_html", ""),
state.get("legacy_url_html", ""),
# Browser-Matrix (Stage 1.c)
state.get("browser_matrix_html", ""),
# All legacy build_*_html() wrapped in V2 sections — preserves
@@ -0,0 +1,215 @@
"""Multi-Version-DSE-Analyse.
Wenn Auto-Discovery + Legacy-URL-Discovery mehrere DSE-URLs auf der
gleichen Domain finden, vergleichen wir Key-Felder pro Variante:
- Stand-/Versionsdatum (sichtbar?)
- DSB-Name (Mollstraße vs Proliance vs …)
- Wortzahl (deutlich kürzere Version = veraltet?)
- SHA-256-Hash (für Audit-Trail)
Output: HTML-Block mit Vergleichstabelle + roter Hinweis "Nur eine
Version kann gültig sein". Nicht-destruktiv: wir entscheiden NICHT
welche Variante richtig ist — wir präsentieren beide nebeneinander.
Performance: cap auf max 3 zusätzliche DSE-URLs (Sitemap kann Hunderte
liefern, das würde 3min+ kosten).
"""
from __future__ import annotations
import hashlib
import logging
import re
from html import escape as h
from urllib.parse import urlparse
import httpx
logger = logging.getLogger(__name__)
_DSB_PATTERNS = (
r"datenschutzbeauftragt\w*[\s\S]{0,200}?"
r"((?:[A-ZÄÖÜ][\w\-]{2,40}\s+){1,4}"
r"(?:GmbH|AG|GbR|Mollstr|Stra(?:ße|sse|sse)|str\.))",
r"(proliance\s+gmbh)",
r"(datenschutzexperte\.de)",
)
_DATE_PATTERN = re.compile(
r"(?:stand|letzte\s+aktualisierung|version|effective)[:.]?\s*"
r"(\d{4}[-./]\d{1,2}(?:[-./]\d{1,2})?|"
r"(?:januar|februar|m(?:ae|ä)rz|april|mai|juni|juli|august|"
r"september|oktober|november|dezember)\s+\d{4}|"
r"\d{1,2}[./]\d{4})",
re.IGNORECASE,
)
async def _fetch_text(url: str) -> tuple[str, int]:
try:
async with httpx.AsyncClient(timeout=15.0, follow_redirects=True) as c:
r = await c.get(url)
if r.status_code != 200:
return "", r.status_code
text = re.sub(r"<script.*?</script>", " ", r.text,
flags=re.S | re.I)
text = re.sub(r"<style.*?</style>", " ", text,
flags=re.S | re.I)
text = re.sub(r"<[^>]+>", " ", text)
text = re.sub(r"\s+", " ", text).strip()
return text, 200
except Exception as e:
logger.info("fetch failed for %s: %s", url, e)
return "", 0
def _extract_dsb(text: str) -> str:
if not text:
return ""
for pat in _DSB_PATTERNS:
m = re.search(pat, text, re.IGNORECASE)
if m:
return (m.group(1) if m.lastindex else m.group(0))[:120].strip()
return ""
def _extract_date(text: str) -> str:
if not text:
return ""
m = _DATE_PATTERN.search(text)
return (m.group(1) if m else "")[:40].strip()
async def analyze_multiple_dse_versions(state: dict) -> dict:
"""If ≥2 DSE-like URLs are reachable on the same domain, fetch
each and produce a comparison table."""
doc_entries = state.get("doc_entries") or []
legacy = (state.get("legacy_url_inventory") or {}).get("candidates") or []
# Collect DSE-candidate URLs from doc_entries + legacy-inventory
candidates: list[str] = []
seen: set[str] = set()
for e in doc_entries:
if (e.get("doc_type") or "") != "dse":
continue
url = (e.get("url") or "").strip()
if url and url not in seen:
candidates.append(url)
seen.add(url)
for c in legacy:
url = (c.get("url") or "").strip()
if not url or url in seen:
continue
# Only DSE-ish URLs
url_lc = url.lower()
if any(k in url_lc for k in (
"datenschutz", "privacy", "datenschutzerk",
)):
if c.get("status") == 200:
candidates.append(url)
seen.add(url)
if len(candidates) < 2:
return {"versions": [], "skipped": "single_version_or_none"}
# Cap to 3 for performance
candidates = candidates[:3]
versions: list[dict] = []
for url in candidates:
text, status = await _fetch_text(url)
if not text:
continue
versions.append({
"url": url,
"status": status,
"word_count": len(text.split()),
"sha256": hashlib.sha256(text.encode("utf-8")).hexdigest()[:16],
"date_found": _extract_date(text) or "kein Datum",
"dsb_found": _extract_dsb(text) or "",
})
if len(versions) < 2:
return {"versions": versions, "skipped": "only_one_fetched"}
# Detect contradictions
dates = {v["date_found"] for v in versions if v["date_found"] != "kein Datum"}
dsbs = {v["dsb_found"] for v in versions if v["dsb_found"] != ""}
return {
"versions": versions,
"date_divergent": len(dates) > 1,
"dsb_divergent": len(dsbs) > 1,
"no_date_count": sum(
1 for v in versions if v["date_found"] == "kein Datum"
),
}
def render_multi_version_block(info: dict) -> str:
versions = info.get("versions") or []
if len(versions) < 2:
return ""
rows = []
for v in versions:
rows.append(
f"<tr>"
f"<td style='padding:6px 8px;font-family:monospace;font-size:11px;"
f"max-width:300px;word-break:break-all;'>"
f"<a href='{h(v['url'])}' style='color:#0369a1;'>"
f"{h(v['url'][:90])}</a></td>"
f"<td style='padding:6px 8px;font-size:11px;text-align:right;'>"
f"{v['word_count']:,}</td>"
f"<td style='padding:6px 8px;font-size:11px;font-family:monospace;"
f"color:#475569;'>{h(v['sha256'])}…</td>"
f"<td style='padding:6px 8px;font-size:11px;'>"
f"{h(v['date_found'])}</td>"
f"<td style='padding:6px 8px;font-size:11px;'>"
f"{h(v['dsb_found'])}</td>"
f"</tr>"
)
warnings = []
if info.get("date_divergent"):
warnings.append("verschiedene Datumsangaben")
if info.get("dsb_divergent"):
warnings.append("verschiedene DSB benannt")
if info.get("no_date_count"):
warnings.append(
f"{info['no_date_count']} von {len(versions)} ohne Datum"
)
warn_html = ""
if warnings:
warn_html = (
"<p style='margin:8px 0;padding:8px 12px;background:#fef3c7;"
"border-left:3px solid #f59e0b;font-size:12px;color:#92400e;'>"
"<strong>Erkannte Inkonsistenzen:</strong> "
+ " · ".join(warnings) +
"</p>"
)
return (
"<div style='margin:24px 0;padding:16px;border-left:4px solid #dc2626;"
"background:#fef2f2;border-radius:4px;'>"
f"<h2 style='margin:0 0 8px;color:#7f1d1d;font-size:16px;'>"
f"📑 Mehrere DSE-Versionen erkannt ({len(versions)})"
"</h2>"
"<p style='margin:0 0 8px;font-size:13px;color:#475569;'>"
"Auf deiner Domain sind mehrere DSE-URLs öffentlich reachable. "
"<strong>Nur eine Version kann rechtsverbindlich gültig sein.</strong> "
"Wir prüfen jede unabhängig — der Kunde wählt das gültige "
"Ergebnis und sorgt dafür, dass die andere Variante "
"<em>301-Redirect</em> oder <em>offline</em> wird."
"</p>"
+ warn_html +
"<table style='font-size:11px;width:100%;border-collapse:collapse;"
"background:#fff;border-radius:4px;'>"
"<thead><tr style='background:#fee2e2;'>"
"<th style='padding:6px 8px;text-align:left;'>URL</th>"
"<th style='padding:6px 8px;text-align:right;'>Wörter</th>"
"<th style='padding:6px 8px;text-align:left;'>SHA-256</th>"
"<th style='padding:6px 8px;text-align:left;'>Datum</th>"
"<th style='padding:6px 8px;text-align:left;'>DSB benannt</th>"
"</tr></thead><tbody>" + "".join(rows) + "</tbody></table>"
"</div>"
)