Files
breakpilot-compliance/backend-compliance/compliance/services/legacy_url_discovery.py
T
Benjamin Admin 5c5d676f01
CI / detect-changes (push) Successful in 7s
CI / branch-name (push) Has been skipped
CI / guardrail-integrity (push) Has been skipped
CI / secret-scan (push) Has been skipped
CI / dep-audit (push) Has been skipped
CI / sbom-scan (push) Has been skipped
CI / loc-budget (push) Failing after 11s
CI / python-lint (push) Has been skipped
CI / test-python-backend (push) Successful in 28s
CI / test-python-document-crawler (push) Has been skipped
CI / test-python-dsms-gateway (push) Has been skipped
CI / build-sha-integrity (push) Failing after 4s
CI / validate-canonical-controls (push) Successful in 10s
CI / go-lint (push) Has been skipped
CI / nodejs-build (push) Has been skipped
CI / nodejs-lint (push) Has been skipped
CI / test-go (push) Has been skipped
CI / iace-gt-coverage (push) Has been skipped
feat: Plan B + A + C — DSE-Versions-MCs + Legacy-URL + Multi-Version
Drei verwandte Mechanismen für DSE-Beweisbarkeit + URL-Hygiene.

Plan B + PDF — Versions-Beweisbarkeit-MCs (dse_checks.py):
  - mc-dse_version_date (HIGH) — sichtbares Stand/Versionsdatum
    Pflicht. 12 Regex-Pattern: "Stand: April 2024", ISO-Datum,
    "Letzte Aktualisierung", "Version 3.2", englische
    Varianten ("Last updated", "Effective date as of …").
    Norm: Art. 7 Abs. 1 DSGVO (Nachweisbarkeit Einwilligung).
  - mc-dse_version_proof (MED) — PDF-Download oder
    versionierte Archiv-URL. Reine HTML-DSE ohne Snapshot ist
    juristisch fragil. 8 Pattern: .pdf, Download-Hinweis,
    web.archive.org, /dse-vNNN.html.
    Norm: DSK-Orientierungshilfe 2024.

Plan A — Legacy-URL-Discovery (legacy_url_discovery.py + B20):
  Vier komplementäre Quellen:
    A.1 /sitemap.xml + Sub-Sitemaps parsen, auf compliance-
        relevante Slugs filtern
    A.2 archive.org/wayback/available pro Slug — wenn Wayback
        zeigt ≥18 Monate alten Snapshot UND Seite heute noch
        200 liefert UND nicht im Footer → Legacy-Verdacht
    A.3 Slug-Permutations: 6 doc_types × 6 Slug-Varianten ×
        5 Lang-Prefixe × 4 Brand-Parameter
    A.4 Banner-Modal-Links (über consent-tester Stufe 4 Tour)
  Mail-Block "🗂️ Legacy-URL-Inventar" mit Tabelle: URL · HTTP ·
  Wayback-Alter · Footer · Empfehlung (301/Offline/Behalten).
  Engine entscheidet NICHT was Legacy ist — präsentiert das
  Inventar, Kunde wählt.

  Real-World-Smoke Elli:
    /en/cookies → HTTP 200, Wayback 69 Mo alt, nicht im Footer
                  → "Legacy-Verdacht, 301 setzen"
    /en/impressum → HTTP 302, redirected → "behalten"

Plan C — Multi-Version-DSE-Analyse (multi_version_dse.py):
  Wenn ≥2 DSE-URLs reachable: pro Variante DSB-Name + Datum +
  Wortzahl + SHA-256 extrahieren, Inkonsistenzen flaggen
  (date_divergent, dsb_divergent, no_date_count).
  Mail-Block "📑 Mehrere DSE-Versionen erkannt" mit
  Vergleichstabelle + rotem Hinweis "Nur eine Version kann
  gültig sein". Beispiel Elli: /de/datenschutz (Mollstr-DSB,
  2022) vs /de/datenschutzerklaerung?brand=elli (Proliance,
  ohne Datum).

API-Response erweitert um legacy_url_inventory +
html_blocks.legacy_urls + multi_version_dse_html im V2-Layout.

ENV-Override: LEGACY_URL_DISABLED=1.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-06-08 10:04:14 +02:00

302 lines
11 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""Legacy-URL-Discovery — systematische Suche nach veralteten DSE-/
Impressum-/Cookie-/AGB-URLs auf einer Domain.
Strategie aus 4 unabhängigen Quellen:
A.1 Sitemap-Parser — /sitemap.xml, /sitemap_index.xml, sitemap-de.xml,
sitemap-legal.xml
A.2 Wayback Machine — archive.org/wayback/available für jeden bekannten
Slug; URLs die vor ≥18 Monaten archiviert wurden
und heute noch 200 liefern = Legacy-Verdacht
A.3 Slug-Permutations — bekannte Slug-Familie × Locale/Brand-Parameter
A.4 Banner-Modal-Links — Playwright öffnet Cookie-Einstellungen-Modal
und sammelt alle Links (Plan A.4 wird via
consent-tester aufgerufen, hier nur Schema)
Output: Liste von Legacy-Kandidaten mit Status, last_modified, found_via,
recommended_action ("Redirect 301", "Offline nehmen", "Belassen — aktuell").
Best-Effort: jede Quelle catched eigene Exceptions — eine ausgefallene
Sitemap blockiert nicht Wayback.
"""
from __future__ import annotations
import asyncio
import logging
import re
from datetime import datetime, timezone
from urllib.parse import urljoin, urlparse
import httpx
logger = logging.getLogger(__name__)
# Kanonische DE/EN Slug-Familie pro Doc-Type. Wir suchen jede dieser
# Pfade auf jeder Origin — auch wenn die Discovery sie schon hat,
# als unabhängige Verifikation.
_SLUG_FAMILY: dict[str, tuple[str, ...]] = {
"dse": (
"datenschutz", "datenschutzerklaerung", "datenschutzerklärung",
"datenschutzhinweise", "datenschutzhinweis",
"privacy", "privacy-policy", "privacy-notice",
"datenschutz-online", "dse",
),
"impressum": (
"impressum", "imprint", "legal-notice", "site-notice",
"anbieterkennzeichnung",
),
"cookie": (
"cookie-richtlinie", "cookies", "cookie-policy",
"cookie-erklaerung", "cookieerklaerung", "cookie-hinweise",
),
"agb": (
"agb", "allgemeine-geschaeftsbedingungen",
"geschaeftsbedingungen", "terms-and-conditions",
"general-terms-of-business",
),
"nutzungsbedingungen": (
"nutzungsbedingungen", "terms-of-use", "terms-of-service",
"nutzungsordnung",
),
"widerruf": (
"widerruf", "widerrufsbelehrung",
"widerrufsbelehrung-privatkunden", "cancellation",
),
}
_LANG_PREFIXES = ("", "/de", "/de_de", "/de-de", "/germany", "/en")
_BRAND_PARAMS = ("", "?brand=", "?lang=de", "?locale=de_DE")
_LEGACY_AGE_MONTHS_THRESHOLD = 18 # ältere = Legacy-Verdacht
async def _fetch_sitemap_urls(origin: str) -> list[str]:
"""A.1 — sitemap.xml + Varianten."""
candidates = (
f"{origin}/sitemap.xml",
f"{origin}/sitemap_index.xml",
f"{origin}/sitemap-de.xml",
f"{origin}/sitemap-legal.xml",
f"{origin}/sitemap-pages.xml",
)
out: set[str] = set()
try:
async with httpx.AsyncClient(timeout=10.0, follow_redirects=True) as c:
for url in candidates:
try:
r = await c.get(url)
if r.status_code != 200:
continue
# Sitemap-Index: weitere Sitemaps verlinkt
locs = re.findall(r"<loc>([^<]+)</loc>", r.text)
for loc in locs:
loc = loc.strip()
if loc.endswith(".xml"):
# nested sitemap — fetch
try:
rr = await c.get(loc)
if rr.status_code == 200:
out.update(
m.strip() for m in
re.findall(r"<loc>([^<]+)</loc>",
rr.text)
if not m.strip().endswith(".xml")
)
except Exception:
continue
else:
out.add(loc)
except Exception:
continue
except Exception as e:
logger.info("sitemap fetch failed for %s: %s", origin, e)
return list(out)
async def _wayback_check(url: str) -> dict | None:
"""A.2 — Wayback-Machine. Return latest archived snapshot info."""
try:
async with httpx.AsyncClient(timeout=10.0) as c:
r = await c.get(
"https://archive.org/wayback/available",
params={"url": url, "timestamp": "20200101"},
)
if r.status_code != 200:
return None
data = r.json() or {}
snap = (data.get("archived_snapshots") or {}).get("closest") or {}
if not snap.get("available"):
return None
ts = snap.get("timestamp", "")
return {
"snapshot_url": snap.get("url"),
"timestamp": ts,
"status": snap.get("status"),
}
except Exception:
return None
def _months_since(timestamp_yyyymmdd: str) -> int | None:
"""Wayback-Timestamp Format: YYYYMMDDHHMMSS."""
if not timestamp_yyyymmdd or len(timestamp_yyyymmdd) < 6:
return None
try:
snap = datetime.strptime(timestamp_yyyymmdd[:6], "%Y%m").replace(
tzinfo=timezone.utc,
)
now = datetime.now(timezone.utc)
delta = (now.year - snap.year) * 12 + (now.month - snap.month)
return max(0, delta)
except Exception:
return None
async def _probe_alive(url: str) -> tuple[int, str]:
"""Return (status_code, last_modified_header)."""
try:
async with httpx.AsyncClient(
timeout=6.0, follow_redirects=False,
) as c:
r = await c.head(url)
if r.status_code == 405:
r = await c.get(url)
return r.status_code, r.headers.get("last-modified", "")
except Exception:
return 0, ""
def _build_slug_candidates(origin: str) -> list[str]:
out: set[str] = set()
for doc_type, slugs in _SLUG_FAMILY.items():
for lang in _LANG_PREFIXES:
for slug in slugs:
base = f"{origin}{lang}/{slug}".replace("//", "/")
base = base.replace("https:/", "https://")
base = base.replace("http:/", "http://")
out.add(base)
for bp in _BRAND_PARAMS:
if bp:
out.add(base + bp)
return list(out)
def _filter_legal_urls(urls: list[str]) -> list[str]:
"""Compliance-relevante Pfade aus Sitemap-Output."""
keywords = []
for slugs in _SLUG_FAMILY.values():
keywords.extend(slugs)
keywords_lc = [k.lower() for k in keywords]
out: list[str] = []
for u in urls:
ul = u.lower()
if any(k in ul for k in keywords_lc):
out.append(u)
return out
def _recommend(status: int, age_months: int | None,
in_sitemap: bool, in_footer: bool) -> str:
if status == 404 or status == 410:
return "URL veraltet (404/410) — Backlinks prüfen, ggf. 301 setzen"
if status == 0:
return "Nicht erreichbar — manuell prüfen"
if status in (301, 302, 303, 307, 308):
return "Bereits redirected — behalten"
if status == 200:
if age_months is None:
return "Erreichbar, kein Wayback-Stand — Inhalt manuell prüfen"
if age_months >= _LEGACY_AGE_MONTHS_THRESHOLD and not in_footer:
return (
f"Legacy-Verdacht ({age_months} Monate altes Wayback, "
"nicht im Footer verlinkt) — 301-Redirect auf aktuelle "
"Version setzen ODER offline nehmen"
)
if age_months >= 36 and in_footer:
return (
f"Reachable + im Footer, aber Wayback {age_months} Monate "
"alt — manuell prüfen ob Inhalt noch aktuell"
)
return "Aktuell, kein Handlungsbedarf"
return f"HTTP {status} — manuell prüfen"
async def discover_legacy_urls(state: dict) -> dict:
"""Run all 4 sources + consolidate. Returns dict for HTML rendering."""
doc_entries = state.get("doc_entries") or []
origins: set[str] = set()
footer_urls: set[str] = set()
for e in doc_entries:
url = (e.get("url") or "").strip()
if url and "://" in url:
p = urlparse(url)
origins.add(f"{p.scheme}://{p.netloc}")
footer_urls.add(url.split("#")[0].split("?")[0])
if not origins:
return {"candidates": [], "skipped": "no_origin"}
candidates: set[str] = set()
# A.1 Sitemap
for o in list(origins)[:2]:
sitemap_urls = await _fetch_sitemap_urls(o)
candidates.update(_filter_legal_urls(sitemap_urls))
# A.3 Slug-Permutations
candidates.update(_build_slug_candidates(o))
# Cap to avoid explosion
cands = list(candidates)[:60]
# Probe alive + Wayback in parallel
async def _check(url: str) -> dict:
status, lm = await _probe_alive(url)
wb = await _wayback_check(url) if status == 200 else None
age = _months_since(wb.get("timestamp", "") if wb else "")
in_footer = url.split("#")[0].split("?")[0] in footer_urls
return {
"url": url,
"status": status,
"last_modified": lm,
"wayback_snapshot": wb.get("snapshot_url") if wb else "",
"wayback_timestamp": wb.get("timestamp", "") if wb else "",
"age_months": age,
"in_footer": in_footer,
"recommendation": _recommend(status, age, False, in_footer),
}
results = await asyncio.gather(
*[_check(u) for u in cands], return_exceptions=True,
)
results = [r for r in results if isinstance(r, dict)]
# Filter: only show interesting ones (≥200 reachable + legacy-relevant)
interesting: list[dict] = []
for r in results:
if r["status"] == 0:
continue # Nicht erreichbar, nicht interessant
# 404/410/redirects nur wenn im footer → broken link
if r["status"] in (404, 410) and not r["in_footer"]:
continue
# 200 + im Footer + recent Wayback → "alles OK" filter
if (r["status"] == 200 and r["in_footer"]
and r["age_months"] is not None
and r["age_months"] < _LEGACY_AGE_MONTHS_THRESHOLD):
continue
interesting.append(r)
# Sort: Legacy-Verdächtige zuerst (200 + alt + nicht im Footer)
interesting.sort(
key=lambda r: (
0 if "Legacy-Verdacht" in r["recommendation"] else
1 if "veraltet" in r["recommendation"] else 2,
-(r.get("age_months") or 0),
),
)
return {
"candidates": interesting,
"probed": len(results),
"filtered_kept": len(interesting),
"origins": list(origins),
}