"""Legacy-URL-Discovery — systematische Suche nach veralteten DSE-/
Impressum-/Cookie-/AGB-URLs auf einer Domain.
Strategie aus 4 unabhängigen Quellen:
A.1 Sitemap-Parser — /sitemap.xml, /sitemap_index.xml, sitemap-de.xml,
sitemap-legal.xml
A.2 Wayback Machine — archive.org/wayback/available für jeden bekannten
Slug; URLs die vor ≥18 Monaten archiviert wurden
und heute noch 200 liefern = Legacy-Verdacht
A.3 Slug-Permutations — bekannte Slug-Familie × Locale/Brand-Parameter
A.4 Banner-Modal-Links — Playwright öffnet Cookie-Einstellungen-Modal
und sammelt alle Links (Plan A.4 wird via
consent-tester aufgerufen, hier nur Schema)
Output: Liste von Legacy-Kandidaten mit Status, last_modified, found_via,
recommended_action ("Redirect 301", "Offline nehmen", "Belassen — aktuell").
Best-Effort: jede Quelle catched eigene Exceptions — eine ausgefallene
Sitemap blockiert nicht Wayback.
"""
from __future__ import annotations
import asyncio
import logging
import re
from datetime import datetime, timezone
from urllib.parse import urljoin, urlparse
import httpx
from compliance.services.legacy_url_cdx import cdx_enumerate
logger = logging.getLogger(__name__)
# Kanonische DE/EN Slug-Familie pro Doc-Type. Wir suchen jede dieser
# Pfade auf jeder Origin — auch wenn die Discovery sie schon hat,
# als unabhängige Verifikation.
_SLUG_FAMILY: dict[str, tuple[str, ...]] = {
"dse": (
"datenschutz", "datenschutzerklaerung", "datenschutzerklärung",
"datenschutzhinweise", "datenschutzhinweis",
"privacy", "privacy-policy", "privacy-notice",
"datenschutz-online", "dse",
),
"impressum": (
"impressum", "imprint", "legal-notice", "site-notice",
"anbieterkennzeichnung",
),
"cookie": (
"cookie-richtlinie", "cookies", "cookie-policy",
"cookie-erklaerung", "cookieerklaerung", "cookie-hinweise",
),
"agb": (
"agb", "allgemeine-geschaeftsbedingungen",
"geschaeftsbedingungen", "terms-and-conditions",
"general-terms-of-business",
),
"nutzungsbedingungen": (
"nutzungsbedingungen", "terms-of-use", "terms-of-service",
"nutzungsordnung",
),
"widerruf": (
"widerruf", "widerrufsbelehrung",
"widerrufsbelehrung-privatkunden", "cancellation",
),
}
_LANG_PREFIXES = ("", "/de", "/de_de", "/de-de", "/germany", "/en")
_BRAND_PARAMS = ("", "?brand=", "?lang=de", "?locale=de_DE")
_LEGACY_AGE_MONTHS_THRESHOLD = 18 # ältere = Legacy-Verdacht
async def _fetch_sitemap_urls(origin: str) -> list[str]:
"""A.1 — sitemap.xml + Varianten."""
candidates = (
f"{origin}/sitemap.xml",
f"{origin}/sitemap_index.xml",
f"{origin}/sitemap-de.xml",
f"{origin}/sitemap-legal.xml",
f"{origin}/sitemap-pages.xml",
)
out: set[str] = set()
try:
async with httpx.AsyncClient(timeout=10.0, follow_redirects=True) as c:
for url in candidates:
try:
r = await c.get(url)
if r.status_code != 200:
continue
# Sitemap-Index: weitere Sitemaps verlinkt
locs = re.findall(r"([^<]+)", r.text)
for loc in locs:
loc = loc.strip()
if loc.endswith(".xml"):
# nested sitemap — fetch
try:
rr = await c.get(loc)
if rr.status_code == 200:
out.update(
m.strip() for m in
re.findall(r"([^<]+)",
rr.text)
if not m.strip().endswith(".xml")
)
except Exception:
continue
else:
out.add(loc)
except Exception:
continue
except Exception as e:
logger.info("sitemap fetch failed for %s: %s", origin, e)
return list(out)
async def _wayback_check(url: str) -> dict | None:
"""A.2 — Wayback-Machine. Return latest archived snapshot info."""
try:
async with httpx.AsyncClient(timeout=10.0) as c:
r = await c.get(
"https://archive.org/wayback/available",
params={"url": url, "timestamp": "20200101"},
)
if r.status_code != 200:
return None
data = r.json() or {}
snap = (data.get("archived_snapshots") or {}).get("closest") or {}
if not snap.get("available"):
return None
ts = snap.get("timestamp", "")
return {
"snapshot_url": snap.get("url"),
"timestamp": ts,
"status": snap.get("status"),
}
except Exception:
return None
def _months_since(timestamp_yyyymmdd: str) -> int | None:
"""Wayback-Timestamp Format: YYYYMMDDHHMMSS."""
if not timestamp_yyyymmdd or len(timestamp_yyyymmdd) < 6:
return None
try:
snap = datetime.strptime(timestamp_yyyymmdd[:6], "%Y%m").replace(
tzinfo=timezone.utc,
)
now = datetime.now(timezone.utc)
delta = (now.year - snap.year) * 12 + (now.month - snap.month)
return max(0, delta)
except Exception:
return None
async def _probe_alive(url: str) -> tuple[int, str]:
"""Return (status_code, last_modified_header)."""
try:
async with httpx.AsyncClient(
timeout=6.0, follow_redirects=False,
) as c:
r = await c.head(url)
if r.status_code == 405:
r = await c.get(url)
return r.status_code, r.headers.get("last-modified", "")
except Exception:
return 0, ""
def _build_slug_candidates(origin: str) -> list[str]:
out: set[str] = set()
for doc_type, slugs in _SLUG_FAMILY.items():
for lang in _LANG_PREFIXES:
for slug in slugs:
base = f"{origin}{lang}/{slug}".replace("//", "/")
base = base.replace("https:/", "https://")
base = base.replace("http:/", "http://")
out.add(base)
for bp in _BRAND_PARAMS:
if bp:
out.add(base + bp)
return list(out)
def _filter_legal_urls(urls: list[str]) -> list[str]:
"""Compliance-relevante Pfade aus Sitemap-Output."""
keywords = []
for slugs in _SLUG_FAMILY.values():
keywords.extend(slugs)
keywords_lc = [k.lower() for k in keywords]
out: list[str] = []
for u in urls:
ul = u.lower()
if any(k in ul for k in keywords_lc):
out.append(u)
return out
def _recommend(status: int, age_months: int | None,
in_sitemap: bool, in_footer: bool) -> str:
if status == 404 or status == 410:
return "URL veraltet (404/410) — Backlinks prüfen, ggf. 301 setzen"
if status == 0:
return "Nicht erreichbar — manuell prüfen"
if status in (301, 302, 303, 307, 308):
return "Bereits redirected — behalten"
if status == 200:
if age_months is None:
return "Erreichbar, kein Wayback-Stand — Inhalt manuell prüfen"
if age_months >= _LEGACY_AGE_MONTHS_THRESHOLD and not in_footer:
return (
f"Legacy-Verdacht ({age_months} Monate altes Wayback, "
"nicht im Footer verlinkt) — 301-Redirect auf aktuelle "
"Version setzen ODER offline nehmen"
)
if age_months >= 36 and in_footer:
return (
f"Reachable + im Footer, aber Wayback {age_months} Monate "
"alt — manuell prüfen ob Inhalt noch aktuell"
)
return "Aktuell, kein Handlungsbedarf"
return f"HTTP {status} — manuell prüfen"
async def discover_legacy_urls(state: dict) -> dict:
"""Run all 4 sources + consolidate. Returns dict for HTML rendering."""
doc_entries = state.get("doc_entries") or []
origins: set[str] = set()
footer_urls: set[str] = set()
for e in doc_entries:
url = (e.get("url") or "").strip()
if url and "://" in url:
p = urlparse(url)
origins.add(f"{p.scheme}://{p.netloc}")
footer_urls.add(url.split("#")[0].split("?")[0])
if not origins:
return {"candidates": [], "skipped": "no_origin"}
candidates: set[str] = set()
# A.1 Sitemap + A.3 Slug-Permutations
for o in list(origins)[:2]:
sitemap_urls = await _fetch_sitemap_urls(o)
candidates.update(_filter_legal_urls(sitemap_urls))
candidates.update(_build_slug_candidates(o))
# A.5 Wayback-CDX: alle je archivierten URLs der Domain → faengt
# Orphans, die nie im Slug-Raster standen. (url, cdx_timestamp); der
# timestamp dient als Legacy-Alter (kein zweiter Wayback-Call noetig).
cdx_pairs: list[tuple[str, str]] = []
for o in list(origins)[:2]:
cdx_pairs.extend(await cdx_enumerate(o))
cdx_legal_urls = set(_filter_legal_urls([u for u, _ in cdx_pairs]))
cdx_legal = [
(u, ts) for (u, ts) in cdx_pairs
if u in cdx_legal_urls and u not in candidates
][:100]
# Cap to avoid explosion
cands = list(candidates)[:60]
# Probe alive + Wayback in parallel
async def _check(url: str) -> dict:
status, lm = await _probe_alive(url)
wb = await _wayback_check(url) if status == 200 else None
age = _months_since(wb.get("timestamp", "") if wb else "")
in_footer = url.split("#")[0].split("?")[0] in footer_urls
return {
"url": url,
"status": status,
"last_modified": lm,
"wayback_snapshot": wb.get("snapshot_url") if wb else "",
"wayback_timestamp": wb.get("timestamp", "") if wb else "",
"age_months": age,
"in_footer": in_footer,
"recommendation": _recommend(status, age, False, in_footer),
"via": "sitemap/slug",
}
# CDX-Kandidaten: nur Liveness pruefen (Archiv-Stand kennen wir schon).
async def _check_cdx(url: str, ts: str) -> dict:
status, lm = await _probe_alive(url)
age = _months_since(ts)
in_footer = url.split("#")[0].split("?")[0] in footer_urls
return {
"url": url,
"status": status,
"last_modified": lm,
"wayback_snapshot": "",
"wayback_timestamp": ts,
"age_months": age,
"in_footer": in_footer,
"recommendation": _recommend(status, age, False, in_footer),
"via": "wayback-cdx",
}
gathered = await asyncio.gather(
*[_check(u) for u in cands],
*[_check_cdx(u, ts) for u, ts in cdx_legal],
return_exceptions=True,
)
results = [r for r in gathered if isinstance(r, dict)]
# Filter: only show interesting ones (≥200 reachable + legacy-relevant)
interesting: list[dict] = []
for r in results:
if r["status"] == 0:
continue # Nicht erreichbar, nicht interessant
# 404/410/redirects nur wenn im footer → broken link
if r["status"] in (404, 410) and not r["in_footer"]:
continue
# 200 + im Footer + recent Wayback → "alles OK" filter
if (r["status"] == 200 and r["in_footer"]
and r["age_months"] is not None
and r["age_months"] < _LEGACY_AGE_MONTHS_THRESHOLD):
continue
interesting.append(r)
# Sort: Legacy-Verdächtige zuerst (200 + alt + nicht im Footer)
interesting.sort(
key=lambda r: (
0 if "Legacy-Verdacht" in r["recommendation"] else
1 if "veraltet" in r["recommendation"] else 2,
-(r.get("age_months") or 0),
),
)
return {
"candidates": interesting,
"probed": len(results),
"filtered_kept": len(interesting),
"cdx_candidates": len(cdx_legal),
"origins": list(origins),
}