"""Piper TTS engine wrapper for speech synthesis."""

import contextlib
import logging
import os
import re
import shutil
import subprocess
import tempfile
import wave
from pathlib import Path

logger = logging.getLogger(__name__)

# Sentence boundary: the whitespace run that follows a ".", "!" or "?".
SENTENCE_SPLIT = re.compile(r'(?<=[.!?])\s+')

# Markdown stripping patterns, applied in order by strip_markdown().
# Order matters:
#   * fenced code blocks go first, otherwise the inline-code pattern eats the
#     fences backtick-by-backtick and the block content is kept;
#   * list markers go before emphasis, otherwise a leading "* " bullet is
#     taken as the opening delimiter of an italic span.
MD_PATTERNS = [
    (re.compile(r'```[\s\S]*?```'), ''),                 # Code blocks
    (re.compile(r'^#{1,6}\s+', re.MULTILINE), ''),       # Headers
    (re.compile(r'^\s*[-*+]\s+', re.MULTILINE), ''),     # List markers
    (re.compile(r'\*\*(.+?)\*\*'), r'\1'),               # Bold
    (re.compile(r'\*(.+?)\*'), r'\1'),                   # Italic
    (re.compile(r'`(.+?)`'), r'\1'),                     # Inline code
    (re.compile(r'^\s*\d+\.\s+', re.MULTILINE), ''),     # Numbered lists
    (re.compile(r'\[([^\]]+)\]\([^)]+\)'), r'\1'),       # Links
    (re.compile(r'^\s*>\s+', re.MULTILINE), ''),         # Blockquotes
    (re.compile(r'---+'), ''),                           # Horizontal rules
    (re.compile(r'\n{3,}'), '\n\n'),                     # Multiple newlines
]


def strip_markdown(text: str) -> str:
    """Convert markdown to plain text for TTS.

    Applies every ``MD_PATTERNS`` substitution in order and strips leading
    and trailing whitespace from the result. Fenced code blocks are removed
    entirely; other constructs keep their inner text.
    """
    for pattern, replacement in MD_PATTERNS:
        text = pattern.sub(replacement, text)
    return text.strip()


def split_sentences(text: str) -> list[str]:
    """Split text into sentences.

    Splits on whitespace that follows ``.``, ``!`` or ``?`` and returns the
    non-empty, stripped pieces. Returns an empty list for blank input.
    """
    pieces = (piece.strip() for piece in SENTENCE_SPLIT.split(text))
    return [piece for piece in pieces if piece]


class PiperTTS:
    """Piper TTS wrapper for local speech synthesis."""

    def __init__(self, model_path: str):
        """Store the model path and run the startup check.

        Raises:
            FileNotFoundError: if ``model_path`` does not exist.
        """
        self.model_path = model_path
        self._check_piper()

    def _check_piper(self) -> None:
        """Verify the model exists and probe for the ``piper`` executable.

        Raises:
            FileNotFoundError: if the model file does not exist.
        """
        if not Path(self.model_path).exists():
            raise FileNotFoundError(f"Piper model not found: {self.model_path}")
        try:
            result = subprocess.run(
                ["piper", "--version"],
                capture_output=True,
                text=True,
                timeout=10,
            )
        except FileNotFoundError:
            # piper-tts pip package installs as python module.
            # NOTE(review): this branch only logs; nothing here verifies the
            # module is importable, and synthesize_to_wav() still invokes the
            # "piper" executable.
            logger.info("Piper available via Python module")
        else:
            logger.info("Piper TTS available: %s", result.stdout.strip())

    def synthesize_to_wav(self, text: str, output_path: str) -> None:
        """Synthesize text to a WAV file using Piper.

        The text is fed to the ``piper`` process on stdin.

        Raises:
            RuntimeError: if Piper exits with a non-zero status.
        """
        cmd = [
            "piper",
            "--model", self.model_path,
            "--output_file", output_path,
        ]
        proc = subprocess.run(
            cmd,
            input=text,
            capture_output=True,
            text=True,
            timeout=120,
        )
        if proc.returncode != 0:
            raise RuntimeError(f"Piper failed: {proc.stderr}")

    def synthesize_to_mp3(self, text: str, output_dir: str, suffix: str = "") -> tuple[str, float]:
        """
        Synthesize text to MP3.

        Strips markdown, splits the text into sentences, synthesizes each
        sentence to its own WAV, concatenates the WAVs and encodes the result
        to ``output{suffix}.mp3`` inside ``output_dir`` (created if missing).
        All intermediate WAV files are removed afterwards, whether or not the
        run succeeded.

        Returns (mp3_path, duration_seconds).

        Raises:
            RuntimeError: if Piper, FFmpeg or FFprobe fails.
        """
        plain_text = strip_markdown(text)
        sentences = split_sentences(plain_text) or [plain_text]

        if output_dir:
            os.makedirs(output_dir, exist_ok=True)

        combined_wav = os.path.join(output_dir, f"combined{suffix}.wav")
        wav_files = []
        try:
            for i, sentence in enumerate(sentences):
                wav_path = os.path.join(output_dir, f"seg{suffix}_{i:04d}.wav")
                # Record the path before synthesis so a partially written
                # file from a failed or timed-out run is still cleaned up.
                wav_files.append(wav_path)
                self.synthesize_to_wav(sentence, wav_path)

            self._concatenate_wavs(wav_files, combined_wav)

            mp3_path = os.path.join(output_dir, f"output{suffix}.mp3")
            self._wav_to_mp3(combined_wav, mp3_path)

            duration = self._get_audio_duration(mp3_path)
            return mp3_path, duration
        finally:
            # Remove every intermediate WAV: the segments and the combined file.
            for leftover in (*wav_files, combined_wav):
                with contextlib.suppress(FileNotFoundError):
                    os.remove(leftover)

    def _concatenate_wavs(self, wav_files: list[str], output_path: str) -> None:
        """Concatenate multiple WAV files into one.

        A single input is copied as-is. Otherwise the output takes its
        parameters from the first file and the frames of every file are
        appended in order.

        Raises:
            ValueError: if ``wav_files`` is empty, or if a segment's channel
                count, sample width or frame rate differs from the first.
        """
        if not wav_files:
            raise ValueError("No WAV files to concatenate")

        if len(wav_files) == 1:
            shutil.copy2(wav_files[0], output_path)
            return

        # Read parameters from first file
        with wave.open(wav_files[0], 'rb') as first:
            params = first.getparams()
        expected_format = (params.nchannels, params.sampwidth, params.framerate)

        with wave.open(output_path, 'wb') as out:
            # The frame count copied from the first file is corrected by the
            # wave module when the output is closed.
            out.setparams(params)
            for wav_file in wav_files:
                with wave.open(wav_file, 'rb') as segment:
                    segment_format = (
                        segment.getnchannels(),
                        segment.getsampwidth(),
                        segment.getframerate(),
                    )
                    if segment_format != expected_format:
                        raise ValueError(
                            f"WAV format mismatch in {wav_file}: "
                            f"{segment_format} != {expected_format}"
                        )
                    out.writeframes(segment.readframes(segment.getnframes()))

    def _wav_to_mp3(self, wav_path: str, mp3_path: str) -> None:
        """Convert WAV to MP3 using FFmpeg.

        Raises:
            RuntimeError: if FFmpeg exits with a non-zero status.
        """
        cmd = [
            "ffmpeg", "-y",
            "-i", wav_path,
            "-codec:a", "libmp3lame",
            "-qscale:a", "2",
            mp3_path,
        ]
        proc = subprocess.run(cmd, capture_output=True, text=True, timeout=120)
        if proc.returncode != 0:
            raise RuntimeError(f"FFmpeg MP3 encoding failed: {proc.stderr}")

    def _get_audio_duration(self, file_path: str) -> float:
        """Get audio duration in seconds using FFprobe.

        Raises:
            RuntimeError: if FFprobe exits with a non-zero status or prints
                something that is not a number.
        """
        cmd = [
            "ffprobe", "-v", "error",
            "-show_entries", "format=duration",
            "-of", "default=noprint_wrappers=1:nokey=1",
            file_path,
        ]
        result = subprocess.run(cmd, capture_output=True, text=True, timeout=30)
        if result.returncode != 0:
            raise RuntimeError(f"FFprobe failed: {result.stderr}")

        raw_duration = result.stdout.strip()
        try:
            return float(raw_duration)
        except ValueError as err:
            raise RuntimeError(
                f"FFprobe returned no usable duration for {file_path}: {raw_duration!r}"
            ) from err

    @property
    def is_available(self) -> bool:
        """Check if Piper is available.

        True when the ``piper`` executable could be launched within the
        timeout; its exit status is not inspected.
        """
        try:
            subprocess.run(["piper", "--version"], capture_output=True, timeout=5)
        except (OSError, subprocess.SubprocessError):
            return False
        return True