Files
breakpilot-compliance/backend-compliance/compliance/tests/test_dse_agent.py
T
Benjamin Admin f6d018234b feat(dse): recover v3 DSE engine from container + wire into live check path
The calibrated DSE engine (4-layer: regex-boost / keyword / BGE-M3 embedding
recall @0.65 / semantic-validator) existed ONLY in the running macmini
container (docker cp'd, never committed) — at risk of loss on any container
rebuild. This recovers it into git and wires it into the live check path.

- Recover dse/{agent,v3_engine,_embedding_recall,_classification_gate,
  regex_boost,mcs,deep_check}.py. DSEAgent (v3, BaseSpecialistAgent) replaces
  the keyword-only stub: delegates MC-loading to the main engine
  (rag_document_checker._load_controls), deterministic cached embedding recall
  (reachability-gated), semantic-validator LLM layer honoring skip_llm,
  third-country -> HIGH on documented transfer.
- Wire "dse" into _agent_outputs._TOPIC_AGENTS -> live check emits a validated
  DSE tab (was snapshot/legacy-only).
- Tests rewritten for v3 (DB/embedding/LLM stubbed offline): regex-boost
  detection, embedding-recall reachability guard, result->Finding conversion,
  third-country HIGH; topic-wiring asserts "dse".
- deep_check.py recovered for preservation (alternate LLM-judge path, unwired).

Runtime data deps for full live behavior (note for prod): doc_check_controls
in DB + /data/mc_classification.db embedding sidecar + embedding-service; all
degrade gracefully (keyword layer carries) if absent.

dev-only, no deploy.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-06-21 11:15:52 +02:00

97 lines
3.9 KiB
Python

"""DSEAgent v3 (4-Layer: Regex-Boost / Keyword / BGE-M3-Recall / Semantic).
DB (_load_controls), Embedding-Service und LLM sind offline gestubbt → die Tests
sind deterministisch und brauchen kein Netzwerk. Die reinen Schichten
(compute_regex_boosts, embedding_recall-Reachability) werden direkt geprüft, die
Result→Finding-Konvertierung über einen gestubbten run_v3_pipeline.
"""
import asyncio
import compliance.services.specialist_agents.dse.agent as dse_agent
from compliance.services.specialist_agents import REGISTRY, AgentInput
from compliance.services.specialist_agents.dse._embedding_recall import (
embedding_recall,
)
from compliance.services.specialist_agents.dse.regex_boost import (
compute_regex_boosts,
)
_TELEMETRY = {
"layer_0_field_hits": 0, "layer_0_field_ids": [],
"layer_1_pass": 0, "embedding_passes": 0, "total_mcs": 1,
"sector_dropped": 0, "offtopic_dropped": 0,
"gate_excluded": 0, "organizational_checklist": [],
}
def _pipeline_stub(results):
    """Build an async replacement for ``run_v3_pipeline``.

    The returned coroutine function ignores its ``text`` and ``scope``
    arguments and always returns ``(results, telemetry)``. ``telemetry`` is a
    shallow copy of ``_TELEMETRY`` with ``total_mcs`` set to ``len(results)``.
    """
    async def _stub(text, scope):
        # Shallow copy: the list values are shared with _TELEMETRY.
        telemetry = {**_TELEMETRY, "total_mcs": len(results)}
        return results, telemetry

    return _stub
def _evaluate(text, context=None):
    """Run ``DSEAgent.evaluate`` to completion and return its result.

    A fresh agent is evaluated on an ``AgentInput`` with ``doc_type="dse"``.
    ``context`` falls back to an empty dict when it is omitted or falsy.
    """
    agent = dse_agent.DSEAgent()
    agent_input = AgentInput(doc_type="dse", text=text, context=context or {})
    # evaluate() is a coroutine; drive it on a fresh event loop.
    return asyncio.run(agent.evaluate(agent_input))
def test_dse_agent_registered_is_v3():
    """The registry holds a ``dse`` agent whose version is ``3.0``."""
    registered = REGISTRY.get("dse")
    assert registered is not None
    assert registered.agent_version == "3.0"
def test_dse_short_text_skips():
    """A very short text gives zero confidence and no non-skipped coverage."""
    result = _evaluate("zu kurz")
    assert result.confidence == 0.0
    # Holds trivially when mc_coverage is empty.
    assert not any(entry.status != "skipped" for entry in result.mc_coverage)
def test_regex_boost_detects_core_fields():
    """Regex boosts report at least one core field id for a matching text."""
    sentence = ("Verantwortlicher im Sinne der DSGVO ist die Muster GmbH. "
                "Rechtsgrundlage ist Art. 6. Speicherdauer der Daten. Beschwerde "
                "bei der Aufsichtsbehoerde. ")
    text = sentence * 2
    hits = compute_regex_boosts(text, set())
    expected_ids = {"controller", "legal_basis", "retention", "complaint"}
    # A non-empty intersection is enough: the check passes as soon as one of
    # the expected ids is reported, not only when all four are.
    overlap = expected_ids & hits
    assert overlap
def test_embedding_recall_offline_returns_empty():
    """Embedding recall returns an empty set when run offline."""
    # No embedding service (unit test) -> reachability guard -> empty set,
    # no hang.
    sample_text = "x" * 200
    candidate_ids = ["DSE-X-1"]
    recalled = asyncio.run(embedding_recall(sample_text, candidate_ids))
    assert recalled == set()
def test_evaluate_builds_finding_from_failed_db_mc(monkeypatch):
    """A failed control from the pipeline shows up as a finding."""
    failed_control = {
        "control_id": "DATA-RETENTION-1",
        "passed": False,
        "severity": "MEDIUM",
        "label": "Speicherdauer der Daten",
        "regulation": "DSGVO",
        "article": "13",
        "source": "keyword_match",
    }
    # Swap the real pipeline for a stub returning just this control.
    monkeypatch.setattr(
        dse_agent, "run_v3_pipeline", _pipeline_stub([failed_control]))

    result = _evaluate(
        "Datenschutzerklaerung " + "x" * 200, context={"skip_llm": True})

    # First finding for the stubbed control id, or None if absent.
    finding = None
    for candidate in result.findings:
        if candidate.field_id == "DATA-RETENTION-1":
            finding = candidate
            break

    assert finding is not None
    assert finding.severity == "MEDIUM"
    # The action text must be non-empty and at most 400 characters long.
    assert finding.action
    assert len(finding.action) <= 400
def test_evaluate_passed_db_mc_no_finding(monkeypatch):
    """A passed control yields no finding and is covered with status ``ok``."""
    passed_control = {
        "control_id": "PURPOSE-1",
        "passed": True,
        "severity": "MEDIUM",
        "label": "Zwecke",
        "matched_text": "Zwecke der Verarbeitung",
    }
    # Swap the real pipeline for a stub returning just this control.
    monkeypatch.setattr(
        dse_agent, "run_v3_pipeline", _pipeline_stub([passed_control]))

    result = _evaluate(
        "Datenschutzerklaerung " + "x" * 200, context={"skip_llm": True})

    assert all(finding.field_id != "PURPOSE-1" for finding in result.findings)
    ok_entries = [
        entry for entry in result.mc_coverage
        if entry.mc_id == "PURPOSE-1" and entry.status == "ok"
    ]
    assert ok_entries
def test_evaluate_third_country_high_on_documented_transfer(monkeypatch):
    """A failed MEDIUM control becomes HIGH when a transfer is documented."""
    transfer_control = {
        "control_id": "TRANSFER-1",
        "passed": False,
        "severity": "MEDIUM",
        "label": "Drittlanduebermittlung",
        "regulation": "DSGVO",
        "article": "13",
    }
    # Swap the real pipeline for a stub returning just this control.
    monkeypatch.setattr(
        dse_agent, "run_v3_pipeline", _pipeline_stub([transfer_control]))

    # The scan context marks a third-country transfer as documented.
    context = {
        "skip_llm": True,
        "scan_context": {"third_country_transfer": "yes"},
    }
    result = _evaluate("Datenschutzerklaerung " + "x" * 200, context=context)

    # First finding for the stubbed control id, or None if absent.
    finding = None
    for candidate in result.findings:
        if candidate.field_id == "TRANSFER-1":
            finding = candidate
            break

    assert finding is not None
    assert finding.severity == "HIGH"
    assert finding.severity_reason == "db_mc_failed_third_country_transfer"