Some checks failed
Tests / Go Tests (push) Has been cancelled
Tests / Python Tests (push) Has been cancelled
Tests / Integration Tests (push) Has been cancelled
Tests / Go Lint (push) Has been cancelled
Tests / Python Lint (push) Has been cancelled
Tests / Security Scan (push) Has been cancelled
Tests / All Checks Passed (push) Has been cancelled
Security Scanning / Secret Scanning (push) Has been cancelled
Security Scanning / Dependency Vulnerability Scan (push) Has been cancelled
Security Scanning / Go Security Scan (push) Has been cancelled
Security Scanning / Python Security Scan (push) Has been cancelled
Security Scanning / Node.js Security Scan (push) Has been cancelled
Security Scanning / Docker Image Security (push) Has been cancelled
Security Scanning / Security Summary (push) Has been cancelled
CI/CD Pipeline / Go Tests (push) Has been cancelled
CI/CD Pipeline / Python Tests (push) Has been cancelled
CI/CD Pipeline / Website Tests (push) Has been cancelled
CI/CD Pipeline / Linting (push) Has been cancelled
CI/CD Pipeline / Security Scan (push) Has been cancelled
CI/CD Pipeline / Docker Build & Push (push) Has been cancelled
CI/CD Pipeline / Integration Tests (push) Has been cancelled
CI/CD Pipeline / Deploy to Staging (push) Has been cancelled
CI/CD Pipeline / Deploy to Production (push) Has been cancelled
CI/CD Pipeline / CI Summary (push) Has been cancelled
ci/woodpecker/manual/build-ci-image Pipeline was successful
ci/woodpecker/manual/main Pipeline failed
All services: admin-v2, studio-v2, website, ai-compliance-sdk, consent-service, klausur-service, voice-service, and infrastructure. Large PDFs and compiled binaries excluded via .gitignore.
225 lines
6.4 KiB
Python
225 lines
6.4 KiB
Python
"""
|
|
Tests for Message Bus
|
|
|
|
Tests cover:
|
|
- Message publishing and subscription
|
|
- Request-response pattern
|
|
- Message priority
|
|
- Local delivery (without Redis)
|
|
"""
|
|
|
|
import pytest
|
|
import asyncio
|
|
from datetime import datetime, timezone
|
|
from unittest.mock import AsyncMock, MagicMock
|
|
|
|
import sys
|
|
sys.path.insert(0, str(__file__).rsplit('/tests/', 1)[0])
|
|
|
|
from orchestrator.message_bus import (
|
|
MessageBus,
|
|
AgentMessage,
|
|
MessagePriority,
|
|
MessageType,
|
|
)
|
|
|
|
|
|
class TestAgentMessage:
|
|
"""Tests for AgentMessage dataclass"""
|
|
|
|
def test_message_creation_defaults(self):
|
|
"""Message should have default values"""
|
|
message = AgentMessage(
|
|
sender="agent-1",
|
|
receiver="agent-2",
|
|
message_type="test",
|
|
payload={"data": "value"}
|
|
)
|
|
|
|
assert message.sender == "agent-1"
|
|
assert message.receiver == "agent-2"
|
|
assert message.priority == MessagePriority.NORMAL
|
|
assert message.correlation_id is not None
|
|
assert message.timestamp is not None
|
|
|
|
def test_message_with_priority(self):
|
|
"""Message should accept custom priority"""
|
|
message = AgentMessage(
|
|
sender="alert-agent",
|
|
receiver="admin",
|
|
message_type="critical_alert",
|
|
payload={},
|
|
priority=MessagePriority.CRITICAL
|
|
)
|
|
|
|
assert message.priority == MessagePriority.CRITICAL
|
|
|
|
def test_message_serialization(self):
|
|
"""Message should serialize and deserialize correctly"""
|
|
original = AgentMessage(
|
|
sender="sender",
|
|
receiver="receiver",
|
|
message_type="test",
|
|
payload={"key": "value"},
|
|
priority=MessagePriority.HIGH
|
|
)
|
|
|
|
data = original.to_dict()
|
|
restored = AgentMessage.from_dict(data)
|
|
|
|
assert restored.sender == original.sender
|
|
assert restored.receiver == original.receiver
|
|
assert restored.message_type == original.message_type
|
|
assert restored.payload == original.payload
|
|
assert restored.priority == original.priority
|
|
assert restored.correlation_id == original.correlation_id
|
|
|
|
|
|
class TestMessageBus:
|
|
"""Tests for MessageBus"""
|
|
|
|
@pytest.fixture
|
|
def bus(self):
|
|
"""Create a message bus without Redis"""
|
|
return MessageBus(
|
|
redis_client=None,
|
|
db_pool=None,
|
|
namespace="test"
|
|
)
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_start_stop(self, bus):
|
|
"""Bus should start and stop correctly"""
|
|
await bus.start()
|
|
assert bus._running is True
|
|
|
|
await bus.stop()
|
|
assert bus._running is False
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_subscribe_unsubscribe(self, bus):
|
|
"""Should subscribe and unsubscribe handlers"""
|
|
handler = AsyncMock(return_value=None)
|
|
|
|
await bus.subscribe("agent-1", handler)
|
|
assert "agent-1" in bus._handlers
|
|
|
|
await bus.unsubscribe("agent-1")
|
|
assert "agent-1" not in bus._handlers
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_local_message_delivery(self, bus):
|
|
"""Messages should be delivered locally without Redis"""
|
|
received = []
|
|
|
|
async def handler(message):
|
|
received.append(message)
|
|
return None
|
|
|
|
await bus.subscribe("agent-2", handler)
|
|
|
|
message = AgentMessage(
|
|
sender="agent-1",
|
|
receiver="agent-2",
|
|
message_type="test",
|
|
payload={"data": "hello"}
|
|
)
|
|
|
|
await bus.publish(message)
|
|
|
|
# Local delivery is synchronous
|
|
assert len(received) == 1
|
|
assert received[0].payload["data"] == "hello"
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_request_response(self, bus):
|
|
"""Request should get response from handler"""
|
|
async def handler(message):
|
|
return {"result": "processed"}
|
|
|
|
await bus.subscribe("responder", handler)
|
|
|
|
message = AgentMessage(
|
|
sender="requester",
|
|
receiver="responder",
|
|
message_type="request",
|
|
payload={"query": "test"}
|
|
)
|
|
|
|
response = await bus.request(message, timeout=5.0)
|
|
|
|
assert response["result"] == "processed"
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_request_timeout(self, bus):
|
|
"""Request should timeout if no response"""
|
|
async def slow_handler(message):
|
|
await asyncio.sleep(10)
|
|
return {"result": "too late"}
|
|
|
|
await bus.subscribe("slow-agent", slow_handler)
|
|
|
|
message = AgentMessage(
|
|
sender="requester",
|
|
receiver="slow-agent",
|
|
message_type="request",
|
|
payload={}
|
|
)
|
|
|
|
with pytest.raises(asyncio.TimeoutError):
|
|
await bus.request(message, timeout=0.1)
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_broadcast(self, bus):
|
|
"""Broadcast should reach all subscribers"""
|
|
received_1 = []
|
|
received_2 = []
|
|
|
|
async def handler_1(message):
|
|
received_1.append(message)
|
|
return None
|
|
|
|
async def handler_2(message):
|
|
received_2.append(message)
|
|
return None
|
|
|
|
await bus.subscribe("agent-1", handler_1)
|
|
await bus.subscribe("agent-2", handler_2)
|
|
|
|
message = AgentMessage(
|
|
sender="broadcaster",
|
|
receiver="*",
|
|
message_type="announcement",
|
|
payload={"text": "Hello everyone"}
|
|
)
|
|
|
|
await bus.broadcast(message)
|
|
|
|
assert len(received_1) == 1
|
|
assert len(received_2) == 1
|
|
|
|
def test_connected_property(self, bus):
|
|
"""Connected should reflect running state"""
|
|
assert bus.connected is False
|
|
|
|
def test_subscriber_count(self, bus):
|
|
"""Should track subscriber count"""
|
|
assert bus.subscriber_count == 0
|
|
|
|
|
|
class TestMessagePriority:
|
|
"""Tests for MessagePriority enum"""
|
|
|
|
def test_priority_ordering(self):
|
|
"""Priorities should have correct ordering"""
|
|
assert MessagePriority.LOW.value < MessagePriority.NORMAL.value
|
|
assert MessagePriority.NORMAL.value < MessagePriority.HIGH.value
|
|
assert MessagePriority.HIGH.value < MessagePriority.CRITICAL.value
|
|
|
|
def test_priority_values(self):
|
|
"""Priorities should have expected values"""
|
|
assert MessagePriority.LOW.value == 0
|
|
assert MessagePriority.NORMAL.value == 1
|
|
assert MessagePriority.HIGH.value == 2
|
|
assert MessagePriority.CRITICAL.value == 3
|