Some checks failed
CI / go-lint (push) Has been skipped
CI / python-lint (push) Has been skipped
CI / nodejs-lint (push) Has been skipped
CI / test-go-school (push) Successful in 25s
CI / test-go-edu-search (push) Successful in 26s
CI / test-python-klausur (push) Failing after 1m55s
CI / test-python-agent-core (push) Successful in 16s
CI / test-nodejs-website (push) Successful in 18s
- Voice-Service von Core nach Lehrer verschoben (bp-lehrer-voice-service) - 4 Jitsi-Services + 2 Synapse-Services in docker-compose.yml aufgenommen - Camunda komplett gelöscht: workflow pages, workflow-config.ts, bpmn-js deps - CAMUNDA_URL aus backend-lehrer environment entfernt - Sidebar: Kategorie "Compliance SDK" + "Katalogverwaltung" entfernt - Sidebar: Neue Kategorie "Kommunikation" mit Video & Chat, Voice Service, Alerts Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
383 lines
13 KiB
Python
383 lines
13 KiB
Python
"""
|
|
Task Orchestrator - Task State Machine
|
|
Manages task lifecycle and routes to Breakpilot modules
|
|
|
|
The TaskOrchestrator is the agent orchestration layer that:
|
|
1. Receives intents from voice input
|
|
2. Creates and manages tasks
|
|
3. Routes to appropriate Breakpilot modules
|
|
4. Maintains conversation context
|
|
5. Handles follow-up queries
|
|
|
|
Note: This is a safe, internal task router with no shell access,
|
|
no email capabilities, and no external API access beyond internal services.
|
|
"""
|
|
import structlog
|
|
import httpx
|
|
from typing import Optional, List, Dict, Any
|
|
from datetime import datetime, timedelta
|
|
|
|
from config import settings
|
|
from models.task import Task, TaskState, TaskType, is_valid_transition
|
|
from models.session import TranscriptMessage
|
|
|
|
logger = structlog.get_logger(__name__)
|
|
|
|
|
|
class Intent:
|
|
"""Detected intent from voice input."""
|
|
|
|
def __init__(
|
|
self,
|
|
type: TaskType,
|
|
confidence: float,
|
|
parameters: Dict[str, Any],
|
|
is_actionable: bool = True,
|
|
):
|
|
self.type = type
|
|
self.confidence = confidence
|
|
self.parameters = parameters
|
|
self.is_actionable = is_actionable
|
|
|
|
|
|
class TaskOrchestrator:
|
|
"""
|
|
Task orchestration and state machine management.
|
|
|
|
Handles the full lifecycle of voice-initiated tasks:
|
|
1. Intent -> Task creation
|
|
2. Task queuing and execution
|
|
3. Result handling
|
|
4. Follow-up context
|
|
|
|
Security: This orchestrator only routes to internal Breakpilot services
|
|
via HTTP. It has NO access to shell commands, emails, calendars, or
|
|
external APIs.
|
|
"""
|
|
|
|
def __init__(self):
|
|
self._tasks: Dict[str, Task] = {}
|
|
self._session_tasks: Dict[str, List[str]] = {} # session_id -> task_ids
|
|
self._http_client: Optional[httpx.AsyncClient] = None
|
|
|
|
async def _get_client(self) -> httpx.AsyncClient:
|
|
"""Get or create HTTP client."""
|
|
if self._http_client is None:
|
|
self._http_client = httpx.AsyncClient(timeout=30.0)
|
|
return self._http_client
|
|
|
|
async def queue_task(self, task: Task):
|
|
"""
|
|
Queue a task for processing.
|
|
Transitions from DRAFT to QUEUED.
|
|
"""
|
|
if task.state != TaskState.DRAFT:
|
|
logger.warning("Task not in DRAFT state", task_id=task.id[:8])
|
|
return
|
|
|
|
task.transition_to(TaskState.QUEUED, "queued_for_processing")
|
|
|
|
# Store task
|
|
self._tasks[task.id] = task
|
|
|
|
# Add to session tasks
|
|
if task.session_id not in self._session_tasks:
|
|
self._session_tasks[task.session_id] = []
|
|
self._session_tasks[task.session_id].append(task.id)
|
|
|
|
logger.info(
|
|
"Task queued",
|
|
task_id=task.id[:8],
|
|
type=task.type.value,
|
|
)
|
|
|
|
# Auto-process certain task types
|
|
auto_process_types = [
|
|
TaskType.STUDENT_OBSERVATION,
|
|
TaskType.REMINDER,
|
|
TaskType.HOMEWORK_CHECK,
|
|
]
|
|
|
|
if task.type in auto_process_types:
|
|
await self.process_task(task)
|
|
|
|
async def process_task(self, task: Task):
|
|
"""
|
|
Process a queued task.
|
|
Routes to appropriate Breakpilot module.
|
|
"""
|
|
if task.state != TaskState.QUEUED:
|
|
logger.warning("Task not in QUEUED state", task_id=task.id[:8])
|
|
return
|
|
|
|
task.transition_to(TaskState.RUNNING, "processing_started")
|
|
|
|
try:
|
|
# Route to appropriate handler
|
|
result = await self._route_task(task)
|
|
|
|
# Store result
|
|
task.result_ref = result
|
|
|
|
# Transition to READY
|
|
task.transition_to(TaskState.READY, "processing_complete")
|
|
|
|
logger.info(
|
|
"Task processed",
|
|
task_id=task.id[:8],
|
|
type=task.type.value,
|
|
)
|
|
|
|
except Exception as e:
|
|
logger.error("Task processing failed", task_id=task.id[:8], error=str(e))
|
|
task.error_message = str(e)
|
|
task.transition_to(TaskState.READY, "processing_failed")
|
|
|
|
async def _route_task(self, task: Task) -> str:
|
|
"""
|
|
Route task to appropriate Breakpilot module.
|
|
"""
|
|
client = await self._get_client()
|
|
|
|
# Task type to endpoint mapping
|
|
routes = {
|
|
# Worksheet generation
|
|
TaskType.WORKSHEET_GENERATE: f"{settings.klausur_service_url}/api/v1/worksheets/generate",
|
|
TaskType.WORKSHEET_DIFFERENTIATE: f"{settings.klausur_service_url}/api/v1/worksheets/differentiate",
|
|
|
|
# Quick activities
|
|
TaskType.QUICK_ACTIVITY: f"{settings.klausur_service_url}/api/v1/activities/generate",
|
|
TaskType.QUIZ_GENERATE: f"{settings.klausur_service_url}/api/v1/quizzes/generate",
|
|
|
|
# Korrektur assistance
|
|
TaskType.OPERATOR_CHECKLIST: f"{settings.klausur_service_url}/api/v1/corrections/operators",
|
|
TaskType.EH_PASSAGE: f"{settings.klausur_service_url}/api/v1/corrections/eh-passage",
|
|
TaskType.FEEDBACK_SUGGEST: f"{settings.klausur_service_url}/api/v1/corrections/feedback",
|
|
}
|
|
|
|
# Check if this task type needs API routing
|
|
if task.type in routes:
|
|
try:
|
|
response = await client.post(
|
|
routes[task.type],
|
|
json={
|
|
"task_id": task.id,
|
|
"namespace_id": task.namespace_id,
|
|
"parameters": task.parameters,
|
|
},
|
|
timeout=settings.ollama_timeout,
|
|
)
|
|
response.raise_for_status()
|
|
return response.json().get("result", "")
|
|
except httpx.HTTPError as e:
|
|
logger.error("API call failed", url=routes[task.type], error=str(e))
|
|
raise
|
|
|
|
# Handle local tasks (no API call needed)
|
|
if task.type in [TaskType.STUDENT_OBSERVATION, TaskType.REMINDER, TaskType.HOMEWORK_CHECK]:
|
|
return await self._handle_note_task(task)
|
|
|
|
if task.type in [TaskType.CONFERENCE_TOPIC, TaskType.CORRECTION_NOTE]:
|
|
return await self._handle_note_task(task)
|
|
|
|
if task.type == TaskType.PARENT_LETTER:
|
|
return await self._generate_parent_letter(task)
|
|
|
|
if task.type == TaskType.CLASS_MESSAGE:
|
|
return await self._generate_class_message(task)
|
|
|
|
if task.type in [TaskType.CANVAS_EDIT, TaskType.CANVAS_LAYOUT]:
|
|
return await self._handle_canvas_command(task)
|
|
|
|
if task.type == TaskType.REMINDER_SCHEDULE:
|
|
return await self._schedule_reminder(task)
|
|
|
|
if task.type == TaskType.TASK_SUMMARY:
|
|
return await self._generate_task_summary(task)
|
|
|
|
logger.warning("Unknown task type", task_type=task.type.value)
|
|
return "Task type not implemented"
|
|
|
|
async def _handle_note_task(self, task: Task) -> str:
|
|
"""Handle simple note/observation tasks."""
|
|
# These are stored encrypted, no further processing needed
|
|
return "Notiz gespeichert"
|
|
|
|
async def _generate_parent_letter(self, task: Task) -> str:
|
|
"""Generate a parent letter using LLM."""
|
|
from services.fallback_llm_client import FallbackLLMClient
|
|
|
|
llm = FallbackLLMClient()
|
|
|
|
prompt = f"""Erstelle einen neutralen, professionellen Elternbrief basierend auf:
|
|
Anlass: {task.parameters.get('reason', 'Allgemeine Information')}
|
|
Kontext: {task.parameters.get('context', '')}
|
|
|
|
Der Brief soll:
|
|
- Sachlich und respektvoll formuliert sein
|
|
- Keine Schuldzuweisungen enthalten
|
|
- Konstruktiv auf Lösungen ausgerichtet sein
|
|
- In der Ich-Form aus Lehrersicht geschrieben sein
|
|
|
|
Bitte nur den Brieftext ausgeben, ohne Metakommentare."""
|
|
|
|
result = await llm.generate(prompt)
|
|
return result
|
|
|
|
async def _generate_class_message(self, task: Task) -> str:
|
|
"""Generate a class message."""
|
|
from services.fallback_llm_client import FallbackLLMClient
|
|
|
|
llm = FallbackLLMClient()
|
|
|
|
prompt = f"""Erstelle eine kurze Klassennachricht:
|
|
Inhalt: {task.parameters.get('content', '')}
|
|
Klasse: {task.parameters.get('class_ref', 'Klasse')}
|
|
|
|
Die Nachricht soll:
|
|
- Kurz und klar formuliert sein
|
|
- Freundlich aber verbindlich klingen
|
|
- Alle wichtigen Informationen enthalten
|
|
|
|
Nur die Nachricht ausgeben."""
|
|
|
|
result = await llm.generate(prompt)
|
|
return result
|
|
|
|
async def _handle_canvas_command(self, task: Task) -> str:
|
|
"""Handle Canvas editor commands."""
|
|
# Parse canvas commands and generate JSON instructions
|
|
command = task.parameters.get('command', '')
|
|
|
|
# Map natural language to Canvas actions
|
|
canvas_actions = []
|
|
|
|
if 'groesser' in command.lower() or 'größer' in command.lower():
|
|
canvas_actions.append({"action": "resize", "target": "headings", "scale": 1.2})
|
|
|
|
if 'kleiner' in command.lower():
|
|
canvas_actions.append({"action": "resize", "target": "spacing", "scale": 0.8})
|
|
|
|
if 'links' in command.lower():
|
|
canvas_actions.append({"action": "move", "direction": "left"})
|
|
|
|
if 'rechts' in command.lower():
|
|
canvas_actions.append({"action": "move", "direction": "right"})
|
|
|
|
if 'a4' in command.lower() or 'drucklayout' in command.lower():
|
|
canvas_actions.append({"action": "layout", "format": "A4"})
|
|
|
|
return str(canvas_actions)
|
|
|
|
async def _schedule_reminder(self, task: Task) -> str:
|
|
"""Schedule a reminder for later."""
|
|
# In production, this would use a scheduler service
|
|
reminder_time = task.parameters.get('time', 'tomorrow')
|
|
reminder_content = task.parameters.get('content', '')
|
|
|
|
return f"Erinnerung geplant für {reminder_time}: {reminder_content}"
|
|
|
|
async def _generate_task_summary(self, task: Task) -> str:
|
|
"""Generate a summary of pending tasks."""
|
|
session_tasks = self._session_tasks.get(task.session_id, [])
|
|
|
|
pending = []
|
|
for task_id in session_tasks:
|
|
t = self._tasks.get(task_id)
|
|
if t and t.state not in [TaskState.COMPLETED, TaskState.EXPIRED]:
|
|
pending.append(f"- {t.type.value}: {t.state.value}")
|
|
|
|
if not pending:
|
|
return "Keine offenen Aufgaben"
|
|
|
|
return "Offene Aufgaben:\n" + "\n".join(pending)
|
|
|
|
async def execute_task(self, task: Task):
|
|
"""Execute an approved task."""
|
|
if task.state != TaskState.APPROVED:
|
|
logger.warning("Task not approved", task_id=task.id[:8])
|
|
return
|
|
|
|
# Mark as completed
|
|
task.transition_to(TaskState.COMPLETED, "user_approved")
|
|
|
|
logger.info("Task completed", task_id=task.id[:8])
|
|
|
|
async def get_session_tasks(
|
|
self,
|
|
session_id: str,
|
|
state: Optional[TaskState] = None,
|
|
) -> List[Task]:
|
|
"""Get tasks for a session, optionally filtered by state."""
|
|
task_ids = self._session_tasks.get(session_id, [])
|
|
tasks = []
|
|
|
|
for task_id in task_ids:
|
|
task = self._tasks.get(task_id)
|
|
if task:
|
|
if state is None or task.state == state:
|
|
tasks.append(task)
|
|
|
|
return tasks
|
|
|
|
async def create_task_from_intent(
|
|
self,
|
|
session_id: str,
|
|
namespace_id: str,
|
|
intent: Intent,
|
|
transcript: str,
|
|
) -> Task:
|
|
"""Create a task from a detected intent."""
|
|
task = Task(
|
|
session_id=session_id,
|
|
namespace_id=namespace_id,
|
|
type=intent.type,
|
|
intent_text=transcript,
|
|
parameters=intent.parameters,
|
|
)
|
|
|
|
await self.queue_task(task)
|
|
return task
|
|
|
|
async def generate_response(
|
|
self,
|
|
session_messages: List[TranscriptMessage],
|
|
intent: Optional[Intent],
|
|
namespace_id: str,
|
|
) -> str:
|
|
"""Generate a conversational response."""
|
|
from services.fallback_llm_client import FallbackLLMClient
|
|
|
|
llm = FallbackLLMClient()
|
|
|
|
# Build conversation context
|
|
context = "\n".join([
|
|
f"{msg.role}: {msg.content}"
|
|
for msg in session_messages[-5:] # Last 5 messages
|
|
])
|
|
|
|
# Generate response based on intent
|
|
if intent:
|
|
if intent.type in [TaskType.STUDENT_OBSERVATION, TaskType.REMINDER]:
|
|
return "Verstanden, ich habe mir das notiert."
|
|
|
|
if intent.type == TaskType.WORKSHEET_GENERATE:
|
|
return "Ich erstelle das Arbeitsblatt. Das kann einen Moment dauern."
|
|
|
|
if intent.type == TaskType.PARENT_LETTER:
|
|
return "Ich bereite einen Elternbrief vor."
|
|
|
|
if intent.type == TaskType.QUIZ_GENERATE:
|
|
return "Ich generiere den Quiz. Einen Moment bitte."
|
|
|
|
# Default: use LLM for conversational response
|
|
prompt = f"""Du bist ein hilfreicher Assistent für Lehrer.
|
|
Konversation:
|
|
{context}
|
|
|
|
Antworte kurz und hilfreich auf die letzte Nachricht des Nutzers.
|
|
Halte die Antwort unter 50 Wörtern."""
|
|
|
|
response = await llm.generate(prompt)
|
|
return response
|