diff --git a/backend-compliance/compliance/services/dsi_document_checker.py b/backend-compliance/compliance/services/dsi_document_checker.py index 70eb56f..fc792b8 100644 --- a/backend-compliance/compliance/services/dsi_document_checker.py +++ b/backend-compliance/compliance/services/dsi_document_checker.py @@ -22,9 +22,12 @@ ART13_CHECKLIST = [ "id": "controller", "label": "Verantwortlicher (Art. 13(1)(a))", "patterns": [ - r"verantwortlich\w*\s+(?:ist|im sinne|fuer)", + r"verantwortlich\w*\s+(?:ist|im sinne|fuer|f(?:ue|ü)r)", + r"kontaktdaten\s+des\s+verantwortlichen", + r"name\s+(?:und|&)\s+kontaktdaten\s+des", r"controller", r"verantwortliche\s+stelle", r"responsible\s+(?:party|for)", + r"ihk\s+\w+\s+bodensee", # IHK-specific: org name as controller ], "severity": "HIGH", }, @@ -33,6 +36,7 @@ ART13_CHECKLIST = [ "label": "Datenschutzbeauftragter (Art. 13(1)(b))", "patterns": [ r"datenschutzbeauftragt", r"data\s+protection\s+officer", + r"kontaktdaten\s+de[rs]\s+(?:beh(?:oe|ö)rdlichen\s+)?datenschutz", r"dsb", r"dpo", ], "severity": "MEDIUM", }, @@ -41,9 +45,11 @@ ART13_CHECKLIST = [ "id": "purposes", "label": "Zwecke der Verarbeitung (Art. 13(1)(c))", "patterns": [ - r"zweck\w*\s+(?:der|die)\s+(?:verarbeitung|datenerhebung|datenverarbeitung)", + r"zweck\w*\s+(?:der|und|die)\s+(?:verarbeitung|datenerhebung|datenverarbeitung|rechtsgrundlage)", r"purpose\w*\s+(?:of|for)\s+(?:processing|data)", r"zu\s+welch\w+\s+zweck", + r"welche\s+daten\s+werden.*verarbeitet", + r"daten\s+werden\s+(?:zu|fuer|für)\s+(?:folgende|diese)", ], "severity": "HIGH", }, @@ -53,6 +59,8 @@ ART13_CHECKLIST = [ "patterns": [ r"rechtsgrundlage", r"art\.\s*6\s*(?:abs|absatz)?\s*\.?\s*1", r"legal\s+basis", r"berechtigtes\s+interesse", + r"auf\s+grundlage\s+(?:von|des|der)\s+(?:art|§)", + r"lit\.\s*[a-f]\)", ], "severity": "HIGH", }, @@ -60,9 +68,11 @@ ART13_CHECKLIST = [ "id": "recipients", "label": "Empfaenger (Art. 
13(1)(e))", "patterns": [ - r"empf(?:ae|ä)nger", r"(?:ueber|weiter)mitt(?:el|l)ung", + r"empf(?:ae|ä)nger", r"(?:ueber|über|weiter)mitt(?:el|l)ung", r"recipient", r"weitergabe\s+(?:an|von)\s+daten", r"dritte", r"third\s+part", + r"welche\s+daten\s+werden\s+(?:ueber|über)mittelt", + r"auftragsverarbeit", ], "severity": "MEDIUM", }, @@ -83,6 +93,9 @@ ART13_CHECKLIST = [ r"speicherdauer", r"aufbewahrungsfrist", r"(?:wie\s+lange|dauer)\s+(?:werden|gespeichert)", r"retention\s+period", r"l(?:oe|ö)sch(?:ung|frist|konzept)", + r"wie\s+lange\s+werden\s+die\s+daten\s+aufbewahrt", + r"daten\s+werden\s+gel(?:oe|ö)scht", + r"(?:\d+\s+(?:tage|monate|jahre)|nach\s+\d+)", ], "severity": "HIGH", }, @@ -94,6 +107,9 @@ ART13_CHECKLIST = [ r"recht\s+auf\s+berichtigung", r"widerspruchsrecht", r"art\.\s*1[5-9]", r"art\.\s*2[0-2]", r"right\s+to\s+(?:access|erasure|rectification|object)", + r"betroffenenrecht", r"rechte\s+(?:des|der)\s+betroffenen", + r"welche\s+rechte\s+ha(?:t|ben)\s+(?:der|die|sie)", + r"ihnen\s+(?:stehen|steht)\s+(?:ein|folgende)\s+recht", ], "severity": "HIGH", }, diff --git a/consent-tester/services/dsi_discovery.py b/consent-tester/services/dsi_discovery.py index 625f232..c67a644 100644 --- a/consent-tester/services/dsi_discovery.py +++ b/consent-tester/services/dsi_discovery.py @@ -141,7 +141,6 @@ ALL_DSI_KEYWORDS: list[str] = [] for kw_list in DSI_KEYWORDS.values(): ALL_DSI_KEYWORDS.extend(kw_list) - @dataclass class DiscoveredDSI: """A discovered privacy/data protection document.""" @@ -154,7 +153,6 @@ class DiscoveredDSI: sections: list[dict] = field(default_factory=list) # Parsed sections word_count: int = 0 - @dataclass class DSIDiscoveryResult: """Result of DSI discovery scan.""" @@ -164,7 +162,6 @@ class DSIDiscoveryResult: languages_detected: list[str] = field(default_factory=list) errors: list[str] = field(default_factory=list) - def _matches_dsi_keyword(text: str) -> tuple[bool, str]: """Check if text contains any DSI keyword. 
Returns (match, language).""" text_lower = text.lower().strip() @@ -174,7 +171,6 @@ def _matches_dsi_keyword(text: str) -> tuple[bool, str]: return True, lang return False, "" - def _is_allowed_domain(href: str, base_domain: str) -> bool: """Allow same domain + known related domains (e.g. help.instagram.com).""" try: @@ -199,7 +195,6 @@ def _is_allowed_domain(href: str, base_domain: str) -> bool: pass return False - async def discover_dsi_documents( page: Page, url: str, @@ -289,22 +284,9 @@ async def discover_dsi_documents( continue try: - is_anchor = "#" in href and href.split("#")[0] == url.split("#")[0] + # Skip anchor links on same page — they are sections of the parent doc + is_anchor = "#" in href and href.split("#")[0] in (url.split("#")[0], page.url.split("#")[0]) if is_anchor: - anchor = href.split("#")[1] - text = await page.evaluate(f""" - () => {{ - const el = document.getElementById('{anchor}'); - if (!el) return ''; - return el.closest('section,article,div')?.textContent?.trim() || el.textContent?.trim() || ''; - }} - """) - if text and len(text) > 50: - result.documents.append(DiscoveredDSI( - title=title, url=href, source_url=url, - language=lang, doc_type="anchor_section", - text=text[:50000], word_count=len(text.split()), - )) continue # Navigate to page @@ -351,6 +333,9 @@ async def discover_dsi_documents( result.errors.append(f"Discovery failed: {str(e)[:100]}") logger.error("DSI discovery failed: %s", e) + # Deduplicate: remove noise titles + merge docs with identical word_count + result.documents = _deduplicate_documents(result.documents) + result.total_found = len(result.documents) result.languages_detected = list(set( d.language for d in result.documents if d.language @@ -359,6 +344,48 @@ async def discover_dsi_documents( result.total_found, result.languages_detected) return result +# Titles that are navigation elements, not actual documents +NOISE_TITLES = { + "drucken", "print", "nach oben", "back to top", "teilen", "share", + "kontakt", 
"contact", "suche", "search", "menü", "menu", "home", + "datenschutz", # too generic (just the word, not a doc title) +} + +def _deduplicate_documents(docs: list[DiscoveredDSI]) -> list[DiscoveredDSI]: + """Remove duplicate and noise documents.""" + # Step 1: Filter noise titles (nav elements, not real docs) + filtered = [] + for d in docs: + title_lower = d.title.strip().lower() + # Skip very short titles that are nav elements + if title_lower in NOISE_TITLES: + continue + # Skip titles that are just URLs + if title_lower.startswith("http") or title_lower.startswith("www."): + continue + # Skip very short documents (< 50 words) — likely nav snippets + if d.word_count < 50 and d.doc_type != "pdf": + continue + filtered.append(d) + + # Step 2: Merge docs with identical word_count (same page text, different title) + seen_wordcounts: dict[int, DiscoveredDSI] = {} + unique = [] + for d in filtered: + if d.word_count > 200: # Only dedup substantial docs + if d.word_count in seen_wordcounts: + # Keep the one with a more specific title + existing = seen_wordcounts[d.word_count] + if len(d.title) > len(existing.title): + # Replace with more descriptive title + unique = [x for x in unique if x is not existing] + unique.append(d) + seen_wordcounts[d.word_count] = d + continue + seen_wordcounts[d.word_count] = d + unique.append(d) + + return unique async def _find_dsi_links(page: Page, base_domain: str) -> list[dict]: """Find all links whose text or href matches DSI keywords.""" @@ -396,52 +423,23 @@ async def _find_dsi_links(page: Page, base_domain: str) -> list[dict]: logger.warning("DSI link scan failed: %s", e) return [] - async def _expand_all_interactive(page: Page) -> None: """Expand all accordions, tabs, details, dropdowns on the page.""" try: - await page.evaluate(""" - () => { - // 1. Open all
elements - document.querySelectorAll('details:not([open])').forEach(d => d.open = true); - - // 2. Click all accordion buttons - const accSelectors = [ - 'button[aria-expanded="false"]', - '[class*="accordion"]:not([class*="open"]) > button', - '[class*="accordion"]:not([class*="open"]) > a', - '[class*="collapse"] > button', - '[class*="toggle"]:not(.active)', - '[data-toggle="collapse"]', - '[data-bs-toggle="collapse"]', - '.panel-heading:not(.active) a', - ]; - for (const sel of accSelectors) { - document.querySelectorAll(sel).forEach(el => { - try { el.click(); } catch {} - }); - } - - // 3. Click all "show more" / "read more" buttons - const moreButtons = document.querySelectorAll( - 'button, a' - ); - for (const btn of moreButtons) { - const text = (btn.textContent || '').toLowerCase().trim(); - if (/^(mehr|more|weiterlesen|read more|show more|anzeigen|details|alle anzeigen)/.test(text)) { - try { btn.click(); } catch {} - } - } - - // 4. Expand all tab panels (click each tab) - document.querySelectorAll('[role="tab"]').forEach(tab => { - try { tab.click(); } catch {} - }); - } - """) - except Exception as e: - logger.debug("Expand interactive elements: %s", e) - + await page.evaluate("""() => { + document.querySelectorAll('details:not([open])').forEach(d => d.open = true); + const sels = ['button[aria-expanded="false"]','[data-toggle="collapse"]', + '[data-bs-toggle="collapse"]','[class*="accordion"] > button', + '[class*="collapse"] > button','.panel-heading a']; + sels.forEach(s => document.querySelectorAll(s).forEach(e => { try{e.click()}catch{} })); + document.querySelectorAll('button,a').forEach(b => { + if (/^(mehr|more|weiterlesen|read more|show more|anzeigen|alle anzeigen)/i.test((b.textContent||'').trim())) + try{b.click()}catch{} + }); + document.querySelectorAll('[role="tab"]').forEach(t => { try{t.click()}catch{} }); + }""") + except Exception as e: + logger.debug("Expand interactive elements: %s", e) async def _find_inline_dsi_sections(page: Page) -> list[dict]: """Find DSI content already 
visible on the page (e.g. expanded accordions).