feat(training): add Media Pipeline — TTS Audio, Presentation Video, Bulk Generation
All checks were successful
CI / go-lint (push) Has been skipped
CI / python-lint (push) Has been skipped
CI / nodejs-lint (push) Has been skipped
CI / test-go-ai-compliance (push) Successful in 36s
CI / test-python-backend-compliance (push) Successful in 31s
CI / test-python-document-crawler (push) Successful in 23s
CI / test-python-dsms-gateway (push) Successful in 21s

Phase A: 8 new IT-Security training modules (SEC-PWD, SEC-DESK, SEC-KIAI,
SEC-BYOD, SEC-VIDEO, SEC-USB, SEC-INC, SEC-HOME) with CTM entries.
Bulk content and quiz generation endpoints for all 28 modules.

Phase B: Piper TTS service (Python/FastAPI) for local German speech synthesis.
training_media table, TTSClient in Go backend, audio generation endpoints,
AudioPlayer component in frontend. MinIO storage integration.

Phase C: FFmpeg presentation video pipeline — LLM generates slide scripts,
ImageMagick renders 1920x1080 slides, FFmpeg combines with audio to MP4.
VideoPlayer and ScriptPreview components in frontend.

New files: 15 created, 9 modified
- compliance-tts-service/ (Dockerfile, main.py, tts_engine.py, storage.py,
  slide_renderer.py, video_generator.py)
- migrations 014-016 (training engine, IT-security modules, media table)
- training package (models, store, content_generator, media, handlers)
- frontend (AudioPlayer, VideoPlayer, ScriptPreview, api, types, page)

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
Benjamin Boenisch
2026-02-16 21:42:33 +01:00
parent fba4c411dc
commit 375914e568
28 changed files with 7015 additions and 0 deletions

View File

@@ -0,0 +1,44 @@
# Image for the compliance TTS service: a uvicorn app ("main:app") listening on port 8095.
FROM python:3.12-slim
# System dependencies: ffmpeg and imagemagick are invoked as subprocesses by the
# service, fonts-dejavu-core supplies the DejaVu fonts the slide renderer names,
# and wget is used further down to fetch the Piper voice model.
RUN apt-get update && apt-get install -y --no-install-recommends \
ffmpeg \
libsndfile1 \
imagemagick \
fonts-dejavu-core \
wget \
&& rm -rf /var/lib/apt/lists/*
# Create non-root user (uid 1000); the container switches to it before start-up
RUN useradd -m -u 1000 ttsuser
WORKDIR /app
# Python dependencies (copied before the application so this layer is reusable
# when only application code changes)
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
# Download Piper model (German, thorsten, high quality): the .onnx file and its
# .onnx.json companion are stored in /app/models at build time
RUN mkdir -p /app/models && \
wget -q -O /app/models/de_DE-thorsten-high.onnx \
"https://huggingface.co/rhasspy/piper-voices/resolve/main/de/de_DE/thorsten/high/de_DE-thorsten-high.onnx" && \
wget -q -O /app/models/de_DE-thorsten-high.onnx.json \
"https://huggingface.co/rhasspy/piper-voices/resolve/main/de/de_DE/thorsten/high/de_DE-thorsten-high.onnx.json"
# Copy application
COPY . .
# Rewrite the PDF entry of ImageMagick 6's policy.xml from rights="none" to
# rights="read|write"; the step is skipped when that policy file does not exist.
# NOTE(review): the slide renderer in this commit only writes PNG files —
# confirm the PDF permission is actually required.
RUN if [ -f /etc/ImageMagick-6/policy.xml ]; then \
sed -i 's/rights="none" pattern="PDF"/rights="read|write" pattern="PDF"/' /etc/ImageMagick-6/policy.xml; \
fi
# Hand /app (code and downloaded model) to the unprivileged user and drop root
RUN chown -R ttsuser:ttsuser /app
USER ttsuser
EXPOSE 8095
# Probe GET /health every 30s; urlopen raises (non-zero exit) when the request
# fails or does not complete within the 10s timeout
HEALTHCHECK --interval=30s --timeout=10s --start-period=60s --retries=3 \
CMD python -c "import urllib.request; urllib.request.urlopen('http://127.0.0.1:8095/health')"
CMD ["uvicorn", "main:app", "--host", "0.0.0.0", "--port", "8095"]

View File

@@ -0,0 +1,175 @@
"""Compliance TTS Service — Piper TTS + FFmpeg Audio/Video Pipeline."""
import logging
import os
import tempfile
import uuid
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
from storage import StorageClient
from tts_engine import PiperTTS
# Root logging at INFO level plus a module-level logger for this service.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
app = FastAPI(title="Compliance TTS Service", version="1.0.0")
# Configuration: each of the four values below is read from the environment and
# falls back to the literal default when the variable is unset.
# NOTE(review): the MinIO access/secret key defaults are hard-coded credentials —
# confirm they are only ever used in development.
MINIO_ENDPOINT = os.getenv("MINIO_ENDPOINT", "bp-core-minio:9000")
MINIO_ACCESS_KEY = os.getenv("MINIO_ACCESS_KEY", "breakpilot")
MINIO_SECRET_KEY = os.getenv("MINIO_SECRET_KEY", "breakpilot123")
PIPER_MODEL_PATH = os.getenv("PIPER_MODEL_PATH", "/app/models/de_DE-thorsten-high.onnx")
# Buckets that receive generated media (fixed names, not environment-driven).
AUDIO_BUCKET = "compliance-training-audio"
VIDEO_BUCKET = "compliance-training-video"
# Initialize services
# Both objects are created at import time; PiperTTS raises FileNotFoundError
# here when no file exists at PIPER_MODEL_PATH.
storage = StorageClient(MINIO_ENDPOINT, MINIO_ACCESS_KEY, MINIO_SECRET_KEY)
tts = PiperTTS(PIPER_MODEL_PATH)
@app.on_event("startup")
async def startup():
    """Make sure the audio and video buckets exist, then log that the service is up."""
    for bucket_name in (AUDIO_BUCKET, VIDEO_BUCKET):
        storage.ensure_bucket(bucket_name)
    logger.info("TTS Service started")
# --- Models ---
class SynthesizeRequest(BaseModel):
    """Request body of ``POST /synthesize``."""

    # Text to convert to speech; the endpoint answers 400 when it is blank.
    text: str
    # Language code, "de" by default. Not read by the /synthesize handler in this file.
    language: str = "de"
    # Voice identifier, "thorsten-high" by default. Not read by the handler either.
    voice: str = "thorsten-high"
    # Module the audio belongs to; becomes the directory part of the object key.
    module_id: str
    # Optional content identifier used as the object file name ("full" when absent).
    content_id: str | None = None
class SynthesizeResponse(BaseModel):
    """Response body of ``POST /synthesize`` describing the uploaded MP3."""

    # Random UUID generated for this synthesis call.
    audio_id: str
    # Bucket the MP3 was uploaded to.
    bucket: str
    # Object key of the MP3 inside ``bucket``.
    object_key: str
    # Audio length in seconds (the endpoint rounds it to two decimals).
    duration_seconds: float
    # Size of the uploaded file in bytes.
    size_bytes: int
class GenerateVideoRequest(BaseModel):
    """Request body of ``POST /generate-video``."""

    # Slide script handed unchanged to the video generator.
    script: dict
    # Object key of the narration audio, looked up in the audio bucket.
    audio_object_key: str
    # Module the video belongs to; becomes the directory part of the object key.
    module_id: str
class GenerateVideoResponse(BaseModel):
    """Response body of ``POST /generate-video`` describing the uploaded MP4."""

    # Random UUID generated for this video generation call.
    video_id: str
    # Bucket the MP4 was uploaded to.
    bucket: str
    # Object key of the MP4 inside ``bucket``.
    object_key: str
    # Video length in seconds (the endpoint rounds it to two decimals).
    duration_seconds: float
    # Size of the uploaded file in bytes.
    size_bytes: int
class VoiceInfo(BaseModel):
    """One entry of the voice list returned by ``GET /voices``."""

    # Voice identifier, e.g. "de_DE-thorsten-high".
    id: str
    # Language code of the voice, e.g. "de".
    language: str
    # Human-readable display name.
    name: str
    # Quality label, e.g. "high".
    quality: str
# --- Endpoints ---
@app.get("/health")
async def health():
    """Report the service status together with the state of each dependency.

    ``status`` is always ``"healthy"``; the three flags carry the actual
    results of the Piper, FFmpeg and MinIO probes.
    """
    piper_ok = tts.is_available
    ffmpeg_ok = _check_ffmpeg()
    minio_ok = storage.is_connected()
    return {
        "status": "healthy",
        "piper_available": piper_ok,
        "ffmpeg_available": ffmpeg_ok,
        "minio_connected": minio_ok,
    }
@app.get("/voices")
async def list_voices():
    """Return the statically configured list of TTS voices (currently one)."""
    thorsten_high = VoiceInfo(
        id="de_DE-thorsten-high",
        language="de",
        name="Thorsten (High Quality)",
        quality="high",
    )
    return {"voices": [thorsten_high]}
@app.post("/synthesize", response_model=SynthesizeResponse)
async def synthesize(req: SynthesizeRequest):
    """Synthesize text to an MP3 and upload it to the audio bucket.

    The object key is ``audio/<module_id>/<content_id or "full">.mp3``, so a
    repeated request for the same module/content overwrites the previous file.

    Raises:
        HTTPException: 400 when the text is blank, 500 when synthesis or the
            upload fails (the detail carries the underlying error message).
    """
    # Function-scope import: asyncio is only needed for the thread hand-off below.
    import asyncio

    if not req.text.strip():
        raise HTTPException(status_code=400, detail="Text is empty")
    audio_id = str(uuid.uuid4())
    content_suffix = req.content_id or "full"
    object_key = f"audio/{req.module_id}/{content_suffix}.mp3"

    def _render_and_upload() -> tuple[float, int]:
        """Run Piper/FFmpeg and the upload; returns (duration, size_bytes)."""
        with tempfile.TemporaryDirectory() as tmpdir:
            mp3_path, seconds = tts.synthesize_to_mp3(req.text, tmpdir)
            uploaded = storage.upload_file(AUDIO_BUCKET, object_key, mp3_path, "audio/mpeg")
        return seconds, uploaded

    try:
        # The pipeline blocks on subprocesses and network I/O. Running it in a
        # worker thread keeps the event loop free, so other requests (for
        # example /health) are still answered while audio is being generated.
        duration, size_bytes = await asyncio.to_thread(_render_and_upload)
    except Exception as e:
        # logger.exception keeps the traceback that logger.error would drop.
        logger.exception("Synthesis failed: %s", e)
        raise HTTPException(status_code=500, detail=str(e)) from e
    return SynthesizeResponse(
        audio_id=audio_id,
        bucket=AUDIO_BUCKET,
        object_key=object_key,
        duration_seconds=round(duration, 2),
        size_bytes=size_bytes,
    )
@app.post("/generate-video", response_model=GenerateVideoResponse)
async def generate_video(req: GenerateVideoRequest):
    """Generate a presentation video from slides + audio and upload it.

    The object key is ``video/<module_id>/presentation.mp4``, so a repeated
    request for the same module overwrites the previous video.

    Raises:
        HTTPException: 501 when the video generator module cannot be imported,
            500 when rendering, encoding or the upload fails.
    """
    # Function-scope import: asyncio is only needed for the thread hand-off below.
    import asyncio

    try:
        from video_generator import generate_presentation_video
    except ImportError as exc:
        raise HTTPException(status_code=501, detail="Video generation not available yet") from exc
    video_id = str(uuid.uuid4())
    object_key = f"video/{req.module_id}/presentation.mp4"

    def _render_and_upload() -> tuple[float, int]:
        """Render slides, encode the MP4 and upload it; returns (duration, size_bytes)."""
        with tempfile.TemporaryDirectory() as tmpdir:
            mp4_path, seconds = generate_presentation_video(
                script=req.script,
                audio_object_key=req.audio_object_key,
                output_dir=tmpdir,
                storage=storage,
                audio_bucket=AUDIO_BUCKET,
            )
            uploaded = storage.upload_file(VIDEO_BUCKET, object_key, mp4_path, "video/mp4")
        return seconds, uploaded

    try:
        # ImageMagick/FFmpeg subprocesses and S3 transfers block for a long
        # time; a worker thread keeps the event loop responsive meanwhile.
        duration, size_bytes = await asyncio.to_thread(_render_and_upload)
    except Exception as e:
        # logger.exception keeps the traceback that logger.error would drop.
        logger.exception("Video generation failed: %s", e)
        raise HTTPException(status_code=500, detail=str(e)) from e
    return GenerateVideoResponse(
        video_id=video_id,
        bucket=VIDEO_BUCKET,
        object_key=object_key,
        duration_seconds=round(duration, 2),
        size_bytes=size_bytes,
    )
def _check_ffmpeg() -> bool:
    """Return True when ``ffmpeg -version`` can be launched and exits with status 0.

    A missing executable, a launch error, a timeout (5 s) or a non-zero exit
    status all count as "not available".
    """
    import subprocess

    try:
        probe = subprocess.run(
            ["ffmpeg", "-version"], capture_output=True, timeout=5, check=False,
        )
    except (OSError, subprocess.SubprocessError):
        # OSError: executable missing / not runnable; SubprocessError: timeout.
        return False
    return probe.returncode == 0

View File

@@ -0,0 +1,6 @@
fastapi==0.109.2
uvicorn[standard]==0.27.1
piper-tts==1.2.0
boto3==1.34.25
python-multipart==0.0.6
pydantic==2.6.1

View File

@@ -0,0 +1,132 @@
"""ImageMagick slide renderer for presentation videos."""
import logging
import os
import subprocess
import textwrap
logger = logging.getLogger(__name__)
# Slide dimensions in pixels, passed to ImageMagick as "-size WIDTHxHEIGHT"
WIDTH = 1920
HEIGHT = 1080
# Heights of the colored bar drawn at the top and the bar drawn at the bottom
HEADER_HEIGHT = 120
FOOTER_HEIGHT = 60
# Font names handed to ImageMagick's "-font" option
FONT = "DejaVu-Sans"
FONT_BOLD = "DejaVu-Sans-Bold"
def _escape_annotate_text(value, limit: int | None = None) -> str:
    """Return *value* as literal text for ImageMagick's ``-annotate`` argument.

    ImageMagick interprets the annotate text: a leading ``@`` means "read the
    text from this file", ``%`` starts a property escape and ``\\`` starts a
    character escape. Slide text comes from generated scripts, so all three
    are neutralized here. ``None`` becomes an empty string; *limit* truncates
    the raw text before escaping.
    """
    literal = "" if value is None else str(value)
    if limit is not None:
        literal = literal[:limit]
    literal = literal.replace("\\", "\\\\").replace("%", "%%")
    if literal.startswith("@"):
        literal = "\\" + literal
    return literal


def render_slide(
    heading: str,
    text: str,
    bullet_points: list[str],
    slide_number: int,
    total_slides: int,
    module_code: str,
    output_path: str,
) -> None:
    """Render a single content slide as an image using ImageMagick ``convert``.

    Layout: header bar with the heading (at most 80 characters), up to six
    wrapped lines of body text, up to eight wrapped bullet points, and a footer
    bar showing the module code and "Folie <n>/<total>".

    Args:
        heading: Slide heading; ``None`` is treated as empty.
        text: Body text, wrapped at 80 columns; skipped when empty.
        bullet_points: Bullet texts, wrapped at 75 columns; ``None`` is treated
            as an empty list.
        slide_number: Number shown in the footer.
        total_slides: Total shown in the footer.
        module_code: Code shown at the left of the footer.
        output_path: Destination file path passed to ``convert``.

    Raises:
        RuntimeError: If ``convert`` exits with a non-zero status.
    """
    cmd = [
        "convert",
        "-size", f"{WIDTH}x{HEIGHT}",
        "xc:white",
        # Blue header bar
        "-fill", "#1e3a5f",
        "-draw", f"rectangle 0,0 {WIDTH},{HEADER_HEIGHT}",
        # Header text
        "-fill", "white",
        "-font", FONT_BOLD,
        "-pointsize", "42",
        "-gravity", "NorthWest",
        "-annotate", f"+60+{(HEADER_HEIGHT - 42) // 2}",
        _escape_annotate_text(heading, limit=80),
    ]
    y_pos = HEADER_HEIGHT + 40

    # Main text
    if text:
        # Wrap the raw text first so escaping does not distort line widths.
        for line in textwrap.fill(str(text), width=80).split("\n")[:6]:
            cmd.extend([
                "-fill", "#333333",
                "-font", FONT,
                "-pointsize", "28",
                "-gravity", "NorthWest",
                "-annotate", f"+60+{y_pos}", _escape_annotate_text(line),
            ])
            y_pos += 38
        y_pos += 20

    # Bullet points
    for bp in (bullet_points or [])[:8]:
        wrapped_lines = textwrap.fill(str(bp), width=75).split("\n")
        for line_index, line in enumerate(wrapped_lines):
            prefix = "" if line_index == 0 else " "
            cmd.extend([
                "-fill", "#444444",
                "-font", FONT,
                "-pointsize", "26",
                "-gravity", "NorthWest",
                "-annotate", f"+60+{y_pos}", f"{prefix}{_escape_annotate_text(line)}",
            ])
            y_pos += 34
        y_pos += 8

    # Footer bar
    footer_offset = f"+60+{(FOOTER_HEIGHT - 20) // 2}"
    cmd.extend([
        "-fill", "#f0f0f0",
        "-draw", f"rectangle 0,{HEIGHT - FOOTER_HEIGHT} {WIDTH},{HEIGHT}",
        "-fill", "#888888",
        "-font", FONT,
        "-pointsize", "20",
        "-gravity", "SouthWest",
        "-annotate", footer_offset, _escape_annotate_text(module_code),
        "-gravity", "SouthEast",
        "-annotate", footer_offset, f"Folie {slide_number}/{total_slides}",
    ])
    cmd.append(output_path)

    result = subprocess.run(cmd, capture_output=True, text=True, timeout=30)
    if result.returncode != 0:
        raise RuntimeError(f"ImageMagick failed: {result.stderr}")


def render_title_slide(
    title: str,
    subtitle: str,
    output_path: str,
) -> None:
    """Render the title slide: centered title and subtitle on a blue background.

    Args:
        title: Title text, truncated to 60 characters.
        subtitle: Subtitle text, truncated to 80 characters.
        output_path: Destination file path passed to ``convert``.

    Raises:
        RuntimeError: If ``convert`` exits with a non-zero status.
    """
    cmd = [
        "convert",
        "-size", f"{WIDTH}x{HEIGHT}",
        "xc:white",
        # Full blue background
        "-fill", "#1e3a5f",
        "-draw", f"rectangle 0,0 {WIDTH},{HEIGHT}",
        # Title
        "-fill", "white",
        "-font", FONT_BOLD,
        "-pointsize", "56",
        "-gravity", "Center",
        "-annotate", "+0-60", _escape_annotate_text(title, limit=60),
        # Subtitle
        "-fill", "#b0c4de",
        "-font", FONT,
        "-pointsize", "32",
        "-gravity", "Center",
        "-annotate", "+0+40", _escape_annotate_text(subtitle, limit=80),
        # Footer
        "-fill", "#6688aa",
        "-pointsize", "22",
        "-gravity", "South",
        "-annotate", "+0+30", "BreakPilot Compliance Training",
        output_path,
    ]
    result = subprocess.run(cmd, capture_output=True, text=True, timeout=30)
    if result.returncode != 0:
        raise RuntimeError(f"ImageMagick title slide failed: {result.stderr}")

View File

@@ -0,0 +1,56 @@
"""MinIO/S3 storage client for audio and video files."""
import logging
import boto3
from botocore.exceptions import ClientError
logger = logging.getLogger(__name__)
class StorageClient:
    """Small wrapper around a boto3 S3 client that talks to a MinIO endpoint."""

    def __init__(self, endpoint: str, access_key: str, secret_key: str, secure: bool = False):
        """Build the S3 client for ``endpoint`` (``host:port``, no scheme).

        ``secure`` selects ``https`` instead of ``http`` for the endpoint URL.
        """
        scheme = "https" if secure else "http"
        self.client = boto3.client(
            "s3",
            endpoint_url=f"{scheme}://{endpoint}",
            aws_access_key_id=access_key,
            aws_secret_access_key=secret_key,
            region_name="us-east-1",
        )
        self.endpoint = endpoint

    def ensure_bucket(self, bucket: str) -> None:
        """Create ``bucket`` when the HEAD request for it fails with a ClientError."""
        try:
            self.client.head_bucket(Bucket=bucket)
        except ClientError:
            self._create_bucket(bucket)

    def _create_bucket(self, bucket: str) -> None:
        """Try to create ``bucket``; a ClientError is logged, not raised."""
        try:
            self.client.create_bucket(Bucket=bucket)
        except ClientError as e:
            logger.error(f"Failed to create bucket {bucket}: {e}")
        else:
            logger.info(f"Created bucket: {bucket}")

    def upload_file(self, bucket: str, object_key: str, file_path: str, content_type: str = "audio/mpeg") -> int:
        """Upload ``file_path`` as ``object_key`` and return the local file size in bytes."""
        import os

        extra_args = {"ContentType": content_type}
        self.client.upload_file(file_path, bucket, object_key, ExtraArgs=extra_args)
        return os.path.getsize(file_path)

    def get_presigned_url(self, bucket: str, object_key: str, expires: int = 3600) -> str:
        """Return a presigned GET URL for the object, valid for ``expires`` seconds."""
        target = {"Bucket": bucket, "Key": object_key}
        return self.client.generate_presigned_url(
            "get_object",
            Params=target,
            ExpiresIn=expires,
        )

    def is_connected(self) -> bool:
        """Return True when listing buckets succeeds, False on any exception."""
        try:
            self.client.list_buckets()
        except Exception:
            return False
        return True

View File

@@ -0,0 +1,157 @@
"""Piper TTS engine wrapper for speech synthesis."""
import logging
import os
import re
import subprocess
import tempfile
import wave
from pathlib import Path
logger = logging.getLogger(__name__)
# Sentence boundary: a run of whitespace that directly follows ".", "!" or "?".
# The lookbehind keeps the terminator attached to the preceding sentence; a
# terminator at the very end of the text (no trailing whitespace) does not split.
SENTENCE_SPLIT = re.compile(r'(?<=[.!?])\s+')
# Markdown stripping patterns, applied top to bottom. The order is significant:
# - fenced code blocks go first, otherwise the inline-code pattern rewrites the
#   ``` fences and the block is never removed;
# - list markers go before bold/italic, otherwise a "* item *word*" line is
#   read as an italic span that starts at the list marker.
MD_PATTERNS = [
    (re.compile(r'```[\s\S]*?```'), ''),                # Code blocks
    (re.compile(r'^#{1,6}\s+', re.MULTILINE), ''),      # Headers
    (re.compile(r'^\s*[-*+]\s+', re.MULTILINE), ''),    # List markers
    (re.compile(r'^\s*\d+\.\s+', re.MULTILINE), ''),    # Numbered lists
    (re.compile(r'\*\*(.+?)\*\*'), r'\1'),              # Bold
    (re.compile(r'\*(.+?)\*'), r'\1'),                  # Italic
    (re.compile(r'`(.+?)`'), r'\1'),                    # Inline code
    (re.compile(r'\[([^\]]+)\]\([^)]+\)'), r'\1'),      # Links
    (re.compile(r'^\s*>\s+', re.MULTILINE), ''),        # Blockquotes
    (re.compile(r'---+'), ''),                          # Horizontal rules
    (re.compile(r'\n{3,}'), '\n\n'),                    # Multiple newlines
]


def strip_markdown(text: str) -> str:
    """Convert markdown to plain text for TTS.

    Applies every ``MD_PATTERNS`` substitution in order and strips leading and
    trailing whitespace from the result. Fenced code blocks are removed
    entirely; emphasis, inline code and links keep only their visible text.
    """
    for pattern, replacement in MD_PATTERNS:
        text = pattern.sub(replacement, text)
    return text.strip()
def split_sentences(text: str) -> list[str]:
    """Split *text* at ``SENTENCE_SPLIT`` boundaries.

    Each piece is whitespace-trimmed; pieces that end up empty are dropped.
    """
    trimmed_parts = (part.strip() for part in SENTENCE_SPLIT.split(text))
    return [part for part in trimmed_parts if part]
class PiperTTS:
    """Wrapper around the ``piper`` command-line tool for local speech synthesis.

    Text is synthesized sentence by sentence into WAV segments, the segments
    are concatenated, and the result is encoded to MP3 with FFmpeg.
    """

    def __init__(self, model_path: str):
        """Store *model_path* and verify the installation.

        Raises:
            FileNotFoundError: If no file exists at *model_path*.
        """
        self.model_path = model_path
        self._check_piper()

    def _check_piper(self) -> None:
        """Verify the model file exists and log what the ``piper`` probe reports.

        Only a missing model is fatal. A missing or slow ``piper`` executable
        is logged, so a failed probe does not prevent the service from starting.
        """
        if not Path(self.model_path).exists():
            raise FileNotFoundError(f"Piper model not found: {self.model_path}")
        try:
            result = subprocess.run(
                ["piper", "--version"], capture_output=True, text=True, timeout=10,
            )
        except FileNotFoundError:
            # piper-tts pip package installs as python module
            logger.info("Piper available via Python module")
        except subprocess.TimeoutExpired:
            logger.warning("Piper version check timed out")
        else:
            logger.info(f"Piper TTS available: {result.stdout.strip()}")

    def synthesize_to_wav(self, text: str, output_path: str) -> None:
        """Synthesize *text* to a WAV file at *output_path* using Piper.

        The text is sent to the process on stdin.

        Raises:
            RuntimeError: If ``piper`` exits with a non-zero status.
        """
        cmd = [
            "piper",
            "--model", self.model_path,
            "--output_file", output_path,
        ]
        proc = subprocess.run(
            cmd, input=text, capture_output=True, text=True, timeout=120,
        )
        if proc.returncode != 0:
            raise RuntimeError(f"Piper failed: {proc.stderr}")

    def synthesize_to_mp3(self, text: str, output_dir: str) -> tuple[str, float]:
        """Synthesize markdown *text* to ``<output_dir>/output.mp3``.

        Strips markdown, splits the text into sentences, synthesizes each
        sentence, concatenates the segments and encodes the result to MP3.

        Returns:
            ``(mp3_path, duration_seconds)``.

        Raises:
            ValueError: If nothing speakable is left after markdown stripping.
            RuntimeError: If Piper, FFmpeg or FFprobe fails.
        """
        plain_text = strip_markdown(text)
        sentences = split_sentences(plain_text)
        if not sentences:
            # Only reachable when the stripped text is empty; handing "" to
            # Piper cannot produce audio, so fail with a clear message instead.
            raise ValueError("No speakable text left after stripping markdown")

        wav_files = []
        try:
            for i, sentence in enumerate(sentences):
                wav_path = os.path.join(output_dir, f"seg_{i:04d}.wav")
                self.synthesize_to_wav(sentence, wav_path)
                wav_files.append(wav_path)

            # Concatenate WAV files
            combined_wav = os.path.join(output_dir, "combined.wav")
            self._concatenate_wavs(wav_files, combined_wav)

            # Convert to MP3
            mp3_path = os.path.join(output_dir, "output.mp3")
            self._wav_to_mp3(combined_wav, mp3_path)

            # Get duration
            duration = self._get_audio_duration(mp3_path)
            return mp3_path, duration
        finally:
            # Cleanup individual segments (also runs when a step above failed)
            for segment in wav_files:
                if os.path.exists(segment):
                    os.remove(segment)

    def _concatenate_wavs(self, wav_files: list[str], output_path: str) -> None:
        """Concatenate *wav_files* (in order) into one WAV at *output_path*.

        The output takes its audio parameters from the first file; a single
        input is simply copied.

        Raises:
            ValueError: If *wav_files* is empty.
        """
        if not wav_files:
            raise ValueError("No WAV files to concatenate")
        if len(wav_files) == 1:
            import shutil

            shutil.copy2(wav_files[0], output_path)
            return

        # Read parameters from first file
        with wave.open(wav_files[0], 'rb') as first:
            params = first.getparams()

        with wave.open(output_path, 'wb') as out:
            # params carries the first file's frame count; the wave module
            # rewrites the header with the real count when the file is closed.
            out.setparams(params)
            for wav_file in wav_files:
                with wave.open(wav_file, 'rb') as segment:
                    out.writeframes(segment.readframes(segment.getnframes()))

    def _wav_to_mp3(self, wav_path: str, mp3_path: str) -> None:
        """Convert WAV to MP3 using FFmpeg (libmp3lame, ``-qscale:a 2``).

        Raises:
            RuntimeError: If ``ffmpeg`` exits with a non-zero status.
        """
        cmd = [
            "ffmpeg", "-y", "-i", wav_path,
            "-codec:a", "libmp3lame", "-qscale:a", "2",
            mp3_path,
        ]
        proc = subprocess.run(cmd, capture_output=True, text=True, timeout=120)
        if proc.returncode != 0:
            raise RuntimeError(f"FFmpeg MP3 encoding failed: {proc.stderr}")

    def _get_audio_duration(self, file_path: str) -> float:
        """Return the duration of *file_path* in seconds as reported by FFprobe.

        Raises:
            RuntimeError: If ``ffprobe`` exits with a non-zero status or prints
                something that is not a number.
        """
        cmd = [
            "ffprobe", "-v", "error", "-show_entries", "format=duration",
            "-of", "default=noprint_wrappers=1:nokey=1", file_path,
        ]
        result = subprocess.run(cmd, capture_output=True, text=True, timeout=30)
        if result.returncode != 0:
            raise RuntimeError(f"FFprobe failed: {result.stderr}")
        try:
            return float(result.stdout.strip())
        except ValueError as exc:
            raise RuntimeError(
                f"FFprobe returned no usable duration for {file_path}: {result.stdout!r}"
            ) from exc

    @property
    def is_available(self) -> bool:
        """Whether an executable named ``piper`` can be launched.

        The exit status is not inspected, so this only proves that the process
        could be started within 5 seconds.
        """
        try:
            subprocess.run(["piper", "--version"], capture_output=True, timeout=5)
            return True
        except Exception:
            return False

View File

@@ -0,0 +1,127 @@
"""FFmpeg video generator — combines slides + audio into presentation video."""
import logging
import os
import subprocess
import tempfile
from slide_renderer import render_slide, render_title_slide
logger = logging.getLogger(__name__)
def generate_presentation_video(
    script: dict,
    audio_object_key: str,
    output_dir: str,
    storage,
    audio_bucket: str,
) -> tuple[str, float]:
    """
    Generate a presentation video from a slide script and audio.

    1. Download audio from MinIO
    2. Get audio duration
    3. Render slides as PNGs
    4. Calculate timing per slide (proportional to text length)
    5. Create FFmpeg concat list
    6. Combine slides + audio into MP4

    Args:
        script: Dict with optional ``title`` and ``module_code`` and a
            non-empty ``sections`` list; each section may carry ``heading``,
            ``text`` and ``bullet_points``. Missing or null values are treated
            as empty.
        audio_object_key: Key of the narration audio inside *audio_bucket*.
        output_dir: Existing directory that receives all intermediate files
            and the final ``presentation.mp4``.
        storage: Object exposing a boto3-style ``client.download_file``.
        audio_bucket: Bucket that holds the narration audio.

    Returns (mp4_path, duration_seconds).

    Raises:
        ValueError: If the script has no sections or the audio has no duration.
        RuntimeError: If slide rendering or FFmpeg fails.
    """
    title = script.get("title") or "Compliance Training"
    sections = script.get("sections") or []
    if not sections:
        raise ValueError("Script has no sections")

    # Step 1: Download audio
    audio_path = os.path.join(output_dir, "audio.mp3")
    storage.client.download_file(audio_bucket, audio_object_key, audio_path)

    # Step 2: Get audio duration
    duration = _get_duration(audio_path)
    if duration <= 0:
        raise ValueError("Audio track has no duration")

    # Step 3: Render slides
    slides_dir = os.path.join(output_dir, "slides")
    os.makedirs(slides_dir, exist_ok=True)
    slide_paths = []
    text_lengths = []

    # Title slide
    title_path = os.path.join(slides_dir, "slide_000.png")
    render_title_slide(title, "Compliance Schulung", title_path)
    slide_paths.append(title_path)
    text_lengths.append(len(title) + 20)  # Small weight for title

    # Content slides
    module_code = script.get("module_code") or ""
    total_slides = len(sections) + 1  # +1 for title
    for i, section in enumerate(sections):
        # "or" also covers keys that are present but null in the script.
        heading = section.get("heading") or ""
        body = section.get("text") or ""
        bullets = section.get("bullet_points") or []
        slide_path = os.path.join(slides_dir, f"slide_{i+1:03d}.png")
        render_slide(
            heading=heading,
            text=body,
            bullet_points=bullets,
            slide_number=i + 2,  # 1-based, title is 1
            total_slides=total_slides,
            module_code=module_code,
            output_path=slide_path,
        )
        slide_paths.append(slide_path)
        # Text length for timing (at least 50 so sparse slides still get time)
        text_len = len(heading) + len(body) + sum(len(str(bp)) for bp in bullets)
        text_lengths.append(max(text_len, 50))

    # Step 4: Calculate timing; the durations always add up to the audio length
    slide_durations = _allocate_slide_durations(text_lengths, duration)

    # Step 5: Create FFmpeg concat file
    concat_path = os.path.join(output_dir, "concat.txt")
    with open(concat_path, "w", encoding="utf-8") as f:
        for slide_path, dur in zip(slide_paths, slide_durations):
            f.write(f"file '{slide_path}'\n")
            f.write(f"duration {dur:.2f}\n")
        # Repeat last slide for FFmpeg concat demuxer
        f.write(f"file '{slide_paths[-1]}'\n")

    # Step 6: Combine with FFmpeg
    output_path = os.path.join(output_dir, "presentation.mp4")
    cmd = [
        "ffmpeg", "-y",
        "-f", "concat", "-safe", "0", "-i", concat_path,
        "-i", audio_path,
        "-c:v", "libx264", "-pix_fmt", "yuv420p",
        "-c:a", "aac", "-b:a", "128k",
        "-shortest",
        "-movflags", "+faststart",
        output_path,
    ]
    result = subprocess.run(cmd, capture_output=True, text=True, timeout=600)
    if result.returncode != 0:
        raise RuntimeError(f"FFmpeg video generation failed: {result.stderr}")

    video_duration = _get_duration(output_path)
    return output_path, video_duration


def _allocate_slide_durations(
    weights: list[int],
    total: float,
    minimum: float = 3.0,
) -> list[float]:
    """Split *total* seconds across slides proportionally to *weights*.

    Slides whose share would fall below *minimum* are raised to *minimum* and
    the remaining time is re-split among the other slides, so the result
    always sums to *total*. Simply clamping short slides would make the sum
    exceed the audio length, and ``-shortest`` would then cut off the final
    slides. When *total* cannot give every slide *minimum* seconds, the plain
    proportional split is returned.
    """
    weight_sum = sum(weights)
    shares = [weight / weight_sum * total for weight in weights]
    if total <= minimum * len(weights):
        return shares

    pinned: set[int] = set()
    while True:
        too_short = {
            index for index, share in enumerate(shares)
            if index not in pinned and share < minimum
        }
        if not too_short:
            return shares
        pinned |= too_short
        # total > minimum * len(weights) guarantees at least one unpinned slide
        # remains, so free_weight is never zero here.
        remaining = total - minimum * len(pinned)
        free_weight = sum(
            weight for index, weight in enumerate(weights) if index not in pinned
        )
        shares = [
            minimum if index in pinned else weight / free_weight * remaining
            for index, weight in enumerate(weights)
        ]
def _get_duration(file_path: str) -> float:
    """Return the duration of a media file in seconds as reported by FFprobe.

    Raises:
        RuntimeError: If ``ffprobe`` exits with a non-zero status or prints
            something that is not a number (for example for a missing or
            unreadable file).
    """
    cmd = [
        "ffprobe", "-v", "error",
        "-show_entries", "format=duration",
        "-of", "default=noprint_wrappers=1:nokey=1",
        file_path,
    ]
    result = subprocess.run(cmd, capture_output=True, text=True, timeout=30)
    if result.returncode != 0:
        raise RuntimeError(f"FFprobe failed: {result.stderr}")
    try:
        return float(result.stdout.strip())
    except ValueError as exc:
        raise RuntimeError(
            f"FFprobe returned no usable duration for {file_path}: {result.stdout!r}"
        ) from exc