feat(dse): tiered 3-state evaluator + Layer-3 wiring (compliance_tier)

Getierte Auswertung mit compliance_tier-Gating (nur LEGAL_MINIMUM bestimmt
ERFÜLLT/TEILWEISE/FEHLT; BEST_PRACTICE/OPTIONAL → Empfehlungen). Deterministisch-
first: EMBEDDING-Präsenz + gecachter Haiku nur für Sufficiency → reproduzierbar
(löst die gemessene Judge-Varianz). Layer-3 in v3_engine gated auf tiered_criteria,
fail-safe (UNBESTIMMT → Legacy). Offene Kalibrierung: Präsenz-Schwelle (Schritt 2).

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
This commit is contained in:
Benjamin Admin
2026-06-22 17:26:43 +02:00
parent 3e3644f83d
commit 5ff08a240b
3 changed files with 315 additions and 0 deletions
@@ -0,0 +1,183 @@
"""Getierte 3-Status-Auswertung für DSE-Controls mit `tiered_criteria`.
Pro Kriterium wird nach `decision_method` bewertet:
- EMBEDDING (Präsenz): deterministisch (festes Modell), Doc EINMAL pro Scan
eingebettet → reproduzierbar, kein LLM. Trägt den GROSSTEIL.
- LLM (Sufficiency): Haiku-Judge, GECACHT pro (doc_hash, control_id#idx,
PROMPT_VERSION, criterion) → gleicher Scan = gleiches Ergebnis. Löst die
empirisch gemessene Judge-Varianz (ein Live-Call ist NICHT reproduzierbar).
Status NUR aus LEGAL_MINIMUM:
ERFÜLLT (alle LM erfüllt ODER kein LM) · FEHLT (kein LM erfüllt) ·
TEILWEISE (Teil der LM erfüllt) · UNBESTIMMT (LM nicht bewertbar, z. B.
Embedding-Service down → Aufrufer behält sein Legacy-Ergebnis).
BEST_PRACTICE/OPTIONAL fließen NIE in den Status, nur in `recommendations`.
Siehe docs-src/development/criterion_meta_model.md.
"""
from __future__ import annotations
import asyncio
import hashlib
import logging
import os
import sqlite3
from typing import Any, Optional
logger = logging.getLogger(__name__)
PROMPT_VERSION = "dse-tier-v1"
_CACHE_DB = os.getenv("TIERED_JUDGE_CACHE", "/data/tiered_judge_cache.db")
_EMBED_THR = float(os.getenv("DSE_CRITERION_EMBED_THRESHOLD", "0.62"))
LM = "LEGAL_MINIMUM"
def _doc_hash(text: str) -> str:
return hashlib.sha256(text.encode("utf-8", "ignore")).hexdigest()[:20]
def _ckey(dh: str, cid: str, idx: int, crit: str) -> str:
ch = hashlib.sha256(crit.encode("utf-8", "ignore")).hexdigest()[:12]
return f"{dh}|{cid}#{idx}|{PROMPT_VERSION}|{ch}"
def _cache_get(key: str) -> Optional[bool]:
try:
with sqlite3.connect(_CACHE_DB) as c:
c.execute("create table if not exists judge(k text primary key, met int)")
row = c.execute("select met from judge where k=?", (key,)).fetchone()
return None if row is None else bool(row[0])
except Exception:
return None
def _cache_put(key: str, met: bool) -> None:
try:
with sqlite3.connect(_CACHE_DB) as c:
c.execute("create table if not exists judge(k text primary key, met int)")
c.execute("insert or replace into judge values(?,?)", (key, int(met)))
except Exception as e:
logger.warning("tiered judge cache put: %s", e)
async def prepare_doc(text: str) -> dict[str, Any]:
"""Doc EINMAL pro Scan einbetten. Liefert {hash, chunk_vecs}. Bei Embedding-
Ausfall: chunk_vecs=None → EMBEDDING-Kriterien werden UNBESTIMMT (Fallback)."""
ctx: dict[str, Any] = {"hash": _doc_hash(text or ""), "chunk_vecs": None}
if not text or len(text) < 100:
return ctx
try:
from compliance.services.mc_embedding_matcher import DIM, _chunk_text, _embed_texts
vecs = await asyncio.wait_for(_embed_texts(_chunk_text(text)), timeout=90.0)
ctx["chunk_vecs"] = [v for v in vecs if v and len(v) == DIM]
except (Exception, asyncio.TimeoutError) as e:
logger.warning("tiered prepare_doc embedding inaktiv: %s", e)
return ctx
async def _embed_present(crits: list[str], ctx: dict, thr: float) -> dict[str, Optional[bool]]:
cvecs = ctx.get("chunk_vecs")
if not cvecs:
return {c: None for c in crits}
try:
from compliance.services.mc_embedding_matcher import DIM, _cosine, _embed_texts
pv = await _embed_texts(crits)
out: dict[str, Optional[bool]] = {}
for crit, v in zip(crits, pv):
if not v or len(v) != DIM:
out[crit] = None
else:
out[crit] = max((_cosine(v, cv) for cv in cvecs), default=0.0) >= thr
return out
except Exception as e:
logger.warning("tiered embed present: %s", e)
return {c: None for c in crits}
async def _llm_met(cid: str, idx: int, crit: str, doc, dh: str) -> Optional[bool]:
key = _ckey(dh, cid, idx, crit)
cached = _cache_get(key)
if cached is not None:
return cached
from compliance.services.checkers.router import build_spec, route_and_check
spec = build_spec(cid, {"verification_method": "CONTENT", "decision_method": "LLM"},
label=crit, criteria=[crit])
res = await route_and_check(spec, doc)
if res.present is None:
return None
_cache_put(key, bool(res.present))
return bool(res.present)
def _status(lm_vals: list[Optional[bool]]) -> str:
if not lm_vals:
return "ERFÜLLT" # kein gesetzliches Minimum → nie rot
if any(m is None for m in lm_vals):
return "UNBESTIMMT" # Aufrufer behält Legacy
n = sum(1 for m in lm_vals if m)
if n == len(lm_vals):
return "ERFÜLLT"
return "FEHLT" if n == 0 else "TEILWEISE"
async def evaluate_tiered(control_id: str, tiered_criteria: list[dict],
ctx: dict, doc) -> dict[str, Any]:
dh = ctx.get("hash") or _doc_hash(getattr(doc, "text", "") or "")
emb_texts = [c["criterion"] for c in (tiered_criteria or [])
if c.get("criterion")
and (c.get("decision_method") or "EMBEDDING").upper() != "LLM"]
emb_res = await _embed_present(emb_texts, ctx, _EMBED_THR) if emb_texts else {}
lm_vals: list[Optional[bool]] = []
recs: list[dict] = []
detail: list[dict] = []
for idx, c in enumerate(tiered_criteria or []):
crit = c.get("criterion") or ""
if not crit:
continue
tier = (c.get("compliance_tier") or "").upper()
if (c.get("decision_method") or "EMBEDDING").upper() == "LLM":
met = await _llm_met(control_id, idx, crit, doc, dh)
src = "haiku-cache"
else:
met = emb_res.get(crit)
src = "embedding"
detail.append({"criterion": crit, "tier": tier, "met": met, "source": src})
if tier == LM:
lm_vals.append(met)
elif met is False:
recs.append({"criterion": crit, "tier": tier or "OPTIONAL",
"legal_basis": c.get("legal_basis")})
return {"status": _status(lm_vals), "lm_met": sum(1 for m in lm_vals if m),
"lm_total": len(lm_vals), "recommendations": recs, "detail": detail}
async def fetch_tiered_criteria(cids: list[str], db_url: str = "") -> dict[str, list]:
"""tiered_criteria der angegebenen Controls aus canonical_controls laden.
Leeres Dict bei Fehler/keiner DB (Fallback: kein Tiering, Legacy trägt)."""
cids = [c for c in cids if c]
if not cids:
return {}
import json
dsn = db_url or os.getenv("DATABASE_URL") or os.getenv("COMPLIANCE_DATABASE_URL")
if not dsn:
return {}
try:
import asyncpg
conn = await asyncpg.connect(dsn)
rows = await conn.fetch(
"select control_id, generation_metadata->'tiered_criteria' tc "
"from compliance.canonical_controls "
"where control_id = any($1::text[]) "
"and generation_metadata ? 'tiered_criteria'", cids)
await conn.close()
except Exception as e:
logger.warning("fetch_tiered_criteria failed: %s", e)
return {}
out: dict[str, list] = {}
for r in rows:
tc = r["tc"]
tc = json.loads(tc) if isinstance(tc, str) else tc
if tc:
out[r["control_id"]] = tc
return out
@@ -129,11 +129,41 @@ async def run_v3_pipeline(
r["source"] = (r.get("source") or "") + "+embedding"
embedding_passes += 1
# Layer 3: getierte 3-Status-Auswertung (nur Controls mit tiered_criteria).
# Reproduzierbar: EMBEDDING-Präsenz (deterministisch) + GECACHTER Haiku-Judge
# nur für Sufficiency. UNBESTIMMT → Legacy-Pass bleibt. Gated + fail-safe.
tiered_evaluated = 0
try:
from compliance.services.checkers.base import DocContext
from ._tiered_eval import (
evaluate_tiered, fetch_tiered_criteria, prepare_doc,
)
result_cids = [r.get("control_id") for r in results if r.get("control_id")]
tiered_map = await fetch_tiered_criteria(result_cids, db_url)
if tiered_map:
ctx = await prepare_doc(text)
doc_ctx = DocContext(text=text)
for r in results:
tc = tiered_map.get(r.get("control_id"))
if not tc:
continue
ev = await evaluate_tiered(r["control_id"], tc, ctx, doc_ctx)
if ev["status"] == "UNBESTIMMT":
continue
r["compliance_status"] = ev["status"]
r["recommendations"] = ev["recommendations"]
r["tier_lm"] = f"{ev['lm_met']}/{ev['lm_total']}"
r["passed"] = ev["status"] == "ERFÜLLT"
tiered_evaluated += 1
except Exception as e:
logger.warning("dse tiered eval skipped: %s", e)
telemetry = {
"layer_0_field_hits": len(boost_field_ids),
"layer_0_field_ids": boost_field_ids,
"layer_1_pass": layer_1_pass,
"embedding_passes": embedding_passes,
"tiered_evaluated": tiered_evaluated,
"total_mcs": len(results),
"sector_dropped": drop_stats.get("sector_dropped", 0),
"offtopic_dropped": drop_stats.get("offtopic_dropped", 0),
@@ -0,0 +1,102 @@
"""Unit-Tests für die getierte 3-Status-Auswertung (_tiered_eval).
Deckt ab: Status-Logik (inkl. kein-LM → ERFÜLLT, UNBESTIMMT bei nicht bewertbar),
Empfehlungs-Sammlung, EMBEDDING/LLM-Routing (gemockt) und den Reproduzierbarkeits-
Cache. Embedding/LLM werden gemockt — kein Netzwerk."""
import asyncio
from compliance.services.specialist_agents.dse import _tiered_eval as te
# ---- reine Status-Logik -------------------------------------------------
def test_status_no_lm_is_erfuellt():
assert te._status([]) == "ERFÜLLT"
def test_status_all_met_erfuellt():
assert te._status([True, True]) == "ERFÜLLT"
def test_status_none_met_fehlt():
assert te._status([False, False]) == "FEHLT"
def test_status_partial_teilweise():
assert te._status([True, False]) == "TEILWEISE"
def test_status_any_none_unbestimmt():
assert te._status([True, None]) == "UNBESTIMMT"
# ---- evaluate_tiered (Embedding/LLM gemockt) ----------------------------
def _crit(text, tier, dm="EMBEDDING"):
return {"criterion": text, "compliance_tier": tier,
"decision_method": dm, "legal_basis": "x"}
class _Doc:
def __init__(self, text):
self.text = text
def test_evaluate_partial_with_recommendation(monkeypatch):
crits = [_crit("Zwecke genannt", "LEGAL_MINIMUM"),
_crit("Speicherdauer genannt", "LEGAL_MINIMUM"),
_crit("tabellarisch ausgewiesen", "BEST_PRACTICE")]
async def fake_embed(texts, ctx, thr):
return {"Zwecke genannt": True, "Speicherdauer genannt": False,
"tabellarisch ausgewiesen": False}
monkeypatch.setattr(te, "_embed_present", fake_embed)
out = asyncio.run(te.evaluate_tiered("C1", crits, {"hash": "h"}, _Doc("x" * 200)))
assert out["status"] == "TEILWEISE"
assert out["lm_met"] == 1 and out["lm_total"] == 2
assert len(out["recommendations"]) == 1
assert out["recommendations"][0]["tier"] == "BEST_PRACTICE"
def test_evaluate_no_lm_is_erfuellt_with_recs(monkeypatch):
crits = [_crit("Bildsymbole", "OPTIONAL"), _crit("Legende", "OPTIONAL")]
async def fake_embed(texts, ctx, thr):
return {t: False for t in texts}
monkeypatch.setattr(te, "_embed_present", fake_embed)
out = asyncio.run(te.evaluate_tiered("C2", crits, {"hash": "h"}, _Doc("x" * 200)))
assert out["status"] == "ERFÜLLT"
assert out["lm_total"] == 0
assert len(out["recommendations"]) == 2
def test_evaluate_llm_criterion_routed(monkeypatch):
crits = [_crit("Speicherdauer hinreichend nachvollziehbar", "LEGAL_MINIMUM", dm="LLM")]
async def fake_llm(cid, idx, crit, doc, dh):
return True
monkeypatch.setattr(te, "_llm_met", fake_llm)
out = asyncio.run(te.evaluate_tiered("C3", crits, {"hash": "h"}, _Doc("x" * 200)))
assert out["status"] == "ERFÜLLT" and out["lm_total"] == 1
def test_evaluate_unbestimmt_when_embed_unavailable(monkeypatch):
crits = [_crit("Zwecke genannt", "LEGAL_MINIMUM")]
async def fake_embed(texts, ctx, thr):
return {t: None for t in texts} # Embedding-Service down
monkeypatch.setattr(te, "_embed_present", fake_embed)
out = asyncio.run(te.evaluate_tiered("C4", crits, {"hash": "h"}, _Doc("x" * 200)))
assert out["status"] == "UNBESTIMMT"
# ---- Reproduzierbarkeits-Cache -----------------------------------------
def test_cache_roundtrip(monkeypatch, tmp_path):
monkeypatch.setattr(te, "_CACHE_DB", str(tmp_path / "cache.db"))
assert te._cache_get("k1") is None
te._cache_put("k1", True)
te._cache_put("k2", False)
assert te._cache_get("k1") is True
assert te._cache_get("k2") is False