Some checks failed
Tests / Go Tests (push) Has been cancelled
Tests / Python Tests (push) Has been cancelled
Tests / Integration Tests (push) Has been cancelled
Tests / Go Lint (push) Has been cancelled
Tests / Python Lint (push) Has been cancelled
Tests / Security Scan (push) Has been cancelled
Tests / All Checks Passed (push) Has been cancelled
Security Scanning / Secret Scanning (push) Has been cancelled
Security Scanning / Dependency Vulnerability Scan (push) Has been cancelled
Security Scanning / Go Security Scan (push) Has been cancelled
Security Scanning / Python Security Scan (push) Has been cancelled
Security Scanning / Node.js Security Scan (push) Has been cancelled
Security Scanning / Docker Image Security (push) Has been cancelled
Security Scanning / Security Summary (push) Has been cancelled
CI/CD Pipeline / Go Tests (push) Has been cancelled
CI/CD Pipeline / Python Tests (push) Has been cancelled
CI/CD Pipeline / Website Tests (push) Has been cancelled
CI/CD Pipeline / Linting (push) Has been cancelled
CI/CD Pipeline / Security Scan (push) Has been cancelled
CI/CD Pipeline / Docker Build & Push (push) Has been cancelled
CI/CD Pipeline / Integration Tests (push) Has been cancelled
CI/CD Pipeline / Deploy to Staging (push) Has been cancelled
CI/CD Pipeline / Deploy to Production (push) Has been cancelled
CI/CD Pipeline / CI Summary (push) Has been cancelled
ci/woodpecker/manual/build-ci-image Pipeline was successful
ci/woodpecker/manual/main Pipeline failed
All services: admin-v2, studio-v2, website, ai-compliance-sdk, consent-service, klausur-service, voice-service, and infrastructure. Large PDFs and compiled binaries excluded via .gitignore.
521 lines
14 KiB
Python
521 lines
14 KiB
Python
"""
Context Management for Breakpilot Agents

Provides conversation context with:
- Message history with compression
- Entity extraction and tracking
- Intent history
- Context summarization
"""
|
|
|
|
from typing import Dict, Any, List, Optional, Callable, Awaitable
|
|
from dataclasses import dataclass, field
|
|
from datetime import datetime, timezone
|
|
from enum import Enum
|
|
import json
|
|
import logging
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
class MessageRole(Enum):
    """Role of the author of a message in a conversation.

    The member values are the plain strings written into serialized
    messages and into the role field of LLM message dicts.
    """

    SYSTEM = "system"
    USER = "user"
    ASSISTANT = "assistant"
    TOOL = "tool"
|
|
|
|
|
|
@dataclass
class Message:
    """A single message in a conversation."""

    # Who authored the message.
    role: MessageRole
    # Text body of the message.
    content: str
    # Creation time; defaults to the current time in UTC (timezone-aware).
    timestamp: datetime = field(default_factory=lambda: datetime.now(timezone.utc))
    # Free-form extra data attached to the message.
    metadata: Dict[str, Any] = field(default_factory=dict)

    def to_dict(self) -> Dict[str, Any]:
        """Serialize the message to a plain dict.

        Returns:
            Dict with the role as its string value, the content, the
            timestamp as an ISO 8601 string, and the metadata dict.
        """
        return {
            "role": self.role.value,
            "content": self.content,
            "timestamp": self.timestamp.isoformat(),
            "metadata": self.metadata,
        }

    @classmethod
    def from_dict(cls, data: Dict[str, Any]) -> "Message":
        """Build a message from a dict produced by ``to_dict``.

        Args:
            data: Dict with required ``role`` and ``content`` keys and
                optional ``timestamp`` (ISO 8601 string) and ``metadata``.

        Returns:
            The reconstructed Message.

        Raises:
            KeyError: If ``role`` or ``content`` is missing.
            ValueError: If ``role`` is not a valid MessageRole value or the
                timestamp string cannot be parsed.
        """
        # A missing, None or empty timestamp falls back to "now" instead of
        # crashing inside fromisoformat().
        raw_timestamp = data.get("timestamp")
        if raw_timestamp:
            timestamp = datetime.fromisoformat(raw_timestamp)
        else:
            timestamp = datetime.now(timezone.utc)

        return cls(
            role=MessageRole(data["role"]),
            content=data["content"],
            timestamp=timestamp,
            # Copy so the message never shares a dict with the input payload;
            # an explicit null becomes an empty dict.
            metadata=dict(data.get("metadata") or {}),
        )
|
|
|
|
|
|
@dataclass
class ConversationContext:
    """
    Context for a running conversation.

    Maintains:
    - Message history with automatic compression
    - Extracted entities
    - Intent history
    - Conversation summary
    """

    # Number of most recent messages kept verbatim when history is compressed.
    _RECENT_WINDOW = 20
    # Maximum number of intents retained in intent_history.
    _MAX_INTENTS = 20

    # Messages currently held verbatim, oldest first.
    messages: List[Message] = field(default_factory=list)
    # Named entity values set via set_entity().
    entities: Dict[str, Any] = field(default_factory=dict)
    # Most recent detected intents, oldest first.
    intent_history: List[str] = field(default_factory=list)
    # Summary of messages dropped by compression, if any.
    summary: Optional[str] = None
    # Compression is triggered once len(messages) exceeds this value.
    max_messages: int = 50
    # Prompt emitted as the first system message by get_messages_for_llm().
    system_prompt: Optional[str] = None
    # Free-form extra data attached to the context.
    metadata: Dict[str, Any] = field(default_factory=dict)

    def add_message(
        self,
        role: MessageRole,
        content: str,
        metadata: Optional[Dict[str, Any]] = None
    ) -> Message:
        """
        Adds a message to the conversation.

        The history is compressed when it grows beyond ``max_messages``.

        Args:
            role: Message role
            content: Message content
            metadata: Optional message metadata

        Returns:
            The created Message
        """
        message = Message(role=role, content=content, metadata=metadata or {})
        self.messages.append(message)

        if len(self.messages) > self.max_messages:
            self._compress_history()

        return message

    def add_user_message(
        self,
        content: str,
        metadata: Optional[Dict[str, Any]] = None
    ) -> Message:
        """Convenience method to add a user message"""
        return self.add_message(MessageRole.USER, content, metadata)

    def add_assistant_message(
        self,
        content: str,
        metadata: Optional[Dict[str, Any]] = None
    ) -> Message:
        """Convenience method to add an assistant message"""
        return self.add_message(MessageRole.ASSISTANT, content, metadata)

    def add_system_message(
        self,
        content: str,
        metadata: Optional[Dict[str, Any]] = None
    ) -> Message:
        """Convenience method to add a system message"""
        return self.add_message(MessageRole.SYSTEM, content, metadata)

    def add_intent(self, intent: str) -> None:
        """
        Records an intent in the history.

        Only the most recent 20 intents are kept.

        Args:
            intent: The detected intent
        """
        self.intent_history.append(intent)
        if len(self.intent_history) > self._MAX_INTENTS:
            self.intent_history = self.intent_history[-self._MAX_INTENTS:]

    def set_entity(self, name: str, value: Any) -> None:
        """
        Sets an entity value, replacing any previous value for that name.

        Args:
            name: Entity name
            value: Entity value
        """
        self.entities[name] = value

    def get_entity(self, name: str, default: Any = None) -> Any:
        """
        Gets an entity value.

        Args:
            name: Entity name
            default: Default value if not found

        Returns:
            Entity value or default
        """
        return self.entities.get(name, default)

    def get_last_message(self, role: Optional[MessageRole] = None) -> Optional[Message]:
        """
        Gets the last message, optionally filtered by role.

        Args:
            role: Optional role filter

        Returns:
            The last matching message or None
        """
        if not self.messages:
            return None
        if role is None:
            return self.messages[-1]
        # Walk backwards so the first hit is the most recent match.
        return next((m for m in reversed(self.messages) if m.role == role), None)

    def get_messages_for_llm(self) -> List[Dict[str, str]]:
        """
        Gets messages formatted for LLM API calls.

        The order is: system prompt (if set), summary of compressed history
        (if any), then every retained message.

        Returns:
            List of message dicts with role and content
        """
        result: List[Dict[str, str]] = []

        if self.system_prompt:
            result.append({"role": "system", "content": self.system_prompt})

        if self.summary:
            result.append({
                "role": "system",
                "content": f"Previous conversation summary: {self.summary}"
            })

        result.extend(
            {"role": msg.role.value, "content": msg.content}
            for msg in self.messages
        )
        return result

    def _compress_history(self) -> None:
        """
        Compresses older messages to save context window space.

        Keeps:
        - System messages that fall outside the recent window
        - The most recent messages (at most 20, and never more than
          ``max_messages``)
        - A summary of the remaining older messages

        A new summary replaces the previous one.
        """
        # Clamp the window so a small (or non-positive) max_messages still
        # results in an actual reduction of the history.
        keep_recent = max(0, min(self._RECENT_WINDOW, self.max_messages))
        # Explicit split index: a "[-0:]" slice would keep everything.
        split = max(len(self.messages) - keep_recent, 0)
        older = self.messages[:split]
        recent = self.messages[split:]

        if not older:
            return

        # Partition only the older part, so a system message inside the recent
        # window is not kept twice and system messages may sit anywhere.
        preserved = [m for m in older if m.role == MessageRole.SYSTEM]
        to_summarize = [m for m in older if m.role != MessageRole.SYSTEM]

        if to_summarize:
            # Basic summary (can be enhanced with LLM-based summarization).
            self.summary = self._create_summary(to_summarize)

        self.messages = preserved + recent

        logger.debug(
            "Compressed conversation: %d messages summarized",
            len(to_summarize),
        )

    def _create_summary(self, messages: List[Message]) -> str:
        """
        Creates a summary of messages.

        This is a basic implementation - can be enhanced with LLM-based
        summarization. It reports the user/assistant message counts and up to
        five "topics": lower-cased words longer than five characters, at most
        three per message, in order of first appearance.

        Args:
            messages: Messages to summarize

        Returns:
            Summary string
        """
        user_count = sum(1 for m in messages if m.role == MessageRole.USER)
        assistant_count = sum(1 for m in messages if m.role == MessageRole.ASSISTANT)

        # A dict is used as an insertion-ordered set, so the selected topics
        # do not depend on string hash randomization.
        topics: Dict[str, None] = {}
        for msg in messages:
            long_words = [w for w in msg.content.lower().split() if len(w) > 5]
            topics.update(dict.fromkeys(long_words[:3]))
            if len(topics) >= 5:
                break

        topics_str = ", ".join(list(topics)[:5])

        return (
            f"Earlier conversation: {user_count} user messages, "
            f"{assistant_count} assistant responses. "
            f"Topics discussed: {topics_str}"
        )

    def clear(self) -> None:
        """Clears messages, entities, intent history and the summary.

        ``system_prompt``, ``max_messages`` and ``metadata`` are left as is.
        """
        self.messages.clear()
        self.entities.clear()
        self.intent_history.clear()
        self.summary = None

    def to_dict(self) -> Dict[str, Any]:
        """Serializes context to dict"""
        return {
            "messages": [m.to_dict() for m in self.messages],
            # Shallow copies keep the snapshot stable if the context is
            # modified afterwards.
            "entities": dict(self.entities),
            "intent_history": list(self.intent_history),
            "summary": self.summary,
            "max_messages": self.max_messages,
            "system_prompt": self.system_prompt,
            "metadata": dict(self.metadata),
        }

    @classmethod
    def from_dict(cls, data: Dict[str, Any]) -> "ConversationContext":
        """Deserializes context from dict

        Missing keys fall back to the field defaults. Containers are copied so
        the context does not share mutable state with ``data``; explicit nulls
        for container fields are treated as empty.
        """
        return cls(
            messages=[Message.from_dict(m) for m in data.get("messages") or []],
            entities=dict(data.get("entities") or {}),
            intent_history=list(data.get("intent_history") or []),
            summary=data.get("summary"),
            max_messages=data.get("max_messages", 50),
            system_prompt=data.get("system_prompt"),
            metadata=dict(data.get("metadata") or {}),
        )
|
|
|
|
|
|
class ContextManager:
    """
    Manages conversation contexts for multiple sessions.

    Contexts are held in an in-process dict keyed by session ID and, when a
    Redis/Valkey client is configured, mirrored there as JSON with a 24 hour
    TTL.

    NOTE(review): ``db_pool`` and the summarize callback are stored but not
    used by any method visible in this class - confirm whether PostgreSQL
    persistence and LLM summarization are wired up elsewhere.
    """

    # 24 hour TTL (in seconds) for contexts cached in Valkey.
    _CONTEXT_TTL_SECONDS = 86400

    def __init__(
        self,
        redis_client=None,
        db_pool=None,
        namespace: str = "breakpilot"
    ):
        """
        Initialize the context manager.

        Args:
            redis_client: Async Redis/Valkey client
            db_pool: Async PostgreSQL connection pool
            namespace: Key namespace
        """
        self.redis = redis_client
        self.db_pool = db_pool
        self.namespace = namespace
        self._contexts: Dict[str, ConversationContext] = {}
        self._summarize_callback: Optional[Callable[[List[Message]], Awaitable[str]]] = None

    def _redis_key(self, session_id: str) -> str:
        """Generate Redis key for context"""
        return f"{self.namespace}:context:{session_id}"

    def create_context(
        self,
        session_id: str,
        system_prompt: Optional[str] = None,
        max_messages: int = 50
    ) -> ConversationContext:
        """
        Creates a new conversation context.

        Any context already registered locally under ``session_id`` is
        replaced. The new context is not written to Valkey until
        ``save_context`` is called.

        Args:
            session_id: Session ID for this context
            system_prompt: Optional system prompt
            max_messages: Maximum messages before compression

        Returns:
            The created context
        """
        context = ConversationContext(
            max_messages=max_messages,
            system_prompt=system_prompt,
        )
        self._contexts[session_id] = context
        return context

    async def get_context(self, session_id: str) -> Optional[ConversationContext]:
        """
        Gets a context by session ID.

        Looks in the local cache first, then in Valkey; a context loaded from
        Valkey is added to the local cache.

        Args:
            session_id: The session ID

        Returns:
            ConversationContext or None
        """
        if session_id in self._contexts:
            return self._contexts[session_id]

        context = await self._get_from_valkey(session_id)
        if context is not None:
            self._contexts[session_id] = context
        return context

    async def save_context(self, session_id: str) -> None:
        """
        Saves a locally held context to Valkey.

        Does nothing if no context is registered locally for ``session_id``.

        Args:
            session_id: The session ID
        """
        context = self._contexts.get(session_id)
        if context is None:
            return
        await self._cache_in_valkey(session_id, context)

    async def delete_context(self, session_id: str) -> bool:
        """
        Deletes a context from the local cache and from Valkey.

        Unlike the cache read/write helpers, errors raised by the Redis
        client here are not caught and propagate to the caller.

        Args:
            session_id: The session ID

        Returns:
            Always True, whether or not a context existed.
        """
        self._contexts.pop(session_id, None)

        if self.redis:
            await self.redis.delete(self._redis_key(session_id))

        return True

    def set_summarize_callback(
        self,
        callback: Callable[[List[Message]], Awaitable[str]]
    ) -> None:
        """
        Sets a callback for LLM-based summarization.

        Args:
            callback: Async function that takes messages and returns summary
        """
        self._summarize_callback = callback

    async def add_message(
        self,
        session_id: str,
        role: MessageRole,
        content: str,
        metadata: Optional[Dict[str, Any]] = None
    ) -> Optional[Message]:
        """
        Adds a message to a session's context.

        The context is saved after each message.

        Args:
            session_id: The session ID
            role: Message role
            content: Message content
            metadata: Optional metadata

        Returns:
            The created message or None if context not found
        """
        context = await self.get_context(session_id)
        if context is None:
            return None

        message = context.add_message(role, content, metadata)
        await self.save_context(session_id)
        return message

    async def get_messages_for_llm(
        self,
        session_id: str
    ) -> Optional[List[Dict[str, str]]]:
        """
        Gets formatted messages for LLM API call.

        Args:
            session_id: The session ID

        Returns:
            List of message dicts or None if context not found
        """
        context = await self.get_context(session_id)
        if context is None:
            return None
        return context.get_messages_for_llm()

    async def _cache_in_valkey(
        self,
        session_id: str,
        context: ConversationContext
    ) -> None:
        """Caches context in Valkey as JSON with a 24 hour TTL.

        Best effort: does nothing without a client, and any failure
        (including JSON serialization errors) is logged as a warning.
        """
        if not self.redis:
            return

        try:
            await self.redis.setex(
                self._redis_key(session_id),
                self._CONTEXT_TTL_SECONDS,
                json.dumps(context.to_dict()),
            )
        except Exception as e:
            logger.warning("Failed to cache context in Valkey: %s", e)

    async def _get_from_valkey(
        self,
        session_id: str
    ) -> Optional[ConversationContext]:
        """Retrieves context from Valkey.

        Best effort: returns None without a client, when the key is absent or
        empty, or when reading/decoding fails (logged as a warning).
        """
        if not self.redis:
            return None

        try:
            data = await self.redis.get(self._redis_key(session_id))
            if data:
                return ConversationContext.from_dict(json.loads(data))
        except Exception as e:
            logger.warning("Failed to get context from Valkey: %s", e)

        return None
|