This repository was archived on 2026-02-15. You can still view and clone its files, but you cannot open issues, create pull requests, or push commits.
Files
breakpilot-pwa/backend/transcription_worker/export.py
Benjamin Admin bfdaf63ba9 fix: Restore all files lost during destructive rebase
A previous `git pull --rebase origin main` dropped 177 local commits,
losing 3400+ files across admin-v2, backend, studio-v2, website,
klausur-service, and many other services. The partial restore attempt
(660295e2) only recovered some files.

This commit restores all missing files from pre-rebase ref 98933f5e
while preserving post-rebase additions (night-scheduler, night-mode UI,
NightModeWidget dashboard integration).

Restored features include:
- AI Module Sidebar (FAB), OCR Labeling, OCR Compare
- GPU Dashboard, RAG Pipeline, Magic Help
- Klausur-Korrektur (8 files), Abitur-Archiv (5+ files)
- Companion, Zeugnisse-Crawler, Screen Flow
- Full backend, studio-v2, website, klausur-service
- All compliance SDKs, agent-core, voice-service
- CI/CD configs, documentation, scripts

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-09 09:51:32 +01:00

292 lines
7.6 KiB
Python

"""
BreakPilot Transcript Export
Functions to export transcription segments to various formats:
- WebVTT (for HTML5 video captions)
- SRT (universal subtitle format)
- JSON (full data with speakers and timestamps)
"""
import json
from datetime import datetime, timezone
from typing import Any, Dict, List
def ms_to_vtt_timestamp(ms: int) -> str:
    """
    Convert milliseconds to WebVTT timestamp format.

    Args:
        ms: Milliseconds

    Returns:
        Timestamp string (HH:MM:SS.mmm)
    """
    # Peel off each unit with divmod instead of repeated modulo arithmetic.
    total_seconds, millis = divmod(ms, 1000)
    total_minutes, seconds = divmod(total_seconds, 60)
    hours, minutes = divmod(total_minutes, 60)
    # WebVTT uses a dot as the millisecond separator.
    return f"{hours:02d}:{minutes:02d}:{seconds:02d}.{millis:03d}"
def ms_to_srt_timestamp(ms: int) -> str:
    """
    Convert milliseconds to SRT timestamp format.

    Args:
        ms: Milliseconds

    Returns:
        Timestamp string (HH:MM:SS,mmm)
    """
    # Decompose step by step: ms -> seconds -> minutes -> hours.
    total_seconds, millis = divmod(ms, 1000)
    total_minutes, seconds = divmod(total_seconds, 60)
    hours, minutes = divmod(total_minutes, 60)
    # SRT uses a comma (not a dot) as the decimal separator.
    return f"{hours:02d}:{minutes:02d}:{seconds:02d},{millis:03d}"
def export_to_vtt(
segments: List[Dict],
include_speakers: bool = True,
header: str = "WEBVTT\nKind: captions\nLanguage: de\n"
) -> str:
"""
Export segments to WebVTT format.
Args:
segments: List of transcription segments
include_speakers: Include speaker labels
header: VTT header text
Returns:
WebVTT formatted string
"""
lines = [header]
for i, seg in enumerate(segments):
# Cue identifier
lines.append(f"\n{i + 1}")
# Timestamps
start = ms_to_vtt_timestamp(seg["start_time_ms"])
end = ms_to_vtt_timestamp(seg["end_time_ms"])
lines.append(f"{start} --> {end}")
# Text with optional speaker
text = seg["text"]
if include_speakers and seg.get("speaker_id"):
text = f"<v {seg['speaker_id']}>{text}"
lines.append(text)
return "\n".join(lines) + "\n"
def export_to_srt(
segments: List[Dict],
include_speakers: bool = True
) -> str:
"""
Export segments to SRT format.
Args:
segments: List of transcription segments
include_speakers: Include speaker labels in text
Returns:
SRT formatted string
"""
lines = []
for i, seg in enumerate(segments):
# Sequence number
lines.append(str(i + 1))
# Timestamps
start = ms_to_srt_timestamp(seg["start_time_ms"])
end = ms_to_srt_timestamp(seg["end_time_ms"])
lines.append(f"{start} --> {end}")
# Text with optional speaker
text = seg["text"]
if include_speakers and seg.get("speaker_id"):
text = f"[{seg['speaker_id']}] {text}"
lines.append(text)
lines.append("") # Empty line between entries
return "\n".join(lines)
def export_to_json(
    segments: List[Dict],
    metadata: Dict[str, Any] = None
) -> str:
    """
    Export segments to JSON format with full metadata.

    Args:
        segments: List of transcription segments; each needs
            "start_time_ms", "end_time_ms" and "text", and may carry
            "index", "speaker_id", "confidence" and "words".
        metadata: Additional metadata to include (optional).

    Returns:
        JSON formatted string (2-space indent, non-ASCII preserved).
    """
    # Prepare export data
    export_data = {
        "version": "1.0",
        "format": "breakpilot-transcript",
        # datetime.utcnow() is deprecated (3.12+); use an aware UTC time
        # and normalize the offset to the "Z" suffix callers expect.
        "generated_at": datetime.now(timezone.utc).isoformat().replace("+00:00", "Z"),
        "metadata": metadata or {},
        "segments": []
    }
    # Add segments, normalizing to the export schema
    for seg in segments:
        export_seg = {
            "index": seg.get("index", 0),
            "start_ms": seg["start_time_ms"],
            "end_ms": seg["end_time_ms"],
            "duration_ms": seg["end_time_ms"] - seg["start_time_ms"],
            "text": seg["text"],
            "speaker_id": seg.get("speaker_id"),
            "confidence": seg.get("confidence")
        }
        # Include word-level timestamps if available
        if "words" in seg:
            export_seg["words"] = seg["words"]
        export_data["segments"].append(export_seg)
    # Calculate statistics
    total_duration_ms = sum(s["duration_ms"] for s in export_data["segments"])
    total_words = sum(len(s["text"].split()) for s in export_data["segments"])
    # sorted() makes the speaker list deterministic (raw set order is arbitrary)
    unique_speakers = sorted(
        {s["speaker_id"] for s in export_data["segments"] if s["speaker_id"]}
    )
    export_data["statistics"] = {
        "total_segments": len(export_data["segments"]),
        "total_duration_ms": total_duration_ms,
        "total_duration_seconds": round(total_duration_ms / 1000, 1),
        "total_words": total_words,
        "unique_speakers": len(unique_speakers),
        "speakers": unique_speakers
    }
    return json.dumps(export_data, indent=2, ensure_ascii=False)
def export_to_txt(
segments: List[Dict],
include_timestamps: bool = False,
include_speakers: bool = True,
paragraph_gap_ms: int = 3000
) -> str:
"""
Export segments to plain text format.
Args:
segments: List of transcription segments
include_timestamps: Add timestamps
include_speakers: Add speaker labels
paragraph_gap_ms: Gap threshold for new paragraph
Returns:
Plain text formatted string
"""
lines = []
last_end = 0
current_speaker = None
for seg in segments:
# Add paragraph break for large gaps
gap = seg["start_time_ms"] - last_end
if gap > paragraph_gap_ms and lines:
lines.append("")
# Build text line
parts = []
if include_timestamps:
ts = ms_to_vtt_timestamp(seg["start_time_ms"])
parts.append(f"[{ts}]")
speaker = seg.get("speaker_id")
if include_speakers and speaker and speaker != current_speaker:
parts.append(f"\n{speaker}:")
current_speaker = speaker
parts.append(seg["text"])
lines.append(" ".join(parts))
last_end = seg["end_time_ms"]
return "\n".join(lines)
def create_chapters(
segments: List[Dict],
min_chapter_duration_ms: int = 60000,
speaker_change_as_chapter: bool = True
) -> List[Dict]:
"""
Create chapter markers from segments.
Useful for video navigation and table of contents.
Args:
segments: List of transcription segments
min_chapter_duration_ms: Minimum chapter duration
speaker_change_as_chapter: Create chapter on speaker change
Returns:
List of chapter markers
"""
if not segments:
return []
chapters = []
chapter_start = segments[0]["start_time_ms"]
chapter_text_parts = []
current_speaker = segments[0].get("speaker_id")
for seg in segments:
elapsed = seg["start_time_ms"] - chapter_start
# Check for new chapter
speaker_changed = (
speaker_change_as_chapter and
seg.get("speaker_id") and
seg.get("speaker_id") != current_speaker
)
if elapsed >= min_chapter_duration_ms or speaker_changed:
# Save current chapter
if chapter_text_parts:
chapters.append({
"start_ms": chapter_start,
"title": " ".join(chapter_text_parts[:5]) + "...", # First 5 words
"speaker": current_speaker
})
# Start new chapter
chapter_start = seg["start_time_ms"]
chapter_text_parts = []
current_speaker = seg.get("speaker_id")
chapter_text_parts.extend(seg["text"].split())
# Don't forget the last chapter
if chapter_text_parts:
chapters.append({
"start_ms": chapter_start,
"title": " ".join(chapter_text_parts[:5]) + "...",
"speaker": current_speaker
})
return chapters