From 0ba76d041ac30d649ef0643500c4402423357c07 Mon Sep 17 00:00:00 2001
From: Benjamin Admin
Date: Wed, 29 Apr 2026 11:55:26 +0200
Subject: [PATCH] =?UTF-8?q?feat:=20DSE=20parser=20+=20matcher=20=E2=80=94?=
 =?UTF-8?q?=20textblock=20references=20in=20scan=20findings?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

- dse_parser.py: HTML → structured sections (heading, number, content, parent).
  Uses the heading hierarchy (h1-h4) with a regex fallback.
- dse_matcher.py: matches detected services against DSE sections.
  Exact name → provider → category matching, with an insertion-point suggestion.
- agent_scan_routes: TextReference model in findings (original text, section,
  paragraph, correction type, insert_after).
  Enables showing: "Google Analytics not found in DSE, insert after
  Section 2.4 Cookies und Tracking"
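
Example text_reference payload on a DSE-MISSING finding (illustrative
values only; the actual section heading depends on the scanned policy):

    {
      "found": false,
      "document_type": "Datenschutzerklaerung",
      "issue": "missing",
      "correction_type": "insert",
      "insert_after": "2.4 Cookies und Tracking"
    }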
Co-Authored-By: Claude Opus 4.6 (1M context)
---
 .../compliance/api/agent_scan_routes.py      |  70 +++++-
 .../compliance/services/dse_matcher.py       | 189 +++++++++++++++
 .../compliance/services/dse_parser.py        | 224 ++++++++++++++++++
 3 files changed, 478 insertions(+), 5 deletions(-)
 create mode 100644 backend-compliance/compliance/services/dse_matcher.py
 create mode 100644 backend-compliance/compliance/services/dse_parser.py

diff --git a/backend-compliance/compliance/api/agent_scan_routes.py b/backend-compliance/compliance/api/agent_scan_routes.py
index 7b00bfc..935656c 100644
--- a/backend-compliance/compliance/api/agent_scan_routes.py
+++ b/backend-compliance/compliance/api/agent_scan_routes.py
@@ -16,6 +16,8 @@ from pydantic import BaseModel
 from compliance.services.website_scanner import scan_website, DetectedService
 from compliance.services.dse_service_extractor import extract_dse_services, compare_services
 from compliance.services.smtp_sender import send_email
+from compliance.services.dse_parser import parse_dse
+from compliance.services.dse_matcher import build_text_references, TextReference
 
 logger = logging.getLogger(__name__)
 
@@ -49,11 +51,27 @@ class ServiceInfo(BaseModel):
     status: str  # "ok", "undocumented", "outdated"
 
 
+class TextReferenceModel(BaseModel):
+    found: bool = False
+    source_url: str = ""
+    document_type: str = "Datenschutzerklaerung"
+    section_heading: str = ""
+    section_number: str = ""
+    parent_section: str = ""
+    paragraph_index: int = 0
+    original_text: str = ""
+    issue: str = ""
+    correction_type: str = ""
+    correction_text: str = ""
+    insert_after: str = ""
+
+
 class ScanFinding(BaseModel):
     code: str
     severity: str
     text: str
     correction: str = ""
+    text_reference: TextReferenceModel | None = None
 
 
 class ScanResponse(BaseModel):
@@ -87,14 +105,22 @@ async def scan_website_endpoint(req: ScanRequest):
     dse_services = await extract_dse_services(dse_text) if dse_text else []
     logger.info("DSE mentions %d services", len(dse_services))
 
-    # Step 4: SOLL/IST comparison
+    # Step 4: Parse DSE into structured sections
+    dse_html = await _fetch_dse_html(req.url, scan.pages_scanned)
+    dse_sections = parse_dse(dse_html, req.url) if dse_html else []
+    logger.info("Parsed %d DSE sections", len(dse_sections))
+
+    # Step 5: SOLL/IST comparison
     detected_dicts = [_service_to_dict(s) for s in scan.detected_services]
     comparison = compare_services(detected_dicts, dse_services)
 
-    # Step 5: Generate findings
-    services_info, findings = _build_findings(comparison, scan, is_live)
+    # Step 6: Build TextReferences for each detected service
+    text_refs = build_text_references(detected_dicts, dse_services, dse_sections, req.url)
 
-    # Step 6: Generate corrections for pre-launch mode
+    # Step 7: Generate findings with text references
+    services_info, findings = _build_findings(comparison, scan, is_live, text_refs)
+
+    # Step 8: Generate corrections for pre-launch mode
     if not is_live and findings:
         await _add_corrections(findings, dse_text)
 
@@ -149,6 +175,24 @@ async def _fetch_dse_text(url: str, scanned_pages: list[str]) -> str:
     return ""
 
 
+async def _fetch_dse_html(url: str, scanned_pages: list[str]) -> str:
+    """Fetch the raw HTML of the privacy policy page (for structured parsing)."""
+    import re
+    dse_url = None
+    for page in scanned_pages:
+        if re.search(r"datenschutz|privacy|dsgvo", page, re.IGNORECASE):
+            dse_url = page
+            break
+    if not dse_url:
+        dse_url = url
+    try:
+        async with httpx.AsyncClient(timeout=15.0, follow_redirects=True) as client:
+            resp = await client.get(dse_url, headers={"User-Agent": "BreakPilot-Compliance-Agent/1.0"})
+            return resp.text
+    except Exception:
+        return ""
+
+
 def _service_to_dict(svc: DetectedService) -> dict:
     return {
         "id": svc.id, "name": svc.name, "category": svc.category,
@@ -159,11 +203,25 @@ def _service_to_dict(svc: DetectedService) -> dict:
 
 
 def _build_findings(
-    comparison: dict, scan, is_live: bool,
+    comparison: dict, scan, is_live: bool, text_refs: dict | None = None,
 ) -> tuple[list[ServiceInfo], list[ScanFinding]]:
     """Build service info list and findings from comparison."""
     services = []
     findings = []
+    text_refs = text_refs or {}
+
+    def _get_ref(svc_id: str) -> TextReferenceModel | None:
+        ref = text_refs.get(svc_id)
+        if not ref:
+            return None
+        return TextReferenceModel(
+            found=ref.found, source_url=ref.source_url,
+            document_type=ref.document_type, section_heading=ref.section_heading,
+            section_number=ref.section_number, parent_section=ref.parent_section,
+            paragraph_index=ref.paragraph_index, original_text=ref.original_text,
+            issue=ref.issue, correction_type=ref.correction_type,
+            correction_text=ref.correction_text, insert_after=ref.insert_after,
+        )
 
     # Undocumented services (on website, NOT in DSE)
     for svc in comparison["undocumented"]:
@@ -175,12 +233,14 @@ def _build_findings(
             legal_ref=svc.get("legal_ref", ""), in_dse=False, status="undocumented",
         ))
         severity = "HIGH" if is_live else "MEDIUM"
+        ref = _get_ref(svc.get("id", ""))
         findings.append(ScanFinding(
             code=f"DSE-MISSING-{svc['id'].upper()}",
             severity=severity,
             text=f"{svc['name']} ({svc.get('provider', '')}, {svc.get('country', '')}) "
                  f"ist auf der Website eingebunden aber NICHT in der Datenschutzerklaerung "
                  f"dokumentiert (Art. 13 DSGVO).",
+            text_reference=ref,
         ))
 
     # Documented services (OK)
diff --git a/backend-compliance/compliance/services/dse_matcher.py b/backend-compliance/compliance/services/dse_matcher.py
new file mode 100644
index 0000000..61c51b4
--- /dev/null
+++ b/backend-compliance/compliance/services/dse_matcher.py
@@ -0,0 +1,189 @@
+"""
+DSE Matcher — matches detected services against DSE sections and
+generates TextReferences with original text, position, and corrections.
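+
+Example usage (illustrative; the URL and service name are placeholders):
+
+    sections = parse_dse(html, url="https://example.com/datenschutz")
+    ref = match_service_to_dse("Google Analytics", "tracking", sections)
+    if not ref.found:
+        print(ref.insert_after)  # e.g. "2.4 Cookies und Tracking"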
+"""
+
+import logging
+import re
+from dataclasses import dataclass
+
+from compliance.services.dse_parser import DSESection, find_section_by_content, find_section_by_category
+
+logger = logging.getLogger(__name__)
+
+# Category → typical DSE section heading keywords
+CATEGORY_SECTION_MAP = {
+    "tracking": ["cookie", "tracking", "webanalyse", "analytics", "statistik", "reichweitenmessung"],
+    "marketing": ["marketing", "werbung", "newsletter", "remarketing", "werbe"],
+    "payment": ["zahlung", "payment", "bezahl", "zahlungsabwicklung", "zahlungsdienst"],
+    "chatbot": ["chat", "kommunikation", "kundenservice", "kontakt", "livechat"],
+    "cdn": ["hosting", "bereitstellung", "technisch", "infrastruktur", "content delivery"],
+    "other": ["sonstig", "weitere", "dritte", "extern", "dienstleister"],
+}
+
+
+@dataclass
+class TextReference:
+    """Reference to a specific text block in the DSE."""
+    found: bool
+    source_url: str = ""
+    document_type: str = "Datenschutzerklaerung"
+    section_heading: str = ""
+    section_number: str = ""
+    parent_section: str = ""
+    paragraph_index: int = 0
+    original_text: str = ""
+    issue: str = ""            # "missing", "incomplete", "incorrect"
+    correction_type: str = ""  # "insert", "replace", "append"
+    correction_text: str = ""
+    insert_after: str = ""
+
+
+def match_service_to_dse(
+    service_name: str,
+    service_category: str,
+    sections: list[DSESection],
+    url: str = "",
+) -> TextReference:
+    """Find where a service is mentioned in the DSE and build a TextReference."""
+    # Step 1: Search for the exact service name
+    section = find_section_by_content(sections, service_name)
+
+    if section:
+        # Found — extract the relevant paragraph
+        original = _extract_relevant_paragraph(section.content, service_name)
+        return TextReference(
+            found=True,
+            source_url=url,
+            section_heading=section.heading,
+            section_number=section.section_number,
+            parent_section=section.parent_heading,
+            paragraph_index=_find_paragraph_index(section.content, service_name),
+            original_text=original,
+            issue="",  # Found and present — caller determines if complete
+        )
+
+    # Step 2: Search for the provider name (e.g. "Google" for "Google Analytics")
+    provider = service_name.split()[0] if " " in service_name else service_name
+    section = find_section_by_content(sections, provider)
+
+    if section:
+        original = _extract_relevant_paragraph(section.content, provider)
+        return TextReference(
+            found=True,
+            source_url=url,
+            section_heading=section.heading,
+            section_number=section.section_number,
+            parent_section=section.parent_heading,
+            paragraph_index=_find_paragraph_index(section.content, provider),
+            original_text=original,
+            issue="incomplete",  # Provider mentioned, but not the specific service
+        )
+
+    # Step 3: Not found — suggest an insertion point
+    insert_section = find_section_by_category(sections, service_category)
+    insert_after = insert_section.heading if insert_section else ""
+
+    # If no category match, fall back to the last cookie/data-protection section
+    if not insert_after:
+        for s in reversed(sections):
+            h = s.heading.lower()
+            if any(kw in h for kw in ["cookie", "datenschutz", "daten"]):
+                insert_after = s.heading
+                break
+
+    return TextReference(
+        found=False,
+        source_url=url,
+        document_type="Datenschutzerklaerung",
+        issue="missing",
+        correction_type="insert",
+        insert_after=insert_after,
+    )
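+
+
+# NOTE: The provider fallback in step 2 is deliberately fuzzy: "Google" also
+# matches a section that only covers, say, Google Fonts. Treat
+# issue="incomplete" as "needs review", not as a confirmed documentation gap.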
+
+
+def build_text_references(
+    detected_services: list[dict],
+    dse_services: list[dict],
+    sections: list[DSESection],
+    url: str = "",
+) -> dict[str, TextReference]:
+    """Build TextReferences for all detected services.
+
+    Returns dict: service_id → TextReference
+    """
+    refs: dict[str, TextReference] = {}
+
+    for svc in detected_services:
+        service_id = svc.get("id", svc.get("name", ""))
+        service_name = svc.get("name", "")
+        category = svc.get("category", "other")
+
+        ref = match_service_to_dse(service_name, category, sections, url)
+
+        # Check whether the service is also in the LLM-extracted DSE SOLL list
+        dse_match = _find_in_dse_list(service_name, dse_services)
+
+        if ref.found and dse_match:
+            if ref.issue != "incomplete":
+                ref.issue = ""  # All good — documented and present
+        elif ref.found and not dse_match:
+            # Found in the text but missed by the LLM extraction — still OK,
+            # but keep the "incomplete" flag from a provider-only match
+            if ref.issue != "incomplete":
+                ref.issue = ""
+        elif not ref.found:
+            ref.issue = "missing"
+            ref.correction_type = "insert"
+
+        refs[service_id] = ref
+
+    return refs
+
+
+def _extract_relevant_paragraph(content: str, search_term: str) -> str:
+    """Extract the paragraph containing the search term."""
+    search_lower = search_term.lower()
+    content_lower = content.lower()
+
+    # Find the position of the search term
+    pos = content_lower.find(search_lower)
+    if pos == -1:
+        return content[:300]
+
+    # Look backwards for the end of the previous sentence
+    start = max(0, content.rfind(".", 0, pos))
+    if start > 0:
+        start += 2  # Skip ". "
+    else:
+        start = max(0, pos - 100)
+
+    # Look forwards for the end of the sentence
+    end = content.find(".", pos + len(search_term))
+    if end == -1 or end - pos > 500:
+        end = min(len(content), pos + 300)
+    else:
+        end += 1  # Include the period
+
+    return content[start:end].strip()
+
+
+def _find_paragraph_index(content: str, search_term: str) -> int:
+    """Find which paragraph (1-based) contains the search term.
+
+    Note: dse_parser collapses whitespace in section content, so for parsed
+    sections this effectively returns 1; the newline splits only matter for
+    unnormalized input.
+    """
+    paragraphs = re.split(r"\n\n|\n(?=[A-Z])", content)
+    search_lower = search_term.lower()
+    for i, para in enumerate(paragraphs, 1):
+        if search_lower in para.lower():
+            return i
+    return 0
+
+
+def _find_in_dse_list(service_name: str, dse_services: list[dict]) -> dict | None:
+    """Check if a service appears in the LLM-extracted DSE service list."""
+    name_lower = service_name.lower()
+    if not name_lower:
+        return None
+    first_word = name_lower.split()[0]
+    for svc in dse_services:
+        dse_name = svc.get("name", "").lower()
+        if not dse_name:
+            continue  # An empty name would substring-match everything
+        if name_lower in dse_name or dse_name in name_lower:
+            return svc
+        # Check the first word (provider match)
+        if first_word in dse_name:
+            return svc
+    return None
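+
+
+if __name__ == "__main__":  # pragma: no cover
+    # Minimal smoke test (illustrative, not part of the API). The HTML
+    # snippet and the service name below are made-up fixtures.
+    from compliance.services.dse_parser import parse_dse
+
+    _html = "<h2>2.4 Cookies und Tracking</h2><p>Wir nutzen Google Fonts.</p>"
+    _sections = parse_dse(_html)
+    _ref = match_service_to_dse("Google Analytics", "tracking", _sections)
+    print(_ref.found, repr(_ref.issue), repr(_ref.insert_after))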
diff --git a/backend-compliance/compliance/services/dse_parser.py b/backend-compliance/compliance/services/dse_parser.py
new file mode 100644
index 0000000..f10a201
--- /dev/null
+++ b/backend-compliance/compliance/services/dse_parser.py
@@ -0,0 +1,224 @@
+"""
+DSE Parser — parses privacy policy HTML into structured sections.
+
+Extracts headings, section numbers, and content blocks, and builds a
+hierarchical structure that enables precise text references.
+"""
+
+import logging
+import re
+from dataclasses import dataclass
+from html.parser import HTMLParser
+
+logger = logging.getLogger(__name__)
+
+
+@dataclass
+class DSESection:
+    """A section in a privacy policy."""
+    heading: str
+    heading_level: int   # 1-4
+    section_number: str  # "2.5", or "" if no number
+    content: str         # Plain-text content
+    html: str            # Original HTML content
+    parent_heading: str = ""
+    url: str = ""
+    element_id: str = ""
+    paragraph_count: int = 0
+
+
+class _HeadingExtractor(HTMLParser):
+    """Extract headings and their content from HTML."""
+
+    def __init__(self):
+        super().__init__()
+        self.sections: list[dict] = []
+        self._in_heading = False
+        self._heading_level = 0
+        self._heading_text = ""
+        self._heading_id = ""
+        self._content_parts: list[str] = []
+        self._html_parts: list[str] = []
+        self._skip_tags = {"script", "style", "nav", "footer", "header"}
+        self._skip_depth = 0
+        self._p_count = 0
+
+    def handle_starttag(self, tag, attrs):
+        attrs_dict = dict(attrs)
+        if tag in self._skip_tags:
+            self._skip_depth += 1
+            return
+        if self._skip_depth > 0:
+            return
+
+        if tag in ("h1", "h2", "h3", "h4"):
+            # Save the previous section before starting a new one
+            if self._heading_text:
+                self._save_section()
+            self._in_heading = True
+            self._heading_level = int(tag[1])
+            self._heading_text = ""
+            self._heading_id = attrs_dict.get("id", "")
+            self._content_parts = []
+            self._html_parts = []
+            self._p_count = 0
+
+        if tag == "p":
+            self._p_count += 1
+
+        # Reconstruct the opening tag for the section's HTML snapshot
+        attr_str = " ".join(f'{k}="{v}"' for k, v in attrs)
+        self._html_parts.append(f"<{tag}{' ' + attr_str if attr_str else ''}>")
+
+    def handle_endtag(self, tag):
+        if tag in self._skip_tags and self._skip_depth > 0:
+            self._skip_depth -= 1
+            return
+        if self._skip_depth > 0:
+            return
+
+        if tag in ("h1", "h2", "h3", "h4"):
+            self._in_heading = False
+
+        self._html_parts.append(f"</{tag}>")
+
+    def handle_data(self, data):
+        if self._skip_depth > 0:
+            return
+        if self._in_heading:
+            self._heading_text += data.strip()
+        else:
+            self._content_parts.append(data)
+            self._html_parts.append(data)
+
+    def _save_section(self):
+        if not self._heading_text:
+            return
+        content = " ".join(self._content_parts)
+        content = re.sub(r"\s+", " ", content).strip()
+        self.sections.append({
+            "heading": self._heading_text.strip(),
+            "heading_level": self._heading_level,
+            "element_id": self._heading_id,
+            "content": content,
+            "html": "".join(self._html_parts),
+            "paragraph_count": self._p_count,
+        })
+
+    def finalize(self):
+        """Call after feeding all data to save the last section."""
+        if self._heading_text:
+            self._save_section()
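+
+
+# NOTE: html.parser is lenient, but this is not a full DOM walk. Headings
+# inside skipped containers (nav/footer/header) are dropped by design, and
+# parse_dse() below falls back to _regex_fallback() if feed() raises or no
+# sections are found.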
+
+
+def parse_dse(html: str, url: str = "") -> list[DSESection]:
+    """Parse privacy policy HTML into structured sections."""
+    extractor = _HeadingExtractor()
+    try:
+        extractor.feed(html)
+        extractor.finalize()
+    except Exception as e:
+        logger.warning("HTML parsing failed, falling back to regex: %s", e)
+        return _regex_fallback(html, url)
+
+    if not extractor.sections:
+        return _regex_fallback(html, url)
+
+    # Build the parent hierarchy
+    sections: list[DSESection] = []
+    parent_stack: list[str] = [""]  # Stack of parent headings by level
+
+    for raw in extractor.sections:
+        heading = raw["heading"]
+        level = raw["heading_level"]
+
+        # Extract the section number (e.g. "2.5" from "2.5 Webanalyse")
+        num_match = re.match(r"^(\d+(?:\.\d+)*)\s*[.:]?\s*", heading)
+        section_number = num_match.group(1) if num_match else ""
+
+        # Track parent headings
+        while len(parent_stack) > level:
+            parent_stack.pop()
+        parent = parent_stack[-1] if parent_stack else ""
+        parent_stack.append(heading)
+
+        sections.append(DSESection(
+            heading=heading,
+            heading_level=level,
+            section_number=section_number,
+            content=raw["content"][:2000],  # Cap content length
+            html=raw["html"][:3000],
+            parent_heading=parent,
+            url=url,
+            element_id=raw["element_id"],
+            paragraph_count=raw["paragraph_count"],
+        ))
+
+    logger.info("Parsed DSE: %d sections from %s", len(sections), url)
+    return sections
+
+
+def _regex_fallback(html: str, url: str) -> list[DSESection]:
+    """Fallback parser using regex when HTML parsing fails."""
+    # Strip scripts and styles
+    clean = re.sub(r"<(script|style)[^>]*>.*?</\1>", "", html, flags=re.DOTALL | re.IGNORECASE)
+
+    sections = []
+    # Find all headings
+    heading_re = r"<h([1-4])[^>]*?(?:id=[\"']([^\"']*)[\"'])?[^>]*>(.*?)</h\1>"
+    for match in re.finditer(heading_re, clean, re.DOTALL | re.IGNORECASE):
+        level = int(match.group(1))
+        elem_id = match.group(2) or ""
+        heading = re.sub(r"<[^>]+>", "", match.group(3)).strip()
+
+        # Take content up to the next heading
+        start = match.end()
+        next_heading = re.search(r"<h[1-4][^>]*>", clean[start:], re.IGNORECASE)
+        end = start + next_heading.start() if next_heading else len(clean)
+        content = re.sub(r"<[^>]+>", " ", clean[start:end])
+        content = re.sub(r"\s+", " ", content).strip()
+
+        num_match = re.match(r"^(\d+(?:\.\d+)*)", heading)
+
+        sections.append(DSESection(
+            heading=heading,
+            heading_level=level,
+            section_number=num_match.group(1) if num_match else "",
+            content=content[:2000],
+            html="",
+            url=url,
+            element_id=elem_id,
+        ))
+
+    return sections
+
+
+def find_section_by_content(sections: list[DSESection], search_text: str) -> DSESection | None:
+    """Find the first section whose content contains the given text."""
+    search_lower = search_text.lower()
+    for section in sections:
+        if search_lower in section.content.lower():
+            return section
+    return None
+
+
+def find_section_by_category(sections: list[DSESection], category: str) -> DSESection | None:
+    """Find the section most likely to cover a service category."""
+    category_keywords = {
+        "tracking": ["cookie", "tracking", "webanalyse", "analytics", "statistik"],
+        "marketing": ["marketing", "werbung", "newsletter", "remarketing"],
+        "payment": ["zahlung", "payment", "bezahlung", "zahlungsabwicklung"],
+        "chatbot": ["chat", "kommunikation", "kundenservice", "kontakt"],
+        "cdn": ["hosting", "bereitstellung", "technisch", "infrastruktur", "cdn"],
+        "other": ["sonstig", "weitere", "dritte", "extern"],
+    }
+    keywords = category_keywords.get(category, category_keywords["other"])
+
+    for section in sections:
+        heading_lower = section.heading.lower()
+        content_lower = section.content.lower()[:500]
+        for kw in keywords:
+            if kw in heading_lower or kw in content_lower:
+                return section
+    return None
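+
+
+if __name__ == "__main__":  # pragma: no cover
+    # Illustrative only: a made-up two-heading DSE fragment to eyeball the
+    # section numbering and parent tracking. The URL is a placeholder.
+    _html = (
+        "<h1>Datenschutzerklaerung</h1>"
+        "<h2>2.4 Cookies und Tracking</h2>"
+        "<p>Wir verwenden Matomo zur Webanalyse.</p>"
+    )
+    for _s in parse_dse(_html, url="https://example.com/datenschutz"):
+        print(_s.section_number or "-", _s.heading, "->", _s.parent_heading or "-")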