test(dse): adopt canonical v3 tests + criteria/GT/validation
CI / detect-changes (pull_request) Failing after 5s
CI / branch-name (pull_request) Successful in 2s
CI / guardrail-integrity (pull_request) Failing after 4s
CI / secret-scan (pull_request) Failing after 4s
CI / dep-audit (pull_request) Failing after 2s
CI / sbom-scan (pull_request) Failing after 2s
CI / build-sha-integrity (pull_request) Failing after 3s
CI / validate-canonical-controls (pull_request) Failing after 3s
CI / loc-budget (pull_request) Has been skipped
CI / go-lint (pull_request) Has been skipped
CI / python-lint (pull_request) Has been skipped
CI / nodejs-lint (pull_request) Has been skipped
CI / nodejs-build (pull_request) Has been skipped
CI / test-go (pull_request) Has been skipped
CI / iace-gt-coverage (pull_request) Has been skipped
CI / test-python-backend (pull_request) Has been skipped
CI / test-python-document-crawler (pull_request) Has been skipped
CI / test-python-dsms-gateway (pull_request) Has been skipped

Replace the reconstructed test_dse_agent.py with the canonical version and add
the companion unit tests (classification_gate, embedding_recall) covering the
recovered v3 modules. Include the curated DSE criteria backup + changelog
(legal-note rationale per control), the v1 validation writeup, and the
multi-company DSE ground-truth fulltexts (elli/eto/mercedes/safetykon) used
for threshold calibration.

18 DSE tests green offline (DB/embedding/LLM stubbed).

dev-only, no deploy.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
This commit is contained in:
Benjamin Admin
2026-06-21 12:35:16 +02:00
parent ce6b4c58e3
commit 8af9584d09
10 changed files with 1278 additions and 72 deletions
@@ -1,96 +1,153 @@
"""DSEAgent v3 (4-Layer: Regex-Boost / Keyword / BGE-M3-Recall / Semantic).
"""DSE-Agent v3 — DB-Controls (doc_check_controls) via run_v3_pipeline +
kuratierter Art-13-Regex-Boost (Layer 0). Volle Parität zu impressum/cookie.
DB (_load_controls), Embedding-Service und LLM sind offline gestubbt → die Tests
sind deterministisch und brauchen kein Netzwerk. Die reinen Schichten
(compute_regex_boosts, embedding_recall-Reachability) werden direkt geprüft, die
Result→Finding-Konvertierung über einen gestubbten run_v3_pipeline.
Die Tests prüfen die deterministischen Bausteine (regex_boost/mcs) ohne DB und
den Agent-Pfad mit gemocktem run_v3_pipeline (CI hat keine DB).
"""
from __future__ import annotations
import asyncio
import compliance.services.specialist_agents.dse.agent as dse_agent
from compliance.services.specialist_agents import REGISTRY, AgentInput
from compliance.services.specialist_agents.dse._embedding_recall import (
embedding_recall,
)
from compliance.services.specialist_agents.dse.mcs import MCS, MC_IDS
from compliance.services.specialist_agents.dse.regex_boost import (
boost_matches_db_mc,
compute_regex_boosts,
criteria_on_topic,
)
# Baseline telemetry dict returned by `_pipeline_stub`; the stub copies it and
# overrides `total_mcs` with the number of stubbed results.
_TELEMETRY = {
"layer_0_field_hits": 0, "layer_0_field_ids": [],
"layer_1_pass": 0, "embedding_passes": 0, "total_mcs": 1,
"sector_dropped": 0, "offtopic_dropped": 0,
"gate_excluded": 0, "organizational_checklist": [],
}
# German sample privacy-policy text, repeated three times. It is the document
# input for the regex-boost test and the mocked agent-path tests in this file.
_DSE_SAMPLE = (
"Datenschutzerklaerung. Verantwortlich im Sinne der DSGVO ist die Muster "
"GmbH, Musterstrasse 1, 12345 Berlin. E-Mail: info@muster.de. "
"Datenschutzbeauftragter: dsb@muster.de. Zwecke der Verarbeitung und "
"Rechtsgrundlage Art. 6 Abs. 1 lit. f berechtigtes Interesse. Empfaenger "
"Ihrer Daten sind Auftragsverarbeiter. Speicherdauer der Daten richtet "
"sich nach Aufbewahrungsfristen. Sie haben das Recht auf Auskunft, das "
"Recht auf Berichtigung, das Recht auf Loeschung sowie ein "
"Widerspruchsrecht. Beschwerde bei der Aufsichtsbehoerde moeglich. Stand: "
"Januar 2026. ") * 3
def _pipeline_stub(results):
    """Build an async stand-in for ``run_v3_pipeline`` that yields *results*.

    The returned coroutine function takes ``(text, scope)`` and ignores both.
    It returns ``results`` together with a shallow copy of ``_TELEMETRY`` whose
    ``total_mcs`` entry is set to ``len(results)``.
    """
    async def _stub(text, scope):
        telemetry = {**_TELEMETRY, "total_mcs": len(results)}
        return results, telemetry

    return _stub
# ── Registrierung ────────────────────────────────────────────────────────
def test_dse_agent_registered():
    """The registry exposes a ``dse`` agent with version 3.0 and doc type ``dse``."""
    registered = REGISTRY.get("dse")
    assert registered is not None
    assert (registered.agent_version, registered.doc_type) == ("3.0", "dse")
def _evaluate(text, context=None):
return asyncio.run(dse_agent.DSEAgent().evaluate(
def test_owned_mc_ids_match_checklist():
    """``MC_IDS`` equals the ``mc_id`` of every ``MCS`` entry, in the same order."""
    # owned_mc_ids = the boost-pattern IDs (lifted from ART13_CHECKLIST).
    ids_from_mcs = tuple(entry.mc_id for entry in MCS)
    assert MC_IDS == ids_from_mcs
    # At least the 10 mandatory L1 fields plus L2.
    assert 10 <= len(MC_IDS)
# ── Layer-0 Regex-Boost (deterministisch, ohne DB) ───────────────────────
def test_regex_boost_detects_core_fields():
    """Every central Art. 13 field is reported for the sample privacy text."""
    detected = compute_regex_boosts(_DSE_SAMPLE)
    core_fields = (
        "controller",
        "legal_basis",
        "rights",
        "complaint",
        "retention",
        "dse_version_date",
    )
    # The central Art. 13 fields must all be recognised.
    for name in core_fields:
        assert name in detected, f"{name} nicht erkannt: {sorted(detected)}"
def test_regex_boost_empty_on_short_text():
    """The short input ``"zu kurz"`` produces an empty boost set."""
    boosts = compute_regex_boosts("zu kurz")
    assert boosts == set()
def test_criteria_on_topic_accepts_dse_rejects_foreign():
    """Privacy criteria are on-topic, shop criteria are not, empty stays on-topic."""
    privacy_criteria = [
        "Rechtsgrundlage gemäß Art. 6 DSGVO benannt",
        "Speicherdauer und Löschfrist angegeben",
    ]
    shop_criteria = [
        "Bestellbestätigung wird per E-Mail versendet",
        "Versandkosten werden im Warenkorb angezeigt",
    ]
    assert criteria_on_topic(privacy_criteria) is True
    assert criteria_on_topic(shop_criteria) is False
    # Empty criteria -> conservatively kept as on-topic.
    assert criteria_on_topic([]) is True
def test_boost_matches_db_mc_third_country():
    """Third-country criteria map to the ``third_country`` boost when it is active."""
    active_boosts = {"third_country", "controller"}
    criteria = [
        "Standardvertragsklauseln für Drittland benannt",
        "Geeignete Garantien bei Übermittlung in ein Drittland",
    ]
    matched = boost_matches_db_mc(active_boosts, criteria)
    assert matched == "third_country"
    # Without any boosts there is nothing to match -> None.
    unmatched = boost_matches_db_mc(set(), criteria)
    assert unmatched is None
# ── Agent-Pfad mit gemocktem run_v3_pipeline ─────────────────────────────
def _mock_v3(results, telemetry=None):
async def _fake(text, scope, db_url="", skip_embedding=False):
return results, (telemetry or {
"total_mcs": len(results), "layer_0_field_hits": 0,
"layer_0_field_ids": [], "layer_0_boost_overrides": 0,
"sector_dropped": 0, "offtopic_dropped": 0})
return _fake
def _run(text, context=None):
    """Evaluate *text* with the registered ``dse`` agent and return its output.

    A missing or falsy *context* is passed to the agent as an empty dict.
    """
    evaluate = REGISTRY.get("dse").evaluate
    agent_input = AgentInput(doc_type="dse", text=text, context=context or {})
    return asyncio.run(evaluate(agent_input))
def test_dse_agent_registered_is_v3():
    """The registered ``dse`` agent exists and reports version 3.0."""
    registered = REGISTRY.get("dse")
    assert registered is not None
    assert registered.agent_version == "3.0"
def test_dse_short_text_skips():
out = _evaluate("zu kurz")
out = _run("zu kurz")
assert out.confidence == 0.0
assert all(c.status == "skipped" for c in out.mc_coverage)
assert out.mc_coverage and all(
c.status == "skipped" for c in out.mc_coverage)
def test_regex_boost_detects_core_fields():
    """At least one core field is reported for a short privacy-policy snippet."""
    snippet = (
        "Verantwortlicher im Sinne der DSGVO ist die Muster GmbH. "
        "Rechtsgrundlage ist Art. 6. Speicherdauer der Daten. Beschwerde "
        "bei der Aufsichtsbehoerde. "
    )
    hits = compute_regex_boosts(snippet * 2, set())
    core_fields = {"controller", "legal_basis", "retention", "complaint"}
    # NOTE(review): a non-empty intersection passes on a single hit; confirm
    # whether all four fields were meant to be required.
    assert core_fields & hits
def test_dse_findings_from_failed_db_mc(monkeypatch):
    """A failed DB control becomes a finding; a passed one does not."""
    failed_mc = {
        "control_id": "DATA-525-A17",
        "passed": False,
        "severity": "HIGH",
        "label": "Berechtigte Interessen ausweisen",
        "regulation": None,
        "article": None,
        "_pass_criteria": ["berechtigtes interesse benannt"],
        "matched_text": "",
        "source": "keyword_match",
    }
    passed_mc = {
        "control_id": "AUTH-2051-A11",
        "passed": True,
        "severity": "LOW",
        "label": "Prägnante Form",
        "regulation": None,
        "article": None,
        "_pass_criteria": [],
        "matched_text": "ok",
    }
    monkeypatch.setattr(
        dse_agent, "run_v3_pipeline", _mock_v3([failed_mc, passed_mc]))

    out = _run(_DSE_SAMPLE, context={"skip_llm": True})

    finding_ids = {item.field_id for item in out.findings}
    assert "DATA-525-A17" in finding_ids  # failed -> finding
    assert "AUTH-2051-A11" not in finding_ids  # passed -> no finding
    finding = next(
        item for item in out.findings if item.field_id == "DATA-525-A17")
    assert finding.severity == "HIGH"
    # A NULL regulation falls back to the default norm.
    assert finding.norm == "DSGVO Art. 13/14"
    assert len(finding.action) < 410
def test_embedding_recall_offline_returns_empty():
    """Without an embedding service the recall layer returns an empty set."""
    # No embedding service (unit run) -> reachability guard -> empty set, no hang.
    document = "x" * 200
    recalled = asyncio.run(embedding_recall(document, ["DSE-X-1"]))
    assert recalled == set()
def test_evaluate_builds_finding_from_failed_db_mc(monkeypatch):
    """A failed stubbed control yields a MEDIUM finding with a bounded action text."""
    failed_mc = {
        "control_id": "DATA-RETENTION-1",
        "passed": False,
        "severity": "MEDIUM",
        "label": "Speicherdauer der Daten",
        "regulation": "DSGVO",
        "article": "13",
        "source": "keyword_match",
    }
    monkeypatch.setattr(
        dse_agent, "run_v3_pipeline", _pipeline_stub([failed_mc]))

    document = "Datenschutzerklaerung " + "x" * 200
    out = _evaluate(document, context={"skip_llm": True})

    finding = next(
        (item for item in out.findings if item.field_id == "DATA-RETENTION-1"),
        None,
    )
    assert finding is not None and finding.severity == "MEDIUM"
    assert finding.action and len(finding.action) <= 400
def test_evaluate_passed_db_mc_no_finding(monkeypatch):
    """A passed stubbed control produces no finding and an ``ok`` coverage entry."""
    passed_mc = {
        "control_id": "PURPOSE-1",
        "passed": True,
        "severity": "MEDIUM",
        "label": "Zwecke",
        "matched_text": "Zwecke der Verarbeitung",
    }
    monkeypatch.setattr(
        dse_agent, "run_v3_pipeline", _pipeline_stub([passed_mc]))

    document = "Datenschutzerklaerung " + "x" * 200
    out = _evaluate(document, context={"skip_llm": True})

    finding_ids = [item.field_id for item in out.findings]
    assert "PURPOSE-1" not in finding_ids
    assert any(
        entry.mc_id == "PURPOSE-1" and entry.status == "ok"
        for entry in out.mc_coverage
    )
def test_evaluate_third_country_high_on_documented_transfer(monkeypatch):
    """A failed MEDIUM transfer control is reported HIGH when a transfer is documented."""
    transfer_mc = {
        "control_id": "TRANSFER-1",
        "passed": False,
        "severity": "MEDIUM",
        "label": "Drittlanduebermittlung",
        "regulation": "DSGVO",
        "article": "13",
    }
    monkeypatch.setattr(
        dse_agent, "run_v3_pipeline", _pipeline_stub([transfer_mc]))

    context = {
        "skip_llm": True,
        "scan_context": {"third_country_transfer": "yes"},
    }
    out = _evaluate("Datenschutzerklaerung " + "x" * 200, context=context)

    finding = next(
        (item for item in out.findings if item.field_id == "TRANSFER-1"), None)
    assert finding is not None and finding.severity == "HIGH"
def test_dse_third_country_override_to_high(monkeypatch):
    """A MEDIUM third-country control is raised to HIGH for a documented transfer."""
    # MEDIUM third-country MC -> HIGH when the transfer is documented (scan_context).
    transfer_mc = {
        "control_id": "DATA-900-A01",
        "passed": False,
        "severity": "MEDIUM",
        "label": "Drittlandtransfer Schutzgarantien benennen",
        "regulation": None,
        "article": None,
        "_pass_criteria": ["standardvertragsklauseln", "drittland garantien"],
        "matched_text": "",
        "source": "keyword_match",
    }
    monkeypatch.setattr(dse_agent, "run_v3_pipeline", _mock_v3([transfer_mc]))

    context = {
        "skip_llm": True,
        "scan_context": {"third_country_transfer": "yes"},
    }
    out = _run(_DSE_SAMPLE, context=context)

    finding = next(
        item for item in out.findings if item.field_id == "DATA-900-A01")
    assert finding.severity == "HIGH"
    assert finding.severity_reason == "db_mc_failed_third_country_transfer"
def test_dse_no_transfer_keeps_medium(monkeypatch):
    """Without a ``scan_context`` the third-country control keeps severity MEDIUM."""
    transfer_mc = {
        "control_id": "DATA-900-A01",
        "passed": False,
        "severity": "MEDIUM",
        "label": "Drittlandtransfer Schutzgarantien benennen",
        "regulation": None,
        "article": None,
        "_pass_criteria": ["standardvertragsklauseln", "drittland garantien"],
        "matched_text": "",
        "source": "keyword_match",
    }
    monkeypatch.setattr(dse_agent, "run_v3_pipeline", _mock_v3([transfer_mc]))

    out = _run(_DSE_SAMPLE, context={"skip_llm": True})

    finding = next(
        item for item in out.findings if item.field_id == "DATA-900-A01")
    assert finding.severity == "MEDIUM"
@@ -0,0 +1,59 @@
"""Tests fuer das DSE-Applicability-Gate (_classification_gate).
Deckt die reine Split-Logik (apply_gate) und das defensive Verhalten von
load_dse_gate ohne DB ab. Die DB-Abfrage selbst ist I/O und wird hier nicht
gegen eine echte DB getestet (defensiver Pfad: kein DSN -> leeres Dict)."""
import asyncio
import os
from compliance.services.specialist_agents.dse._classification_gate import (
apply_gate,
load_dse_gate,
)
def test_apply_gate_splits_findings_and_organizational():
    """A gated control moves to the organizational list; the other one is kept."""
    document_control = {
        "control_id": "AUTH-2051-A02", "title": "Speicherdauer nennen"}
    gated_control = {"control_id": "AUTH-2049-A01", "title": "VVT fuehren"}
    gate = {
        "AUTH-2049-A01": {
            "obligation_type": "EVIDENCE",
            "check_intent": "DIRECT_EVIDENCE",
            "applicable_artifacts": ["VVT", "AUDIT"],
            "reference_allowed": "NO",
        }
    }

    kept, organizational = apply_gate([document_control, gated_control], gate)

    kept_ids = [control["control_id"] for control in kept]
    assert kept_ids == ["AUTH-2051-A02"]
    assert len(organizational) == 1
    entry = organizational[0]
    assert entry["control_id"] == "AUTH-2049-A01"
    assert entry["title"] == "VVT fuehren"
    assert entry["applicable_artifacts"] == ["VVT", "AUDIT"]
    assert entry["check_intent"] == "DIRECT_EVIDENCE"
def test_apply_gate_empty_gate_keeps_all():
    """An empty gate keeps every control and reports no organizational items."""
    controls = [{"control_id": control_id} for control_id in ("X-1", "X-2")]
    kept, organizational = apply_gate(controls, {})
    assert len(kept) == 2
    assert organizational == []
def test_load_dse_gate_without_dsn_is_defensive():
    """No DSN argument and no DB env vars -> empty dict (no filter), no error.

    ``patch.dict`` snapshots ``os.environ`` on entry and restores it on exit,
    so the variables removed here, and anything set while the gate loads,
    cannot leak into other tests even if the assertion fails.
    """
    # Local import so this test does not depend on the file's import block.
    from unittest import mock

    with mock.patch.dict(os.environ):
        for name in ("DATABASE_URL", "COMPLIANCE_DATABASE_URL"):
            os.environ.pop(name, None)
        result = asyncio.run(load_dse_gate(""))
        assert result == {}
@@ -0,0 +1,67 @@
"""DSE Embedding-Recall — deterministische semantische Schicht (gecacht).
Testet die reine Logik OHNE Embedding-Service: Cache-Treffer-Pfad,
Schwellen-Filter, Kandidaten-Schnitt, Reachability-Guard. Das Einbetten selbst
(Embedding-Service) ist Integration und wird auf macmini/Prod validiert.
"""
from __future__ import annotations
import asyncio
import json
import compliance.services.specialist_agents.dse._embedding_recall as er
# Sample document text shared by the cache helper and the recall tests below.
_TEXT = ("Datenschutzerklaerung der Muster GmbH. " * 20) # > 100 characters
def _seed_cache(tmp_path, scores: dict[str, float]) -> str:
    """Write a cache file mapping the hash of ``_TEXT`` to *scores*.

    The file is ``dse_embed_cache.json`` inside *tmp_path*; its path is
    returned as a string.
    """
    cache_file = tmp_path / "dse_embed_cache.json"
    payload = {er._doc_hash(_TEXT): scores}
    cache_file.write_text(json.dumps(payload))
    return str(cache_file)
def test_doc_hash_deterministic():
    """Equal text hashes equally; different text hashes differently."""
    # Fixed function: same text -> same hash (reproducibility).
    first = er._doc_hash(_TEXT)
    second = er._doc_hash(_TEXT)
    assert first == second
    assert er._doc_hash("a") != er._doc_hash("b")
def test_cache_hit_threshold_filter(tmp_path, monkeypatch):
    """On a cache hit only candidates scoring at or above the threshold are returned."""
    # Cache hit: no embedding service needed. Only scores >= threshold AND
    # present in the candidates are returned.
    scores = {"DATA-1": 0.71, "DATA-2": 0.60, "AUTH-3": 0.68, "SEC-4": 0.50}
    seeded_path = _seed_cache(tmp_path, scores)
    monkeypatch.setenv("DSE_EMBED_CACHE", seeded_path)
    monkeypatch.setattr(
        er, "_CACHE_PATH", str(tmp_path / "dse_embed_cache.json"))

    candidates = list(scores)  # every cached control ID, in insertion order
    recalled = asyncio.run(
        er.embedding_recall(_TEXT, candidates, threshold=0.65))

    # >= 0.65: DATA-1 (0.71), AUTH-3 (0.68). NOT DATA-2 (0.60), SEC-4 (0.50).
    assert recalled == {"DATA-1", "AUTH-3"}
def test_cache_hit_candidate_intersection(tmp_path, monkeypatch):
    """Cached scores for IDs outside the candidate list are ignored."""
    # Only candidates (failed controls) count; everything else is ignored.
    cache_file = tmp_path / "c.json"
    cached_scores = {"DATA-1": 0.90, "DATA-2": 0.90}
    cache_file.write_text(json.dumps({er._doc_hash(_TEXT): cached_scores}))
    monkeypatch.setattr(er, "_CACHE_PATH", str(cache_file))

    recalled = asyncio.run(
        er.embedding_recall(_TEXT, ["DATA-1"], threshold=0.65))

    assert recalled == {"DATA-1"}  # DATA-2 is not among the candidates
def test_empty_inputs():
    """A too-short text or an empty candidate list yields an empty set."""
    short_text_result = asyncio.run(er.embedding_recall("zu kurz", ["X"]))
    assert short_text_result == set()
    no_candidates_result = asyncio.run(er.embedding_recall(_TEXT, []))
    assert no_candidates_result == set()
def test_service_down_returns_empty(tmp_path, monkeypatch):
    """With no cache file and an unreachable service the recall is empty."""
    # No cache + service unreachable -> empty (the deterministic layer carries
    # the result), and NO hang.
    async def _unreachable(timeout=2.0):
        return False

    missing_cache = str(tmp_path / "none.json")
    monkeypatch.setattr(er, "_CACHE_PATH", missing_cache)
    monkeypatch.setattr(er, "_embedding_reachable", _unreachable)

    recalled = asyncio.run(er.embedding_recall(_TEXT, ["DATA-1"]))
    assert recalled == set()