""" Tests for Message Bus Tests cover: - Message publishing and subscription - Request-response pattern - Message priority - Local delivery (without Redis) """ import pytest import asyncio from datetime import datetime, timezone from unittest.mock import AsyncMock, MagicMock import sys sys.path.insert(0, str(__file__).rsplit('/tests/', 1)[0]) from orchestrator.message_bus import ( MessageBus, AgentMessage, MessagePriority, MessageType, ) class TestAgentMessage: """Tests for AgentMessage dataclass""" def test_message_creation_defaults(self): """Message should have default values""" message = AgentMessage( sender="agent-1", receiver="agent-2", message_type="test", payload={"data": "value"} ) assert message.sender == "agent-1" assert message.receiver == "agent-2" assert message.priority == MessagePriority.NORMAL assert message.correlation_id is not None assert message.timestamp is not None def test_message_with_priority(self): """Message should accept custom priority""" message = AgentMessage( sender="alert-agent", receiver="admin", message_type="critical_alert", payload={}, priority=MessagePriority.CRITICAL ) assert message.priority == MessagePriority.CRITICAL def test_message_serialization(self): """Message should serialize and deserialize correctly""" original = AgentMessage( sender="sender", receiver="receiver", message_type="test", payload={"key": "value"}, priority=MessagePriority.HIGH ) data = original.to_dict() restored = AgentMessage.from_dict(data) assert restored.sender == original.sender assert restored.receiver == original.receiver assert restored.message_type == original.message_type assert restored.payload == original.payload assert restored.priority == original.priority assert restored.correlation_id == original.correlation_id class TestMessageBus: """Tests for MessageBus""" @pytest.fixture def bus(self): """Create a message bus without Redis""" return MessageBus( redis_client=None, db_pool=None, namespace="test" ) @pytest.mark.asyncio async def test_start_stop(self, bus): """Bus should start and stop correctly""" await bus.start() assert bus._running is True await bus.stop() assert bus._running is False @pytest.mark.asyncio async def test_subscribe_unsubscribe(self, bus): """Should subscribe and unsubscribe handlers""" handler = AsyncMock(return_value=None) await bus.subscribe("agent-1", handler) assert "agent-1" in bus._handlers await bus.unsubscribe("agent-1") assert "agent-1" not in bus._handlers @pytest.mark.asyncio async def test_local_message_delivery(self, bus): """Messages should be delivered locally without Redis""" received = [] async def handler(message): received.append(message) return None await bus.subscribe("agent-2", handler) message = AgentMessage( sender="agent-1", receiver="agent-2", message_type="test", payload={"data": "hello"} ) await bus.publish(message) # Local delivery is synchronous assert len(received) == 1 assert received[0].payload["data"] == "hello" @pytest.mark.asyncio async def test_request_response(self, bus): """Request should get response from handler""" async def handler(message): return {"result": "processed"} await bus.subscribe("responder", handler) message = AgentMessage( sender="requester", receiver="responder", message_type="request", payload={"query": "test"} ) response = await bus.request(message, timeout=5.0) assert response["result"] == "processed" @pytest.mark.asyncio async def test_request_timeout(self, bus): """Request should timeout if no response""" async def slow_handler(message): await asyncio.sleep(10) return {"result": "too late"} await bus.subscribe("slow-agent", slow_handler) message = AgentMessage( sender="requester", receiver="slow-agent", message_type="request", payload={} ) with pytest.raises(asyncio.TimeoutError): await bus.request(message, timeout=0.1) @pytest.mark.asyncio async def test_broadcast(self, bus): """Broadcast should reach all subscribers""" received_1 = [] received_2 = [] async def handler_1(message): received_1.append(message) return None async def handler_2(message): received_2.append(message) return None await bus.subscribe("agent-1", handler_1) await bus.subscribe("agent-2", handler_2) message = AgentMessage( sender="broadcaster", receiver="*", message_type="announcement", payload={"text": "Hello everyone"} ) await bus.broadcast(message) assert len(received_1) == 1 assert len(received_2) == 1 def test_connected_property(self, bus): """Connected should reflect running state""" assert bus.connected is False def test_subscriber_count(self, bus): """Should track subscriber count""" assert bus.subscriber_count == 0 class TestMessagePriority: """Tests for MessagePriority enum""" def test_priority_ordering(self): """Priorities should have correct ordering""" assert MessagePriority.LOW.value < MessagePriority.NORMAL.value assert MessagePriority.NORMAL.value < MessagePriority.HIGH.value assert MessagePriority.HIGH.value < MessagePriority.CRITICAL.value def test_priority_values(self): """Priorities should have expected values""" assert MessagePriority.LOW.value == 0 assert MessagePriority.NORMAL.value == 1 assert MessagePriority.HIGH.value == 2 assert MessagePriority.CRITICAL.value == 3