"""DSE-Agent v3 — DB-Controls (doc_check_controls) via run_v3_pipeline + kuratierter Art-13-Regex-Boost (Layer 0). Volle Parität zu impressum/cookie. Die Tests prüfen die deterministischen Bausteine (regex_boost/mcs) ohne DB und den Agent-Pfad mit gemocktem run_v3_pipeline (CI hat keine DB). """ from __future__ import annotations import asyncio import compliance.services.specialist_agents.dse.agent as dse_agent from compliance.services.specialist_agents import REGISTRY, AgentInput from compliance.services.specialist_agents.dse.mcs import MCS, MC_IDS from compliance.services.specialist_agents.dse.regex_boost import ( boost_matches_db_mc, compute_regex_boosts, criteria_on_topic, ) _DSE_SAMPLE = ( "Datenschutzerklaerung. Verantwortlich im Sinne der DSGVO ist die Muster " "GmbH, Musterstrasse 1, 12345 Berlin. E-Mail: info@muster.de. " "Datenschutzbeauftragter: dsb@muster.de. Zwecke der Verarbeitung und " "Rechtsgrundlage Art. 6 Abs. 1 lit. f berechtigtes Interesse. Empfaenger " "Ihrer Daten sind Auftragsverarbeiter. Speicherdauer der Daten richtet " "sich nach Aufbewahrungsfristen. Sie haben das Recht auf Auskunft, das " "Recht auf Berichtigung, das Recht auf Loeschung sowie ein " "Widerspruchsrecht. Beschwerde bei der Aufsichtsbehoerde moeglich. Stand: " "Januar 2026. ") * 3 # ── Registrierung ──────────────────────────────────────────────────────── def test_dse_agent_registered(): agent = REGISTRY.get("dse") assert agent is not None assert agent.agent_version == "3.0" assert agent.doc_type == "dse" def test_owned_mc_ids_match_checklist(): # owned_mc_ids = die Boost-Pattern-IDs (aus ART13_CHECKLIST gehoben). assert MC_IDS == tuple(m.mc_id for m in MCS) assert len(MC_IDS) >= 10 # mind. die 10 L1-Pflichtfelder + L2 # ── Layer-0 Regex-Boost (deterministisch, ohne DB) ─────────────────────── def test_regex_boost_detects_core_fields(): boosts = compute_regex_boosts(_DSE_SAMPLE) # Die zentralen Art-13-Felder müssen erkannt werden. for field in ("controller", "legal_basis", "rights", "complaint", "retention", "dse_version_date"): assert field in boosts, f"{field} nicht erkannt: {sorted(boosts)}" def test_regex_boost_empty_on_short_text(): assert compute_regex_boosts("zu kurz") == set() def test_criteria_on_topic_accepts_dse_rejects_foreign(): dse_crit = ["Rechtsgrundlage gemäß Art. 6 DSGVO benannt", "Speicherdauer und Löschfrist angegeben"] assert criteria_on_topic(dse_crit) is True foreign = ["Bestellbestätigung wird per E-Mail versendet", "Versandkosten werden im Warenkorb angezeigt"] assert criteria_on_topic(foreign) is False # leere Kriterien → konservativ on-topic behalten assert criteria_on_topic([]) is True def test_boost_matches_db_mc_third_country(): boosts = {"third_country", "controller"} crit = ["Standardvertragsklauseln für Drittland benannt", "Geeignete Garantien bei Übermittlung in ein Drittland"] assert boost_matches_db_mc(boosts, crit) == "third_country" # ohne passende Boosts → None assert boost_matches_db_mc(set(), crit) is None # ── Agent-Pfad mit gemocktem run_v3_pipeline ───────────────────────────── def _mock_v3(results, telemetry=None): async def _fake(text, scope, db_url="", skip_embedding=False): return results, (telemetry or { "total_mcs": len(results), "layer_0_field_hits": 0, "layer_0_field_ids": [], "layer_0_boost_overrides": 0, "sector_dropped": 0, "offtopic_dropped": 0}) return _fake def _run(text, context=None): return asyncio.run(REGISTRY.get("dse").evaluate( AgentInput(doc_type="dse", text=text, context=context or {}))) def test_dse_short_text_skips(): out = _run("zu kurz") assert out.confidence == 0.0 assert out.mc_coverage and all( c.status == "skipped" for c in out.mc_coverage) def test_dse_findings_from_failed_db_mc(monkeypatch): results = [{ "control_id": "DATA-525-A17", "passed": False, "severity": "HIGH", "label": "Berechtigte Interessen ausweisen", "regulation": None, "article": None, "_pass_criteria": ["berechtigtes interesse benannt"], "matched_text": "", "source": "keyword_match", }, { "control_id": "AUTH-2051-A11", "passed": True, "severity": "LOW", "label": "Prägnante Form", "regulation": None, "article": None, "_pass_criteria": [], "matched_text": "ok", }] monkeypatch.setattr(dse_agent, "run_v3_pipeline", _mock_v3(results)) out = _run(_DSE_SAMPLE, context={"skip_llm": True}) fids = {f.field_id for f in out.findings} assert "DATA-525-A17" in fids # failed → Finding assert "AUTH-2051-A11" not in fids # passed → kein Finding f = next(f for f in out.findings if f.field_id == "DATA-525-A17") assert f.severity == "HIGH" assert f.norm == "DSGVO Art. 13/14" # NULL-regulation → Fallback-Norm assert len(f.action) < 410 def test_dse_third_country_override_to_high(monkeypatch): # MEDIUM-Drittland-MC → HIGH bei dokumentiertem Transfer (scan_context). results = [{ "control_id": "DATA-900-A01", "passed": False, "severity": "MEDIUM", "label": "Drittlandtransfer Schutzgarantien benennen", "regulation": None, "article": None, "_pass_criteria": ["standardvertragsklauseln", "drittland garantien"], "matched_text": "", "source": "keyword_match", }] monkeypatch.setattr(dse_agent, "run_v3_pipeline", _mock_v3(results)) out = _run(_DSE_SAMPLE, context={ "skip_llm": True, "scan_context": {"third_country_transfer": "yes"}}) f = next(f for f in out.findings if f.field_id == "DATA-900-A01") assert f.severity == "HIGH" assert f.severity_reason == "db_mc_failed_third_country_transfer" def test_dse_no_transfer_keeps_medium(monkeypatch): results = [{ "control_id": "DATA-900-A01", "passed": False, "severity": "MEDIUM", "label": "Drittlandtransfer Schutzgarantien benennen", "regulation": None, "article": None, "_pass_criteria": ["standardvertragsklauseln", "drittland garantien"], "matched_text": "", "source": "keyword_match", }] monkeypatch.setattr(dse_agent, "run_v3_pipeline", _mock_v3(results)) out = _run(_DSE_SAMPLE, context={"skip_llm": True}) f = next(f for f in out.findings if f.field_id == "DATA-900-A01") assert f.severity == "MEDIUM"