From 298c95731af0f58e35c2ae9da434e205b4452671 Mon Sep 17 00:00:00 2001 From: Benjamin Admin Date: Mon, 4 May 2026 21:56:29 +0200 Subject: [PATCH] feat: Generic legal document discovery (DSI, AGB, Widerruf, Cookie-Richtlinie) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit New service: dsi_discovery.py — finds ALL legal documents on any website: - Technology-agnostic: HTML, SPA, WordPress, Typo3, custom CMS - Structure-agnostic: accordions, sidebars, footers, inline links, tabs - Format-agnostic: HTML pages, anchor sections, PDFs, cross-domain links - Language-agnostic: 26 EU/EEA languages with document-type keywords Document types discovered: - Datenschutzinformationen / Privacy Policies (Art. 13/14 DSGVO) - AGB / Terms of Service / Nutzungsbedingungen - Widerrufsbelehrung / Right of Withdrawal (§355 BGB) - Cookie-Richtlinie / Cookie Policy - All cross-domain variants (e.g. help.instagram.com from instagram.com) API: POST /dsi-discovery { url, max_documents } Returns: list of documents with title, url, language, type, word_count, text_preview Features: - Expands all accordions, details, tabs, dropdowns before scanning - Follows cross-domain links (same registrable domain) - Re-expands after navigation back to source page - Handles anchor links (#sections) separately from full pages Co-Authored-By: Claude Opus 4.6 (1M context) --- consent-tester/main.py | 321 ++++++++++++++++ consent-tester/services/dsi_discovery.py | 469 +++++++++++++++++++++++ 2 files changed, 790 insertions(+) create mode 100644 consent-tester/main.py create mode 100644 consent-tester/services/dsi_discovery.py diff --git a/consent-tester/main.py b/consent-tester/main.py new file mode 100644 index 0000000..6a79ac5 --- /dev/null +++ b/consent-tester/main.py @@ -0,0 +1,321 @@ +""" +Consent Tester Service — Playwright-based 3-phase cookie consent test. + +Tests what scripts/cookies load BEFORE consent, AFTER rejection, and AFTER acceptance. +Runs as independent microservice on port 8094. 
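+
+Example client call for the /dsi-discovery endpoint (a sketch; host and
+port assume the default local deployment, and httpx is only used here for
+illustration, it is not a dependency of this service):
+
+    import httpx
+
+    resp = httpx.post(
+        "http://localhost:8094/dsi-discovery",
+        json={"url": "https://example.com", "max_documents": 30},
+        timeout=300.0,
+    )
+    for doc in resp.json()["documents"]:
+        print(doc["doc_type"], doc["language"], doc["url"])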
+""" + +import logging +from datetime import datetime, timezone + +from fastapi import FastAPI +from fastapi.middleware.cors import CORSMiddleware +from pydantic import BaseModel + +from services.consent_scanner import run_consent_test, ConsentTestResult +from services.authenticated_scanner import run_authenticated_test, AuthTestResult +from services.playwright_scanner import scan_website_playwright +from services.dsi_discovery import discover_dsi_documents, DSIDiscoveryResult + +logging.basicConfig(level=logging.INFO, format="%(levelname)s:%(name)s: %(message)s") +logger = logging.getLogger(__name__) + +app = FastAPI(title="BreakPilot Consent Tester", version="1.0.0") + +app.add_middleware( + CORSMiddleware, + allow_origins=["*"], + allow_methods=["*"], + allow_headers=["*"], +) + + +class ScanRequest(BaseModel): + url: str + timeout_per_phase: int = 10 # seconds to wait after page load + + +class ScanResponse(BaseModel): + url: str + banner_detected: bool + banner_provider: str + phases: dict + summary: dict + scanned_at: str + category_tests: list = [] + banner_checks: dict = {} + + +@app.get("/health") +async def health(): + return {"status": "healthy", "service": "consent-tester"} + + +@app.post("/scan", response_model=ScanResponse) +async def scan_consent(req: ScanRequest): + """Run 3-phase consent test on a URL.""" + logger.info("Starting consent test for %s", req.url) + result = await run_consent_test(req.url, req.timeout_per_phase) + + return ScanResponse( + url=req.url, + banner_detected=result.banner_detected, + banner_provider=result.banner_provider, + phases={ + "before_consent": { + "scripts": result.before_scripts, + "cookies": result.before_cookies, + "tracking_services": result.before_tracking, + "violations": [v.__dict__ for v in result.before_violations], + }, + "after_reject": { + "scripts": result.reject_scripts, + "cookies": result.reject_cookies, + "new_tracking": result.reject_new_tracking, + "violations": [v.__dict__ for v in result.reject_violations], + }, + "after_accept": { + "scripts": result.accept_scripts, + "cookies": result.accept_cookies, + "new_tracking": result.accept_new_tracking, + "undocumented": result.accept_undocumented, + }, + }, + summary={ + "critical": sum(1 for v in result.reject_violations if v.severity == "CRITICAL"), + "high": len(result.before_violations) + sum(1 for v in result.banner_text_violations if v.severity == "HIGH"), + "undocumented": len(result.accept_undocumented), + "total_violations": len(result.before_violations) + len(result.reject_violations) + len(result.banner_text_violations), + "category_violations": sum(len(ct.violations) for ct in result.category_tests), + "categories_tested": len(result.category_tests), + "banner_text_issues": len(result.banner_text_violations), + }, + banner_checks={ + "has_impressum_link": result.banner_has_impressum_link, + "has_dse_link": result.banner_has_dse_link, + "violations": [v.__dict__ for v in result.banner_text_violations], + }, + scanned_at=datetime.now(timezone.utc).isoformat(), + category_tests=[{ + "category": ct.category, + "category_label": ct.category_label, + "tracking_services": ct.tracking_services, + "violations": ct.violations, + } for ct in result.category_tests] if result.category_tests else [], + ) + + +class AuthScanRequest(BaseModel): + url: str + username: str + password: str + username_selector: str = "" + password_selector: str = "" + submit_selector: str = "" + + +class AuthCheckInfo(BaseModel): + found: bool = False + text: str = "" + legal_ref: str = "" + + 
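+# Example entry in AuthScanResponse.checks (illustrative values only;
+# legal_ref is taken from LEGAL_REFS below):
+#   "delete_account": AuthCheckInfo(found=False, text="",
+#                                   legal_ref="Art. 17 DSGVO (Recht auf Loeschung)")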
+class AuthScanResponse(BaseModel): + url: str + authenticated: bool + login_error: str = "" + checks: dict[str, AuthCheckInfo] + findings_count: int + scanned_at: str + + +LEGAL_REFS = { + "cancel_subscription": "§312k BGB (Kuendigungsbutton)", + "delete_account": "Art. 17 DSGVO (Recht auf Loeschung)", + "export_data": "Art. 20 DSGVO (Datenportabilitaet)", + "consent_settings": "Art. 7 Abs. 3 DSGVO (Widerruf der Einwilligung)", + "profile_visible": "Art. 15 DSGVO (Auskunftsrecht)", +} + + +@app.post("/authenticated-scan", response_model=AuthScanResponse) +async def authenticated_scan(req: AuthScanRequest): + """Test post-login functionality. Credentials are destroyed after test.""" + logger.info("Starting authenticated test for %s", req.url) + + result = await run_authenticated_test( + url=req.url, + username=req.username, + password=req.password, + username_selector=req.username_selector, + password_selector=req.password_selector, + submit_selector=req.submit_selector, + ) + + checks = { + "cancel_subscription": AuthCheckInfo( + found=result.cancel_subscription.found, + text=result.cancel_subscription.text, + legal_ref=LEGAL_REFS["cancel_subscription"], + ), + "delete_account": AuthCheckInfo( + found=result.delete_account.found, + text=result.delete_account.text, + legal_ref=LEGAL_REFS["delete_account"], + ), + "export_data": AuthCheckInfo( + found=result.export_data.found, + text=result.export_data.text, + legal_ref=LEGAL_REFS["export_data"], + ), + "consent_settings": AuthCheckInfo( + found=result.consent_settings.found, + text=result.consent_settings.text, + legal_ref=LEGAL_REFS["consent_settings"], + ), + "profile_visible": AuthCheckInfo( + found=result.profile_visible.found, + text=result.profile_visible.text, + legal_ref=LEGAL_REFS["profile_visible"], + ), + } + + missing = sum(1 for c in checks.values() if not c.found) + + return AuthScanResponse( + url=req.url, + authenticated=result.authenticated, + login_error=result.login_error, + checks=checks, + findings_count=missing, + scanned_at=datetime.now(timezone.utc).isoformat(), + ) + + +# ═══════════════════════════════════════════════════════════════ +# PLAYWRIGHT WEBSITE SCAN (Phase 10 — replaces httpx scanner) +# ═══════════════════════════════════════════════════════════════ + +class WebsiteScanRequest(BaseModel): + url: str + max_pages: int = 15 + click_nav: bool = True + + +class PageInfo(BaseModel): + url: str + status: int + title: str = "" + error: str = "" + + +class WebsiteScanResponse(BaseModel): + url: str + pages: list[PageInfo] + pages_count: int + external_scripts: list[str] + cookies: list[str] + page_htmls: dict[str, str] # url -> rendered HTML (for backend analysis) + scanned_at: str + + +@app.post("/website-scan", response_model=WebsiteScanResponse) +async def website_scan(req: WebsiteScanRequest): + """Scan website using Playwright — discovers pages via JS navigation + menu clicks.""" + logger.info("Starting Playwright website scan for %s (max %d pages)", req.url, req.max_pages) + + result = await scan_website_playwright(req.url, req.max_pages, req.click_nav) + + # Build page HTML map (only successful pages, truncated) + page_htmls = {} + for p in result.pages: + if p.html and p.status < 400: + page_htmls[p.url] = p.html[:50000] # Cap at 50KB per page + + return WebsiteScanResponse( + url=req.url, + pages=[PageInfo(url=p.url, status=p.status, title=p.title, error=p.error) for p in result.pages], + pages_count=len(result.pages), + external_scripts=result.external_scripts[:50], + cookies=result.all_cookies, + 
page_htmls=page_htmls, + scanned_at=datetime.now(timezone.utc).isoformat(), + ) + + +# ═══════════════════════════════════════════════════════════════ +# DSI DISCOVERY (finds all privacy + legal documents on a website) +# ═══════════════════════════════════════════════════════════════ + +class DSIDiscoveryRequest(BaseModel): + url: str + max_documents: int = 30 + + +class DSIDocumentInfo(BaseModel): + title: str + url: str + source_url: str + language: str = "" + doc_type: str = "" + word_count: int = 0 + text_preview: str = "" + + +class DSIDiscoveryResponse(BaseModel): + url: str + documents: list[DSIDocumentInfo] + total_found: int + languages_detected: list[str] + errors: list[str] + scanned_at: str + + +@app.post("/dsi-discovery", response_model=DSIDiscoveryResponse) +async def dsi_discovery(req: DSIDiscoveryRequest): + """Discover all privacy/data protection documents on a website. + + Generically finds DSI, AGB, Nutzungsbedingungen, Widerrufsbelehrung, + Cookie-Richtlinien etc. regardless of website technology or language. + Supports HTML pages, accordions, sidebars, PDFs, cross-domain links. + """ + logger.info("Starting DSI discovery for %s (max %d docs)", req.url, req.max_documents) + + from playwright.async_api import async_playwright + + async with async_playwright() as p: + browser = await p.chromium.launch( + headless=True, + args=["--no-sandbox", "--disable-dev-shm-usage"], + ) + context = await browser.new_context( + user_agent="Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) " + "AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36", + ) + page = await context.new_page() + + try: + result = await discover_dsi_documents(page, req.url, req.max_documents) + finally: + await context.close() + await browser.close() + + return DSIDiscoveryResponse( + url=req.url, + documents=[ + DSIDocumentInfo( + title=d.title, + url=d.url, + source_url=d.source_url, + language=d.language, + doc_type=d.doc_type, + word_count=d.word_count, + text_preview=d.text[:500] if d.text else "", + ) + for d in result.documents + ], + total_found=result.total_found, + languages_detected=result.languages_detected, + errors=result.errors, + scanned_at=datetime.now(timezone.utc).isoformat(), + ) diff --git a/consent-tester/services/dsi_discovery.py b/consent-tester/services/dsi_discovery.py new file mode 100644 index 0000000..8f686db --- /dev/null +++ b/consent-tester/services/dsi_discovery.py @@ -0,0 +1,469 @@ +""" +DSI Discovery — Generic privacy document finder and parser. + +Finds all privacy/data protection documents on any website regardless of: +- Technology (static HTML, SPA, WordPress, Typo3, etc.) +- Structure (accordion, sidebar, footer, inline links, separate pages) +- Format (HTML sections, PDF downloads, cross-domain links) +- Language (all 26 EU/EEA official languages) + +Flow: +1. Load page with Playwright (full JS rendering) +2. Find all links matching DSI keywords (26 languages) +3. Expand accordions, click tabs, open dropdowns +4. Follow cross-domain links (e.g. instagram.com → help.instagram.com) +5. Extract document text from each link target +6. Return structured list of discovered documents +""" + +import logging +import re +from dataclasses import dataclass, field +from urllib.parse import urlparse, urljoin + +from playwright.async_api import Page + +logger = logging.getLogger(__name__) + +# Legal document keywords in all EU/EEA official languages. +# Covers: DSI (privacy), AGB (terms), Widerruf (cancellation), +# Cookie-Richtlinie, Impressum, NB (Nutzungsbedingungen). 
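+#
+# Matching is a case-insensitive substring test over link text, aria-label,
+# title attribute and href (see _matches_dsi_keyword and _find_dsi_links
+# below): e.g. a footer link labelled "Datenschutzerklärung" contains the
+# German keyword "datenschutz" and is therefore tagged with language "de".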
+DSI_KEYWORDS: dict[str, list[str]] = {
+    "de": [
+        # Datenschutz
+        "datenschutz", "datenschutzerklaerung", "datenschutzinformation",
+        "datenschutzhinweis", "datenschutzrichtlinie", "dsgvo", "privatsphäre",
+        "datenschutzbestimmung", "verarbeitung personenbezogener daten",
+        # AGB / Nutzungsbedingungen
+        "allgemeine geschäftsbedingungen", "agb", "nutzungsbedingungen",
+        "nutzungsordnung", "geschäftsbedingungen",
+        # Widerruf
+        "widerrufsbelehrung", "widerrufsrecht", "widerrufsformular",
+        "widerruf", "rücktrittsrecht",
+        # Cookie
+        "cookie-richtlinie", "cookie-policy", "cookie-hinweis",
+    ],
+    "en": [
+        "privacy policy", "privacy notice", "data protection", "data policy",
+        "privacy statement", "gdpr", "personal data", "cookie policy",
+        "terms of service", "terms and conditions", "terms of use",
+        "cancellation policy", "right of withdrawal", "refund policy",
+        "cookie notice",
+    ],
+    "fr": [
+        "politique de confidentialité", "protection des données",
+        "données personnelles", "vie privée", "rgpd",
+        "conditions générales", "conditions d'utilisation",
+        "droit de rétractation", "politique de cookies",
+    ],
+    "es": [
+        "política de privacidad", "protección de datos",
+        "datos personales", "aviso de privacidad",
+        "términos y condiciones", "condiciones de uso",
+        "derecho de desistimiento", "política de cookies",
+    ],
+    "it": [
+        "informativa sulla privacy", "protezione dei dati",
+        "dati personali", "privacy policy",
+        "termini e condizioni", "condizioni d'uso",
+        "diritto di recesso", "politica dei cookie",
+    ],
+    "nl": [
+        "privacybeleid", "gegevensbescherming", "privacyverklaring",
+        "persoonsgegevens", "avg",
+        "algemene voorwaarden", "gebruiksvoorwaarden",
+        "herroepingsrecht", "cookiebeleid",
+    ],
+    "pl": [
+        "polityka prywatności", "ochrona danych osobowych",
+        "dane osobowe", "rodo",
+        "regulamin", "warunki korzystania",
+        "prawo odstąpienia", "polityka cookies",
+    ],
+    "pt": [
+        "política de privacidade", "proteção de dados",
+        "dados pessoais", "lgpd",
+        "termos e condições", "condições de utilização",
+        "direito de resolução", "política de cookies",
+    ],
+    "sv": [
+        "integritetspolicy", "dataskydd", "personuppgifter",
+        "sekretesspolicy",
+        "allmänna villkor", "användarvillkor",
+        "ångerrätt", "cookiepolicy",
+    ],
+    "da": [
+        "privatlivspolitik", "databeskyttelse", "personoplysninger",
+        "persondatapolitik",
+        "handelsbetingelser", "brugsbetingelser",
+        "fortrydelsesret", "cookiepolitik",
+    ],
+    "fi": [
+        "tietosuojaseloste", "tietosuoja", "henkilötiedot",
+        "rekisteriseloste",
+        "yleiset ehdot", "käyttöehdot",
+        "peruutusoikeus", "evästekäytäntö",
+    ],
+    "cs": ["zásady ochrany osobních údajů", "ochrana osobních údajů",
+           "zpracování osobních údajů", "obchodní podmínky", "zásady cookies"],
+    "el": ["πολιτική απορρήτου", "προστασία δεδομένων",
+           "προσωπικά δεδομένα", "όροι χρήσης", "πολιτική cookies"],
+    "hu": ["adatvédelmi szabályzat", "adatvédelem", "személyes adatok",
+           "általános szerződési feltételek", "cookie szabályzat"],
+    "ro": ["politica de confidențialitate", "protecția datelor",
+           "date cu caracter personal", "termeni și condiții", "politica cookies"],
+    "bg": ["политика за поверителност", "защита на данните",
+           "лични данни", "общи условия", "политика за бисквитки"],
+    "hr": ["politika privatnosti", "zaštita podataka", "osobni podaci",
+           "opći uvjeti", "politika kolačića"],
+    "sk": ["zásady ochrany osobných údajov", "ochrana osobných údajov",
+           "obchodné podmienky", "zásady cookies"],
+    "sl": ["politika zasebnosti", "varstvo podatkov", "osebni podatki",
+           "splošni pogoji", "politika piškotkov"],
+    "et": ["privaatsuspoliitika", "andmekaitse", "isikuandmed",
+           "kasutustingimused", "küpsiste poliitika"],
+    "lt": ["privatumo politika", "duomenų apsauga", "asmens duomenys",
+           "naudojimosi sąlygos", "slapukų politika"],
+    "lv": ["privātuma politika", "datu aizsardzība", "personas dati",
+           "lietošanas noteikumi", "sīkdatņu politika"],
+    "mt": ["politika tal-privatezza", "protezzjoni tad-data",
+           "termini u kundizzjonijiet"],
+    "ga": ["polasaí príobháideachais", "cosaint sonraí",
+           "téarmaí agus coinníollacha"],
+    "is": ["persónuverndarstefna", "persónuvernd",
+           "skilmálar og skilyrði"],
+    "no": ["personvernerklæring", "personvern", "personopplysninger",
+           "brukervilkår", "angrerett", "informasjonskapsler"],
+}
+
+# Flatten all keywords for quick matching
+ALL_DSI_KEYWORDS: list[str] = []
+for kw_list in DSI_KEYWORDS.values():
+    ALL_DSI_KEYWORDS.extend(kw_list)
+
+
+@dataclass
+class DiscoveredDSI:
+    """A discovered privacy/data protection document."""
+    title: str
+    url: str
+    source_url: str  # Page where the link was found
+    language: str = ""
+    doc_type: str = ""  # "html_section", "anchor_section", "html_page", "pdf", "cross_domain"
+    text: str = ""  # Extracted full text
+    sections: list[dict] = field(default_factory=list)  # Parsed sections
+    word_count: int = 0
+
+
+@dataclass
+class DSIDiscoveryResult:
+    """Result of DSI discovery scan."""
+    base_url: str
+    documents: list[DiscoveredDSI] = field(default_factory=list)
+    total_found: int = 0
+    languages_detected: list[str] = field(default_factory=list)
+    errors: list[str] = field(default_factory=list)
+
+
+def _matches_dsi_keyword(text: str) -> tuple[bool, str]:
+    """Check if text contains any DSI keyword. Returns (match, language)."""
+    text_lower = text.lower().strip()
+    for lang, keywords in DSI_KEYWORDS.items():
+        for kw in keywords:
+            if kw in text_lower:
+                return True, lang
+    return False, ""
+
+
+def _is_allowed_domain(href: str, base_domain: str) -> bool:
+    """Allow same domain + known related domains (e.g. help.instagram.com)."""
+    try:
+        # removeprefix only strips a leading "www." label; str.replace would
+        # also mangle hosts that merely contain "www." elsewhere.
+        link_domain = urlparse(href).netloc.removeprefix("www.")
+        base_clean = base_domain.removeprefix("www.")
+        # Same domain
+        if link_domain == base_clean:
+            return True
+        # Subdomain (help.instagram.com for instagram.com)
+        if link_domain.endswith(f".{base_clean}"):
+            return True
+        # Parent domain (instagram.com links from about.instagram.com)
+        if base_clean.endswith(f".{link_domain}"):
+            return True
+        # Same registrable domain (heuristic: compares the last two labels,
+        # so multi-part TLDs such as .co.uk are matched loosely)
+        parts_base = base_clean.split(".")
+        parts_link = link_domain.split(".")
+        if len(parts_base) >= 2 and len(parts_link) >= 2:
+            if parts_base[-2] == parts_link[-2] and parts_base[-1] == parts_link[-1]:
+                return True
+    except Exception:
+        pass
+    return False
+
+
+async def discover_dsi_documents(
+    page: Page,
+    url: str,
+    max_documents: int = 30,
+) -> DSIDiscoveryResult:
+    """Discover all privacy/data protection documents on a website.
+
+    Works generically regardless of website technology, structure, or language.
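+
+    Usage sketch (assumes an already-opened Playwright page, as created in
+    main.py's /dsi-discovery endpoint):
+
+        result = await discover_dsi_documents(page, "https://example.com")
+        for doc in result.documents:
+            print(doc.doc_type, doc.language, doc.word_count, doc.url)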
+    """
+    result = DSIDiscoveryResult(base_url=url)
+    base_domain = urlparse(url).netloc
+    seen_urls: set[str] = set()
+    seen_titles: set[str] = set()
+
+    try:
+        # Step 1: Load the page
+        await page.goto(url, wait_until="networkidle", timeout=30000)
+        await page.wait_for_timeout(2000)
+
+        # Step 2: Find DSI links in current page
+        links = await _find_dsi_links(page, base_domain)
+        logger.info("Found %d DSI links on %s", len(links), url)
+
+        # Step 3: Expand accordions, tabs, dropdowns to find hidden content
+        await _expand_all_interactive(page)
+        await page.wait_for_timeout(1000)
+
+        # Step 3b: Re-scan after expanding (may reveal new links)
+        links_after = await _find_dsi_links(page, base_domain)
+        known_hrefs = {link["href"] for link in links}
+        for link in links_after:
+            if link["href"] not in known_hrefs:
+                known_hrefs.add(link["href"])
+                links.append(link)
+
+        # Step 4: Check for inline DSI sections (accordion content already visible)
+        inline_sections = await _find_inline_dsi_sections(page)
+        for section in inline_sections:
+            title_norm = section["title"].strip().lower()
+            if title_norm not in seen_titles:
+                seen_titles.add(title_norm)
+                _, lang = _matches_dsi_keyword(section["title"])
+                doc = DiscoveredDSI(
+                    title=section["title"],
+                    url=f"{url}#{section.get('id', '')}",
+                    source_url=url,
+                    language=lang,
+                    doc_type="html_section",
+                    text=section["text"],
+                    word_count=len(section["text"].split()),
+                )
+                result.documents.append(doc)
+
+        # Step 5: Follow each DSI link and extract content
+        for link_info in links[:max_documents]:
+            href = link_info["href"]
+            if href in seen_urls:
+                continue
+            seen_urls.add(href)
+
+            title = link_info["text"]
+            title_norm = title.strip().lower()
+            if title_norm in seen_titles:
+                continue
+            seen_titles.add(title_norm)
+
+            _, lang = _matches_dsi_keyword(title)
+            is_pdf = href.lower().endswith(".pdf")
+
+            if is_pdf:
+                result.documents.append(DiscoveredDSI(
+                    title=title, url=href, source_url=url,
+                    language=lang, doc_type="pdf",
+                    text="[PDF — Textextraktion erforderlich]",
+                ))
+                continue
+
+            # Navigate to the link and extract text
+            try:
+                is_anchor = "#" in href and href.split("#")[0] == url.split("#")[0]
+                if is_anchor:
+                    anchor = href.split("#")[1]
+                    # Pass the anchor id as an evaluate() argument instead of
+                    # interpolating it into the script, so quotes in the id
+                    # cannot break the JS.
+                    text = await page.evaluate(
+                        """(anchorId) => {
+                            const el = document.getElementById(anchorId);
+                            if (!el) return '';
+                            return el.closest('section,article,div')?.textContent?.trim() || el.textContent?.trim() || '';
+                        }""",
+                        anchor,
+                    )
+                    if text and len(text) > 50:
+                        result.documents.append(DiscoveredDSI(
+                            title=title, url=href, source_url=url,
+                            language=lang, doc_type="anchor_section",
+                            text=text[:50000], word_count=len(text.split()),
+                        ))
+                    continue
+
+                # External or same-domain page
+                resp = await page.goto(href, wait_until="networkidle", timeout=20000)
+                if resp and resp.status < 400:
+                    await page.wait_for_timeout(2000)
+                    await _expand_all_interactive(page)  # Expand accordions on target page too
+                    await page.wait_for_timeout(500)
+
+                    text = await page.evaluate("""
+                        () => {
+                            const main = document.querySelector('main, article, [role="main"], .content, #content');
+                            return (main || document.body).textContent?.trim() || '';
+                        }
+                    """)
+                    if text and len(text) > 50:
+                        # _find_dsi_links only keeps allowed domains, so flag
+                        # cross-domain documents by comparing hosts directly
+                        # (e.g. help.instagram.com reached from instagram.com).
+                        is_cross = urlparse(href).netloc != base_domain
+                        result.documents.append(DiscoveredDSI(
+                            title=title, url=href, source_url=url,
+                            language=lang,
+                            doc_type="cross_domain" if is_cross else "html_page",
+                            text=text[:50000], word_count=len(text.split()),
+                        ))
+
+                # Navigate back to source page for next link
+                await page.goto(url, wait_until="networkidle", timeout=20000)
+                await page.wait_for_timeout(1000)
+                await _expand_all_interactive(page)
+
+            except Exception as e:
+                result.errors.append(f"Failed to load {href}: {str(e)[:80]}")
+                try:
+                    await page.goto(url, wait_until="networkidle", timeout=20000)
+                except Exception:
+                    pass
+
+    except Exception as e:
+        result.errors.append(f"Discovery failed: {str(e)[:100]}")
+        logger.error("DSI discovery failed: %s", e)
+
+    result.total_found = len(result.documents)
+    # sorted() keeps the language list deterministic across runs
+    result.languages_detected = sorted({
+        d.language for d in result.documents if d.language
+    })
+    logger.info("DSI discovery complete: %d documents found (languages: %s)",
+                result.total_found, result.languages_detected)
+    return result
+
+
+async def _find_dsi_links(page: Page, base_domain: str) -> list[dict]:
+    """Find all links whose text or href matches DSI keywords."""
+    try:
+        all_links = await page.evaluate("""
+            () => [...document.querySelectorAll('a[href]')].map(a => ({
+                href: a.href,
+                text: (a.textContent || '').trim().substring(0, 200),
+                ariaLabel: a.getAttribute('aria-label') || '',
+                title: a.getAttribute('title') || '',
+                visible: a.getBoundingClientRect().width > 0,
+            }))
+        """)
+        dsi_links = []
+        for link in (all_links or []):
+            search_text = f"{link['text']} {link['ariaLabel']} {link['title']}".lower()
+            href = link["href"]
+            href_lower = href.lower()
+
+            # Match by link text or href
+            is_match = any(kw in search_text or kw in href_lower for kw in ALL_DSI_KEYWORDS)
+            if not is_match:
+                continue
+
+            # Allow same domain + related domains + PDFs
+            if _is_allowed_domain(href, base_domain) or href_lower.endswith(".pdf"):
+                dsi_links.append({
+                    "href": href,
+                    "text": link["text"],
+                    "visible": link["visible"],
+                })
+
+        return dsi_links
+    except Exception as e:
+        logger.warning("DSI link scan failed: %s", e)
+        return []
+
+
+async def _expand_all_interactive(page: Page) -> None:
+    """Expand all accordions, tabs, details, dropdowns on the page."""
+    try:
+        await page.evaluate("""
+            () => {
+                // 1. Open all <details> elements
+                document.querySelectorAll('details:not([open])').forEach(d => d.open = true);
+
+                // 2. Click all accordion buttons
+                const accSelectors = [
+                    'button[aria-expanded="false"]',
+                    '[class*="accordion"]:not([class*="open"]) > button',
+                    '[class*="accordion"]:not([class*="open"]) > a',
+                    '[class*="collapse"] > button',
+                    '[class*="toggle"]:not(.active)',
+                    '[data-toggle="collapse"]',
+                    '[data-bs-toggle="collapse"]',
+                    '.panel-heading:not(.active) a',
+                ];
+                for (const sel of accSelectors) {
+                    document.querySelectorAll(sel).forEach(el => {
+                        try { el.click(); } catch {}
+                    });
+                }
+
+                // 3. Click all "show more" / "read more" buttons
+                const moreButtons = document.querySelectorAll('button, a');
+                for (const btn of moreButtons) {
+                    const text = (btn.textContent || '').toLowerCase().trim();
+                    if (/^(mehr|more|weiterlesen|read more|show more|anzeigen|details|alle anzeigen)/.test(text)) {
+                        try { btn.click(); } catch {}
+                    }
+                }
+
+                // 4. Expand all tab panels (click each tab)
+                document.querySelectorAll('[role="tab"]').forEach(tab => {
+                    try { tab.click(); } catch {}
+                });
+            }
+        """)
+    except Exception as e:
+        logger.debug("Expand interactive elements: %s", e)
+
+
+async def _find_inline_dsi_sections(page: Page) -> list[dict]:
+    """Find DSI content already visible on the page (e.g. expanded accordions)."""
+    try:
+        sections = await page.evaluate("""
+            () => {
+                const results = [];
+                // Find headings that match DSI keywords
+                const headings = document.querySelectorAll('h1, h2, h3, h4, h5');
+                const dsiKeywords = [
+                    'datenschutz', 'privacy', 'données', 'privacidad', 'protezione',
+                    'gegevensbescherming', 'ochrona danych', 'tietosuoja', 'integritet',
+                    'databeskyttelse', 'ochrana', 'adatvédel', 'confidential',
+                ];
+                for (const h of headings) {
+                    const text = (h.textContent || '').trim();
+                    const textLower = text.toLowerCase();
+                    if (!dsiKeywords.some(kw => textLower.includes(kw))) continue;
+
+                    // Get the section content following this heading
+                    let content = '';
+                    let el = h.nextElementSibling;
+                    let count = 0;
+                    while (el && count < 50) {
+                        if (el.tagName.match(/^H[1-5]$/)) break;
+                        content += (el.textContent || '').trim() + '\\n';
+                        el = el.nextElementSibling;
+                        count++;
+                    }
+
+                    if (content.length > 100) {
+                        results.push({
+                            title: text.substring(0, 200),
+                            text: content.substring(0, 50000),
+                            id: h.id || '',
+                        });
+                    }
+                }
+                return results;
+            }
+        """)
+        return sections or []
+    except Exception as e:
+        logger.debug("Inline DSI section scan failed: %s", e)
+        return []