fix(agents): Impressum+Cookie delegieren MC-Laden ans Main Tool — Scope-Filter + Maßnahmen
CI / detect-changes (push) Successful in 8s
CI / branch-name (push) Has been skipped
CI / guardrail-integrity (push) Has been skipped
CI / secret-scan (push) Has been skipped
CI / sbom-scan (push) Has been skipped
CI / build-sha-integrity (push) Failing after 4s
CI / iace-gt-coverage (push) Has been skipped
CI / test-python-backend (push) Successful in 30s
CI / test-python-document-crawler (push) Has been skipped
CI / test-python-dsms-gateway (push) Has been skipped
CI / dep-audit (push) Has been skipped
CI / validate-canonical-controls (push) Successful in 11s
CI / loc-budget (push) Successful in 14s
CI / go-lint (push) Has been skipped
CI / python-lint (push) Has been skipped
CI / nodejs-lint (push) Has been skipped
CI / nodejs-build (push) Has been skipped
CI / test-go (push) Has been skipped

Regression: Der v3-Agent-Pfad baute eine parallele MC-Pipeline
(_load_impressum_mcs / _load_cookie_mcs, Roh-SELECT) und lief damit an
allen Schutzmechanismen der Engine vorbei → GOV/Branchen-MCs als HIGH bei
OEM/Zulieferer, fremde MCs (Bestellbestätigung), und action=check_question
(Fragen statt Maßnahmen im Frontend).

- Agent delegiert MC-Laden an rag_document_checker._load_controls
  (P72-Scope, check_type='text', fits_doc_type/scope_requires).
- Subtraktives Sektor-Gate (SECTOR_PREFIXES) + Themen-Gate am Agent-Rand.
- action = konkrete Maßnahme (Imperativ) statt check_question.
- rag_document_checker: from __future__ import annotations (3.9-Import).
- mcs: Name-Pattern erkennt "Aktiengesellschaft" (OEM-Impressums).
- Tote GT-/Semantic-/Routes-Tests wiederbelebt (v3-Mismatch +
  agent.cascade-Patch-Target). Alle 72 Specialist-Tests grün.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
This commit is contained in:
Benjamin Admin
2026-06-09 11:30:16 +02:00
parent bd4882e143
commit 389e6de0c7
13 changed files with 641 additions and 135 deletions
@@ -207,3 +207,47 @@ HECTRONIC = ImpressumGT(
ALL_GROUND_TRUTH = (ETO, SAFETYKON, BMW, ELLI, HECTRONIC)
def make_mc(
control_id: str,
pass_criteria: list[str],
severity: str = "HIGH",
title: str | None = None,
regulation: str = "TMG",
article: str = "§ 5",
check_question: str | None = None,
) -> dict:
"""Baut ein doc_check_controls-MC-Dict wie `_load_controls` es liefert —
fuer Tests die `_load_controls` mocken (Agent delegiert ans Main Tool)."""
return {
"id": control_id,
"control_id": control_id,
"title": title or control_id,
"regulation": regulation,
"article": article,
"severity": severity,
"check_question": check_question or f"Ist {control_id} vorhanden?",
"pass_criteria": list(pass_criteria),
"fail_criteria": [],
}
# Kern-Impressum-MCs die ein vollstaendiges Impressum erfuellt. Der
# Regex-Boost (Layer 0) hebt sie bei vorhandenen Pflichtangaben auf PASS,
# sodass ein sauberes Impressum 0 Findings produziert. Jede pass_criteria
# enthaelt >=2 Boost-Keywords damit boost_matches_db_mc greift.
CORE_IMPRESSUM_MCS: list[dict] = [
make_mc("IMP-NAME-A1", ["Anbieter Anschrift Adresse Firma angegeben"]),
make_mc("IMP-MAIL-A1", ["E-Mail Kontaktmöglichkeit angegeben"]),
make_mc("IMP-TEL-A1", ["Telefon Telefonnummer Rufnummer angegeben"],
severity="MEDIUM"),
make_mc("IMP-HR-A1", ["Handelsregister Registernummer HRB angegeben"]),
make_mc("IMP-UST-A1", ["Umsatzsteuer USt-IdNr angegeben"],
severity="MEDIUM"),
make_mc("IMP-VTR-A1",
["Geschäftsführer Vorstand Vertretungsberechtigt angegeben"]),
make_mc("IMP-VTRL-A1",
["Geschäftsführer Vorstand deutsche Bezeichnung Rechtsform"],
severity="MEDIUM"),
]
@@ -0,0 +1,69 @@
"""Tests für Cookie-Policy-Agent v3 — Delegation an die Main-Tool-Engine,
Sektor-Filter und Maßnahmen statt Fragen."""
from __future__ import annotations
import asyncio
from compliance.services.specialist_agents.cookie_policy import v3_engine
from compliance.services.specialist_agents.cookie_policy.agent import (
_build_measure,
)
def _run(coro):
return asyncio.get_event_loop().run_until_complete(coro)
def test_build_measure_is_imperative_not_question():
m = _build_measure("Speicherdauer der Cookies", "TDDDG § 25")
assert "?" not in m
assert "ergänzen" in m.lower()
assert "Rechtsgrundlage" in m
def test_build_measure_handles_empty_label():
m = _build_measure("", "")
assert "?" not in m
assert m.strip() != ""
def test_cookie_v3_delegates_and_sector_filters(monkeypatch):
"""run_v3_pipeline lädt über die Main-Tool-Engine (cookie) und das
Sektor-Gate entfernt GOV-MCs out-of-scope."""
async def _fake_load(doc_type, db_url, limit, business_scope=None):
assert doc_type == "cookie"
return [
{"control_id": "COOKIE-1-A1", "title": "Cookie-Kategorien",
"regulation": "TDDDG", "article": "§ 25", "severity": "HIGH",
"check_question": "Kategorien genannt?",
"pass_criteria": '["Cookie Kategorien essentiell"]',
"fail_criteria": "[]"},
{"control_id": "GOV-9-A1", "title": "Behörden-Cookie",
"regulation": "X", "article": "", "severity": "HIGH",
"check_question": "Behörde?",
"pass_criteria": '["Behörde Aufsicht"]',
"fail_criteria": "[]"},
]
monkeypatch.setattr(
"compliance.services.rag_document_checker._load_controls",
_fake_load,
)
async def _no_match(*a, **kw):
return set()
monkeypatch.setattr(
"compliance.services.mc_embedding_matcher.embedding_match",
_no_match, raising=False,
)
text = ("Diese Website verwendet Cookies. Cookie-Kategorien: "
"essentiell, funktional. Speicherdauer und Zweck beschrieben. "
) * 4 # > 100 Zeichen
results, telem = _run(
v3_engine.run_v3_pipeline(text, business_scope=set()),
)
cids = {r["control_id"] for r in results}
assert "GOV-9-A1" not in cids # Sektor out-of-scope entfernt
assert "COOKIE-1-A1" in cids # cookie-MC bleibt
assert telem["sector_dropped"] == 1
@@ -11,7 +11,10 @@ import asyncio
import pytest
from compliance.services.specialist_agents import AgentInput, ImpressumAgent
from tests.fixtures.impressum_groundtruth import ALL_GROUND_TRUTH
from tests.fixtures.impressum_groundtruth import (
ALL_GROUND_TRUTH,
CORE_IMPRESSUM_MCS,
)
def _run(coro):
@@ -19,15 +22,40 @@ def _run(coro):
@pytest.fixture(autouse=True)
def _no_llm(monkeypatch):
"""Skip LLM-Eskalation in den GT-Tests — wir testen MC-Pattern,
nicht LLM-Halluzinationen."""
async def _no_cascade(*a, **kw): return None, []
def _agent_offline(monkeypatch):
"""GT-Tests offline + deterministisch: LLM-Eskalation aus, MC-Laden ueber
die gemockte Main-Tool-Engine (CORE_IMPRESSUM_MCS), Embedding aus. Der
Agent delegiert jetzt ans Main Tool — daher `_load_controls` mocken."""
import copy
async def _no_cascade(*a, **kw):
return None, []
monkeypatch.setattr(
"compliance.services.specialist_agents.impressum.agent.cascade",
"compliance.services.specialist_agents._semantic_validator.cascade",
_no_cascade,
)
async def _fake_load(doc_type, db_url, limit, business_scope=None):
return copy.deepcopy(CORE_IMPRESSUM_MCS)
monkeypatch.setattr(
"compliance.services.rag_document_checker._load_controls",
_fake_load,
)
async def _no_embed(*a, **kw):
return None
async def _no_match(*a, **kw):
return set()
monkeypatch.setattr(
"compliance.services.mc_embedding_matcher.ensure_mc_embeddings",
_no_embed, raising=False,
)
monkeypatch.setattr(
"compliance.services.mc_embedding_matcher.embedding_match",
_no_match, raising=False,
)
@pytest.mark.parametrize("gt", ALL_GROUND_TRUTH, ids=lambda g: g.name)
def test_no_false_positives_on_expected_clean(gt):
@@ -15,10 +15,17 @@ from compliance.services.specialist_agents import (
ImpressumAgent,
Severity,
)
from compliance.services.specialist_agents.impressum.agent import (
_build_measure,
)
from compliance.services.specialist_agents.impressum.regex_boost import (
BOOST_KEYWORDS,
boost_matches_db_mc,
compute_regex_boosts,
criteria_on_topic,
)
from compliance.services.specialist_agents.impressum.v3_engine import (
_filter_controls,
)
@@ -187,3 +194,152 @@ def test_short_text_skipped():
def test_agent_version_is_three():
agent = ImpressumAgent()
assert agent.agent_version == "3.0"
# ── Themen-Gate: criteria_on_topic ──────────────────────────────────
def test_criteria_on_topic_keeps_genuine_telefon():
assert criteria_on_topic([
"Telefonnummer angeben",
"Erreichbar per Telefon",
]) is True
def test_criteria_on_topic_keeps_genuine_address():
assert criteria_on_topic([
"Vollständige Postadresse (Straße, Hausnummer, PLZ, Ort)",
]) is True
def test_criteria_on_topic_drops_bestellbestaetigung():
# Fremd-MC: kein Impressum-Themenüberlapp → raus.
assert criteria_on_topic([
"Bestellbestätigung wird nach Vertragsschluss versendet",
"Bestelleingang wird dokumentiert",
]) is False
def test_criteria_on_topic_single_incidental_hit_dropped():
# 'E-Mail' allein (1 Treffer) reicht nicht — braucht >=2.
assert criteria_on_topic([
"Bestellbestätigung wird per E-Mail versendet",
]) is False
def test_criteria_on_topic_drops_behoerdliche_anzeige():
assert criteria_on_topic([
"Behördliche Anzeige der Tätigkeit erfolgt",
"Gewerbeanmeldung liegt vor",
]) is False
def test_criteria_on_topic_empty_kept():
# Keine Kriterien = kein Signal → konservativ behalten.
assert criteria_on_topic([]) is True
# ── Scope-Filter: _filter_controls ──────────────────────────────────
def _mc(control_id, pass_criteria):
return {"control_id": control_id, "pass_criteria": pass_criteria,
"fail_criteria": []}
def test_filter_controls_drops_gov_when_out_of_scope():
controls = [_mc("GOV-814-A03", ["Behörde meldet an Aufsichtsstelle"])]
kept, stats = _filter_controls(controls, business_scope=set())
assert kept == []
assert stats["sector_dropped"] == 1
def test_filter_controls_keeps_gov_when_in_scope():
controls = [_mc("GOV-814-A03",
["Aufsichtsbehörde und Behörde benannt"])]
kept, stats = _filter_controls(controls,
business_scope={"government"})
assert len(kept) == 1
assert stats["sector_dropped"] == 0
def test_filter_controls_keeps_genuine_impressum_mc():
controls = [_mc("AUTH-1954-A07",
["Vollständige Postadresse mit Straße und PLZ"])]
kept, stats = _filter_controls(controls, business_scope=set())
assert len(kept) == 1
assert stats["sector_dropped"] == 0
assert stats["offtopic_dropped"] == 0
def test_filter_controls_drops_offtopic_non_sector_mc():
controls = [_mc("ECOM-1-A1",
["Bestellbestätigung nach Vertragsschluss versenden"])]
kept, stats = _filter_controls(controls, business_scope=set())
assert kept == []
assert stats["offtopic_dropped"] == 1
# ── Maßnahme statt Frage: _build_measure ────────────────────────────
def test_build_measure_is_imperative_not_question():
m = _build_measure("USt-IdNr", "§ 5 Abs. 1 Nr. 6 TMG")
assert "?" not in m
assert "ergänzen" in m.lower()
assert "Rechtsgrundlage" in m
def test_build_measure_handles_empty_label():
m = _build_measure("", "")
assert "?" not in m
assert m.strip() != ""
# ── Delegation an Main-Tool-Engine + Filter (Integration) ───────────
def test_run_v3_pipeline_delegates_and_filters(monkeypatch):
"""run_v3_pipeline lädt über die Main-Tool-Engine (_load_controls
gemockt), normalisiert JSONB-Strings und das Sektor-/Themen-Gate
entfernt GOV (out-of-scope) + fremde MCs. Genuine MC bleibt."""
from compliance.services.specialist_agents.impressum import v3_engine
async def _fake_load(doc_type, db_url, limit, business_scope=None):
# pass_criteria absichtlich als JSON-STRING (wie asyncpg JSONB)
return [
{"control_id": "AUTH-1954-A07", "title": "USt-IdNr",
"regulation": "TMG", "article": "§ 5", "severity": "HIGH",
"check_question": "Ist die USt-IdNr angegeben?",
"pass_criteria": '["USt-IdNr"]',
"fail_criteria": "[]"},
{"control_id": "GOV-814-A03", "title": "Behördliche Anzeige",
"regulation": "X", "article": "", "severity": "HIGH",
"check_question": "Behörde informiert?",
"pass_criteria": '["Aufsichtsbehörde und Behörde benannt"]',
"fail_criteria": "[]"},
{"control_id": "ECOM-1-A1", "title": "Bestellbestätigung",
"regulation": "X", "article": "", "severity": "HIGH",
"check_question": "Bestellbestätigung versandt?",
"pass_criteria":
'["Bestellbestätigung nach Vertragsschluss versenden"]',
"fail_criteria": "[]"},
]
monkeypatch.setattr(
"compliance.services.rag_document_checker._load_controls",
_fake_load,
)
# AUTH-MC matched per Keyword → kein Layer-2-Embedding nötig; kein
# mc_embedding_matcher-Mock erforderlich.
text = ("Beispiel GmbH\nMusterstr. 1\n12345 Berlin\n"
"USt-IdNr: DE123456789\n") * 5 # >100 Zeichen
results, telem = _run(
v3_engine.run_v3_pipeline(text, business_scope=set()),
)
cids = {r["control_id"] for r in results}
assert "GOV-814-A03" not in cids # Sektor out-of-scope
assert "ECOM-1-A1" not in cids # themenfremd
assert "AUTH-1954-A07" in cids # genuine MC bleibt
assert telem["sector_dropped"] == 1
assert telem["offtopic_dropped"] == 1
@@ -13,6 +13,7 @@ from compliance.services.specialist_agents._semantic_validator import (
standard_label,
validate_present,
)
from tests.fixtures.impressum_groundtruth import make_mc
def _run(coro):
@@ -91,10 +92,33 @@ def test_semantic_demotion_high_to_low(monkeypatch):
"compliance.services.specialist_agents._semantic_validator.cascade",
_fake_cascade,
)
# Agent delegiert MC-Laden ans Main Tool → _load_controls mocken.
# control_id == field_id 'kontakt_telefon', damit der Semantic-Demote
# das Finding ueber die LLM-Antwort zuordnet.
async def _fake_load(doc_type, db_url, limit, business_scope=None):
return [make_mc("kontakt_telefon",
["Telefon Telefonnummer Rufnummer"],
severity="HIGH")]
monkeypatch.setattr(
"compliance.services.specialist_agents.impressum.agent.cascade",
_fake_cascade,
"compliance.services.rag_document_checker._load_controls",
_fake_load,
)
async def _no_match(*a, **kw):
return set()
monkeypatch.setattr(
"compliance.services.mc_embedding_matcher.embedding_match",
_no_match, raising=False,
)
async def _no_embed(*a, **kw):
return None
monkeypatch.setattr(
"compliance.services.mc_embedding_matcher.ensure_mc_embeddings",
_no_embed, raising=False,
)
# Text OHNE Telefon-Label → MC matched nicht → HIGH-Finding
text = (
"Beispiel GmbH\nMusterstr. 1\n12345 Berlin\n"
@@ -9,6 +9,8 @@ from unittest.mock import AsyncMock, patch
import pytest
from fastapi.testclient import TestClient
from tests.fixtures.impressum_groundtruth import make_mc
@pytest.fixture
def app(tmp_path, monkeypatch):
@@ -69,9 +71,37 @@ def test_run_result_after_text_input(client, monkeypatch):
# Skip LLM
async def _no_cascade(*a, **kw): return None, []
monkeypatch.setattr(
"compliance.services.specialist_agents.impressum.agent.cascade",
"compliance.services.specialist_agents._semantic_validator.cascade",
_no_cascade,
)
# Agent delegiert MC-Laden ans Main Tool → _load_controls mocken.
# Tesla nennt 'Management' (engl.) statt deutschem GF-Label → das
# label_korrekt-MC schlaegt fehl → erwartetes Finding.
async def _fake_load(doc_type, db_url, limit, business_scope=None):
# pass_criteria nur mit label_korrekt-eigenen Keywords (deutsche/
# Bezeichnung/Rechtsform) — NICHT 'Geschäftsführer/Vorstand', sonst
# boostet das aktive vertretungsberechtigte-Feld (Tesla: 'Management')
# das MC faelschlich auf PASS.
return [make_mc(
"vertretungsberechtigte_label_korrekt",
["deutsche Bezeichnung Rechtsform Pflicht angeben"],
)]
monkeypatch.setattr(
"compliance.services.rag_document_checker._load_controls",
_fake_load,
)
async def _no_match(*a, **kw): return set()
monkeypatch.setattr(
"compliance.services.mc_embedding_matcher.embedding_match",
_no_match, raising=False,
)
async def _no_embed(*a, **kw): return None
monkeypatch.setattr(
"compliance.services.mc_embedding_matcher.ensure_mc_embeddings",
_no_embed, raising=False,
)
r = client.post("/api/v1/specialist-agent/test/start",
json={"agent_id": "impressum",
"raw_texts": [
@@ -103,7 +133,7 @@ def test_run_result_after_text_input(client, monkeypatch):
def test_artifacts_listing(client, monkeypatch):
async def _no_cascade(*a, **kw): return None, []
monkeypatch.setattr(
"compliance.services.specialist_agents.impressum.agent.cascade",
"compliance.services.specialist_agents._semantic_validator.cascade",
_no_cascade,
)
r = client.post("/api/v1/specialist-agent/test/start",