feat(advisor): Authority Router — Advisor collection-agnostisch, KB-2026.1-Gewinn im Produktpfad
CI / detect-changes (pull_request) Successful in 13s
CI / branch-name (pull_request) Successful in 2s
CI / guardrail-integrity (pull_request) Successful in 5s
CI / secret-scan (pull_request) Successful in 11s
CI / dep-audit (pull_request) Failing after 54s
CI / sbom-scan (pull_request) Failing after 1m1s
CI / build-sha-integrity (pull_request) Successful in 11s
CI / validate-canonical-controls (pull_request) Successful in 7s
CI / loc-budget (pull_request) Successful in 23s
CI / go-lint (pull_request) Successful in 53s
CI / python-lint (pull_request) Failing after 17s
CI / nodejs-lint (pull_request) Failing after 1m6s
CI / nodejs-build (pull_request) Successful in 2m59s
CI / test-go (pull_request) Successful in 1m0s
CI / iace-gt-coverage (pull_request) Successful in 17s
CI / test-python-backend (pull_request) Successful in 26s
CI / test-python-document-crawler (pull_request) Successful in 12s
CI / test-python-dsms-gateway (pull_request) Successful in 8s

Der Advisor fan-outete bisher selbst ueber eine feste Liste expliziter Collections
(advisor-rag.ts) und umging damit das #61-Scope-Routing (das nur den Default-Pfad
routet) → der gemessene +28-Retrieval-Gewinn (CB-100: 53→81, 0 Regr) kam nie beim
Antwort-LLM an. Dieser Router zieht den Fan-out in die Retriever-Schicht:

- SDK: LegalRAGClient.Retrieve() + POST /sdk/v1/rag/retrieve {query, top_k} —
  fan-outet server-seitig ueber die Broad-Authority-Base + die KB-2026.1-Slice bei
  inKBScope, merge+dedup, sortiert nach Authority-Score (rerankByAuthority je
  Collection), top-K. Index-Warmup vor dem nebenlaeufigen Fan-out (Map-Race-frei).
  Per-Env via RAG_ROUTER_COLLECTIONS.
- admin: advisor-rag.ts ruft EINMAL /retrieve statt 6-fach expliziter Collections.
  Advisor ist collection-agnostisch (Vertrag Compiler→Collections→Retriever→Advisor);
  COMPLIANCE_COLLECTIONS/searchCollection entfernt.

Validierung: Go-Unit (Router-Selektion, dedup); e2e gegen dev-Qdrant (echter
Retrieve(), CB-100-Stichprobe stride 5): OLD-hit 11/20 → NEW-hit 15/20, GAIN 4
(alle DS-Guidance), REGR 0 — reproduziert den +28/0-Regr durch den Produktionscode.
TS-Tests auf den Single-/retrieve-Call angepasst.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
This commit is contained in:
Benjamin Admin
2026-06-30 14:13:09 +02:00
parent af11d21f6e
commit 1e5aaf7103
7 changed files with 386 additions and 51 deletions
@@ -0,0 +1,114 @@
package ucca
import (
"context"
"os"
"sort"
"strings"
"sync"
)
// routerBaseCollections is the broad authority base the Authority Router fans out over. It mirrors
// the Advisor's historical multi-collection set; the KB-2026.1 slice is added separately when the
// query is in scope. Override via RAG_ROUTER_COLLECTIONS (comma-separated) per environment.
func (c *LegalRAGClient) routerBaseCollections() []string {
if v := strings.TrimSpace(os.Getenv("RAG_ROUTER_COLLECTIONS")); v != "" {
var out []string
for _, p := range strings.Split(v, ",") {
if s := strings.TrimSpace(p); s != "" {
out = append(out, s)
}
}
if len(out) > 0 {
return out
}
}
return []string{
"bp_compliance_gesetze",
"bp_compliance_ce",
"bp_compliance_datenschutz",
"bp_dsfa_corpus",
"bp_compliance_recht",
"bp_legal_templates",
}
}
const routerPerCollectionTopK = 3
// Retrieve is the Authority Router entry point: callers (the Advisor) pass ONLY a query and stay
// collection-agnostic. The router fans out over the broad authority base and ADDS the KB-2026.1
// slice when the query is in scope (inKBScope), then merges all hits, deduplicates, and returns the
// top-K by authority score. This moves the former Advisor-side collection fan-out into the retrieval
// layer (the "Retriever" tier of the quality pyramid), so the proven KB-2026.1 slice gain reaches
// the product path without the Advisor knowing about individual collections.
//
// The merged set is ordered by the per-collection authority score that rerankByAuthority already
// produced inside searchInternal — i.e. binding-vs-guidance ordering is preserved across the merge.
// Per-collection failures (e.g. a collection absent on an environment) degrade gracefully.
func (c *LegalRAGClient) Retrieve(ctx context.Context, query string, topK int) ([]LegalSearchResult, error) {
if topK <= 0 {
topK = 8
}
collections := c.routerBaseCollections()
if c.kbScopeRoutingEnabled && c.kbSliceCollection != "" && inKBScope(query) {
collections = append(collections, c.kbSliceCollection)
}
// Warm the full-text indexes sequentially first so the concurrent fan-out below only READS the
// shared textIndexEnsured map (the writes happen here, serialized) — closes the cold-start map
// race deterministically. Best-effort: a missing collection just stays un-indexed (hybrid then
// falls back to dense, or the per-collection search degrades to nothing).
if c.hybridEnabled {
for _, coll := range collections {
_ = c.ensureTextIndex(ctx, coll)
}
}
out := make([][]LegalSearchResult, len(collections))
var wg sync.WaitGroup
for i, coll := range collections {
wg.Add(1)
go func(i int, coll string) {
defer wg.Done()
if res, err := c.searchInternal(ctx, coll, query, nil, routerPerCollectionTopK); err == nil {
out[i] = res
}
}(i, coll)
}
wg.Wait()
merged := make([]LegalSearchResult, 0, len(collections)*routerPerCollectionTopK)
for _, r := range out {
merged = append(merged, r...)
}
merged = dedupResults(merged)
sort.SliceStable(merged, func(a, b int) bool { return merged[a].Score > merged[b].Score })
if len(merged) > topK {
merged = merged[:topK]
}
return merged, nil
}
// dedupResults removes duplicate passages that can appear when collections overlap, keeping the
// highest-scoring occurrence. Identity = regulation_code + article_label + a text prefix.
func dedupResults(in []LegalSearchResult) []LegalSearchResult {
pos := make(map[string]int, len(in))
out := make([]LegalSearchResult, 0, len(in))
for _, r := range in {
text := r.Text
if len(text) > 80 {
text = text[:80]
}
key := r.RegulationCode + "|" + r.ArticleLabel + "|" + text
if idx, ok := pos[key]; ok {
if r.Score > out[idx].Score {
out[idx] = r
}
continue
}
pos[key] = len(out)
out = append(out, r)
}
return out
}
@@ -0,0 +1,134 @@
package ucca
import (
"context"
"encoding/json"
"os"
"strconv"
"strings"
"testing"
)
type benchQ struct {
ID string `json:"id"`
Document string `json:"document"`
Question string `json:"question"`
}
// docTokens maps a bench question's expected document to acceptable regulation_code/label substrings.
func docTokens(document string) []string {
d := strings.ToUpper(document)
var t []string
for _, wp := range []string{"WP243", "WP248", "WP260"} {
if strings.Contains(d, wp) {
t = append(t, wp)
}
}
dns := strings.ReplaceAll(d, " ", "")
for _, gl := range []struct{ key, tok string }{{"07/2020", "GL07"}, {"05/2020", "GL05"}, {"09/2022", "GL09"}} {
if strings.Contains(dns, gl.key) {
t = append(t, gl.tok)
}
}
if strings.Contains(d, "TDDDG") {
t = append(t, "TDDDG")
}
if strings.Contains(d, "DSGVO") || strings.Contains(d, "ART. 13") || strings.Contains(d, "ART. 14") {
t = append(t, "DSGVO")
}
if strings.Contains(d, "BDSG") {
t = append(t, "BDSG")
}
if strings.Contains(d, "CRA") {
t = append(t, "CRA")
}
if strings.Contains(d, "MASCH") {
t = append(t, "MASCH", "MACHINERY", "MVO")
}
return t
}
func hitDoc(results []LegalSearchResult, toks []string) bool {
for _, r := range results {
s := strings.ReplaceAll(strings.ToUpper(r.RegulationCode+" "+r.ArticleLabel), " ", "")
for _, tk := range toks {
if strings.Contains(s, strings.ReplaceAll(tk, " ", "")) {
return true
}
}
}
return false
}
// TestAuthorityRouterCB100 (RUN_E2E=1) drives the REAL Retrieve() over the ComplianceBench-100 against
// the live collections: NEW (scope routing on → slice added for in-scope queries) vs OLD (routing off
// → broad base only). It is the regression gate that the router actually delivers the proven slice
// gain (+28/0-regr in the offline simulation) through the production Go code path.
func TestAuthorityRouterCB100(t *testing.T) {
if os.Getenv("RUN_E2E") != "1" {
t.Skip("set RUN_E2E=1 + QDRANT_URL/OLLAMA_URL/QDRANT_API_KEY + BENCH_PATH")
}
path := os.Getenv("BENCH_PATH")
if path == "" {
path = "/tmp/compliance_bench.json"
}
raw, err := os.ReadFile(path)
if err != nil {
t.Fatalf("bench read: %v", err)
}
var doc struct {
Questions []benchQ `json:"questions"`
}
if err := json.Unmarshal(raw, &doc); err != nil {
t.Fatalf("bench parse: %v", err)
}
// BENCH_STRIDE samples every Kth question (stratified across DS/CRA/MaschVO) so the gate stays
// tractable against the remote dev Qdrant; default 1 = full CB-100.
stride := 1
if s := os.Getenv("BENCH_STRIDE"); s != "" {
if n, err := strconv.Atoi(s); err == nil && n > 0 {
stride = n
}
}
c := NewLegalRAGClient()
ctx := context.Background()
var n, oldHit, newHit, gain, regr int
for i, q := range doc.Questions {
if i%stride != 0 {
continue
}
n++
toks := docTokens(q.Document)
c.kbScopeRoutingEnabled = false
oldRes, _ := c.Retrieve(ctx, q.Question, 8)
c.kbScopeRoutingEnabled = true
newRes, _ := c.Retrieve(ctx, q.Question, 8)
oh, nh := hitDoc(oldRes, toks), hitDoc(newRes, toks)
if oh {
oldHit++
}
if nh {
newHit++
}
flip := "="
switch {
case !oh && nh:
gain++
flip = "GAIN"
case oh && !nh:
regr++
flip = "REGR"
}
t.Logf("%-9s [%-14s] OLD=%-5v NEW=%-5v %s", q.ID, q.Document, oh, nh, flip)
}
t.Logf("CB-100 sample (stride=%d) via Retrieve(): N=%d | OLD-hit %d | NEW-hit %d | GAIN %d | REGR %d",
stride, n, oldHit, newHit, gain, regr)
if newHit <= oldHit || gain < 3 {
t.Errorf("router must add slice gains: NEW(%d) must exceed OLD(%d), gain=%d", newHit, oldHit, gain)
}
if regr > 2 {
t.Errorf("too many regressions through the router: %d", regr)
}
}
@@ -0,0 +1,67 @@
package ucca
import (
"os"
"testing"
)
func TestRouterBaseCollections(t *testing.T) {
c := &LegalRAGClient{}
os.Unsetenv("RAG_ROUTER_COLLECTIONS")
def := c.routerBaseCollections()
if len(def) != 6 || def[1] != "bp_compliance_ce" {
t.Fatalf("default base collections unexpected: %v", def)
}
os.Setenv("RAG_ROUTER_COLLECTIONS", " bp_compliance_ce , kb_2026_1_build ,, ")
defer os.Unsetenv("RAG_ROUTER_COLLECTIONS")
got := c.routerBaseCollections()
if len(got) != 2 || got[0] != "bp_compliance_ce" || got[1] != "kb_2026_1_build" {
t.Fatalf("env override parse failed (trim/empty): %v", got)
}
}
func TestRouterSliceSelection(t *testing.T) {
// The router appends the slice exactly when the query is in scope (inKBScope) and routing is on.
// Mirror the selection logic so a regression in either is caught without a live Qdrant.
c := &LegalRAGClient{kbSliceCollection: "kb_2026_1_build", kbScopeRoutingEnabled: true}
sel := func(q string) bool {
colls := c.routerBaseCollections()
if c.kbScopeRoutingEnabled && c.kbSliceCollection != "" && inKBScope(q) {
colls = append(colls, c.kbSliceCollection)
}
for _, x := range colls {
if x == c.kbSliceCollection {
return true
}
}
return false
}
if !sel("Welche neun Kriterien nennt WP248 fuer ein hohes Risiko?") {
t.Error("in-scope guidance query must include the slice")
}
if sel("Was sagt NIST SP 800-53 zu Access Control?") {
t.Error("out-of-scope query must NOT include the slice")
}
c.kbScopeRoutingEnabled = false
if sel("Welche Kriterien nennt WP248?") {
t.Error("routing disabled => slice never included")
}
}
func TestDedupResults(t *testing.T) {
in := []LegalSearchResult{
{RegulationCode: "EDPB WP248", ArticleLabel: "III.B", Text: "lorem", Score: 0.7},
{RegulationCode: "EDPB WP248", ArticleLabel: "III.B", Text: "lorem", Score: 0.9}, // dup, higher score
{RegulationCode: "DSGVO", ArticleLabel: "Art. 35", Text: "ipsum", Score: 0.8},
}
out := dedupResults(in)
if len(out) != 2 {
t.Fatalf("expected 2 deduped, got %d", len(out))
}
for _, r := range out {
if r.RegulationCode == "EDPB WP248" && r.Score != 0.9 {
t.Errorf("dedup must keep highest score, got %v", r.Score)
}
}
}