""" Classroom API - Shared State und Helper Functions. Zentrale Komponenten die von allen Classroom-Modulen verwendet werden. """ from typing import Dict, List, Optional, Any from datetime import datetime import os import logging import asyncio import json from fastapi import HTTPException, WebSocket, Request # Auth imports (Phase 7: Keycloak Integration) try: from auth import get_current_user AUTH_ENABLED = True except ImportError: AUTH_ENABLED = False logging.warning("Auth module not available, using demo user fallback") from classroom_engine import ( LessonPhase, LessonSession, LessonStateMachine, PhaseTimer, ) # Database imports (Feature f22) try: from classroom_engine.database import get_db, init_db, SessionLocal from classroom_engine.repository import SessionRepository DB_ENABLED = True except ImportError: DB_ENABLED = False logging.warning("Classroom DB not available, using in-memory storage only") logger = logging.getLogger(__name__) # === WebSocket Connection Manager (Phase 6: Real-time) === class ConnectionManager: """ Verwaltet WebSocket-Verbindungen fuer Echtzeit-Timer-Updates. Features: - Session-basierte Verbindungen (jede Session hat eigene Clients) - Automatisches Cleanup bei Disconnect - Broadcast an alle Clients einer Session - Multi-Device Support """ def __init__(self): # session_id -> Set[WebSocket] self._connections: Dict[str, set] = {} # WebSocket -> session_id (reverse lookup) self._websocket_sessions: Dict[WebSocket, str] = {} self._lock = asyncio.Lock() async def connect(self, websocket: WebSocket, session_id: str): """Verbindet einen Client mit einer Session.""" await websocket.accept() async with self._lock: if session_id not in self._connections: self._connections[session_id] = set() self._connections[session_id].add(websocket) self._websocket_sessions[websocket] = session_id logger.info(f"WebSocket connected to session {session_id}, total clients: {len(self._connections[session_id])}") async def disconnect(self, websocket: WebSocket): """Trennt einen Client.""" async with self._lock: session_id = self._websocket_sessions.pop(websocket, None) if session_id and session_id in self._connections: self._connections[session_id].discard(websocket) if not self._connections[session_id]: del self._connections[session_id] logger.info(f"WebSocket disconnected from session {session_id}") async def broadcast_to_session(self, session_id: str, message: dict): """Sendet eine Nachricht an alle Clients einer Session.""" async with self._lock: connections = self._connections.get(session_id, set()).copy() if not connections: return message_json = json.dumps(message) dead_connections = [] for websocket in connections: try: await websocket.send_text(message_json) except Exception as e: logger.warning(f"Failed to send to websocket: {e}") dead_connections.append(websocket) # Cleanup dead connections for ws in dead_connections: await self.disconnect(ws) async def broadcast_timer_update(self, session_id: str, timer_data: dict): """Sendet Timer-Update an alle Clients einer Session.""" await self.broadcast_to_session(session_id, { "type": "timer_update", "data": timer_data }) async def broadcast_phase_change(self, session_id: str, phase_data: dict): """Sendet Phasenwechsel-Event an alle Clients.""" await self.broadcast_to_session(session_id, { "type": "phase_change", "data": phase_data }) async def broadcast_session_ended(self, session_id: str): """Sendet Session-Ende-Event an alle Clients.""" await self.broadcast_to_session(session_id, { "type": "session_ended", "data": {"session_id": session_id} }) def get_client_count(self, session_id: str) -> int: """Gibt die Anzahl der verbundenen Clients fuer eine Session zurueck.""" return len(self._connections.get(session_id, set())) def get_active_sessions(self) -> List[str]: """Gibt alle Sessions mit aktiven WebSocket-Verbindungen zurueck.""" return list(self._connections.keys()) # Global instances ws_manager = ConnectionManager() _sessions: Dict[str, LessonSession] = {} _db_initialized = False _timer_broadcast_task: Optional[asyncio.Task] = None # === Demo User === DEMO_USER = { "user_id": "demo-teacher", "email": "demo@breakpilot.app", "name": "Demo Lehrer", "given_name": "Demo", "family_name": "Lehrer", "role": "teacher", "is_demo": True } # === Timer Broadcast Functions === async def _timer_broadcast_loop(): """ Hintergrund-Task der Timer-Updates alle 1 Sekunde an verbundene Clients sendet. """ logger.info("Timer broadcast loop started") while True: try: await asyncio.sleep(1) active_ws_sessions = ws_manager.get_active_sessions() if not active_ws_sessions: continue for session_id in active_ws_sessions: session = _sessions.get(session_id) if not session or session.is_ended: continue timer_status = build_timer_status(session) await ws_manager.broadcast_timer_update(session_id, timer_status) except asyncio.CancelledError: logger.info("Timer broadcast loop cancelled") break except Exception as e: logger.error(f"Error in timer broadcast loop: {e}") await asyncio.sleep(5) def start_timer_broadcast(): """Startet den Timer-Broadcast-Task wenn noch nicht laufend.""" global _timer_broadcast_task if _timer_broadcast_task is None or _timer_broadcast_task.done(): _timer_broadcast_task = asyncio.create_task(_timer_broadcast_loop()) logger.info("Timer broadcast task created") def stop_timer_broadcast(): """Stoppt den Timer-Broadcast-Task.""" global _timer_broadcast_task if _timer_broadcast_task and not _timer_broadcast_task.done(): _timer_broadcast_task.cancel() logger.info("Timer broadcast task cancelled") # === Database Functions === def init_db_if_needed(): """Initialisiert DB und laedt aktive Sessions beim ersten Aufruf.""" global _db_initialized if _db_initialized or not DB_ENABLED: return try: init_db() _load_active_sessions_from_db() _db_initialized = True logger.info("Classroom DB initialized, loaded active sessions") except Exception as e: logger.error(f"Failed to initialize Classroom DB: {e}") def _load_active_sessions_from_db(): """Laedt alle aktiven Sessions aus der DB in den Memory-Cache.""" if not DB_ENABLED: return try: db = SessionLocal() repo = SessionRepository(db) from classroom_engine.db_models import LessonSessionDB, LessonPhaseEnum active_db_sessions = db.query(LessonSessionDB).filter( LessonSessionDB.current_phase != LessonPhaseEnum.ENDED ).all() for db_session in active_db_sessions: session = repo.to_dataclass(db_session) _sessions[session.session_id] = session logger.info(f"Loaded session {session.session_id} from DB") db.close() except Exception as e: logger.error(f"Failed to load sessions from DB: {e}") def persist_session(session: LessonSession): """Speichert/aktualisiert Session in der DB.""" if not DB_ENABLED: return try: db = SessionLocal() repo = SessionRepository(db) existing = repo.get_by_id(session.session_id) if existing: repo.update(session) else: repo.create(session) db.close() except Exception as e: logger.error(f"Failed to persist session {session.session_id}: {e}") # === Auth Functions === async def get_optional_current_user(request: Request) -> Dict[str, Any]: """ Optionale Authentifizierung - gibt Demo-User zurueck wenn kein Token. """ if not AUTH_ENABLED: return DEMO_USER auth_header = request.headers.get("Authorization", "") if not auth_header or not auth_header.startswith("Bearer "): env = os.environ.get("ENVIRONMENT", "development") if env == "development": return DEMO_USER raise HTTPException(status_code=401, detail="Nicht authentifiziert") try: return await get_current_user(request) except Exception as e: logger.warning(f"Auth failed: {e}") env = os.environ.get("ENVIRONMENT", "development") if env == "development": return DEMO_USER raise HTTPException(status_code=401, detail="Authentifizierung fehlgeschlagen") # === Session Helpers === def get_session_or_404(session_id: str) -> LessonSession: """Holt eine Session oder wirft 404. Prueft auch DB bei Cache-Miss.""" init_db_if_needed() session = _sessions.get(session_id) if session: return session if DB_ENABLED: try: db = SessionLocal() repo = SessionRepository(db) db_session = repo.get_by_id(session_id) if db_session: session = repo.to_dataclass(db_session) _sessions[session.session_id] = session db.close() return session db.close() except Exception as e: logger.error(f"Failed to load session {session_id} from DB: {e}") raise HTTPException(status_code=404, detail="Session nicht gefunden") def build_timer_status(session: LessonSession) -> dict: """Baut Timer-Status als dict fuer WebSocket-Broadcast.""" timer = PhaseTimer() status = timer.get_phase_status(session) status["session_id"] = session.session_id status["current_phase"] = session.current_phase.value status["is_paused"] = session.is_paused status["timestamp"] = datetime.utcnow().isoformat() return status def get_sessions() -> Dict[str, LessonSession]: """Gibt das Sessions-Dictionary zurueck.""" return _sessions def add_session(session: LessonSession): """Fuegt eine Session zum Cache hinzu und persistiert sie.""" _sessions[session.session_id] = session persist_session(session) def remove_session(session_id: str): """Entfernt eine Session aus dem Cache.""" _sessions.pop(session_id, None)