"""
DSI Discovery — Generic privacy document finder and parser.

Finds all privacy/data protection documents on any website regardless of:
- Technology (static HTML, SPA, WordPress, Typo3, etc.)
- Structure (accordion, sidebar, footer, inline links, separate pages)
- Format (HTML sections, PDF downloads, cross-domain links)
- Language (all 26 EU/EEA official languages)

Flow:
1. Load page with Playwright (full JS rendering)
2. Find all links matching DSI keywords (26 languages)
3. Expand accordions, click tabs, open dropdowns
4. Follow cross-domain links (e.g. instagram.com → help.instagram.com)
5. Extract document text from each link target
6. Return structured list of discovered documents
"""

import logging
import re
from dataclasses import dataclass, field
from typing import Any
from urllib.parse import unquote, urljoin, urlparse

try:
    from playwright.async_api import Page
except ImportError:  # pragma: no cover - Playwright is only needed to drive a browser
    # ``Page`` is used purely as an annotation in this module; falling back to
    # ``Any`` keeps the keyword tables and URL helpers importable without it.
    Page = Any  # type: ignore[assignment,misc]

logger = logging.getLogger(__name__)

# Legal document keywords in all EU/EEA official languages.
# Covers: DSI (privacy), AGB (terms), Widerruf (cancellation),
# Cookie-Richtlinie, Impressum, NB (Nutzungsbedingungen).
DSI_KEYWORDS: dict[str, list[str]] = {
    "de": [
        # Privacy
        "datenschutz", "datenschutzerklaerung", "datenschutzinformation",
        "datenschutzhinweis", "datenschutzrichtlinie", "dsgvo",
        "privatsphäre", "datenschutzbestimmung",
        "verarbeitung personenbezogener daten",
        # Terms and conditions / terms of use
        "allgemeine geschäftsbedingungen", "agb", "nutzungsbedingungen",
        "nutzungsordnung", "geschäftsbedingungen",
        # Cancellation / withdrawal
        "widerrufsbelehrung", "widerrufsrecht", "widerrufsformular",
        "widerruf", "rücktrittsrecht",
        # Cookies
        "cookie-richtlinie", "cookie-policy", "cookie-hinweis",
    ],
    "en": [
        "privacy policy", "privacy notice", "data protection", "data policy",
        "privacy statement", "gdpr", "personal data", "cookie policy",
        "terms of service", "terms and conditions", "terms of use",
        "cancellation policy", "right of withdrawal", "refund policy",
        "cookie notice",
    ],
    "fr": [
        "politique de confidentialité", "protection des données",
        "données personnelles", "vie privée", "rgpd",
        "conditions générales", "conditions d'utilisation",
        "droit de rétractation", "politique de cookies",
    ],
    "es": [
        "política de privacidad", "protección de datos", "datos personales",
        "aviso de privacidad", "términos y condiciones", "condiciones de uso",
        "derecho de desistimiento", "política de cookies",
    ],
    "it": [
        "informativa sulla privacy", "protezione dei dati", "dati personali",
        "privacy policy", "termini e condizioni", "condizioni d'uso",
        "diritto di recesso", "politica dei cookie",
    ],
    "nl": [
        "privacybeleid", "gegevensbescherming", "privacyverklaring",
        "persoonsgegevens", "avg", "algemene voorwaarden",
        "gebruiksvoorwaarden", "herroepingsrecht", "cookiebeleid",
    ],
    "pl": [
        "polityka prywatności", "ochrona danych osobowych", "dane osobowe",
        "rodo", "regulamin", "warunki korzystania", "prawo odstąpienia",
        "polityka cookies",
    ],
    "pt": [
        "política de privacidade", "proteção de dados", "dados pessoais",
        "lgpd", "termos e condições", "condições de utilização",
        "direito de resolução", "política de cookies",
    ],
    "sv": [
        "integritetspolicy", "dataskydd", "personuppgifter",
        "sekretesspolicy", "allmänna villkor", "användarvillkor",
        "ångerrätt", "cookiepolicy",
    ],
    "da": [
        "privatlivspolitik", "databeskyttelse", "personoplysninger",
        "persondatapolitik", "handelsbetingelser", "brugsbetingelser",
        "fortrydelsesret", "cookiepolitik",
    ],
    "fi": [
        "tietosuojaseloste", "tietosuoja", "henkilötiedot",
        "rekisteriseloste", "yleiset ehdot", "käyttöehdot",
        "peruutusoikeus", "evästekäytäntö",
    ],
    "cs": ["zásady ochrany osobních údajů", "ochrana osobních údajů", "zpracování osobních údajů", "obchodní podmínky", "zásady cookies"],
    "el": ["πολιτική απορρήτου", "προστασία δεδομένων", "προσωπικά δεδομένα", "όροι χρήσης", "πολιτική cookies"],
    "hu": ["adatvédelmi szabályzat", "adatvédelem", "személyes adatok", "általános szerződési feltételek", "cookie szabályzat"],
    "ro": ["politica de confidențialitate", "protecția datelor", "date cu caracter personal", "termeni și condiții", "politica cookies"],
    "bg": ["политика за поверителност", "защита на данните", "лични данни", "общи условия", "политика за бисквитки"],
    "hr": ["politika privatnosti", "zaštita podataka", "osobni podaci", "opći uvjeti", "politika kolačića"],
    "sk": ["zásady ochrany osobných údajov", "ochrana osobných údajov", "obchodné podmienky", "zásady cookies"],
    "sl": ["politika zasebnosti", "varstvo podatkov", "osebni podatki", "splošni pogoji", "politika piškotkov"],
    "et": ["privaatsuspoliitika", "andmekaitse", "isikuandmed", "kasutustingimused", "küpsiste poliitika"],
    "lt": ["privatumo politika", "duomenų apsauga", "asmens duomenys", "naudojimosi sąlygos", "slapukų politika"],
    "lv": ["privātuma politika", "datu aizsardzība", "personas dati", "lietošanas noteikumi", "sīkdatņu politika"],
    "mt": ["politika tal-privatezza", "protezzjoni tad-data", "termini u kundizzjonijiet"],
    "ga": ["polasaí príobháideachais", "cosaint sonraí", "téarmaí agus coinníollacha"],
    "is": ["persónuverndarstefna", "persónuvernd", "skilmálar og skilyrði"],
    "no": ["personvernerklæring", "personvern", "personopplysninger", "brukervilkår", "angrerett", "informasjonskapsler"],
}

# Flat keyword list for quick matching. Several languages share a phrase
# (e.g. "privacy policy" in en/it), so duplicates are dropped while the
# first-seen order is preserved.
ALL_DSI_KEYWORDS: list[str] = list(dict.fromkeys(
    keyword for keywords in DSI_KEYWORDS.values() for keyword in keywords
))

# Stored document text is capped at this many characters.
_MAX_TEXT_CHARS = 50000
# Extracted text must be longer than this many characters to count as a document.
_MIN_TEXT_CHARS = 50

# URL separators that stand in for spaces inside slugs ("privacy-policy").
_HREF_SEPARATORS = re.compile(r"[-_+/.]+")

# Second-level labels that commonly act as public suffixes under two-letter
# country TLDs (co.uk, com.au, ...). This is a heuristic, not the full Public
# Suffix List.
_SECOND_LEVEL_SUFFIXES = frozenset({"co", "com", "org", "net", "gov", "edu", "ac"})

# Collects every anchor with the attributes used for keyword matching.
_LINK_SCAN_JS = """
() => [...document.querySelectorAll('a[href]')].map(a => ({
    href: a.href,
    text: (a.textContent || '').trim().substring(0, 200),
    ariaLabel: a.getAttribute('aria-label') || '',
    title: a.getAttribute('title') || '',
    visible: a.getBoundingClientRect().width > 0,
}))
"""

# Reads the text around an in-page anchor. The element id arrives as an
# argument so that it is never spliced into JavaScript source code.
_ANCHOR_TEXT_JS = """
(anchorId) => {
    const el = document.getElementById(anchorId);
    if (!el) return '';
    return el.closest('section,article,div')?.textContent?.trim()
        || el.textContent?.trim()
        || '';
}
"""

# Reads the main content of a document page.
_PAGE_TEXT_JS = """
() => {
    const main = document.querySelector('main, article, [role="main"], .content, #content');
    return (main || document.body).textContent?.trim() || '';
}
"""

# Opens collapsed UI. Anchors that point at another document are skipped,
# because clicking them would navigate away in the middle of a scan.
_EXPAND_JS = """
() => {
    const isSafeToClick = (el) => {
        if ((el.tagName || '').toUpperCase() !== 'A') return true;
        const raw = (el.getAttribute('href') || '').trim();
        if (raw === '' || raw.startsWith('#') || raw.toLowerCase().startsWith('javascript:')) {
            return true;
        }
        try {
            const target = new URL(el.href, document.baseURI);
            return target.hash !== ''
                && target.origin === location.origin
                && target.pathname === location.pathname
                && target.search === location.search;
        } catch {
            return false;
        }
    };
    const safeClick = (el) => {
        if (!isSafeToClick(el)) return;
        try { el.click(); } catch {}
    };

    // 1. Open all details elements
    document.querySelectorAll('details:not([open])').forEach(d => d.open = true);

    // 2. Click all accordion buttons
    const accSelectors = [
        'button[aria-expanded="false"]',
        '[class*="accordion"]:not([class*="open"]) > button',
        '[class*="accordion"]:not([class*="open"]) > a',
        '[class*="collapse"] > button',
        '[class*="toggle"]:not(.active)',
        '[data-toggle="collapse"]',
        '[data-bs-toggle="collapse"]',
        '.panel-heading:not(.active) a',
    ];
    for (const sel of accSelectors) {
        document.querySelectorAll(sel).forEach(el => safeClick(el));
    }

    // 3. Click all "show more" / "read more" buttons
    for (const btn of document.querySelectorAll('button, a')) {
        const text = (btn.textContent || '').toLowerCase().trim();
        if (/^(mehr|more|weiterlesen|read more|show more|anzeigen|details|alle anzeigen)/.test(text)) {
            safeClick(btn);
        }
    }

    // 4. Expand all tab panels (click each tab)
    document.querySelectorAll('[role="tab"]').forEach(tab => safeClick(tab));
}
"""

# Collects headings that look privacy related together with the content that
# follows them, up to the next heading of the same or a higher level.
_INLINE_SECTIONS_JS = """
() => {
    const results = [];
    // Find headings that match DSI keywords
    const headings = document.querySelectorAll('h1, h2, h3, h4, h5');
    const dsiKeywords = [
        'datenschutz', 'privacy', 'données', 'privacidad', 'protezione',
        'gegevensbescherming', 'ochrona danych', 'tietosuoja', 'integritet',
        'databeskyttelse', 'ochrana', 'adatvédel', 'confidential',
    ];
    for (const h of headings) {
        const text = (h.textContent || '').trim();
        const textLower = text.toLowerCase();
        if (!dsiKeywords.some(kw => textLower.includes(kw))) continue;

        // Sub-headings belong to the section; only a heading of the same
        // or a higher level ends it.
        const level = parseInt(h.tagName.substring(1), 10);
        let content = '';
        let el = h.nextElementSibling;
        let count = 0;
        while (el && count < 50) {
            const heading = el.tagName.match(/^H([1-6])$/);
            if (heading && parseInt(heading[1], 10) <= level) break;
            content += (el.textContent || '').trim() + '\\n';
            el = el.nextElementSibling;
            count++;
        }
        if (content.length > 100) {
            results.push({
                title: text.substring(0, 200),
                text: content.substring(0, 50000),
                id: h.id || '',
            });
        }
    }
    return results;
}
"""


@dataclass
class DiscoveredDSI:
    """A discovered privacy/data protection document."""

    title: str
    url: str
    source_url: str  # Page where the link was found
    language: str = ""
    # Values emitted by this module: "html_section", "anchor_section",
    # "html_page", "cross_domain", "pdf"
    doc_type: str = ""
    text: str = ""  # Extracted full text
    sections: list[dict] = field(default_factory=list)  # Parsed sections
    word_count: int = 0


@dataclass
class DSIDiscoveryResult:
    """Result of DSI discovery scan."""

    base_url: str
    documents: list[DiscoveredDSI] = field(default_factory=list)
    total_found: int = 0
    languages_detected: list[str] = field(default_factory=list)
    errors: list[str] = field(default_factory=list)


def _matches_dsi_keyword(text: str) -> tuple[bool, str]:
    """Check if text contains any DSI keyword. Returns (match, language).

    Languages are tried in ``DSI_KEYWORDS`` order and the first language with
    a keyword contained in the lower-cased text wins.
    """
    if not text:
        return False, ""
    haystack = text.lower().strip()
    for lang, keywords in DSI_KEYWORDS.items():
        if any(keyword in haystack for keyword in keywords):
            return True, lang
    return False, ""


def _strip_www(host: str) -> str:
    """Lower-case a host name and drop a trailing dot and a leading ``www.``."""
    host = host.lower().rstrip(".")
    return host[4:] if host.startswith("www.") else host


def _link_host(href: str) -> str:
    """Host of a URL without port, userinfo or ``www.``; "" when it has none."""
    return _strip_www(urlparse(href).hostname or "")


def _base_host(base_domain: str) -> str:
    """Host of a bare netloc (or full URL) without port, userinfo or ``www.``."""
    candidate = base_domain if "://" in base_domain else f"//{base_domain}"
    return _strip_www(urlparse(candidate).hostname or "")


def _registrable_domain(host: str) -> str:
    """Best-effort registrable domain of ``host`` ("" when there is none).

    Normally the last two labels; three labels when the host sits under a
    common second-level suffix of a country TLD (example.co.uk). Hosts whose
    last label is numeric are treated as IP addresses.
    """
    labels = host.split(".")
    if len(labels) < 2 or labels[-1].isdigit():
        return ""
    size = 2
    if len(labels) >= 3 and len(labels[-1]) == 2 and labels[-2] in _SECOND_LEVEL_SUFFIXES:
        size = 3
    return ".".join(labels[-size:])


def _is_allowed_domain(href: str, base_domain: str) -> bool:
    """Allow same domain + known related domains (e.g. help.instagram.com).

    Args:
        href: Absolute URL of the link.
        base_domain: Netloc of the scanned site; a port and ``www.`` are ignored.

    Returns:
        True when the link host equals the base host, is a subdomain or parent
        domain of it, or shares its registrable domain. False for links
        without a host (``mailto:``, ``javascript:``) and for unparsable URLs.
    """
    try:
        link_host = _link_host(href)
        base_host = _base_host(base_domain)
    except ValueError:
        # urlparse rejects malformed netlocs such as unbalanced IPv6 brackets.
        return False
    if not link_host or not base_host:
        return False
    if link_host == base_host:
        return True
    # Subdomain (help.instagram.com for instagram.com)
    if link_host.endswith(f".{base_host}"):
        return True
    # Parent domain (instagram.com links from about.instagram.com)
    if base_host.endswith(f".{link_host}"):
        return True
    # Sibling hosts under the same registrable domain
    link_registrable = _registrable_domain(link_host)
    return bool(link_registrable) and link_registrable == _registrable_domain(base_host)


def _is_pdf_url(href: str) -> bool:
    """True when the URL points at a ``.pdf``, ignoring case, query and fragment."""
    lowered = href.lower()
    if lowered.endswith(".pdf"):
        return True
    try:
        return urlparse(lowered).path.endswith(".pdf")
    except ValueError:
        return False


def _href_haystack(href: str) -> str:
    """Searchable forms of a URL: raw, percent-decoded, and with separators as spaces.

    The spaced form lets multi-word keywords ("privacy policy") match slugs
    such as ``/privacy-policy`` or ``/privacy%20policy``.
    """
    lowered = href.lower()
    decoded = unquote(lowered)
    return "\n".join((lowered, decoded, _HREF_SEPARATORS.sub(" ", decoded)))


def _same_document(href: str, *page_urls: str) -> bool:
    """True when ``href`` is a fragment link into one of ``page_urls``."""
    if "#" not in href:
        return False
    target = href.split("#", 1)[0]
    return any(target == page_url.split("#", 1)[0] for page_url in page_urls)


async def discover_dsi_documents(
    page: Page,
    url: str,
    max_documents: int = 30,
) -> DSIDiscoveryResult:
    """Discover all privacy/data protection documents on a website.

    Works generically regardless of website technology, structure, or language.

    Args:
        page: Playwright page used for all navigation. After a successful
            run it is left on ``url``.
        url: Page to scan for privacy/legal documents.
        max_documents: Maximum number of distinct links to process. Repeated
            hrefs and repeated titles do not count against it. Inline
            sections found on ``url`` itself are always included.

    Returns:
        A ``DSIDiscoveryResult``. Failures are collected in ``errors`` instead
        of being raised.
    """
    result = DSIDiscoveryResult(base_url=url)
    base_domain = urlparse(url).netloc
    seen_urls: set[str] = set()
    seen_titles: set[str] = set()

    try:
        # Step 1: Load the page
        await page.goto(url, wait_until="networkidle", timeout=30000)
        await page.wait_for_timeout(2000)

        # Anchors are resolved against the URL the browser ended up on, which
        # differs from ``url`` after a redirect.
        current_url = getattr(page, "url", None)
        landing_url = current_url if isinstance(current_url, str) and current_url else url

        # Step 2: Find DSI links in current page
        links = await _find_dsi_links(page, base_domain)
        logger.info("Found %d DSI links on %s", len(links), url)

        # Step 3: Expand accordions, tabs, dropdowns to find hidden content
        await _expand_all_interactive(page)
        await page.wait_for_timeout(1000)

        # Step 3b: Re-scan after expanding (may reveal new links)
        known_hrefs = {link["href"] for link in links}
        for link in await _find_dsi_links(page, base_domain):
            if link["href"] not in known_hrefs:
                known_hrefs.add(link["href"])
                links.append(link)

        # Step 4: Check for inline DSI sections (accordion content already visible)
        page_url = url.split("#", 1)[0]
        for section in await _find_inline_dsi_sections(page):
            section_title = section["title"]
            title_norm = section_title.strip().lower()
            if title_norm in seen_titles:
                continue
            seen_titles.add(title_norm)
            _, lang = _matches_dsi_keyword(section_title)
            section_id = section.get("id") or ""
            section_text = section["text"]
            result.documents.append(DiscoveredDSI(
                title=section_title,
                # Headings without an id are addressed by the page itself.
                url=f"{page_url}#{section_id}" if section_id else url,
                source_url=url,
                language=lang,
                doc_type="html_section",
                text=section_text,
                word_count=len(section_text.split()),
            ))

        # Step 5: Follow each DSI link and extract content
        try:
            base_host = _base_host(base_domain)
        except ValueError:
            base_host = ""
        processed = 0
        for link_info in links:
            if processed >= max_documents:
                break
            href = link_info["href"]
            if href in seen_urls:
                continue
            seen_urls.add(href)

            # Icon-only links have no text; the href is their only label.
            title = link_info["text"] or href
            title_norm = title.strip().lower()
            if title_norm:
                if title_norm in seen_titles:
                    continue
                seen_titles.add(title_norm)
            processed += 1

            _, lang = _matches_dsi_keyword(title)
            if not lang:
                # The link may have matched on its URL rather than its text.
                _, lang = _matches_dsi_keyword(_href_haystack(href))

            if _is_pdf_url(href):
                result.documents.append(DiscoveredDSI(
                    title=title,
                    url=href,
                    source_url=url,
                    language=lang,
                    doc_type="pdf",
                    text="[PDF — Textextraktion erforderlich]",
                ))
                continue

            # Navigate to the link and extract text
            try:
                if _same_document(href, url, landing_url):
                    # Fragments are percent-encoded in a.href; element ids are not.
                    anchor = unquote(href.split("#", 1)[1])
                    text = await page.evaluate(_ANCHOR_TEXT_JS, anchor)
                    if text and len(text) > _MIN_TEXT_CHARS:
                        result.documents.append(DiscoveredDSI(
                            title=title,
                            url=href,
                            source_url=url,
                            language=lang,
                            doc_type="anchor_section",
                            text=text[:_MAX_TEXT_CHARS],
                            word_count=len(text.split()),
                        ))
                        continue

                # External or same-domain page
                resp = await page.goto(href, wait_until="networkidle", timeout=20000)
                if resp and resp.status < 400:
                    await page.wait_for_timeout(2000)
                    await _expand_all_interactive(page)  # Expand accordions on target page too
                    await page.wait_for_timeout(500)

                    text = await page.evaluate(_PAGE_TEXT_JS)
                    if text and len(text) > _MIN_TEXT_CHARS:
                        # Every followed link is on a related domain, so
                        # "cross_domain" marks a change of host
                        # (instagram.com -> help.instagram.com).
                        try:
                            same_host = _link_host(href) == base_host
                        except ValueError:
                            same_host = False
                        result.documents.append(DiscoveredDSI(
                            title=title,
                            url=href,
                            source_url=url,
                            language=lang,
                            doc_type="html_page" if same_host else "cross_domain",
                            text=text[:_MAX_TEXT_CHARS],
                            word_count=len(text.split()),
                        ))
                elif resp:
                    # goto() returns no response for same-document navigations;
                    # only real HTTP errors are reported.
                    result.errors.append(f"Failed to load {href}: HTTP {resp.status}")

                # Navigate back to source page for next link
                await page.goto(url, wait_until="networkidle", timeout=20000)
                await page.wait_for_timeout(1000)
                await _expand_all_interactive(page)

            except Exception as e:
                result.errors.append(f"Failed to load {href}: {str(e)[:80]}")
                try:
                    await page.goto(url, wait_until="networkidle", timeout=20000)
                except Exception as nav_error:
                    # Best effort: the next link gets its own navigation attempt.
                    logger.debug("Could not return to %s: %s", url, nav_error)

    except Exception as e:
        result.errors.append(f"Discovery failed: {str(e)[:100]}")
        logger.error("DSI discovery failed: %s", e)

    result.total_found = len(result.documents)
    # Sorted so that repeated scans report languages in a stable order.
    result.languages_detected = sorted({
        doc.language for doc in result.documents if doc.language
    })

    logger.info("DSI discovery complete: %d documents found in %s",
                result.total_found, result.languages_detected)
    return result


async def _find_dsi_links(page: Page, base_domain: str) -> list[dict]:
    """Find all links whose text or href matches DSI keywords.

    Only links on the same or a related domain (see ``_is_allowed_domain``)
    and links to PDF files are kept.

    Returns:
        One dict per matching anchor with the keys ``href``, ``text`` (link
        text, falling back to aria-label, then title attribute) and
        ``visible``. Returns an empty list if the page cannot be scanned.
    """
    try:
        all_links = await page.evaluate(_LINK_SCAN_JS)

        dsi_links = []
        for link in (all_links or []):
            href = link.get("href") or ""
            if not href:
                continue
            text = link.get("text") or ""
            aria_label = link.get("ariaLabel") or ""
            title_attr = link.get("title") or ""

            # Match by link text or href
            haystack = "\n".join((
                f"{text} {aria_label} {title_attr}".lower(),
                _href_haystack(href),
            ))
            if not any(keyword in haystack for keyword in ALL_DSI_KEYWORDS):
                continue

            # Allow same domain + related domains + PDFs
            if _is_allowed_domain(href, base_domain) or _is_pdf_url(href):
                dsi_links.append({
                    "href": href,
                    "text": text or aria_label or title_attr,
                    "visible": link.get("visible", False),
                })

        return dsi_links
    except Exception as e:
        logger.warning("DSI link scan failed: %s", e)
        return []


async def _expand_all_interactive(page: Page) -> None:
    """Expand all accordions, tabs, details, dropdowns on the page.

    Best effort: any failure is logged at debug level and otherwise ignored.
    """
    try:
        await page.evaluate(_EXPAND_JS)
    except Exception as e:
        logger.debug("Expand interactive elements: %s", e)


async def _find_inline_dsi_sections(page: Page) -> list[dict]:
    """Find DSI content already visible on the page (e.g. expanded accordions).

    Returns:
        One dict per matching heading with the keys ``title``, ``text`` and
        ``id``. Returns an empty list if the page cannot be scanned.
    """
    try:
        sections = await page.evaluate(_INLINE_SECTIONS_JS)
        return sections or []
    except Exception as e:
        logger.warning("Inline DSI section scan failed: %s", e)
        return []