"""DSEAgent v3 (4-Layer: Regex-Boost / Keyword / BGE-M3-Recall / Semantic). DB (_load_controls), Embedding-Service und LLM sind offline gestubbt → die Tests sind deterministisch und brauchen kein Netzwerk. Die reinen Schichten (compute_regex_boosts, embedding_recall-Reachability) werden direkt geprüft, die Result→Finding-Konvertierung über einen gestubbten run_v3_pipeline. """ import asyncio import compliance.services.specialist_agents.dse.agent as dse_agent from compliance.services.specialist_agents import REGISTRY, AgentInput from compliance.services.specialist_agents.dse._embedding_recall import ( embedding_recall, ) from compliance.services.specialist_agents.dse.regex_boost import ( compute_regex_boosts, ) _TELEMETRY = { "layer_0_field_hits": 0, "layer_0_field_ids": [], "layer_1_pass": 0, "embedding_passes": 0, "total_mcs": 1, "sector_dropped": 0, "offtopic_dropped": 0, "gate_excluded": 0, "organizational_checklist": [], } def _pipeline_stub(results): async def _stub(text, scope): return results, dict(_TELEMETRY, total_mcs=len(results)) return _stub def _evaluate(text, context=None): return asyncio.run(dse_agent.DSEAgent().evaluate( AgentInput(doc_type="dse", text=text, context=context or {}))) def test_dse_agent_registered_is_v3(): agent = REGISTRY.get("dse") assert agent is not None and agent.agent_version == "3.0" def test_dse_short_text_skips(): out = _evaluate("zu kurz") assert out.confidence == 0.0 assert all(c.status == "skipped" for c in out.mc_coverage) def test_regex_boost_detects_core_fields(): text = ("Verantwortlicher im Sinne der DSGVO ist die Muster GmbH. " "Rechtsgrundlage ist Art. 6. Speicherdauer der Daten. Beschwerde " "bei der Aufsichtsbehoerde. ") * 2 hits = compute_regex_boosts(text, set()) assert {"controller", "legal_basis", "retention", "complaint"} & hits def test_embedding_recall_offline_returns_empty(): # Kein Embedding-Service (Unit) -> Reachability-Guard -> leeres Set, kein Hang. got = asyncio.run(embedding_recall("x" * 200, ["DSE-X-1"])) assert got == set() def test_evaluate_builds_finding_from_failed_db_mc(monkeypatch): monkeypatch.setattr(dse_agent, "run_v3_pipeline", _pipeline_stub([{ "control_id": "DATA-RETENTION-1", "passed": False, "severity": "MEDIUM", "label": "Speicherdauer der Daten", "regulation": "DSGVO", "article": "13", "source": "keyword_match", }])) out = _evaluate("Datenschutzerklaerung " + "x" * 200, context={"skip_llm": True}) f = next((f for f in out.findings if f.field_id == "DATA-RETENTION-1"), None) assert f is not None and f.severity == "MEDIUM" assert f.action and len(f.action) <= 400 def test_evaluate_passed_db_mc_no_finding(monkeypatch): monkeypatch.setattr(dse_agent, "run_v3_pipeline", _pipeline_stub([{ "control_id": "PURPOSE-1", "passed": True, "severity": "MEDIUM", "label": "Zwecke", "matched_text": "Zwecke der Verarbeitung", }])) out = _evaluate("Datenschutzerklaerung " + "x" * 200, context={"skip_llm": True}) assert "PURPOSE-1" not in [f.field_id for f in out.findings] assert any(c.mc_id == "PURPOSE-1" and c.status == "ok" for c in out.mc_coverage) def test_evaluate_third_country_high_on_documented_transfer(monkeypatch): monkeypatch.setattr(dse_agent, "run_v3_pipeline", _pipeline_stub([{ "control_id": "TRANSFER-1", "passed": False, "severity": "MEDIUM", "label": "Drittlanduebermittlung", "regulation": "DSGVO", "article": "13", }])) out = _evaluate( "Datenschutzerklaerung " + "x" * 200, context={"skip_llm": True, "scan_context": {"third_country_transfer": "yes"}}) f = next((f for f in out.findings if f.field_id == "TRANSFER-1"), None) assert f is not None and f.severity == "HIGH" assert f.severity_reason == "db_mc_failed_third_country_transfer"