diff --git a/backend-compliance/compliance/api/agent_check/_agent_outputs.py b/backend-compliance/compliance/api/agent_check/_agent_outputs.py index 0ce66f7c..a343295f 100644 --- a/backend-compliance/compliance/api/agent_check/_agent_outputs.py +++ b/backend-compliance/compliance/api/agent_check/_agent_outputs.py @@ -13,6 +13,7 @@ the map). Once the tabs are the source of truth, B18's v1 path retires. from __future__ import annotations +import asyncio import logging from compliance.services.specialist_agents import REGISTRY, AgentInput @@ -27,6 +28,7 @@ logger = logging.getLogger(__name__) # topic key (matches state["doc_texts"]) -> registered agent_id _TOPIC_AGENTS: dict[str, str] = { "impressum": "impressum", + "agb": "agb", # v2: AGBAgent mit decision_method-Routing (71% FP -> ~0) } _MIN_TEXT = 100 @@ -112,14 +114,17 @@ async def run_agent_outputs(state: dict) -> None: ) outputs: dict[str, dict] = state.get("agent_outputs") or {} - for topic, agent_id in _TOPIC_AGENTS.items(): + + async def _run_one(topic: str, agent_id: str): + """Einen Topic-Agent laufen lassen + sein Tab-Event sofort emittieren + (Zwischenbefund). Fängt eigene Fehler → ein Agent reißt den Run nicht ab.""" text = (doc_texts.get(topic) or "").strip() if len(text) < _MIN_TEXT: - continue + return None agent = REGISTRY.get(agent_id) if agent is None: logger.warning("agent_outputs: agent '%s' not registered", agent_id) - continue + return None try: out = await agent.evaluate(AgentInput( doc_type=topic, @@ -128,15 +133,25 @@ async def run_agent_outputs(state: dict) -> None: company_name=company_name, origin_domain=origin_domain, )) - outputs[topic] = out.model_dump(mode="json") - emit(check_id, {"type": "topic", "topic": topic, - "output": outputs[topic]}) + dump = out.model_dump(mode="json") + emit(check_id, {"type": "topic", "topic": topic, "output": dump}) logger.info( "agent_outputs[%s]: %d findings, confidence %.2f", topic, len(out.findings), out.confidence, ) + return topic, dump except Exception as e: # noqa: BLE001 — best-effort, never break the run logger.warning("agent_outputs[%s] failed: %s", topic, e) + return None + + # Topic-Agenten laufen NEBENLÄUFIG (ihre Embedding-/LLM-Waits überlappen) und + # füllen ihren Tab via SSE, sobald sie fertig sind — kein Warten aufs Schlusslicht. + results = await asyncio.gather( + *(_run_one(topic, agent_id) for topic, agent_id in _TOPIC_AGENTS.items()) + ) + for r in results: + if r: + outputs[r[0]] = r[1] if outputs: state["agent_outputs"] = outputs diff --git a/backend-compliance/compliance/services/specialist_agents/agb/_embedding_rescue.py b/backend-compliance/compliance/services/specialist_agents/agb/_embedding_rescue.py deleted file mode 100644 index b6a348aa..00000000 --- a/backend-compliance/compliance/services/specialist_agents/agb/_embedding_rescue.py +++ /dev/null @@ -1,74 +0,0 @@ -"""EMBEDDING-Rescue (decision_method=EMBEDDING) fuer AGB. - -Fuer keyword-durchgefallene EMBEDDING-Items: pruefe, ob die Klausel SEMANTISCH -(>= per-Item-Schwelle) im Dokument vorkommt — rettet Recall-FP (Klausel da, anders -formuliert). Referenzvektoren = die Item-Paraphrasen aus `_routing.PARAPHRASES` -(NICHT der mc_classification-Sidecar wie bei DSE, da AGB eine kuratierte -Checkliste statt Library-Controls nutzt). - -Deterministisch (festes Embedding-Modell -> gleicher Text -> gleicher Vektor) und -gecacht. Faellt der Embedding-Service aus, liefert die Schicht leer zurueck — -der Keyword-Layer traegt dann (kein Hang, kein Crash). -""" -from __future__ import annotations - -import asyncio -import logging - -from . import _routing - -logger = logging.getLogger(__name__) - -# Paraphrasen-Vektoren werden EINMAL pro Prozess eingebettet und gecacht. -_PARA_VEC_CACHE: dict[str, list] = {} - - -async def _ensure_para_vecs(item_ids: list[str]) -> dict[str, list]: - from compliance.services.mc_embedding_matcher import DIM, _embed_texts - todo = [i for i in item_ids - if i not in _PARA_VEC_CACHE and _routing.PARAPHRASES.get(i)] - for it in todo: - vecs = await _embed_texts(_routing.PARAPHRASES[it]) - _PARA_VEC_CACHE[it] = [v for v in vecs if v and len(v) == DIM] - return _PARA_VEC_CACHE - - -async def embedding_rescue( - text: str, - candidate_ids, - embed_timeout: float = 90.0, -) -> set[str]: - """Returns die Teilmenge der `candidate_ids`, die semantisch (>= per-Item- - Schwelle) im Text vorkommt. `candidate_ids` = die im Keyword-Layer - DURCHGEFALLENEN Items (Recall-Rescue). Nur EMBEDDING-Items werden behandelt. - """ - cands = [c for c in candidate_ids - if _routing.decision_method(c) == _routing.EMBEDDING - and _routing.PARAPHRASES.get(c)] - if not text or len(text) < 100 or not cands: - return set() - try: - from compliance.services.mc_embedding_matcher import ( - DIM, _chunk_text, _cosine, _embed_texts, - ) - para_vecs = await _ensure_para_vecs(cands) - chunks = _chunk_text(text) - if not chunks: - return set() - cvecs = [v for v in await asyncio.wait_for( - _embed_texts(chunks), timeout=embed_timeout) - if v and len(v) == DIM] - except (Exception, asyncio.TimeoutError) as e: # Service down -> kein Rescue - logger.info("agb embedding_rescue inaktiv: %s", str(e)[:90]) - return set() - if not cvecs: - return set() - rescued: set[str] = set() - for cid in cands: - pv = para_vecs.get(cid) or [] - if not pv: - continue - best = max((_cosine(p, c) for p in pv for c in cvecs), default=0.0) - if best >= _routing.EMBED_THRESHOLDS.get(cid, 0.60): - rescued.add(cid) - return rescued diff --git a/backend-compliance/compliance/services/specialist_agents/agb/_llm_judge.py b/backend-compliance/compliance/services/specialist_agents/agb/_llm_judge.py deleted file mode 100644 index 088a99fe..00000000 --- a/backend-compliance/compliance/services/specialist_agents/agb/_llm_judge.py +++ /dev/null @@ -1,74 +0,0 @@ -"""LLM-Judge (decision_method=LLM) fuer die 2 semantisch engen AGB-Items -(delivery_timeframe, warranty_period), bei denen Embedding NICHT trennt. - -Retrieval = GANZE Paragraph-Abschnitte (nicht Top-k-Chunks — das war in der -Validierung der Schluessel: Top-4-Chunks verfehlten z.B. die zalando-1-Jahr- -Klausel, der ganze Paragraph nicht). Entscheidung ueber die LLM-Kaskade -(`call_with_cascade`): prod startet bei OVH-120b (stark); dev nur Qwen (schwach, -bekannte Env-Grenze). NUR present/absent — Defekt-Pruefung ist Stage 3. -""" -from __future__ import annotations - -import json -import logging -import re - -from . import _routing - -logger = logging.getLogger(__name__) - -_SECTION_SPLIT = re.compile(r"(?m)(?=^\s*(?:§\s*)?\d+[\.\)]\s)") -_SYS = ( - "Du bist deutscher AGB-Rechtsexperte. Entscheide, ob die genannte Pflicht in " - "den vorgelegten AGB-Abschnitten vorhanden ist. NUR die Abschnitte zaehlen. " - 'Antworte NUR JSON: {"verdict":"ERFUELLT|FEHLT","zitat":"woertlich oder leer",' - '"begruendung":"1 Satz"}.' -) - - -def _sections(text: str) -> list[str]: - return [s.strip() for s in _SECTION_SPLIT.split(text) if s.strip()] - - -def relevant_sections(item_id: str, text: str, limit: int = 6) -> list[str]: - """Ganze Abschnitte zum Thema des Items (Topic-Regex). Fallback: erste Abschnitte.""" - secs = _sections(text) - topic = _routing.LLM_TOPIC.get(item_id) - if not topic: - return secs[:limit] - rel = [s for s in secs if re.search(topic, s, re.I)] - return rel[:limit] or secs[:limit] - - -def _parse(txt: str) -> dict: - out = (txt or "").strip() - if out.startswith("```"): - out = out.split("```", 2)[1] - out = out[4:] if out.startswith("json") else out - a, b = out.find("{"), out.rfind("}") - return json.loads(out[a:b + 1] if 0 <= a < b else out) - - -async def llm_judge(item_id: str, text: str) -> dict: - """Returns {present: bool|None, zitat, begruendung, source}. - present=None => Judge konnte nicht entscheiden -> Aufrufer behaelt das - Keyword-Ergebnis (fail-safe Richtung Finding).""" - from compliance.services.llm_cascade import call_with_cascade - question = _routing.LLM_QUESTION.get(item_id, "Ist diese Pflicht im Text vorhanden?") - secs = relevant_sections(item_id, text) - user = json.dumps({"frage": question, "agb_abschnitte": secs}, ensure_ascii=False) - try: - r = await call_with_cascade(_SYS, user, min_confidence=0.6, max_tokens=500) - obj = _parse(r.get("text")) - verdict = obj.get("verdict") - if verdict not in ("ERFUELLT", "FEHLT"): - return {"present": None, "zitat": "", "begruendung": "unklar", "source": r.get("source", "?")} - return { - "present": verdict == "ERFUELLT", - "zitat": (obj.get("zitat") or "")[:200], - "begruendung": (obj.get("begruendung") or "")[:200], - "source": r.get("source", "?"), - } - except Exception as e: - logger.info("agb llm_judge fail %s: %s", item_id, str(e)[:80]) - return {"present": None, "zitat": "", "begruendung": "judge_error", "source": "error"} diff --git a/backend-compliance/compliance/services/specialist_agents/agb/_pipeline.py b/backend-compliance/compliance/services/specialist_agents/agb/_pipeline.py new file mode 100644 index 00000000..29f84169 --- /dev/null +++ b/backend-compliance/compliance/services/specialist_agents/agb/_pipeline.py @@ -0,0 +1,102 @@ +"""AGB-Routing-Pipeline (C-lean): nimmt das Keyword-Ergebnis des ChecklistAgent +und routet keyword-durchgefallene Items per `_routing.decision_method` an die +wiederverwendbare Prüfer-Library (Embedding / Reference / LLM). Davor das +Geschäftsmodell-Gate (Applicability). Das Re-Tiering (LOW → Empfehlung) + +Output-Zusammenbau macht der AGBAgent — hier nur die Routing-Entscheidung. + +Validiert (7-Firmen-Opus-GT): 71 % FP → ~0. agent.py bleibt dünn, dies ist der +einzige Ort des C-lean-Flows. +""" +from __future__ import annotations + +import logging + +from compliance.services.checkers.base import ( + ControlSpec, + DecisionMethod, + DocContext, + VerificationMethod, +) +from compliance.services.checkers.embedding_checker import EmbeddingChecker +from compliance.services.checkers.llm_checker import LLMChecker +from compliance.services.checkers.reference_checker import ReferenceChecker + +from . import _routing + +logger = logging.getLogger(__name__) + +# Checker sind zustandslos (schwere Imports erst in .check()) → Modul-Singletons. +_EMB = EmbeddingChecker() +_REF = ReferenceChecker() +_LLM = LLMChecker() + + +def _spec(item_id: str) -> ControlSpec: + """ControlSpec für ein Item aus der AGB-Routing-Config bauen.""" + dm = _routing.decision_method(item_id) + if dm == _routing.REFERENCE: + return ControlSpec( + control_id=item_id, verification_method=VerificationMethod.REFERENCE, + decision_method=DecisionMethod.LINK_RESOLVER, + patterns=[_routing.REFERENCE_PATTERNS[item_id]], + ) + if dm == _routing.LLM: + return ControlSpec( + control_id=item_id, verification_method=VerificationMethod.CONTENT, + decision_method=DecisionMethod.LLM, + paraphrases=_routing.PARAPHRASES.get(item_id, []), + topic_regex=_routing.LLM_TOPIC.get(item_id, ""), + question=_routing.LLM_QUESTION.get(item_id, ""), + ) + return ControlSpec( + control_id=item_id, verification_method=VerificationMethod.CONTENT, + decision_method=DecisionMethod.EMBEDDING, + paraphrases=_routing.PARAPHRASES.get(item_id, []), + embed_threshold=_routing.EMBED_THRESHOLDS.get(item_id), + ) + + +async def _resolves(item_id: str, text: str, skip_llm: bool): + """True = Klausel doch vorhanden (Keyword-Finding auflösen). False/None = + Finding behalten (fail-safe: bei Unsicherheit/Service-Ausfall lieber melden).""" + dm = _routing.decision_method(item_id) + if dm == _routing.MERGED: + return True # in ein anderes Item aufgegangen → kein eigenes Finding + doc = DocContext(text=text) + spec = _spec(item_id) + if dm == _routing.REFERENCE: + return (await _REF.check(spec, doc)).present + if dm == _routing.LLM: + if skip_llm: + return None # interaktiv: kein LLM → Keyword-Ergebnis behalten + return (await _LLM.check(spec, doc)).present + return (await _EMB.check(spec, doc)).present + + +async def run_routed(base_findings: list, text: str, context: dict | None = None): + """Routet die keyword-durchgefallenen Items. + + Returns (kept, resolved_ids, gated_ids): + kept = Findings, die nach Gate+Rescue bestehen bleiben + resolved_ids = per Embedding/Reference/LLM doch als vorhanden erkannt + gated_ids = per Geschäftsmodell nicht anwendbar (N/A) + """ + context = context or {} + skip_llm = bool(context.get("skip_llm")) + model = _routing.detect_business_model(text) + kept, resolved, gated = [], [], [] + for f in base_findings: + item_id = f.field_id + if not _routing.is_applicable(item_id, model): + gated.append(item_id) + continue + try: + present = await _resolves(item_id, text, skip_llm) + except Exception as e: # noqa: BLE001 — best-effort, Finding behalten + logger.info("agb routing %s failed: %s", item_id, str(e)[:80]) + present = None + if present is True: + resolved.append(item_id) + else: + kept.append(f) + return kept, resolved, gated diff --git a/backend-compliance/compliance/services/specialist_agents/agb/_reference_check.py b/backend-compliance/compliance/services/specialist_agents/agb/_reference_check.py deleted file mode 100644 index bff23edd..00000000 --- a/backend-compliance/compliance/services/specialist_agents/agb/_reference_check.py +++ /dev/null @@ -1,34 +0,0 @@ -"""REFERENCE-Pruefer (verification_method=REFERENCE): ist ein klarer Verweis auf -ein anderes Pflichtdokument vorhanden — und (optional) loest der Link auf? - -Fuer AGB: `data_protection` = Verweis auf die Datenschutzerklaerung. Eine AGB soll -KEINE Datenschutz-Inhalte mischen; ein Verweis genuegt (§ ... / best practice). -Deterministisch (Regex), 7/7 gegen Opus-GT — KEIN LLM, kein juristisches Urteil. - -Link-Aufloesung (HTTP) ist bewusst NICHT hier: das ist ein Runtime-/Online-Check -(separater Prozess), nicht Teil der deterministischen Text-Pruefung. -""" -from __future__ import annotations - -import re - -from . import _routing - -_URL = re.compile(r"https?://[^\s)\]]+", re.I) - - -def check_reference(item_id: str, text: str) -> dict: - """Returns {present: bool, link: str|None}. - - present = ein eindeutiger Verweis auf das referenzierte Dokument steht im Text. - link = die in der Naehe gefundene URL (fuer einen spaeteren LINK_CHECK), falls vorhanden. - """ - pat = _routing.REFERENCE_PATTERNS.get(item_id) - if not pat or not text: - return {"present": False, "link": None} - m = re.search(pat, text, re.I) - if not m: - return {"present": False, "link": None} - window = text[max(0, m.start() - 40): m.end() + 200] - url = _URL.search(window) or _URL.search(text) - return {"present": True, "link": url.group(0) if url else None} diff --git a/backend-compliance/compliance/services/specialist_agents/agb/agent.py b/backend-compliance/compliance/services/specialist_agents/agb/agent.py index a6cd5f20..1544b680 100644 --- a/backend-compliance/compliance/services/specialist_agents/agb/agent.py +++ b/backend-compliance/compliance/services/specialist_agents/agb/agent.py @@ -1,19 +1,60 @@ """AGBAgent — Allgemeine Geschäftsbedingungen (§§ 305 ff. BGB). -Thin-Subclass von ChecklistAgent über die kuratierte AGB_CHECKLIST (L1 -Pflichtangaben + L2 Detailchecks). KEIN Library-Firehose. +ChecklistAgent-Subclass: erst L1/L2-Keyword-Pass, dann **C-lean-Routing** — die +keyword-durchgefallenen Items werden per `decision_method` an die wiederverwendbare +Prüfer-Library geroutet (Embedding / Reference / LLM), davor das Geschäftsmodell- +Gate (Applicability), danach Severity-Re-Tiering (LOW → Empfehlung). +Validiert gegen 7-Firmen-Opus-GT: 71 % FP → ~0. Config in `_routing`, Flow in `_pipeline`. """ from __future__ import annotations from compliance.services.doc_checks.agb_checks import AGB_CHECKLIST +from .._base import AgentInput, AgentOutput, lint_output from .._checklist_agent import ChecklistAgent +from .._rollup import rollup +from ._pipeline import run_routed class AGBAgent(ChecklistAgent): CHECKLIST = AGB_CHECKLIST agent_id = "agb" - agent_version = "1.0" + agent_version = "2.0" # v2: decision_method-Routing (Embedding/Reference/LLM) doc_type = "agb" owned_mc_ids = tuple(c["id"] for c in AGB_CHECKLIST) + + async def evaluate(self, agent_input: AgentInput) -> AgentOutput: + # 1) Basis-Keyword-Pass (L1/L2). out.findings = keyword-durchgefallene Items. + out = await super().evaluate(agent_input) + text = (agent_input.text or "").strip() + if len(text) < 100 or not out.findings: + return out # zu kurz / nichts zu routen + + # 2) Routing: Gate + Embedding/Reference/LLM-Rescue der Keyword-Misses. + kept, resolved, gated = await run_routed( + out.findings, text, agent_input.context) + resolved_set, gated_set = set(resolved), set(gated) + + # 3) Coverage angleichen: rescued → ok, gated → na. + for c in out.mc_coverage: + if c.mc_id in resolved_set: + c.status, c.reason = "ok", "semantisch vorhanden (Routing)" + elif c.mc_id in gated_set: + c.status, c.reason = "na", "für Geschäftsmodell nicht anwendbar" + + # 4) Severity-Re-Tiering: HIGH/MEDIUM = Findings, LOW = nur Empfehlung. + out.findings = [f for f in kept if f.severity in ("HIGH", "MEDIUM")] + out.recommendations = rollup(kept) + + # 5) Aggregat-Kennzahlen neu (Coverage hat sich verschoben). + cov = out.mc_coverage + out.mc_total = len(cov) + out.mc_ok = sum(1 for c in cov if c.status == "ok") + out.mc_na = sum(1 for c in cov if c.status == "na") + out.mc_high = sum(1 for c in cov if c.status == "high") + out.mc_medium = sum(1 for c in cov if c.status == "medium") + out.mc_low = sum(1 for c in cov if c.status == "low") + out.notes = ((out.notes + " · ") if out.notes else "") + \ + f"routed: {len(resolved)} rescued, {len(gated)} n/a" + return lint_output(out) diff --git a/backend-compliance/compliance/tests/test_agb_agent.py b/backend-compliance/compliance/tests/test_agb_agent.py index ba218b6b..83fef3ad 100644 --- a/backend-compliance/compliance/tests/test_agb_agent.py +++ b/backend-compliance/compliance/tests/test_agb_agent.py @@ -1,12 +1,27 @@ -"""AGBAgent — kuratierte §§-305-ff-BGB-Checkliste (ChecklistAgent-Subclass).""" - -from __future__ import annotations - +"""AGBAgent (v2, routed). Embedding/LLM offline-gestubbt → kein Netzwerk.""" import asyncio +import pytest + +import compliance.services.specialist_agents.agb._pipeline as pipeline +from compliance.services.checkers.base import CheckResult from compliance.services.specialist_agents import REGISTRY, AgentInput +class _Stub: + def __init__(self, present): + self._p = present + + async def check(self, ctrl, doc): + return CheckResult(present=self._p) + + +@pytest.fixture(autouse=True) +def _offline(monkeypatch): + monkeypatch.setattr(pipeline, "_EMB", _Stub(None)) + monkeypatch.setattr(pipeline, "_LLM", _Stub(None)) + + def _run(text: str): return asyncio.run( REGISTRY.get("agb").evaluate(AgentInput(doc_type="agb", text=text))) diff --git a/backend-compliance/compliance/tests/test_agb_routed_agent.py b/backend-compliance/compliance/tests/test_agb_routed_agent.py new file mode 100644 index 00000000..19511bb7 --- /dev/null +++ b/backend-compliance/compliance/tests/test_agb_routed_agent.py @@ -0,0 +1,62 @@ +"""AGB routed-Pipeline: Gate, Reference-/Embedding-Rescue, LLM-skip, Re-Tiering. +Embedding + LLM offline-gestubbt → deterministisch, kein Netzwerk (Reference = echtes Regex).""" +import asyncio +from types import SimpleNamespace + +import pytest + +import compliance.services.specialist_agents.agb._pipeline as pipeline +from compliance.services.checkers.base import CheckResult +from compliance.services.specialist_agents._base import AgentInput +from compliance.services.specialist_agents.agb.agent import AGBAgent + + +class _Stub: + def __init__(self, present): + self._p = present + + async def check(self, ctrl, doc): + return CheckResult(present=self._p) + + +@pytest.fixture(autouse=True) +def _offline(monkeypatch): + monkeypatch.setattr(pipeline, "_EMB", _Stub(None)) + monkeypatch.setattr(pipeline, "_LLM", _Stub(None)) + + +def _routed(field_ids, text, context=None): + findings = [SimpleNamespace(field_id=fid) for fid in field_ids] + return asyncio.run(pipeline.run_routed(findings, text, context or {})) + + +def test_gate_termination_na_for_oneoff_shop(): + text = "Widerrufsbelehrung: Sie koennen binnen 14 Tagen widerrufen. " * 5 + kept, resolved, gated = _routed(["termination", "termination_form"], text) + assert set(gated) == {"termination", "termination_form"} + assert kept == [] + + +def test_reference_rescues_data_protection(): + text = "Einzelheiten zur Verarbeitung in unserer Datenschutzerklaerung. " * 5 + kept, resolved, gated = _routed(["data_protection"], text) + assert "data_protection" in resolved and kept == [] + + +def test_embedding_rescue_resolves(monkeypatch): + monkeypatch.setattr(pipeline, "_EMB", _Stub(True)) + kept, resolved, gated = _routed(["scope"], "x" * 200) + assert "scope" in resolved + + +def test_llm_skipped_keeps_finding(): + kept, resolved, gated = _routed(["delivery_timeframe"], "x" * 200, {"skip_llm": True}) + assert [f.field_id for f in kept] == ["delivery_timeframe"] and resolved == [] + + +def test_evaluate_retiers_low_out_of_findings(): + text = ("Allgemeine Geschaeftsbedingungen. Vertragsschluss durch Bestellung. " + "Haftung beschraenkt. Gerichtsstand Muenchen. ") * 6 + out = asyncio.run(AGBAgent().evaluate(AgentInput(doc_type="agb", text=text))) + assert out.agent == "agb" and out.agent_version == "2.0" + assert all(f.severity in ("HIGH", "MEDIUM") for f in out.findings) diff --git a/backend-compliance/compliance/tests/test_agent_outputs_topics.py b/backend-compliance/compliance/tests/test_agent_outputs_topics.py new file mode 100644 index 00000000..a88a0a2e --- /dev/null +++ b/backend-compliance/compliance/tests/test_agent_outputs_topics.py @@ -0,0 +1,10 @@ +"""AGB muss im LIVE-Pfad verdrahtet sein (_TOPIC_AGENTS), nicht nur per Snapshot.""" +from compliance.api.agent_check._agent_outputs import _TOPIC_AGENTS + + +def test_agb_wired_into_live_topic_agents(): + assert _TOPIC_AGENTS.get("agb") == "agb" + + +def test_impressum_still_wired(): + assert _TOPIC_AGENTS.get("impressum") == "impressum" diff --git a/backend-compliance/compliance/tests/test_checkers.py b/backend-compliance/compliance/tests/test_checkers.py new file mode 100644 index 00000000..0b357878 --- /dev/null +++ b/backend-compliance/compliance/tests/test_checkers.py @@ -0,0 +1,83 @@ +"""Unit-Tests der Prüfer-Library. Embedding + LLM gemockt → kein Netzwerk.""" +import asyncio + +import compliance.services.llm_cascade as cascade_mod +import compliance.services.mc_embedding_matcher as emb_mod +from compliance.services.checkers.base import ( + ControlSpec, + DecisionMethod, + DocContext, + VerificationMethod, +) +from compliance.services.checkers.embedding_checker import EmbeddingChecker +from compliance.services.checkers.llm_checker import LLMChecker +from compliance.services.checkers.reference_checker import ReferenceChecker + + +def _run(coro): + return asyncio.run(coro) + + +def test_reference_present_and_absent(): + rc = ReferenceChecker() + spec = ControlSpec("data_protection", VerificationMethod.REFERENCE, + DecisionMethod.LINK_RESOLVER, + patterns=[r"datenschutz(erkl|bestimmung|hinweis)"]) + r = _run(rc.check(spec, DocContext( + text="Details in unserer Datenschutzerklaerung: https://x.de/datenschutz"))) + assert r.present is True + assert r.detail.get("link", "").startswith("https://") + r2 = _run(rc.check(spec, DocContext(text="Keine Angabe zum Datenschutz-Thema."))) + assert r2.present is False + + +def test_embedding_threshold(monkeypatch): + monkeypatch.setattr(emb_mod, "DIM", 3, raising=False) + monkeypatch.setattr(emb_mod, "_chunk_text", lambda t: [t], raising=False) + + async def _embed(texts): + return [[1.0, 0.0, 0.0] for _ in texts] + + monkeypatch.setattr(emb_mod, "_embed_texts", _embed, raising=False) + ec = EmbeddingChecker() + spec = ControlSpec("scope_t", VerificationMethod.CONTENT, DecisionMethod.EMBEDDING, + paraphrases=["Geltungsbereich"], embed_threshold=0.58) + monkeypatch.setattr(emb_mod, "_cosine", lambda a, b: 0.90, raising=False) + r = _run(ec.check(spec, DocContext(text="x" * 200))) + assert r.present is True and r.confidence >= 0.58 + monkeypatch.setattr(emb_mod, "_cosine", lambda a, b: 0.20, raising=False) + r2 = _run(ec.check(spec, DocContext(text="x" * 200))) + assert r2.present is False + + +def test_embedding_offline_returns_none(monkeypatch): + async def _boom(texts): + raise ConnectionError("embedding-service down") + + monkeypatch.setattr(emb_mod, "_embed_texts", _boom, raising=False) + ec = EmbeddingChecker() + spec = ControlSpec("scope_off", VerificationMethod.CONTENT, DecisionMethod.EMBEDDING, + paraphrases=["x"], embed_threshold=0.6) + r = _run(ec.check(spec, DocContext(text="y" * 200))) + assert r.present is None # fail-safe + + +def test_llm_present_and_absent(monkeypatch): + lc = LLMChecker() + spec = ControlSpec("delivery_timeframe", VerificationMethod.CONTENT, DecisionMethod.LLM, + topic_regex=r"liefer", question="Konkrete Lieferfrist?") + doc = DocContext(text=("1. Lieferung\nDie Ware wird innerhalb von 2 Werktagen " + "geliefert.\n") * 4) + + async def _erfuellt(system, user, **kw): + return {"text": '{"verdict":"ERFUELLT","zitat":"2 Werktagen","begruendung":"x"}', + "source": "qwen", "confidence": 0.7} + + monkeypatch.setattr(cascade_mod, "call_with_cascade", _erfuellt, raising=False) + assert _run(lc.check(spec, doc)).present is True + + async def _fehlt(system, user, **kw): + return {"text": '{"verdict":"FEHLT"}', "source": "qwen"} + + monkeypatch.setattr(cascade_mod, "call_with_cascade", _fehlt, raising=False) + assert _run(lc.check(spec, doc)).present is False