Files
Benjamin Admin df5b6d69ef
Some checks failed
CI/CD / go-lint (push) Has been skipped
CI/CD / python-lint (push) Has been skipped
CI/CD / nodejs-lint (push) Has been skipped
CI/CD / test-go-ai-compliance (push) Failing after 32s
CI/CD / test-python-backend-compliance (push) Successful in 31s
CI/CD / test-python-document-crawler (push) Successful in 21s
CI/CD / test-python-dsms-gateway (push) Successful in 20s
CI/CD / validate-canonical-controls (push) Successful in 11s
CI/CD / Deploy (push) Has been skipped
feat(tts): add Edge TTS (Microsoft Neural Voices) as primary engine with Piper fallback
Edge TTS provides near-human quality voices (de-DE-ConradNeural, en-US-GuyNeural).
Falls back to Piper TTS when Edge TTS is unavailable (e.g. no internet).

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-20 16:13:10 +01:00

401 lines
12 KiB
Python

"""Compliance TTS Service — Piper TTS + FFmpeg Audio/Video Pipeline."""
import hashlib
import logging
import os
import tempfile
import uuid
from fastapi import FastAPI, HTTPException
from fastapi.responses import FileResponse, Response
from pydantic import BaseModel
from storage import StorageClient
from tts_engine import PiperTTS
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
app = FastAPI(title="Compliance TTS Service", version="1.0.0")
# Configuration
MINIO_ENDPOINT = os.getenv("MINIO_ENDPOINT", "bp-core-minio:9000")
MINIO_ACCESS_KEY = os.getenv("MINIO_ACCESS_KEY", "breakpilot")
MINIO_SECRET_KEY = os.getenv("MINIO_SECRET_KEY", "breakpilot123")
MINIO_SECURE = os.getenv("MINIO_SECURE", "false").lower() == "true"
PIPER_MODEL_PATH = os.getenv("PIPER_MODEL_PATH", "/app/models/de_DE-thorsten-high.onnx")
PIPER_MODEL_EN_PATH = os.getenv("PIPER_MODEL_EN_PATH", "/app/models/en_US-lessac-high.onnx")
AUDIO_BUCKET = "compliance-training-audio"
VIDEO_BUCKET = "compliance-training-video"
# Initialize services
storage = StorageClient(MINIO_ENDPOINT, MINIO_ACCESS_KEY, MINIO_SECRET_KEY, secure=MINIO_SECURE)
tts = PiperTTS(PIPER_MODEL_PATH)
tts_en = PiperTTS(PIPER_MODEL_EN_PATH) if os.path.exists(PIPER_MODEL_EN_PATH) else None
@app.on_event("startup")
async def startup():
    """Call ``storage.ensure_bucket`` for the audio and video buckets before serving."""
    for bucket_name in (AUDIO_BUCKET, VIDEO_BUCKET):
        storage.ensure_bucket(bucket_name)
    logger.info("TTS Service started")
# --- Models ---
class SynthesizeRequest(BaseModel):
    """Request body for ``POST /synthesize``."""
    text: str  # text to speak; blank text is rejected with HTTP 400
    language: str = "de"  # language code, e.g. "de" or "en"
    voice: str = "thorsten-high"  # NOTE(review): not read by any endpoint in this file
    module_id: str  # MinIO key prefix: audio/<module_id>/...
    content_id: str | None = None  # object name within the module; "full" when omitted/empty
class SynthesizeResponse(BaseModel):
    """Result of ``POST /synthesize``: where the MP3 was stored, plus its metadata."""
    audio_id: str  # random UUID generated per request (not part of object_key)
    bucket: str  # MinIO bucket holding the MP3
    object_key: str  # object key within ``bucket``
    duration_seconds: float  # audio length, rounded to 2 decimals
    size_bytes: int  # uploaded size as reported by StorageClient.upload_file
class GenerateVideoRequest(BaseModel):
    """Request body for ``POST /generate-video``."""
    script: dict  # slide script passed through to video_generator; schema defined there — TODO document
    audio_object_key: str  # presumably a key in the audio bucket (passed alongside AUDIO_BUCKET) — confirm
    module_id: str  # MinIO key prefix: video/<module_id>/presentation.mp4
class GenerateVideoResponse(BaseModel):
    """Result of ``POST /generate-video``: location and metadata of the uploaded MP4."""
    video_id: str  # random UUID generated per request (not part of object_key)
    bucket: str  # MinIO bucket holding the MP4
    object_key: str  # object key within ``bucket``
    duration_seconds: float  # video length, rounded to 2 decimals
    size_bytes: int  # uploaded size as reported by StorageClient.upload_file
class PresignedURLRequest(BaseModel):
    """Request body for ``POST /presigned-url``."""
    bucket: str  # NOTE(review): any bucket name is accepted, not only this service's buckets
    object_key: str  # object key within ``bucket``
    expires: int = 3600  # URL lifetime; presumably seconds — confirm against StorageClient
class PresignedURLResponse(BaseModel):
    """Response of ``POST /presigned-url``."""
    url: str  # URL returned by StorageClient.get_presigned_url
    expires_in: int  # echo of the requested ``expires`` value
class VoiceInfo(BaseModel):
    """One entry of the ``GET /voices`` listing."""
    id: str  # voice identifier, e.g. "de_DE-thorsten-high"
    language: str  # language code, e.g. "de" or "en"
    name: str  # human-readable display name
    quality: str  # quality tier label, e.g. "high"
# --- Endpoints ---
@app.get("/health")
async def health():
    """Report liveness together with the availability of each backing dependency.

    NOTE(review): ``status`` is always "healthy"; the individual flags carry
    the real dependency state.
    """
    report = {"status": "healthy"}
    report["piper_available"] = tts.is_available
    report["ffmpeg_available"] = _check_ffmpeg()
    report["minio_connected"] = storage.is_connected()
    return report
@app.get("/voices")
async def list_voices():
    """List the Piper voices this instance can serve.

    The German voice is always listed. The English voice is listed only when
    its model file existed at import time (``tts_en`` is not None).
    """
    catalog = [
        VoiceInfo(id="de_DE-thorsten-high", language="de", name="Thorsten (High Quality)", quality="high"),
    ]
    if tts_en is None:
        return {"voices": catalog}
    catalog.append(
        VoiceInfo(id="en_US-lessac-high", language="en", name="Lessac (High Quality)", quality="high")
    )
    return {"voices": catalog}
class SynthesizeDirectRequest(BaseModel):
    """Request body for ``POST /synthesize-direct``."""
    text: str  # text to speak; blank text is rejected with HTTP 400
    language: str = "de"  # part of the cache key; picks the Edge voice and ("en") the English Piper engine
# Simple disk cache for synthesized audio (avoids re-synthesis of the same text).
# /synthesize-direct names cached files "<first 16 hex chars of sha256('language:text')>.mp3".
# NOTE(review): no code here evicts entries, so the directory can grow without bound.
TTS_CACHE_DIR = "/tmp/tts-cache"
os.makedirs(TTS_CACHE_DIR, exist_ok=True)  # import-time side effect
# Edge TTS neural voice per language code; _edge_tts_synthesize uses the "de"
# voice for any language code not listed here.
EDGE_TTS_VOICES = {
    "de": "de-DE-ConradNeural",
    "en": "en-US-GuyNeural",
}
async def _edge_tts_synthesize(text: str, language: str, output_path: str) -> bool:
    """Synthesize ``text`` with Edge TTS (Microsoft Neural Voices) into ``output_path``.

    The audio is first written to a uniquely named sibling ``.part`` file. It
    is moved onto ``output_path`` with an atomic ``os.replace`` only after
    Edge TTS has finished and produced a non-empty file. A failed or
    interrupted synthesis therefore never leaves a truncated file at
    ``output_path``. This matters because callers use that path as a disk
    cache entry.

    Args:
        text: Text to speak.
        language: Language code; codes missing from ``EDGE_TTS_VOICES`` use
            the German ("de") voice.
        output_path: Destination file for the MP3 audio.

    Returns:
        True if audio was written to ``output_path``. False if Edge TTS is not
        installed, unreachable, or produced no audio. This is best-effort: the
        error is logged, never raised, so the caller can fall back to Piper.
    """
    partial_path = f"{output_path}.{uuid.uuid4().hex}.part"
    try:
        import edge_tts  # optional dependency; ImportError -> fall back

        voice = EDGE_TTS_VOICES.get(language, EDGE_TTS_VOICES["de"])
        communicate = edge_tts.Communicate(text, voice)
        await communicate.save(partial_path)
        if os.path.getsize(partial_path) == 0:
            raise RuntimeError("Edge TTS returned no audio")
        os.replace(partial_path, output_path)
        return True
    except Exception as e:
        logger.warning(f"Edge TTS failed, falling back to Piper: {e}")
        return False
    finally:
        # After a successful os.replace the .part file is gone (no-op);
        # otherwise this removes whatever partial output was left behind.
        try:
            os.remove(partial_path)
        except FileNotFoundError:
            pass
@app.post("/synthesize-direct")
async def synthesize_direct(req: SynthesizeDirectRequest):
    """Synthesize text and return MP3 audio directly (no MinIO upload).

    Uses Edge TTS (Microsoft Neural Voices) for high-quality speech. Falls
    back to Piper TTS if Edge TTS is unavailable (e.g. no internet) or yields
    no audio. Results are cached on disk, keyed by language + text, so
    identical requests are only synthesized once.

    Cache writes are atomic. Audio is produced into a request-private
    ``.part`` file and renamed onto the cache path only when complete. This
    prevents two problems:
    - a failed synthesis leaving a truncated file that later requests would
      serve as a "cache hit";
    - a concurrent request for the same text reading a half-written file.

    Raises:
        HTTPException: 400 if the text is blank, 500 if the Piper fallback fails.
    """
    import shutil

    if not req.text.strip():
        raise HTTPException(status_code=400, detail="Text is empty")
    # Cache key based on text + language hash
    text_hash = hashlib.sha256(f"{req.language}:{req.text}".encode()).hexdigest()[:16]
    cache_path = os.path.join(TTS_CACHE_DIR, f"{text_hash}.mp3")
    if os.path.exists(cache_path):
        logger.info(f"TTS cache hit: {text_hash}")
        return FileResponse(
            cache_path,
            media_type="audio/mpeg",
            headers={"X-TTS-Cache": "hit"},
        )
    # Unique per request, so concurrent misses for the same text never share it;
    # the name never matches the "<hash>.mp3" pattern looked up above.
    partial_path = f"{cache_path}.{uuid.uuid4().hex}.part"
    try:
        # Try Edge TTS first (high quality neural voices)
        success = await _edge_tts_synthesize(req.text, req.language, partial_path)
        if success and os.path.exists(partial_path) and os.path.getsize(partial_path) > 0:
            os.replace(partial_path, cache_path)
            size = os.path.getsize(cache_path)
            logger.info(f"Edge TTS ({req.language}): {len(req.text)} chars, {size} bytes, cached as {text_hash}")
            return FileResponse(
                cache_path,
                media_type="audio/mpeg",
                headers={"X-TTS-Cache": "miss", "X-TTS-Engine": "edge"},
            )
        # Fallback: Piper TTS (English engine only if its model was loaded)
        engine = tts_en if req.language == "en" and tts_en is not None else tts
        with tempfile.TemporaryDirectory() as tmpdir:
            try:
                mp3_path, duration = engine.synthesize_to_mp3(req.text, tmpdir)
                # Overwrites any partial Edge output, then publishes atomically.
                shutil.copy2(mp3_path, partial_path)
                os.replace(partial_path, cache_path)
                logger.info(f"Piper TTS ({req.language}): {len(req.text)} chars, {duration:.1f}s, cached as {text_hash}")
            except Exception as e:
                logger.error(f"TTS synthesis failed: {e}")
                raise HTTPException(status_code=500, detail=str(e)) from e
        return FileResponse(
            cache_path,
            media_type="audio/mpeg",
            headers={"X-TTS-Cache": "miss", "X-TTS-Engine": "piper"},
        )
    finally:
        # No-op after a successful os.replace; otherwise drop the partial file
        # so it can never be mistaken for a complete cache entry.
        try:
            os.remove(partial_path)
        except FileNotFoundError:
            pass
@app.post("/presigned-url", response_model=PresignedURLResponse)
async def get_presigned_url(req: PresignedURLRequest):
    """Return a time-limited download URL for an object in MinIO.

    The URL comes from ``StorageClient.get_presigned_url`` for the requested
    bucket, key and lifetime. The lifetime is echoed back as ``expires_in``.
    Any failure is logged and surfaced as HTTP 500.

    NOTE(review): ``req.bucket`` is not restricted to this service's buckets;
    confirm that all callers are trusted.
    """
    try:
        signed = storage.get_presigned_url(req.bucket, req.object_key, req.expires)
        response = PresignedURLResponse(url=signed, expires_in=req.expires)
    except Exception as exc:
        logger.error(f"Presigned URL generation failed: {exc}")
        raise HTTPException(status_code=500, detail=str(exc))
    return response
@app.post("/synthesize", response_model=SynthesizeResponse)
async def synthesize(req: SynthesizeRequest):
    """Synthesize text with Piper TTS and upload the MP3 to the audio bucket.

    The Piper engine is selected from ``req.language`` exactly like
    ``/synthesize-direct`` does: the English model when ``language == "en"``
    and it was loaded, otherwise the German default. This ensures English
    text is not read with a German voice.

    The object is stored at ``audio/<module_id>/<content_id or "full">.mp3``.

    Returns:
        SynthesizeResponse with the storage location, rounded duration and size.

    Raises:
        HTTPException: 400 if the text is blank, 500 if synthesis or upload fails.
    """
    if not req.text.strip():
        raise HTTPException(status_code=400, detail="Text is empty")
    engine = tts_en if req.language == "en" and tts_en is not None else tts
    audio_id = str(uuid.uuid4())
    content_suffix = req.content_id or "full"
    object_key = f"audio/{req.module_id}/{content_suffix}.mp3"
    with tempfile.TemporaryDirectory() as tmpdir:
        try:
            mp3_path, duration = engine.synthesize_to_mp3(req.text, tmpdir)
            size_bytes = storage.upload_file(AUDIO_BUCKET, object_key, mp3_path, "audio/mpeg")
        except Exception as e:
            logger.error(f"Synthesis failed: {e}")
            raise HTTPException(status_code=500, detail=str(e)) from e
    return SynthesizeResponse(
        audio_id=audio_id,
        bucket=AUDIO_BUCKET,
        object_key=object_key,
        duration_seconds=round(duration, 2),
        size_bytes=size_bytes,
    )
class SynthesizeSectionsRequest(BaseModel):
    """Request body for ``POST /synthesize-sections``."""
    sections: list[dict]  # [{"text": ..., "heading": ...}]; both keys optional
    voice: str = "de_DE-thorsten-high"  # NOTE(review): not read by any endpoint in this file
    module_id: str = ""  # MinIO key prefix: audio/<module_id>/section_<i>.mp3
class SynthesizeSectionsResponse(BaseModel):
    """Per-section results returned by ``POST /synthesize-sections``."""
    sections: list[dict]  # one dict per input section: heading, audio_path, audio_object_key, duration, start_timestamp
    total_duration: float  # sum of section durations, rounded to 2 decimals
class GenerateInteractiveVideoRequest(BaseModel):
    """Request body for ``POST /generate-interactive-video``."""
    script: dict  # passed through to video_generator; schema defined there — TODO document
    audio: dict  # SynthesizeSectionsResponse payload; only its "sections" list is read
    module_id: str  # MinIO key prefix: video/<module_id>/interactive.mp4
class GenerateInteractiveVideoResponse(BaseModel):
    """Result of ``POST /generate-interactive-video``: location and metadata of the MP4."""
    video_id: str  # random UUID generated per request (not part of object_key)
    bucket: str  # MinIO bucket holding the MP4
    object_key: str  # object key within ``bucket``
    duration_seconds: float  # video length, rounded to 2 decimals
    size_bytes: int  # uploaded size as reported by StorageClient.upload_file
@app.post("/synthesize-sections", response_model=SynthesizeSectionsResponse)
async def synthesize_sections(req: SynthesizeSectionsRequest):
    """Synthesize one MP3 per section with Piper TTS and upload each to MinIO.

    Sections are processed in order. Each result records where its audio
    starts on the timeline formed by playing all sections back to back.
    Sections whose text is blank, missing or null get a zero-length
    placeholder entry instead of audio.

    Returns:
        SynthesizeSectionsResponse with one dict per input section
        (``heading``, ``audio_path``, ``audio_object_key``, ``duration``,
        ``start_timestamp``; all timings rounded to 2 decimals) and the
        rounded ``total_duration``.

    Raises:
        HTTPException: 400 if no sections are given; 500 if any section fails
            (sections uploaded before the failure are not rolled back).
    """
    if not req.sections:
        raise HTTPException(status_code=400, detail="No sections provided")
    results = []
    cumulative = 0.0
    with tempfile.TemporaryDirectory() as tmpdir:
        for i, section in enumerate(req.sections):
            # `or ""` also covers an explicit null, which would crash on .strip().
            text = section.get("text") or ""
            heading = section.get("heading", f"Section {i+1}")
            if not text.strip():
                results.append({
                    "heading": heading,
                    "audio_path": "",
                    "audio_object_key": "",
                    "duration": 0.0,
                    # Rounded like every other timestamp in the response.
                    "start_timestamp": round(cumulative, 2),
                })
                continue
            try:
                mp3_path, duration = tts.synthesize_to_mp3(text, tmpdir, suffix=f"_section_{i}")
                object_key = f"audio/{req.module_id}/section_{i}.mp3"
                storage.upload_file(AUDIO_BUCKET, object_key, mp3_path, "audio/mpeg")
            except Exception as e:
                logger.error(f"Section {i} synthesis failed: {e}")
                raise HTTPException(status_code=500, detail=f"Section {i} synthesis failed: {e}") from e
            results.append({
                "heading": heading,
                # NOTE(review): this path lives in tmpdir, which is deleted before
                # the response is sent; consumers should use audio_object_key.
                "audio_path": mp3_path,
                "audio_object_key": object_key,
                "duration": round(duration, 2),
                "start_timestamp": round(cumulative, 2),
            })
            cumulative += duration
    return SynthesizeSectionsResponse(
        sections=results,
        total_duration=round(cumulative, 2),
    )
@app.post("/generate-interactive-video", response_model=GenerateInteractiveVideoResponse)
async def generate_interactive_video(req: GenerateInteractiveVideoRequest):
    """Render an interactive presentation video (with checkpoint slides) and upload it.

    Rendering is delegated to the optional ``video_generator`` module, which
    receives the per-section audio metadata produced by
    ``/synthesize-sections``. The resulting MP4 is uploaded to
    ``video/<module_id>/interactive.mp4`` in the video bucket.

    Raises:
        HTTPException: 501 when ``video_generator`` cannot be imported,
            500 when rendering or the upload fails.
    """
    try:
        from video_generator import generate_interactive_presentation_video as render
    except ImportError:
        raise HTTPException(status_code=501, detail="Interactive video generation not available")
    video_id = str(uuid.uuid4())
    object_key = f"video/{req.module_id}/interactive.mp4"
    with tempfile.TemporaryDirectory() as workdir:
        try:
            rendered_path, length = render(
                script=req.script,
                audio_sections=req.audio.get("sections", []),
                output_dir=workdir,
                storage=storage,
                audio_bucket=AUDIO_BUCKET,
            )
            uploaded_bytes = storage.upload_file(VIDEO_BUCKET, object_key, rendered_path, "video/mp4")
        except Exception as err:
            logger.error(f"Interactive video generation failed: {err}")
            raise HTTPException(status_code=500, detail=str(err))
    return GenerateInteractiveVideoResponse(
        video_id=video_id,
        bucket=VIDEO_BUCKET,
        object_key=object_key,
        duration_seconds=round(length, 2),
        size_bytes=uploaded_bytes,
    )
@app.post("/generate-video", response_model=GenerateVideoResponse)
async def generate_video(req: GenerateVideoRequest):
    """Render a presentation video from the slide script plus one audio track.

    Rendering is delegated to the optional ``video_generator`` module, which
    is given the audio object key and the audio bucket. The MP4 is uploaded
    to ``video/<module_id>/presentation.mp4`` in the video bucket.

    Raises:
        HTTPException: 501 when ``video_generator`` cannot be imported,
            500 when rendering or the upload fails.
    """
    try:
        from video_generator import generate_presentation_video as render
    except ImportError:
        raise HTTPException(status_code=501, detail="Video generation not available yet")
    video_id = str(uuid.uuid4())
    object_key = f"video/{req.module_id}/presentation.mp4"
    with tempfile.TemporaryDirectory() as workdir:
        try:
            rendered_path, length = render(
                script=req.script,
                audio_object_key=req.audio_object_key,
                output_dir=workdir,
                storage=storage,
                audio_bucket=AUDIO_BUCKET,
            )
            uploaded_bytes = storage.upload_file(VIDEO_BUCKET, object_key, rendered_path, "video/mp4")
        except Exception as err:
            logger.error(f"Video generation failed: {err}")
            raise HTTPException(status_code=500, detail=str(err))
    return GenerateVideoResponse(
        video_id=video_id,
        bucket=VIDEO_BUCKET,
        object_key=object_key,
        duration_seconds=round(length, 2),
        size_bytes=uploaded_bytes,
    )
def _check_ffmpeg() -> bool:
"""Check if ffmpeg is available."""
import subprocess
try:
subprocess.run(["ffmpeg", "-version"], capture_output=True, timeout=5)
return True
except Exception:
return False