breakpilot-compliance/backend-compliance/compliance/services/multi_version_dse.py
Benjamin Admin 5c5d676f01
feat: Plan B + A + C: DSE version MCs + legacy URL + multi-version
Three related mechanisms for DSE (Datenschutzerklärung, privacy policy) provability + URL hygiene.

Plan B + PDF: version-provability MCs (dse_checks.py):
  - mc-dse_version_date (HIGH): a visible "Stand"/version date is
    mandatory. 12 regex patterns: "Stand: April 2024", ISO date,
    "Letzte Aktualisierung", "Version 3.2", English
    variants ("Last updated", "Effective date as of …").
    Basis: Art. 7(1) GDPR (demonstrability of consent).
  - mc-dse_version_proof (MED): PDF download or
    versioned archive URL. A pure HTML DSE without a snapshot is
    legally fragile. 8 patterns: .pdf, download hint,
    web.archive.org, /dse-vNNN.html.
    Basis: DSK-Orientierungshilfe 2024.
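
The mc-dse_version_date check can be pictured as a small pattern scan over the page text. The following is a minimal sketch under stated assumptions, not the code in dse_checks.py: the three patterns and the name `has_visible_version_date` are hypothetical stand-ins for the 12 patterns mentioned above.

```python
import re

# Hypothetical subset of patterns; the commit message mentions 12 in total.
_VERSION_DATE_PATTERNS = [
    # "Stand: April 2024", "Stand 2024" (word boundary avoids "Bestand")
    re.compile(r"\bstand\s*:?\s*(?:\w+\s+)?\d{4}\b", re.IGNORECASE),
    # ISO date such as 2024-04-01
    re.compile(r"\b\d{4}-\d{2}-\d{2}\b"),
    # "Last updated" / "Letzte Aktualisierung" labels
    re.compile(r"\b(?:last\s+updated|letzte\s+aktualisierung)\b", re.IGNORECASE),
]


def has_visible_version_date(text: str) -> bool:
    """Return True if any of the illustrative patterns matches the text."""
    return any(p.search(text) for p in _VERSION_DATE_PATTERNS)
```

A check built this way only proves that a date-like label is present; it says nothing about whether the date is current.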

Plan A: legacy URL discovery (legacy_url_discovery.py + B20):
  Four complementary sources:
    A.1 Parse /sitemap.xml + sub-sitemaps, filter for
        compliance-relevant slugs
    A.2 archive.org/wayback/available per slug: if Wayback
        shows a snapshot ≥18 months old AND the page still
        returns 200 today AND it is not in the footer → legacy suspected
    A.3 Slug permutations: 6 doc_types × 6 slug variants ×
        5 lang prefixes × 4 brand parameters
    A.4 Banner modal links (via the consent-tester stage 4 tour)
  Mail block "🗂️ Legacy-URL-Inventar" with table: URL · HTTP ·
  Wayback age · footer · recommendation (301/offline/keep).
  The engine does NOT decide what is legacy; it presents the
  inventory and the customer chooses.

  Real-world smoke test, Elli:
    /en/cookies → HTTP 200, Wayback 69 months old, not in footer
                  → "Legacy-Verdacht, 301 setzen" (legacy suspected, set a 301)
    /en/impressum → HTTP 302, redirected → "behalten" (keep)
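
The A.2 rule combines three conditions, which can be sketched as a pure function. This is an illustration only, not the code in legacy_url_discovery.py: the `UrlObservation` type, its field names, and `is_legacy_suspect` are hypothetical, and only the 18-month threshold comes from the description above.

```python
from dataclasses import dataclass
from typing import Optional

LEGACY_MIN_AGE_MONTHS = 18  # threshold taken from the A.2 description


@dataclass
class UrlObservation:
    """Hypothetical record of what was observed for one candidate URL."""
    url: str
    http_status: int
    wayback_age_months: Optional[int]  # None if Wayback has no snapshot
    in_footer: bool


def is_legacy_suspect(obs: UrlObservation) -> bool:
    """Old Wayback snapshot AND still HTTP 200 AND not linked in the footer."""
    return (
        obs.wayback_age_months is not None
        and obs.wayback_age_months >= LEGACY_MIN_AGE_MONTHS
        and obs.http_status == 200
        and not obs.in_footer
    )


# Values for /en/cookies follow the smoke test; the snapshot age and footer
# state for /en/impressum are not given in the source and are assumed here.
cookies = UrlObservation("/en/cookies", 200, 69, False)
impressum = UrlObservation("/en/impressum", 302, None, False)
```

With these inputs `is_legacy_suspect(cookies)` is true and `is_legacy_suspect(impressum)` is false, which matches the two smoke-test outcomes. The result is only a suspicion flag; the decision stays with the customer.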

Plan C: multi-version DSE analysis (multi_version_dse.py):
  If ≥2 DSE URLs are reachable: per variant, extract DSB name + date +
  word count + SHA-256 and flag inconsistencies
  (date_divergent, dsb_divergent, no_date_count).
  Mail block "📑 Mehrere DSE-Versionen erkannt" with a
  comparison table + red notice "Nur eine Version kann
  gültig sein" (only one version can be valid). Example Elli:
  /de/datenschutz (Mollstr DSB, 2022) vs
  /de/datenschutzerklaerung?brand=elli (Proliance, no date).
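
The three flags can be illustrated with the Elli example. This is a simplified sketch of the comparison step only; `divergence_flags` is a hypothetical helper name, and the `date_found`/`dsb_found` values are shortened stand-ins for what the extractors would return.

```python
NO_DATE = "kein Datum"  # sentinel the module stores when no date is found


def divergence_flags(versions: list[dict]) -> dict:
    """Compute the three inconsistency flags from per-version fields."""
    dates = {v["date_found"] for v in versions if v["date_found"] != NO_DATE}
    dsbs = {v["dsb_found"] for v in versions if v["dsb_found"]}
    return {
        "date_divergent": len(dates) > 1,
        "dsb_divergent": len(dsbs) > 1,
        "no_date_count": sum(1 for v in versions if v["date_found"] == NO_DATE),
    }


# Illustrative values modelled on the Elli example above.
elli = [
    {"url": "/de/datenschutz", "date_found": "2022", "dsb_found": "Mollstr"},
    {"url": "/de/datenschutzerklaerung?brand=elli",
     "date_found": NO_DATE, "dsb_found": "Proliance"},
]
flags = divergence_flags(elli)
```

Here `dsb_divergent` is true and `no_date_count` is 1, while `date_divergent` stays false because only one variant carries a date at all. A missing date therefore surfaces through `no_date_count`, not through `date_divergent`.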

API response extended with legacy_url_inventory +
html_blocks.legacy_urls + multi_version_dse_html in the V2 layout.

ENV override: LEGACY_URL_DISABLED=1.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-06-08 10:04:14 +02:00

216 lines
7.5 KiB
Python

"""Multi-Version-DSE-Analyse.

When auto-discovery and legacy-URL discovery find several DSE
(Datenschutzerklärung, privacy policy) URLs on the same domain, we
compare key fields per variant:
- "Stand"/version date (visible?)
- DSB (data protection officer) name (Mollstraße vs Proliance vs …)
- Word count (is a clearly shorter version outdated?)
- SHA-256 hash (for the audit trail)

Output: an HTML block with a comparison table plus a red notice that
only one version can be valid. Non-destructive: we do NOT decide which
variant is correct; we present them side by side.

Performance: capped at 3 DSE URLs in total (a sitemap can yield
hundreds, which would cost 3+ minutes).
"""
from __future__ import annotations

import hashlib
import logging
import re
from html import escape as h

import httpx

logger = logging.getLogger(__name__)
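
# Patterns that extract a DSB (data protection officer) name from the page
# text. They are searched case-insensitively; the first match wins.
_DSB_PATTERNS = (
    # "Datenschutzbeauftragte..." followed within 200 characters by 1-4 words
    # that end in a company suffix or a street name.
    r"datenschutzbeauftragt\w*[\s\S]{0,200}?"
    r"((?:[A-ZÄÖÜ][\w\-]{2,40}\s+){1,4}"
    r"(?:GmbH|AG|GbR|Mollstr|Stra(?:ße|sse)|str\.))",
    r"(proliance\s+gmbh)",
    r"(datenschutzexperte\.de)",
)

# A label ("Stand", "Letzte Aktualisierung", "Version", "Effective") followed
# by a numeric date, a German month name plus year, or MM.YYYY.
_DATE_PATTERN = re.compile(
    r"(?:stand|letzte\s+aktualisierung|version|effective)[:.]?\s*"
    r"(\d{4}[-./]\d{1,2}(?:[-./]\d{1,2})?|"
    r"(?:januar|februar|m(?:ae|ä)rz|april|mai|juni|juli|august|"
    r"september|oktober|november|dezember)\s+\d{4}|"
    r"\d{1,2}[./]\d{4})",
    re.IGNORECASE,
)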


async def _fetch_text(url: str) -> tuple[str, int]:
    """Fetch ``url`` and return (visible text, HTTP status).

    Returns ("", status) for non-200 responses and ("", 0) on any error.
    """
    try:
        async with httpx.AsyncClient(timeout=15.0, follow_redirects=True) as c:
            r = await c.get(url)
            if r.status_code != 200:
                return "", r.status_code
            # Crude HTML-to-text: drop scripts and styles, then all tags,
            # then collapse whitespace.
            text = re.sub(r"<script.*?</script>", " ", r.text,
                          flags=re.S | re.I)
            text = re.sub(r"<style.*?</style>", " ", text,
                          flags=re.S | re.I)
            text = re.sub(r"<[^>]+>", " ", text)
            text = re.sub(r"\s+", " ", text).strip()
            return text, 200
    except Exception as e:
        logger.info("fetch failed for %s: %s", url, e)
        return "", 0


def _extract_dsb(text: str) -> str:
    """Return the first DSB name matched by _DSB_PATTERNS, or ""."""
    if not text:
        return ""
    for pat in _DSB_PATTERNS:
        m = re.search(pat, text, re.IGNORECASE)
        if m:
            return (m.group(1) if m.lastindex else m.group(0))[:120].strip()
    return ""


def _extract_date(text: str) -> str:
    if not text:
        return ""
    m = _DATE_PATTERN.search(text)
    return (m.group(1) if m else "")[:40].strip()


async def analyze_multiple_dse_versions(state: dict) -> dict:
    """If ≥2 DSE-like URLs are reachable, fetch each and return the
    per-version fields plus divergence flags for a comparison table."""
    doc_entries = state.get("doc_entries") or []
    legacy = (state.get("legacy_url_inventory") or {}).get("candidates") or []

    # Collect DSE candidate URLs from doc_entries + legacy inventory
    candidates: list[str] = []
    seen: set[str] = set()
    for e in doc_entries:
        if (e.get("doc_type") or "") != "dse":
            continue
        url = (e.get("url") or "").strip()
        if url and url not in seen:
            candidates.append(url)
            seen.add(url)
    for c in legacy:
        url = (c.get("url") or "").strip()
        if not url or url in seen:
            continue
        # Only DSE-ish URLs that returned 200 during legacy discovery.
        # "datenschutz" also covers "datenschutzerklaerung".
        url_lc = url.lower()
        is_dse = any(k in url_lc for k in ("datenschutz", "privacy"))
        if is_dse and c.get("status") == 200:
            candidates.append(url)
            seen.add(url)

    if len(candidates) < 2:
        return {"versions": [], "skipped": "single_version_or_none"}

    # Cap to 3 for performance
    candidates = candidates[:3]

    versions: list[dict] = []
    for url in candidates:
        text, status = await _fetch_text(url)
        if not text:
            continue
        versions.append({
            "url": url,
            "status": status,
            "word_count": len(text.split()),
            # Only the first 16 hex characters of the digest are kept.
            "sha256": hashlib.sha256(text.encode("utf-8")).hexdigest()[:16],
            "date_found": _extract_date(text) or "kein Datum",
            "dsb_found": _extract_dsb(text) or "",
        })

    if len(versions) < 2:
        return {"versions": versions, "skipped": "only_one_fetched"}

    # Detect contradictions; missing values do not count as a distinct value.
    dates = {v["date_found"] for v in versions if v["date_found"] != "kein Datum"}
    dsbs = {v["dsb_found"] for v in versions if v["dsb_found"] != ""}
    return {
        "versions": versions,
        "date_divergent": len(dates) > 1,
        "dsb_divergent": len(dsbs) > 1,
        "no_date_count": sum(
            1 for v in versions if v["date_found"] == "kein Datum"
        ),
    }


def render_multi_version_block(info: dict) -> str:
    """Render the comparison table as an HTML mail block.

    Returns "" when fewer than two versions were analysed.
    """
    versions = info.get("versions") or []
    if len(versions) < 2:
        return ""

    # One table row per version; all dynamic values are HTML-escaped.
    rows = []
    for v in versions:
        rows.append(
            f"<tr>"
            f"<td style='padding:6px 8px;font-family:monospace;font-size:11px;"
            f"max-width:300px;word-break:break-all;'>"
            f"<a href='{h(v['url'])}' style='color:#0369a1;'>"
            f"{h(v['url'][:90])}</a></td>"
            f"<td style='padding:6px 8px;font-size:11px;text-align:right;'>"
            f"{v['word_count']:,}</td>"
            f"<td style='padding:6px 8px;font-size:11px;font-family:monospace;"
            f"color:#475569;'>{h(v['sha256'])}…</td>"
            f"<td style='padding:6px 8px;font-size:11px;'>"
            f"{h(v['date_found'])}</td>"
            f"<td style='padding:6px 8px;font-size:11px;'>"
            f"{h(v['dsb_found'])}</td>"
            f"</tr>"
        )

    # Summarise the detected inconsistencies (user-facing text is German).
    warnings = []
    if info.get("date_divergent"):
        warnings.append("verschiedene Datumsangaben")
    if info.get("dsb_divergent"):
        warnings.append("verschiedene DSB benannt")
    if info.get("no_date_count"):
        warnings.append(
            f"{info['no_date_count']} von {len(versions)} ohne Datum"
        )
    warn_html = ""
    if warnings:
        warn_html = (
            "<p style='margin:8px 0;padding:8px 12px;background:#fef3c7;"
            "border-left:3px solid #f59e0b;font-size:12px;color:#92400e;'>"
            "<strong>Erkannte Inkonsistenzen:</strong> "
            + " · ".join(warnings) +
            "</p>"
        )

    return (
        "<div style='margin:24px 0;padding:16px;border-left:4px solid #dc2626;"
        "background:#fef2f2;border-radius:4px;'>"
        f"<h2 style='margin:0 0 8px;color:#7f1d1d;font-size:16px;'>"
        f"📑 Mehrere DSE-Versionen erkannt ({len(versions)})"
        "</h2>"
        "<p style='margin:0 0 8px;font-size:13px;color:#475569;'>"
        "Auf deiner Domain sind mehrere DSE-URLs öffentlich reachable. "
        "<strong>Nur eine Version kann rechtsverbindlich gültig sein.</strong> "
        "Wir prüfen jede unabhängig. Der Kunde wählt das gültige "
        "Ergebnis und sorgt dafür, dass die andere Variante "
        "<em>301-Redirect</em> oder <em>offline</em> wird."
        "</p>"
        + warn_html +
        "<table style='font-size:11px;width:100%;border-collapse:collapse;"
        "background:#fff;border-radius:4px;'>"
        "<thead><tr style='background:#fee2e2;'>"
        "<th style='padding:6px 8px;text-align:left;'>URL</th>"
        "<th style='padding:6px 8px;text-align:right;'>Wörter</th>"
        "<th style='padding:6px 8px;text-align:left;'>SHA-256</th>"
        "<th style='padding:6px 8px;text-align:left;'>Datum</th>"
        "<th style='padding:6px 8px;text-align:left;'>DSB benannt</th>"
        "</tr></thead><tbody>" + "".join(rows) + "</tbody></table>"
        "</div>"
    )