b7a7e70731
Die 8 Audit-Klassifizierungs-Felder (scan_context) treiben jetzt den business_scope der Agenten (vorher gespeichert, aber nicht genutzt). Rechtsform-Gates als opt-out (excludes_scope): Verein -> kein Handelsregister-Finding, e.K. -> kein Vertretungsberechtigte-Finding; unbekannte Rechtsform bleibt anwendbar. USt-IdNr optional -> fehlt = kein Finding. Rechts-Zuordnung vom Domain-Experten bestaetigt. - _classification.py: scan_context_to_scope (8 Felder -> scope-Tokens) - mcs.py: MC.excludes_scope + MC.optional; IMP-MC-004/006 Gate-Tokens; IMP-MC-005 optional; scope_matches respektiert excludes_scope - agent.py: optional -> kein Finding bei Abwesenheit - _agent_outputs.py: scope = scan_context vereinigt LLM-Profil-Fallback - Tests gruen: v3 25, Groundtruth 13, CI-Pfad 14 (+ SSE-Loop-Fix) Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
43 lines
1.3 KiB
Python
43 lines
1.3 KiB
Python
"""Phase 2: SSE-Plumbing für den Compliance-Check.
|
|
|
|
Queue + Generator laufen innerhalb eines asyncio.run (sonst bindet
|
|
asyncio.Queue in Py3.9 an einen ggf. geschlossenen Loop).
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import asyncio
|
|
|
|
from compliance.api.agent_check import _sse
|
|
|
|
|
|
def test_format_sse_line():
|
|
line = _sse._format_sse({"type": "topic", "topic": "impressum"})
|
|
assert line.startswith("data: ") and line.endswith("\n\n")
|
|
assert '"impressum"' in line
|
|
|
|
|
|
def test_emit_is_noop_without_queue():
|
|
# Kein new_queue → emit darf nicht crashen (best-effort).
|
|
_sse.emit("does-not-exist-xyz", {"type": "topic"})
|
|
|
|
|
|
def test_emit_and_event_generator_streams_then_closes():
|
|
async def scenario():
|
|
cid = "sse-test-gen"
|
|
_sse.new_queue(cid)
|
|
_sse.emit(cid, {"type": "topic", "topic": "impressum", "output": {}})
|
|
_sse.emit(cid, {"type": "complete", "status": "completed"})
|
|
out: list[str] = []
|
|
async for line in _sse.event_generator(cid):
|
|
out.append(line)
|
|
if len(out) > 12: # safety
|
|
break
|
|
return out
|
|
|
|
blob = "".join(asyncio.run(scenario()))
|
|
assert '"type": "hello"' in blob
|
|
assert '"topic": "impressum"' in blob
|
|
assert '"type": "complete"' in blob
|
|
assert '"type": "stream_close"' in blob
|