diff --git a/backend-compliance/compliance/services/specialist_agents/dse/_tiered_eval.py b/backend-compliance/compliance/services/specialist_agents/dse/_tiered_eval.py new file mode 100644 index 00000000..3cd561c3 --- /dev/null +++ b/backend-compliance/compliance/services/specialist_agents/dse/_tiered_eval.py @@ -0,0 +1,183 @@ +"""Getierte 3-Status-Auswertung für DSE-Controls mit `tiered_criteria`. + +Pro Kriterium wird nach `decision_method` bewertet: + - EMBEDDING (Präsenz): deterministisch (festes Modell), Doc EINMAL pro Scan + eingebettet → reproduzierbar, kein LLM. Trägt den GROSSTEIL. + - LLM (Sufficiency): Haiku-Judge, GECACHT pro (doc_hash, control_id#idx, + PROMPT_VERSION, criterion) → gleicher Scan = gleiches Ergebnis. Löst die + empirisch gemessene Judge-Varianz (ein Live-Call ist NICHT reproduzierbar). + +Status NUR aus LEGAL_MINIMUM: + ERFÜLLT (alle LM erfüllt ODER kein LM) · FEHLT (kein LM erfüllt) · + TEILWEISE (Teil der LM erfüllt) · UNBESTIMMT (LM nicht bewertbar, z. B. + Embedding-Service down → Aufrufer behält sein Legacy-Ergebnis). +BEST_PRACTICE/OPTIONAL fließen NIE in den Status, nur in `recommendations`. +Siehe docs-src/development/criterion_meta_model.md. 
def _cache_get(key: str) -> Optional[bool]:
    """Return the cached judge verdict for ``key``.

    Args:
        key: Cache key as produced by ``_ckey``.

    Returns:
        The stored verdict as a bool, or None when the key is unknown or the
        cache database cannot be read. Reading is best-effort: a broken cache
        must not fail the scan, it only costs a fresh judge call.
    """
    from contextlib import closing

    try:
        # sqlite3's own context manager only commits/rolls back the
        # transaction; it does NOT close the connection. closing() releases
        # the handle on every call instead of leaking one per lookup.
        with closing(sqlite3.connect(_CACHE_DB)) as conn, conn:
            conn.execute("create table if not exists judge(k text primary key, met int)")
            row = conn.execute("select met from judge where k=?", (key,)).fetchone()
        return None if row is None else bool(row[0])
    except Exception as e:
        # Same reporting as the write path, so an unreadable cache is visible.
        logger.warning("tiered judge cache get: %s", e)
        return None


def _cache_put(key: str, met: bool) -> None:
    """Store (or overwrite) the judge verdict for ``key``.

    Args:
        key: Cache key as produced by ``_ckey``.
        met: Verdict to persist; stored as 0/1.

    Failures are logged and swallowed: the cache is an optimisation and must
    never break the evaluation.
    """
    from contextlib import closing

    try:
        # Inner ``conn`` context commits the insert, closing() then closes
        # the connection (the sqlite3 context manager alone would not).
        with closing(sqlite3.connect(_CACHE_DB)) as conn, conn:
            conn.execute("create table if not exists judge(k text primary key, met int)")
            conn.execute("insert or replace into judge values(?,?)", (key, int(met)))
    except Exception as e:
        logger.warning("tiered judge cache put: %s", e)
async def _llm_met(cid: str, idx: int, crit: str, doc, dh: str) -> Optional[bool]:
    """Judge one criterion with the LLM checker, going through the verdict cache.

    Args:
        cid: Control id the criterion belongs to.
        idx: Position of the criterion within the control's criteria list.
        crit: Criterion text.
        doc: Document context handed unchanged to ``route_and_check``.
        dh: Document hash used as part of the cache key.

    Returns:
        True/False for a definite verdict (cached or fresh), or None when the
        checker reports no verdict or the judge call fails. None is never
        cached, so a later scan can retry.
    """
    key = _ckey(dh, cid, idx, crit)
    cached = _cache_get(key)
    if cached is not None:
        return cached
    try:
        from compliance.services.checkers.router import build_spec, route_and_check
        spec = build_spec(cid, {"verification_method": "CONTENT", "decision_method": "LLM"},
                          label=crit, criteria=[crit])
        res = await route_and_check(spec, doc)
    except Exception as e:
        # A failing judge call must not abort the evaluation of every other
        # control; report "not evaluable" so the status becomes UNBESTIMMT.
        logger.warning("tiered llm judge %s#%s: %s", cid, idx, e)
        return None
    if res.present is None:
        return None
    met = bool(res.present)
    _cache_put(key, met)
    return met
async def fetch_tiered_criteria(cids: list[str], db_url: str = "") -> dict[str, list]:
    """Load the ``tiered_criteria`` of the given controls from canonical_controls.

    Args:
        cids: Control ids to look up; falsy entries are ignored.
        db_url: Database DSN. When empty, ``DATABASE_URL`` and then
            ``COMPLIANCE_DATABASE_URL`` are read from the environment.

    Returns:
        Mapping control_id -> criteria list for every control that carries a
        non-empty list. An empty dict when there are no ids, no DSN, or the
        query fails (fallback: no tiering, the legacy result stands).
    """
    wanted = [c for c in (cids or []) if c]
    if not wanted:
        return {}
    dsn = db_url or os.getenv("DATABASE_URL") or os.getenv("COMPLIANCE_DATABASE_URL")
    if not dsn:
        return {}
    import json
    try:
        import asyncpg
        conn = await asyncpg.connect(dsn)
        try:
            rows = await conn.fetch(
                "select control_id, generation_metadata->'tiered_criteria' tc "
                "from compliance.canonical_controls "
                "where control_id = any($1::text[]) "
                "and generation_metadata ? 'tiered_criteria'", wanted)
        finally:
            # Close even when the query raises; otherwise the connection leaks.
            await conn.close()
    except Exception as e:
        logger.warning("fetch_tiered_criteria failed: %s", e)
        return {}
    out: dict[str, list] = {}
    for r in rows:
        tc = r["tc"]
        if isinstance(tc, str):
            try:
                tc = json.loads(tc)
            except ValueError as e:
                # One malformed row must not discard the tiering of all others.
                logger.warning("fetch_tiered_criteria bad json for %s: %s",
                               r["control_id"], e)
                continue
        # Only a non-empty list can be iterated as criteria by the evaluator.
        if tc and isinstance(tc, list):
            out[r["control_id"]] = tc
    return out
+ tiered_evaluated = 0 + try: + from compliance.services.checkers.base import DocContext + from ._tiered_eval import ( + evaluate_tiered, fetch_tiered_criteria, prepare_doc, + ) + result_cids = [r.get("control_id") for r in results if r.get("control_id")] + tiered_map = await fetch_tiered_criteria(result_cids, db_url) + if tiered_map: + ctx = await prepare_doc(text) + doc_ctx = DocContext(text=text) + for r in results: + tc = tiered_map.get(r.get("control_id")) + if not tc: + continue + ev = await evaluate_tiered(r["control_id"], tc, ctx, doc_ctx) + if ev["status"] == "UNBESTIMMT": + continue + r["compliance_status"] = ev["status"] + r["recommendations"] = ev["recommendations"] + r["tier_lm"] = f"{ev['lm_met']}/{ev['lm_total']}" + r["passed"] = ev["status"] == "ERFÜLLT" + tiered_evaluated += 1 + except Exception as e: + logger.warning("dse tiered eval skipped: %s", e) + telemetry = { "layer_0_field_hits": len(boost_field_ids), "layer_0_field_ids": boost_field_ids, "layer_1_pass": layer_1_pass, "embedding_passes": embedding_passes, + "tiered_evaluated": tiered_evaluated, "total_mcs": len(results), "sector_dropped": drop_stats.get("sector_dropped", 0), "offtopic_dropped": drop_stats.get("offtopic_dropped", 0), diff --git a/backend-compliance/tests/test_tiered_eval.py b/backend-compliance/tests/test_tiered_eval.py new file mode 100644 index 00000000..c75cc3ce --- /dev/null +++ b/backend-compliance/tests/test_tiered_eval.py @@ -0,0 +1,102 @@ +"""Unit-Tests für die getierte 3-Status-Auswertung (_tiered_eval). + +Deckt ab: Status-Logik (inkl. kein-LM → ERFÜLLT, UNBESTIMMT bei nicht bewertbar), +Empfehlungs-Sammlung, EMBEDDING/LLM-Routing (gemockt) und den Reproduzierbarkeits- +Cache. 
def test_evaluate_partial_with_recommendation(monkeypatch):
    """One of two LEGAL_MINIMUM criteria met -> TEILWEISE, unmet BEST_PRACTICE is recommended."""
    verdicts = {
        "Zwecke genannt": True,
        "Speicherdauer genannt": False,
        "tabellarisch ausgewiesen": False,
    }
    tiers = ("LEGAL_MINIMUM", "LEGAL_MINIMUM", "BEST_PRACTICE")
    criteria = [_crit(text, tier) for text, tier in zip(verdicts, tiers)]

    async def stub_embed(texts, ctx, thr):
        # Embedding presence is stubbed: no network, fixed verdicts.
        return dict(verdicts)

    monkeypatch.setattr(te, "_embed_present", stub_embed)
    result = asyncio.run(
        te.evaluate_tiered("C1", criteria, {"hash": "h"}, _Doc("x" * 200))
    )

    assert result["status"] == "TEILWEISE"
    assert result["lm_met"] == 1
    assert result["lm_total"] == 2
    recommendations = result["recommendations"]
    assert len(recommendations) == 1
    assert recommendations[0]["tier"] == "BEST_PRACTICE"
def test_cache_roundtrip(monkeypatch, tmp_path):
    """Verdicts written to the judge cache are read back unchanged; unknown keys give None."""
    db_file = tmp_path / "cache.db"
    monkeypatch.setattr(te, "_CACHE_DB", str(db_file))

    # Nothing stored yet -> miss.
    assert te._cache_get("k1") is None

    for cache_key, verdict in (("k1", True), ("k2", False)):
        te._cache_put(cache_key, verdict)

    assert te._cache_get("k1") is True
    assert te._cache_get("k2") is False