"""
Heartbeat Monitoring for Breakpilot Agents

Provides liveness monitoring for agents with:
- Configurable timeout thresholds
- Async background monitoring
- Callback-based timeout handling
- Integration with SessionManager
"""

import asyncio
from typing import Dict, Callable, Optional, Awaitable, Set, List, Tuple
from datetime import datetime, timezone, timedelta
from dataclasses import dataclass, field
import logging

logger = logging.getLogger(__name__)


@dataclass
class HeartbeatEntry:
    """Represents a heartbeat entry for an agent"""
    session_id: str
    agent_type: str
    # Time of the most recent heartbeat (timezone-aware UTC, see register/beat)
    last_beat: datetime
    # Number of consecutive check passes that found this session stale
    missed_beats: int = 0


class HeartbeatMonitor:
    """
    Monitors agent heartbeats and triggers callbacks on timeout.

    A session is stale once no heartbeat has been recorded for longer than
    ``timeout_seconds``. Every check pass that finds a session stale
    increments its ``missed_beats`` counter. When the counter reaches
    ``max_missed_beats``, ``on_timeout`` is awaited and the session is
    removed from tracking. A heartbeat resets the counter to zero.

    Usage:
        monitor = HeartbeatMonitor(timeout_seconds=30)
        monitor.on_timeout = handle_timeout
        await monitor.start_monitoring()
    """

    def __init__(
        self,
        timeout_seconds: int = 30,
        check_interval_seconds: int = 5,
        max_missed_beats: int = 3
    ):
        """
        Initialize the heartbeat monitor.

        Args:
            timeout_seconds: Time without heartbeat before considered stale
            check_interval_seconds: How often to check for stale sessions
            max_missed_beats: Number of missed beats before triggering timeout
        """
        self.sessions: Dict[str, HeartbeatEntry] = {}
        self.timeout = timedelta(seconds=timeout_seconds)
        self.check_interval = check_interval_seconds
        self.max_missed_beats = max_missed_beats

        # Awaited as on_timeout(session_id, agent_type) when a session times out
        self.on_timeout: Optional[Callable[[str, str], Awaitable[None]]] = None
        # Awaited as on_warning(session_id, missed_beats) on the first missed beat
        self.on_warning: Optional[Callable[[str, int], Awaitable[None]]] = None

        self._running = False
        self._task: Optional[asyncio.Task] = None
        self._paused_sessions: Set[str] = set()

    async def start_monitoring(self) -> None:
        """
        Starts the background heartbeat monitoring task.

        Returns as soon as the task has been scheduled; the task itself keeps
        checking until stop_monitoring() is called. Calling this while the
        monitor is already running only logs a warning.
        """
        if self._running:
            logger.warning("Heartbeat monitor already running")
            return

        self._running = True
        self._task = asyncio.create_task(self._monitoring_loop())
        # total_seconds(): timedelta.seconds would wrap for timeouts >= 1 day
        logger.info(
            f"Heartbeat monitor started (timeout={self.timeout.total_seconds():g}s, "
            f"interval={self.check_interval}s)"
        )

    async def stop_monitoring(self) -> None:
        """Stops the heartbeat monitoring task and waits for it to finish."""
        self._running = False
        if self._task:
            self._task.cancel()
            try:
                await self._task
            except asyncio.CancelledError:
                pass
            self._task = None
        logger.info("Heartbeat monitor stopped")

    async def _monitoring_loop(self) -> None:
        """Main monitoring loop: sleep one interval, then run a check pass."""
        while self._running:
            try:
                await asyncio.sleep(self.check_interval)
                await self._check_heartbeats()
            except asyncio.CancelledError:
                break
            except Exception as e:
                # Keep monitoring alive, but preserve the traceback for debugging
                logger.exception(f"Error in heartbeat monitoring: {e}")

    async def _check_heartbeats(self) -> None:
        """
        Checks all registered sessions for stale heartbeats.

        Paused sessions are skipped. Callbacks are awaited, so the session
        table can change while a pass is running: a callback may unregister,
        re-register, pause or beat a session. Each candidate is therefore
        re-validated before it is acted on.
        """
        now = datetime.now(timezone.utc)
        timed_out: List[Tuple[str, HeartbeatEntry]] = []

        # Iterate over a snapshot because awaited callbacks may mutate self.sessions
        for session_id, entry in list(self.sessions.items()):
            # Skip entries dropped or replaced by a callback earlier in this pass
            if self.sessions.get(session_id) is not entry:
                continue
            # Skip paused sessions
            if session_id in self._paused_sessions:
                continue
            if now - entry.last_beat <= self.timeout:
                continue

            entry.missed_beats += 1

            # Warn on first missed beat. A failing warning handler must not
            # abort the pass, otherwise no session could ever time out.
            if entry.missed_beats == 1 and self.on_warning:
                try:
                    await self.on_warning(session_id, entry.missed_beats)
                except Exception as e:
                    logger.exception(f"Error in warning handler: {e}")

            logger.warning(
                f"Session {session_id} missed heartbeat "
                f"({entry.missed_beats}/{self.max_missed_beats})"
            )

            # Timeout after max missed beats
            if entry.missed_beats >= self.max_missed_beats:
                timed_out.append((session_id, entry))

        # Handle timeouts
        for session_id, entry in timed_out:
            if self._is_still_timed_out(session_id, entry):
                await self._handle_timeout(session_id, entry)

    def _is_still_timed_out(self, session_id: str, entry: HeartbeatEntry) -> bool:
        """Return True if *entry* is still the tracked, unpaused, timed-out record for *session_id*."""
        return (
            self.sessions.get(session_id) is entry
            and session_id not in self._paused_sessions
            and entry.missed_beats >= self.max_missed_beats
        )

    async def _handle_timeout(self, session_id: str, entry: HeartbeatEntry) -> None:
        """Log a timeout, await on_timeout, then stop tracking the session."""
        logger.error(
            f"Session {session_id} ({entry.agent_type}) timed out after "
            f"{self.max_missed_beats} missed heartbeats"
        )
        if self.on_timeout:
            try:
                await self.on_timeout(session_id, entry.agent_type)
            except Exception as e:
                logger.exception(f"Error in timeout handler: {e}")

        # Remove from tracking. The handler may already have unregistered the
        # session, or re-registered it with a fresh entry; only drop our own
        # record so neither case raises KeyError or loses a new registration.
        if self.sessions.get(session_id) is entry:
            del self.sessions[session_id]
            self._paused_sessions.discard(session_id)

    def register(self, session_id: str, agent_type: str) -> None:
        """
        Registers a session for heartbeat monitoring.

        Re-registering an existing session replaces its entry and resets
        its heartbeat timer and missed-beat counter.

        Args:
            session_id: The session ID to monitor
            agent_type: The type of agent
        """
        self.sessions[session_id] = HeartbeatEntry(
            session_id=session_id,
            agent_type=agent_type,
            last_beat=datetime.now(timezone.utc)
        )
        logger.debug(f"Registered session {session_id} for heartbeat monitoring")

    def beat(self, session_id: str) -> bool:
        """
        Records a heartbeat for a session.

        Args:
            session_id: The session ID

        Returns:
            True if the session is registered, False otherwise
        """
        if session_id in self.sessions:
            self.sessions[session_id].last_beat = datetime.now(timezone.utc)
            self.sessions[session_id].missed_beats = 0
            return True
        return False

    def unregister(self, session_id: str) -> bool:
        """
        Unregisters a session from heartbeat monitoring.

        Args:
            session_id: The session ID to unregister

        Returns:
            True if the session was registered, False otherwise
        """
        self._paused_sessions.discard(session_id)
        if session_id in self.sessions:
            del self.sessions[session_id]
            logger.debug(f"Unregistered session {session_id} from heartbeat monitoring")
            return True
        return False

    def pause(self, session_id: str) -> bool:
        """
        Pauses heartbeat monitoring for a session.

        Useful when a session is intentionally idle (e.g., waiting for user input).

        Args:
            session_id: The session ID to pause

        Returns:
            True if the session was registered, False otherwise
        """
        if session_id in self.sessions:
            self._paused_sessions.add(session_id)
            logger.debug(f"Paused heartbeat monitoring for session {session_id}")
            return True
        return False

    def resume(self, session_id: str) -> bool:
        """
        Resumes heartbeat monitoring for a paused session.

        Args:
            session_id: The session ID to resume

        Returns:
            True if the session was paused, False otherwise
        """
        if session_id in self._paused_sessions:
            self._paused_sessions.discard(session_id)
            # Reset the heartbeat timer so idle time while paused is not counted
            self.beat(session_id)
            logger.debug(f"Resumed heartbeat monitoring for session {session_id}")
            return True
        return False

    def get_status(self, session_id: str) -> Optional[Dict]:
        """
        Gets the heartbeat status for a session.

        Args:
            session_id: The session ID

        Returns:
            Status dict or None if not registered
        """
        if session_id not in self.sessions:
            return None

        entry = self.sessions[session_id]
        now = datetime.now(timezone.utc)

        return {
            "session_id": session_id,
            "agent_type": entry.agent_type,
            "last_beat": entry.last_beat.isoformat(),
            "seconds_since_beat": (now - entry.last_beat).total_seconds(),
            "missed_beats": entry.missed_beats,
            "is_paused": session_id in self._paused_sessions,
            "is_healthy": entry.missed_beats == 0
        }

    def get_all_status(self) -> Dict[str, Dict]:
        """
        Gets heartbeat status for all registered sessions.

        Returns:
            Dict mapping session_id to status dict
        """
        return {
            session_id: self.get_status(session_id)
            for session_id in self.sessions
        }

    @property
    def registered_count(self) -> int:
        """Returns the number of registered sessions"""
        return len(self.sessions)

    @property
    def healthy_count(self) -> int:
        """Returns the number of healthy sessions (no missed beats)"""
        return sum(
            1 for entry in self.sessions.values()
            if entry.missed_beats == 0
        )


class HeartbeatClient:
    """
    Client-side heartbeat sender for agents.

    Usage:
        client = HeartbeatClient(session_id, monitor=monitor)
        await client.start()
        # ... agent work ...
        await client.stop()

    or as an async context manager:

        async with HeartbeatClient(session_id, monitor=monitor):
            ...
    """

    def __init__(
        self,
        session_id: str,
        monitor: Optional[HeartbeatMonitor] = None,
        interval_seconds: int = 10
    ):
        """
        Initialize the heartbeat client.

        Args:
            session_id: The session ID to send heartbeats for
            monitor: Optional local HeartbeatMonitor (for in-process agents)
            interval_seconds: How often to send heartbeats
        """
        self.session_id = session_id
        self.monitor = monitor
        self.interval = interval_seconds
        self._running = False
        self._task: Optional[asyncio.Task] = None

    async def start(self) -> None:
        """Starts sending heartbeats in a background task (no-op if already running)."""
        if self._running:
            return

        self._running = True
        self._task = asyncio.create_task(self._heartbeat_loop())
        logger.debug(f"Heartbeat client started for session {self.session_id}")

    async def stop(self) -> None:
        """Stops sending heartbeats and waits for the background task to finish."""
        self._running = False
        if self._task:
            self._task.cancel()
            try:
                await self._task
            except asyncio.CancelledError:
                pass
            self._task = None
        logger.debug(f"Heartbeat client stopped for session {self.session_id}")

    async def _heartbeat_loop(self) -> None:
        """Main heartbeat sending loop: send one beat, then sleep one interval."""
        while self._running:
            try:
                await self._send_heartbeat()
                await asyncio.sleep(self.interval)
            except asyncio.CancelledError:
                break
            except Exception as e:
                # Keep sending, but preserve the traceback for debugging
                logger.exception(f"Error sending heartbeat: {e}")
                await asyncio.sleep(self.interval)

    async def _send_heartbeat(self) -> None:
        """Sends a single heartbeat"""
        if self.monitor:
            # Local monitor
            self.monitor.beat(self.session_id)
        # Future: Add HTTP-based heartbeat for distributed agents

    async def __aenter__(self):
        """Context manager entry: starts the client and returns it."""
        await self.start()
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        """Context manager exit: stops the client; exceptions are not suppressed."""
        await self.stop()