Files
breakpilot-compliance/backend-compliance/compliance/services/legacy_url_discovery.py
T
Benjamin Admin 08c08fcba2
CI / test-python-backend (push) Successful in 30s
CI / detect-changes (push) Successful in 9s
CI / branch-name (push) Has been skipped
CI / guardrail-integrity (push) Has been skipped
CI / secret-scan (push) Has been skipped
CI / dep-audit (push) Has been skipped
CI / sbom-scan (push) Has been skipped
CI / build-sha-integrity (push) Failing after 4s
CI / validate-canonical-controls (push) Successful in 12s
CI / loc-budget (push) Successful in 15s
CI / go-lint (push) Has been skipped
CI / python-lint (push) Has been skipped
CI / nodejs-lint (push) Has been skipped
CI / nodejs-build (push) Has been skipped
CI / test-go (push) Has been skipped
CI / iace-gt-coverage (push) Has been skipped
CI / test-python-document-crawler (push) Has been skipped
CI / test-python-dsms-gateway (push) Has been skipped
feat(crawl): Vollstaendigkeit — Shadow-DOM/versteckte Links + Interaktions-Fixpunkt + Wayback-CDX-Orphans
Damit die Specialist-Agents auf vollstaendigem Website-Content arbeiten:

A — _find_dsi_links pierct jetzt Shadow-DOM (Web-Components wie Usercentrics/
    Mercedes) rekursiv; versteckte (display:none) Links werden erfasst + als
    Coverage-Metadatum geflaggt.
B — _expand_to_fixpoint klappt Akkordeons/Tabs/Hover-Menues in einer Schleife
    auf, bis das DOM stabil ist (statt 1 Pass); erweiterte Selektoren;
    Coverage-Telemetrie (Runden, expandierte Elemente, DOM-Wachstum, Shadow-/
    versteckte Links) → Response + Backend-Log.
C — legacy_url_cdx.cdx_enumerate listet via Wayback-CDX-API ALLE je
    archivierten URLs der Domain → findet Orphan-/Legacy-Seiten, die nie im
    Slug-Raster standen (z.B. nicht mehr verlinktes /datenschutz, per Direkt-
    URL noch erreichbar). Fliesst durch das bestehende Legacy-URL-Inventar.

Tests: test_legacy_url_cdx.py (6) + consent-tester/tests/test_dsi_discovery.py
(Pure-Helper + Real-Browser-Integration). Alle gruen, LOC-Gate gruen.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
2026-06-09 12:33:34 +02:00

336 lines
12 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""Legacy URL discovery — systematic search for outdated privacy-policy,
imprint, cookie and terms-and-conditions URLs on a domain.

Strategy, combining five independent sources:
  A.1 Sitemap parser     — /sitemap.xml, /sitemap_index.xml, /sitemap-de.xml,
                           /sitemap-legal.xml, /sitemap-pages.xml
  A.2 Wayback Machine    — archive.org/wayback/available for every known
                           slug; URLs that were archived >= 18 months ago
                           and still answer 200 today are legacy suspects
  A.3 Slug permutations  — known slug family x locale/brand parameters
  A.4 Banner modal links — Playwright opens the cookie-settings modal and
                           collects all links (A.4 is invoked via the
                           consent-tester; only the schema lives here)
  A.5 Wayback CDX        — every URL of the domain that was ever archived,
                           to catch orphan pages outside the slug grid

Output: list of legacy candidates, each with url, status, last_modified,
wayback_snapshot, wayback_timestamp, age_months, in_footer, via and a
human-readable recommendation (e.g. set a 301 redirect, take offline,
keep as is).

Best effort: every source catches its own exceptions — a failing sitemap
does not block the Wayback lookup.
"""
from __future__ import annotations

import asyncio
import html
import logging
import re
from datetime import datetime, timezone
from urllib.parse import urljoin, urlparse

import httpx

from compliance.services.legacy_url_cdx import cdx_enumerate

logger = logging.getLogger(__name__)

# Canonical German/English slug family per document type. Each of these
# paths is looked up on every origin — even if discovery already found it —
# as an independent verification. The same slugs double as keywords when
# filtering sitemap / CDX URLs for legal pages.
_SLUG_FAMILY: dict[str, tuple[str, ...]] = {
    # Privacy policy ("Datenschutzerklaerung", DSE).
    "dse": (
        "datenschutz", "datenschutzerklaerung", "datenschutzerklärung",
        "datenschutzhinweise", "datenschutzhinweis",
        "privacy", "privacy-policy", "privacy-notice",
        "datenschutz-online", "dse",
    ),
    # Imprint / legal notice.
    "impressum": (
        "impressum", "imprint", "legal-notice", "site-notice",
        "anbieterkennzeichnung",
    ),
    # Cookie policy.
    "cookie": (
        "cookie-richtlinie", "cookies", "cookie-policy",
        "cookie-erklaerung", "cookieerklaerung", "cookie-hinweise",
    ),
    # General terms and conditions ("AGB").
    "agb": (
        "agb", "allgemeine-geschaeftsbedingungen",
        "geschaeftsbedingungen", "terms-and-conditions",
        "general-terms-of-business",
    ),
    # Terms of use.
    "nutzungsbedingungen": (
        "nutzungsbedingungen", "terms-of-use", "terms-of-service",
        "nutzungsordnung",
    ),
    # Right of withdrawal / cancellation policy.
    "widerruf": (
        "widerruf", "widerrufsbelehrung",
        "widerrufsbelehrung-privatkunden", "cancellation",
    ),
}

# Path prefixes placed between origin and slug; "" means root level.
_LANG_PREFIXES = ("", "/de", "/de_de", "/de-de", "/germany", "/en")

# Query-string suffixes appended to each slug URL; "" means no suffix.
_BRAND_PARAMS = ("", "?brand=", "?lang=de", "?locale=de_DE")

# Archive age in months from which a still-reachable URL is a legacy suspect.
_LEGACY_AGE_MONTHS_THRESHOLD = 18  # older = legacy suspect
# Captures the raw payload of every <loc> element (case-insensitive,
# may span lines). Namespaced variants such as <image:loc> do not match.
_SITEMAP_LOC_RE = re.compile(r"<loc>(.*?)</loc>", re.IGNORECASE | re.DOTALL)

# Upper bound on child sitemaps fetched per origin. A sitemap index may
# reference a very large number of children; discovery is best effort and
# must stay bounded.
_MAX_NESTED_SITEMAPS = 25


def _extract_sitemap_locs(xml_text: str) -> list[str]:
    """Return the cleaned contents of all ``<loc>`` elements in document order.

    Plain values are XML-entity-unescaped (the sitemap protocol requires
    ``&`` to be written as ``&amp;``); CDATA-wrapped values are unwrapped
    verbatim. Empty values and values containing markup are dropped.
    """
    locs: list[str] = []
    for raw in _SITEMAP_LOC_RE.findall(xml_text):
        text = raw.strip()
        if text.startswith("<![CDATA[") and text.endswith("]]>"):
            # CDATA content is literal — no entity decoding.
            text = text[len("<![CDATA["):-len("]]>")].strip()
        elif "<" in text:
            # Nested markup inside <loc> is not a URL.
            continue
        else:
            text = html.unescape(text)
        if text:
            locs.append(text)
    return locs


def _is_sitemap_ref(loc: str) -> bool:
    """Tell whether ``loc`` points to another sitemap file rather than a page."""
    return loc.split("?", 1)[0].lower().endswith(".xml")


async def _fetch_sitemap_urls(origin: str) -> list[str]:
    """A.1 — collect page URLs from sitemap.xml and common variants.

    Tries a fixed list of sitemap locations below ``origin``. Entries that
    point to further ``.xml`` files are treated as child sitemaps and
    fetched one level deep; each child is fetched at most once and at most
    ``_MAX_NESTED_SITEMAPS`` children are fetched in total.

    Args:
        origin: Scheme and host without trailing slash,
            e.g. ``"https://example.com"``.

    Returns:
        Sorted list of unique page URLs. Empty when nothing could be
        fetched; this function never raises (best effort).
    """
    candidates = (
        f"{origin}/sitemap.xml",
        f"{origin}/sitemap_index.xml",
        f"{origin}/sitemap-de.xml",
        f"{origin}/sitemap-legal.xml",
        f"{origin}/sitemap-pages.xml",
    )
    pages: set[str] = set()
    fetched_children: set[str] = set()
    try:
        async with httpx.AsyncClient(timeout=10.0, follow_redirects=True) as c:
            for url in candidates:
                try:
                    r = await c.get(url)
                    if r.status_code != 200:
                        continue
                    for loc in _extract_sitemap_locs(r.text):
                        if not _is_sitemap_ref(loc):
                            pages.add(loc)
                            continue
                        # /sitemap.xml and /sitemap_index.xml frequently list
                        # the same children — fetch each one only once.
                        if (loc in fetched_children
                                or len(fetched_children) >= _MAX_NESTED_SITEMAPS):
                            continue
                        fetched_children.add(loc)
                        try:
                            rr = await c.get(loc)
                        except Exception as e:
                            logger.debug(
                                "nested sitemap %s skipped: %s", loc, e,
                            )
                            continue
                        if rr.status_code == 200:
                            pages.update(
                                m for m in _extract_sitemap_locs(rr.text)
                                if not _is_sitemap_ref(m)
                            )
                except Exception as e:
                    # One broken candidate must not block the others.
                    logger.debug("sitemap candidate %s skipped: %s", url, e)
                    continue
    except Exception as e:
        logger.info("sitemap fetch failed for %s: %s", origin, e)
    return sorted(pages)
async def _wayback_check(url: str) -> dict | None:
    """A.2 — query the Wayback Machine availability API for ``url``.

    The request pins ``timestamp=20200101``, i.e. it asks for the snapshot
    closest to 2020-01-01 rather than for the newest one.

    Returns:
        ``{"snapshot_url", "timestamp", "status"}`` taken from the
        ``closest`` snapshot, or ``None`` when the API does not answer 200,
        reports no available snapshot, or any error occurs (best effort).
    """
    query = {"url": url, "timestamp": "20200101"}
    try:
        async with httpx.AsyncClient(timeout=10.0) as client:
            response = await client.get(
                "https://archive.org/wayback/available",
                params=query,
            )
            if response.status_code != 200:
                return None
            payload = response.json() or {}
            snapshots = payload.get("archived_snapshots") or {}
            closest = snapshots.get("closest") or {}
            if closest.get("available"):
                return {
                    "snapshot_url": closest.get("url"),
                    "timestamp": closest.get("timestamp", ""),
                    "status": closest.get("status"),
                }
            return None
    except Exception:
        return None
def _months_since(timestamp_yyyymmdd: str) -> int | None:
    """Return the number of whole calendar months between a timestamp and now.

    Only the leading ``YYYYMM`` of the value is evaluated (Wayback
    timestamps have the form ``YYYYMMDDHHMMSS``); day and time are ignored.

    Returns:
        Month difference to the current UTC month, never below 0, or
        ``None`` when the value is empty, shorter than six characters or
        not parseable.
    """
    if not timestamp_yyyymmdd:
        return None
    if len(timestamp_yyyymmdd) < 6:
        return None
    try:
        year_month = timestamp_yyyymmdd[:6]
        archived = datetime.strptime(year_month, "%Y%m")
        archived = archived.replace(tzinfo=timezone.utc)
        today = datetime.now(timezone.utc)
        # Compare both dates as absolute month indices.
        months = (today.year * 12 + today.month) - (
            archived.year * 12 + archived.month
        )
        return months if months > 0 else 0
    except Exception:
        return None
async def _probe_alive(url: str) -> tuple[int, str]:
    """Probe ``url`` without following redirects.

    Sends a HEAD request first. If the server rejects the method
    (405 Method Not Allowed or 501 Not Implemented), the probe is repeated
    as a streamed GET so that only status line and headers are read and
    the response body is never downloaded.

    Returns:
        ``(status_code, last_modified_header)``; the header is ``""`` when
        absent. ``(0, "")`` when the request fails for any reason
        (best effort, never raises).
    """
    try:
        async with httpx.AsyncClient(
            timeout=6.0, follow_redirects=False,
        ) as c:
            r = await c.head(url)
            if r.status_code in (405, 501):
                # HEAD not supported — fall back to GET, headers only.
                async with c.stream("GET", url) as streamed:
                    return (
                        streamed.status_code,
                        streamed.headers.get("last-modified", ""),
                    )
            return r.status_code, r.headers.get("last-modified", "")
    except Exception:
        return 0, ""
def _build_slug_candidates(origin: str) -> list[str]:
    """A.3 — build all slug URLs to probe below ``origin``.

    Combines every language prefix with every slug of every document type
    and additionally appends each non-empty brand/locale query string.

    The result is duplicate-free and in a stable priority order: plain
    URLs first (language prefixes in their declared order, root level
    first), followed by the query-string variants in the same order. This
    keeps the outcome reproducible when a caller truncates the list.

    Args:
        origin: Scheme and host, e.g. ``"https://example.com"``; a trailing
            slash is tolerated.
    """
    root = origin.rstrip("/")
    bases = [
        f"{root}{lang}/{slug}"
        for lang in _LANG_PREFIXES
        for slugs in _SLUG_FAMILY.values()
        for slug in slugs
    ]
    variants = [
        base + param
        for base in bases
        for param in _BRAND_PARAMS
        if param
    ]
    # dict.fromkeys de-duplicates while preserving first-seen order.
    return list(dict.fromkeys(bases + variants))
# Slugs up to this length (e.g. "dse", "agb") occur inside many unrelated
# words, so they only count when they form a complete path token.
_SHORT_KEYWORD_MAX_LEN = 3

# Splits a lower-cased URL path into alphanumeric tokens.
_PATH_TOKEN_SPLIT_RE = re.compile(r"[^0-9a-z]+")


def _filter_legal_urls(urls: list[str]) -> list[str]:
    """Keep the URLs whose *path* looks compliance-relevant.

    A URL is kept when its path (host, query string and fragment are
    ignored) contains one of the slugs from ``_SLUG_FAMILY``,
    case-insensitively. Slugs of at most ``_SHORT_KEYWORD_MAX_LEN``
    characters must match a whole path token; longer slugs match as
    substrings, so ``/datenschutz-kunden.html`` still qualifies.

    Args:
        urls: Candidate URLs, e.g. from a sitemap or the CDX index.

    Returns:
        The matching URLs in input order. URLs that cannot be parsed are
        skipped.
    """
    keywords = {
        slug.lower() for slugs in _SLUG_FAMILY.values() for slug in slugs
    }
    short_keywords = {
        k for k in keywords if len(k) <= _SHORT_KEYWORD_MAX_LEN
    }
    long_keywords = keywords - short_keywords

    out: list[str] = []
    for u in urls:
        try:
            path = urlparse(u).path.lower()
        except ValueError:
            # e.g. malformed IPv6 literal — not a usable URL.
            continue
        if any(k in path for k in long_keywords):
            out.append(u)
        elif not short_keywords.isdisjoint(_PATH_TOKEN_SPLIT_RE.split(path)):
            out.append(u)
    return out
def _recommend(status: int, age_months: int | None,
               in_sitemap: bool, in_footer: bool) -> str:
    """Translate a probe result into a human-readable recommendation.

    Args:
        status: HTTP status of the probe; ``0`` means unreachable.
        age_months: Age of the archive snapshot in months, ``None`` if
            unknown.
        in_sitemap: Accepted for interface compatibility; not evaluated.
        in_footer: Whether the URL is among the currently linked documents.

    Returns:
        The recommendation text (German, shown to the user as is).
    """
    redirected = "Bereits redirected — behalten"
    gone = "URL veraltet (404/410) — Backlinks prüfen, ggf. 301 setzen"
    fixed_answers = {
        404: gone,
        410: gone,
        0: "Nicht erreichbar — manuell prüfen",
        301: redirected,
        302: redirected,
        303: redirected,
        307: redirected,
        308: redirected,
    }
    if status in fixed_answers:
        return fixed_answers[status]
    if status != 200:
        return f"HTTP {status} — manuell prüfen"

    # From here on the URL is reachable (200); decide by archive age.
    if age_months is None:
        return "Erreichbar, kein Wayback-Stand — Inhalt manuell prüfen"
    if not in_footer and age_months >= _LEGACY_AGE_MONTHS_THRESHOLD:
        return (
            f"Legacy-Verdacht ({age_months} Monate altes Wayback, "
            "nicht im Footer verlinkt) — 301-Redirect auf aktuelle "
            "Version setzen ODER offline nehmen"
        )
    if in_footer and age_months >= 36:
        return (
            f"Reachable + im Footer, aber Wayback {age_months} Monate "
            "alt — manuell prüfen ob Inhalt noch aktuell"
        )
    return "Aktuell, kein Handlungsbedarf"
# Bounds that keep a single discovery run small.
_MAX_ORIGINS = 2
_MAX_PROBE_CANDIDATES = 60
_MAX_CDX_CANDIDATES = 100


def _footer_key(url: str) -> str:
    """Normalise ``url`` for the "is it linked in the footer" comparison.

    Fragment, query string and trailing slashes are dropped so that
    ``/datenschutz`` and ``/datenschutz/`` count as the same document.
    """
    return url.split("#")[0].split("?")[0].rstrip("/")


def _rank_candidates(sitemap_hits: set[str], slug_guesses: set[str],
                     limit: int) -> list[str]:
    """Return at most ``limit`` URLs to probe, in a reproducible order.

    URLs found in a sitemap come first because they are known to be
    published. Guessed slug URLs follow: without query string before with,
    shallow paths before deep ones, then alphabetically.
    """
    ordered = sorted(sitemap_hits)
    ordered += sorted(
        slug_guesses, key=lambda u: ("?" in u, u.count("/"), u),
    )
    return list(dict.fromkeys(ordered))[:limit]


async def discover_legacy_urls(state: dict) -> dict:
    """Run the sitemap, slug, Wayback and CDX sources and consolidate them.

    Args:
        state: Dict with an optional ``"doc_entries"`` list; each entry is
            a dict whose ``"url"`` names a currently linked legal document.
            Origins and the "footer" URL set are derived from these.

    Returns:
        ``{"candidates": [], "skipped": "no_origin"}`` when no entry
        carries an absolute URL. Otherwise a dict with ``candidates``
        (interesting results, legacy suspects first), ``probed``,
        ``filtered_kept``, ``cdx_candidates`` and ``origins``.
    """
    doc_entries = state.get("doc_entries") or []
    origins: set[str] = set()
    footer_urls: set[str] = set()
    for e in doc_entries:
        url = (e.get("url") or "").strip()
        if not url or "://" not in url:
            continue
        try:
            p = urlparse(url)
        except ValueError:
            # Malformed URL in the input — ignore instead of aborting.
            continue
        origins.add(f"{p.scheme}://{p.netloc}")
        footer_urls.add(_footer_key(url))
    if not origins:
        return {"candidates": [], "skipped": "no_origin"}

    # Sorted so that the same input always probes the same origins.
    probe_origins = sorted(origins)[:_MAX_ORIGINS]

    # A.1 sitemap + A.3 slug permutations, kept apart so that real sitemap
    # hits are not displaced by guessed slugs when the cap applies.
    sitemap_hits: set[str] = set()
    slug_guesses: set[str] = set()
    for o in probe_origins:
        sitemap_hits.update(_filter_legal_urls(await _fetch_sitemap_urls(o)))
        slug_guesses.update(_build_slug_candidates(o))

    # A.5 Wayback CDX: every URL of the domain that was ever archived —
    # catches orphans outside the slug grid. Pairs are (url, timestamp);
    # the timestamp serves as legacy age, so no second Wayback call is
    # needed for these.
    cdx_pairs: list[tuple[str, str]] = []
    for o in probe_origins:
        cdx_pairs.extend(await cdx_enumerate(o))
    already_known = sitemap_hits | slug_guesses
    cdx_legal_urls = set(_filter_legal_urls([u for u, _ in cdx_pairs]))
    cdx_legal: list[tuple[str, str]] = []
    cdx_seen: set[str] = set()
    for u, ts in cdx_pairs:
        if u not in cdx_legal_urls or u in already_known or u in cdx_seen:
            continue
        # NOTE(review): if cdx_enumerate can yield several captures per
        # URL, the first one wins here — confirm that is the intended one.
        cdx_seen.add(u)
        cdx_legal.append((u, ts))
        if len(cdx_legal) >= _MAX_CDX_CANDIDATES:
            break

    # Cap to avoid explosion.
    cands = _rank_candidates(sitemap_hits, slug_guesses, _MAX_PROBE_CANDIDATES)

    def _result(url: str, status: int, last_modified: str,
                snapshot: str | None, timestamp: str, via: str) -> dict:
        """Assemble one result row including age and recommendation."""
        age = _months_since(timestamp)
        in_footer = _footer_key(url) in footer_urls
        return {
            "url": url,
            "status": status,
            "last_modified": last_modified,
            "wayback_snapshot": snapshot,
            "wayback_timestamp": timestamp,
            "age_months": age,
            "in_footer": in_footer,
            "recommendation": _recommend(
                status, age, url in sitemap_hits, in_footer,
            ),
            "via": via,
        }

    async def _check(url: str) -> dict:
        """Probe liveness; ask Wayback only for URLs that answer 200."""
        status, lm = await _probe_alive(url)
        wb = await _wayback_check(url) if status == 200 else None
        snapshot = wb.get("snapshot_url") if wb else ""
        timestamp = wb.get("timestamp", "") if wb else ""
        return _result(url, status, lm, snapshot, timestamp, "sitemap/slug")

    async def _check_cdx(url: str, ts: str) -> dict:
        """Probe liveness only — the archive timestamp is already known."""
        status, lm = await _probe_alive(url)
        return _result(url, status, lm, "", ts, "wayback-cdx")

    gathered = await asyncio.gather(
        *[_check(u) for u in cands],
        *[_check_cdx(u, ts) for u, ts in cdx_legal],
        return_exceptions=True,
    )
    results: list[dict] = []
    for outcome in gathered:
        if isinstance(outcome, dict):
            results.append(outcome)
        else:
            # Best effort: a failed probe is dropped, but not silently.
            logger.info("legacy-url probe failed: %r", outcome)

    # Keep only the interesting rows.
    interesting: list[dict] = []
    for r in results:
        if r["status"] == 0:
            continue  # unreachable, not interesting
        # 404/410 only matter when linked in the footer (broken link).
        if r["status"] in (404, 410) and not r["in_footer"]:
            continue
        # 200 + linked in footer + recent archive state: everything is fine.
        if (r["status"] == 200 and r["in_footer"]
                and r["age_months"] is not None
                and r["age_months"] < _LEGACY_AGE_MONTHS_THRESHOLD):
            continue
        interesting.append(r)

    # Legacy suspects first, then outdated URLs, then the rest; within a
    # group the oldest archive state comes first.
    interesting.sort(
        key=lambda r: (
            0 if "Legacy-Verdacht" in r["recommendation"] else
            1 if "veraltet" in r["recommendation"] else 2,
            -(r.get("age_months") or 0),
        ),
    )
    return {
        "candidates": interesting,
        "probed": len(results),
        "filtered_kept": len(interesting),
        "cdx_candidates": len(cdx_legal),
        "origins": sorted(origins),
    }