""" Task Models - Clawdbot State Machine Task lifecycle management with encrypted references State Machine: DRAFT -> QUEUED -> RUNNING -> READY | +-----------+----------+ | | APPROVED REJECTED | | COMPLETED DRAFT (revision) Any State -> EXPIRED (TTL) Any State -> PAUSED (User Interrupt) """ from datetime import datetime from enum import Enum from typing import Optional, Dict, Any, List from pydantic import BaseModel, Field import uuid class TaskState(str, Enum): """Task state machine states.""" DRAFT = "draft" QUEUED = "queued" RUNNING = "running" READY = "ready" APPROVED = "approved" REJECTED = "rejected" COMPLETED = "completed" EXPIRED = "expired" PAUSED = "paused" class TaskType(str, Enum): """Task types for Breakpilot integration.""" # Gruppe 1: Kurze Notizen STUDENT_OBSERVATION = "student_observation" REMINDER = "reminder" HOMEWORK_CHECK = "homework_check" CONFERENCE_TOPIC = "conference_topic" CORRECTION_NOTE = "correction_note" # Gruppe 2: Arbeitsblatt-Generierung WORKSHEET_GENERATE = "worksheet_generate" WORKSHEET_DIFFERENTIATE = "worksheet_differentiate" # Gruppe 3: Situatives Arbeiten QUICK_ACTIVITY = "quick_activity" QUIZ_GENERATE = "quiz_generate" PARENT_LETTER = "parent_letter" CLASS_MESSAGE = "class_message" # Gruppe 4: Canvas-Editor CANVAS_EDIT = "canvas_edit" CANVAS_LAYOUT = "canvas_layout" # Gruppe 5: Korrektur-Assistenz OPERATOR_CHECKLIST = "operator_checklist" EH_PASSAGE = "eh_passage" FEEDBACK_SUGGEST = "feedback_suggest" # Gruppe 6: Follow-up REMINDER_SCHEDULE = "reminder_schedule" TASK_SUMMARY = "task_summary" class Task(BaseModel): """ Task entity for Clawdbot orchestration. Stored in Valkey with TTL. """ id: str = Field(default_factory=lambda: str(uuid.uuid4())) session_id: str = Field(..., description="Parent session ID") namespace_id: str = Field(..., description="Teacher namespace ID") # Task definition type: TaskType state: TaskState = Field(default=TaskState.DRAFT) intent_text: str = Field(..., description="Original voice command (encrypted ref)") # Task parameters (no PII, only references) parameters: Dict[str, Any] = Field(default_factory=dict) # Example parameters: # - student_ref: encrypted reference to student # - class_ref: encrypted reference to class # - content_type: "worksheet", "quiz", etc. # - source_ref: encrypted reference to source document # Execution state result_ref: Optional[str] = Field(default=None, description="Encrypted result reference") error_message: Optional[str] = Field(default=None) # Timestamps created_at: datetime = Field(default_factory=datetime.utcnow) updated_at: datetime = Field(default_factory=datetime.utcnow) completed_at: Optional[datetime] = Field(default=None) expires_at: Optional[datetime] = Field(default=None) # Audit trail (no PII) state_history: List[Dict[str, Any]] = Field(default_factory=list) def transition_to(self, new_state: TaskState, reason: Optional[str] = None): """Transition to a new state with history tracking.""" old_state = self.state self.state = new_state self.updated_at = datetime.utcnow() # Add to history (no PII in reason) self.state_history.append({ "from": old_state.value, "to": new_state.value, "timestamp": self.updated_at.isoformat(), "reason": reason, }) if new_state in [TaskState.COMPLETED, TaskState.EXPIRED]: self.completed_at = self.updated_at class Config: json_schema_extra = { "example": { "id": "task-xyz789", "session_id": "session-abc123", "namespace_id": "teacher-ns-456", "type": "student_observation", "state": "ready", "intent_text": "encrypted:abc123...", "parameters": { "student_ref": "encrypted:student-max-123", "observation_type": "behavior", }, "created_at": "2026-01-26T10:30:00Z", "updated_at": "2026-01-26T10:30:05Z", } } class TaskCreate(BaseModel): """Request to create a new task.""" session_id: str type: TaskType intent_text: str = Field(..., description="Voice command text") parameters: Dict[str, Any] = Field(default_factory=dict) class Config: json_schema_extra = { "example": { "session_id": "session-abc123", "type": "student_observation", "intent_text": "Notiz zu Max: heute wiederholt gestoert", "parameters": { "student_name": "Max", # Will be encrypted "observation": "wiederholt gestoert", }, } } class TaskResponse(BaseModel): """Task response for API.""" id: str session_id: str type: TaskType state: TaskState created_at: datetime updated_at: datetime result_available: bool = Field(default=False) error_message: Optional[str] = Field(default=None) class Config: json_schema_extra = { "example": { "id": "task-xyz789", "session_id": "session-abc123", "type": "student_observation", "state": "completed", "created_at": "2026-01-26T10:30:00Z", "updated_at": "2026-01-26T10:30:10Z", "result_available": True, } } class TaskTransition(BaseModel): """Request to transition task state.""" new_state: TaskState reason: Optional[str] = Field(default=None, description="Transition reason (no PII)") class Config: json_schema_extra = { "example": { "new_state": "approved", "reason": "user_confirmed", } } # Valid state transitions VALID_TRANSITIONS: Dict[TaskState, List[TaskState]] = { TaskState.DRAFT: [TaskState.QUEUED, TaskState.EXPIRED, TaskState.PAUSED], TaskState.QUEUED: [TaskState.RUNNING, TaskState.EXPIRED, TaskState.PAUSED], TaskState.RUNNING: [TaskState.READY, TaskState.EXPIRED, TaskState.PAUSED], TaskState.READY: [TaskState.APPROVED, TaskState.REJECTED, TaskState.EXPIRED, TaskState.PAUSED], TaskState.APPROVED: [TaskState.COMPLETED, TaskState.EXPIRED], TaskState.REJECTED: [TaskState.DRAFT, TaskState.EXPIRED], TaskState.PAUSED: [TaskState.DRAFT, TaskState.QUEUED, TaskState.EXPIRED], TaskState.COMPLETED: [], # Terminal state TaskState.EXPIRED: [], # Terminal state } def is_valid_transition(from_state: TaskState, to_state: TaskState) -> bool: """Check if a state transition is valid.""" return to_state in VALID_TRANSITIONS.get(from_state, [])