diff --git a/backend-compliance/compliance/api/agent_check/_b20_wiring.py b/backend-compliance/compliance/api/agent_check/_b20_wiring.py new file mode 100644 index 00000000..9f73976f --- /dev/null +++ b/backend-compliance/compliance/api/agent_check/_b20_wiring.py @@ -0,0 +1,121 @@ +"""B20 wiring — Legacy-URL-Discovery + Mail-Block.""" + +from __future__ import annotations + +import html +import logging +import os + +from compliance.services.legacy_url_discovery import discover_legacy_urls +from compliance.services.multi_version_dse import ( + analyze_multiple_dse_versions, render_multi_version_block, +) + +logger = logging.getLogger(__name__) + + +_DISABLED = os.environ.get("LEGACY_URL_DISABLED", "").lower() in ( + "1", "true", "yes", +) + + +async def run_b20(state: dict) -> None: + if _DISABLED: + return + try: + result = await discover_legacy_urls(state) + except Exception as e: + logger.warning("legacy-url-discovery failed: %s", e) + return + candidates = result.get("candidates") or [] + state["legacy_url_inventory"] = result + if candidates: + state["legacy_url_html"] = _render(result) + logger.info( + "B20 legacy-url: %d candidates of %d probed", + len(candidates), result.get("probed", 0), + ) + + # Plan C — Multi-Version-DSE-Analyse: falls Legacy-Discovery zusätz- + # liche DSE-URLs liefert UND ≥2 reachable sind, parallele Analyse + + # Vergleichsblock. + try: + mv_info = await analyze_multiple_dse_versions(state) + if mv_info.get("versions") and len(mv_info["versions"]) >= 2: + state["multi_version_dse_info"] = mv_info + state["multi_version_dse_html"] = render_multi_version_block( + mv_info, + ) + logger.info( + "B20-C multi-version-dse: %d versions, date_div=%s dsb_div=%s", + len(mv_info["versions"]), + mv_info.get("date_divergent"), + mv_info.get("dsb_divergent"), + ) + except Exception as e: + logger.warning("multi-version-dse analysis failed: %s", e) + + +def _render(result: dict) -> str: + candidates = result.get("candidates") or [] + if not candidates: + return "" + rows = [] + for c in candidates[:25]: + st = c["status"] + sev_color = ( + "#dc2626" if "Legacy-Verdacht" in (c.get("recommendation") or "") + else "#f59e0b" if st in (404, 410) else "#64748b" + ) + age = c.get("age_months") + age_disp = f"{age} Mo." if age is not None else "—" + rec = c.get("recommendation") or "—" + rows.append( + f"" + f"" + f"{html.escape(c['url'][:120])}" + f"" + f"" + f"{st or '?'}" + f"" + f"{age_disp}" + f"" + f"{'✓' if c.get('in_footer') else '—'}" + f"" + f"{html.escape(rec)}" + f"" + ) + rest = "" + if len(candidates) > 25: + rest = ( + f"

" + f"… und {len(candidates)-25} weitere — vollständig in " + f"legacy-urls.csv im ZIP-Anhang.

" + ) + return ( + "
" + "

" + f"🗂️ Legacy-URL-Inventar ({len(candidates)} Kandidaten von " + f"{result.get('probed', '?')} geprüft)" + "

" + "

" + "Quellen: /sitemap.xml + Wayback-Machine + Slug-Permutations. " + "Wir entscheiden nicht ob eine URL Legacy ist — " + "wir präsentieren das Inventar mit Status und Empfehlung. Der " + "Kunde entscheidet." + "

" + "" + "" + "" + "" + "" + "" + "" + "" + "".join(rows) + "
URLHTTPWayback-AlterFooterEmpfehlung
" + + rest + + "
" + ) diff --git a/backend-compliance/compliance/api/agent_check/_orchestrator.py b/backend-compliance/compliance/api/agent_check/_orchestrator.py index 72e6b08b..0c051fc2 100644 --- a/backend-compliance/compliance/api/agent_check/_orchestrator.py +++ b/backend-compliance/compliance/api/agent_check/_orchestrator.py @@ -30,6 +30,7 @@ from ._b16_wiring import run_b16 from ._b17_wiring import run_b17 from ._b18_wiring import run_b18 from ._b19_wiring import run_b19 +from ._b20_wiring import run_b20 from ._constants import _compliance_check_jobs from ._phase_a_resolve import run_phase_a from ._phase_b_profile_check import run_phase_b @@ -94,6 +95,7 @@ async def run_compliance_check(check_id: str, req) -> None: await run_b17(state) # Audit-Walk-Video (Beweis-Aufzeichnung) await run_b18(state) # Impressum-Specialist-Agent (Pattern+LLM) run_b19(state) # Cookie-Coherence (Salesforce-as-essential) + await run_b20(state) # Legacy-URL-Discovery (Sitemap+Wayback) # Phase D-3 top/mid/bot: Step 5 HTML blocks await run_phase_d3_top(state) await run_phase_d3_mid(state) diff --git a/backend-compliance/compliance/api/agent_check/_phase_f_persist.py b/backend-compliance/compliance/api/agent_check/_phase_f_persist.py index 16386d3f..ca042847 100644 --- a/backend-compliance/compliance/api/agent_check/_phase_f_persist.py +++ b/backend-compliance/compliance/api/agent_check/_phase_f_persist.py @@ -90,7 +90,9 @@ def run_phase_f(state: dict) -> None: "ai_act": state.get("ai_act_html", ""), "impressum_agent": state.get("impressum_agent_html", ""), "cookie_coherence": state.get("cookie_coherence_html", ""), + "legacy_urls": state.get("legacy_url_html", ""), }, + "legacy_url_inventory": state.get("legacy_url_inventory") or None, } _compliance_check_jobs[check_id]["status"] = "completed" diff --git a/backend-compliance/compliance/services/doc_checks/dse_checks.py b/backend-compliance/compliance/services/doc_checks/dse_checks.py index a5843a84..45057131 100644 --- a/backend-compliance/compliance/services/doc_checks/dse_checks.py +++ b/backend-compliance/compliance/services/doc_checks/dse_checks.py @@ -411,4 +411,75 @@ ART13_CHECKLIST = [ "severity": "LOW", "hint": "Vollstaendigen Namen, Adresse und Website der Aufsichtsbehoerde angeben. Haeufiger Fehler: 'die zustaendige Aufsichtsbehoerde' ohne Konkretisierung. Korrekt z.B.: 'LfDI BW, Koenigstrasse 10a, 70173 Stuttgart, www.baden-wuerttemberg.datenschutz.de'.", }, + + # ── L1: Versionsdatum / Nachweisbarkeit der Einwilligung ───────── + # + # Art. 7 Abs. 1 DSGVO verlangt vom Verantwortlichen, die Einwilligung + # NACHWEISEN zu koennen — inkl. WELCHEM Stand der DSE der Nutzer + # zugestimmt hat. Ohne Datum/Versionsnummer ist das nicht moeglich. + { + "id": "dse_version_date", + "label": "Stand/Versionsdatum der DSE auffindbar", + "level": 1, "parent": None, + "patterns": [ + # "Stand: April 2024", "Stand Januar 2026" + r"stand:?\s*(?:januar|februar|m(?:ae|ä)rz|april|mai|juni|juli|" + r"august|september|oktober|november|dezember)\s+\d{4}", + # "Stand: 01.04.2024", "Stand 04/2024", "Stand 2024-04-01" + r"stand:?\s*\d{1,2}[./-]\d{1,2}[./-]\d{2,4}", + r"stand:?\s*\d{4}-\d{2}(?:-\d{2})?", + r"stand:?\s*\d{1,2}/\d{2,4}", + # "Letzte Aktualisierung: …", "Zuletzt geaendert: …" + r"letzte\s+(?:aktualisierung|(?:ae|ä)nderung|anpassung)", + r"zuletzt\s+(?:ge)?(?:ae|ä)ndert", + r"g(?:ue|ü)ltig\s+(?:ab|seit|f(?:ue|ü)r)", + r"version\s+\d+[.\d]*", + r"version:?\s*v?\d+\.\d+", + # Englisch + r"last\s+(?:updated|modified|revised|amended)", + r"effective\s+(?:date|as\s+of)", + r"as\s+of\s+(?:january|february|march|april|may|june|july|" + r"august|september|october|november|december)\s+\d{4}", + ], + "severity": "HIGH", + "hint": ( + "Art. 7 Abs. 1 DSGVO: Verantwortlicher muss NACHWEISEN koennen, " + "welcher DSE-Version der Nutzer zugestimmt hat. Ohne ein " + "sichtbares Versionsdatum / 'Stand: …' ist die Einwilligung " + "nicht beweisbar — Aufsichtsbehoerden + Verbraucherzentralen " + "stossen genau hier nach. Korrekt: 'Stand: Januar 2026' oder " + "'Version 3.2 — gueltig ab 01.01.2026' sichtbar am Anfang oder " + "Ende der DSE." + ), + }, + { + "id": "dse_version_proof", + "label": "Versions-eindeutige Beweis-Verankerung (PDF / Download / Archiv-Link)", + "level": 2, "parent": "dse_version_date", + "patterns": [ + # PDF-Download verfuegbar + r"\.pdf\b", + r"(?:dse|datenschutz|privacy)[\w\-]*\.pdf", + # Download-Hinweis + r"(?:dse|datenschutzerkl(?:ae|ä)rung|datenschutzhinweis(?:e)?)" + r"[^.]{0,80}herunterladen", + r"als\s+pdf\s+(?:herunterladen|speichern|laden|verf(?:ue|ü)gbar)", + r"download[^.]{0,40}(?:dse|datenschutzerkl|privacy|policy)", + # Konkrete Versions-URL (Wayback / Archiv / versionierte URL) + r"web\.archive\.org", + r"version[\-_]?archive", + r"(?:dse|privacy)-v?\d+[.\d]*\.html?", + ], + "severity": "MEDIUM", + "hint": ( + "Beste-Praxis nach DSK-Orientierungshilfe 2024: fuer den Beweis " + "der konkreten DSE-Version sollte zusaetzlich zur Web-Version " + "ein PDF-Download oder ein versionierter Archiv-Link verfuegbar " + "sein. Reine HTML-DSE ohne Snapshot ist juristisch fragil — " + "der Anbieter kann die DSE jederzeit aendern und das Original " + "ist nicht mehr nachweisbar. Empfehlung: 'Aktuelle DSE als PDF " + "herunterladen' im Kopfbereich, oder eindeutige Versions-URLs " + "(z.B. /dse/v2026-01.html)." + ), + }, ] diff --git a/backend-compliance/compliance/services/legacy_url_discovery.py b/backend-compliance/compliance/services/legacy_url_discovery.py new file mode 100644 index 00000000..36ab47e1 --- /dev/null +++ b/backend-compliance/compliance/services/legacy_url_discovery.py @@ -0,0 +1,301 @@ +"""Legacy-URL-Discovery — systematische Suche nach veralteten DSE-/ +Impressum-/Cookie-/AGB-URLs auf einer Domain. + +Strategie aus 4 unabhängigen Quellen: + A.1 Sitemap-Parser — /sitemap.xml, /sitemap_index.xml, sitemap-de.xml, + sitemap-legal.xml + A.2 Wayback Machine — archive.org/wayback/available für jeden bekannten + Slug; URLs die vor ≥18 Monaten archiviert wurden + und heute noch 200 liefern = Legacy-Verdacht + A.3 Slug-Permutations — bekannte Slug-Familie × Locale/Brand-Parameter + A.4 Banner-Modal-Links — Playwright öffnet Cookie-Einstellungen-Modal + und sammelt alle Links (Plan A.4 wird via + consent-tester aufgerufen, hier nur Schema) + +Output: Liste von Legacy-Kandidaten mit Status, last_modified, found_via, +recommended_action ("Redirect 301", "Offline nehmen", "Belassen — aktuell"). + +Best-Effort: jede Quelle catched eigene Exceptions — eine ausgefallene +Sitemap blockiert nicht Wayback. +""" + +from __future__ import annotations + +import asyncio +import logging +import re +from datetime import datetime, timezone +from urllib.parse import urljoin, urlparse + +import httpx + +logger = logging.getLogger(__name__) + + +# Kanonische DE/EN Slug-Familie pro Doc-Type. Wir suchen jede dieser +# Pfade auf jeder Origin — auch wenn die Discovery sie schon hat, +# als unabhängige Verifikation. +_SLUG_FAMILY: dict[str, tuple[str, ...]] = { + "dse": ( + "datenschutz", "datenschutzerklaerung", "datenschutzerklärung", + "datenschutzhinweise", "datenschutzhinweis", + "privacy", "privacy-policy", "privacy-notice", + "datenschutz-online", "dse", + ), + "impressum": ( + "impressum", "imprint", "legal-notice", "site-notice", + "anbieterkennzeichnung", + ), + "cookie": ( + "cookie-richtlinie", "cookies", "cookie-policy", + "cookie-erklaerung", "cookieerklaerung", "cookie-hinweise", + ), + "agb": ( + "agb", "allgemeine-geschaeftsbedingungen", + "geschaeftsbedingungen", "terms-and-conditions", + "general-terms-of-business", + ), + "nutzungsbedingungen": ( + "nutzungsbedingungen", "terms-of-use", "terms-of-service", + "nutzungsordnung", + ), + "widerruf": ( + "widerruf", "widerrufsbelehrung", + "widerrufsbelehrung-privatkunden", "cancellation", + ), +} + + +_LANG_PREFIXES = ("", "/de", "/de_de", "/de-de", "/germany", "/en") +_BRAND_PARAMS = ("", "?brand=", "?lang=de", "?locale=de_DE") + + +_LEGACY_AGE_MONTHS_THRESHOLD = 18 # ältere = Legacy-Verdacht + + +async def _fetch_sitemap_urls(origin: str) -> list[str]: + """A.1 — sitemap.xml + Varianten.""" + candidates = ( + f"{origin}/sitemap.xml", + f"{origin}/sitemap_index.xml", + f"{origin}/sitemap-de.xml", + f"{origin}/sitemap-legal.xml", + f"{origin}/sitemap-pages.xml", + ) + out: set[str] = set() + try: + async with httpx.AsyncClient(timeout=10.0, follow_redirects=True) as c: + for url in candidates: + try: + r = await c.get(url) + if r.status_code != 200: + continue + # Sitemap-Index: weitere Sitemaps verlinkt + locs = re.findall(r"([^<]+)", r.text) + for loc in locs: + loc = loc.strip() + if loc.endswith(".xml"): + # nested sitemap — fetch + try: + rr = await c.get(loc) + if rr.status_code == 200: + out.update( + m.strip() for m in + re.findall(r"([^<]+)", + rr.text) + if not m.strip().endswith(".xml") + ) + except Exception: + continue + else: + out.add(loc) + except Exception: + continue + except Exception as e: + logger.info("sitemap fetch failed for %s: %s", origin, e) + return list(out) + + +async def _wayback_check(url: str) -> dict | None: + """A.2 — Wayback-Machine. Return latest archived snapshot info.""" + try: + async with httpx.AsyncClient(timeout=10.0) as c: + r = await c.get( + "https://archive.org/wayback/available", + params={"url": url, "timestamp": "20200101"}, + ) + if r.status_code != 200: + return None + data = r.json() or {} + snap = (data.get("archived_snapshots") or {}).get("closest") or {} + if not snap.get("available"): + return None + ts = snap.get("timestamp", "") + return { + "snapshot_url": snap.get("url"), + "timestamp": ts, + "status": snap.get("status"), + } + except Exception: + return None + + +def _months_since(timestamp_yyyymmdd: str) -> int | None: + """Wayback-Timestamp Format: YYYYMMDDHHMMSS.""" + if not timestamp_yyyymmdd or len(timestamp_yyyymmdd) < 6: + return None + try: + snap = datetime.strptime(timestamp_yyyymmdd[:6], "%Y%m").replace( + tzinfo=timezone.utc, + ) + now = datetime.now(timezone.utc) + delta = (now.year - snap.year) * 12 + (now.month - snap.month) + return max(0, delta) + except Exception: + return None + + +async def _probe_alive(url: str) -> tuple[int, str]: + """Return (status_code, last_modified_header).""" + try: + async with httpx.AsyncClient( + timeout=6.0, follow_redirects=False, + ) as c: + r = await c.head(url) + if r.status_code == 405: + r = await c.get(url) + return r.status_code, r.headers.get("last-modified", "") + except Exception: + return 0, "" + + +def _build_slug_candidates(origin: str) -> list[str]: + out: set[str] = set() + for doc_type, slugs in _SLUG_FAMILY.items(): + for lang in _LANG_PREFIXES: + for slug in slugs: + base = f"{origin}{lang}/{slug}".replace("//", "/") + base = base.replace("https:/", "https://") + base = base.replace("http:/", "http://") + out.add(base) + for bp in _BRAND_PARAMS: + if bp: + out.add(base + bp) + return list(out) + + +def _filter_legal_urls(urls: list[str]) -> list[str]: + """Compliance-relevante Pfade aus Sitemap-Output.""" + keywords = [] + for slugs in _SLUG_FAMILY.values(): + keywords.extend(slugs) + keywords_lc = [k.lower() for k in keywords] + out: list[str] = [] + for u in urls: + ul = u.lower() + if any(k in ul for k in keywords_lc): + out.append(u) + return out + + +def _recommend(status: int, age_months: int | None, + in_sitemap: bool, in_footer: bool) -> str: + if status == 404 or status == 410: + return "URL veraltet (404/410) — Backlinks prüfen, ggf. 301 setzen" + if status == 0: + return "Nicht erreichbar — manuell prüfen" + if status in (301, 302, 303, 307, 308): + return "Bereits redirected — behalten" + if status == 200: + if age_months is None: + return "Erreichbar, kein Wayback-Stand — Inhalt manuell prüfen" + if age_months >= _LEGACY_AGE_MONTHS_THRESHOLD and not in_footer: + return ( + f"Legacy-Verdacht ({age_months} Monate altes Wayback, " + "nicht im Footer verlinkt) — 301-Redirect auf aktuelle " + "Version setzen ODER offline nehmen" + ) + if age_months >= 36 and in_footer: + return ( + f"Reachable + im Footer, aber Wayback {age_months} Monate " + "alt — manuell prüfen ob Inhalt noch aktuell" + ) + return "Aktuell, kein Handlungsbedarf" + return f"HTTP {status} — manuell prüfen" + + +async def discover_legacy_urls(state: dict) -> dict: + """Run all 4 sources + consolidate. Returns dict for HTML rendering.""" + doc_entries = state.get("doc_entries") or [] + origins: set[str] = set() + footer_urls: set[str] = set() + for e in doc_entries: + url = (e.get("url") or "").strip() + if url and "://" in url: + p = urlparse(url) + origins.add(f"{p.scheme}://{p.netloc}") + footer_urls.add(url.split("#")[0].split("?")[0]) + if not origins: + return {"candidates": [], "skipped": "no_origin"} + + candidates: set[str] = set() + # A.1 Sitemap + for o in list(origins)[:2]: + sitemap_urls = await _fetch_sitemap_urls(o) + candidates.update(_filter_legal_urls(sitemap_urls)) + # A.3 Slug-Permutations + candidates.update(_build_slug_candidates(o)) + + # Cap to avoid explosion + cands = list(candidates)[:60] + + # Probe alive + Wayback in parallel + async def _check(url: str) -> dict: + status, lm = await _probe_alive(url) + wb = await _wayback_check(url) if status == 200 else None + age = _months_since(wb.get("timestamp", "") if wb else "") + in_footer = url.split("#")[0].split("?")[0] in footer_urls + return { + "url": url, + "status": status, + "last_modified": lm, + "wayback_snapshot": wb.get("snapshot_url") if wb else "", + "wayback_timestamp": wb.get("timestamp", "") if wb else "", + "age_months": age, + "in_footer": in_footer, + "recommendation": _recommend(status, age, False, in_footer), + } + + results = await asyncio.gather( + *[_check(u) for u in cands], return_exceptions=True, + ) + results = [r for r in results if isinstance(r, dict)] + + # Filter: only show interesting ones (≥200 reachable + legacy-relevant) + interesting: list[dict] = [] + for r in results: + if r["status"] == 0: + continue # Nicht erreichbar, nicht interessant + # 404/410/redirects nur wenn im footer → broken link + if r["status"] in (404, 410) and not r["in_footer"]: + continue + # 200 + im Footer + recent Wayback → "alles OK" filter + if (r["status"] == 200 and r["in_footer"] + and r["age_months"] is not None + and r["age_months"] < _LEGACY_AGE_MONTHS_THRESHOLD): + continue + interesting.append(r) + # Sort: Legacy-Verdächtige zuerst (200 + alt + nicht im Footer) + interesting.sort( + key=lambda r: ( + 0 if "Legacy-Verdacht" in r["recommendation"] else + 1 if "veraltet" in r["recommendation"] else 2, + -(r.get("age_months") or 0), + ), + ) + return { + "candidates": interesting, + "probed": len(results), + "filtered_kept": len(interesting), + "origins": list(origins), + } diff --git a/backend-compliance/compliance/services/mail_render_v2/_compose.py b/backend-compliance/compliance/services/mail_render_v2/_compose.py index 0224fc87..c1f95537 100644 --- a/backend-compliance/compliance/services/mail_render_v2/_compose.py +++ b/backend-compliance/compliance/services/mail_render_v2/_compose.py @@ -71,6 +71,9 @@ def compose_v2(state: dict) -> str: state.get("impressum_agent_html", ""), # B19 Cookie-Coherence-Check (Salesforce-as-essential etc.) state.get("cookie_coherence_html", ""), + # B20 Legacy-URL-Discovery + Multi-Version-DSE-Vergleich + state.get("multi_version_dse_html", ""), + state.get("legacy_url_html", ""), # Browser-Matrix (Stage 1.c) state.get("browser_matrix_html", ""), # All legacy build_*_html() wrapped in V2 sections — preserves diff --git a/backend-compliance/compliance/services/multi_version_dse.py b/backend-compliance/compliance/services/multi_version_dse.py new file mode 100644 index 00000000..db8b1f5a --- /dev/null +++ b/backend-compliance/compliance/services/multi_version_dse.py @@ -0,0 +1,215 @@ +"""Multi-Version-DSE-Analyse. + +Wenn Auto-Discovery + Legacy-URL-Discovery mehrere DSE-URLs auf der +gleichen Domain finden, vergleichen wir Key-Felder pro Variante: + - Stand-/Versionsdatum (sichtbar?) + - DSB-Name (Mollstraße vs Proliance vs …) + - Wortzahl (deutlich kürzere Version = veraltet?) + - SHA-256-Hash (für Audit-Trail) + +Output: HTML-Block mit Vergleichstabelle + roter Hinweis "Nur eine +Version kann gültig sein". Nicht-destruktiv: wir entscheiden NICHT +welche Variante richtig ist — wir präsentieren beide nebeneinander. + +Performance: cap auf max 3 zusätzliche DSE-URLs (Sitemap kann Hunderte +liefern, das würde 3min+ kosten). +""" + +from __future__ import annotations + +import hashlib +import logging +import re +from html import escape as h +from urllib.parse import urlparse + +import httpx + +logger = logging.getLogger(__name__) + + +_DSB_PATTERNS = ( + r"datenschutzbeauftragt\w*[\s\S]{0,200}?" + r"((?:[A-ZÄÖÜ][\w\-]{2,40}\s+){1,4}" + r"(?:GmbH|AG|GbR|Mollstr|Stra(?:ße|sse|sse)|str\.))", + r"(proliance\s+gmbh)", + r"(datenschutzexperte\.de)", +) + +_DATE_PATTERN = re.compile( + r"(?:stand|letzte\s+aktualisierung|version|effective)[:.]?\s*" + r"(\d{4}[-./]\d{1,2}(?:[-./]\d{1,2})?|" + r"(?:januar|februar|m(?:ae|ä)rz|april|mai|juni|juli|august|" + r"september|oktober|november|dezember)\s+\d{4}|" + r"\d{1,2}[./]\d{4})", + re.IGNORECASE, +) + + +async def _fetch_text(url: str) -> tuple[str, int]: + try: + async with httpx.AsyncClient(timeout=15.0, follow_redirects=True) as c: + r = await c.get(url) + if r.status_code != 200: + return "", r.status_code + text = re.sub(r"", " ", r.text, + flags=re.S | re.I) + text = re.sub(r"", " ", text, + flags=re.S | re.I) + text = re.sub(r"<[^>]+>", " ", text) + text = re.sub(r"\s+", " ", text).strip() + return text, 200 + except Exception as e: + logger.info("fetch failed for %s: %s", url, e) + return "", 0 + + +def _extract_dsb(text: str) -> str: + if not text: + return "" + for pat in _DSB_PATTERNS: + m = re.search(pat, text, re.IGNORECASE) + if m: + return (m.group(1) if m.lastindex else m.group(0))[:120].strip() + return "" + + +def _extract_date(text: str) -> str: + if not text: + return "" + m = _DATE_PATTERN.search(text) + return (m.group(1) if m else "")[:40].strip() + + +async def analyze_multiple_dse_versions(state: dict) -> dict: + """If ≥2 DSE-like URLs are reachable on the same domain, fetch + each and produce a comparison table.""" + doc_entries = state.get("doc_entries") or [] + legacy = (state.get("legacy_url_inventory") or {}).get("candidates") or [] + + # Collect DSE-candidate URLs from doc_entries + legacy-inventory + candidates: list[str] = [] + seen: set[str] = set() + for e in doc_entries: + if (e.get("doc_type") or "") != "dse": + continue + url = (e.get("url") or "").strip() + if url and url not in seen: + candidates.append(url) + seen.add(url) + for c in legacy: + url = (c.get("url") or "").strip() + if not url or url in seen: + continue + # Only DSE-ish URLs + url_lc = url.lower() + if any(k in url_lc for k in ( + "datenschutz", "privacy", "datenschutzerk", + )): + if c.get("status") == 200: + candidates.append(url) + seen.add(url) + + if len(candidates) < 2: + return {"versions": [], "skipped": "single_version_or_none"} + + # Cap to 3 for performance + candidates = candidates[:3] + versions: list[dict] = [] + for url in candidates: + text, status = await _fetch_text(url) + if not text: + continue + versions.append({ + "url": url, + "status": status, + "word_count": len(text.split()), + "sha256": hashlib.sha256(text.encode("utf-8")).hexdigest()[:16], + "date_found": _extract_date(text) or "kein Datum", + "dsb_found": _extract_dsb(text) or "—", + }) + + if len(versions) < 2: + return {"versions": versions, "skipped": "only_one_fetched"} + + # Detect contradictions + dates = {v["date_found"] for v in versions if v["date_found"] != "kein Datum"} + dsbs = {v["dsb_found"] for v in versions if v["dsb_found"] != "—"} + + return { + "versions": versions, + "date_divergent": len(dates) > 1, + "dsb_divergent": len(dsbs) > 1, + "no_date_count": sum( + 1 for v in versions if v["date_found"] == "kein Datum" + ), + } + + +def render_multi_version_block(info: dict) -> str: + versions = info.get("versions") or [] + if len(versions) < 2: + return "" + rows = [] + for v in versions: + rows.append( + f"" + f"" + f"" + f"{h(v['url'][:90])}" + f"" + f"{v['word_count']:,}" + f"{h(v['sha256'])}…" + f"" + f"{h(v['date_found'])}" + f"" + f"{h(v['dsb_found'])}" + f"" + ) + + warnings = [] + if info.get("date_divergent"): + warnings.append("verschiedene Datumsangaben") + if info.get("dsb_divergent"): + warnings.append("verschiedene DSB benannt") + if info.get("no_date_count"): + warnings.append( + f"{info['no_date_count']} von {len(versions)} ohne Datum" + ) + warn_html = "" + if warnings: + warn_html = ( + "

" + "Erkannte Inkonsistenzen: " + + " · ".join(warnings) + + "

" + ) + + return ( + "
" + f"

" + f"📑 Mehrere DSE-Versionen erkannt ({len(versions)})" + "

" + "

" + "Auf deiner Domain sind mehrere DSE-URLs öffentlich reachable. " + "Nur eine Version kann rechtsverbindlich gültig sein. " + "Wir prüfen jede unabhängig — der Kunde wählt das gültige " + "Ergebnis und sorgt dafür, dass die andere Variante " + "301-Redirect oder offline wird." + "

" + + warn_html + + "" + "" + "" + "" + "" + "" + "" + "" + "".join(rows) + "
URLWörterSHA-256DatumDSB benannt
" + "
" + )