From f6d018234b527de6499946ebc2c689771891a837 Mon Sep 17 00:00:00 2001 From: Benjamin Admin Date: Sun, 21 Jun 2026 11:15:52 +0200 Subject: [PATCH] feat(dse): recover v3 DSE engine from container + wire into live check path MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The calibrated DSE engine (4-layer: regex-boost / keyword / BGE-M3 embedding recall @0.65 / semantic-validator) existed ONLY in the running macmini container (docker cp'd, never committed) — at risk of loss on any container rebuild. This recovers it into git and wires it into the live check path. - Recover dse/{agent,v3_engine,_embedding_recall,_classification_gate, regex_boost,mcs,deep_check}.py. DSEAgent (v3, BaseSpecialistAgent) replaces the keyword-only stub: delegates MC-loading to the main engine (rag_document_checker._load_controls), deterministic cached embedding recall (reachability-gated), semantic-validator LLM layer honoring skip_llm, third-country -> HIGH on documented transfer. - Wire "dse" into _agent_outputs._TOPIC_AGENTS -> live check emits a validated DSE tab (was snapshot/legacy-only). - Tests rewritten for v3 (DB/embedding/LLM stubbed offline): regex-boost detection, embedding-recall reachability guard, result->Finding conversion, third-country HIGH; topic-wiring asserts "dse". - deep_check.py recovered for preservation (alternate LLM-judge path, unwired). Runtime data deps for full live behavior (note for prod): doc_check_controls in DB + /data/mc_classification.db embedding sidecar + embedding-service; all degrade gracefully (keyword layer carries) if absent. dev-only, no deploy. Co-Authored-By: Claude Opus 4.7 --- .../api/agent_check/_agent_outputs.py | 1 + .../dse/_classification_gate.py | 80 +++++ .../dse/_embedding_recall.py | 170 +++++++++++ .../services/specialist_agents/dse/agent.py | 289 +++++++++++++++++- .../specialist_agents/dse/deep_check.py | 129 ++++++++ .../services/specialist_agents/dse/mcs.py | 78 +++++ .../specialist_agents/dse/regex_boost.py | 179 +++++++++++ .../specialist_agents/dse/v3_engine.py | 209 +++++++++++++ .../tests/test_agent_outputs_topics.py | 4 + .../compliance/tests/test_dse_agent.py | 127 +++++--- 10 files changed, 1202 insertions(+), 64 deletions(-) create mode 100644 backend-compliance/compliance/services/specialist_agents/dse/_classification_gate.py create mode 100644 backend-compliance/compliance/services/specialist_agents/dse/_embedding_recall.py create mode 100644 backend-compliance/compliance/services/specialist_agents/dse/deep_check.py create mode 100644 backend-compliance/compliance/services/specialist_agents/dse/mcs.py create mode 100644 backend-compliance/compliance/services/specialist_agents/dse/regex_boost.py create mode 100644 backend-compliance/compliance/services/specialist_agents/dse/v3_engine.py diff --git a/backend-compliance/compliance/api/agent_check/_agent_outputs.py b/backend-compliance/compliance/api/agent_check/_agent_outputs.py index a343295f..17b7d284 100644 --- a/backend-compliance/compliance/api/agent_check/_agent_outputs.py +++ b/backend-compliance/compliance/api/agent_check/_agent_outputs.py @@ -29,6 +29,7 @@ logger = logging.getLogger(__name__) _TOPIC_AGENTS: dict[str, str] = { "impressum": "impressum", "agb": "agb", # v2: AGBAgent mit decision_method-Routing (71% FP -> ~0) + "dse": "dse", # v3: 4-Layer (Regex-Boost/Keyword/BGE-M3-Recall/Semantic) } _MIN_TEXT = 100 diff --git a/backend-compliance/compliance/services/specialist_agents/dse/_classification_gate.py b/backend-compliance/compliance/services/specialist_agents/dse/_classification_gate.py new file mode 100644 index 00000000..b4a42187 --- /dev/null +++ b/backend-compliance/compliance/services/specialist_agents/dse/_classification_gate.py @@ -0,0 +1,80 @@ +"""Applicability-Gate fuer den DSE-Scan. + +Schliesst Controls aus dem DSE-FINDINGS-Scan aus, die laut +`compliance.control_classification` NICHT gegen eine DSE laufen +('DSE' nicht in applicable_artifacts) UND sicher klassifiziert sind +(needs_review=false). Diese werden NICHT geloescht, sondern als +*organisatorische Checkliste* zurueckgegeben (Routing zu VVT/TOM/Audit). + +Fail-safe: unsichere Klassifikationen (needs_review=true) bleiben im +Findings-Scan. Defensiv: fehlt die Tabelle (z.B. Prod ohne Migration), +liefert das Gate ein leeres Dict -> es wird NICHT gefiltert. +""" + +from __future__ import annotations + +import logging +import os +from typing import Any + +logger = logging.getLogger(__name__) + + +async def load_dse_gate(db_url: str = "") -> dict[str, dict[str, Any]]: + """Liefert {control_id: meta} fuer Controls, die aus dem DSE-Findings-Scan + auszuschliessen sind (hochsicher organisatorisch). Leeres Dict = kein Filter. + """ + dsn = (db_url or os.getenv("DATABASE_URL") + or os.getenv("COMPLIANCE_DATABASE_URL") or "") + if not dsn: + return {} + try: + import asyncpg + conn = await asyncpg.connect(dsn) + try: + rows = await conn.fetch( + """SELECT control_id, obligation_type, check_intent, + applicable_artifacts, reference_allowed + FROM compliance.control_classification + WHERE is_active AND NOT needs_review + AND NOT ('DSE' = ANY(applicable_artifacts))""") + finally: + await conn.close() + except Exception as e: # Tabelle fehlt / DB nicht erreichbar -> kein Filter + logger.info("dse classification gate inaktiv: %s", str(e)[:90]) + return {} + return { + r["control_id"]: { + "obligation_type": r["obligation_type"], + "check_intent": r["check_intent"], + "applicable_artifacts": list(r["applicable_artifacts"] or []), + "reference_allowed": r["reference_allowed"], + } + for r in rows if r["control_id"] + } + + +def apply_gate( + controls: list[dict[str, Any]], + gate: dict[str, dict[str, Any]], +) -> tuple[list[dict[str, Any]], list[dict[str, Any]]]: + """Teilt geladene Controls in (findings_controls, organizational). + + findings_controls: laufen normal durch den DSE-Scan. + organizational: aus dem Scan genommen, als Checkliste ausgegeben + (control_id + title + Klassifikations-Metadaten fuer das Routing). + """ + kept: list[dict[str, Any]] = [] + organizational: list[dict[str, Any]] = [] + for c in controls: + cid = c.get("control_id") + meta = gate.get(cid) if cid else None + if meta: + organizational.append({ + "control_id": cid, + "title": c.get("title"), + **meta, + }) + else: + kept.append(c) + return kept, organizational diff --git a/backend-compliance/compliance/services/specialist_agents/dse/_embedding_recall.py b/backend-compliance/compliance/services/specialist_agents/dse/_embedding_recall.py new file mode 100644 index 00000000..c33eefa6 --- /dev/null +++ b/backend-compliance/compliance/services/specialist_agents/dse/_embedding_recall.py @@ -0,0 +1,170 @@ +"""Deterministische semantische Recall-Schicht für den DSE-Check. + +WARUM: Reines Keyword-Matching hat schlechten Recall (eine Pflicht lässt sich +auf viele Arten formulieren). Der frühere Regex-Boost war zu stumpf (über-passt +auf vollständigen Dokumenten). BGE-M3-Embeddings erkennen den SINN — und sind +dabei DETERMINISTISCH: ein Embedding-Modell ist eine feste Funktion, gleicher +Text → gleicher Vektor → gleiches Pass/Fail bei fester Schwelle. Reproduzierbar, +auditierbar, kein Keyword-Katalog, kein generatives LLM zur Checkzeit. + +Design: + - Doc wird EINMAL pro Dokument-Hash eingebettet (teuer: ~37s/64k-Doc), die + Per-Control-Scores werden gecacht (/data) → Folge-Checks sind instant. + - Reachability-Guard: ist der Embedding-Service nicht erreichbar, liefert die + Schicht leer zurück (der deterministische Keyword-Layer trägt) — KEIN Hang. + - Schwelle ist die einzige Stellschraube (DSE-Default 0.65, an BMW-GT kalibriert; + braucht Mehr-Firmen-Kalibrierung gegen Overfitting — bewusst konservativ). +""" + +from __future__ import annotations + +import asyncio +import hashlib +import json +import logging +import os +import sqlite3 +from typing import Iterable + +logger = logging.getLogger(__name__) + +# DSE-Schwelle: an BMW-Haiku-GT vermessen (PASS-Median 0.648 / FAIL-Median 0.612). +# 0.65 = präzisionsfreundlich (wenig Über-Pass). Per ENV überschreibbar für +# spätere Mehr-Firmen-Kalibrierung, ohne Code-Änderung. +DSE_EMBED_THRESHOLD = float(os.getenv("DSE_EMBED_THRESHOLD", "0.65")) +_CACHE_PATH = os.getenv("DSE_EMBED_CACHE", "/data/dse_embed_cache.json") +_SIDECAR_DB = os.getenv("MC_CLASS_DB", "/data/mc_classification.db") + + +def _doc_hash(text: str) -> str: + return hashlib.sha256(text.encode("utf-8", "ignore")).hexdigest()[:20] + + +def _load_cache() -> dict: + try: + with open(_CACHE_PATH, encoding="utf-8") as f: + return json.load(f) + except Exception: + return {} + + +def _save_cache(cache: dict) -> None: + try: + # LRU-Kappung: max 30 Dokumente im Cache (Scores sind klein) + if len(cache) > 30: + for k in list(cache.keys())[:-30]: + cache.pop(k, None) + tmp = _CACHE_PATH + ".tmp" + with open(tmp, "w", encoding="utf-8") as f: + json.dump(cache, f) + os.replace(tmp, _CACHE_PATH) + except Exception as e: + logger.warning("dse embed-cache save failed: %s", e) + + +def _load_control_vecs(cids: Iterable[str]) -> dict[str, list[float]]: + from compliance.services.mc_embedding_matcher import _blob_to_vec + cid_list = [c for c in cids if c] + if not cid_list: + return {} + try: + with sqlite3.connect(_SIDECAR_DB) as c: + ph = ",".join("?" * len(cid_list)) + rows = c.execute( + f"SELECT control_id, embedding FROM mc_classification " + f"WHERE control_id IN ({ph}) AND doc_type='dse' " + f"AND check_type='text' AND embedding IS NOT NULL", + cid_list, + ).fetchall() + return {cid: _blob_to_vec(b) for cid, b in rows} + except Exception as e: + logger.warning("dse control-vec load failed: %s", e) + return {} + + +async def _embedding_reachable(timeout: float = 2.0) -> bool: + """Schneller TCP-Connect zum Embedding-Service. Verhindert, dass ein toter + Service den Check blockiert (macmini-Lehrer-Last hat das Embedding früher + verstopft).""" + url = os.getenv("EMBEDDING_URL", "http://embedding-service:8087") + hostport = url.split("://", 1)[-1].split("/", 1)[0] + host, _, port = hostport.partition(":") + port = int(port or "8087") + try: + fut = asyncio.open_connection(host, port) + reader, writer = await asyncio.wait_for(fut, timeout=timeout) + writer.close() + try: + await writer.wait_closed() + except Exception: + pass + return True + except Exception as e: + logger.warning("dse embedding-service nicht erreichbar (%s) — " + "deterministischer Layer trägt", e) + return False + + +async def _compute_scores(text: str, all_cids: list[str]) -> dict[str, float]: + """Bettet das Dokument EINMAL ein und liefert max-Cosinus je Control.""" + from compliance.services.mc_embedding_matcher import ( + _chunk_text, _cosine, _embed_texts, DIM, + ) + mc_vecs = _load_control_vecs(all_cids) + if not mc_vecs: + return {} + chunks = _chunk_text(text) + if not chunks: + return {} + chunk_vecs = await _embed_texts(chunks) + chunk_vecs = [v for v in chunk_vecs if v and len(v) == DIM] + if not chunk_vecs: + return {} + return { + cid: round(float(max((_cosine(mv, cv) for cv in chunk_vecs), + default=0.0)), 4) + for cid, mv in mc_vecs.items() + } + + +async def embedding_recall( + text: str, + candidate_cids: Iterable[str], + threshold: float | None = None, + embed_timeout: float = 90.0, +) -> set[str]: + """Returns die candidate control_ids, die semantisch (>= Schwelle) im Doc + vorkommen. Deterministisch + gecacht. Leeres Set, wenn Service down/Fehler. + + candidate_cids: die im Keyword-Layer DURCHGEFALLENEN Controls (Recall-Rescue). + """ + cands = [c for c in candidate_cids if c] + if not text or len(text) < 100 or not cands: + return set() + thr = DSE_EMBED_THRESHOLD if threshold is None else threshold + + h = _doc_hash(text) + cache = _load_cache() + scores = cache.get(h) + + if scores is None: + if not await _embedding_reachable(): + return set() + try: + scores = await asyncio.wait_for( + _compute_scores(text, cands), timeout=embed_timeout) + except (Exception, asyncio.TimeoutError) as e: + logger.warning("dse embedding_recall skipped: %s", e) + return set() + if not scores: + return set() + cache[h] = scores + _save_cache(cache) + logger.info("dse embedding_recall: doc %s eingebettet (%d Scores)", + h, len(scores)) + else: + logger.info("dse embedding_recall: Cache-Treffer doc %s", h) + + cand_set = set(cands) + return {cid for cid, s in scores.items() + if cid in cand_set and s >= thr} diff --git a/backend-compliance/compliance/services/specialist_agents/dse/agent.py b/backend-compliance/compliance/services/specialist_agents/dse/agent.py index babd9f2d..49ff6ab1 100644 --- a/backend-compliance/compliance/services/specialist_agents/dse/agent.py +++ b/backend-compliance/compliance/services/specialist_agents/dse/agent.py @@ -1,29 +1,286 @@ -"""DSEAgent — Datenschutzerklärung / Datenschutzinformation (Art. 13/14 DSGVO). +"""DSE-Agent v3 — Datenschutzerklärung / Datenschutzinformation (Art. 13/14 +DSGVO), baut auf doc_check_controls (267 text-MCs aus DB). -Thin-Subclass von ChecklistAgent über die kuratierte ART13_CHECKLIST (KEIN -90k-Library-Firehose). Einzige Spezialität: Drittland wird bei dokumentiertem -Drittlandtransfer (Scan-Kontext) zu HIGH angehoben. +Volle Parität zu impressum/ + cookie_policy/ (User-Vorgabe 2026-06-17): + Layer 0 — Regex-Boost (kuratierte Art-13-Patterns aus mcs.py / ART13_CHECKLIST) + Layer 1 — Keyword-Match aus pass_criteria der DSE-DB-MCs (deterministisch) + Layer 2 — BGE-M3 Embedding-Match + Layer 3 — Semantic-Validator (LLM) für offene HIGH/MEDIUM-Fails + Auto-Learning + +Die kuratierten Patterns gehen NICHT verloren — sie boosten (Layer 0) die DB- +Controls (z.B. präzises "keine Drittlandübermittlung" → drittland-MC PASS, kein +False-Positive). DSE-Spezialität bleibt: Drittland → HIGH bei dokumentiertem +Transfer (scan_context). + +Output-Layer (Linter / Rollup / Methodik-UI) bleibt 1:1. """ from __future__ import annotations -from compliance.services.doc_checks.dse_checks import ART13_CHECKLIST +import logging +from datetime import datetime, timezone -from .._base import AgentInput -from .._checklist_agent import ChecklistAgent +from .._base import ( + AgentInput, + AgentOutput, + BaseSpecialistAgent, + EscalationLog, + EvidenceSource, + Finding, + McCoverage, + Severity, + SourceType, + lint_output, +) +from .._pattern_library import record as record_pattern +from .._rollup import rollup +from .._semantic_validator import build_rename_action, validate_present +from .mcs import MC_IDS, MCS +from .regex_boost import BOOST_KEYWORDS +from .v3_engine import run_v3_pipeline + +logger = logging.getLogger(__name__) -class DSEAgent(ChecklistAgent): - CHECKLIST = ART13_CHECKLIST +_SEV_TO_ENUM = { + "CRITICAL": Severity.HIGH, + "HIGH": Severity.HIGH, + "MEDIUM": Severity.MEDIUM, + "LOW": Severity.LOW, + "INFO": Severity.INFO, +} + +# Drittland-Vokabeln für die scan_context-Heraufstufung (Art. 13(1)(f)). +_THIRD_COUNTRY_KW = tuple(set( + BOOST_KEYWORDS.get("third_country", ()) + + BOOST_KEYWORDS.get("third_country_mechanism", ()) +)) + + +def _build_measure(label: str, norm: str) -> str: + """Maßnahme (Imperativ) statt Pruef-Frage als action.""" + base = (label or "").strip().rstrip(".") + if not base: + return ("Datenschutz-Pflichtangabe ergänzen und gegen Art. 13/14 " + "DSGVO prüfen.") + msg = f"Pflichtangabe ergänzen: {base}." + if norm: + msg += f" Rechtsgrundlage: {norm}." + return msg + + +def _is_third_country_topic(result: dict) -> bool: + """Ist dieses DB-MC thematisch ein Drittland-Control?""" + parts: list[str] = [str(result.get("label") or "").lower()] + for c in (result.get("_pass_criteria") or []): + if c: + parts.append(str(c).lower()) + blob = " ".join(parts) + hits = sum(1 for kw in _THIRD_COUNTRY_KW if kw in blob) + return hits >= 1 + + +class DSEAgent(BaseSpecialistAgent): agent_id = "dse" - agent_version = "1.0" + agent_version = "3.0" doc_type = "dse" - owned_mc_ids = tuple(c["id"] for c in ART13_CHECKLIST) + owned_mc_ids = MC_IDS - def _severity_override(self, c: dict, agent_input: AgentInput): + def _third_country_transfer(self, agent_input: AgentInput) -> bool: sc = (agent_input.context or {}).get("scan_context") or {} - tc = str(sc.get("third_country_transfer", "")).lower() in ( + return str(sc.get("third_country_transfer", "")).lower() in ( "yes", "true", "1", "ja") - if tc and c["id"] in ("third_country", "third_country_mechanism"): - return "HIGH" - return None + + async def evaluate(self, agent_input: AgentInput) -> AgentOutput: + start = datetime.now(timezone.utc) + text = (agent_input.text or "").strip() + scope = set(agent_input.business_scope or []) + coverage: list[McCoverage] = [] + findings: list[Finding] = [] + esc_logs: list[EscalationLog] = [] + notes_parts: list[str] = [] + + if len(text) < 100: + for mc in MCS: + coverage.append(McCoverage( + mc_id=mc.mc_id, status="skipped", + label=mc.label, reason="text too short", + )) + return self._finalize( + start, findings, esc_logs, coverage, + confidence=0.0, + notes="DSE-Text zu kurz oder leer.", + ) + + tc_transfer = self._third_country_transfer(agent_input) + # Embedding-Recall (Layer 2) läuft IMMER — deterministisch, gecacht + # (pro Doc-Hash → Folge-Views instant) und Reachability-gegated + # (kein Hang, wenn der Service fehlt). Ersetzt den über-passenden Boost. + results, telemetry = await run_v3_pipeline(text, scope) + notes_parts.append( + f"v3-pipeline: {telemetry.get('total_mcs', 0)} DB-MCs · " + f"{telemetry.get('layer_1_pass', 0)} Keyword-Treffer · " + f"{telemetry.get('embedding_passes', 0)} semantisch (Embedding)" + ) + if telemetry.get("sector_dropped") or telemetry.get("offtopic_dropped"): + notes_parts.append( + f"Scope-Filter: {telemetry.get('sector_dropped', 0)} " + f"Branchen-MCs, {telemetry.get('offtopic_dropped', 0)} " + "themenfremde MCs entfernt" + ) + + seen: set[str] = set() + for r in results: + mc_id = r.get("control_id") or "" + if not mc_id or mc_id in seen: + continue + seen.add(mc_id) + passed = bool(r.get("passed")) + sev = _SEV_TO_ENUM.get( + (r.get("severity") or "MEDIUM").upper(), Severity.MEDIUM, + ) + # DSE-Spezialität: Drittland → HIGH bei dokumentiertem Transfer. + sev_reason = "db_mc_failed" + if tc_transfer and _is_third_country_topic(r): + sev = Severity.HIGH + sev_reason = "db_mc_failed_third_country_transfer" + coverage.append(McCoverage( + mc_id=mc_id, + status="ok" if passed else sev.value.lower(), + reason=str(r.get("matched_text") or r.get("hint") or "")[:120], + )) + if passed: + continue + label = r.get("label") or r.get("hint") or "" + norm_str = str(r.get("regulation") or "").strip() + if r.get("article"): + norm_str = (norm_str + f" Art. {r.get('article')}").strip() + if not norm_str: + norm_str = "DSGVO Art. 13/14" + findings.append(Finding( + check_id=f"DSE-DBMC-{mc_id}", + agent=self.agent_id, + agent_version=self.agent_version, + field_id=mc_id, + severity=sev, + severity_reason=sev_reason, + title=str(label)[:200] or f"DB-MC {mc_id} nicht erfüllt", + norm=norm_str, + evidence="", + action=_build_measure(str(label), norm_str)[:400], + confidence=0.9, + sources=[EvidenceSource( + source_type=SourceType.MC, + source_id=mc_id, + detail=str(r.get("source") or "keyword_match")[:120], + confidence=0.9, + )], + )) + + # Boost-Coverage: meine Pattern-Treffer (regex-boost field_ids). + boost_ids = set(telemetry.get("layer_0_field_ids") or []) + for mc in MCS: + coverage.append(McCoverage( + mc_id=mc.mc_id, + status="ok" if mc.field_id in boost_ids else "na", + label=mc.label, + reason=("regex-boost hit" + if mc.field_id in boost_ids + else "kein Pattern-Treffer (kein Veto)"), + )) + + if not (agent_input.context or {}).get("skip_llm"): + await self._semantic_demote(text, findings, coverage) + + confs = [f.confidence for f in findings if f.confidence] or [0.95] + overall = sum(confs) / len(confs) + + return self._finalize( + start, findings, esc_logs, coverage, + confidence=overall, notes=" · ".join(notes_parts), + ) + + async def _semantic_demote( + self, text: str, findings: list[Finding], + coverage: list[McCoverage], + ) -> None: + """LLM-Layer für HIGH/MEDIUM-DB-MCs: Label-Mismatch-Check. + Bei Fund → HIGH/MEDIUM → LOW + Rename-Action.""" + candidates = [ + f for f in findings + if f.severity in (Severity.HIGH.value, Severity.MEDIUM.value) + and f.severity_reason in ( + "db_mc_failed", "db_mc_failed_third_country_transfer") + ] + if not candidates: + return + result = await validate_present( + text, [(f.field_id, f.title[:80]) for f in candidates], + ) + if not result: + return + for finding in candidates: + row = result.get(finding.field_id) + if not row or not row.get("found"): + continue + if row.get("confidence", 0) < 0.6: + continue + label_used = row.get("label_used") or "abweichendes Label" + conf = float(row.get("confidence") or 0.8) + finding.severity = Severity.LOW.value + finding.severity_reason = "label_mismatch" + finding.title = ( + f"Label '{label_used}' weicht von Standard ab" + ) + finding.evidence = str(row.get("evidence") or "")[:200] + finding.action = build_rename_action( + finding.field_id, label_used, + ) + finding.confidence = conf + finding.sources.append(EvidenceSource( + source_type=SourceType.LLM_LOCAL, + source_id="semantic_validator", + detail=f"LLM-confirmed: '{label_used}'", + confidence=conf, + )) + for c in coverage: + if c.mc_id == finding.field_id: + c.status = "low" + c.reason = f"label_mismatch: '{label_used}'" + try: + record_pattern( + field_id=finding.field_id, + label_used=label_used, + confidence=conf, + agent_id=self.agent_id, + ) + except Exception as e: + logger.warning("pattern-library record failed: %s", e) + + def _finalize( + self, start: datetime, findings: list[Finding], + esc_logs: list[EscalationLog], coverage: list[McCoverage], + confidence: float, notes: str = "", + ) -> AgentOutput: + end = datetime.now(timezone.utc) + recs = rollup(findings) + out = AgentOutput( + agent=self.agent_id, + agent_version=self.agent_version, + started_at=start, + finished_at=end, + duration_ms=int((end - start).total_seconds() * 1000), + findings=findings, + recommendations=recs, + mc_coverage=coverage, + escalation_log=esc_logs, + confidence=confidence, + notes=notes, + mc_total=len(coverage), + mc_ok=sum(1 for c in coverage if c.status == "ok"), + mc_na=sum(1 for c in coverage if c.status == "na"), + mc_high=sum(1 for c in coverage if c.status == "high"), + mc_medium=sum(1 for c in coverage if c.status == "medium"), + mc_low=sum(1 for c in coverage if c.status == "low"), + ) + return lint_output(out) diff --git a/backend-compliance/compliance/services/specialist_agents/dse/deep_check.py b/backend-compliance/compliance/services/specialist_agents/dse/deep_check.py new file mode 100644 index 00000000..bad4f70a --- /dev/null +++ b/backend-compliance/compliance/services/specialist_agents/dse/deep_check.py @@ -0,0 +1,129 @@ +"""DSE-Tiefenprüfung: LLM-Kaskade auf die UNSCHARFEN Findings. + +User-Architektur (2026-06-18): die deterministische Engine (Keyword + Embedding) +triagiert. Eindeutige Fälle (sehr hoher/niedriger Embedding-Score) bleiben +deterministisch. Die UNSCHARFE Mitte + grenzwertig-Bestandene gehen durch die +Kaskade — denn dort entstehen sowohl 'verpasste Lücken' (schlimmster Fehler) als +auch Falsch-Findings (Rework). + +Eskalation auf ANTWORT-UNSICHERHEIT (nicht JSON-Gültigkeit): jedes Tier liefert +{erfuellt, confidence, begruendung}. Confidence < Schwelle → nächstes Tier. + Tier 1: Qwen 35B (lokal, schnell, billig) + Tier 2: OVH gpt-oss-120B + Tier 3: Claude — NUR mit Freigabe (allow_claude), sonst 'needs_freigabe'. + +Judging-Leitplanken (User-Vorgaben): + - Speicherdauer nur erfüllt bei konkreter Höchstdauer ODER echtem, + nachvollziehbarem Kriterium — NICHT zirkulär ('bis Zweck wegfällt'). + - Ohne ausreichenden Kontext → eher nicht erfüllt (nichts fehlen lassen). +""" + +from __future__ import annotations + +import json +import logging +import os + +from compliance.services.llm_cascade import ( + _call_anthropic, _call_ollama, _call_ovh, +) + +logger = logging.getLogger(__name__) + +# Unscharfe Embedding-Zone (kalibriert an 5-Firmen-GT 2026-06-18): außerhalb ist +# die Engine sicher genug, innen entscheidet der LLM. +FUZZY_LO = float(os.getenv("DSE_FUZZY_LO", "0.50")) +FUZZY_HI = float(os.getenv("DSE_FUZZY_HI", "0.72")) +# Selbstkonfidenz-Schwelle: darunter → eskalieren. +ESC_CONF = float(os.getenv("DSE_ESC_CONF", "0.75")) + +_JUDGE_SYS = ( + "Du bist ein erfahrener DSGVO-Datenschutz-Auditor. Du prüfst, ob eine " + "konkrete Pflicht in einer Datenschutzerklärung (DSE) ERFÜLLT ist. " + "Sei streng wie ein Fachanwalt: lieber 'nicht erfüllt' wenn unklar — eine " + "übersehene Lücke ist schlimmer als ein Hinweis zu viel. " + "Speicherdauer ist NUR erfüllt bei konkreter Höchstdauer ODER einem echten, " + "nachvollziehbaren Kriterium; zirkuläre Formeln ('bis der Zweck wegfällt') " + "erfüllen die Pflicht NICHT. " + 'Antworte AUSSCHLIESSLICH als JSON: ' + '{"erfuellt": true|false, "confidence": 0.0-1.0, "begruendung": "kurz"}' +) + + +def _build_user(doc_text: str, title: str, criteria: list) -> str: + crit = "; ".join(str(c) for c in (criteria or []) if c)[:600] + return ( + f"PFLICHT: {title}\n" + f"Erfüllt, wenn: {crit}\n\n" + f"DATENSCHUTZERKLÄRUNG (Auszug):\n{doc_text[:14000]}\n\n" + "Ist die Pflicht im Text inhaltlich erfüllt?" + ) + + +def _parse(text: str) -> dict | None: + if not text: + return None + s, e = text.find("{"), text.rfind("}") + if s < 0 or e <= s: + return None + try: + o = json.loads(text[s:e + 1]) + return { + "erfuellt": bool(o.get("erfuellt")), + "confidence": float(o.get("confidence") or 0.0), + "begruendung": str(o.get("begruendung") or "")[:300], + } + except Exception: + return None + + +async def judge_control( + doc_text: str, title: str, criteria: list, allow_claude: bool = False, +) -> dict: + """Tiered judgment mit Selbstkonfidenz-Eskalation. Returns + {erfuellt, confidence, source, begruendung, needs_freigabe}.""" + user = _build_user(doc_text, title, criteria) + tiers = [("qwen", _call_ollama), ("ovh_120b", _call_ovh)] + best: dict | None = None + for name, call in tiers: + try: + if name == "qwen": + txt = await call(_JUDGE_SYS, user, max_tokens=400, + timeout=60, think=False) + else: + txt = await call(_JUDGE_SYS, user, max_tokens=400) + except Exception as e: + logger.warning("deep_check tier %s failed: %s", name, e) + txt = "" + p = _parse(txt) + if p: + p["source"] = name + best = p + if p["confidence"] >= ESC_CONF: + return {**p, "needs_freigabe": False} + # Tier 3: Claude — nur mit Freigabe + if not allow_claude: + if best: + return {**best, "needs_freigabe": True} + return {"erfuellt": False, "confidence": 0.0, "source": "none", + "begruendung": "Unsicher — Anwalt/Claude-Freigabe nötig", + "needs_freigabe": True} + try: + txt = await _call_anthropic(_JUDGE_SYS, user, max_tokens=400) + p = _parse(txt) + if p: + return {**p, "source": "anthropic_claude", "needs_freigabe": False} + except Exception as e: + logger.warning("deep_check claude failed: %s", e) + if best: + return {**best, "needs_freigabe": False} + return {"erfuellt": False, "confidence": 0.0, "source": "none", + "begruendung": "Kein LLM-Ergebnis", "needs_freigabe": False} + + +def is_fuzzy(score: float, kw_pass: bool) -> bool: + """Unscharf = im Embedding-Graubereich UND nicht durch Keyword klar bestätigt. + Klar-bestanden (kw) bleibt deterministisch; klar-hoch/niedrig auch.""" + if kw_pass: + return False + return FUZZY_LO <= score <= FUZZY_HI diff --git a/backend-compliance/compliance/services/specialist_agents/dse/mcs.py b/backend-compliance/compliance/services/specialist_agents/dse/mcs.py new file mode 100644 index 00000000..05d5616c --- /dev/null +++ b/backend-compliance/compliance/services/specialist_agents/dse/mcs.py @@ -0,0 +1,78 @@ +"""Machine-Check-Definitionen für den DSE-Agent (Layer-0 Regex-Boost). + +Eine MC = ein abgegrenztes Art-13/14-DSGVO-Pflichtfeld mit deterministischen +Patterns. Quelle der Patterns ist die EINE kuratierte ART13_CHECKLIST +(doc_checks/dse_checks.py) — hier nur in das MC-Format gehoben, damit der +Regex-Boost (regex_boost.py) und die v3-Engine (v3_engine.py) dieselbe Struktur +nutzen wie impressum/ + cookie_policy/. KEINE Pattern-Duplikation: die Patterns +bleiben in dse_checks.py, dieses Modul kompiliert sie nur. + +Owner = dse-agent. +""" + +from __future__ import annotations + +import re +from dataclasses import dataclass, field +from typing import Pattern + +from compliance.services.doc_checks.dse_checks import ART13_CHECKLIST + + +@dataclass(frozen=True) +class MC: + """Eine Machine-Check-Definition (Boost-Pattern für ein DSE-Feld).""" + mc_id: str # DSE-MC-001 ... + field_id: str # controller, legal_basis, third_country ... + label: str + norm: str + patterns: tuple[Pattern[str], ...] = field(default_factory=tuple) + severity_if_missing: str = "MEDIUM" + level: int = 1 + + +_NORM_RE = re.compile(r"\((Art\.[^)]+|§\s*\d+[^)]*)\)") + + +def _norm_of(label: str) -> str: + m = _NORM_RE.search(label or "") + return m.group(1).strip() if m else "Art. 13/14 DSGVO" + + +def _compile(patterns: list[str]) -> tuple[Pattern[str], ...]: + out: list[Pattern[str]] = [] + for p in patterns or (): + try: + out.append(re.compile(p, re.IGNORECASE | re.MULTILINE)) + except re.error: + continue + return tuple(out) + + +def _build_mcs() -> tuple[MC, ...]: + """Hebt die ART13_CHECKLIST in das MC-Format (Boost-Pattern pro Feld).""" + mcs: list[MC] = [] + for i, c in enumerate(ART13_CHECKLIST, start=1): + mcs.append(MC( + mc_id=f"DSE-MC-{i:03d}", + field_id=c["id"], + label=c["label"], + norm=_norm_of(c["label"]), + patterns=_compile(c.get("patterns", [])), + severity_if_missing=c.get("severity", "MEDIUM"), + level=c.get("level", 1), + )) + return tuple(mcs) + + +MCS: tuple[MC, ...] = _build_mcs() + +# Public list of all MC-IDs for the Registry / owned_mc_ids. +MC_IDS: tuple[str, ...] = tuple(m.mc_id for m in MCS) + + +def scope_matches(mc: MC, scope: set[str]) -> bool: + """Art-13/14-Pflichten gelten universell für jede DSE — keine Branchen- + Gating auf Boost-Ebene (anders als Impressum mit Kammerberufen). Das + Sektor-Gate über den control_id-Prefix passiert in der v3-Engine.""" + return True diff --git a/backend-compliance/compliance/services/specialist_agents/dse/regex_boost.py b/backend-compliance/compliance/services/specialist_agents/dse/regex_boost.py new file mode 100644 index 00000000..e17f4299 --- /dev/null +++ b/backend-compliance/compliance/services/specialist_agents/dse/regex_boost.py @@ -0,0 +1,179 @@ +"""Layer-0 Regex-Boost für den DSE-Agent — die kuratierten Art-13/14-Patterns +als deterministische Vor-Stufe vor dem Keyword-Match aus doc_check_controls. + +Analog zu impressum/regex_boost.py + cookie_policy/regex_boost.py: + - run_v3_pipeline lädt die 267 text-MCs (doc_type='dse') und macht + Keyword-Match aus deren pass_criteria. + - MEIN Beitrag (Layer 0): die präzisen Art-13-Patterns (mcs.py / aus + ART13_CHECKLIST) laufen ZUERST. Trifft ein Pattern → das thematisch + passende DB-MC wird zu PASS geboostet (auch wenn der Keyword-Match unklar + war). Mapping: field_id → typische Wörter in der pass_criteria der DB-MC. + +Damit gehen die kuratierten DSE-Patterns nicht verloren, sondern boosten das +DB-Control-System (statt es zu ersetzen). +""" + +from __future__ import annotations + +import logging + +from .mcs import MCS, scope_matches + +logger = logging.getLogger(__name__) + + +# field_id (aus ART13_CHECKLIST) → Wörter, wie sie in der pass_criteria der +# zugehörigen DSE-DB-MCs vorkommen. Treffen ≥2 dieser Wörter in den criteria +# eines DB-MC, gehört es zu diesem Feld → Boost. Die Vokabeln sind an den +# real beobachteten DB-Kriterien ausgerichtet (DATA/SEC/AUTH-Controls). +BOOST_KEYWORDS: dict[str, tuple[str, ...]] = { + "controller": ( + "verantwortlich", "verantwortliche stelle", "verantwortlichen", + "kontaktdaten des verantwortlichen", "name und kontaktdaten", + "firmenname", "rechtsform", "anschrift", "ladungsfähige", + "identität des verantwortlichen", "controller", + ), + "dpo": ( + "datenschutzbeauftragt", "datenschutzbeauftragter", + "datenschutzbeauftragte", "data protection officer", + "kontaktdaten des datenschutz", "benennung", "art. 37", + ), + "purposes": ( + "zweck", "zwecke", "verarbeitungszweck", "zweck der verarbeitung", + "zwecke der verarbeitung", "verarbeitungstätigkeit", "purpose", + "zweckbindung", "erhebung", + ), + "legal_basis": ( + "rechtsgrundlage", "berechtigte interesse", "berechtigtes interesse", + "einwilligung", "vertragserfüllung", "vertragserfuellung", + "interessenabwägung", "interessenabwaegung", "art. 6", + "rechtmäßigkeit", "rechtmaessigkeit", "erforderlich", + ), + "recipients": ( + "empfänger", "empfaenger", "empfängerkategorien", + "empfaengerkategorien", "weitergabe", "übermittlung an", + "auftragsverarbeit", "auftragsverarbeiter", "dienstleister", + "dritte", "drittempfänger", "kategorien von empfängern", + ), + "third_country": ( + "drittland", "drittstaat", "drittländer", "drittlaender", + "standardvertragsklausel", "angemessenheitsbeschluss", + "übermittlung in ein drittland", "geeignete garantien", + "schutzgarantien", "data privacy framework", "ewr", + "internationale übermittlung", "transfermechanismus", + ), + "third_country_mechanism": ( + "standardvertragsklausel", "angemessenheitsbeschluss", + "geeignete garantien", "data privacy framework", + "schutzgarantien", "art. 46", "art. 45", + ), + "retention": ( + "speicherdauer", "aufbewahrungsfrist", "aufbewahrungsdauer", + "löschfrist", "loeschfrist", "speicherbegrenzung", "löschung", + "loeschung", "dauer der speicherung", "kriterien für die festlegung", + "speicherfrist", "aufbewahrung", + ), + "retention_periods": ( + "aufbewahrungsfrist", "löschfrist", "speicherdauer", + "gesetzliche frist", "handelsrechtlich", "steuerrechtlich", + ), + "rights": ( + "betroffenenrecht", "betroffenenrechte", "recht auf auskunft", + "auskunft", "berichtigung", "löschung", "loeschung", + "einschränkung", "einschraenkung", "datenübertragbarkeit", + "datenuebertragbarkeit", "widerspruch", "widerruf", + "rechte der betroffenen", "art. 15", "art. 17", "art. 21", + ), + "complaint": ( + "beschwerderecht", "aufsichtsbehörde", "aufsichtsbehoerde", + "datenschutzbehörde", "datenschutzbehoerde", "beschwerde", + "recht auf beschwerde", "art. 77", "zuständige aufsichtsbehörde", + ), + "rights_art22_profiling": ( + "automatisierte entscheidung", "profiling", "art. 22", + "automatisierte einzelentscheidung", "scoring", + ), + "dse_version_date": ( + "stand", "letzte aktualisierung", "zuletzt geändert", + "gültig ab", "gueltig ab", "version", "versionsdatum", + "aktualität", "nachweisbarkeit", + ), +} + + +def compute_regex_boosts(text: str, business_scope: set[str] | None = None) -> set[str]: + """Welche DSE-field_ids haben die kuratierten Patterns erkannt? + + Returns die Menge gehit'ter field_ids, über die später entschieden wird, + ob ein DB-MC darüber automatisch passed werden kann. business_scope wird + akzeptiert (Signatur-Parität mit impressum), für DSE aber nicht gegated — + Art-13-Pflichten sind universell. + """ + if not text or len(text) < 50: + return set() + scope = business_scope or set() + hits: set[str] = set() + for mc in MCS: + if not scope_matches(mc, scope): + continue + if any(p.search(text) for p in mc.patterns): + hits.add(mc.field_id) + return hits + + +def boost_matches_db_mc( + boosts: set[str], + pass_criteria: list, + fail_criteria: list | None = None, +) -> str | None: + """Hat ein gebooster field_id ≥2 Keyword-Überlapp mit den pass/fail_criteria + eines DB-MC? Returns field_id (höchster Match-Count) oder None.""" + if not boosts: + return None + crit_parts: list[str] = [] + for c in (pass_criteria or []): + if c: + crit_parts.append(str(c).lower()) + for c in (fail_criteria or []): + if c: + crit_parts.append(str(c).lower()) + if not crit_parts: + return None + crit_text = " ".join(crit_parts) + best: tuple[int, str] | None = None + for field_id in boosts: + kws = BOOST_KEYWORDS.get(field_id) or () + match_count = sum(1 for kw in kws if kw in crit_text) + if match_count >= 2: + if best is None or match_count > best[0]: + best = (match_count, field_id) + return best[1] if best else None + + +def criteria_on_topic( + pass_criteria: list | None, + fail_criteria: list | None = None, + min_hits: int = 2, +) -> bool: + """Deterministischer Themen-Gate: gehört ein DB-MC überhaupt ins DSE- + Themenfeld (Art 13/14)? ≥min_hits unterschiedliche Schlüsselwörter aus + IRGENDEINEM DSE-Feld in den kombinierten criteria. Fängt fremd-getaggte + MCs ab. Leere Kriterien → on-topic behalten (konservativ).""" + crit_parts: list[str] = [] + for c in (pass_criteria or []): + if c: + crit_parts.append(str(c).lower()) + for c in (fail_criteria or []): + if c: + crit_parts.append(str(c).lower()) + if not crit_parts: + return True + crit_text = " ".join(crit_parts) + hits: set[str] = set() + for kws in BOOST_KEYWORDS.values(): + for kw in kws: + if kw in crit_text: + hits.add(kw) + if len(hits) >= min_hits: + return True + return False diff --git a/backend-compliance/compliance/services/specialist_agents/dse/v3_engine.py b/backend-compliance/compliance/services/specialist_agents/dse/v3_engine.py new file mode 100644 index 00000000..b77a87cf --- /dev/null +++ b/backend-compliance/compliance/services/specialist_agents/dse/v3_engine.py @@ -0,0 +1,209 @@ +"""v3-Engine: läuft die 4-Layer-Pipeline auf einem DSE-Text (Art. 13/14 DSGVO). + + Layer 0 — Regex-Boost (die kuratierten Art-13-Patterns aus mcs.py) + Layer 1 — MC-Laden + Keyword-Match. Das LADEN delegiert an die Main-Tool- + Engine (rag_document_checker._load_controls, doc_type='dse'): + eine Quelle der Wahrheit inkl. P72-Scope, check_type='text' + (267 von 571) und fits_doc_type/scope_requires aus dem Sidecar. + Layer 2 — BGE-M3 Embedding-Match (mc_embedding_matcher, shared) + Layer 0 Override — failed MCs, deren criteria zu einem gebooster field_id + passen, werden zu PASS überschrieben. + +Zusätzlich am Agent-Rand: subtraktives Sektor-/Themen-Gate (_filter_controls) +— das Sektor-Gate (Branchen-Prefix GOV/FIN/MED…) verwirft branchenfremde MCs, +das Themen-Gate fremd-getaggte. Analog impressum/v3_engine.py. + +Output: Liste Result-Dicts kompatibel mit rag_document_checker. Der Agent +konvertiert sie zu Finding-Objekten. +""" + +from __future__ import annotations + +import logging +from typing import Any + +from .regex_boost import ( + compute_regex_boosts, + criteria_on_topic, +) + +logger = logging.getLogger(__name__) + +# Branchen-Prefix -> erwarteter Scope-Token. Reuse aus dem Mail-V2-Scope- +# Filter, damit Agent-Pfad und Report-Pfad dieselbe Quelle nutzen. Import +# defensiv: faellt der Mail-Pfad weg, bleibt der Agent lauffaehig. +try: + from compliance.services.mail_render_v2._scope_filter import ( + SECTOR_PREFIXES, + ) +except Exception: # pragma: no cover - defensiver Fallback + SECTOR_PREFIXES = {} + + +async def run_v3_pipeline( + text: str, + business_scope: set[str], + db_url: str = "", + skip_embedding: bool = False, +) -> tuple[list[dict[str, Any]], dict[str, Any]]: + """Returns (results, telemetry). + + results: pro DB-MC ein dict {control_id, passed, severity, ...} + telemetry: counters für Frontend-Anzeige (Layer-Aufschlüsselung) + + skip_embedding: Layer-2 (BGE-M3 Recall) überspringen. Nur für Unit-Tests + ohne Embedding-Service. Im Betrieb läuft die Recall-Schicht: sie ist + gecacht (pro Doc-Hash) und Reachability-gegated, blockiert also nie. + """ + if not text or len(text) < 100: + return [], {"reason": "text too short"} + + # Layer 0: kuratierte Art-13-Patterns + boosts = compute_regex_boosts(text, business_scope) + boost_field_ids = sorted(boosts) + logger.info("dse v3 Layer-0 boosts: %d hits — %s", + len(boost_field_ids), boost_field_ids) + + # Layer 1: MC-Laden DELEGIERT an die Main-Tool-Engine (Scope-Schutz inkl.). + try: + from compliance.services.rag_document_checker import _load_controls + controls = await _load_controls( + "dse", db_url, 0, business_scope, + ) + except Exception as e: + logger.warning("dse v3 load via main-tool engine failed: %s", e) + controls = [] + _normalize_criteria(controls) + # Agent-Rand-Backstop: Sektor-Gate (Branchen-Prefix) + Themen-Gate. + controls, drop_stats = _filter_controls(controls, business_scope) + # Applicability-Gate: hochsichere organisatorische Controls (laut + # control_classification NICHT DSE, needs_review=false) aus dem + # FINDINGS-Scan nehmen -> organisatorische Checkliste statt False-FEHLT. + # Fail-safe: needs_review bleiben drin. Defensiv: fehlt die Tabelle, kein + # Filter (Prod-sicher). Siehe _classification_gate. + from ._classification_gate import apply_gate, load_dse_gate + gate = await load_dse_gate(db_url) + organizational: list[dict[str, Any]] = [] + if gate: + controls, organizational = apply_gate(controls, gate) + results: list[dict[str, Any]] = [] + if controls: + try: + from compliance.services.rag_document_checker import ( + _check_mc_deterministic, + ) + text_lower = text.lower().replace("\xad", "") + for mc in controls: + r = _check_mc_deterministic(text_lower, mc) + if r: + r["_pass_criteria"] = mc.get("pass_criteria") + r["_fail_criteria"] = mc.get("fail_criteria") + results.append(r) + except Exception as e: + logger.warning("layer-1 keyword check failed: %s", e) + results = [] + + layer_1_pass = sum(1 for r in results if r.get("passed")) + + # Layer 2: DETERMINISTISCHE semantische Recall-Schicht (BGE-M3, gecacht). + # Ersetzt den früheren Regex-Boost, der auf vollständigen DSE-Dokumenten + # massiv über-passte (BMW: 71/94 Boost-Overrides → 49% Übereinstimmung). + # Embedding ist genauer (BMW-GT: KW|EMB@0.65 = 75%) UND deterministisch + # (feste Funktion, reproduzierbar — kein Keyword-Katalog, kein LLM). + embedding_passes = 0 + if not skip_embedding: + failed_cids = [r.get("control_id") for r in results + if r and not r.get("passed") and r.get("control_id")] + if failed_cids: + try: + from ._embedding_recall import embedding_recall + semantic = await embedding_recall(text, failed_cids) + except Exception as e: + logger.warning("dse embedding recall failed: %s", e) + semantic = set() + for r in results: + if (r.get("control_id") in semantic + and not r.get("passed")): + r["passed"] = True + r["matched_text"] = "[semantisch erkannt — Embedding]" + r["source"] = (r.get("source") or "") + "+embedding" + embedding_passes += 1 + + telemetry = { + "layer_0_field_hits": len(boost_field_ids), + "layer_0_field_ids": boost_field_ids, + "layer_1_pass": layer_1_pass, + "embedding_passes": embedding_passes, + "total_mcs": len(results), + "sector_dropped": drop_stats.get("sector_dropped", 0), + "offtopic_dropped": drop_stats.get("offtopic_dropped", 0), + "gate_excluded": len(organizational), + "organizational_checklist": organizational, + } + logger.info("dse v3 telemetry: %s", telemetry) + return results, telemetry + + +def _filter_controls( + controls: list[dict[str, Any]], + business_scope: set[str], +) -> tuple[list[dict[str, Any]], dict[str, int]]: + """Subtraktiver Scope-Filter VOR der Bewertung. + + 1. Sektor-Gate — MCs deren control_id-Prefix eine Branche bezeichnet + (FIN/GOV/MED/INS/EDU/LEG/REL/POL), die NICHT im business_scope liegt + UND die nicht on-topic ist, werden verworfen. + 2. Themen-Gate — MCs ohne DSE-Themenüberlapp werden verworfen. + + Rein subtraktiv: entfernt nur falsch-positive Kandidaten. + """ + scope_lc = {s.lower() for s in (business_scope or set())} + kept: list[dict[str, Any]] = [] + sector_dropped = 0 + offtopic_dropped = 0 + for c in controls: + cid = c.get("control_id") or "" + prefix = cid.split("-")[0].upper() if "-" in cid else "" + on_topic = criteria_on_topic(c.get("pass_criteria"), + c.get("fail_criteria")) + required = SECTOR_PREFIXES.get(prefix) + # Sektor-Gate nur fuer NICHT-on-topic Controls: ein klar DSE- + # thematischer Control (z.B. GOV-Prefix aus der Domain-Erkennung) + # darf nicht am Branchen-Prefix scheitern. + if required and not (scope_lc & required) and not on_topic: + sector_dropped += 1 + continue + if not on_topic: + offtopic_dropped += 1 + continue + kept.append(c) + if sector_dropped or offtopic_dropped: + logger.info( + "dse v3 scope-filter: -%d Branchen-MCs, -%d themenfremde MCs " + "(scope=%s)", sector_dropped, offtopic_dropped, + sorted(scope_lc) or "leer", + ) + return kept, { + "sector_dropped": sector_dropped, + "offtopic_dropped": offtopic_dropped, + } + + +def _normalize_criteria(controls: list[dict[str, Any]]) -> None: + """asyncpg liefert JSONB-Spalten (pass_criteria/fail_criteria) als + Roh-String. In echte Listen parsen, damit Sektor-/Themen-Gate und der + Boost-Layer Element-weise iterieren.""" + import json + for c in controls: + for key in ("pass_criteria", "fail_criteria"): + v = c.get(key) + if isinstance(v, list): + continue + if isinstance(v, str): + try: + parsed = json.loads(v) + c[key] = parsed if isinstance(parsed, list) else [v] + except Exception: + c[key] = [v] if v else [] + else: + c[key] = [] diff --git a/backend-compliance/compliance/tests/test_agent_outputs_topics.py b/backend-compliance/compliance/tests/test_agent_outputs_topics.py index a88a0a2e..d15c2c32 100644 --- a/backend-compliance/compliance/tests/test_agent_outputs_topics.py +++ b/backend-compliance/compliance/tests/test_agent_outputs_topics.py @@ -6,5 +6,9 @@ def test_agb_wired_into_live_topic_agents(): assert _TOPIC_AGENTS.get("agb") == "agb" +def test_dse_wired_into_live_topic_agents(): + assert _TOPIC_AGENTS.get("dse") == "dse" + + def test_impressum_still_wired(): assert _TOPIC_AGENTS.get("impressum") == "impressum" diff --git a/backend-compliance/compliance/tests/test_dse_agent.py b/backend-compliance/compliance/tests/test_dse_agent.py index d5f2acdc..7ee213b6 100644 --- a/backend-compliance/compliance/tests/test_dse_agent.py +++ b/backend-compliance/compliance/tests/test_dse_agent.py @@ -1,65 +1,96 @@ -"""DSEAgent — kuratierte Art-13/14-Checkliste (kein Library-Firehose).""" - -from __future__ import annotations +"""DSEAgent v3 (4-Layer: Regex-Boost / Keyword / BGE-M3-Recall / Semantic). +DB (_load_controls), Embedding-Service und LLM sind offline gestubbt → die Tests +sind deterministisch und brauchen kein Netzwerk. Die reinen Schichten +(compute_regex_boosts, embedding_recall-Reachability) werden direkt geprüft, die +Result→Finding-Konvertierung über einen gestubbten run_v3_pipeline. +""" import asyncio +import compliance.services.specialist_agents.dse.agent as dse_agent from compliance.services.specialist_agents import REGISTRY, AgentInput +from compliance.services.specialist_agents.dse._embedding_recall import ( + embedding_recall, +) +from compliance.services.specialist_agents.dse.regex_boost import ( + compute_regex_boosts, +) + +_TELEMETRY = { + "layer_0_field_hits": 0, "layer_0_field_ids": [], + "layer_1_pass": 0, "embedding_passes": 0, "total_mcs": 1, + "sector_dropped": 0, "offtopic_dropped": 0, + "gate_excluded": 0, "organizational_checklist": [], +} -def _run(text: str): - return asyncio.run( - REGISTRY.get("dse").evaluate(AgentInput(doc_type="dse", text=text))) +def _pipeline_stub(results): + async def _stub(text, scope): + return results, dict(_TELEMETRY, total_mcs=len(results)) + return _stub -def test_dse_agent_registered(): - assert REGISTRY.get("dse") is not None +def _evaluate(text, context=None): + return asyncio.run(dse_agent.DSEAgent().evaluate( + AgentInput(doc_type="dse", text=text, context=context or {}))) -def test_dse_detects_core_obligations(): - text = ( - "Datenschutzerklaerung. Verantwortlich im Sinne der DSGVO ist die " - "Muster GmbH, Musterstrasse 1, 12345 Berlin. E-Mail: info@muster.de. " - "Datenschutzbeauftragter: dsb@muster.de. Zwecke der Verarbeitung und " - "Rechtsgrundlage Art. 6 Abs. 1. Empfaenger Ihrer Daten. Speicherdauer " - "der Daten. Ihre Rechte: Auskunft, Loeschung, Widerspruch, Beschwerde " - "bei der Aufsichtsbehoerde. ") * 3 - out = _run(text) - assert out.agent == "dse" - # 10 L1-Pflichtangaben immer + L2-Details deren Parent vorhanden ist - # (fehlende Parents → L2 übersprungen, kein 'na'-Rauschen). - assert 10 <= out.mc_total <= 33 - ok = [c.label for c in out.mc_coverage if c.status == "ok"] - assert any("Verantwortlich" in lbl for lbl in ok) - assert any("Rechtsgrundlage" in lbl for lbl in ok) - - -def test_dse_missing_obligations_are_findings(): - out = _run("Lorem ipsum dolor sit amet consectetur adipiscing elit. " * 6) - assert out.findings - assert any(f.severity == "HIGH" for f in out.findings) +def test_dse_agent_registered_is_v3(): + agent = REGISTRY.get("dse") + assert agent is not None and agent.agent_version == "3.0" def test_dse_short_text_skips(): - out = _run("zu kurz") + out = _evaluate("zu kurz") assert out.confidence == 0.0 assert all(c.status == "skipped" for c in out.mc_coverage) -def test_third_country_high_when_applicable_no_na_detail_short_action(): - # Text ohne Drittland-Abschnitt + Scan-Kontext drittland=ja: - # - third_country (L1) fehlt → HIGH (nicht weiches MEDIUM) - # - Transfermechanismus (L2) → KEIN 'na' (übersprungen, Parent deckt ab) - # - Titel/Maßnahme kurz (kein 280-Zeichen-Hint als Recommendation-Titel) - text = ("Datenschutz. Verantwortlich ist die Muster GmbH, info@muster.de. " - "Zwecke und Rechtsgrundlage Art. 6. Speicherdauer. Ihre Rechte. ") * 4 - out = asyncio.run(REGISTRY.get("dse").evaluate(AgentInput( - doc_type="dse", text=text, - context={"scan_context": {"third_country_transfer": "yes"}}))) - tc = [f for f in out.findings if "Drittland" in f.title] - assert tc and tc[0].severity == "HIGH" - assert not any(c.status == "na" and "Transfermechanismus" in c.label - for c in out.mc_coverage) - assert all(len(f.action) < 110 for f in out.findings) - # Detail-Begründung bleibt als evidence erhalten - assert any(f.evidence for f in out.findings) +def test_regex_boost_detects_core_fields(): + text = ("Verantwortlicher im Sinne der DSGVO ist die Muster GmbH. " + "Rechtsgrundlage ist Art. 6. Speicherdauer der Daten. Beschwerde " + "bei der Aufsichtsbehoerde. ") * 2 + hits = compute_regex_boosts(text, set()) + assert {"controller", "legal_basis", "retention", "complaint"} & hits + + +def test_embedding_recall_offline_returns_empty(): + # Kein Embedding-Service (Unit) -> Reachability-Guard -> leeres Set, kein Hang. + got = asyncio.run(embedding_recall("x" * 200, ["DSE-X-1"])) + assert got == set() + + +def test_evaluate_builds_finding_from_failed_db_mc(monkeypatch): + monkeypatch.setattr(dse_agent, "run_v3_pipeline", _pipeline_stub([{ + "control_id": "DATA-RETENTION-1", "passed": False, "severity": "MEDIUM", + "label": "Speicherdauer der Daten", "regulation": "DSGVO", "article": "13", + "source": "keyword_match", + }])) + out = _evaluate("Datenschutzerklaerung " + "x" * 200, context={"skip_llm": True}) + f = next((f for f in out.findings if f.field_id == "DATA-RETENTION-1"), None) + assert f is not None and f.severity == "MEDIUM" + assert f.action and len(f.action) <= 400 + + +def test_evaluate_passed_db_mc_no_finding(monkeypatch): + monkeypatch.setattr(dse_agent, "run_v3_pipeline", _pipeline_stub([{ + "control_id": "PURPOSE-1", "passed": True, "severity": "MEDIUM", + "label": "Zwecke", "matched_text": "Zwecke der Verarbeitung", + }])) + out = _evaluate("Datenschutzerklaerung " + "x" * 200, context={"skip_llm": True}) + assert "PURPOSE-1" not in [f.field_id for f in out.findings] + assert any(c.mc_id == "PURPOSE-1" and c.status == "ok" for c in out.mc_coverage) + + +def test_evaluate_third_country_high_on_documented_transfer(monkeypatch): + monkeypatch.setattr(dse_agent, "run_v3_pipeline", _pipeline_stub([{ + "control_id": "TRANSFER-1", "passed": False, "severity": "MEDIUM", + "label": "Drittlanduebermittlung", "regulation": "DSGVO", "article": "13", + }])) + out = _evaluate( + "Datenschutzerklaerung " + "x" * 200, + context={"skip_llm": True, + "scan_context": {"third_country_transfer": "yes"}}) + f = next((f for f in out.findings if f.field_id == "TRANSFER-1"), None) + assert f is not None and f.severity == "HIGH" + assert f.severity_reason == "db_mc_failed_third_country_transfer"