From 275bdf9848cef09525536da2fac9b1cccfab1c21 Mon Sep 17 00:00:00 2001 From: Benjamin Admin Date: Mon, 4 May 2026 23:22:30 +0200 Subject: [PATCH] fix: Add missing service modules required by agent_scan_routes MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit These files existed on the feature branch but were never cherry-picked to main, causing ModuleNotFoundError on import: - dse_parser.py — parses DSE HTML into structured sections - dse_matcher.py — matches detected services against DSE sections - mandatory_content_checker.py — checks Art. 13 DSGVO mandatory fields - legal_basis_validator.py — validates legal basis (lit. a-f) Co-Authored-By: Claude Opus 4.6 (1M context) --- .../compliance/services/dse_matcher.py | 202 ++++++++++++ .../compliance/services/dse_parser.py | 224 +++++++++++++ .../services/legal_basis_validator.py | 179 +++++++++++ .../services/mandatory_content_checker.py | 302 ++++++++++++++++++ 4 files changed, 907 insertions(+) create mode 100644 backend-compliance/compliance/services/dse_matcher.py create mode 100644 backend-compliance/compliance/services/dse_parser.py create mode 100644 backend-compliance/compliance/services/legal_basis_validator.py create mode 100644 backend-compliance/compliance/services/mandatory_content_checker.py diff --git a/backend-compliance/compliance/services/dse_matcher.py b/backend-compliance/compliance/services/dse_matcher.py new file mode 100644 index 0000000..d802672 --- /dev/null +++ b/backend-compliance/compliance/services/dse_matcher.py @@ -0,0 +1,202 @@ +""" +DSE Matcher — matches detected services against DSE sections and +generates TextReferences with original text, position, and corrections. +""" + +import logging +import re +from dataclasses import dataclass + +from compliance.services.dse_parser import DSESection, find_section_by_content, find_section_by_category + +logger = logging.getLogger(__name__) + +# Category → typical DSE section heading keywords +CATEGORY_SECTION_MAP = { + "tracking": ["cookie", "tracking", "webanalyse", "analytics", "statistik", "reichweitenmessung"], + "marketing": ["marketing", "werbung", "newsletter", "remarketing", "werbe"], + "payment": ["zahlung", "payment", "bezahl", "zahlungsabwicklung", "zahlungsdienst"], + "chatbot": ["chat", "kommunikation", "kundenservice", "kontakt", "livechat"], + "cdn": ["hosting", "bereitstellung", "technisch", "infrastruktur", "content delivery"], + "other": ["sonstig", "weitere", "dritte", "extern", "dienstleister"], +} + + +@dataclass +class TextReference: + """Reference to a specific text block in the DSE.""" + found: bool + source_url: str = "" + document_type: str = "Datenschutzerklaerung" + section_heading: str = "" + section_number: str = "" + parent_section: str = "" + paragraph_index: int = 0 + original_text: str = "" + issue: str = "" # "missing", "incomplete", "incorrect" + correction_type: str = "" # "insert", "replace", "append" + correction_text: str = "" + insert_after: str = "" + + +def match_service_to_dse( + service_name: str, + service_category: str, + sections: list[DSESection], + url: str = "", +) -> TextReference: + """Find where a service is mentioned in the DSE and build a TextReference.""" + # Step 1: Search for exact service name + section = find_section_by_content(sections, service_name) + + if section: + # Found — extract the relevant paragraph + original = _extract_relevant_paragraph(section.content, service_name) + return TextReference( + found=True, + source_url=url, + section_heading=section.heading, 
+ section_number=section.section_number, + parent_section=section.parent_heading, + paragraph_index=_find_paragraph_index(section.content, service_name), + original_text=original, + issue="", # Found and present — caller determines if complete + ) + + # Step 2: Search for provider name (e.g., "Google" for "Google Analytics") + # But only if the provider name is specific enough — avoid "Google" matching YouTube + provider = service_name.split()[0] if " " in service_name else service_name + if len(provider) < 4 or provider.lower() in ("the", "a", "an"): + provider = service_name # Too short/generic, use full name + + section = find_section_by_content(sections, provider) + # Verify: the section must actually be about THIS service, not just mention the provider + if section and provider.lower() != service_name.lower(): + # Check if the full service name or a close variant is in the section + content_lower = section.content.lower() + service_words = service_name.lower().split() + # At least 2 words of the service name must match (not just "Google") + matching_words = sum(1 for w in service_words if w in content_lower) + if matching_words < 2 and service_name.lower() not in content_lower: + section = None # False match — provider name found but wrong context + + if section: + original = _extract_relevant_paragraph(section.content, provider) + return TextReference( + found=True, + source_url=url, + section_heading=section.heading, + section_number=section.section_number, + parent_section=section.parent_heading, + paragraph_index=_find_paragraph_index(section.content, provider), + original_text=original, + issue="incomplete", # Provider mentioned but not specific service + ) + + # Step 3: Not found — suggest insertion point + insert_section = find_section_by_category(sections, service_category) + insert_after = insert_section.heading if insert_section else "" + + # If no category match, find the last "Cookies"/"Tracking" or "Sonstiges" section + if not insert_after: + for s in reversed(sections): + h = s.heading.lower() + if any(kw in h for kw in ["cookie", "datenschutz", "daten"]): + insert_after = s.heading + break + + return TextReference( + found=False, + source_url=url, + document_type="Datenschutzerklaerung", + issue="missing", + correction_type="insert", + insert_after=insert_after, + ) + + +def build_text_references( + detected_services: list[dict], + dse_services: list[dict], + sections: list[DSESection], + url: str = "", +) -> dict[str, TextReference]: + """Build TextReferences for all detected services. 
+
+    Returns dict: service_id → TextReference
+    """
+    refs: dict[str, TextReference] = {}
+
+    for svc in detected_services:
+        service_id = svc.get("id", svc.get("name", ""))
+        service_name = svc.get("name", "")
+        category = svc.get("category", "other")
+
+        ref = match_service_to_dse(service_name, category, sections, url)
+
+        # Check if service is in the DSE SOLL list
+        dse_match = _find_in_dse_list(service_name, dse_services)
+
+        if ref.found and dse_match:
+            ref.issue = ""  # All good — documented and present
+        elif ref.found and not dse_match and ref.issue != "incomplete":
+            # Found in text but not in LLM extraction — still OK (keep "incomplete")
+            ref.issue = ""
+        elif not ref.found:
+            ref.issue = "missing"
+            ref.correction_type = "insert"
+
+        refs[service_id] = ref
+
+    return refs
+
+
+def _extract_relevant_paragraph(content: str, search_term: str) -> str:
+    """Extract the paragraph containing the search term."""
+    search_lower = search_term.lower()
+    content_lower = content.lower()
+
+    # Find position of search term
+    pos = content_lower.find(search_lower)
+    if pos == -1:
+        return content[:300]
+
+    # Find sentence/paragraph boundaries
+    # Look backwards for paragraph break
+    start = max(0, content.rfind(".", 0, pos))
+    if start > 0:
+        start += 2  # Skip ". "
+    else:
+        start = max(0, pos - 100)
+
+    # Look forward for end of paragraph
+    end = content.find(".", pos + len(search_term))
+    if end == -1 or end - pos > 500:
+        end = min(len(content), pos + 300)
+    else:
+        end += 1  # Include the period
+
+    return content[start:end].strip()
+
+
+def _find_paragraph_index(content: str, search_term: str) -> int:
+    """Find which paragraph (1-based) contains the search term."""
+    paragraphs = re.split(r"\n\n|\n(?=[A-Z])", content)
+    search_lower = search_term.lower()
+    for i, para in enumerate(paragraphs, 1):
+        if search_lower in para.lower():
+            return i
+    return 0
+
+
+def _find_in_dse_list(service_name: str, dse_services: list[dict]) -> dict | None:
+    """Check if a service appears in the LLM-extracted DSE service list."""
+    name_lower = service_name.lower()
+    for svc in dse_services:
+        dse_name = svc.get("name", "").lower()
+        if dse_name and (name_lower in dse_name or dse_name in name_lower):
+            return svc
+        # Check first word (provider match); guard against empty names
+        if dse_name and name_lower and name_lower.split()[0] in dse_name:
+            return svc
+    return None
diff --git a/backend-compliance/compliance/services/dse_parser.py b/backend-compliance/compliance/services/dse_parser.py
new file mode 100644
index 0000000..f10a201
--- /dev/null
+++ b/backend-compliance/compliance/services/dse_parser.py
@@ -0,0 +1,224 @@
+"""
+DSE Parser — parses privacy policy HTML into structured sections.
+
+Extracts headings, section numbers, content blocks and builds a
+hierarchical structure that enables precise text references.
+""" + +import logging +import re +from dataclasses import dataclass, field +from html.parser import HTMLParser + +logger = logging.getLogger(__name__) + + +@dataclass +class DSESection: + """A section in a privacy policy.""" + heading: str + heading_level: int # 1-4 + section_number: str # "2.5" or "" if no number + content: str # Plain text content + html: str # Original HTML content + parent_heading: str = "" + url: str = "" + element_id: str = "" + paragraph_count: int = 0 + + +class _HeadingExtractor(HTMLParser): + """Extract headings and their content from HTML.""" + + def __init__(self): + super().__init__() + self.sections: list[dict] = [] + self._current_tag = "" + self._in_heading = False + self._heading_level = 0 + self._heading_text = "" + self._heading_id = "" + self._content_parts: list[str] = [] + self._html_parts: list[str] = [] + self._skip_tags = {"script", "style", "nav", "footer", "header"} + self._skip_depth = 0 + self._p_count = 0 + + def handle_starttag(self, tag, attrs): + attrs_dict = dict(attrs) + if tag in self._skip_tags: + self._skip_depth += 1 + return + if self._skip_depth > 0: + return + + if tag in ("h1", "h2", "h3", "h4"): + # Save previous section + if self._heading_text: + self._save_section() + self._in_heading = True + self._heading_level = int(tag[1]) + self._heading_text = "" + self._heading_id = attrs_dict.get("id", "") + self._content_parts = [] + self._html_parts = [] + self._p_count = 0 + + if tag == "p": + self._p_count += 1 + + # Reconstruct HTML + attr_str = " ".join(f'{k}="{v}"' for k, v in attrs) + self._html_parts.append(f"<{tag}{' ' + attr_str if attr_str else ''}>") + + def handle_endtag(self, tag): + if tag in self._skip_tags and self._skip_depth > 0: + self._skip_depth -= 1 + return + if self._skip_depth > 0: + return + + if tag in ("h1", "h2", "h3", "h4"): + self._in_heading = False + + self._html_parts.append(f"") + + def handle_data(self, data): + if self._skip_depth > 0: + return + if self._in_heading: + self._heading_text += data.strip() + else: + self._content_parts.append(data) + self._html_parts.append(data) + + def _save_section(self): + if not self._heading_text: + return + content = " ".join(self._content_parts) + content = re.sub(r"\s+", " ", content).strip() + self.sections.append({ + "heading": self._heading_text.strip(), + "heading_level": self._heading_level, + "element_id": self._heading_id, + "content": content, + "html": "".join(self._html_parts), + "paragraph_count": self._p_count, + }) + + def finalize(self): + """Call after feeding all data to save the last section.""" + if self._heading_text: + self._save_section() + + +def parse_dse(html: str, url: str = "") -> list[DSESection]: + """Parse privacy policy HTML into structured sections.""" + extractor = _HeadingExtractor() + try: + extractor.feed(html) + extractor.finalize() + except Exception as e: + logger.warning("HTML parsing failed, falling back to regex: %s", e) + return _regex_fallback(html, url) + + if not extractor.sections: + return _regex_fallback(html, url) + + # Build parent hierarchy + sections: list[DSESection] = [] + parent_stack: list[str] = [""] # Stack of parent headings by level + + for raw in extractor.sections: + heading = raw["heading"] + level = raw["heading_level"] + + # Extract section number (e.g., "2.5" from "2.5 Webanalyse") + num_match = re.match(r"^(\d+(?:\.\d+)*)\s*[.:]?\s*", heading) + section_number = num_match.group(1) if num_match else "" + + # Track parent headings + while len(parent_stack) > level: + parent_stack.pop() + parent 
= parent_stack[-1] if parent_stack else "" + parent_stack.append(heading) + + sections.append(DSESection( + heading=heading, + heading_level=level, + section_number=section_number, + content=raw["content"][:2000], # Cap content length + html=raw["html"][:3000], + parent_heading=parent, + url=url, + element_id=raw["element_id"], + paragraph_count=raw["paragraph_count"], + )) + + logger.info("Parsed DSE: %d sections from %s", len(sections), url) + return sections + + +def _regex_fallback(html: str, url: str) -> list[DSESection]: + """Fallback parser using regex when HTML parsing fails.""" + # Strip scripts and styles + clean = re.sub(r"<(script|style)[^>]*>.*?", "", html, flags=re.DOTALL | re.IGNORECASE) + + sections = [] + # Find all headings + for match in re.finditer(r"]*(?:id=[\"']([^\"']*)[\"'])?[^>]*>(.*?)", clean, re.DOTALL | re.IGNORECASE): + level = int(match.group(1)) + elem_id = match.group(2) or "" + heading = re.sub(r"<[^>]+>", "", match.group(3)).strip() + + # Get content until next heading + start = match.end() + next_heading = re.search(r"]+>", " ", content) + content = re.sub(r"\s+", " ", content).strip() + + num_match = re.match(r"^(\d+(?:\.\d+)*)", heading) + + sections.append(DSESection( + heading=heading, + heading_level=level, + section_number=num_match.group(1) if num_match else "", + content=content[:2000], + html="", + url=url, + element_id=elem_id, + )) + + return sections + + +def find_section_by_content(sections: list[DSESection], search_text: str) -> DSESection | None: + """Find the section that contains specific text.""" + search_lower = search_text.lower() + for section in sections: + if search_lower in section.content.lower(): + return section + return None + + +def find_section_by_category(sections: list[DSESection], category: str) -> DSESection | None: + """Find the section most likely to contain a service category.""" + category_keywords = { + "tracking": ["cookie", "tracking", "webanalyse", "analytics", "statistik"], + "marketing": ["marketing", "werbung", "newsletter", "remarketing"], + "payment": ["zahlung", "payment", "bezahlung", "zahlungsabwicklung"], + "chatbot": ["chat", "kommunikation", "kundenservice", "kontakt"], + "cdn": ["hosting", "bereitstellung", "technisch", "infrastruktur", "cdn"], + "other": ["sonstig", "weitere", "dritte", "extern"], + } + keywords = category_keywords.get(category, category_keywords["other"]) + + for section in sections: + heading_lower = section.heading.lower() + content_lower = section.content.lower()[:500] + for kw in keywords: + if kw in heading_lower or kw in content_lower: + return section + return None diff --git a/backend-compliance/compliance/services/legal_basis_validator.py b/backend-compliance/compliance/services/legal_basis_validator.py new file mode 100644 index 0000000..bf75d8a --- /dev/null +++ b/backend-compliance/compliance/services/legal_basis_validator.py @@ -0,0 +1,179 @@ +""" +Legal Basis Validator — checks if the correct DSGVO legal basis (lit. a-f) +is used for each processing purpose in the privacy policy. + +⚠️ TECHNISCHE SCHULD / HARDCODED KNOWLEDGE: +Dieses Modul enthält hartkodierte Rechtsgrundlagen-Zuordnungen (CORRECT_BASIS dict). +Das ist ein TEMPORAERER Fallback bis die Control Library entsprechende Controls hat. + +MITTELFRISTIGES ZIEL: Dieses Dict durch RAG/Control-Library-Abfragen ersetzen. +Neue Controls sollten in der Pipeline generiert werden, z.B.: + "Cookie-Tracking erfordert Art. 
6(1)(a) Einwilligung (EuGH C-673/17 Planet49)" + → canonical_controls mit scope_conditions + legal_ref + +BIS DAHIN: Dieses Dict wird als Fallback genutzt mit einem Warning-Log wenn +es herangezogen wird. Bei jedem neuen Gesetz/Urteil muss SOWOHL die Pipeline +als auch dieses Dict aktualisiert werden — oder besser: das Dict entfernen und +nur noch Controls nutzen. + +Erstellt: 2026-04-29 | Review-Datum: 2026-07-01 | Owner: Agent-Team + +Common mistakes detected: +- Cookie tracking on lit. f (legitimate interest) instead of lit. a (consent) +- Marketing emails on lit. f instead of lit. a +- Analytics on lit. b (contract) — incorrect overextension +- Klarna credit check without Art. 22 reference +""" + +import logging +import re +from dataclasses import dataclass + +logger = logging.getLogger(__name__) + + +@dataclass +class LitFinding: + purpose: str + stated_basis: str + correct_basis: str + severity: str + text: str + legal_ref: str + original_text: str = "" + + +# Purpose → correct legal basis mapping +# Based on: DSK Kurzpapiere, Planet49 (EuGH C-673/17), BGH Cookie-Urteil +CORRECT_BASIS: dict[str, dict] = { + "cookie_tracking": { + "correct": "lit. a (Einwilligung)", + "wrong_patterns": ["berechtigtes interesse", "lit. f", "lit.f", "legitimate interest"], + "detect_patterns": ["cookie", "tracking", "pixel", "analytics.*cookie"], + "ref": "EuGH C-673/17 (Planet49), §25 TDDDG", + }, + "web_analytics": { + "correct": "lit. a (Einwilligung)", + "wrong_patterns": ["berechtigtes interesse", "lit. f", "lit.f", "vertragserfuellung", "lit. b", "lit.b"], + "detect_patterns": ["google analytics", "webanalyse", "web analytics", "reichweitenmessung", + "nutzungsanalyse", "hotjar", "matomo"], + "ref": "DSK Orientierungshilfe Telemedien, §25 TDDDG", + }, + "marketing_email": { + "correct": "lit. a (Einwilligung)", + "wrong_patterns": ["berechtigtes interesse", "lit. f", "lit.f"], + "detect_patterns": ["newsletter", "marketing.*mail", "werbe.*mail", "werbe.*email", + "marketing.*email", "werbliche.*kommunikation"], + "ref": "Art. 7 DSGVO, §7 UWG (Double Opt-In)", + }, + "remarketing": { + "correct": "lit. a (Einwilligung)", + "wrong_patterns": ["berechtigtes interesse", "lit. f", "lit.f"], + "detect_patterns": ["remarketing", "retargeting", "personalisierte werbung", + "personalized advertising", "custom audience"], + "ref": "§25 TDDDG, EuGH C-673/17", + }, + "credit_check": { + "correct": "lit. b/f + Art. 22 DSGVO Hinweis", + "wrong_patterns": [], # Not about wrong basis, but missing Art. 22 + "detect_patterns": ["bonitaet", "bonität", "kreditprüfung", "kreditpruefung", + "schufa", "auskunftei", "klarna.*rechnung", "ratenzahlung"], + "ref": "Art. 22 DSGVO (automatisierte Einzelentscheidung)", + "must_contain": ["art. 22", "art.22", "automatisierte entscheidung", + "automated decision", "einzelentscheidung"], + }, + "social_media_embed": { + "correct": "lit. a (Einwilligung)", + "wrong_patterns": ["berechtigtes interesse", "lit. f", "lit.f"], + "detect_patterns": ["facebook.*plugin", "social.*plugin", "like.*button", + "share.*button", "instagram.*embed", "twitter.*embed"], + "ref": "EuGH C-40/17 (Fashion ID), 2-Klick-Loesung", + }, + "session_recording": { + "correct": "lit. a (Einwilligung)", + "wrong_patterns": ["berechtigtes interesse", "lit. 
f", "lit.f"], + "detect_patterns": ["session.?recording", "session.?replay", "heatmap", + "mouseflow", "hotjar.*recording", "clarity.*recording", + "fullstory", "lucky orange"], + "ref": "§25 TDDDG, Aufzeichnung von Nutzerverhalten", + }, +} + + +def validate_legal_bases(dse_text: str) -> list[LitFinding]: + """Check if correct legal bases are used in the privacy policy. + + ⚠️ Uses HARDCODED CORRECT_BASIS dict as fallback. + TODO: Replace with RAG/Control Library query when lit-mapping Controls exist. + """ + logger.warning( + "legal_basis_validator: Using HARDCODED rules (CORRECT_BASIS dict). " + "This should be replaced with Control Library queries. Review date: 2026-07-01" + ) + findings = [] + text_lower = dse_text.lower() + + for purpose_id, rules in CORRECT_BASIS.items(): + # Step 1: Is this purpose mentioned in the DSE? + purpose_found = False + matched_text = "" + for pattern in rules["detect_patterns"]: + match = re.search(pattern, text_lower) + if match: + purpose_found = True + # Extract surrounding context (200 chars) + start = max(0, match.start() - 100) + end = min(len(text_lower), match.end() + 200) + matched_text = dse_text[start:end].strip() + break + + if not purpose_found: + continue + + context_lower = matched_text.lower() + + # Step 2: Check if wrong legal basis is stated + for wrong in rules["wrong_patterns"]: + if wrong in context_lower: + findings.append(LitFinding( + purpose=purpose_id, + stated_basis=wrong, + correct_basis=rules["correct"], + severity="HIGH", + text=f"Falsche Rechtsgrundlage: '{_purpose_label(purpose_id)}' nutzt " + f"'{wrong}' statt '{rules['correct']}'", + legal_ref=rules["ref"], + original_text=matched_text[:300], + )) + break + + # Step 3: Special check — must_contain (e.g., Art. 22 for credit checks) + if "must_contain" in rules: + has_required = any(req in context_lower for req in rules["must_contain"]) + if not has_required: + findings.append(LitFinding( + purpose=purpose_id, + stated_basis="(fehlt)", + correct_basis=rules["correct"], + severity="HIGH", + text=f"Pflichthinweis fehlt: '{_purpose_label(purpose_id)}' erwaehnt " + f"keine automatisierte Entscheidungsfindung ({rules['ref']})", + legal_ref=rules["ref"], + original_text=matched_text[:300], + )) + + return findings + + +def _purpose_label(purpose_id: str) -> str: + """German label for purpose ID.""" + labels = { + "cookie_tracking": "Cookie-Tracking", + "web_analytics": "Webanalyse", + "marketing_email": "Marketing-Emails/Newsletter", + "remarketing": "Remarketing/Retargeting", + "credit_check": "Bonitaetspruefung", + "social_media_embed": "Social Media Einbindung", + "session_recording": "Session Recording/Heatmaps", + } + return labels.get(purpose_id, purpose_id) diff --git a/backend-compliance/compliance/services/mandatory_content_checker.py b/backend-compliance/compliance/services/mandatory_content_checker.py new file mode 100644 index 0000000..021a264 --- /dev/null +++ b/backend-compliance/compliance/services/mandatory_content_checker.py @@ -0,0 +1,302 @@ +""" +Mandatory Content Checker — verifies that legally required content +is present on a website. Checks for missing documents, sections, +and mandatory information within documents. + +Knows what MUST be there (not just what IS there). 
+""" + +import logging +import re +from dataclasses import dataclass, field + +from compliance.services.dse_parser import DSESection + +logger = logging.getLogger(__name__) + + +@dataclass +class MandatoryFinding: + code: str + severity: str # "HIGH", "MEDIUM", "LOW" + category: str # "document_missing", "section_missing", "info_missing" + text: str + legal_ref: str + expected: str # What should be there + suggestion: str = "" # How to fix + + +# ═══════════════════════════════════════════════════════════════ +# MANDATORY DOCUMENTS (must exist as pages/links on the website) +# ═══════════════════════════════════════════════════════════════ + +MANDATORY_DOCUMENTS = [ + { + "id": "impressum", + "name": "Impressum", + "legal_ref": "§5 TMG, §18 MStV", + "patterns": [r"impressum", r"imprint", r"legal.?notice"], + "severity": "HIGH", + }, + { + "id": "datenschutz", + "name": "Datenschutzerklaerung", + "legal_ref": "Art. 13/14 DSGVO", + "patterns": [r"datenschutz", r"privacy", r"dsgvo"], + "severity": "HIGH", + }, + { + "id": "agb", + "name": "AGB / Nutzungsbedingungen", + "legal_ref": "§305 BGB (bei Vertragsschluss)", + "patterns": [r"agb", r"nutzungsbedingung", r"terms"], + "severity": "MEDIUM", + "only_ecommerce": True, # Nur bei Shops/Buchungsseiten + }, + { + "id": "widerruf", + "name": "Widerrufsbelehrung", + "legal_ref": "§355 BGB, Art. 246a §1 EGBGB (nur Fernabsatz)", + "patterns": [r"widerruf", r"cancellation.?policy", r"right.?of.?withdrawal"], + "severity": "MEDIUM", + "only_ecommerce": True, # Nur bei Fernabsatzvertraegen + }, +] + + +# ═══════════════════════════════════════════════════════════════ +# MANDATORY DSE SECTIONS (Art. 13 DSGVO Pflichtangaben) +# ═══════════════════════════════════════════════════════════════ + +MANDATORY_DSE_CONTENT = [ + { + "id": "verantwortlicher", + "name": "Name und Kontakt des Verantwortlichen", + "legal_ref": "Art. 13 Abs. 1 lit. a DSGVO", + "keywords": ["verantwortlich", "responsible", "controller", "betreiber"], + "severity": "HIGH", + }, + { + "id": "dsb_kontakt", + "name": "Kontaktdaten des Datenschutzbeauftragten", + "legal_ref": "Art. 13 Abs. 1 lit. b DSGVO", + "keywords": ["datenschutzbeauftragt", "data protection officer", "dsb", "dpo", + "behördlichen datenschutz", "behoerdlichen datenschutz", + "datenschutz@", "datenschutzbeauftragter"], + "severity": "HIGH", + }, + { + "id": "zwecke", + "name": "Zwecke der Datenverarbeitung", + "legal_ref": "Art. 13 Abs. 1 lit. c DSGVO", + "keywords": ["zweck", "purpose", "verarbeitungszweck", "verarbeitungszwecke", + "wozu", "wofuer", "zu welchem zweck", "nutzungszweck", + "zweck und rechtsgrundlage", "zwecke der verarbeitung"], + "severity": "HIGH", + }, + { + "id": "rechtsgrundlage", + "name": "Rechtsgrundlagen der Verarbeitung", + "legal_ref": "Art. 13 Abs. 1 lit. c DSGVO", + "keywords": ["rechtsgrundlage", "legal basis", "art. 6", "art.6", + "berechtigtes interesse", "einwilligung", "vertragserfuellung", + "vertragserfüllung", "rechtliche verpflichtung"], + "severity": "HIGH", + }, + { + "id": "speicherdauer", + "name": "Speicherdauer / Loeschfristen", + "legal_ref": "Art. 13 Abs. 2 lit. a DSGVO", + "keywords": ["speicherdauer", "aufbewahrung", "loeschung", "loeschfrist", + "storage period", "retention", "deletion"], + "severity": "HIGH", + }, + { + "id": "betroffenenrechte", + "name": "Betroffenenrechte (Auskunft, Loeschung, etc.)", + "legal_ref": "Art. 13 Abs. 2 lit. 
b-d DSGVO", + "keywords": ["betroffenenrecht", "auskunft", "berichtigung", "loeschung", + "einschraenkung", "widerspruch", "data subject rights", + "right to access", "right to erasure"], + "severity": "HIGH", + }, + { + "id": "beschwerderecht", + "name": "Beschwerderecht bei Aufsichtsbehoerde", + "legal_ref": "Art. 13 Abs. 2 lit. d DSGVO", + "keywords": ["aufsichtsbehoerde", "aufsichtsbehörde", "beschwerde", + "supervisory authority", "datenschutzbehoerde", + "landesbeauftragte", "bundesdatenschutz", "bfdi"], + "severity": "MEDIUM", + }, + { + "id": "drittlandtransfer", + "name": "Drittlandtransfer-Information", + "legal_ref": "Art. 13 Abs. 1 lit. f DSGVO", + "keywords": ["drittland", "drittst", "third countr", "usa", "transfer", + "standardvertragsklausel", "adequacy"], + "severity": "MEDIUM", + }, + { + "id": "automatisierte_entscheidung", + "name": "Automatisierte Entscheidungsfindung / Profiling", + "legal_ref": "Art. 13 Abs. 2 lit. f DSGVO", + "keywords": ["automatisiert", "profiling", "automated decision", "scoring"], + "severity": "MEDIUM", + }, +] + + +# ═══════════════════════════════════════════════════════════════ +# MANDATORY IMPRESSUM CONTENT (§5 TMG) +# ═══════════════════════════════════════════════════════════════ + +MANDATORY_IMPRESSUM_CONTENT = [ + { + "id": "geschaeftsfuehrer", + "name": "Geschaeftsfuehrer / Vertretungsberechtigter", + "legal_ref": "§5 Abs. 1 Nr. 1 TMG", + "keywords": ["geschaeftsfuehrer", "geschäftsführer", "ceo", "managing director", + "vertretungsberechtig", "vorstand"], + "severity": "HIGH", + }, + { + "id": "handelsregister", + "name": "Handelsregisternummer", + "legal_ref": "§5 Abs. 1 Nr. 4 TMG", + "keywords": ["handelsregister", "hrb", "hra", "amtsgericht", "registergericht", + "commercial register"], + "severity": "HIGH", + }, + { + "id": "ust_id", + "name": "Umsatzsteuer-Identifikationsnummer", + "legal_ref": "§5 Abs. 1 Nr. 6 TMG", + "keywords": ["ust-id", "ust.-id", "umsatzsteuer", "vat", "de\\d{9}"], + "severity": "MEDIUM", + }, + { + "id": "anschrift", + "name": "Anschrift (Strasse, PLZ, Ort)", + "legal_ref": "§5 Abs. 1 Nr. 1 TMG", + "keywords": ["str.", "straße", "strasse", "plz", "postleitzahl"], + "severity": "HIGH", + }, + { + "id": "kontakt", + "name": "Kontaktmoeglichkeit (Email oder Telefon)", + "legal_ref": "§5 Abs. 1 Nr. 
2 TMG", + "keywords": ["@", "telefon", "phone", "e-mail", "email", "kontakt"], + "severity": "HIGH", + }, +] + + +ECOMMERCE_INDICATORS = [ + r"warenkorb", r"cart", r"shop", r"bestell", r"order", + r"checkout", r"kasse", r"buy", r"kaufen", r"add.?to.?cart", + r"stripe|paypal|klarna|mollie|adyen", # Payment providers +] + + +def _is_ecommerce(scanned_pages: list[str], html_content: str = "") -> bool: + """Detect if website is an e-commerce/transactional site.""" + all_text = " ".join(scanned_pages).lower() + " " + html_content.lower() + return any(re.search(p, all_text) for p in ECOMMERCE_INDICATORS) + + +def check_mandatory_documents( + scanned_pages: list[str], page_status: dict[str, int], + html_content: str = "", +) -> list[MandatoryFinding]: + """Check if mandatory documents/pages exist on the website.""" + findings = [] + is_shop = _is_ecommerce(scanned_pages, html_content) + + for doc in MANDATORY_DOCUMENTS: + # Skip e-commerce-only checks for non-shop websites + if doc.get("only_ecommerce") and not is_shop: + continue + + found = False + for page in scanned_pages: + if any(re.search(p, page, re.IGNORECASE) for p in doc["patterns"]): + status = page_status.get(page, 200) + if status < 400: + found = True + else: + findings.append(MandatoryFinding( + code=f"DOC-ERROR-{doc['id'].upper()}", + severity="HIGH", + category="document_error", + text=f"{doc['name']} existiert aber gibt HTTP {status} zurueck (Ladefehler!)", + legal_ref=doc["legal_ref"], + expected=doc["name"], + suggestion=f"Seite {page} ist nicht erreichbar. Pruefen ob ein Deployment-Fehler vorliegt.", + )) + found = True # Exists but broken + break + + if not found: + findings.append(MandatoryFinding( + code=f"DOC-MISSING-{doc['id'].upper()}", + severity=doc["severity"], + category="document_missing", + text=f"{doc['name']} nicht auf der Website gefunden ({doc['legal_ref']})", + legal_ref=doc["legal_ref"], + expected=f"Link zu {doc['name']} muss von jeder Seite erreichbar sein", + )) + + return findings + + +def check_dse_mandatory_content( + sections: list[DSESection], full_text: str, +) -> list[MandatoryFinding]: + """Check if privacy policy contains all mandatory sections per Art. 13 DSGVO.""" + findings = [] + text_lower = full_text.lower() + + for req in MANDATORY_DSE_CONTENT: + found = any(kw in text_lower for kw in req["keywords"]) + if not found: + # Also check section headings + found = any( + any(kw in s.heading.lower() or kw in s.content.lower()[:200] + for kw in req["keywords"]) + for s in sections + ) + + if not found: + findings.append(MandatoryFinding( + code=f"DSE-CONTENT-{req['id'].upper()}", + severity=req["severity"], + category="section_missing", + text=f"Pflichtangabe fehlt: {req['name']} ({req['legal_ref']})", + legal_ref=req["legal_ref"], + expected=req["name"], + )) + + return findings + + +def check_impressum_mandatory_content( + impressum_text: str, +) -> list[MandatoryFinding]: + """Check if Impressum contains all mandatory info per §5 TMG.""" + findings = [] + text_lower = impressum_text.lower() + + for req in MANDATORY_IMPRESSUM_CONTENT: + found = any(re.search(kw, text_lower) for kw in req["keywords"]) + if not found: + findings.append(MandatoryFinding( + code=f"IMP-CONTENT-{req['id'].upper()}", + severity=req["severity"], + category="info_missing", + text=f"Impressum: {req['name']} fehlt ({req['legal_ref']})", + legal_ref=req["legal_ref"], + expected=req["name"], + )) + + return findings