diff --git a/backend-compliance/compliance/api/agent_check/_agent_outputs.py b/backend-compliance/compliance/api/agent_check/_agent_outputs.py index 1fd82d1b..751eb9b4 100644 --- a/backend-compliance/compliance/api/agent_check/_agent_outputs.py +++ b/backend-compliance/compliance/api/agent_check/_agent_outputs.py @@ -16,6 +16,9 @@ from __future__ import annotations import logging from compliance.services.specialist_agents import REGISTRY, AgentInput +from compliance.services.specialist_agents.impressum._classification import ( + scan_context_to_scope, +) from ._sse import emit @@ -59,7 +62,14 @@ async def run_agent_outputs(state: dict) -> None: origin_domain = ( getattr(req, "origin_domain", None) or "" ) or state.get("domain", "") - scope = _derive_scope(profile_dict) + # Phase 3: die 8 Wizard-Felder (scan_context) sind der primäre + # Scope-Treiber; das LLM-Profil ergänzt nur (v.a. regulated_profession, + # das die 8 Felder nicht ausdrücken können). + scan_context = getattr(req, "scan_context", None) + scope = sorted( + set(scan_context_to_scope(scan_context)) + | set(_derive_scope(profile_dict)) + ) outputs: dict[str, dict] = state.get("agent_outputs") or {} for topic, agent_id in _TOPIC_AGENTS.items(): diff --git a/backend-compliance/compliance/services/specialist_agents/impressum/_classification.py b/backend-compliance/compliance/services/specialist_agents/impressum/_classification.py new file mode 100644 index 00000000..716c2cfa --- /dev/null +++ b/backend-compliance/compliance/services/specialist_agents/impressum/_classification.py @@ -0,0 +1,62 @@ +"""Phase 3: Normalisierung der 8 Audit-Klassifizierungs-Felder +(scan_context aus dem PreScanWizard) → business_scope-Tokens. + +EINZIGER Normalisierungspunkt: beide Quellen (SDK-Profil/Scope ODER der +standalone Compliance-Check) füllen denselben business_scope, den +scope_matches() in den Agenten konsumiert. Schließt die Drift, dass die +8 Felder gespeichert, aber nicht an die Agenten gegeben wurden. 
# Legal forms WITHOUT a commercial-register (Handelsregister) entry → the
# Handelsregister MC is not applicable for them.
# (gmbh/ug/ag/kg/ohg/gmbh_co_kg/ek are subject to registration and are
# deliberately NOT listed. Opt-out design: entry points that supply no
# legal_form keep the MC applicable.)
_NON_HANDELSREGISTER_FORMS = frozenset({"verein", "stiftung", "behoerde", "other"})
# Legal forms WITHOUT a separate representative body (e.K. = the owner).
_NON_VERTRETUNG_FORMS = frozenset({"ek", "behoerde", "other"})

# Wizard ``industry`` value → business_scope token.
# "healthcare" is deliberately absent: the industry alone must not imply
# regulated_profession (a hospital GmbH is not a pharmacy).
_INDUSTRY_TOKENS = {
    "ecommerce": "ecommerce",
    "insurance": "insurance",
    "banking": "financial_services",
    "automotive": "automotive",
    "media": "editorial",  # §18 MStV (pragmatic mapping)
    "public": "public_authority",
}


def _wizard_field(source, name):
    """Return one wizard field as a stripped, lower-cased string.

    Anything exposing a callable ``.get`` is read like a mapping; any other
    object is read via attribute access. Missing or falsy values become "".
    """
    getter = getattr(source, "get", None)
    raw = getter(name) if callable(getter) else getattr(source, name, None)
    return str(raw or "").strip().lower()


# Annotations are quoted so the definition does not depend on
# ``from __future__ import annotations`` being in effect.
def scan_context_to_scope(scan_context: "dict | None") -> "list[str]":
    """Translate the wizard fields of a scan context into business_scope tokens.

    Reads ``industry``, ``business_model``, ``direct_sales`` and
    ``legal_form``. Values are compared case-insensitively and with
    surrounding whitespace ignored.

    Args:
        scan_context: Mapping of wizard fields, or ``None``. Objects without
            a ``.get`` method are read via attribute access instead of
            raising ``AttributeError``.

    Returns:
        Alphabetically sorted list of unique scope tokens; empty when nothing
        applies.
    """
    sc = scan_context or {}
    industry = _wizard_field(sc, "industry")
    business_model = _wizard_field(sc, "business_model")
    direct_sales = _wizard_field(sc, "direct_sales")
    legal_form = _wizard_field(sc, "legal_form")

    scope: set[str] = set()

    # ── industry / business model ──
    industry_token = _INDUSTRY_TOKENS.get(industry)
    if industry_token:
        scope.add(industry_token)
    if direct_sales == "yes":
        scope.add("ecommerce")
    if business_model in ("b2c", "both"):
        scope.add("b2c")
    if legal_form == "behoerde":
        scope.add("public_authority")

    # ── legal-form gates (opt-out) ──
    # Only exclude explicitly: with a missing or unknown legal_form no
    # exclusion token is emitted, so the related checks stay applicable.
    if legal_form in _NON_HANDELSREGISTER_FORMS:
        scope.add("kein_handelsregister")
    if legal_form in _NON_VERTRETUNG_FORMS:
        scope.add("keine_vertretung")

    return sorted(scope)
Default = immer + # anwendbar → Entry-Points ohne legal_form verschlucken nichts. + excludes_scope: tuple[str, ...] = field(default_factory=tuple) # Wenn True: bei Scope-Mismatch nicht-applicable melden, sonst skip explicit_na: bool = True + # Wenn True: fehlt die Angabe → KEIN Finding (z.B. USt-IdNr — + # Kleinunternehmer §19 haben legitim keine). Nur wenn vorhanden relevant. + optional: bool = False MCS: tuple[MC, ...] = ( @@ -78,6 +85,7 @@ MCS: tuple[MC, ...] = ( label="Handelsregister-Eintrag", norm="§ 5 Abs. 1 Nr. 4 TMG", severity_if_missing="HIGH", + excludes_scope=("kein_handelsregister",), patterns=( re.compile(r"\bHR[BA]\s+\d", re.IGNORECASE), re.compile(r"Handelsregister", re.IGNORECASE), @@ -89,6 +97,7 @@ MCS: tuple[MC, ...] = ( label="USt-IdNr", norm="§ 5 Abs. 1 Nr. 6 TMG", severity_if_missing="MEDIUM", + optional=True, patterns=( re.compile( r"\b(?:USt-?Id(?:Nr)?\.?|VAT(?:-?Id)?)\s*[:.\s]", @@ -103,6 +112,7 @@ MCS: tuple[MC, ...] = ( label="Vertretungsberechtigte Person", norm="§ 5 Abs. 1 Nr. 1 TMG (juristische Personen)", severity_if_missing="HIGH", + excludes_scope=("keine_vertretung",), patterns=( re.compile( r"(?:Gesch(?:ae|ä)ftsf(?:ue|ü)hr(?:er|ung|erin)|" @@ -214,6 +224,10 @@ MC_IDS: tuple[str, ...] = tuple(m.mc_id for m in MCS) def scope_matches(mc: MC, scope: set[str], is_automotive: bool) -> bool: """Entscheidet ob die MC auf den Business-Scope anwendbar ist.""" + # Opt-out zuerst: explizit ausgeschlossene Rechtsformen (z.B. + # Einzelunternehmer für Vertretungsberechtigte) → nicht anwendbar. 
# Phase 3 tests: scan_context → business_scope normalisation plus the
# legal-form gates.
#
# Legal mapping as recorded in the original test notes (confirmed by the
# domain expert, 2026-06-10): an e.K. must be registered (Handelsregister
# finding) but needs no separate authorised representative; a Verein is the
# other way round; a missing USt-IdNr yields no finding; healthcare does NOT
# trigger regulated_profession.

import asyncio

import pytest

from compliance.services.specialist_agents import AgentInput
from compliance.services.specialist_agents.impressum._classification import (
    scan_context_to_scope,
)
from compliance.services.specialist_agents.impressum.agent import ImpressumAgent

# The two opt-out tokens emitted by the legal-form gates.
_OPT_OUT_TOKENS = frozenset({"kein_handelsregister", "keine_vertretung"})
# The two finding field ids the agent-level tests look at.
_GATED_FIELDS = frozenset({"handelsregister", "vertretungsberechtigte"})


def _scope(**wizard_fields) -> set[str]:
    """Normalise keyword-style wizard fields and return the tokens as a set."""
    tokens = scan_context_to_scope(wizard_fields)
    return set(tokens)


def test_gmbh_no_exclusions():
    # GmbH → no opt-out token at all → both MCs stay applicable.
    tokens = _scope(legal_form="gmbh")
    assert _OPT_OUT_TOKENS.isdisjoint(tokens)


def test_einzelkaufmann_register_but_no_vertretung():
    # e.K.: subject to registration (no HR exclusion), the owner suffices.
    tokens = _scope(legal_form="ek")
    assert tokens & _OPT_OUT_TOKENS == {"keine_vertretung"}


def test_verein_vertretung_but_no_register():
    # e.V. uses the Vereinsregister, NOT the Handelsregister → HR excluded.
    tokens = _scope(legal_form="verein")
    assert tokens & _OPT_OUT_TOKENS == {"kein_handelsregister"}


@pytest.mark.parametrize(
    ("expected_token", "wizard_fields"),
    [
        ("ecommerce", {"industry": "ecommerce"}),
        ("ecommerce", {"direct_sales": "yes"}),
        ("b2c", {"business_model": "b2c"}),
        ("b2c", {"business_model": "both"}),
        ("automotive", {"industry": "automotive"}),
        ("editorial", {"industry": "media"}),
        ("insurance", {"industry": "insurance"}),
        ("financial_services", {"industry": "banking"}),
        ("public_authority", {"legal_form": "behoerde"}),
        ("public_authority", {"industry": "public"}),
    ],
)
def test_branche_tokens(expected_token, wizard_fields):
    # One case per industry / business-model mapping.
    assert expected_token in _scope(**wizard_fields)


def test_healthcare_does_not_imply_regulated_profession():
    # Hospital GmbH ≠ pharmacy → the industry alone must not trigger it.
    tokens = _scope(industry="healthcare", legal_form="gmbh")
    assert "regulated_profession" not in tokens


def test_unknown_legal_form_no_exclusions():
    # No legal_form supplied → no opt-out tokens → MCs remain applicable.
    # (Per the original note, a 4-status model with INSUFFICIENT_EVIDENCE is
    # planned for the next phase.)
    tokens = _scope(industry="ecommerce")
    assert _OPT_OUT_TOKENS.isdisjoint(tokens)


# ── Agent behaviour with the gates ─────────────────────────────────

IMPRESSUM_MINIMAL = (
    "Angaben gemäß § 5 TMG\n\n"
    "Beispiel Firma\n"
    "Musterstraße 1\n"
    "12345 Berlin\n\n"
    "E-Mail: info@example.com\n"
    "Telefon: +49 30 1234567\n"
    "Mehr Informationen auf unserer Website.\n"
)


@pytest.fixture(autouse=True)
def _llm_offline(monkeypatch):
    """Replace ``validate_present`` in the agent module with an async stub.

    ``raising=False`` keeps the fixture harmless if the attribute is absent.
    """
    async def _stub_validate(*_args, **_kwargs):
        return {}

    target = (
        "compliance.services.specialist_agents.impressum.agent.validate_present"
    )
    monkeypatch.setattr(target, _stub_validate, raising=False)


def _finding_fields(legal_form: str) -> set[str]:
    """Evaluate the minimal Impressum for *legal_form*; return finding field ids."""
    agent = ImpressumAgent()
    request = AgentInput(
        doc_type="impressum",
        text=IMPRESSUM_MINIMAL,
        business_scope=scan_context_to_scope({"legal_form": legal_form}),
    )
    result = asyncio.run(agent.evaluate(request))
    return {finding.field_id for finding in result.findings}


def test_einzelkaufmann_handelsregister_finding_no_vertretung():
    # e.K.: register finding expected, representative finding suppressed.
    fields = _finding_fields("ek")
    assert fields & _GATED_FIELDS == {"handelsregister"}


def test_gmbh_both_findings():
    # GmbH: both gated fields are reported as findings.
    fields = _finding_fields("gmbh")
    assert _GATED_FIELDS <= fields


def test_ust_id_absent_yields_no_finding():
    # USt-IdNr missing from the text → optional → NO finding, whatever the
    # legal form.
    for legal_form in ("gmbh", "ek"):
        assert "ust_id" not in _finding_fields(legal_form)
-Deckt emit (Queue-Push), _format_sse (SSE-Zeilenformat) und den -event_generator (hello → Events → stream_close bei 'complete') ab. +Queue + Generator laufen innerhalb eines asyncio.run (sonst bindet +asyncio.Queue in Py3.9 an einen ggf. geschlossenen Loop). """ from __future__ import annotations @@ -11,15 +11,8 @@ import asyncio from compliance.api.agent_check import _sse -def test_emit_pushes_and_format(): - cid = "sse-test-1" - _sse.new_queue(cid) - _sse.emit(cid, {"type": "topic", "topic": "impressum", "output": {"x": 1}}) - q = _sse._check_queues[cid] - assert q.qsize() == 1 - ev = q.get_nowait() - assert ev["type"] == "topic" and ev["topic"] == "impressum" - line = _sse._format_sse(ev) +def test_format_sse_line(): + line = _sse._format_sse({"type": "topic", "topic": "impressum"}) assert line.startswith("data: ") and line.endswith("\n\n") assert '"impressum"' in line @@ -29,21 +22,20 @@ def test_emit_is_noop_without_queue(): _sse.emit("does-not-exist-xyz", {"type": "topic"}) -def test_event_generator_streams_topic_then_closes_on_complete(): - cid = "sse-test-gen" - _sse.new_queue(cid) - _sse.emit(cid, {"type": "topic", "topic": "impressum", "output": {}}) - _sse.emit(cid, {"type": "complete", "status": "completed"}) - - async def collect(): - out = [] +def test_emit_and_event_generator_streams_then_closes(): + async def scenario(): + cid = "sse-test-gen" + _sse.new_queue(cid) + _sse.emit(cid, {"type": "topic", "topic": "impressum", "output": {}}) + _sse.emit(cid, {"type": "complete", "status": "completed"}) + out: list[str] = [] async for line in _sse.event_generator(cid): out.append(line) if len(out) > 12: # safety break return out - blob = "".join(asyncio.run(collect())) + blob = "".join(asyncio.run(scenario())) assert '"type": "hello"' in blob assert '"topic": "impressum"' in blob assert '"type": "complete"' in blob