This repository has been archived on 2026-02-15. You can view files and clone it. You cannot open issues or pull requests or push a commit.
Files
breakpilot-pwa/agent-core/tests/test_message_bus.py
BreakPilot Dev 19855efacc
Some checks failed
Tests / Go Tests (push) Has been cancelled
Tests / Python Tests (push) Has been cancelled
Tests / Integration Tests (push) Has been cancelled
Tests / Go Lint (push) Has been cancelled
Tests / Python Lint (push) Has been cancelled
Tests / Security Scan (push) Has been cancelled
Tests / All Checks Passed (push) Has been cancelled
Security Scanning / Secret Scanning (push) Has been cancelled
Security Scanning / Dependency Vulnerability Scan (push) Has been cancelled
Security Scanning / Go Security Scan (push) Has been cancelled
Security Scanning / Python Security Scan (push) Has been cancelled
Security Scanning / Node.js Security Scan (push) Has been cancelled
Security Scanning / Docker Image Security (push) Has been cancelled
Security Scanning / Security Summary (push) Has been cancelled
CI/CD Pipeline / Go Tests (push) Has been cancelled
CI/CD Pipeline / Python Tests (push) Has been cancelled
CI/CD Pipeline / Website Tests (push) Has been cancelled
CI/CD Pipeline / Linting (push) Has been cancelled
CI/CD Pipeline / Security Scan (push) Has been cancelled
CI/CD Pipeline / Docker Build & Push (push) Has been cancelled
CI/CD Pipeline / Integration Tests (push) Has been cancelled
CI/CD Pipeline / Deploy to Staging (push) Has been cancelled
CI/CD Pipeline / Deploy to Production (push) Has been cancelled
CI/CD Pipeline / CI Summary (push) Has been cancelled
ci/woodpecker/manual/build-ci-image Pipeline was successful
ci/woodpecker/manual/main Pipeline failed
feat: BreakPilot PWA - Full codebase (clean push without large binaries)
All services: admin-v2, studio-v2, website, ai-compliance-sdk,
consent-service, klausur-service, voice-service, and infrastructure.
Large PDFs and compiled binaries excluded via .gitignore.
2026-02-11 13:25:58 +01:00

225 lines
6.4 KiB
Python

"""
Tests for Message Bus
Tests cover:
- Message publishing and subscription
- Request-response pattern
- Message priority
- Local delivery (without Redis)
"""
import pytest
import asyncio
from datetime import datetime, timezone
from unittest.mock import AsyncMock, MagicMock
import sys
sys.path.insert(0, str(__file__).rsplit('/tests/', 1)[0])
from orchestrator.message_bus import (
MessageBus,
AgentMessage,
MessagePriority,
MessageType,
)
class TestAgentMessage:
"""Tests for AgentMessage dataclass"""
def test_message_creation_defaults(self):
"""Message should have default values"""
message = AgentMessage(
sender="agent-1",
receiver="agent-2",
message_type="test",
payload={"data": "value"}
)
assert message.sender == "agent-1"
assert message.receiver == "agent-2"
assert message.priority == MessagePriority.NORMAL
assert message.correlation_id is not None
assert message.timestamp is not None
def test_message_with_priority(self):
"""Message should accept custom priority"""
message = AgentMessage(
sender="alert-agent",
receiver="admin",
message_type="critical_alert",
payload={},
priority=MessagePriority.CRITICAL
)
assert message.priority == MessagePriority.CRITICAL
def test_message_serialization(self):
"""Message should serialize and deserialize correctly"""
original = AgentMessage(
sender="sender",
receiver="receiver",
message_type="test",
payload={"key": "value"},
priority=MessagePriority.HIGH
)
data = original.to_dict()
restored = AgentMessage.from_dict(data)
assert restored.sender == original.sender
assert restored.receiver == original.receiver
assert restored.message_type == original.message_type
assert restored.payload == original.payload
assert restored.priority == original.priority
assert restored.correlation_id == original.correlation_id
class TestMessageBus:
"""Tests for MessageBus"""
@pytest.fixture
def bus(self):
"""Create a message bus without Redis"""
return MessageBus(
redis_client=None,
db_pool=None,
namespace="test"
)
@pytest.mark.asyncio
async def test_start_stop(self, bus):
"""Bus should start and stop correctly"""
await bus.start()
assert bus._running is True
await bus.stop()
assert bus._running is False
@pytest.mark.asyncio
async def test_subscribe_unsubscribe(self, bus):
"""Should subscribe and unsubscribe handlers"""
handler = AsyncMock(return_value=None)
await bus.subscribe("agent-1", handler)
assert "agent-1" in bus._handlers
await bus.unsubscribe("agent-1")
assert "agent-1" not in bus._handlers
@pytest.mark.asyncio
async def test_local_message_delivery(self, bus):
"""Messages should be delivered locally without Redis"""
received = []
async def handler(message):
received.append(message)
return None
await bus.subscribe("agent-2", handler)
message = AgentMessage(
sender="agent-1",
receiver="agent-2",
message_type="test",
payload={"data": "hello"}
)
await bus.publish(message)
# Local delivery is synchronous
assert len(received) == 1
assert received[0].payload["data"] == "hello"
@pytest.mark.asyncio
async def test_request_response(self, bus):
"""Request should get response from handler"""
async def handler(message):
return {"result": "processed"}
await bus.subscribe("responder", handler)
message = AgentMessage(
sender="requester",
receiver="responder",
message_type="request",
payload={"query": "test"}
)
response = await bus.request(message, timeout=5.0)
assert response["result"] == "processed"
@pytest.mark.asyncio
async def test_request_timeout(self, bus):
"""Request should timeout if no response"""
async def slow_handler(message):
await asyncio.sleep(10)
return {"result": "too late"}
await bus.subscribe("slow-agent", slow_handler)
message = AgentMessage(
sender="requester",
receiver="slow-agent",
message_type="request",
payload={}
)
with pytest.raises(asyncio.TimeoutError):
await bus.request(message, timeout=0.1)
@pytest.mark.asyncio
async def test_broadcast(self, bus):
"""Broadcast should reach all subscribers"""
received_1 = []
received_2 = []
async def handler_1(message):
received_1.append(message)
return None
async def handler_2(message):
received_2.append(message)
return None
await bus.subscribe("agent-1", handler_1)
await bus.subscribe("agent-2", handler_2)
message = AgentMessage(
sender="broadcaster",
receiver="*",
message_type="announcement",
payload={"text": "Hello everyone"}
)
await bus.broadcast(message)
assert len(received_1) == 1
assert len(received_2) == 1
def test_connected_property(self, bus):
"""Connected should reflect running state"""
assert bus.connected is False
def test_subscriber_count(self, bus):
"""Should track subscriber count"""
assert bus.subscriber_count == 0
class TestMessagePriority:
"""Tests for MessagePriority enum"""
def test_priority_ordering(self):
"""Priorities should have correct ordering"""
assert MessagePriority.LOW.value < MessagePriority.NORMAL.value
assert MessagePriority.NORMAL.value < MessagePriority.HIGH.value
assert MessagePriority.HIGH.value < MessagePriority.CRITICAL.value
def test_priority_values(self):
"""Priorities should have expected values"""
assert MessagePriority.LOW.value == 0
assert MessagePriority.NORMAL.value == 1
assert MessagePriority.HIGH.value == 2
assert MessagePriority.CRITICAL.value == 3