Some checks failed
Tests / Go Tests (push) Has been cancelled
Tests / Python Tests (push) Has been cancelled
Tests / Integration Tests (push) Has been cancelled
Tests / Go Lint (push) Has been cancelled
Tests / Python Lint (push) Has been cancelled
Tests / Security Scan (push) Has been cancelled
Tests / All Checks Passed (push) Has been cancelled
Security Scanning / Secret Scanning (push) Has been cancelled
Security Scanning / Dependency Vulnerability Scan (push) Has been cancelled
Security Scanning / Go Security Scan (push) Has been cancelled
Security Scanning / Python Security Scan (push) Has been cancelled
Security Scanning / Node.js Security Scan (push) Has been cancelled
Security Scanning / Docker Image Security (push) Has been cancelled
Security Scanning / Security Summary (push) Has been cancelled
CI/CD Pipeline / Go Tests (push) Has been cancelled
CI/CD Pipeline / Python Tests (push) Has been cancelled
CI/CD Pipeline / Website Tests (push) Has been cancelled
CI/CD Pipeline / Linting (push) Has been cancelled
CI/CD Pipeline / Security Scan (push) Has been cancelled
CI/CD Pipeline / Docker Build & Push (push) Has been cancelled
CI/CD Pipeline / Integration Tests (push) Has been cancelled
CI/CD Pipeline / Deploy to Staging (push) Has been cancelled
CI/CD Pipeline / Deploy to Production (push) Has been cancelled
CI/CD Pipeline / CI Summary (push) Has been cancelled
ci/woodpecker/manual/build-ci-image Pipeline was successful
ci/woodpecker/manual/main Pipeline failed
All services: admin-v2, studio-v2, website, ai-compliance-sdk, consent-service, klausur-service, voice-service, and infrastructure. Large PDFs and compiled binaries excluded via .gitignore.
292 lines
7.6 KiB
Python
292 lines
7.6 KiB
Python
"""
|
|
BreakPilot Transcript Export
|
|
|
|
Functions to export transcription segments to various formats:
|
|
- WebVTT (for HTML5 video captions)
|
|
- SRT (universal subtitle format)
|
|
- JSON (full data with speakers and timestamps)
|
|
"""
|
|
|
|
import json
|
|
from typing import List, Dict, Any
|
|
from datetime import datetime
|
|
|
|
|
|
def ms_to_vtt_timestamp(ms: int) -> str:
    """
    Format a millisecond offset as a WebVTT timestamp.

    Args:
        ms: Offset in milliseconds.

    Returns:
        Timestamp string (HH:MM:SS.mmm). The hour field is zero-padded to
        at least two digits and is not wrapped at 24.
    """
    # Peel off the largest unit first; each remainder feeds the next unit.
    hours, remainder = divmod(ms, 3600000)
    minutes, remainder = divmod(remainder, 60000)
    seconds, millis = divmod(remainder, 1000)

    return f"{hours:02d}:{minutes:02d}:{seconds:02d}.{millis:03d}"
|
|
|
|
|
|
def ms_to_srt_timestamp(ms: int) -> str:
    """
    Format a millisecond offset as an SRT timestamp.

    Args:
        ms: Offset in milliseconds.

    Returns:
        Timestamp string (HH:MM:SS,mmm). The hour field is zero-padded to
        at least two digits and is not wrapped at 24.
    """
    # Split into hours / minutes / seconds / milliseconds, largest unit first.
    hours, remainder = divmod(ms, 3600000)
    minutes, remainder = divmod(remainder, 60000)
    seconds, millis = divmod(remainder, 1000)

    # SRT separates the millisecond fraction with a comma, not a dot.
    return "{:02d}:{:02d}:{:02d},{:03d}".format(hours, minutes, seconds, millis)
|
|
|
|
|
|
def export_to_vtt(
    segments: List[Dict],
    include_speakers: bool = True,
    header: str = "WEBVTT\nKind: captions\nLanguage: de\n"
) -> str:
    """
    Export segments to WebVTT format.

    Each segment becomes one cue: a 1-based cue identifier, a timing line
    and the cue text. When a speaker is present (and requested) the text is
    wrapped in a WebVTT voice tag (``<v speaker>``).

    The characters ``&``, ``<`` and ``>`` in the segment text and in the
    speaker name are written as character references so that transcript
    content (including a literal ``-->``) cannot be mistaken for cue markup
    or a timing line.

    Args:
        segments: List of transcription segments. Each segment must provide
            "start_time_ms", "end_time_ms" and "text"; "speaker_id" is
            optional.
        include_speakers: Include speaker labels
        header: VTT header text, emitted verbatim before the first cue

    Returns:
        WebVTT formatted string
    """
    def _escape(value: str) -> str:
        # "&" must be replaced first so the references added below are not
        # escaped a second time.
        return (
            value.replace("&", "&amp;")
            .replace("<", "&lt;")
            .replace(">", "&gt;")
        )

    lines = [header]

    for number, seg in enumerate(segments, start=1):
        # Cue identifier; the leading newline yields the blank line that
        # separates this cue from the header / previous cue.
        lines.append(f"\n{number}")

        # Timestamps
        start = ms_to_vtt_timestamp(seg["start_time_ms"])
        end = ms_to_vtt_timestamp(seg["end_time_ms"])
        lines.append(f"{start} --> {end}")

        # Text with optional speaker
        text = _escape(seg["text"])
        speaker = seg.get("speaker_id")
        if include_speakers and speaker:
            text = f"<v {_escape(str(speaker))}>{text}"

        lines.append(text)

    return "\n".join(lines) + "\n"
|
|
|
|
|
|
def export_to_srt(
    segments: List[Dict],
    include_speakers: bool = True
) -> str:
    """
    Render transcription segments as an SRT subtitle document.

    Every segment produces one entry made of a 1-based sequence number, a
    timing line and the text, each terminated by a newline; entries are
    separated by an empty line.

    Args:
        segments: List of transcription segments. Each segment must provide
            "start_time_ms", "end_time_ms" and "text"; "speaker_id" is
            optional.
        include_speakers: Prefix the text with "[speaker_id] " when the
            segment carries a speaker

    Returns:
        SRT formatted string (empty string for no segments)
    """
    entries = []

    for number, segment in enumerate(segments, start=1):
        begin = ms_to_srt_timestamp(segment["start_time_ms"])
        finish = ms_to_srt_timestamp(segment["end_time_ms"])

        caption = segment["text"]
        if include_speakers and segment.get("speaker_id"):
            caption = f"[{segment['speaker_id']}] {caption}"

        # The trailing "" makes every entry end with a newline, so joining
        # the entries with "\n" leaves exactly one empty line between them.
        entries.append(
            "\n".join([str(number), f"{begin} --> {finish}", caption, ""])
        )

    return "\n".join(entries)
|
|
|
|
|
|
def export_to_json(
    segments: List[Dict],
    metadata: Dict[str, Any] = None
) -> str:
    """
    Export segments to JSON format with full metadata.

    The document contains a fixed "version"/"format" header, a UTC
    "generated_at" timestamp, the caller's metadata, the normalised
    segments and a "statistics" section derived from those segments.

    Args:
        segments: List of transcription segments. Each segment must provide
            "start_time_ms", "end_time_ms" and "text"; "index",
            "speaker_id", "confidence" and "words" are optional.
        metadata: Additional metadata to include (defaults to an empty dict)

    Returns:
        JSON formatted string (2-space indent, non-ASCII characters kept)
    """
    # Imported locally because the module only imports the `datetime` class.
    from datetime import timezone

    # datetime.utcnow() is deprecated since Python 3.12. Take an aware UTC
    # "now" and drop the tzinfo so isoformat() yields the same naive string
    # as before, to which the "Z" suffix is appended.
    generated_at = (
        datetime.now(timezone.utc).replace(tzinfo=None).isoformat() + "Z"
    )

    # Normalise segments
    exported_segments = []
    for seg in segments:
        start_ms = seg["start_time_ms"]
        end_ms = seg["end_time_ms"]

        export_seg = {
            "index": seg.get("index", 0),
            "start_ms": start_ms,
            "end_ms": end_ms,
            "duration_ms": end_ms - start_ms,
            "text": seg["text"],
            "speaker_id": seg.get("speaker_id"),
            "confidence": seg.get("confidence")
        }

        # Include word-level timestamps if available
        if "words" in seg:
            export_seg["words"] = seg["words"]

        exported_segments.append(export_seg)

    # Calculate statistics
    total_duration_ms = sum(s["duration_ms"] for s in exported_segments)
    total_words = sum(len(s["text"].split()) for s in exported_segments)

    # dict.fromkeys de-duplicates while keeping first-appearance order.
    # A set would make the order of "speakers" vary between runs.
    speakers = list(dict.fromkeys(
        s["speaker_id"] for s in exported_segments if s["speaker_id"]
    ))

    export_data = {
        "version": "1.0",
        "format": "breakpilot-transcript",
        "generated_at": generated_at,
        "metadata": metadata or {},
        "segments": exported_segments,
        "statistics": {
            "total_segments": len(exported_segments),
            "total_duration_ms": total_duration_ms,
            "total_duration_seconds": round(total_duration_ms / 1000, 1),
            "total_words": total_words,
            "unique_speakers": len(speakers),
            "speakers": speakers
        }
    }

    return json.dumps(export_data, indent=2, ensure_ascii=False)
|
|
|
|
|
|
def export_to_txt(
|
|
segments: List[Dict],
|
|
include_timestamps: bool = False,
|
|
include_speakers: bool = True,
|
|
paragraph_gap_ms: int = 3000
|
|
) -> str:
|
|
"""
|
|
Export segments to plain text format.
|
|
|
|
Args:
|
|
segments: List of transcription segments
|
|
include_timestamps: Add timestamps
|
|
include_speakers: Add speaker labels
|
|
paragraph_gap_ms: Gap threshold for new paragraph
|
|
|
|
Returns:
|
|
Plain text formatted string
|
|
"""
|
|
lines = []
|
|
last_end = 0
|
|
current_speaker = None
|
|
|
|
for seg in segments:
|
|
# Add paragraph break for large gaps
|
|
gap = seg["start_time_ms"] - last_end
|
|
if gap > paragraph_gap_ms and lines:
|
|
lines.append("")
|
|
|
|
# Build text line
|
|
parts = []
|
|
|
|
if include_timestamps:
|
|
ts = ms_to_vtt_timestamp(seg["start_time_ms"])
|
|
parts.append(f"[{ts}]")
|
|
|
|
speaker = seg.get("speaker_id")
|
|
if include_speakers and speaker and speaker != current_speaker:
|
|
parts.append(f"\n{speaker}:")
|
|
current_speaker = speaker
|
|
|
|
parts.append(seg["text"])
|
|
|
|
lines.append(" ".join(parts))
|
|
last_end = seg["end_time_ms"]
|
|
|
|
return "\n".join(lines)
|
|
|
|
|
|
def create_chapters(
    segments: List[Dict],
    min_chapter_duration_ms: int = 60000,
    speaker_change_as_chapter: bool = True
) -> List[Dict]:
    """
    Derive chapter markers from transcription segments.

    A new chapter begins at a segment when that segment starts at least
    ``min_chapter_duration_ms`` after the current chapter's start, or
    (optionally) when the segment names a speaker different from the
    chapter's speaker. Useful for video navigation and tables of contents.

    Args:
        segments: List of transcription segments. Each segment must provide
            "start_time_ms" and "text"; "speaker_id" is optional.
        min_chapter_duration_ms: Elapsed time after which a new chapter starts
        speaker_change_as_chapter: Start a new chapter on speaker change

    Returns:
        List of chapter markers, each a dict with "start_ms", "title" (up to
        the first five words of the chapter followed by "...") and "speaker"
    """
    if not segments:
        return []

    def _marker(start_ms, words, speaker):
        # Title is the first five words of the chapter text plus an ellipsis.
        return {
            "start_ms": start_ms,
            "title": " ".join(words[:5]) + "...",
            "speaker": speaker
        }

    opening = segments[0]
    markers = []
    begin_ms = opening["start_time_ms"]
    words = []
    active_speaker = opening.get("speaker_id")

    for segment in segments:
        segment_start = segment["start_time_ms"]
        elapsed = segment_start - begin_ms

        segment_speaker = segment.get("speaker_id")
        new_voice = (
            speaker_change_as_chapter
            and segment_speaker
            and segment_speaker != active_speaker
        )

        if elapsed >= min_chapter_duration_ms or new_voice:
            # Close the running chapter (only if it collected any text) ...
            if words:
                markers.append(_marker(begin_ms, words, active_speaker))

            # ... and open a new one at this segment.
            begin_ms = segment_start
            words = []
            active_speaker = segment_speaker

        words.extend(segment["text"].split())

    # Flush the chapter that was still open when the segments ran out.
    if words:
        markers.append(_marker(begin_ms, words, active_speaker))

    return markers
|