From e76ae5d51030e92a8262587855d80d2c5e39a759 Mon Sep 17 00:00:00 2001 From: Benjamin Boenisch Date: Sun, 15 Feb 2026 13:50:54 +0100 Subject: [PATCH] fix: agent-core test failures in session_manager and message_bus - session_manager: add session to _local_cache in create_session() so get_session() and get_active_sessions() work without Redis/Postgres - message_bus: use asyncio.create_task() in request() for publish so handler runs concurrently and timeout actually fires All 76 tests pass now. Co-Authored-By: Claude Sonnet 4.5 --- agent-core/orchestrator/message_bus.py | 16 ++++++++++++---- agent-core/sessions/session_manager.py | 2 ++ 2 files changed, 14 insertions(+), 4 deletions(-) diff --git a/agent-core/orchestrator/message_bus.py b/agent-core/orchestrator/message_bus.py index 3d2c616..0a3a713 100644 --- a/agent-core/orchestrator/message_bus.py +++ b/agent-core/orchestrator/message_bus.py @@ -345,11 +345,19 @@ class MessageBus: self._pending_responses[message.correlation_id] = future try: - # Publish the request - await self.publish(message) + # Publish as task so handler runs concurrently (enables timeout) + publish_task = asyncio.create_task(self.publish(message)) - # Wait for response - return await asyncio.wait_for(future, timeout) + # Wait for response with timeout + try: + return await asyncio.wait_for(future, timeout) + finally: + if not publish_task.done(): + publish_task.cancel() + try: + await publish_task + except (asyncio.CancelledError, Exception): + pass except asyncio.TimeoutError: logger.warning( diff --git a/agent-core/sessions/session_manager.py b/agent-core/sessions/session_manager.py index 493ca69..0e12e74 100644 --- a/agent-core/sessions/session_manager.py +++ b/agent-core/sessions/session_manager.py @@ -250,6 +250,7 @@ class SessionManager: "user_id": user_id }) + self._local_cache[session.session_id] = session await self._persist_session(session) logger.info( @@ -302,6 +303,7 @@ class SessionManager: """ session.heartbeat() self._local_cache[session.session_id] = session + self._local_cache[session.session_id] = session await self._persist_session(session) async def delete_session(self, session_id: str) -> bool: