Some checks failed
CI/CD / go-lint (push) Has been skipped
CI/CD / python-lint (push) Has been skipped
CI/CD / nodejs-lint (push) Has been skipped
CI/CD / test-go-ai-compliance (push) Failing after 37s
CI/CD / test-python-backend-compliance (push) Successful in 39s
CI/CD / test-python-document-crawler (push) Successful in 26s
CI/CD / test-python-dsms-gateway (push) Successful in 23s
CI/CD / validate-canonical-controls (push) Successful in 12s
CI/CD / Deploy (push) Has been skipped
Interactive Training Videos (CP-TRAIN): - DB migration 022: training_checkpoints + checkpoint_progress tables - NarratorScript generation via Anthropic (AI Teacher persona, German) - TTS batch synthesis + interactive video pipeline (slides + checkpoint slides + FFmpeg) - 4 new API endpoints: generate-interactive, interactive-manifest, checkpoint submit, checkpoint progress - InteractiveVideoPlayer component (HTML5 Video, quiz overlay, seek protection, progress tracking) - Learner portal integration with automatic completion on all checkpoints passed - 30 new tests (handler validation + grading logic + manifest/progress + seek protection) Training Blocks: - Block generator, block store, block config CRUD + preview/generate endpoints - Migration 021: training_blocks schema Control Generator + Canonical Library: - Control generator routes + service enhancements - Canonical control library helpers, sidebar entry - Citation backfill service + tests - CE libraries data (hazard, protection, evidence, lifecycle, components) Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
261 lines
9.2 KiB
Python
261 lines
9.2 KiB
Python
"""FFmpeg video generator — combines slides + audio into presentation video."""
|
|
import logging
|
|
import os
|
|
import subprocess
|
|
import tempfile
|
|
|
|
from slide_renderer import render_slide, render_title_slide
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
def generate_presentation_video(
    script: dict,
    audio_object_key: str,
    output_dir: str,
    storage,
    audio_bucket: str,
) -> tuple[str, float]:
    """Generate a presentation video from a slide script and narration audio.

    Pipeline:
        1. Download the narration MP3 from object storage.
        2. Probe its duration with ffprobe.
        3. Render a title slide plus one content slide per section as PNGs.
        4. Allocate per-slide screen time proportional to each slide's text
           length, with a 3-second floor per slide.
        5. Write an FFmpeg concat list and mux slides + audio into an MP4.

    Args:
        script: Dict with "title", "sections" (each a dict with "heading",
            "text", "bullet_points") and optional "module_code".
        audio_object_key: Object key of the narration MP3 in ``audio_bucket``.
        output_dir: Working directory for downloads and intermediate files.
        storage: Storage client exposing ``client.download_file``.
        audio_bucket: Bucket holding the narration audio.

    Returns:
        Tuple of (path to generated MP4, video duration in seconds).

    Raises:
        ValueError: If the script contains no sections.
        RuntimeError: If the FFmpeg mux step fails.
    """
    title = script.get("title", "Compliance Training")
    sections = script.get("sections", [])

    if not sections:
        raise ValueError("Script has no sections")

    # Step 1: Download audio from object storage.
    audio_path = os.path.join(output_dir, "audio.mp3")
    storage.client.download_file(audio_bucket, audio_object_key, audio_path)

    # Step 2: Probe the narration length — it drives all slide timing.
    duration = _get_duration(audio_path)

    # Step 3: Render slides.
    slides_dir = os.path.join(output_dir, "slides")
    os.makedirs(slides_dir, exist_ok=True)

    slide_paths = []
    text_lengths = []  # text weight per slide; proxy for screen time

    # Title slide — small fixed weight so it gets a short share of the audio.
    title_path = os.path.join(slides_dir, "slide_000.png")
    render_title_slide(title, "Compliance Schulung", title_path)
    slide_paths.append(title_path)
    text_lengths.append(len(title) + 20)

    # Content slides.
    module_code = script.get("module_code", "")
    total_slides = len(sections) + 1  # +1 for the title slide

    for i, section in enumerate(sections):
        slide_path = os.path.join(slides_dir, f"slide_{i+1:03d}.png")
        render_slide(
            heading=section.get("heading", ""),
            text=section.get("text", ""),
            bullet_points=section.get("bullet_points", []),
            slide_number=i + 2,  # 1-based; title slide is number 1
            total_slides=total_slides,
            module_code=module_code,
            output_path=slide_path,
        )
        slide_paths.append(slide_path)

        # Text length for timing; floor of 50 keeps near-empty slides visible.
        text_len = len(section.get("heading", "")) + len(section.get("text", ""))
        text_len += sum(len(bp) for bp in section.get("bullet_points", []))
        text_lengths.append(max(text_len, 50))

    # Step 4: Split the audio duration proportionally to text weight, with a
    # minimum of 3 seconds per slide.
    # NOTE(review): the 3 s floor can push total slide time past the audio
    # length; "-shortest" below then truncates the video at the audio end.
    total_text = sum(text_lengths)
    slide_durations = [
        max((tl / total_text) * duration, 3.0) for tl in text_lengths
    ]

    # Step 5: FFmpeg concat demuxer list. The last file entry is repeated so
    # the demuxer honors the final "duration" directive.
    concat_path = os.path.join(output_dir, "concat.txt")
    with open(concat_path, "w") as f:
        for slide_path, dur in zip(slide_paths, slide_durations):
            f.write(f"file '{slide_path}'\n")
            f.write(f"duration {dur:.2f}\n")
        f.write(f"file '{slide_paths[-1]}'\n")

    # Step 6: Mux slides + audio into an H.264/AAC MP4 (faststart for
    # progressive playback in browsers).
    output_path = os.path.join(output_dir, "presentation.mp4")
    cmd = [
        "ffmpeg", "-y",
        "-f", "concat", "-safe", "0", "-i", concat_path,
        "-i", audio_path,
        "-c:v", "libx264", "-pix_fmt", "yuv420p",
        "-c:a", "aac", "-b:a", "128k",
        "-shortest",
        "-movflags", "+faststart",
        output_path,
    ]

    result = subprocess.run(cmd, capture_output=True, text=True, timeout=600)
    if result.returncode != 0:
        raise RuntimeError(f"FFmpeg video generation failed: {result.stderr}")

    video_duration = _get_duration(output_path)
    return output_path, video_duration
|
|
|
|
|
|
def generate_interactive_presentation_video(
    script: dict,
    audio_sections: list[dict],
    output_dir: str,
    storage,
    audio_bucket: str,
) -> tuple[str, float]:
    """
    Generate an interactive presentation video from a narrator script plus
    per-section audio.

    Includes checkpoint slides (red-bordered pause markers) after sections
    that define a "checkpoint". Returns (mp4_path, duration_seconds).

    Args:
        script: Dict with "title", "sections" (each with "heading",
            "narrator_text", "bullet_points", optional "checkpoint") and
            optional "module_code".
        audio_sections: One dict per script section, each with
            "audio_object_key" and "duration" (seconds).
        output_dir: Working directory for downloads and intermediate files.
        storage: Storage client exposing ``client.download_file``.
        audio_bucket: Bucket holding the per-section audio.

    Raises:
        ValueError: If there are no sections, no audio sections, or no
            downloadable audio files.
        RuntimeError: If an FFmpeg step fails.
    """
    # Only render_checkpoint_slide needs a local import; render_slide and
    # render_title_slide are already imported at module level.
    from slide_renderer import render_checkpoint_slide

    title = script.get("title", "Compliance Training")
    sections = script.get("sections", [])

    if not sections:
        raise ValueError("Script has no sections")
    if not audio_sections:
        raise ValueError("No audio sections provided")

    # Step 1: Download all section audio files. Durations are keyed by the
    # ORIGINAL section index so that a section with a missing audio key does
    # not shift the durations of every later section (previous positional
    # matching via audio_paths[i] misaligned them).
    audio_paths = []  # (section_index, local_path, duration_seconds)
    for i, sec in enumerate(audio_sections):
        obj_key = sec.get("audio_object_key", "")
        if not obj_key:
            continue  # section has no narration; 5 s fallback applies below
        audio_path = os.path.join(output_dir, f"section_{i}.mp3")
        storage.client.download_file(audio_bucket, obj_key, audio_path)
        audio_paths.append((i, audio_path, sec.get("duration", 0.0)))

    duration_by_section = {idx: dur for idx, _, dur in audio_paths}

    # Step 2: Render slides.
    slides_dir = os.path.join(output_dir, "slides")
    os.makedirs(slides_dir, exist_ok=True)

    # All slide entries: (png_path, display_seconds).
    slide_entries = []

    # Title slide, fixed 5 seconds.
    title_path = os.path.join(slides_dir, "slide_000_title.png")
    render_title_slide(title, "Interaktive Compliance-Schulung", title_path)
    slide_entries.append((title_path, 5.0))

    total_content_slides = len(sections)  # for slide numbering
    slide_num = 1

    for i, section in enumerate(sections):
        heading = section.get("heading", "")
        narrator_text = section.get("narrator_text", "")
        bullet_points = section.get("bullet_points", [])

        # Content slide; narrator text is truncated to 200 chars on-slide.
        slide_path = os.path.join(slides_dir, f"slide_{i+1:03d}_content.png")
        render_slide(
            heading=heading,
            text=narrator_text[:200],
            bullet_points=bullet_points,
            slide_number=slide_num + 1,
            total_slides=total_content_slides + 1,
            module_code=script.get("module_code", ""),
            output_path=slide_path,
        )
        slide_num += 1

        # Slide duration = matching audio section duration; 5 s fallback when
        # the section had no audio or a zero/missing duration.
        section_duration = duration_by_section.get(i, 0.0) or 5.0
        slide_entries.append((slide_path, section_duration))

        # Checkpoint slide (if this section defines one).
        checkpoint = section.get("checkpoint")
        if checkpoint:
            cp_title = checkpoint.get("title", f"Checkpoint {i+1}")
            questions = checkpoint.get("questions", [])
            question_preview = questions[0].get("question", "") if questions else ""
            cp_path = os.path.join(slides_dir, f"slide_{i+1:03d}_checkpoint.png")
            render_checkpoint_slide(cp_title, question_preview, len(questions), cp_path)
            slide_entries.append((cp_path, 3.0))  # 3 s still frame as pause marker

    # Step 3: Concatenate all section audio files into one track.
    combined_audio = os.path.join(output_dir, "combined_audio.mp3")
    if not audio_paths:
        raise ValueError("No audio files to concatenate")
    if len(audio_paths) == 1:
        import shutil
        shutil.copy2(audio_paths[0][1], combined_audio)
    else:
        # Use the FFmpeg concat demuxer with stream copy (no re-encode).
        audio_list_path = os.path.join(output_dir, "audio_list.txt")
        with open(audio_list_path, "w") as f:
            for _, apath, _ in audio_paths:
                f.write(f"file '{apath}'\n")
        cmd = [
            "ffmpeg", "-y", "-f", "concat", "-safe", "0",
            "-i", audio_list_path, "-c", "copy", combined_audio,
        ]
        result = subprocess.run(cmd, capture_output=True, text=True, timeout=300)
        if result.returncode != 0:
            raise RuntimeError(f"FFmpeg audio concat failed: {result.stderr}")

    # Step 4: Concat demuxer list for the slides; the last file entry is
    # repeated so the demuxer honors the final "duration" directive.
    concat_path = os.path.join(output_dir, "concat.txt")
    with open(concat_path, "w") as f:
        for slide_path, dur in slide_entries:
            f.write(f"file '{slide_path}'\n")
            f.write(f"duration {dur:.2f}\n")
        f.write(f"file '{slide_entries[-1][0]}'\n")

    # Step 5: Mux slides + combined audio into the final MP4.
    # NOTE(review): checkpoint slides add video time not covered by audio;
    # with "-shortest" the video is cut at the audio end, so trailing
    # checkpoint frames may be truncated — confirm intended.
    output_path = os.path.join(output_dir, "interactive.mp4")
    cmd = [
        "ffmpeg", "-y",
        "-f", "concat", "-safe", "0", "-i", concat_path,
        "-i", combined_audio,
        "-c:v", "libx264", "-pix_fmt", "yuv420p",
        "-c:a", "aac", "-b:a", "128k",
        "-shortest",
        "-movflags", "+faststart",
        output_path,
    ]

    result = subprocess.run(cmd, capture_output=True, text=True, timeout=600)
    if result.returncode != 0:
        raise RuntimeError(f"FFmpeg interactive video failed: {result.stderr}")

    video_duration = _get_duration(output_path)
    return output_path, video_duration
|
|
|
|
|
|
def _get_duration(file_path: str) -> float:
    """Return the media duration of *file_path* in seconds using ffprobe.

    Args:
        file_path: Path to a local audio or video file.

    Returns:
        Duration in seconds as reported by the container format.

    Raises:
        RuntimeError: If ffprobe exits non-zero. (Previously a failed probe
            surfaced as an opaque ValueError from float("") on empty output.)
        ValueError: If ffprobe succeeds but prints a non-numeric duration.
    """
    cmd = [
        "ffprobe", "-v", "error",
        "-show_entries", "format=duration",
        "-of", "default=noprint_wrappers=1:nokey=1",
        file_path,
    ]
    result = subprocess.run(cmd, capture_output=True, text=True, timeout=30)
    if result.returncode != 0:
        raise RuntimeError(f"ffprobe failed for {file_path}: {result.stderr}")
    return float(result.stdout.strip())
|