""" Context Management for Breakpilot Agents Provides conversation context with: - Message history with compression - Entity extraction and tracking - Intent history - Context summarization """ from typing import Dict, Any, List, Optional, Callable, Awaitable from dataclasses import dataclass, field from datetime import datetime, timezone from enum import Enum import json import logging logger = logging.getLogger(__name__) class MessageRole(Enum): """Message roles in a conversation""" SYSTEM = "system" USER = "user" ASSISTANT = "assistant" TOOL = "tool" @dataclass class Message: """Represents a message in a conversation""" role: MessageRole content: str timestamp: datetime = field(default_factory=lambda: datetime.now(timezone.utc)) metadata: Dict[str, Any] = field(default_factory=dict) def to_dict(self) -> Dict[str, Any]: return { "role": self.role.value, "content": self.content, "timestamp": self.timestamp.isoformat(), "metadata": self.metadata } @classmethod def from_dict(cls, data: Dict[str, Any]) -> "Message": return cls( role=MessageRole(data["role"]), content=data["content"], timestamp=datetime.fromisoformat(data["timestamp"]) if "timestamp" in data else datetime.now(timezone.utc), metadata=data.get("metadata", {}) ) @dataclass class ConversationContext: """ Context for a running conversation. Maintains: - Message history with automatic compression - Extracted entities - Intent history - Conversation summary """ messages: List[Message] = field(default_factory=list) entities: Dict[str, Any] = field(default_factory=dict) intent_history: List[str] = field(default_factory=list) summary: Optional[str] = None max_messages: int = 50 system_prompt: Optional[str] = None metadata: Dict[str, Any] = field(default_factory=dict) def add_message( self, role: MessageRole, content: str, metadata: Optional[Dict[str, Any]] = None ) -> Message: """ Adds a message to the conversation. Args: role: Message role content: Message content metadata: Optional message metadata Returns: The created Message """ message = Message( role=role, content=content, metadata=metadata or {} ) self.messages.append(message) # Compress if needed if len(self.messages) > self.max_messages: self._compress_history() return message def add_user_message( self, content: str, metadata: Optional[Dict[str, Any]] = None ) -> Message: """Convenience method to add a user message""" return self.add_message(MessageRole.USER, content, metadata) def add_assistant_message( self, content: str, metadata: Optional[Dict[str, Any]] = None ) -> Message: """Convenience method to add an assistant message""" return self.add_message(MessageRole.ASSISTANT, content, metadata) def add_system_message( self, content: str, metadata: Optional[Dict[str, Any]] = None ) -> Message: """Convenience method to add a system message""" return self.add_message(MessageRole.SYSTEM, content, metadata) def add_intent(self, intent: str) -> None: """ Records an intent in the history. Args: intent: The detected intent """ self.intent_history.append(intent) # Keep last 20 intents if len(self.intent_history) > 20: self.intent_history = self.intent_history[-20:] def set_entity(self, name: str, value: Any) -> None: """ Sets an entity value. Args: name: Entity name value: Entity value """ self.entities[name] = value def get_entity(self, name: str, default: Any = None) -> Any: """ Gets an entity value. Args: name: Entity name default: Default value if not found Returns: Entity value or default """ return self.entities.get(name, default) def get_last_message(self, role: Optional[MessageRole] = None) -> Optional[Message]: """ Gets the last message, optionally filtered by role. Args: role: Optional role filter Returns: The last matching message or None """ if not self.messages: return None if role is None: return self.messages[-1] for msg in reversed(self.messages): if msg.role == role: return msg return None def get_messages_for_llm(self) -> List[Dict[str, str]]: """ Gets messages formatted for LLM API calls. Returns: List of message dicts with role and content """ result = [] # Add system prompt first if self.system_prompt: result.append({ "role": "system", "content": self.system_prompt }) # Add summary if we have one and history was compressed if self.summary: result.append({ "role": "system", "content": f"Previous conversation summary: {self.summary}" }) # Add recent messages for msg in self.messages: result.append({ "role": msg.role.value, "content": msg.content }) return result def _compress_history(self) -> None: """ Compresses older messages to save context window space. Keeps: - System messages - Last 20 messages - Creates summary of compressed middle messages """ # Keep system messages system_msgs = [m for m in self.messages if m.role == MessageRole.SYSTEM] # Keep last 20 messages recent_msgs = self.messages[-20:] # Middle messages to summarize middle_start = len(system_msgs) middle_end = len(self.messages) - 20 middle_msgs = self.messages[middle_start:middle_end] if middle_msgs: # Create a basic summary (can be enhanced with LLM-based summarization) self.summary = self._create_summary(middle_msgs) # Combine self.messages = system_msgs + recent_msgs logger.debug( f"Compressed conversation: {middle_end - middle_start} messages summarized" ) def _create_summary(self, messages: List[Message]) -> str: """ Creates a summary of messages. This is a basic implementation - can be enhanced with LLM-based summarization. Args: messages: Messages to summarize Returns: Summary string """ # Count message types user_count = sum(1 for m in messages if m.role == MessageRole.USER) assistant_count = sum(1 for m in messages if m.role == MessageRole.ASSISTANT) # Extract key topics (simplified - could use NLP) topics = set() for msg in messages: # Simple keyword extraction words = msg.content.lower().split() # Filter common words keywords = [w for w in words if len(w) > 5][:3] topics.update(keywords) topics_str = ", ".join(list(topics)[:5]) return ( f"Earlier conversation: {user_count} user messages, " f"{assistant_count} assistant responses. " f"Topics discussed: {topics_str}" ) def clear(self) -> None: """Clears all context""" self.messages.clear() self.entities.clear() self.intent_history.clear() self.summary = None def to_dict(self) -> Dict[str, Any]: """Serializes context to dict""" return { "messages": [m.to_dict() for m in self.messages], "entities": self.entities, "intent_history": self.intent_history, "summary": self.summary, "max_messages": self.max_messages, "system_prompt": self.system_prompt, "metadata": self.metadata } @classmethod def from_dict(cls, data: Dict[str, Any]) -> "ConversationContext": """Deserializes context from dict""" ctx = cls( messages=[Message.from_dict(m) for m in data.get("messages", [])], entities=data.get("entities", {}), intent_history=data.get("intent_history", []), summary=data.get("summary"), max_messages=data.get("max_messages", 50), system_prompt=data.get("system_prompt"), metadata=data.get("metadata", {}) ) return ctx class ContextManager: """ Manages conversation contexts for multiple sessions. Provides: - Context creation and retrieval - Persistence to Valkey/PostgreSQL - Context sharing between agents """ def __init__( self, redis_client=None, db_pool=None, namespace: str = "breakpilot" ): """ Initialize the context manager. Args: redis_client: Async Redis/Valkey client db_pool: Async PostgreSQL connection pool namespace: Key namespace """ self.redis = redis_client self.db_pool = db_pool self.namespace = namespace self._contexts: Dict[str, ConversationContext] = {} self._summarize_callback: Optional[Callable[[List[Message]], Awaitable[str]]] = None def _redis_key(self, session_id: str) -> str: """Generate Redis key for context""" return f"{self.namespace}:context:{session_id}" def create_context( self, session_id: str, system_prompt: Optional[str] = None, max_messages: int = 50 ) -> ConversationContext: """ Creates a new conversation context. Args: session_id: Session ID for this context system_prompt: Optional system prompt max_messages: Maximum messages before compression Returns: The created context """ context = ConversationContext( max_messages=max_messages, system_prompt=system_prompt ) self._contexts[session_id] = context return context async def get_context(self, session_id: str) -> Optional[ConversationContext]: """ Gets a context by session ID. Args: session_id: The session ID Returns: ConversationContext or None """ # Check local cache if session_id in self._contexts: return self._contexts[session_id] # Try Valkey context = await self._get_from_valkey(session_id) if context: self._contexts[session_id] = context return context return None async def save_context(self, session_id: str) -> None: """ Saves a context to persistent storage. Args: session_id: The session ID """ if session_id not in self._contexts: return context = self._contexts[session_id] await self._cache_in_valkey(session_id, context) async def delete_context(self, session_id: str) -> bool: """ Deletes a context. Args: session_id: The session ID Returns: True if deleted """ self._contexts.pop(session_id, None) if self.redis: await self.redis.delete(self._redis_key(session_id)) return True def set_summarize_callback( self, callback: Callable[[List[Message]], Awaitable[str]] ) -> None: """ Sets a callback for LLM-based summarization. Args: callback: Async function that takes messages and returns summary """ self._summarize_callback = callback async def add_message( self, session_id: str, role: MessageRole, content: str, metadata: Optional[Dict[str, Any]] = None ) -> Optional[Message]: """ Adds a message to a session's context. Args: session_id: The session ID role: Message role content: Message content metadata: Optional metadata Returns: The created message or None if context not found """ context = await self.get_context(session_id) if not context: return None message = context.add_message(role, content, metadata) # Save after each message await self.save_context(session_id) return message async def get_messages_for_llm( self, session_id: str ) -> Optional[List[Dict[str, str]]]: """ Gets formatted messages for LLM API call. Args: session_id: The session ID Returns: List of message dicts or None """ context = await self.get_context(session_id) if not context: return None return context.get_messages_for_llm() async def _cache_in_valkey( self, session_id: str, context: ConversationContext ) -> None: """Caches context in Valkey""" if not self.redis: return try: # 24 hour TTL for contexts await self.redis.setex( self._redis_key(session_id), 86400, json.dumps(context.to_dict()) ) except Exception as e: logger.warning(f"Failed to cache context in Valkey: {e}") async def _get_from_valkey( self, session_id: str ) -> Optional[ConversationContext]: """Retrieves context from Valkey""" if not self.redis: return None try: data = await self.redis.get(self._redis_key(session_id)) if data: return ConversationContext.from_dict(json.loads(data)) except Exception as e: logger.warning(f"Failed to get context from Valkey: {e}") return None