""" Enhanced Task Orchestrator - Multi-Agent Integration Extends the existing TaskOrchestrator with Multi-Agent support: - Session management with checkpoints - Message bus integration for inter-agent communication - Quality judge integration via BQAS - Heartbeat-based liveness Split into: - enhanced_orchestrator_session.py: Session lifecycle (create/end/recover) - enhanced_task_orchestrator.py (this file): Main orchestrator class """ import structlog import asyncio from typing import Optional, Dict, Any from datetime import datetime from services.task_orchestrator import TaskOrchestrator, Intent from models.task import Task, TaskState # Import agent-core components import sys sys.path.insert(0, '/Users/benjaminadmin/Projekte/breakpilot-pwa/agent-core') from sessions.session_manager import SessionManager, AgentSession, SessionState from sessions.heartbeat import HeartbeatMonitor, HeartbeatClient from brain.memory_store import MemoryStore from brain.context_manager import ContextManager, MessageRole from orchestrator.message_bus import MessageBus, AgentMessage, MessagePriority from orchestrator.task_router import TaskRouter, RoutingStrategy from services.enhanced_orchestrator_session import ( create_session as _create_session, end_session as _end_session, recover_session as _recover_session, ) logger = structlog.get_logger(__name__) class EnhancedTaskOrchestrator(TaskOrchestrator): """ Enhanced TaskOrchestrator with Multi-Agent support. Extends the existing TaskOrchestrator to integrate with: - Session management for persistence and recovery - Message bus for inter-agent communication - Quality judge for response validation - Memory store for long-term learning """ def __init__( self, redis_client=None, db_pool=None, namespace: str = "breakpilot" ): super().__init__() self.session_manager = SessionManager( redis_client=redis_client, db_pool=db_pool, namespace=namespace ) self.memory_store = MemoryStore( redis_client=redis_client, db_pool=db_pool, namespace=namespace ) self.context_manager = ContextManager( redis_client=redis_client, db_pool=db_pool, namespace=namespace ) self.message_bus = MessageBus( redis_client=redis_client, db_pool=db_pool, namespace=namespace ) self.heartbeat = HeartbeatMonitor( timeout_seconds=30, check_interval_seconds=5, max_missed_beats=3 ) self.task_router = TaskRouter() self._voice_sessions: Dict[str, AgentSession] = {} self._heartbeat_clients: Dict[str, HeartbeatClient] = {} logger.info("Enhanced TaskOrchestrator initialized with agent-core") async def start(self) -> None: """Starts the enhanced orchestrator""" await self.message_bus.start() await self.heartbeat.start_monitoring() await self.message_bus.subscribe("voice-orchestrator", self._handle_agent_message) logger.info("Enhanced TaskOrchestrator started") async def stop(self) -> None: """Stops the enhanced orchestrator""" for client in self._heartbeat_clients.values(): await client.stop() self._heartbeat_clients.clear() await self.heartbeat.stop_monitoring() await self.message_bus.stop() logger.info("Enhanced TaskOrchestrator stopped") async def create_session( self, voice_session_id: str, user_id: str = "", metadata: Optional[Dict[str, Any]] = None ) -> AgentSession: return await _create_session( self.session_manager, self.context_manager, self.heartbeat, self._voice_sessions, self._heartbeat_clients, voice_session_id, user_id, metadata, self._get_system_prompt(), ) async def get_session(self, voice_session_id: str) -> Optional[AgentSession]: return self._voice_sessions.get(voice_session_id) async def end_session(self, voice_session_id: str) -> None: await _end_session( self.session_manager, self.heartbeat, self._voice_sessions, self._heartbeat_clients, voice_session_id, ) async def queue_task(self, task: Task) -> None: """Queue a task with session checkpointing.""" session = self._voice_sessions.get(task.session_id) if session: session.checkpoint("task_queued", { "task_id": task.id, "task_type": task.type.value, "parameters": task.parameters }) await self.session_manager.update_session(session) await super().queue_task(task) async def process_task(self, task: Task) -> None: """Process a task with enhanced routing and quality checks.""" session = self._voice_sessions.get(task.session_id) if session: session.checkpoint("task_processing", {"task_id": task.id}) if self._needs_specialized_agent(task): await self._route_to_agent(task, session) else: await super().process_task(task) if task.result_ref and self._needs_quality_check(task): await self._run_quality_check(task, session) if task.state == TaskState.READY and task.result_ref: await self._store_task_result(task) if session: session.checkpoint("task_completed", { "task_id": task.id, "state": task.state.value }) await self.session_manager.update_session(session) def _needs_specialized_agent(self, task: Task) -> bool: from models.task import TaskType return task.type in [TaskType.PARENT_LETTER, TaskType.FEEDBACK_SUGGEST] def _needs_quality_check(self, task: Task) -> bool: from models.task import TaskType return task.type in [ TaskType.PARENT_LETTER, TaskType.CLASS_MESSAGE, TaskType.FEEDBACK_SUGGEST, TaskType.WORKSHEET_GENERATE, ] async def _route_to_agent(self, task: Task, session: Optional[AgentSession]) -> None: """Routes a task to a specialized agent""" intent = f"task_{task.type.value}" routing_result = await self.task_router.route( intent=intent, context={"task": task.parameters}, strategy=RoutingStrategy.LEAST_LOADED ) if not routing_result.success: logger.warning( "No agent available for task, using local processing", task_id=task.id[:8], reason=routing_result.reason ) await super().process_task(task) return try: response = await self.message_bus.request( AgentMessage( sender="voice-orchestrator", receiver=routing_result.agent_id, message_type=f"process_{task.type.value}", payload={ "task_id": task.id, "task_type": task.type.value, "parameters": task.parameters, "session_id": session.session_id if session else None }, priority=MessagePriority.NORMAL ), timeout=30.0 ) task.result_ref = response.get("result", "") task.transition_to(TaskState.READY, "agent_processed") except asyncio.TimeoutError: logger.error( "Agent timeout, falling back to local", task_id=task.id[:8], agent=routing_result.agent_id ) await super().process_task(task) async def _run_quality_check(self, task: Task, session: Optional[AgentSession]) -> None: """Runs quality check on task result via quality judge""" try: response = await self.message_bus.request( AgentMessage( sender="voice-orchestrator", receiver="quality-judge", message_type="evaluate_response", payload={ "task_id": task.id, "task_type": task.type.value, "response": task.result_ref, "context": task.parameters }, priority=MessagePriority.NORMAL ), timeout=10.0 ) quality_score = response.get("composite_score", 0) if quality_score < 60: task.error_message = f"Quality check failed: {quality_score}" logger.warning("Task failed quality check", task_id=task.id[:8], score=quality_score) except asyncio.TimeoutError: logger.warning("Quality check timeout", task_id=task.id[:8]) async def _store_task_result(self, task: Task) -> None: """Stores task result in memory for learning""" await self.memory_store.remember( key=f"task:{task.type.value}:{task.id}", value={ "result": task.result_ref, "parameters": task.parameters, "completed_at": datetime.utcnow().isoformat() }, agent_id="voice-orchestrator", ttl_days=30 ) async def _handle_agent_message(self, message: AgentMessage) -> Optional[Dict[str, Any]]: """Handles incoming messages from other agents""" logger.debug("Received agent message", sender=message.sender, type=message.message_type) if message.message_type == "task_status_update": task_id = message.payload.get("task_id") if task_id in self._tasks: task = self._tasks[task_id] new_state = message.payload.get("state") if new_state: task.transition_to(TaskState(new_state), "agent_update") return None def _get_system_prompt(self) -> str: return """Du bist ein hilfreicher Assistent fuer Lehrer in der Breakpilot-App. Deine Aufgaben: - Hilf beim Erstellen von Arbeitsblaettern - Unterstuetze bei der Korrektur - Erstelle Elternbriefe und Klassennachrichten - Dokumentiere Beobachtungen und Erinnerungen Halte dich kurz und praezise. Nutze einfache, klare Sprache. Bei Unklarheiten frage nach.""" async def recover_session( self, voice_session_id: str, session_id: str ) -> Optional[AgentSession]: return await _recover_session( self.session_manager, self.heartbeat, self._voice_sessions, self._heartbeat_clients, self._tasks, self.process_task, voice_session_id, session_id, )