"""DSE Matcher.

Matches detected services against DSE sections and generates TextReferences
with original text, position, and corrections.
"""

import logging
import re
from dataclasses import dataclass

from compliance.services.dse_parser import (
    DSESection,
    find_section_by_category,
    find_section_by_content,
)

logger = logging.getLogger(__name__)

# Category → typical DSE section heading keywords
CATEGORY_SECTION_MAP = {
    "tracking": ["cookie", "tracking", "webanalyse", "analytics", "statistik", "reichweitenmessung"],
    "marketing": ["marketing", "werbung", "newsletter", "remarketing", "werbe"],
    "payment": ["zahlung", "payment", "bezahl", "zahlungsabwicklung", "zahlungsdienst"],
    "chatbot": ["chat", "kommunikation", "kundenservice", "kontakt", "livechat"],
    "cdn": ["hosting", "bereitstellung", "technisch", "infrastruktur", "content delivery"],
    "other": ["sonstig", "weitere", "dritte", "extern", "dienstleister"],
}

# Heading keywords used to pick a fallback insertion point when no section
# matches the service category.
_FALLBACK_HEADING_KEYWORDS = ("cookie", "datenschutz", "daten")

# Paragraph boundary: a run of blank lines, or a single newline followed by an
# uppercase letter (German umlauts included).
_PARAGRAPH_BREAK_RE = re.compile(r"\n{2,}|\n(?=[A-ZÄÖÜ])")

# Excerpt sizing used by _extract_relevant_paragraph.
_FALLBACK_EXCERPT_LEN = 300  # chars returned/kept when no sentence end is usable
_CONTEXT_BEFORE = 100  # chars of leading context when no preceding period exists
_MAX_SENTENCE_SPAN = 500  # a "sentence" longer than this is truncated instead


@dataclass
class TextReference:
    """Reference to a specific text block in the DSE."""

    found: bool  # True when the service (or its provider) was located in a section
    source_url: str = ""
    document_type: str = "Datenschutzerklaerung"
    section_heading: str = ""
    section_number: str = ""
    parent_section: str = ""
    paragraph_index: int = 0  # 1-based paragraph within the section; 0 = unknown
    original_text: str = ""  # excerpt surrounding the match
    issue: str = ""  # "missing", "incomplete", "incorrect"
    correction_type: str = ""  # "insert", "replace", "append"
    correction_text: str = ""
    insert_after: str = ""  # heading after which a missing entry should be inserted


def _section_mentions_service(content: str, service_name: str) -> bool:
    """Return True if *content* plausibly covers *service_name*, not just its provider.

    Accepts when the full name occurs in the content, or when at least two
    words of the name occur in it (substring match, case-insensitive).
    """
    content_lower = content.lower()
    name_lower = service_name.lower()
    if name_lower in content_lower:
        return True
    matching_words = sum(1 for word in name_lower.split() if word in content_lower)
    return matching_words >= 2


def match_service_to_dse(
    service_name: str,
    service_category: str,
    sections: list[DSESection],
    url: str = "",
) -> TextReference:
    """Find where a service is mentioned in the DSE and build a TextReference.

    The lookup runs in three steps:

    1. Search the sections for the full service name.
    2. Search for the provider (first word of a multi-word name) and accept
       the hit only if the section also appears to cover this service; such
       a hit is reported with ``issue="incomplete"``.
    3. Otherwise report the service as missing and suggest the heading after
       which an entry should be inserted.

    Args:
        service_name: Display name of the detected service.
        service_category: Category passed to ``find_section_by_category``
            when looking for an insertion point.
        sections: Parsed DSE sections to search.
        url: Source URL copied into the resulting reference.

    Returns:
        A ``TextReference``; ``found`` is False only in step 3.
    """
    # Step 1: Search for exact service name
    section = find_section_by_content(sections, service_name)
    if section:
        return TextReference(
            found=True,
            source_url=url,
            section_heading=section.heading,
            section_number=section.section_number,
            parent_section=section.parent_heading,
            paragraph_index=_find_paragraph_index(section.content, service_name),
            original_text=_extract_relevant_paragraph(section.content, service_name),
            issue="",  # Found and present — caller determines if complete
        )

    # Step 2: Search for provider name (e.g., "Google" for "Google Analytics").
    # Only if the provider name is specific enough — avoid "Google" matching
    # YouTube. The `words` guard avoids an IndexError on whitespace-only names.
    words = service_name.split()
    provider = words[0] if " " in service_name and words else service_name
    if len(provider) < 4 or provider.lower() in ("the", "a", "an"):
        provider = service_name  # Too short/generic, use full name

    # When the provider equals the full name, step 1 already ran this exact
    # search without success, so there is nothing new to look for.
    if provider != service_name:
        section = find_section_by_content(sections, provider)
        # Verify: the section must actually be about THIS service, not just
        # mention the provider.
        if section and _section_mentions_service(section.content, service_name):
            return TextReference(
                found=True,
                source_url=url,
                section_heading=section.heading,
                section_number=section.section_number,
                parent_section=section.parent_heading,
                paragraph_index=_find_paragraph_index(section.content, provider),
                original_text=_extract_relevant_paragraph(section.content, provider),
                issue="incomplete",  # Provider mentioned but not specific service
            )

    # Step 3: Not found — suggest insertion point
    insert_section = find_section_by_category(sections, service_category)
    insert_after = insert_section.heading if insert_section else ""

    # If no category match, fall back to the last section whose heading
    # contains one of the generic privacy keywords.
    if not insert_after:
        for candidate in reversed(sections):
            heading_lower = candidate.heading.lower()
            if any(kw in heading_lower for kw in _FALLBACK_HEADING_KEYWORDS):
                insert_after = candidate.heading
                break

    return TextReference(
        found=False,
        source_url=url,
        document_type="Datenschutzerklaerung",
        issue="missing",
        correction_type="insert",
        insert_after=insert_after,
    )


def build_text_references(
    detected_services: list[dict],
    dse_services: list[dict],
    sections: list[DSESection],
    url: str = "",
) -> dict[str, TextReference]:
    """Build TextReferences for all detected services.

    Args:
        detected_services: Dicts read via the keys ``id``, ``name`` and
            ``category`` (``name`` is the fallback id, ``"other"`` the
            fallback category).
        dse_services: LLM-extracted DSE service list (dicts with ``name``).
        sections: Parsed DSE sections to search.
        url: Source URL copied into every reference.

    Returns:
        dict: service_id → TextReference
    """
    refs: dict[str, TextReference] = {}

    for svc in detected_services:
        service_name = svc.get("name", "")
        service_id = svc.get("id", service_name)
        category = svc.get("category", "other")

        ref = match_service_to_dse(service_name, category, sections, url)

        if ref.found:
            # Check if service is in the DSE SOLL list. A text hit counts as
            # documented either way; the list lookup is only diagnostic.
            if _find_in_dse_list(service_name, dse_services) is None:
                logger.debug(
                    "Service %r found in DSE text but not in extracted DSE list",
                    service_name,
                )
            # NOTE(review): this also clears the "incomplete" issue set for
            # provider-only matches — kept as-is to preserve existing
            # behaviour; confirm whether that is intended.
            ref.issue = ""
        else:
            ref.issue = "missing"
            ref.correction_type = "insert"

        refs[service_id] = ref

    return refs


def _extract_relevant_paragraph(content: str, search_term: str) -> str:
    """Extract the sentence-like excerpt of *content* containing *search_term*.

    The search is case-insensitive. If the term does not occur, the first 300
    characters of *content* are returned. Otherwise the excerpt runs from just
    after the preceding period (or up to 100 characters before the term when
    there is none) to the next period after the term; if that period is
    missing or more than 500 characters away, the excerpt is cut 300
    characters after the start of the term.
    """
    pos = content.lower().find(search_term.lower())
    if pos == -1:
        return content[:_FALLBACK_EXCERPT_LEN]

    # Look backwards for the end of the previous sentence. Start right after
    # the period; strip() below removes the whitespace that follows it, so the
    # term is never clipped when no space follows the period ("foo.Term").
    period = content.rfind(".", 0, pos)
    if period != -1:
        start = period + 1
    else:
        start = max(0, pos - _CONTEXT_BEFORE)

    # Look forward for the end of the sentence containing the term.
    end = content.find(".", pos + len(search_term))
    if end == -1 or end - pos > _MAX_SENTENCE_SPAN:
        end = min(len(content), pos + _FALLBACK_EXCERPT_LEN)
    else:
        end += 1  # Include the period

    return content[start:end].strip()


def _find_paragraph_index(content: str, search_term: str) -> int:
    """Find which paragraph (1-based) contains the search term.

    Paragraphs are separated by blank lines or by a newline followed by an
    uppercase letter; blank paragraphs are not counted. Returns 0 when the
    term does not occur in any paragraph.
    """
    search_lower = search_term.lower()
    paragraphs = (p for p in _PARAGRAPH_BREAK_RE.split(content) if p.strip())
    for index, paragraph in enumerate(paragraphs, 1):
        if search_lower in paragraph.lower():
            return index
    return 0


def _find_in_dse_list(service_name: str, dse_services: list[dict]) -> dict | None:
    """Check if a service appears in the LLM-extracted DSE service list.

    An entry matches when one name contains the other, or when the first word
    of *service_name* (the provider) occurs in the entry's name. Comparison is
    case-insensitive. Empty names on either side never match.

    Returns:
        The first matching entry, or None.
    """
    name_lower = service_name.strip().lower()
    if not name_lower:
        return None
    first_word = name_lower.split()[0]

    for svc in dse_services:
        dse_name = (svc.get("name") or "").lower()
        if not dse_name:
            # "" is a substring of every string and would match any service.
            continue
        if name_lower in dse_name or dse_name in name_lower:
            return svc
        # Check first word (provider match)
        if first_word in dse_name:
            return svc
    return None