"""FFmpeg video generator — combines slides + audio into presentation video."""

import logging
import os
import shutil
import subprocess
import tempfile

from slide_renderer import render_slide, render_title_slide

logger = logging.getLogger(__name__)

# Shortest time (seconds) a slide should stay on screen in the plain video.
_MIN_SLIDE_SECONDS = 3.0


def _allocate_slide_durations(
    weights: list,
    total_duration: float,
    min_duration: float = _MIN_SLIDE_SECONDS,
) -> list:
    """Split ``total_duration`` across slides proportionally to ``weights``.

    Every slide gets at least ``min_duration`` seconds. When the total is long
    enough to honour that floor for every slide, the time handed to short
    slides is taken from the longer ones, so the returned durations sum to
    ``total_duration``. When the total is too short for that, each
    proportional share is simply raised to the floor (the sum then exceeds
    ``total_duration``).

    Returns a list of durations in the same order as ``weights``.
    """
    count = len(weights)
    if count == 0:
        return []
    if sum(weights) <= 0:
        # No usable weighting information: treat all slides as equal.
        weights = [1] * count

    if total_duration < min_duration * count:
        total_weight = sum(weights)
        return [
            max((weight / total_weight) * total_duration, min_duration)
            for weight in weights
        ]

    durations = [min_duration] * count
    flexible = list(range(count))
    remaining = total_duration
    while flexible:
        flexible_weight = sum(weights[i] for i in flexible)
        shares = {
            i: (weights[i] / flexible_weight) * remaining for i in flexible
        }
        too_short = [i for i in flexible if shares[i] < min_duration]
        if not too_short:
            for i, share in shares.items():
                durations[i] = share
            break
        # Pin the short slides at the floor and re-split what is left among
        # the others; pinning may push further slides below the floor, hence
        # the loop.
        remaining -= min_duration * len(too_short)
        flexible = [i for i in flexible if shares[i] >= min_duration]
    return durations


def _concat_file_directive(path: str) -> str:
    """Return a concat-demuxer ``file`` line for ``path``.

    The path is wrapped in single quotes; an embedded single quote is written
    as ``'\\''`` (close quote, escaped quote, reopen quote) so it cannot
    terminate the quoted string early.
    """
    escaped = path.replace("'", "'\\''")
    return f"file '{escaped}'\n"


def _write_slide_concat_file(concat_path: str, slide_entries: list) -> None:
    """Write the concat list for ``slide_entries`` (``(png_path, seconds)`` pairs)."""
    with open(concat_path, "w", encoding="utf-8") as f:
        for slide_path, seconds in slide_entries:
            f.write(_concat_file_directive(slide_path))
            f.write(f"duration {seconds:.2f}\n")
        # Repeat last slide for FFmpeg concat demuxer
        f.write(_concat_file_directive(slide_entries[-1][0]))


def _run_ffmpeg(cmd: list, timeout: int, failure_label: str) -> None:
    """Run ``cmd`` and raise ``RuntimeError`` with stderr on a non-zero exit."""
    result = subprocess.run(cmd, capture_output=True, text=True, timeout=timeout)
    if result.returncode != 0:
        raise RuntimeError(f"{failure_label}: {result.stderr}")


def _mux_slides_with_audio(
    concat_path: str,
    audio_path: str,
    output_path: str,
    failure_label: str,
) -> None:
    """Encode the slide list plus one audio file into an H.264/AAC MP4."""
    cmd = [
        "ffmpeg", "-y",
        "-f", "concat", "-safe", "0", "-i", concat_path,
        "-i", audio_path,
        "-c:v", "libx264", "-pix_fmt", "yuv420p",
        "-c:a", "aac", "-b:a", "128k",
        "-shortest",
        "-movflags", "+faststart",
        output_path,
    ]
    _run_ffmpeg(cmd, timeout=600, failure_label=failure_label)


def generate_presentation_video(
    script: dict,
    audio_object_key: str,
    output_dir: str,
    storage,
    audio_bucket: str,
) -> tuple[str, float]:
    """
    Generate a presentation video from a slide script and audio.

    1. Download audio from object storage
    2. Get audio duration
    3. Render slides as PNGs
    4. Calculate timing per slide (proportional to text length)
    5. Create FFmpeg concat list
    6. Combine slides + audio into MP4

    Raises ValueError if the script has no sections and RuntimeError if
    FFmpeg/FFprobe fails.

    Returns (mp4_path, duration_seconds).
    """
    title = script.get("title", "Compliance Training")
    sections = script.get("sections", [])

    if not sections:
        raise ValueError("Script has no sections")

    # Step 1: Download audio
    audio_path = os.path.join(output_dir, "audio.mp3")
    storage.client.download_file(audio_bucket, audio_object_key, audio_path)

    # Step 2: Get audio duration
    duration = _get_duration(audio_path)

    # Step 3: Render slides
    slides_dir = os.path.join(output_dir, "slides")
    os.makedirs(slides_dir, exist_ok=True)

    slide_paths = []
    text_lengths = []

    # Title slide
    title_path = os.path.join(slides_dir, "slide_000.png")
    render_title_slide(title, "Compliance Schulung", title_path)
    slide_paths.append(title_path)
    text_lengths.append(len(title) + 20)  # Small weight for title

    # Content slides
    module_code = script.get("module_code", "")
    total_slides = len(sections) + 1  # +1 for title

    for i, section in enumerate(sections):
        heading = section.get("heading", "")
        text = section.get("text", "")
        bullet_points = section.get("bullet_points", [])

        slide_path = os.path.join(slides_dir, f"slide_{i+1:03d}.png")
        render_slide(
            heading=heading,
            text=text,
            bullet_points=bullet_points,
            slide_number=i + 2,  # 1-based, title is 1
            total_slides=total_slides,
            module_code=module_code,
            output_path=slide_path,
        )
        slide_paths.append(slide_path)

        # Text length is the timing weight; floor of 50 so near-empty slides
        # still get a share.
        text_len = len(heading) + len(text) + sum(len(bp) for bp in bullet_points)
        text_lengths.append(max(text_len, 50))

    # Step 4: Calculate timing (proportional, minimum 3 seconds per slide,
    # total kept equal to the audio length where possible so "-shortest"
    # does not cut off the last slides).
    if duration < _MIN_SLIDE_SECONDS * len(text_lengths):
        logger.warning(
            "Audio (%.2fs) is too short to show %d slides for %.1fs each; "
            "trailing slides may be cut off",
            duration,
            len(text_lengths),
            _MIN_SLIDE_SECONDS,
        )
    slide_durations = _allocate_slide_durations(text_lengths, duration)

    # Step 5: Create FFmpeg concat file
    concat_path = os.path.join(output_dir, "concat.txt")
    _write_slide_concat_file(concat_path, list(zip(slide_paths, slide_durations)))

    # Step 6: Combine with FFmpeg
    output_path = os.path.join(output_dir, "presentation.mp4")
    _mux_slides_with_audio(
        concat_path, audio_path, output_path, "FFmpeg video generation failed"
    )

    video_duration = _get_duration(output_path)
    return output_path, video_duration


def generate_interactive_presentation_video(
    script: dict,
    audio_sections: list[dict],
    output_dir: str,
    storage,
    audio_bucket: str,
) -> tuple[str, float]:
    """
    Generate an interactive presentation video from narrator script + per-section audio.

    Includes checkpoint slides (red-bordered pause markers) between sections.
    ``audio_sections[i]`` is treated as the audio for ``script["sections"][i]``;
    entries without an ``audio_object_key`` are skipped and the matching
    section falls back to a 5 second slide.

    Raises ValueError if there are no sections or no usable audio, and
    RuntimeError if FFmpeg/FFprobe fails.

    Returns (mp4_path, duration_seconds).
    """
    # Imported here rather than at module level, as in the original code
    # (presumably so the module still imports if the renderer lacks
    # checkpoint support — verify).
    from slide_renderer import render_checkpoint_slide

    title = script.get("title", "Compliance Training")
    sections = script.get("sections", [])

    if not sections:
        raise ValueError("Script has no sections")
    if not audio_sections:
        raise ValueError("No audio sections provided")

    # Step 1: Download all section audio files
    audio_paths = []
    # Keyed by the entry's original index so that a skipped entry does not
    # shift the durations of all later sections.
    duration_by_section = {}
    for i, sec in enumerate(audio_sections):
        obj_key = sec.get("audio_object_key", "")
        if not obj_key:
            continue
        audio_path = os.path.join(output_dir, f"section_{i}.mp3")
        storage.client.download_file(audio_bucket, obj_key, audio_path)
        audio_paths.append(audio_path)
        duration_by_section[i] = sec.get("duration", 0.0)

    # Fail before rendering anything if there is nothing to narrate.
    if not audio_paths:
        raise ValueError("No audio files to concatenate")

    # Step 2: Render slides
    slides_dir = os.path.join(output_dir, "slides")
    os.makedirs(slides_dir, exist_ok=True)

    # All slide entries: (png_path, duration)
    slide_entries = []

    # Title slide (5 seconds)
    title_path = os.path.join(slides_dir, "slide_000_title.png")
    render_title_slide(title, "Interaktive Compliance-Schulung", title_path)
    slide_entries.append((title_path, 5.0))

    # NOTE(review): the title and checkpoint slides add screen time that has
    # no matching silence in the combined audio, so narration appears to run
    # ahead of the slides and "-shortest" trims the end — confirm whether
    # silence padding is wanted.
    module_code = script.get("module_code", "")
    total_slides = len(sections) + 1  # +1 for title

    for i, section in enumerate(sections):
        heading = section.get("heading", "")
        narrator_text = section.get("narrator_text", "")
        bullet_points = section.get("bullet_points", [])

        # Content slide for this section
        slide_path = os.path.join(slides_dir, f"slide_{i+1:03d}_content.png")
        render_slide(
            heading=heading,
            text=narrator_text[:200],  # only the first 200 chars go on the slide
            bullet_points=bullet_points,
            slide_number=i + 2,  # 1-based, title is 1
            total_slides=total_slides,
            module_code=module_code,
            output_path=slide_path,
        )

        # Duration = matching audio section duration (5s fallback when the
        # section has no audio or its duration is missing/zero)
        section_duration = duration_by_section.get(i) or 5.0
        slide_entries.append((slide_path, section_duration))

        # Checkpoint slide (if this section has a checkpoint)
        checkpoint = section.get("checkpoint")
        if checkpoint:
            cp_title = checkpoint.get("title", f"Checkpoint {i+1}")
            questions = checkpoint.get("questions", [])
            question_preview = questions[0].get("question", "") if questions else ""
            cp_path = os.path.join(slides_dir, f"slide_{i+1:03d}_checkpoint.png")
            render_checkpoint_slide(cp_title, question_preview, len(questions), cp_path)
            slide_entries.append((cp_path, 3.0))  # 3s still frame as pause marker

    # Step 3: Concatenate all section audio files into one
    combined_audio = os.path.join(output_dir, "combined_audio.mp3")
    if len(audio_paths) == 1:
        shutil.copy2(audio_paths[0], combined_audio)
    else:
        # Use FFmpeg to concatenate audio
        audio_list_path = os.path.join(output_dir, "audio_list.txt")
        with open(audio_list_path, "w", encoding="utf-8") as f:
            for apath in audio_paths:
                f.write(_concat_file_directive(apath))
        cmd = [
            "ffmpeg", "-y",
            "-f", "concat", "-safe", "0", "-i", audio_list_path,
            "-c", "copy",
            combined_audio,
        ]
        _run_ffmpeg(cmd, timeout=300, failure_label="FFmpeg audio concat failed")

    # Step 4: Create FFmpeg concat file for slides
    concat_path = os.path.join(output_dir, "concat.txt")
    _write_slide_concat_file(concat_path, slide_entries)

    # Step 5: Combine slides + audio into MP4
    output_path = os.path.join(output_dir, "interactive.mp4")
    _mux_slides_with_audio(
        concat_path, combined_audio, output_path, "FFmpeg interactive video failed"
    )

    video_duration = _get_duration(output_path)
    return output_path, video_duration


def _get_duration(file_path: str) -> float:
    """Get media duration in seconds using FFprobe.

    Raises RuntimeError if FFprobe exits with an error and ValueError if its
    output is not a number.
    """
    cmd = [
        "ffprobe", "-v", "error",
        "-show_entries", "format=duration",
        "-of", "default=noprint_wrappers=1:nokey=1",
        file_path,
    ]
    result = subprocess.run(cmd, capture_output=True, text=True, timeout=30)
    if result.returncode != 0:
        raise RuntimeError(f"FFprobe failed for {file_path}: {result.stderr}")

    raw_duration = result.stdout.strip()
    try:
        return float(raw_duration)
    except ValueError as err:
        raise ValueError(
            f"FFprobe returned no usable duration for {file_path}: {raw_duration!r}"
        ) from err