f6d018234b
The calibrated DSE engine (4-layer: regex-boost / keyword / BGE-M3 embedding
recall @0.65 / semantic-validator) existed ONLY in the running macmini
container (docker cp'd, never committed) — at risk of loss on any container
rebuild. This recovers it into git and wires it into the live check path.
- Recover dse/{agent,v3_engine,_embedding_recall,_classification_gate,
regex_boost,mcs,deep_check}.py. DSEAgent (v3, BaseSpecialistAgent) replaces
the keyword-only stub: delegates MC-loading to the main engine
(rag_document_checker._load_controls), deterministic cached embedding recall
(reachability-gated), semantic-validator LLM layer honoring skip_llm,
third-country -> HIGH on documented transfer.
- Wire "dse" into _agent_outputs._TOPIC_AGENTS -> live check emits a validated
DSE tab (was snapshot/legacy-only).
- Tests rewritten for v3 (DB/embedding/LLM stubbed offline): regex-boost
detection, embedding-recall reachability guard, result->Finding conversion,
third-country HIGH; topic-wiring asserts "dse".
- deep_check.py recovered for preservation (alternate LLM-judge path, unwired).
Runtime data deps for full live behavior (note for prod): doc_check_controls
in DB + /data/mc_classification.db embedding sidecar + embedding-service; all
degrade gracefully (keyword layer carries) if absent.
dev-only, no deploy.
Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
159 lines
6.3 KiB
Python
159 lines
6.3 KiB
Python
"""Run registered v3 specialist agents and surface their structured
|
|
AgentOutput per topic for the standardized result tabs.
|
|
|
|
Additive to the legacy B-wiring HTML (`_b18_wiring`): this does NOT
|
|
replace it — it puts a clean, typed `AgentOutput` into
|
|
`state["agent_outputs"][topic]`, which `_phase_f_persist` forwards into
|
|
the API result so the frontend can render a per-topic tab.
|
|
|
|
Phase 1 shipped only impressum; agb and dse have since been wired in. The
topic map extends to cookie / vendor / … as those agents get wired (same
contract, no code change here beyond the map). Once the tabs are the source
of truth, B18's v1 path retires.
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import asyncio
|
|
import logging
|
|
|
|
from compliance.services.specialist_agents import REGISTRY, AgentInput
|
|
from compliance.services.specialist_agents.impressum._classification import (
|
|
scan_context_to_scope,
|
|
)
|
|
|
|
from ._sse import emit
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
# topic key (matches state["doc_texts"]) -> registered agent_id
|
|
_TOPIC_AGENTS: dict[str, str] = {
|
|
"impressum": "impressum",
|
|
"agb": "agb", # v2: AGBAgent mit decision_method-Routing (71% FP -> ~0)
|
|
"dse": "dse", # v3: 4-Layer (Regex-Boost/Keyword/BGE-M3-Recall/Semantic)
|
|
}
|
|
|
|
_MIN_TEXT = 100
|
|
|
|
|
|
def _derive_scope(profile_dict: dict) -> list[str]:
|
|
"""Business-scope aus dem erkannten Profil — identisch zu B18, damit
|
|
der Tab denselben Scope sieht wie die bestehende Auswertung. Das
|
|
Rechtsform-Gate kommt in einer späteren Phase (eigene Klassifizierung)."""
|
|
scope: set[str] = set()
|
|
if profile_dict.get("has_online_shop"):
|
|
scope.add("ecommerce")
|
|
if profile_dict.get("is_regulated_profession"):
|
|
scope.add("regulated_profession")
|
|
if profile_dict.get("industry") in ("insurance", "Finance", "finance"):
|
|
scope.add("insurance")
|
|
# §18 MStV — 3-stufig: Medienunternehmen (Verlag/Presse) = harte Pflicht;
|
|
# nur Blog/News-Inhalte (has_editorial_content) = Graubereich → der Agent
|
|
# wertet 'editorial_possible' als POSSIBLY_APPLICABLE (Pruef-Hinweis).
|
|
if profile_dict.get("industry") == "media":
|
|
scope.add("editorial")
|
|
elif profile_dict.get("has_editorial_content"):
|
|
scope.add("editorial_possible")
|
|
return sorted(scope)
|
|
|
|
|
|
def doc_input_from_snapshot(snap: dict, doc_type: str) -> dict | None:
    """Build the AgentInput payload for ONE doc type from a stored snapshot.

    Pure and testable, no re-crawl: the text comes from the first
    ``doc_entries`` item whose ``doc_type`` matches (``text`` preferred over
    ``content``). The scope is the sorted union of
    ``scan_context_to_scope(scan_context)`` and ``_derive_scope(profile)``,
    the same derivation the live evaluation uses. ``site_label`` is the
    fallback for the company name.

    Args:
        snap: Stored snapshot. Reads ``doc_entries``, ``profile``,
            ``scan_context``, ``site_label`` and ``site_domain``.
        doc_type: Document type to build the input for.

    Returns:
        A dict with the keys ``doc_type``, ``text``, ``business_scope``,
        ``company_name``, ``origin_domain`` and ``context``, or ``None`` when
        there is no matching text or it is shorter than ``_MIN_TEXT``
        characters after stripping.
    """
    entries = snap.get("doc_entries") or []
    text = next(
        (
            entry.get("text") or entry.get("content") or ""
            for entry in entries
            if entry.get("doc_type") == doc_type
        ),
        "",
    )
    if len(text.strip()) < _MIN_TEXT:
        return None

    profile = snap.get("profile") or {}
    scan_context = snap.get("scan_context")
    scope = sorted(
        set(scan_context_to_scope(scan_context)) | set(_derive_scope(profile))
    )
    return {
        "doc_type": doc_type,
        "text": text,
        "business_scope": scope,
        "company_name": profile.get("company_name") or snap.get("site_label") or "",
        # `or ""` instead of a .get() default: the default only applies when
        # the key is missing, so an explicit None would otherwise leak through.
        "origin_domain": snap.get("site_domain") or "",
        # skip_llm: the snapshot view is interactive, so the ~40s LLM step is
        # skipped.
        "context": {
            "scan_context": scan_context or {},
            "skip_llm": True,
        },
    }
|
|
|
|
|
|
def impressum_input_from_snapshot(snap: dict) -> dict | None:
    """Backward-compatible alias for the impressum endpoint.

    Returns exactly what ``doc_input_from_snapshot(snap, "impressum")``
    returns.
    """
    impressum_input = doc_input_from_snapshot(snap, "impressum")
    return impressum_input
|
|
|
|
|
|
async def run_agent_outputs(state: dict) -> None:
    """Run every wired topic agent and store its output in ``state``.

    For each topic in ``_TOPIC_AGENTS`` that has a registered agent and at
    least ``_MIN_TEXT`` characters of stripped text in ``state["doc_texts"]``,
    the agent is evaluated, its output is dumped via
    ``model_dump(mode="json")``, emitted as an SSE ``topic`` event (so the tab
    fills progressively) and stored under ``state["agent_outputs"][topic]``.

    Args:
        state: Mutable check state. Reads ``check_id``, ``doc_texts``,
            ``profile_dict``, ``req``, ``extracted_profile``, ``site_name``,
            ``domain`` and ``agent_outputs``. ``agent_outputs`` is written
            back only when it ends up non-empty.
    """
    check_id = state.get("check_id", "")
    doc_texts = state.get("doc_texts") or {}
    profile_dict = state.get("profile_dict") or {}
    req = state.get("req")

    # The trailing `or ""` matters: a key that is present but None would
    # otherwise end up as company_name/origin_domain=None in the AgentInput.
    company_name = (
        getattr(req, "company_name", None)
        or (state.get("extracted_profile") or {}).get("company_name")
        or state.get("site_name")
        or ""
    )
    origin_domain = (
        getattr(req, "origin_domain", None) or state.get("domain") or ""
    )

    # Phase 3: the 8 wizard fields (scan_context) are the primary scope
    # driver; the LLM profile only supplements them (mainly
    # regulated_profession, which the 8 fields cannot express).
    scan_context = getattr(req, "scan_context", None)
    scope = sorted(
        set(scan_context_to_scope(scan_context))
        | set(_derive_scope(profile_dict))
    )

    outputs: dict[str, dict] = state.get("agent_outputs") or {}

    async def _run_one(topic: str, agent_id: str) -> tuple[str, dict] | None:
        """Run one topic agent and emit its tab event immediately.

        Returns ``(topic, dump)`` on success and ``None`` when the topic is
        skipped or the agent fails. Failures are caught here so that one
        agent cannot abort the whole run.
        """
        text = (doc_texts.get(topic) or "").strip()
        if len(text) < _MIN_TEXT:
            return None
        agent = REGISTRY.get(agent_id)
        if agent is None:
            logger.warning("agent_outputs: agent '%s' not registered", agent_id)
            return None
        try:
            out = await agent.evaluate(AgentInput(
                doc_type=topic,
                text=text,
                business_scope=scope,
                company_name=company_name,
                origin_domain=origin_domain,
            ))
            dump = out.model_dump(mode="json")
            emit(check_id, {"type": "topic", "topic": topic, "output": dump})
            logger.info(
                "agent_outputs[%s]: %d findings, confidence %.2f",
                topic, len(out.findings), out.confidence,
            )
            return topic, dump
        except Exception as e:  # noqa: BLE001 — best-effort, never break the run
            # exc_info keeps the traceback; str(e) alone is empty for
            # exceptions raised without a message.
            logger.warning(
                "agent_outputs[%s] failed: %s", topic, e, exc_info=True,
            )
            return None

    # Topic agents run CONCURRENTLY (their embedding/LLM waits overlap) and
    # fill their tab via SSE as soon as each one finishes, without waiting
    # for the slowest agent.
    results = await asyncio.gather(
        *(_run_one(topic, agent_id) for topic, agent_id in _TOPIC_AGENTS.items())
    )
    for result in results:
        if result:
            topic, dump = result
            outputs[topic] = dump

    if outputs:
        state["agent_outputs"] = outputs
|