"""
WebSocket Streaming API
Handles real-time audio streaming for the voice interface.

WebSocket Protocol:

Client -> Server:
- Binary frames: Int16 PCM audio (24kHz, 80ms frames)
- JSON frames: {"type": "config|end_turn|interrupt|ping"}

Server -> Client:
- Binary: audio response (raw 16-bit PCM, sent with ``send_bytes``)
- JSON: {"type": "transcript|intent|task_created|response|status|error|pong"}
"""

import structlog
import asyncio
import json
import base64
from fastapi import APIRouter, WebSocket, WebSocketDisconnect, Query
from typing import Optional
from datetime import datetime, timezone

from config import settings
from models.session import SessionStatus, TranscriptMessage, AudioChunk
from models.task import TaskCreate, TaskType

logger = structlog.get_logger(__name__)
router = APIRouter()

# Active WebSocket connections (transient), keyed by session ID
active_connections: dict[str, WebSocket] = {}

_MS_PER_DAY = 24 * 60 * 60 * 1000


def _ms_since_utc_midnight() -> int:
    """Return the number of milliseconds elapsed since 00:00 UTC today."""
    # An aware datetime is required here: calling .timestamp() on the naive
    # value returned by datetime.utcnow() interprets it as *local* time and
    # skews the result by the host's UTC offset.
    return int(datetime.now(timezone.utc).timestamp() * 1000) % _MS_PER_DAY


@router.websocket("/ws/voice")
async def voice_websocket(
    websocket: WebSocket,
    session_id: str = Query(..., description="Session ID from /api/v1/sessions"),
    namespace: Optional[str] = Query(None, description="Namespace ID"),
    key_hash: Optional[str] = Query(None, description="Encryption key hash"),
):
    """
    WebSocket endpoint for voice streaming.

    Protocol:
    1. Client connects with session_id
    2. Client sends binary audio frames (Int16 PCM, 24kHz)
    3. Server responds with transcripts, intents, and audio

    Audio Processing:
    - Chunks are processed in RAM only
    - No audio is ever persisted
    - Transcripts are encrypted before any storage

    Args:
        websocket: The incoming WebSocket connection.
        session_id: ID of an existing session; the connection is closed with
            code 4004 when no such session is registered.
        namespace: Accepted for API compatibility; not read by this handler.
        key_hash: Accepted for API compatibility; not read by this handler.
    """
    # Get session
    from api.sessions import _sessions

    session = _sessions.get(session_id)
    if not session:
        await websocket.close(code=4004, reason="Session not found")
        return

    # Accept connection
    await websocket.accept()

    logger.info(
        "WebSocket connected",
        session_id=session_id[:8],
        namespace_id=session.namespace_id[:8],
    )

    # Update session status
    session.status = SessionStatus.CONNECTED
    active_connections[session_id] = websocket

    # Audio buffer for accumulating chunks
    audio_buffer = bytearray()
    chunk_sequence = 0

    try:
        # Send initial status
        await websocket.send_json({
            "type": "status",
            "status": "connected",
            "session_id": session_id,
            "audio_config": {
                "sample_rate": settings.audio_sample_rate,
                "frame_size_ms": settings.audio_frame_size_ms,
                "encoding": "pcm_s16le",
            },
        })

        while True:
            # Receive message (binary or text)
            message = await websocket.receive()

            # The low-level receive() hands back the disconnect event instead
            # of raising; calling receive() again afterwards raises
            # RuntimeError. Convert it so a normal close is not reported as
            # an error.
            if message.get("type") == "websocket.disconnect":
                raise WebSocketDisconnect(message.get("code", 1000))

            # ASGI servers may include both keys with one of them set to
            # None, so test the values rather than key presence.
            audio_data = message.get("bytes")
            text_data = message.get("text")

            if audio_data is not None:
                # Binary audio data
                session.audio_chunks_received += 1

                # Create audio chunk (transient - never persisted).
                # NOTE(review): the instance is not used afterwards; kept in
                # case the model performs validation - confirm and drop.
                chunk = AudioChunk(
                    sequence=chunk_sequence,
                    timestamp_ms=_ms_since_utc_midnight(),
                    data=audio_data,
                )
                chunk_sequence += 1

                # Accumulate in buffer
                audio_buffer.extend(audio_data)

                # Process when we have enough data (e.g., 500ms worth)
                samples_needed = settings.audio_sample_rate // 2  # 500ms
                bytes_needed = samples_needed * 2  # 16-bit = 2 bytes

                # Drain every complete window so that oversized frames cannot
                # make the buffer grow without bound. The > 0 guard prevents
                # an endless loop on a degenerate sample-rate setting.
                while bytes_needed > 0 and len(audio_buffer) >= bytes_needed:
                    session.status = SessionStatus.PROCESSING

                    # Process audio chunk
                    await process_audio_chunk(
                        websocket,
                        session,
                        bytes(audio_buffer[:bytes_needed]),
                    )

                    # Remove processed data in place (no buffer copy)
                    del audio_buffer[:bytes_needed]
                    session.audio_chunks_processed += 1

            elif text_data is not None:
                # JSON control message
                try:
                    data = json.loads(text_data)
                except json.JSONDecodeError:
                    logger.warning("Invalid JSON message", message=text_data[:100])
                    data = None

                if data is not None and not isinstance(data, dict):
                    # Valid JSON but not an object (e.g. a list or string):
                    # ignore it instead of crashing on data.get().
                    logger.warning(
                        "Ignoring non-object JSON message",
                        message=text_data[:100],
                    )
                    data = None

                if data is not None:
                    msg_type = data.get("type")

                    if msg_type == "config":
                        # Client configuration
                        logger.debug("Received config", config=data)

                    elif msg_type == "end_turn":
                        # User finished speaking
                        session.status = SessionStatus.PROCESSING

                        # Process remaining audio buffer
                        if audio_buffer:
                            await process_audio_chunk(
                                websocket,
                                session,
                                bytes(audio_buffer),
                            )
                            audio_buffer.clear()

                        # Signal end of user turn
                        await websocket.send_json({
                            "type": "status",
                            "status": "processing",
                        })

                    elif msg_type == "interrupt":
                        # User interrupted response
                        session.status = SessionStatus.LISTENING
                        await websocket.send_json({
                            "type": "status",
                            "status": "interrupted",
                        })

                    elif msg_type == "ping":
                        # Keep-alive ping
                        await websocket.send_json({"type": "pong"})

            # Update activity
            session.update_activity()

    except WebSocketDisconnect:
        logger.info("WebSocket disconnected", session_id=session_id[:8])
    except Exception as e:
        logger.error("WebSocket error", session_id=session_id[:8], error=str(e))
        session.status = SessionStatus.ERROR
    finally:
        # Cleanup
        session.status = SessionStatus.CLOSED
        # Only drop the registry entry if it is still ours; a newer connection
        # for the same session may have replaced it in the meantime.
        if active_connections.get(session_id) is websocket:
            del active_connections[session_id]


async def process_audio_chunk(
    websocket: WebSocket,
    session,
    audio_data: bytes,
):
    """
    Process an audio chunk through the voice pipeline.

    1. PersonaPlex/Ollama for transcription + understanding
    2. Intent detection
    3. Task creation if needed
    4. Response generation
    5. Audio synthesis (if PersonaPlex)

    Args:
        websocket: Connection used to send transcript, intent, task,
            response, audio and status frames back to the client.
        session: Session object; its ``messages`` list and ``status`` are
            updated in place.
        audio_data: Raw audio bytes to transcribe.

    Raises:
        WebSocketDisconnect: Propagated unchanged so the caller can run its
            disconnect handling. Any other failure is logged and reported to
            the client as an ``error`` frame.
    """
    from services.task_orchestrator import TaskOrchestrator
    from services.intent_router import IntentRouter

    orchestrator = TaskOrchestrator()
    intent_router = IntentRouter()

    try:
        # Transcribe audio
        if settings.use_personaplex:
            # Use PersonaPlex for transcription
            from services.personaplex_client import PersonaPlexClient
            client = PersonaPlexClient()
            transcript = await client.transcribe(audio_data)
        else:
            # Use Ollama fallback (text-only, requires separate ASR)
            # For MVP, we'll simulate with a placeholder
            # In production, integrate with Whisper or similar
            from services.fallback_llm_client import FallbackLLMClient
            llm_client = FallbackLLMClient()
            transcript = await llm_client.process_audio_description(audio_data)

        # Nothing intelligible was heard: skip the rest of the pipeline
        if not transcript or not transcript.strip():
            return

        # Send transcript to client
        await websocket.send_json({
            "type": "transcript",
            "text": transcript,
            "final": True,
            "confidence": 0.95,
        })

        # Add to session messages
        user_message = TranscriptMessage(
            role="user",
            content=transcript,
            confidence=0.95,
        )
        session.messages.append(user_message)

        # Detect intent
        intent = await intent_router.detect_intent(transcript, session.messages)

        if intent:
            await websocket.send_json({
                "type": "intent",
                "intent": intent.type.value,
                "confidence": intent.confidence,
                "parameters": intent.parameters,
            })

            # Create task if intent is actionable
            if intent.is_actionable:
                task = await orchestrator.create_task_from_intent(
                    session_id=session.id,
                    namespace_id=session.namespace_id,
                    intent=intent,
                    transcript=transcript,
                )

                await websocket.send_json({
                    "type": "task_created",
                    "task_id": task.id,
                    "task_type": task.type.value,
                    "state": task.state.value,
                })

        # Generate response
        response_text = await orchestrator.generate_response(
            session_messages=session.messages,
            intent=intent,
            namespace_id=session.namespace_id,
        )

        # Send text response
        await websocket.send_json({
            "type": "response",
            "text": response_text,
        })

        # Add to session messages
        assistant_message = TranscriptMessage(
            role="assistant",
            content=response_text,
        )
        session.messages.append(assistant_message)

        # Generate audio response if PersonaPlex is available
        if settings.use_personaplex:
            from services.personaplex_client import PersonaPlexClient
            client = PersonaPlexClient()
            audio_response = await client.synthesize(response_text)

            if audio_response:
                # Send audio in chunks
                chunk_size = settings.audio_frame_samples * 2  # 16-bit
                for i in range(0, len(audio_response), chunk_size):
                    chunk = audio_response[i:i + chunk_size]
                    await websocket.send_bytes(chunk)

        # Update session status
        session.status = SessionStatus.LISTENING
        await websocket.send_json({
            "type": "status",
            "status": "listening",
        })

    except WebSocketDisconnect:
        # The peer is gone: an error frame cannot be delivered, and the
        # caller needs to see the disconnect to clean up properly.
        raise
    except Exception as e:
        logger.error("Audio processing error", error=str(e))
        await websocket.send_json({
            "type": "error",
            "message": "Failed to process audio",
            "code": "processing_error",
        })


@router.get("/ws/stats")
async def get_websocket_stats():
    """Get WebSocket connection statistics.

    Returns:
        A dict with the number of active connections and the first eight
        characters of each connected session ID.
    """
    return {
        "active_connections": len(active_connections),
        "connection_ids": [cid[:8] for cid in active_connections],
    }