Files
breakpilot-compliance/backend-compliance/compliance/tests/test_sse_compliance_check.py
T
Benjamin Admin b7a7e70731 feat(agent): Impressum Rechtsform-Gates + USt-optional (Phase 3)
Die 8 Audit-Klassifizierungs-Felder (scan_context) treiben jetzt den
business_scope der Agenten (vorher gespeichert, aber nicht genutzt).
Rechtsform-Gates als opt-out (excludes_scope): Verein -> kein
Handelsregister-Finding, e.K. -> kein Vertretungsberechtigte-Finding;
unbekannte Rechtsform bleibt anwendbar. USt-IdNr optional -> fehlt =
kein Finding. Rechts-Zuordnung vom Domain-Experten bestaetigt.

- _classification.py: scan_context_to_scope (8 Felder -> scope-Tokens)
- mcs.py: MC.excludes_scope + MC.optional; IMP-MC-004/006 Gate-Tokens;
  IMP-MC-005 optional; scope_matches respektiert excludes_scope
- agent.py: optional -> kein Finding bei Abwesenheit
- _agent_outputs.py: scope = scan_context vereinigt LLM-Profil-Fallback
- Tests gruen: v3 25, Groundtruth 13, CI-Pfad 14 (+ SSE-Loop-Fix)

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-06-10 20:37:56 +02:00

43 lines
1.3 KiB
Python

"""Phase 2: SSE-Plumbing für den Compliance-Check.
Queue + Generator laufen innerhalb eines asyncio.run (sonst bindet
asyncio.Queue in Py3.9 an einen ggf. geschlossenen Loop).
"""
from __future__ import annotations
import asyncio
from compliance.api.agent_check import _sse
def test_format_sse_line():
line = _sse._format_sse({"type": "topic", "topic": "impressum"})
assert line.startswith("data: ") and line.endswith("\n\n")
assert '"impressum"' in line
def test_emit_is_noop_without_queue():
# Kein new_queue → emit darf nicht crashen (best-effort).
_sse.emit("does-not-exist-xyz", {"type": "topic"})
def test_emit_and_event_generator_streams_then_closes():
async def scenario():
cid = "sse-test-gen"
_sse.new_queue(cid)
_sse.emit(cid, {"type": "topic", "topic": "impressum", "output": {}})
_sse.emit(cid, {"type": "complete", "status": "completed"})
out: list[str] = []
async for line in _sse.event_generator(cid):
out.append(line)
if len(out) > 12: # safety
break
return out
blob = "".join(asyncio.run(scenario()))
assert '"type": "hello"' in blob
assert '"topic": "impressum"' in blob
assert '"type": "complete"' in blob
assert '"type": "stream_close"' in blob