"""Compliance TTS Service — Piper TTS + FFmpeg Audio/Video Pipeline.""" import hashlib import logging import os import tempfile import uuid from fastapi import FastAPI, HTTPException from fastapi.responses import FileResponse, Response from pydantic import BaseModel from storage import StorageClient from tts_engine import PiperTTS logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) app = FastAPI(title="Compliance TTS Service", version="1.0.0") # Configuration MINIO_ENDPOINT = os.getenv("MINIO_ENDPOINT", "bp-core-minio:9000") MINIO_ACCESS_KEY = os.getenv("MINIO_ACCESS_KEY", "breakpilot") MINIO_SECRET_KEY = os.getenv("MINIO_SECRET_KEY", "breakpilot123") MINIO_SECURE = os.getenv("MINIO_SECURE", "false").lower() == "true" PIPER_MODEL_PATH = os.getenv("PIPER_MODEL_PATH", "/app/models/de_DE-thorsten-high.onnx") PIPER_MODEL_EN_PATH = os.getenv("PIPER_MODEL_EN_PATH", "/app/models/en_US-lessac-high.onnx") AUDIO_BUCKET = "compliance-training-audio" VIDEO_BUCKET = "compliance-training-video" # Initialize services storage = StorageClient(MINIO_ENDPOINT, MINIO_ACCESS_KEY, MINIO_SECRET_KEY, secure=MINIO_SECURE) tts = PiperTTS(PIPER_MODEL_PATH) tts_en = PiperTTS(PIPER_MODEL_EN_PATH) if os.path.exists(PIPER_MODEL_EN_PATH) else None @app.on_event("startup") async def startup(): """Ensure buckets exist on startup.""" storage.ensure_bucket(AUDIO_BUCKET) storage.ensure_bucket(VIDEO_BUCKET) logger.info("TTS Service started") # --- Models --- class SynthesizeRequest(BaseModel): text: str language: str = "de" voice: str = "thorsten-high" module_id: str content_id: str | None = None class SynthesizeResponse(BaseModel): audio_id: str bucket: str object_key: str duration_seconds: float size_bytes: int class GenerateVideoRequest(BaseModel): script: dict audio_object_key: str module_id: str class GenerateVideoResponse(BaseModel): video_id: str bucket: str object_key: str duration_seconds: float size_bytes: int class PresignedURLRequest(BaseModel): bucket: str object_key: str expires: int = 3600 class PresignedURLResponse(BaseModel): url: str expires_in: int class VoiceInfo(BaseModel): id: str language: str name: str quality: str # --- Endpoints --- @app.get("/health") async def health(): """Health check endpoint.""" return { "status": "healthy", "piper_available": tts.is_available, "ffmpeg_available": _check_ffmpeg(), "minio_connected": storage.is_connected(), } @app.get("/voices") async def list_voices(): """List available TTS voices.""" voices = [ VoiceInfo( id="de_DE-thorsten-high", language="de", name="Thorsten (High Quality)", quality="high", ), ] if tts_en is not None: voices.append(VoiceInfo( id="en_US-lessac-high", language="en", name="Lessac (High Quality)", quality="high", )) return {"voices": voices} class SynthesizeDirectRequest(BaseModel): text: str language: str = "de" # Simple disk cache for synthesized audio (avoids re-synthesis of same text) TTS_CACHE_DIR = "/tmp/tts-cache" os.makedirs(TTS_CACHE_DIR, exist_ok=True) EDGE_TTS_VOICES = { "de": "de-DE-ConradNeural", "en": "en-US-GuyNeural", } async def _edge_tts_synthesize(text: str, language: str, output_path: str) -> bool: """Synthesize using Edge TTS (Microsoft Neural Voices). Returns True on success.""" try: import edge_tts voice = EDGE_TTS_VOICES.get(language, EDGE_TTS_VOICES["de"]) communicate = edge_tts.Communicate(text, voice) await communicate.save(output_path) return True except Exception as e: logger.warning(f"Edge TTS failed, falling back to Piper: {e}") return False @app.post("/synthesize-direct") async def synthesize_direct(req: SynthesizeDirectRequest): """Synthesize text and return MP3 audio directly (no MinIO upload). Uses Edge TTS (Microsoft Neural Voices) for high-quality speech. Falls back to Piper TTS if Edge TTS is unavailable (e.g. no internet). Includes disk caching so identical text is only synthesized once. """ if not req.text.strip(): raise HTTPException(status_code=400, detail="Text is empty") # Cache key based on text + language hash text_hash = hashlib.sha256(f"{req.language}:{req.text}".encode()).hexdigest()[:16] cache_path = os.path.join(TTS_CACHE_DIR, f"{text_hash}.mp3") if os.path.exists(cache_path): logger.info(f"TTS cache hit: {text_hash}") return FileResponse( cache_path, media_type="audio/mpeg", headers={"X-TTS-Cache": "hit"}, ) # Try Edge TTS first (high quality neural voices) success = await _edge_tts_synthesize(req.text, req.language, cache_path) if success and os.path.exists(cache_path): size = os.path.getsize(cache_path) logger.info(f"Edge TTS ({req.language}): {len(req.text)} chars, {size} bytes, cached as {text_hash}") return FileResponse( cache_path, media_type="audio/mpeg", headers={"X-TTS-Cache": "miss", "X-TTS-Engine": "edge"}, ) # Fallback: Piper TTS engine = tts if req.language == "en" and tts_en is not None: engine = tts_en with tempfile.TemporaryDirectory() as tmpdir: try: mp3_path, duration = engine.synthesize_to_mp3(req.text, tmpdir) import shutil shutil.copy2(mp3_path, cache_path) logger.info(f"Piper TTS ({req.language}): {len(req.text)} chars, {duration:.1f}s, cached as {text_hash}") except Exception as e: logger.error(f"TTS synthesis failed: {e}") raise HTTPException(status_code=500, detail=str(e)) return FileResponse( cache_path, media_type="audio/mpeg", headers={"X-TTS-Cache": "miss", "X-TTS-Engine": "piper"}, ) @app.post("/presigned-url", response_model=PresignedURLResponse) async def get_presigned_url(req: PresignedURLRequest): """Generate a presigned URL for accessing a stored media file.""" try: url = storage.get_presigned_url(req.bucket, req.object_key, req.expires) return PresignedURLResponse(url=url, expires_in=req.expires) except Exception as e: logger.error(f"Presigned URL generation failed: {e}") raise HTTPException(status_code=500, detail=str(e)) @app.post("/synthesize", response_model=SynthesizeResponse) async def synthesize(req: SynthesizeRequest): """Synthesize text to audio and upload to storage.""" if not req.text.strip(): raise HTTPException(status_code=400, detail="Text is empty") audio_id = str(uuid.uuid4()) content_suffix = req.content_id or "full" object_key = f"audio/{req.module_id}/{content_suffix}.mp3" with tempfile.TemporaryDirectory() as tmpdir: try: mp3_path, duration = tts.synthesize_to_mp3(req.text, tmpdir) size_bytes = storage.upload_file(AUDIO_BUCKET, object_key, mp3_path, "audio/mpeg") except Exception as e: logger.error(f"Synthesis failed: {e}") raise HTTPException(status_code=500, detail=str(e)) return SynthesizeResponse( audio_id=audio_id, bucket=AUDIO_BUCKET, object_key=object_key, duration_seconds=round(duration, 2), size_bytes=size_bytes, ) class SynthesizeSectionsRequest(BaseModel): sections: list[dict] # [{text, heading}] voice: str = "de_DE-thorsten-high" module_id: str = "" class SynthesizeSectionsResponse(BaseModel): sections: list[dict] total_duration: float class GenerateInteractiveVideoRequest(BaseModel): script: dict audio: dict # SynthesizeSectionsResponse module_id: str class GenerateInteractiveVideoResponse(BaseModel): video_id: str bucket: str object_key: str duration_seconds: float size_bytes: int @app.post("/synthesize-sections", response_model=SynthesizeSectionsResponse) async def synthesize_sections(req: SynthesizeSectionsRequest): """Synthesize audio for multiple sections, returning per-section timing.""" if not req.sections: raise HTTPException(status_code=400, detail="No sections provided") results = [] cumulative = 0.0 with tempfile.TemporaryDirectory() as tmpdir: for i, section in enumerate(req.sections): text = section.get("text", "") heading = section.get("heading", f"Section {i+1}") if not text.strip(): results.append({ "heading": heading, "audio_path": "", "audio_object_key": "", "duration": 0.0, "start_timestamp": cumulative, }) continue try: mp3_path, duration = tts.synthesize_to_mp3(text, tmpdir, suffix=f"_section_{i}") object_key = f"audio/{req.module_id}/section_{i}.mp3" storage.upload_file(AUDIO_BUCKET, object_key, mp3_path, "audio/mpeg") results.append({ "heading": heading, "audio_path": mp3_path, "audio_object_key": object_key, "duration": round(duration, 2), "start_timestamp": round(cumulative, 2), }) cumulative += duration except Exception as e: logger.error(f"Section {i} synthesis failed: {e}") raise HTTPException(status_code=500, detail=f"Section {i} synthesis failed: {e}") return SynthesizeSectionsResponse( sections=results, total_duration=round(cumulative, 2), ) @app.post("/generate-interactive-video", response_model=GenerateInteractiveVideoResponse) async def generate_interactive_video(req: GenerateInteractiveVideoRequest): """Generate an interactive presentation video with checkpoint slides.""" try: from video_generator import generate_interactive_presentation_video except ImportError: raise HTTPException(status_code=501, detail="Interactive video generation not available") video_id = str(uuid.uuid4()) object_key = f"video/{req.module_id}/interactive.mp4" with tempfile.TemporaryDirectory() as tmpdir: try: mp4_path, duration = generate_interactive_presentation_video( script=req.script, audio_sections=req.audio.get("sections", []), output_dir=tmpdir, storage=storage, audio_bucket=AUDIO_BUCKET, ) size_bytes = storage.upload_file(VIDEO_BUCKET, object_key, mp4_path, "video/mp4") except Exception as e: logger.error(f"Interactive video generation failed: {e}") raise HTTPException(status_code=500, detail=str(e)) return GenerateInteractiveVideoResponse( video_id=video_id, bucket=VIDEO_BUCKET, object_key=object_key, duration_seconds=round(duration, 2), size_bytes=size_bytes, ) @app.post("/generate-video", response_model=GenerateVideoResponse) async def generate_video(req: GenerateVideoRequest): """Generate a presentation video from slides + audio.""" try: from video_generator import generate_presentation_video except ImportError: raise HTTPException(status_code=501, detail="Video generation not available yet") video_id = str(uuid.uuid4()) object_key = f"video/{req.module_id}/presentation.mp4" with tempfile.TemporaryDirectory() as tmpdir: try: mp4_path, duration = generate_presentation_video( script=req.script, audio_object_key=req.audio_object_key, output_dir=tmpdir, storage=storage, audio_bucket=AUDIO_BUCKET, ) size_bytes = storage.upload_file(VIDEO_BUCKET, object_key, mp4_path, "video/mp4") except Exception as e: logger.error(f"Video generation failed: {e}") raise HTTPException(status_code=500, detail=str(e)) return GenerateVideoResponse( video_id=video_id, bucket=VIDEO_BUCKET, object_key=object_key, duration_seconds=round(duration, 2), size_bytes=size_bytes, ) def _check_ffmpeg() -> bool: """Check if ffmpeg is available.""" import subprocess try: subprocess.run(["ffmpeg", "-version"], capture_output=True, timeout=5) return True except Exception: return False