feat(agb): wire validated routed AGB engine into live check path

Consolidate the AGB C-lean engine (71% FP -> ~0, validated vs 7-company
Opus GT) onto the canonical checker library and into the live check path.

- AGBAgent.evaluate now runs routed C-lean: keyword (L1/L2) -> business-
  model gate -> per-item decision_method routing (embedding/reference/llm
  via services/checkers/) -> severity re-tiering (LOW -> recommendation),
  honoring context.skip_llm.
- New agb/_pipeline.py orchestrates the routing; agent.py stays thin.
- Remove the 3 AGB-local checker duplicates (_reference_check,
  _embedding_rescue, _llm_judge); services/checkers/ is now canonical.
- Wire "agb" into _agent_outputs._TOPIC_AGENTS so the live check emits a
  validated AGB tab (was snapshot-only).
- Run topic agents concurrently (asyncio.gather) + emit each tab via SSE
  as it finishes -> progressive results, no wait on the slowest agent.
- Tests: checker units (mocked), routed agent (gate/rescue/re-tier),
  topic wiring; existing AGB tests made offline-safe.

dev-only, no deploy.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
This commit is contained in:
Benjamin Admin
2026-06-21 10:40:08 +02:00
parent 9d79cf1576
commit 32e45f0797
10 changed files with 341 additions and 195 deletions
@@ -1,74 +0,0 @@
"""EMBEDDING-Rescue (decision_method=EMBEDDING) fuer AGB.
Fuer keyword-durchgefallene EMBEDDING-Items: pruefe, ob die Klausel SEMANTISCH
(>= per-Item-Schwelle) im Dokument vorkommt — rettet Recall-FP (Klausel da, anders
formuliert). Referenzvektoren = die Item-Paraphrasen aus `_routing.PARAPHRASES`
(NICHT der mc_classification-Sidecar wie bei DSE, da AGB eine kuratierte
Checkliste statt Library-Controls nutzt).
Deterministisch (festes Embedding-Modell -> gleicher Text -> gleicher Vektor) und
gecacht. Faellt der Embedding-Service aus, liefert die Schicht leer zurueck —
der Keyword-Layer traegt dann (kein Hang, kein Crash).
"""
from __future__ import annotations
import asyncio
import logging
from . import _routing
logger = logging.getLogger(__name__)
# Paraphrasen-Vektoren werden EINMAL pro Prozess eingebettet und gecacht.
_PARA_VEC_CACHE: dict[str, list] = {}
async def _ensure_para_vecs(item_ids: list[str]) -> dict[str, list]:
from compliance.services.mc_embedding_matcher import DIM, _embed_texts
todo = [i for i in item_ids
if i not in _PARA_VEC_CACHE and _routing.PARAPHRASES.get(i)]
for it in todo:
vecs = await _embed_texts(_routing.PARAPHRASES[it])
_PARA_VEC_CACHE[it] = [v for v in vecs if v and len(v) == DIM]
return _PARA_VEC_CACHE
async def embedding_rescue(
text: str,
candidate_ids,
embed_timeout: float = 90.0,
) -> set[str]:
"""Returns die Teilmenge der `candidate_ids`, die semantisch (>= per-Item-
Schwelle) im Text vorkommt. `candidate_ids` = die im Keyword-Layer
DURCHGEFALLENEN Items (Recall-Rescue). Nur EMBEDDING-Items werden behandelt.
"""
cands = [c for c in candidate_ids
if _routing.decision_method(c) == _routing.EMBEDDING
and _routing.PARAPHRASES.get(c)]
if not text or len(text) < 100 or not cands:
return set()
try:
from compliance.services.mc_embedding_matcher import (
DIM, _chunk_text, _cosine, _embed_texts,
)
para_vecs = await _ensure_para_vecs(cands)
chunks = _chunk_text(text)
if not chunks:
return set()
cvecs = [v for v in await asyncio.wait_for(
_embed_texts(chunks), timeout=embed_timeout)
if v and len(v) == DIM]
except (Exception, asyncio.TimeoutError) as e: # Service down -> kein Rescue
logger.info("agb embedding_rescue inaktiv: %s", str(e)[:90])
return set()
if not cvecs:
return set()
rescued: set[str] = set()
for cid in cands:
pv = para_vecs.get(cid) or []
if not pv:
continue
best = max((_cosine(p, c) for p in pv for c in cvecs), default=0.0)
if best >= _routing.EMBED_THRESHOLDS.get(cid, 0.60):
rescued.add(cid)
return rescued
@@ -1,74 +0,0 @@
"""LLM-Judge (decision_method=LLM) fuer die 2 semantisch engen AGB-Items
(delivery_timeframe, warranty_period), bei denen Embedding NICHT trennt.
Retrieval = GANZE Paragraph-Abschnitte (nicht Top-k-Chunks — das war in der
Validierung der Schluessel: Top-4-Chunks verfehlten z.B. die zalando-1-Jahr-
Klausel, der ganze Paragraph nicht). Entscheidung ueber die LLM-Kaskade
(`call_with_cascade`): prod startet bei OVH-120b (stark); dev nur Qwen (schwach,
bekannte Env-Grenze). NUR present/absent — Defekt-Pruefung ist Stage 3.
"""
from __future__ import annotations
import json
import logging
import re
from . import _routing
logger = logging.getLogger(__name__)
_SECTION_SPLIT = re.compile(r"(?m)(?=^\s*(?:§\s*)?\d+[\.\)]\s)")
_SYS = (
"Du bist deutscher AGB-Rechtsexperte. Entscheide, ob die genannte Pflicht in "
"den vorgelegten AGB-Abschnitten vorhanden ist. NUR die Abschnitte zaehlen. "
'Antworte NUR JSON: {"verdict":"ERFUELLT|FEHLT","zitat":"woertlich oder leer",'
'"begruendung":"1 Satz"}.'
)
def _sections(text: str) -> list[str]:
return [s.strip() for s in _SECTION_SPLIT.split(text) if s.strip()]
def relevant_sections(item_id: str, text: str, limit: int = 6) -> list[str]:
"""Ganze Abschnitte zum Thema des Items (Topic-Regex). Fallback: erste Abschnitte."""
secs = _sections(text)
topic = _routing.LLM_TOPIC.get(item_id)
if not topic:
return secs[:limit]
rel = [s for s in secs if re.search(topic, s, re.I)]
return rel[:limit] or secs[:limit]
def _parse(txt: str) -> dict:
out = (txt or "").strip()
if out.startswith("```"):
out = out.split("```", 2)[1]
out = out[4:] if out.startswith("json") else out
a, b = out.find("{"), out.rfind("}")
return json.loads(out[a:b + 1] if 0 <= a < b else out)
async def llm_judge(item_id: str, text: str) -> dict:
"""Returns {present: bool|None, zitat, begruendung, source}.
present=None => Judge konnte nicht entscheiden -> Aufrufer behaelt das
Keyword-Ergebnis (fail-safe Richtung Finding)."""
from compliance.services.llm_cascade import call_with_cascade
question = _routing.LLM_QUESTION.get(item_id, "Ist diese Pflicht im Text vorhanden?")
secs = relevant_sections(item_id, text)
user = json.dumps({"frage": question, "agb_abschnitte": secs}, ensure_ascii=False)
try:
r = await call_with_cascade(_SYS, user, min_confidence=0.6, max_tokens=500)
obj = _parse(r.get("text"))
verdict = obj.get("verdict")
if verdict not in ("ERFUELLT", "FEHLT"):
return {"present": None, "zitat": "", "begruendung": "unklar", "source": r.get("source", "?")}
return {
"present": verdict == "ERFUELLT",
"zitat": (obj.get("zitat") or "")[:200],
"begruendung": (obj.get("begruendung") or "")[:200],
"source": r.get("source", "?"),
}
except Exception as e:
logger.info("agb llm_judge fail %s: %s", item_id, str(e)[:80])
return {"present": None, "zitat": "", "begruendung": "judge_error", "source": "error"}
@@ -0,0 +1,102 @@
"""AGB-Routing-Pipeline (C-lean): nimmt das Keyword-Ergebnis des ChecklistAgent
und routet keyword-durchgefallene Items per `_routing.decision_method` an die
wiederverwendbare Prüfer-Library (Embedding / Reference / LLM). Davor das
Geschäftsmodell-Gate (Applicability). Das Re-Tiering (LOW → Empfehlung) +
Output-Zusammenbau macht der AGBAgent — hier nur die Routing-Entscheidung.
Validiert (7-Firmen-Opus-GT): 71 % FP → ~0. agent.py bleibt dünn, dies ist der
einzige Ort des C-lean-Flows.
"""
from __future__ import annotations
import logging
from compliance.services.checkers.base import (
ControlSpec,
DecisionMethod,
DocContext,
VerificationMethod,
)
from compliance.services.checkers.embedding_checker import EmbeddingChecker
from compliance.services.checkers.llm_checker import LLMChecker
from compliance.services.checkers.reference_checker import ReferenceChecker
from . import _routing
logger = logging.getLogger(__name__)
# Checker sind zustandslos (schwere Imports erst in .check()) → Modul-Singletons.
_EMB = EmbeddingChecker()
_REF = ReferenceChecker()
_LLM = LLMChecker()
def _spec(item_id: str) -> ControlSpec:
"""ControlSpec für ein Item aus der AGB-Routing-Config bauen."""
dm = _routing.decision_method(item_id)
if dm == _routing.REFERENCE:
return ControlSpec(
control_id=item_id, verification_method=VerificationMethod.REFERENCE,
decision_method=DecisionMethod.LINK_RESOLVER,
patterns=[_routing.REFERENCE_PATTERNS[item_id]],
)
if dm == _routing.LLM:
return ControlSpec(
control_id=item_id, verification_method=VerificationMethod.CONTENT,
decision_method=DecisionMethod.LLM,
paraphrases=_routing.PARAPHRASES.get(item_id, []),
topic_regex=_routing.LLM_TOPIC.get(item_id, ""),
question=_routing.LLM_QUESTION.get(item_id, ""),
)
return ControlSpec(
control_id=item_id, verification_method=VerificationMethod.CONTENT,
decision_method=DecisionMethod.EMBEDDING,
paraphrases=_routing.PARAPHRASES.get(item_id, []),
embed_threshold=_routing.EMBED_THRESHOLDS.get(item_id),
)
async def _resolves(item_id: str, text: str, skip_llm: bool):
"""True = Klausel doch vorhanden (Keyword-Finding auflösen). False/None =
Finding behalten (fail-safe: bei Unsicherheit/Service-Ausfall lieber melden)."""
dm = _routing.decision_method(item_id)
if dm == _routing.MERGED:
return True # in ein anderes Item aufgegangen → kein eigenes Finding
doc = DocContext(text=text)
spec = _spec(item_id)
if dm == _routing.REFERENCE:
return (await _REF.check(spec, doc)).present
if dm == _routing.LLM:
if skip_llm:
return None # interaktiv: kein LLM → Keyword-Ergebnis behalten
return (await _LLM.check(spec, doc)).present
return (await _EMB.check(spec, doc)).present
async def run_routed(base_findings: list, text: str, context: dict | None = None):
"""Routet die keyword-durchgefallenen Items.
Returns (kept, resolved_ids, gated_ids):
kept = Findings, die nach Gate+Rescue bestehen bleiben
resolved_ids = per Embedding/Reference/LLM doch als vorhanden erkannt
gated_ids = per Geschäftsmodell nicht anwendbar (N/A)
"""
context = context or {}
skip_llm = bool(context.get("skip_llm"))
model = _routing.detect_business_model(text)
kept, resolved, gated = [], [], []
for f in base_findings:
item_id = f.field_id
if not _routing.is_applicable(item_id, model):
gated.append(item_id)
continue
try:
present = await _resolves(item_id, text, skip_llm)
except Exception as e: # noqa: BLE001 — best-effort, Finding behalten
logger.info("agb routing %s failed: %s", item_id, str(e)[:80])
present = None
if present is True:
resolved.append(item_id)
else:
kept.append(f)
return kept, resolved, gated
@@ -1,34 +0,0 @@
"""REFERENCE-Pruefer (verification_method=REFERENCE): ist ein klarer Verweis auf
ein anderes Pflichtdokument vorhanden — und (optional) loest der Link auf?
Fuer AGB: `data_protection` = Verweis auf die Datenschutzerklaerung. Eine AGB soll
KEINE Datenschutz-Inhalte mischen; ein Verweis genuegt (§ ... / best practice).
Deterministisch (Regex), 7/7 gegen Opus-GT — KEIN LLM, kein juristisches Urteil.
Link-Aufloesung (HTTP) ist bewusst NICHT hier: das ist ein Runtime-/Online-Check
(separater Prozess), nicht Teil der deterministischen Text-Pruefung.
"""
from __future__ import annotations
import re
from . import _routing
_URL = re.compile(r"https?://[^\s)\]]+", re.I)
def check_reference(item_id: str, text: str) -> dict:
"""Returns {present: bool, link: str|None}.
present = ein eindeutiger Verweis auf das referenzierte Dokument steht im Text.
link = die in der Naehe gefundene URL (fuer einen spaeteren LINK_CHECK), falls vorhanden.
"""
pat = _routing.REFERENCE_PATTERNS.get(item_id)
if not pat or not text:
return {"present": False, "link": None}
m = re.search(pat, text, re.I)
if not m:
return {"present": False, "link": None}
window = text[max(0, m.start() - 40): m.end() + 200]
url = _URL.search(window) or _URL.search(text)
return {"present": True, "link": url.group(0) if url else None}
@@ -1,19 +1,60 @@
"""AGBAgent — Allgemeine Geschäftsbedingungen (§§ 305 ff. BGB).
Thin-Subclass von ChecklistAgent über die kuratierte AGB_CHECKLIST (L1
Pflichtangaben + L2 Detailchecks). KEIN Library-Firehose.
ChecklistAgent-Subclass: erst L1/L2-Keyword-Pass, dann **C-lean-Routing** — die
keyword-durchgefallenen Items werden per `decision_method` an die wiederverwendbare
Prüfer-Library geroutet (Embedding / Reference / LLM), davor das Geschäftsmodell-
Gate (Applicability), danach Severity-Re-Tiering (LOW → Empfehlung).
Validiert gegen 7-Firmen-Opus-GT: 71 % FP → ~0. Config in `_routing`, Flow in `_pipeline`.
"""
from __future__ import annotations
from compliance.services.doc_checks.agb_checks import AGB_CHECKLIST
from .._base import AgentInput, AgentOutput, lint_output
from .._checklist_agent import ChecklistAgent
from .._rollup import rollup
from ._pipeline import run_routed
class AGBAgent(ChecklistAgent):
CHECKLIST = AGB_CHECKLIST
agent_id = "agb"
agent_version = "1.0"
agent_version = "2.0" # v2: decision_method-Routing (Embedding/Reference/LLM)
doc_type = "agb"
owned_mc_ids = tuple(c["id"] for c in AGB_CHECKLIST)
async def evaluate(self, agent_input: AgentInput) -> AgentOutput:
# 1) Basis-Keyword-Pass (L1/L2). out.findings = keyword-durchgefallene Items.
out = await super().evaluate(agent_input)
text = (agent_input.text or "").strip()
if len(text) < 100 or not out.findings:
return out # zu kurz / nichts zu routen
# 2) Routing: Gate + Embedding/Reference/LLM-Rescue der Keyword-Misses.
kept, resolved, gated = await run_routed(
out.findings, text, agent_input.context)
resolved_set, gated_set = set(resolved), set(gated)
# 3) Coverage angleichen: rescued → ok, gated → na.
for c in out.mc_coverage:
if c.mc_id in resolved_set:
c.status, c.reason = "ok", "semantisch vorhanden (Routing)"
elif c.mc_id in gated_set:
c.status, c.reason = "na", "für Geschäftsmodell nicht anwendbar"
# 4) Severity-Re-Tiering: HIGH/MEDIUM = Findings, LOW = nur Empfehlung.
out.findings = [f for f in kept if f.severity in ("HIGH", "MEDIUM")]
out.recommendations = rollup(kept)
# 5) Aggregat-Kennzahlen neu (Coverage hat sich verschoben).
cov = out.mc_coverage
out.mc_total = len(cov)
out.mc_ok = sum(1 for c in cov if c.status == "ok")
out.mc_na = sum(1 for c in cov if c.status == "na")
out.mc_high = sum(1 for c in cov if c.status == "high")
out.mc_medium = sum(1 for c in cov if c.status == "medium")
out.mc_low = sum(1 for c in cov if c.status == "low")
out.notes = ((out.notes + " · ") if out.notes else "") + \
f"routed: {len(resolved)} rescued, {len(gated)} n/a"
return lint_output(out)