diff --git a/backend-compliance/compliance/api/agent_check/_fetch.py b/backend-compliance/compliance/api/agent_check/_fetch.py index 52c96e7e..e08c2bc8 100644 --- a/backend-compliance/compliance/api/agent_check/_fetch.py +++ b/backend-compliance/compliance/api/agent_check/_fetch.py @@ -54,6 +54,17 @@ async def _fetch_text(url: str, doc_type: str = "") -> tuple[str, list[dict]]: docs = payload.get("documents", []) cmp_payloads = payload.get("cmp_payloads") or [] cmp_cookie_text = payload.get("cmp_cookie_text") or "" + coverage = payload.get("coverage") or {} + if coverage: + logger.info( + "Crawl-Coverage %s: %d Interaktions-Runden, " + "%d Elemente expandiert, %d Shadow-Links, " + "%d versteckte Links", + url, coverage.get("interaction_rounds", 0), + coverage.get("elements_expanded", 0), + coverage.get("shadow_links_found", 0), + coverage.get("hidden_links_found", 0), + ) # D — wenn der consent-tester HTML-Tabellen aus dem DOM # extrahiert hat, in die cmp_payloads als "generic_table" # einschleusen damit das Backend sie via cookies_table_parser diff --git a/backend-compliance/compliance/services/legacy_url_cdx.py b/backend-compliance/compliance/services/legacy_url_cdx.py new file mode 100644 index 00000000..a54849da --- /dev/null +++ b/backend-compliance/compliance/services/legacy_url_cdx.py @@ -0,0 +1,89 @@ +"""Wayback-CDX-Enumeration — listet ALLE je archivierten URLs einer Domain. + +Anders als die per-Slug-Wayback-Pruefung (legacy_url_discovery._wayback_check) +holen wir hier die KOMPLETTE History-Liste der Domain ueber die CDX-API. So +finden wir Orphan-/Legacy-Seiten, die nie im Slug-Raster standen und heute +nicht mehr verlinkt sind, aber per Direkt-URL noch erreichbar — genau der Fall +"www.xyz.com/datenschutz existierte mal, wurde nie entfernt". + +Best-effort: jede Exception → leere Liste, blockiert die uebrige Discovery nie. +""" + +from __future__ import annotations + +import logging +from urllib.parse import urlparse + +import httpx + +logger = logging.getLogger(__name__) + +_CDX_API = "http://web.archive.org/cdx/search/cdx" + +# Nicht-HTML-Assets, die uns fuer Rechts-Content nicht interessieren. +_ASSET_SUFFIXES = ( + ".js", ".css", ".png", ".jpg", ".jpeg", ".gif", ".svg", ".ico", + ".woff", ".woff2", ".ttf", ".eot", ".webp", ".mp4", ".webm", + ".zip", ".map", ".json", ".xml", ".rss", ".txt", ".csv", +) + + +def _parse_cdx_rows(rows: list) -> list[tuple[str, str]]: + """Parst CDX-JSON zu (url, timestamp)-Paaren. + + CDX-JSON ist ein Array von Arrays; Zeile 0 ist der Header + ["original","timestamp","statuscode"]. Assets werden gedroppt, + Duplikate (per URL ohne Fragment) entfernt. + """ + if not isinstance(rows, list) or len(rows) < 2: + return [] + seen: set[str] = set() + out: list[tuple[str, str]] = [] + for row in rows[1:]: # Zeile 0 = Header + if not isinstance(row, (list, tuple)) or not row: + continue + url = str(row[0]).strip() + if not url: + continue + path = url.lower().split("?", 1)[0].split("#", 1)[0] + if path.endswith(_ASSET_SUFFIXES): + continue + key = url.split("#", 1)[0] + if key in seen: + continue + seen.add(key) + ts = str(row[1]).strip() if len(row) > 1 else "" + out.append((url, ts)) + return out + + +async def cdx_enumerate(origin: str, limit: int = 2000) -> list[tuple[str, str]]: + """Liefert (url, wayback_timestamp) fuer alle je archivierten HTML-URLs. + + `collapse=urlkey` → eine Zeile pro URL; `filter=statuscode:200` → nur + erfolgreich archivierte. Der timestamp wird spaeter als Legacy-Alter + wiederverwendet (spart einen zweiten Wayback-Call pro URL). + """ + netloc = urlparse(origin).netloc or origin.replace("https://", "").replace( + "http://", "", + ) + if not netloc: + return [] + params = { + "url": f"{netloc}*", + "output": "json", + "collapse": "urlkey", + "fl": "original,timestamp,statuscode", + "filter": "statuscode:200", + "limit": str(limit), + } + try: + async with httpx.AsyncClient(timeout=15.0) as c: + r = await c.get(_CDX_API, params=params) + if r.status_code != 200: + return [] + rows = r.json() or [] + except Exception as e: + logger.info("CDX enumerate failed for %s: %s", netloc, e) + return [] + return _parse_cdx_rows(rows) diff --git a/backend-compliance/compliance/services/legacy_url_discovery.py b/backend-compliance/compliance/services/legacy_url_discovery.py index 36ab47e1..ef181dbc 100644 --- a/backend-compliance/compliance/services/legacy_url_discovery.py +++ b/backend-compliance/compliance/services/legacy_url_discovery.py @@ -29,6 +29,8 @@ from urllib.parse import urljoin, urlparse import httpx +from compliance.services.legacy_url_cdx import cdx_enumerate + logger = logging.getLogger(__name__) @@ -239,13 +241,24 @@ async def discover_legacy_urls(state: dict) -> dict: return {"candidates": [], "skipped": "no_origin"} candidates: set[str] = set() - # A.1 Sitemap + # A.1 Sitemap + A.3 Slug-Permutations for o in list(origins)[:2]: sitemap_urls = await _fetch_sitemap_urls(o) candidates.update(_filter_legal_urls(sitemap_urls)) - # A.3 Slug-Permutations candidates.update(_build_slug_candidates(o)) + # A.5 Wayback-CDX: alle je archivierten URLs der Domain → faengt + # Orphans, die nie im Slug-Raster standen. (url, cdx_timestamp); der + # timestamp dient als Legacy-Alter (kein zweiter Wayback-Call noetig). + cdx_pairs: list[tuple[str, str]] = [] + for o in list(origins)[:2]: + cdx_pairs.extend(await cdx_enumerate(o)) + cdx_legal_urls = set(_filter_legal_urls([u for u, _ in cdx_pairs])) + cdx_legal = [ + (u, ts) for (u, ts) in cdx_pairs + if u in cdx_legal_urls and u not in candidates + ][:100] + # Cap to avoid explosion cands = list(candidates)[:60] @@ -264,12 +277,32 @@ async def discover_legacy_urls(state: dict) -> dict: "age_months": age, "in_footer": in_footer, "recommendation": _recommend(status, age, False, in_footer), + "via": "sitemap/slug", } - results = await asyncio.gather( - *[_check(u) for u in cands], return_exceptions=True, + # CDX-Kandidaten: nur Liveness pruefen (Archiv-Stand kennen wir schon). + async def _check_cdx(url: str, ts: str) -> dict: + status, lm = await _probe_alive(url) + age = _months_since(ts) + in_footer = url.split("#")[0].split("?")[0] in footer_urls + return { + "url": url, + "status": status, + "last_modified": lm, + "wayback_snapshot": "", + "wayback_timestamp": ts, + "age_months": age, + "in_footer": in_footer, + "recommendation": _recommend(status, age, False, in_footer), + "via": "wayback-cdx", + } + + gathered = await asyncio.gather( + *[_check(u) for u in cands], + *[_check_cdx(u, ts) for u, ts in cdx_legal], + return_exceptions=True, ) - results = [r for r in results if isinstance(r, dict)] + results = [r for r in gathered if isinstance(r, dict)] # Filter: only show interesting ones (≥200 reachable + legacy-relevant) interesting: list[dict] = [] @@ -297,5 +330,6 @@ async def discover_legacy_urls(state: dict) -> dict: "candidates": interesting, "probed": len(results), "filtered_kept": len(interesting), + "cdx_candidates": len(cdx_legal), "origins": list(origins), } diff --git a/backend-compliance/tests/test_legacy_url_cdx.py b/backend-compliance/tests/test_legacy_url_cdx.py new file mode 100644 index 00000000..26fdd712 --- /dev/null +++ b/backend-compliance/tests/test_legacy_url_cdx.py @@ -0,0 +1,110 @@ +"""Tests für die Wayback-CDX-Orphan-Enumeration (Feature C).""" + +from __future__ import annotations + +import asyncio + +from compliance.services.legacy_url_cdx import _parse_cdx_rows, cdx_enumerate + + +def _run(coro): + return asyncio.get_event_loop().run_until_complete(coro) + + +# ── Pure: _parse_cdx_rows ─────────────────────────────────────────── + + +def test_parse_cdx_rows_drops_assets_and_dedups(): + rows = [ + ["original", "timestamp", "statuscode"], # Header + ["http://x.com/datenschutz", "20190101", "200"], + ["http://x.com/datenschutz", "20200101", "200"], # Duplikat + ["http://x.com/style.css", "20200101", "200"], # Asset + ["http://x.com/app.js", "20200101", "200"], # Asset + ["http://x.com/impressum", "20180101", "200"], + ] + out = _parse_cdx_rows(rows) + urls = [u for u, _ in out] + assert urls == ["http://x.com/datenschutz", "http://x.com/impressum"] + # timestamp des ERSTEN (ältesten) Snapshots bleibt erhalten + assert out[0] == ("http://x.com/datenschutz", "20190101") + + +def test_parse_cdx_rows_empty_or_header_only(): + assert _parse_cdx_rows([]) == [] + assert _parse_cdx_rows([["original", "timestamp"]]) == [] + assert _parse_cdx_rows("garbage") == [] # type: ignore[arg-type] + + +# ── cdx_enumerate mit gemocktem httpx ─────────────────────────────── + + +class _FakeResp: + def __init__(self, status_code, json_data): + self.status_code = status_code + self._json = json_data + + def json(self): + return self._json + + +class _FakeClient: + def __init__(self, resp): + self._resp = resp + + async def __aenter__(self): + return self + + async def __aexit__(self, *a): + return False + + async def get(self, *a, **kw): + return self._resp + + +def _patch_httpx(monkeypatch, resp): + monkeypatch.setattr( + "compliance.services.legacy_url_cdx.httpx.AsyncClient", + lambda *a, **kw: _FakeClient(resp), + ) + + +def test_cdx_enumerate_returns_parsed_pairs(monkeypatch): + rows = [ + ["original", "timestamp", "statuscode"], + ["http://x.com/datenschutz", "20190101120000", "200"], + ["http://x.com/logo.png", "20200101", "200"], + ] + _patch_httpx(monkeypatch, _FakeResp(200, rows)) + out = _run(cdx_enumerate("https://x.com")) + urls = [u for u, _ in out] + assert "http://x.com/datenschutz" in urls + assert "http://x.com/logo.png" not in urls # Asset gedroppt + + +def test_cdx_enumerate_non_200_returns_empty(monkeypatch): + _patch_httpx(monkeypatch, _FakeResp(503, [])) + assert _run(cdx_enumerate("https://x.com")) == [] + + +def test_cdx_enumerate_no_netloc_returns_empty(monkeypatch): + _patch_httpx(monkeypatch, _FakeResp(200, [])) + assert _run(cdx_enumerate("")) == [] + + +# ── Orphan-Pfad: CDX-Fund → Legal-Filter behält Rechts-Seite ──────── + + +def test_cdx_orphan_survives_legal_filter(): + """Der eigentliche Orphan-Fall: CDX findet /datenschutz (nicht mehr + verlinkt), der Legal-Filter behält sie, Produktseiten fallen raus.""" + from compliance.services.legacy_url_discovery import _filter_legal_urls + rows = [ + ["original", "timestamp", "statuscode"], + ["http://x.com/datenschutz", "20190101", "200"], + ["http://x.com/products/widget", "20200101", "200"], + ] + pairs = _parse_cdx_rows(rows) + legal = _filter_legal_urls([u for u, _ in pairs]) + assert "http://x.com/datenschutz" in legal + assert "http://x.com/products/widget" not in legal diff --git a/consent-tester/main.py b/consent-tester/main.py index bf1df978..b8e5eb93 100644 --- a/consent-tester/main.py +++ b/consent-tester/main.py @@ -324,6 +324,7 @@ class DSIDiscoveryResponse(BaseModel): # Raw CMP payloads captured during navigation (ePaaS, OneTrust, etc.). # Backend uses these to build the per-vendor compliance table. cmp_payloads: list[dict] = [] + coverage: dict = {} # Coverage-Telemetrie (Feature B), s. coverage_dict() @app.post("/dsi-discovery", response_model=DSIDiscoveryResponse) @@ -376,6 +377,7 @@ async def dsi_discovery(req: DSIDiscoveryRequest): errors=result.errors, scanned_at=datetime.now(timezone.utc).isoformat(), cmp_payloads=result.cmp_payloads, + coverage=result.coverage_dict(), ) diff --git a/consent-tester/services/dsi_discovery.py b/consent-tester/services/dsi_discovery.py index a0583a27..1ac595de 100644 --- a/consent-tester/services/dsi_discovery.py +++ b/consent-tester/services/dsi_discovery.py @@ -181,6 +181,23 @@ class DSIDiscoveryResult: # the authoritative cookie-text so MC checks run on the real policy, # not the homepage navigation that DOM extraction returns. cmp_cookie_text: str = "" + # Coverage-Telemetrie (Feature B): macht messbar, wie erschoepfend die + # Interaktion war — wir behaupten kein "100%", wir MESSEN es. + interaction_rounds: int = 0 + elements_expanded: int = 0 + dom_growth_bytes: int = 0 + shadow_links_found: int = 0 + hidden_links_found: int = 0 + + def coverage_dict(self) -> dict: + """Coverage-Telemetrie als Dict (Feature B) — fuers Response-Mapping.""" + return { + "interaction_rounds": self.interaction_rounds, + "elements_expanded": self.elements_expanded, + "dom_growth_bytes": self.dom_growth_bytes, + "shadow_links_found": self.shadow_links_found, + "hidden_links_found": self.hidden_links_found, + } async def _extract_dom_tables(page) -> list[list[str]]: """D — extrahiert alle