feat(agent): Impressum Rechtsform-Gates + USt-optional (Phase 3)

Die 8 Audit-Klassifizierungs-Felder (scan_context) treiben jetzt den
business_scope der Agenten (vorher gespeichert, aber nicht genutzt).
Rechtsform-Gates als opt-out (excludes_scope): Verein -> kein
Handelsregister-Finding, e.K. -> kein Vertretungsberechtigte-Finding;
unbekannte Rechtsform bleibt anwendbar. USt-IdNr optional -> fehlt =
kein Finding. Rechts-Zuordnung vom Domain-Experten bestaetigt.

- _classification.py: scan_context_to_scope (8 Felder -> scope-Tokens)
- mcs.py: MC.excludes_scope + MC.optional; IMP-MC-004/006 Gate-Tokens;
  IMP-MC-005 optional; scope_matches respektiert excludes_scope
- agent.py: optional -> kein Finding bei Abwesenheit
- _agent_outputs.py: scope = scan_context vereinigt LLM-Profil-Fallback
- Tests gruen: v3 25, Groundtruth 13, CI-Pfad 14 (+ SSE-Loop-Fix)

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
This commit is contained in:
Benjamin Admin
2026-06-10 20:37:56 +02:00
parent 65de90114a
commit b7a7e70731
6 changed files with 229 additions and 21 deletions
@@ -16,6 +16,9 @@ from __future__ import annotations
import logging
from compliance.services.specialist_agents import REGISTRY, AgentInput
from compliance.services.specialist_agents.impressum._classification import (
scan_context_to_scope,
)
from ._sse import emit
@@ -59,7 +62,14 @@ async def run_agent_outputs(state: dict) -> None:
origin_domain = (
getattr(req, "origin_domain", None) or ""
) or state.get("domain", "")
scope = _derive_scope(profile_dict)
# Phase 3: die 8 Wizard-Felder (scan_context) sind der primäre
# Scope-Treiber; das LLM-Profil ergänzt nur (v.a. regulated_profession,
# das die 8 Felder nicht ausdrücken können).
scan_context = getattr(req, "scan_context", None)
scope = sorted(
set(scan_context_to_scope(scan_context))
| set(_derive_scope(profile_dict))
)
outputs: dict[str, dict] = state.get("agent_outputs") or {}
for topic, agent_id in _TOPIC_AGENTS.items():
@@ -0,0 +1,62 @@
"""Phase 3: Normalisierung der 8 Audit-Klassifizierungs-Felder
(scan_context aus dem PreScanWizard) → business_scope-Tokens.
EINZIGER Normalisierungspunkt: beide Quellen (SDK-Profil/Scope ODER der
standalone Compliance-Check) füllen denselben business_scope, den
scope_matches() in den Agenten konsumiert. Schließt die Drift, dass die
8 Felder gespeichert, aber nicht an die Agenten gegeben wurden.
Rechts-Zuordnung mit dem User (Domain-Experte) bestätigt 2026-06-10:
- industry=healthcare → NICHT pauschal regulated_profession (Krankenhaus-
GmbH ≠ Apotheke). regulated_profession kommt nur aus expliziter
Erkennung (LLM-Profil is_regulated_profession), nicht aus der Branche.
- Handelsregister: gmbh/ug/ag/kg/ohg/gmbh_co_kg/ek (e.K. ist registerpflichtig).
- Vertretungsberechtigte: + verein/stiftung, aber OHNE ek (Inhaber genügt).
- USt-IdNr: kein Rechtsform-Gate (Kleinunternehmer §19 haben keine).
"""
from __future__ import annotations
# Rechtsformen OHNE Handelsregister-Eintrag → Handelsregister-MC n/a.
# (HR-pflichtig sind gmbh/ug/ag/kg/ohg/gmbh_co_kg/ek — die schließen wir
# NICHT aus. Opt-out, damit Entry-Points ohne legal_form anwendbar bleiben.)
_NON_HANDELSREGISTER_FORMS = frozenset({"verein", "stiftung", "behoerde", "other"})
# Rechtsformen OHNE gesondertes Vertretungsorgan (e.K. = Inhaber selbst).
_NON_VERTRETUNG_FORMS = frozenset({"ek", "behoerde", "other"})
def scan_context_to_scope(scan_context: dict | None) -> list[str]:
"""8 Wizard-Felder → business_scope-Tokens (für scope_matches)."""
sc = scan_context or {}
industry = str(sc.get("industry") or "").lower()
business_model = str(sc.get("business_model") or "").lower()
direct_sales = str(sc.get("direct_sales") or "").lower()
legal_form = str(sc.get("legal_form") or "").lower()
scope: set[str] = set()
# ── Branche / Geschäftsmodell ──
if industry == "ecommerce" or direct_sales == "yes":
scope.add("ecommerce")
if business_model in ("b2c", "both"):
scope.add("b2c")
if industry == "insurance":
scope.add("insurance")
if industry == "banking":
scope.add("financial_services")
if industry == "automotive":
scope.add("automotive")
if industry == "media":
scope.add("editorial") # §18 MStV (pragmatisch)
if legal_form == "behoerde" or industry == "public":
scope.add("public_authority")
# industry=healthcare → bewusst KEIN regulated_profession.
# ── Rechtsform-Gates (opt-out) ──
# Nur explizit AUSSCHLIESSEN; ohne/unbekanntes legal_form bleibt die
# Angabe anwendbar (z.B. Agent-Test ohne Wizard verschluckt nichts).
if legal_form in _NON_HANDELSREGISTER_FORMS:
scope.add("kein_handelsregister")
if legal_form in _NON_VERTRETUNG_FORMS:
scope.add("keine_vertretung")
return sorted(scope)
@@ -125,6 +125,14 @@ class ImpressumAgent(BaseSpecialistAgent):
mc_id=mc.mc_id, status="ok", reason="Pattern-Treffer",
))
continue
if mc.optional:
# fehlt + optional → KEIN Finding (z.B. USt-IdNr;
# Kleinunternehmer §19 haben legitim keine).
coverage.append(McCoverage(
mc_id=mc.mc_id, status="na",
reason="optional — nicht angegeben",
))
continue
sev = _SEV_TO_ENUM.get(mc.severity_if_missing, Severity.MEDIUM)
findings.append(Finding(
check_id=f"IMP-{mc.field_id}",
@@ -23,8 +23,15 @@ class MC:
patterns: tuple[Pattern[str], ...] = field(default_factory=tuple)
severity_if_missing: str = "MEDIUM" # HIGH | MEDIUM | LOW | INFO
requires_scope: tuple[str, ...] = field(default_factory=tuple)
# Opt-out: NICHT anwendbar, wenn eines dieser Tokens im Scope liegt
# (z.B. Einzelunternehmer für Vertretungsberechtigte). Default = immer
# anwendbar → Entry-Points ohne legal_form verschlucken nichts.
excludes_scope: tuple[str, ...] = field(default_factory=tuple)
# Wenn True: bei Scope-Mismatch nicht-applicable melden, sonst skip
explicit_na: bool = True
# Wenn True: fehlt die Angabe → KEIN Finding (z.B. USt-IdNr —
# Kleinunternehmer §19 haben legitim keine). Nur wenn vorhanden relevant.
optional: bool = False
MCS: tuple[MC, ...] = (
@@ -78,6 +85,7 @@ MCS: tuple[MC, ...] = (
label="Handelsregister-Eintrag",
norm="§ 5 Abs. 1 Nr. 4 TMG",
severity_if_missing="HIGH",
excludes_scope=("kein_handelsregister",),
patterns=(
re.compile(r"\bHR[BA]\s+\d", re.IGNORECASE),
re.compile(r"Handelsregister", re.IGNORECASE),
@@ -89,6 +97,7 @@ MCS: tuple[MC, ...] = (
label="USt-IdNr",
norm="§ 5 Abs. 1 Nr. 6 TMG",
severity_if_missing="MEDIUM",
optional=True,
patterns=(
re.compile(
r"\b(?:USt-?Id(?:Nr)?\.?|VAT(?:-?Id)?)\s*[:.\s]",
@@ -103,6 +112,7 @@ MCS: tuple[MC, ...] = (
label="Vertretungsberechtigte Person",
norm="§ 5 Abs. 1 Nr. 1 TMG (juristische Personen)",
severity_if_missing="HIGH",
excludes_scope=("keine_vertretung",),
patterns=(
re.compile(
r"(?:Gesch(?:ae|ä)ftsf(?:ue|ü)hr(?:er|ung|erin)|"
@@ -214,6 +224,10 @@ MC_IDS: tuple[str, ...] = tuple(m.mc_id for m in MCS)
def scope_matches(mc: MC, scope: set[str], is_automotive: bool) -> bool:
"""Entscheidet ob die MC auf den Business-Scope anwendbar ist."""
# Opt-out zuerst: explizit ausgeschlossene Rechtsformen (z.B.
# Einzelunternehmer für Vertretungsberechtigte) → nicht anwendbar.
if mc.excludes_scope and any(s in scope for s in mc.excludes_scope):
return False
if not mc.requires_scope:
return True
if mc.field_id == "aufsichtsbehoerde" and is_automotive:
@@ -0,0 +1,122 @@
"""Phase 3: scan_context → business_scope Normalisierung + Rechtsform-Gates.
Rechts-Zuordnung vom Domain-Experten bestätigt (2026-06-10):
e.K. ist registerpflichtig (Handelsregister-Finding) aber ohne gesonderte
Vertretungsberechtigte; Verein umgekehrt; USt-IdNr fehlt → kein Finding;
healthcare triggert NICHT regulated_profession.
"""
from __future__ import annotations
import asyncio
import pytest
from compliance.services.specialist_agents import AgentInput
from compliance.services.specialist_agents.impressum._classification import (
scan_context_to_scope,
)
from compliance.services.specialist_agents.impressum.agent import ImpressumAgent
def _scope(**kw) -> set[str]:
return set(scan_context_to_scope(kw))
def test_gmbh_no_exclusions():
# GmbH → keine Ausschluss-Tokens → beide MCs anwendbar.
s = _scope(legal_form="gmbh")
assert "kein_handelsregister" not in s
assert "keine_vertretung" not in s
def test_einzelkaufmann_register_but_no_vertretung():
# e.K. registerpflichtig (kein Ausschluss) aber Inhaber genügt.
s = _scope(legal_form="ek")
assert "kein_handelsregister" not in s
assert "keine_vertretung" in s
def test_verein_vertretung_but_no_register():
# e.V. = Vereinsregister, NICHT Handelsregister → HR ausgeschlossen.
s = _scope(legal_form="verein")
assert "kein_handelsregister" in s
assert "keine_vertretung" not in s
def test_branche_tokens():
assert "ecommerce" in _scope(industry="ecommerce")
assert "ecommerce" in _scope(direct_sales="yes")
assert "b2c" in _scope(business_model="b2c")
assert "b2c" in _scope(business_model="both")
assert "automotive" in _scope(industry="automotive")
assert "editorial" in _scope(industry="media")
assert "insurance" in _scope(industry="insurance")
assert "financial_services" in _scope(industry="banking")
assert "public_authority" in _scope(legal_form="behoerde")
assert "public_authority" in _scope(industry="public")
def test_healthcare_does_not_imply_regulated_profession():
# Krankenhaus-GmbH ≠ Apotheke → industry allein triggert es nicht.
assert "regulated_profession" not in _scope(
industry="healthcare", legal_form="gmbh")
def test_unknown_legal_form_no_exclusions():
# Unbekannte Rechtsform → keine Ausschluss-Tokens → MCs bleiben anwendbar.
# (Das 4-Status-Modell INSUFFICIENT_EVIDENCE folgt in der nächsten Phase.)
s = _scope(industry="ecommerce") # kein legal_form
assert "kein_handelsregister" not in s
assert "keine_vertretung" not in s
# ── Agent-Verhalten mit den Gates ──────────────────────────────────
IMPRESSUM_MINIMAL = (
"Angaben gemäß § 5 TMG\n\n"
"Beispiel Firma\n"
"Musterstraße 1\n"
"12345 Berlin\n\n"
"E-Mail: info@example.com\n"
"Telefon: +49 30 1234567\n"
"Mehr Informationen auf unserer Website.\n"
)
@pytest.fixture(autouse=True)
def _llm_offline(monkeypatch):
async def _no_validate(*_a, **_kw):
return {}
monkeypatch.setattr(
"compliance.services.specialist_agents.impressum.agent.validate_present",
_no_validate, raising=False,
)
def _finding_fields(legal_form: str) -> set[str]:
agent = ImpressumAgent()
out = asyncio.run(agent.evaluate(AgentInput(
doc_type="impressum",
text=IMPRESSUM_MINIMAL,
business_scope=scan_context_to_scope({"legal_form": legal_form}),
)))
return {f.field_id for f in out.findings}
def test_einzelkaufmann_handelsregister_finding_no_vertretung():
fields = _finding_fields("ek")
assert "handelsregister" in fields # registerpflichtig
assert "vertretungsberechtigte" not in fields # Inhaber genügt
def test_gmbh_both_findings():
fields = _finding_fields("gmbh")
assert "handelsregister" in fields
assert "vertretungsberechtigte" in fields
def test_ust_id_absent_yields_no_finding():
# USt-IdNr fehlt im Text → optional → KEIN Finding (egal welche Rechtsform).
assert "ust_id" not in _finding_fields("gmbh")
assert "ust_id" not in _finding_fields("ek")
@@ -1,7 +1,7 @@
"""Phase 2: SSE-Plumbing für den Compliance-Check.
Deckt emit (Queue-Push), _format_sse (SSE-Zeilenformat) und den
event_generator (hello → Events → stream_close bei 'complete') ab.
Queue + Generator laufen innerhalb eines asyncio.run (sonst bindet
asyncio.Queue in Py3.9 an einen ggf. geschlossenen Loop).
"""
from __future__ import annotations
@@ -11,15 +11,8 @@ import asyncio
from compliance.api.agent_check import _sse
def test_emit_pushes_and_format():
cid = "sse-test-1"
_sse.new_queue(cid)
_sse.emit(cid, {"type": "topic", "topic": "impressum", "output": {"x": 1}})
q = _sse._check_queues[cid]
assert q.qsize() == 1
ev = q.get_nowait()
assert ev["type"] == "topic" and ev["topic"] == "impressum"
line = _sse._format_sse(ev)
def test_format_sse_line():
line = _sse._format_sse({"type": "topic", "topic": "impressum"})
assert line.startswith("data: ") and line.endswith("\n\n")
assert '"impressum"' in line
@@ -29,21 +22,20 @@ def test_emit_is_noop_without_queue():
_sse.emit("does-not-exist-xyz", {"type": "topic"})
def test_event_generator_streams_topic_then_closes_on_complete():
cid = "sse-test-gen"
_sse.new_queue(cid)
_sse.emit(cid, {"type": "topic", "topic": "impressum", "output": {}})
_sse.emit(cid, {"type": "complete", "status": "completed"})
async def collect():
out = []
def test_emit_and_event_generator_streams_then_closes():
async def scenario():
cid = "sse-test-gen"
_sse.new_queue(cid)
_sse.emit(cid, {"type": "topic", "topic": "impressum", "output": {}})
_sse.emit(cid, {"type": "complete", "status": "completed"})
out: list[str] = []
async for line in _sse.event_generator(cid):
out.append(line)
if len(out) > 12: # safety
break
return out
blob = "".join(asyncio.run(collect()))
blob = "".join(asyncio.run(scenario()))
assert '"type": "hello"' in blob
assert '"topic": "impressum"' in blob
assert '"type": "complete"' in blob