""" Doc-Anchor-Locator — fuer ein Finding den passendsten Einfuege-Ort im existierenden Dokument finden. Primary strategy: BGE-M3 Embedding-Match zwischen einer pro-Finding Anchor-Query und allen Absaetzen des Docs. Echtes semantisches Matching (BMW schreibt "Verarbeiter" statt "Auftragsverarbeiter" → Keyword waere out, Embedding catches it). Fallback: Keyword-Match (wenn kein Embedding-Service erreichbar). Output pro Anchor: - anchor_phrase : Originaltext-Auszug - position_hint : "Nach Absatz X von Y: '...'" - confidence : 'high' | 'medium' | 'low' - score : float (cosine similarity oder keyword-rank) - method : 'embedding' | 'keyword' | 'fallback' """ from __future__ import annotations import logging import math import os import re import threading from typing import Iterable import httpx logger = logging.getLogger(__name__) EMBEDDING_URL = os.getenv("EMBEDDING_URL", "http://embedding-service:8087") # Pro Finding-Typ eine semantisch reiche Anchor-Query, die der Embedding- # Matcher gegen den Doc-Text wirft. Reicher als die kurze MC-check_question. # Sucht NICHT den Pflichttext selbst (der fehlt ja) sondern den Absatz wo # der Fix HINEIN-soll — also den thematisch verwandten Kontext. _ANCHOR_QUERIES: list[tuple[str, str, str]] = [ # (finding_label_partial, anchor_query, fallback_hint) ( "Auftragsverarbeiter erwaehnt", "Empfaenger der Daten Verarbeiter Dienstleister Cloud Hosting CRM " "Auftragsverarbeitung Weitergabe Datenuebermittlung an Dritte", "Im Abschnitt 'Empfaenger' oder 'Datenuebermittlung'", ), ( "Automatisierte Entscheidungen", "Betroffenenrechte automatisierte Entscheidung Profiling Logik " "Tragweite Auswirkung Art. 22 DSGVO", "Am Ende des Abschnitts 'Betroffenenrechte'", ), ( "Konkrete Aufsichtsbehoerde", "Beschwerderecht Datenschutzaufsicht Aufsichtsbehoerde Beschwerde " "bei der Behoerde einreichen Recht auf Beschwerde", "Im Abschnitt 'Beschwerderecht'", ), ( "Angemessenheitsbeschluss", "Drittlandtransfer USA Standardvertragsklauseln SCC DPF Data Privacy " "Framework Angemessenheitsbeschluss internationale Datenuebermittlung", "Im Abschnitt 'Drittlandtransfer'", ), ( "Anschrift des Verantwortlichen", "Verantwortlicher Verantwortliche Stelle Datenschutz Betreiber dieser " "Website Firma Anschrift Kontakt", "Am Anfang der Datenschutzerklaerung / Cookie-Richtlinie", ), ( "Konkrete Cookie-Namen", "Welche Cookies verwenden wir Cookie-Tabelle Liste der Cookies " "Cookie-Kategorien Auflistung der Cookies Name Anbieter Zweck", "Im Abschnitt 'Welche Cookies verwenden wir?'", ), ( "Konkrete Anbieter/Dienste", "Drittanbieter Dienste Anbieter wir nutzen folgende Dienste " "Empfaenger der Cookie-Daten Liste der Dienstleister", "In der Drittanbieter-Liste der Cookie-Richtlinie", ), ( "Analytics-/Statistik-Tools konkret benannt", "Statistik Analytics Reichweitenmessung Webanalyse Tracking " "Google Analytics Matomo Adobe Analytics", "Im Abschnitt 'Statistik / Analyse-Cookies'", ), ( "Konkrete Speicherdauer", "Speicherdauer Lebensdauer wie lange Ablauf Cookie-Tabelle Spalte " "Speicherdauer pro Cookie", "In der Cookie-Tabelle pro Eintrag", ), ( "Opt-Out-Links", "Widerruf widersprechen deaktivieren Cookie-Einstellungen aendern " "Opt-Out Einstellungen anpassen", "Im Abschnitt 'Wie kann ich widersprechen?'", ), ( "Privacy-Policy-Links", "Datenschutzerklaerung des Drittanbieters Privacy Policy Link auf " "Datenschutzhinweise der Drittanbieter", "Im Drittanbieter-Listing der Cookie-Richtlinie", ), ( "Verbraucherstreitbeilegung", "Online-Streitbeilegung OS-Plattform Verbraucherschlichtungsstelle " "Streitbeilegung Verbraucher", "Am Ende des Impressums (eigener Abschnitt 'Streitbeilegung')", ), ( "Rechtswidriger Haftungsausschluss", "Haftung Disclaimer wir distanzieren uns Links zu externen Webseiten " "Haftungsausschluss Drittinhalte", "Am Ende des Impressums (Disclaimer-Absatz)", ), ( "Name der vertretungsberechtigten", "Vertreten durch Vorstand Geschaeftsfuehrung Geschaeftsfuehrer " "vertretungsberechtigt Repraesentant", "Im Impressum nach Firmenname + Anschrift", ), ( "Zustaendige Kammer", "Berufsrecht Kammer Aufsichtsbehoerde berufsrechtliche Angaben " "zustaendige Kammer", "Im Impressum im Abschnitt 'Berufsrechtliche Angaben'", ), ( "Drittlaender", "Drittland Drittlaender USA Indien China internationale Datenuebermittlung " "Datenexport in Nicht-EU-Staaten", "Im Abschnitt 'Drittlandtransfer'", ), ( "Schutzgarantien", "Schutzgarantien Schutzvorkehrungen Schutzmassnahmen SCC " "Standardvertragsklauseln einsehen Anforderung", "Im Abschnitt 'Drittlandtransfer / Schutzgarantien'", ), ] # ─── Thread-local Cache fuer Doc-Chunks + Embeddings ─────────────── # Pro Compliance-Check-Run werden die Doc-Texte einmal embedded und im # Thread-Local-Storage gehalten, damit mehrere Findings im selben Run # nicht jeweils neu embedded werden. _tls = threading.local() def _get_cache() -> dict: if not hasattr(_tls, "cache"): _tls.cache = {} return _tls.cache def reset_cache() -> None: """Per-request-cache leeren (sollte am Start jedes Doc-Check-Runs aufgerufen werden, damit Vorgaenger-Daten kein Leak verursachen).""" if hasattr(_tls, "cache"): _tls.cache = {} # ─── Helfer ──────────────────────────────────────────────────────── def _normalize(text: str) -> str: return (text or "").lower().replace("\xad", "").replace("ß", "ss") def _split_paragraphs(text: str) -> list[str]: """Split a doc into paragraphs (by double newline, fallback single).""" if not text: return [] paras = re.split(r"\n\s*\n", text) if len(paras) < 3: paras = re.split(r"(?<=[\.\?\!])\s+(?=[A-ZÄÖÜ])", text) return [p.strip() for p in paras if p.strip()] def _embed_sync(texts: list[str], timeout: float = 60.0, batch_size: int = 32) -> list[list[float]]: """Synchroner Batch-Embed-Call (Anchor-Lokalisierung laeuft in Sync-HTML-Render, nicht in async context).""" if not texts: return [] out: list[list[float]] = [] with httpx.Client(timeout=timeout) as client: for i in range(0, len(texts), batch_size): batch = texts[i:i + batch_size] try: r = client.post(f"{EMBEDDING_URL}/embed", json={"texts": batch}) r.raise_for_status() out.extend(r.json().get("embeddings") or []) except Exception as e: logger.warning("Anchor embed sub-batch [%d-%d] failed: %s", i, i + len(batch), e) out.extend([[] for _ in batch]) return out def _cosine(a: list[float], b: list[float]) -> float: if not a or not b or len(a) != len(b): return 0.0 dot = sum(x * y for x, y in zip(a, b)) na = math.sqrt(sum(x * x for x in a)) nb = math.sqrt(sum(y * y for y in b)) if na == 0 or nb == 0: return 0.0 return dot / (na * nb) def _doc_paragraphs_and_vectors( doc_id: str, doc_text: str, ) -> tuple[list[str], list[list[float]]]: """Lazy-cache: Absaetze + Vektoren pro Doc-ID. Wird genau einmal pro Doc und Run berechnet.""" cache = _get_cache() if doc_id in cache: return cache[doc_id] paras = _split_paragraphs(doc_text) if not paras: cache[doc_id] = ([], []) return cache[doc_id] vecs = _embed_sync(paras) cache[doc_id] = (paras, vecs) return cache[doc_id] def _keyword_fallback(fl: str, doc_text: str) -> dict | None: """Fallback wenn Embedding-Service ausfaellt oder zu wenig Trefferqualitaet.""" # Use the old _ANCHOR_QUERIES list — extract just the fallback hint for label_partial, _query, fallback_hint in _ANCHOR_QUERIES: if _normalize(label_partial) in fl: return { "anchor_phrase": None, "position_hint": fallback_hint, "confidence": "low", "method": "fallback", } return None def locate_anchor( finding_label: str, doc_text: str, doc_id: str | None = None, ) -> dict | None: """Fuer ein Finding den passendsten Anker-Absatz im Doc-Text finden. Primary: Embedding-Match. Sekundaer: Keyword-Hits als Bonus. Fallback: rein keyword-basiert wenn Embedding-Service nicht erreichbar ist. `doc_id` ist ein cache-key (z.B. doc_type oder url). Wenn None, wird aus dem doc_text-Hash abgeleitet. """ if not doc_text or not finding_label: return None fl = _normalize(finding_label) # Welche Anchor-Query matched dieses Finding? query = None fallback_hint = None matched_label = None for label_partial, q, fb in _ANCHOR_QUERIES: if _normalize(label_partial) in fl: query, fallback_hint, matched_label = q, fb, label_partial break if not query: return None doc_id = doc_id or f"doc-{hash(doc_text) & 0xffffffff:08x}" # 1) Embedding-Match paras, doc_vecs = _doc_paragraphs_and_vectors(doc_id, doc_text) if not paras: return None embeddings_available = any(v for v in doc_vecs) if not embeddings_available: return _keyword_fallback(fl, doc_text) try: q_vec = _embed_sync([query])[0] if query else None except Exception: q_vec = None if not q_vec: return _keyword_fallback(fl, doc_text) # Per-Absatz Score = cosine + Heading-Bonus best_idx = -1 best_score = 0.0 for i, (p, dv) in enumerate(zip(paras, doc_vecs)): if not dv: continue sim = _cosine(q_vec, dv) # Heading-Bonus: kurze Absaetze + Markdown-Heading-Marker if len(p.split()) <= 8 or p.strip().startswith("#"): sim += 0.05 if sim > best_score: best_score = sim best_idx = i # Konfidenz-Schwellen — kalibriert anhand BMW-Run if best_idx < 0 or best_score < 0.40: # Zu schwacher Match — Fallback verwenden return { "anchor_phrase": None, "position_hint": fallback_hint, "confidence": "low", "score": round(best_score, 3) if best_idx >= 0 else 0, "method": "embedding-no-match", } if best_score >= 0.62: confidence = "high" elif best_score >= 0.50: confidence = "medium" else: confidence = "low" anchor = paras[best_idx] words = anchor.split() snippet = " ".join(words[:30]) + ("…" if len(words) > 30 else "") return { "anchor_phrase": snippet, "anchor_index": best_idx, "total_paragraphs": len(paras), "position_hint": f"Nach Absatz {best_idx + 1} von {len(paras)}: '{snippet}'", "confidence": confidence, "score": round(best_score, 3), "method": "embedding", } def annotate_findings_with_anchors( findings: Iterable[dict], doc_text: str, doc_id: str | None = None, ) -> list[dict]: """Pro Finding den Anchor suchen und das Dict um 'anchor' erweitern.""" out = [] for f in findings: a = locate_anchor(f.get("label") or f.get("title") or "", doc_text, doc_id) out.append({**f, "anchor": a}) return out