From 08c08fcba29ffdab9f8c05b669c3350881015ea6 Mon Sep 17 00:00:00 2001 From: Benjamin Admin Date: Tue, 9 Jun 2026 12:33:34 +0200 Subject: [PATCH] =?UTF-8?q?feat(crawl):=20Vollstaendigkeit=20=E2=80=94=20S?= =?UTF-8?q?hadow-DOM/versteckte=20Links=20+=20Interaktions-Fixpunkt=20+=20?= =?UTF-8?q?Wayback-CDX-Orphans?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Damit die Specialist-Agents auf vollstaendigem Website-Content arbeiten: A — _find_dsi_links pierct jetzt Shadow-DOM (Web-Components wie Usercentrics/ Mercedes) rekursiv; versteckte (display:none) Links werden erfasst + als Coverage-Metadatum geflaggt. B — _expand_to_fixpoint klappt Akkordeons/Tabs/Hover-Menues in einer Schleife auf, bis das DOM stabil ist (statt 1 Pass); erweiterte Selektoren; Coverage-Telemetrie (Runden, expandierte Elemente, DOM-Wachstum, Shadow-/ versteckte Links) → Response + Backend-Log. C — legacy_url_cdx.cdx_enumerate listet via Wayback-CDX-API ALLE je archivierten URLs der Domain → findet Orphan-/Legacy-Seiten, die nie im Slug-Raster standen (z.B. nicht mehr verlinktes /datenschutz, per Direkt- URL noch erreichbar). Fliesst durch das bestehende Legacy-URL-Inventar. Tests: test_legacy_url_cdx.py (6) + consent-tester/tests/test_dsi_discovery.py (Pure-Helper + Real-Browser-Integration). Alle gruen, LOC-Gate gruen. 
Co-Authored-By: Claude Opus 4.8 --- .../compliance/api/agent_check/_fetch.py | 11 ++ .../compliance/services/legacy_url_cdx.py | 89 +++++++++ .../services/legacy_url_discovery.py | 44 ++++- .../tests/test_legacy_url_cdx.py | 110 +++++++++++ consent-tester/main.py | 2 + consent-tester/services/dsi_discovery.py | 183 ++++++++++++++---- consent-tester/tests/test_dsi_discovery.py | 89 +++++++++ 7 files changed, 487 insertions(+), 41 deletions(-) create mode 100644 backend-compliance/compliance/services/legacy_url_cdx.py create mode 100644 backend-compliance/tests/test_legacy_url_cdx.py create mode 100644 consent-tester/tests/test_dsi_discovery.py diff --git a/backend-compliance/compliance/api/agent_check/_fetch.py b/backend-compliance/compliance/api/agent_check/_fetch.py index 52c96e7e..e08c2bc8 100644 --- a/backend-compliance/compliance/api/agent_check/_fetch.py +++ b/backend-compliance/compliance/api/agent_check/_fetch.py @@ -54,6 +54,17 @@ async def _fetch_text(url: str, doc_type: str = "") -> tuple[str, list[dict]]: docs = payload.get("documents", []) cmp_payloads = payload.get("cmp_payloads") or [] cmp_cookie_text = payload.get("cmp_cookie_text") or "" + coverage = payload.get("coverage") or {} + if coverage: + logger.info( + "Crawl-Coverage %s: %d Interaktions-Runden, " + "%d Elemente expandiert, %d Shadow-Links, " + "%d versteckte Links", + url, coverage.get("interaction_rounds", 0), + coverage.get("elements_expanded", 0), + coverage.get("shadow_links_found", 0), + coverage.get("hidden_links_found", 0), + ) # D — wenn der consent-tester HTML-Tabellen aus dem DOM # extrahiert hat, in die cmp_payloads als "generic_table" # einschleusen damit das Backend sie via cookies_table_parser diff --git a/backend-compliance/compliance/services/legacy_url_cdx.py b/backend-compliance/compliance/services/legacy_url_cdx.py new file mode 100644 index 00000000..a54849da --- /dev/null +++ b/backend-compliance/compliance/services/legacy_url_cdx.py @@ -0,0 +1,89 @@ 
+"""Wayback-CDX-Enumeration — listet ALLE je archivierten URLs einer Domain. + +Anders als die per-Slug-Wayback-Pruefung (legacy_url_discovery._wayback_check) +holen wir hier die KOMPLETTE History-Liste der Domain ueber die CDX-API. So +finden wir Orphan-/Legacy-Seiten, die nie im Slug-Raster standen und heute +nicht mehr verlinkt sind, aber per Direkt-URL noch erreichbar — genau der Fall +"www.xyz.com/datenschutz existierte mal, wurde nie entfernt". + +Best-effort: jede Exception → leere Liste, blockiert die uebrige Discovery nie. +""" + +from __future__ import annotations + +import logging +from urllib.parse import urlparse + +import httpx + +logger = logging.getLogger(__name__) + +_CDX_API = "http://web.archive.org/cdx/search/cdx" + +# Nicht-HTML-Assets, die uns fuer Rechts-Content nicht interessieren. +_ASSET_SUFFIXES = ( + ".js", ".css", ".png", ".jpg", ".jpeg", ".gif", ".svg", ".ico", + ".woff", ".woff2", ".ttf", ".eot", ".webp", ".mp4", ".webm", + ".zip", ".map", ".json", ".xml", ".rss", ".txt", ".csv", +) + + +def _parse_cdx_rows(rows: list) -> list[tuple[str, str]]: + """Parst CDX-JSON zu (url, timestamp)-Paaren. + + CDX-JSON ist ein Array von Arrays; Zeile 0 ist der Header + ["original","timestamp","statuscode"]. Assets werden gedroppt, + Duplikate (per URL ohne Fragment) entfernt. 
+ """ + if not isinstance(rows, list) or len(rows) < 2: + return [] + seen: set[str] = set() + out: list[tuple[str, str]] = [] + for row in rows[1:]: # Zeile 0 = Header + if not isinstance(row, (list, tuple)) or not row: + continue + url = str(row[0]).strip() + if not url: + continue + path = url.lower().split("?", 1)[0].split("#", 1)[0] + if path.endswith(_ASSET_SUFFIXES): + continue + key = url.split("#", 1)[0] + if key in seen: + continue + seen.add(key) + ts = str(row[1]).strip() if len(row) > 1 else "" + out.append((url, ts)) + return out + + +async def cdx_enumerate(origin: str, limit: int = 2000) -> list[tuple[str, str]]: + """Liefert (url, wayback_timestamp) fuer alle je archivierten HTML-URLs. + + `collapse=urlkey` → eine Zeile pro URL; `filter=statuscode:200` → nur + erfolgreich archivierte. Der timestamp wird spaeter als Legacy-Alter + wiederverwendet (spart einen zweiten Wayback-Call pro URL). + """ + netloc = urlparse(origin).netloc or origin.replace("https://", "").replace( + "http://", "", + ) + if not netloc: + return [] + params = { + "url": f"{netloc}*", + "output": "json", + "collapse": "urlkey", + "fl": "original,timestamp,statuscode", + "filter": "statuscode:200", + "limit": str(limit), + } + try: + async with httpx.AsyncClient(timeout=15.0) as c: + r = await c.get(_CDX_API, params=params) + if r.status_code != 200: + return [] + rows = r.json() or [] + except Exception as e: + logger.info("CDX enumerate failed for %s: %s", netloc, e) + return [] + return _parse_cdx_rows(rows) diff --git a/backend-compliance/compliance/services/legacy_url_discovery.py b/backend-compliance/compliance/services/legacy_url_discovery.py index 36ab47e1..ef181dbc 100644 --- a/backend-compliance/compliance/services/legacy_url_discovery.py +++ b/backend-compliance/compliance/services/legacy_url_discovery.py @@ -29,6 +29,8 @@ from urllib.parse import urljoin, urlparse import httpx +from compliance.services.legacy_url_cdx import cdx_enumerate + logger = 
logging.getLogger(__name__) @@ -239,13 +241,24 @@ async def discover_legacy_urls(state: dict) -> dict: return {"candidates": [], "skipped": "no_origin"} candidates: set[str] = set() - # A.1 Sitemap + # A.1 Sitemap + A.3 Slug-Permutations for o in list(origins)[:2]: sitemap_urls = await _fetch_sitemap_urls(o) candidates.update(_filter_legal_urls(sitemap_urls)) - # A.3 Slug-Permutations candidates.update(_build_slug_candidates(o)) + # A.5 Wayback-CDX: alle je archivierten URLs der Domain → faengt + # Orphans, die nie im Slug-Raster standen. (url, cdx_timestamp); der + # timestamp dient als Legacy-Alter (kein zweiter Wayback-Call noetig). + cdx_pairs: list[tuple[str, str]] = [] + for o in list(origins)[:2]: + cdx_pairs.extend(await cdx_enumerate(o)) + cdx_legal_urls = set(_filter_legal_urls([u for u, _ in cdx_pairs])) + cdx_legal = [ + (u, ts) for (u, ts) in cdx_pairs + if u in cdx_legal_urls and u not in candidates + ][:100] + # Cap to avoid explosion cands = list(candidates)[:60] @@ -264,12 +277,32 @@ async def discover_legacy_urls(state: dict) -> dict: "age_months": age, "in_footer": in_footer, "recommendation": _recommend(status, age, False, in_footer), + "via": "sitemap/slug", } - results = await asyncio.gather( - *[_check(u) for u in cands], return_exceptions=True, + # CDX-Kandidaten: nur Liveness pruefen (Archiv-Stand kennen wir schon). 
+ async def _check_cdx(url: str, ts: str) -> dict: + status, lm = await _probe_alive(url) + age = _months_since(ts) + in_footer = url.split("#")[0].split("?")[0] in footer_urls + return { + "url": url, + "status": status, + "last_modified": lm, + "wayback_snapshot": "", + "wayback_timestamp": ts, + "age_months": age, + "in_footer": in_footer, + "recommendation": _recommend(status, age, False, in_footer), + "via": "wayback-cdx", + } + + gathered = await asyncio.gather( + *[_check(u) for u in cands], + *[_check_cdx(u, ts) for u, ts in cdx_legal], + return_exceptions=True, ) - results = [r for r in results if isinstance(r, dict)] + results = [r for r in gathered if isinstance(r, dict)] # Filter: only show interesting ones (≥200 reachable + legacy-relevant) interesting: list[dict] = [] @@ -297,5 +330,6 @@ async def discover_legacy_urls(state: dict) -> dict: "candidates": interesting, "probed": len(results), "filtered_kept": len(interesting), + "cdx_candidates": len(cdx_legal), "origins": list(origins), } diff --git a/backend-compliance/tests/test_legacy_url_cdx.py b/backend-compliance/tests/test_legacy_url_cdx.py new file mode 100644 index 00000000..26fdd712 --- /dev/null +++ b/backend-compliance/tests/test_legacy_url_cdx.py @@ -0,0 +1,110 @@ +"""Tests für die Wayback-CDX-Orphan-Enumeration (Feature C).""" + +from __future__ import annotations + +import asyncio + +from compliance.services.legacy_url_cdx import _parse_cdx_rows, cdx_enumerate + + +def _run(coro): + return asyncio.get_event_loop().run_until_complete(coro) + + +# ── Pure: _parse_cdx_rows ─────────────────────────────────────────── + + +def test_parse_cdx_rows_drops_assets_and_dedups(): + rows = [ + ["original", "timestamp", "statuscode"], # Header + ["http://x.com/datenschutz", "20190101", "200"], + ["http://x.com/datenschutz", "20200101", "200"], # Duplikat + ["http://x.com/style.css", "20200101", "200"], # Asset + ["http://x.com/app.js", "20200101", "200"], # Asset + ["http://x.com/impressum", 
"20180101", "200"], + ] + out = _parse_cdx_rows(rows) + urls = [u for u, _ in out] + assert urls == ["http://x.com/datenschutz", "http://x.com/impressum"] + # timestamp des ERSTEN (ältesten) Snapshots bleibt erhalten + assert out[0] == ("http://x.com/datenschutz", "20190101") + + +def test_parse_cdx_rows_empty_or_header_only(): + assert _parse_cdx_rows([]) == [] + assert _parse_cdx_rows([["original", "timestamp"]]) == [] + assert _parse_cdx_rows("garbage") == [] # type: ignore[arg-type] + + +# ── cdx_enumerate mit gemocktem httpx ─────────────────────────────── + + +class _FakeResp: + def __init__(self, status_code, json_data): + self.status_code = status_code + self._json = json_data + + def json(self): + return self._json + + +class _FakeClient: + def __init__(self, resp): + self._resp = resp + + async def __aenter__(self): + return self + + async def __aexit__(self, *a): + return False + + async def get(self, *a, **kw): + return self._resp + + +def _patch_httpx(monkeypatch, resp): + monkeypatch.setattr( + "compliance.services.legacy_url_cdx.httpx.AsyncClient", + lambda *a, **kw: _FakeClient(resp), + ) + + +def test_cdx_enumerate_returns_parsed_pairs(monkeypatch): + rows = [ + ["original", "timestamp", "statuscode"], + ["http://x.com/datenschutz", "20190101120000", "200"], + ["http://x.com/logo.png", "20200101", "200"], + ] + _patch_httpx(monkeypatch, _FakeResp(200, rows)) + out = _run(cdx_enumerate("https://x.com")) + urls = [u for u, _ in out] + assert "http://x.com/datenschutz" in urls + assert "http://x.com/logo.png" not in urls # Asset gedroppt + + +def test_cdx_enumerate_non_200_returns_empty(monkeypatch): + _patch_httpx(monkeypatch, _FakeResp(503, [])) + assert _run(cdx_enumerate("https://x.com")) == [] + + +def test_cdx_enumerate_no_netloc_returns_empty(monkeypatch): + _patch_httpx(monkeypatch, _FakeResp(200, [])) + assert _run(cdx_enumerate("")) == [] + + +# ── Orphan-Pfad: CDX-Fund → Legal-Filter behält Rechts-Seite ──────── + + +def 
test_cdx_orphan_survives_legal_filter(): + """Der eigentliche Orphan-Fall: CDX findet /datenschutz (nicht mehr + verlinkt), der Legal-Filter behält sie, Produktseiten fallen raus.""" + from compliance.services.legacy_url_discovery import _filter_legal_urls + rows = [ + ["original", "timestamp", "statuscode"], + ["http://x.com/datenschutz", "20190101", "200"], + ["http://x.com/products/widget", "20200101", "200"], + ] + pairs = _parse_cdx_rows(rows) + legal = _filter_legal_urls([u for u, _ in pairs]) + assert "http://x.com/datenschutz" in legal + assert "http://x.com/products/widget" not in legal diff --git a/consent-tester/main.py b/consent-tester/main.py index bf1df978..b8e5eb93 100644 --- a/consent-tester/main.py +++ b/consent-tester/main.py @@ -324,6 +324,7 @@ class DSIDiscoveryResponse(BaseModel): # Raw CMP payloads captured during navigation (ePaaS, OneTrust, etc.). # Backend uses these to build the per-vendor compliance table. cmp_payloads: list[dict] = [] + coverage: dict = {} # Coverage-Telemetrie (Feature B), s. coverage_dict() @app.post("/dsi-discovery", response_model=DSIDiscoveryResponse) @@ -376,6 +377,7 @@ async def dsi_discovery(req: DSIDiscoveryRequest): errors=result.errors, scanned_at=datetime.now(timezone.utc).isoformat(), cmp_payloads=result.cmp_payloads, + coverage=result.coverage_dict(), ) diff --git a/consent-tester/services/dsi_discovery.py b/consent-tester/services/dsi_discovery.py index a0583a27..1ac595de 100644 --- a/consent-tester/services/dsi_discovery.py +++ b/consent-tester/services/dsi_discovery.py @@ -181,6 +181,23 @@ class DSIDiscoveryResult: # the authoritative cookie-text so MC checks run on the real policy, # not the homepage navigation that DOM extraction returns. cmp_cookie_text: str = "" + # Coverage-Telemetrie (Feature B): macht messbar, wie erschoepfend die + # Interaktion war — wir behaupten kein "100%", wir MESSEN es. 
+ interaction_rounds: int = 0 + elements_expanded: int = 0 + dom_growth_bytes: int = 0 + shadow_links_found: int = 0 + hidden_links_found: int = 0 + + def coverage_dict(self) -> dict: + """Coverage-Telemetrie als Dict (Feature B) — fuers Response-Mapping.""" + return { + "interaction_rounds": self.interaction_rounds, + "elements_expanded": self.elements_expanded, + "dom_growth_bytes": self.dom_growth_bytes, + "shadow_links_found": self.shadow_links_found, + "hidden_links_found": self.hidden_links_found, + } async def _extract_dom_tables(page) -> list[list[str]]: """D — extrahiert alle -Elemente aus dem aktuellen DOM als @@ -444,15 +461,24 @@ async def discover_dsi_documents( links = await _find_dsi_links(page, base_domain) logger.info("Found %d DSI links on %s", len(links), url) - # Step 3: Expand accordions, tabs, dropdowns to find hidden content - await _expand_all_interactive(page) - await page.wait_for_timeout(1000) + # Step 3: Interaktions-Fixpunkt — aufklappen bis das DOM stabil ist + # (faengt verschachtelte/lazy Akkordeons, die ein einzelner Pass + # verpasst). Telemetrie als messbares Coverage-Signal. 
+ _tel = await _expand_to_fixpoint(page) + result.interaction_rounds = _tel["rounds"] + result.elements_expanded = _tel["elements_expanded"] + result.dom_growth_bytes = _tel["dom_growth"] + await page.wait_for_timeout(500) # Step 3b: Re-scan after expanding (may reveal new links) links_after = await _find_dsi_links(page, base_domain) for link in links_after: if link["href"] not in [l["href"] for l in links]: links.append(link) + result.shadow_links_found = sum( + 1 for l in links_after if l.get("in_shadow")) + result.hidden_links_found = sum( + 1 for l in links_after if not l.get("visible")) # Step 4: Check for inline DSI sections (accordion content already visible) inline_sections = await _find_inline_dsi_sections(page) @@ -524,7 +550,7 @@ async def discover_dsi_documents( continue await try_dismiss_consent_banner(page) - await _expand_all_interactive(page) + await _expand_to_fixpoint(page) await page.wait_for_timeout(500) # Extract text — try specific content areas, fall back to full body @@ -595,7 +621,7 @@ async def discover_dsi_documents( # Navigate back for next link await goto_resilient(page, url, timeout=45000) await page.wait_for_timeout(500) - await _expand_all_interactive(page) + await _expand_to_fixpoint(page) except Exception as e: result.errors.append(f"Failed to load {href}: {str(e)[:80]}") @@ -674,25 +700,48 @@ def _deduplicate_documents(docs: list[DiscoveredDSI]) -> list[DiscoveredDSI]: return unique async def _find_dsi_links(page: Page, base_domain: str) -> list[dict]: - """Find all links whose text or href matches DSI keywords.""" + """Find all links whose text or href matches DSI keywords. + + Pierct Shadow-DOM (Web-Components wie Usercentrics/Mercedes) rekursiv — + sonst werden Rechts-Links in Shadow-Trees uebersehen. Versteckte Links + (display:none) kommen ueber querySelectorAll ohnehin mit; das + visible-Flag bleibt als Coverage-Metadatum erhalten. 
+ """ try: all_links = await page.evaluate(""" - () => [...document.querySelectorAll('a[href]')].map(a => ({ - href: a.href, - text: (a.textContent || '').trim().substring(0, 200), - ariaLabel: a.getAttribute('aria-label') || '', - title: a.getAttribute('title') || '', - visible: a.getBoundingClientRect().width > 0, - })) + () => { + const out = []; + const collect = (root) => { + if (!root || !root.querySelectorAll) return; + root.querySelectorAll('a[href]').forEach(a => out.push({ + href: a.href, + text: (a.textContent || '').trim().substring(0, 200), + ariaLabel: a.getAttribute('aria-label') || '', + title: a.getAttribute('title') || '', + visible: a.getBoundingClientRect().width > 0, + inShadow: root !== document, + })); + root.querySelectorAll('*').forEach(el => { + if (el.shadowRoot) collect(el.shadowRoot); + }); + }; + collect(document); + return out; + } """) dsi_links = [] for link in (all_links or []): - search_text = f"{link['text']} {link['ariaLabel']} {link['title']}".lower() + search_text = ( + f"{link['text']} {link['ariaLabel']} {link['title']}".lower() + ) href = link["href"] href_lower = href.lower() # Match by link text or href - is_match = any(kw in search_text or kw in href_lower for kw in ALL_DSI_KEYWORDS) + is_match = any( + kw in search_text or kw in href_lower + for kw in ALL_DSI_KEYWORDS + ) if not is_match: continue @@ -702,6 +751,7 @@ async def _find_dsi_links(page: Page, base_domain: str) -> list[dict]: "href": href, "text": link["text"], "visible": link["visible"], + "in_shadow": link.get("inShadow", False), }) return dsi_links @@ -709,47 +759,108 @@ async def _find_dsi_links(page: Page, base_domain: str) -> list[dict]: logger.warning("DSI link scan failed: %s", e) return [] -async def _expand_all_interactive(page: Page) -> None: +async def _expand_all_interactive(page: Page) -> int: """Expand all accordions, tabs, details, dropdowns on the page. IMPORTANT: Only expand CLOSED elements. 
Never click elements that are already expanded (aria-expanded="true") — that would close them. BMW, for example, has accordions open by default. + + Returns the number of elements acted on (drives the fixpoint loop + + coverage telemetry). """ try: - await page.evaluate("""() => { - // 1. Open all
<details> that are closed - document.querySelectorAll('details:not([open])').forEach(d => d.open = true); + return await page.evaluate("""() => { + let n = 0; + const click = (el) => { try { el.click(); n++; } catch {} }; - // 2. Click buttons that are explicitly CLOSED (aria-expanded="false") - document.querySelectorAll('button[aria-expanded="false"]').forEach(b => { - try { b.click(); } catch {} + // 1. Open all
<details> that are closed + document.querySelectorAll('details:not([open])').forEach(d => { + d.open = true; n++; }); + // 2. Anything explicitly CLOSED (aria-expanded="false") — not + // only