This repository has been archived on 2026-02-15. You can view files and clone it. You cannot open issues or pull requests or push a commit.
Files
BreakPilot Dev 19855efacc
Some checks failed
Tests / Go Tests (push) Has been cancelled
Tests / Python Tests (push) Has been cancelled
Tests / Integration Tests (push) Has been cancelled
Tests / Go Lint (push) Has been cancelled
Tests / Python Lint (push) Has been cancelled
Tests / Security Scan (push) Has been cancelled
Tests / All Checks Passed (push) Has been cancelled
Security Scanning / Secret Scanning (push) Has been cancelled
Security Scanning / Dependency Vulnerability Scan (push) Has been cancelled
Security Scanning / Go Security Scan (push) Has been cancelled
Security Scanning / Python Security Scan (push) Has been cancelled
Security Scanning / Node.js Security Scan (push) Has been cancelled
Security Scanning / Docker Image Security (push) Has been cancelled
Security Scanning / Security Summary (push) Has been cancelled
CI/CD Pipeline / Go Tests (push) Has been cancelled
CI/CD Pipeline / Python Tests (push) Has been cancelled
CI/CD Pipeline / Website Tests (push) Has been cancelled
CI/CD Pipeline / Linting (push) Has been cancelled
CI/CD Pipeline / Security Scan (push) Has been cancelled
CI/CD Pipeline / Docker Build & Push (push) Has been cancelled
CI/CD Pipeline / Integration Tests (push) Has been cancelled
CI/CD Pipeline / Deploy to Staging (push) Has been cancelled
CI/CD Pipeline / Deploy to Production (push) Has been cancelled
CI/CD Pipeline / CI Summary (push) Has been cancelled
ci/woodpecker/manual/build-ci-image Pipeline was successful
ci/woodpecker/manual/main Pipeline failed
feat: BreakPilot PWA - Full codebase (clean push without large binaries)
All services: admin-v2, studio-v2, website, ai-compliance-sdk,
consent-service, klausur-service, voice-service, and infrastructure.
Large PDFs and compiled binaries excluded via .gitignore.
2026-02-11 13:25:58 +01:00

203 lines
6.2 KiB
Python

"""
BreakPilot Transcript Aligner
Aligns Whisper transcription segments with pyannote speaker diarization.
Assigns speaker IDs to each transcribed segment.
"""
import structlog
from typing import List, Dict, Optional
from collections import defaultdict
log = structlog.get_logger(__name__)
class TranscriptAligner:
    """
    Aligns transcription segments with speaker diarization results.

    Uses overlap-based matching to assign speaker IDs to each
    transcribed segment. When several speakers overlap one segment
    (e.g. the speaker changes mid-sentence), the speaker with the
    largest total overlap wins.

    The speaker count and speaker mapping always describe the most
    recent call to align().
    """

    def __init__(self):
        """Initialize the aligner with no known speakers."""
        self._speaker_count = 0
        self._speaker_map = {}  # Maps diarization speaker IDs to friendly names

    def align(
        self,
        transcription_segments: List[Dict],
        diarization_segments: List[Dict],
        min_overlap_ratio: float = 0.3
    ) -> List[Dict]:
        """
        Align transcription with speaker diarization.

        Args:
            transcription_segments: List of segments from Whisper; each
                needs "start_time_ms" and "end_time_ms".
            diarization_segments: List of segments from pyannote; each
                needs "speaker_id", "start_time_ms" and "end_time_ms".
            min_overlap_ratio: Minimum fraction of a transcription
                segment that the winning speaker must cover.

        Returns:
            Shallow copies of the transcription segments with
            "speaker_id" set (None when no speaker qualifies). If there
            are no diarization segments, the input list is returned
            unchanged.
        """
        # Always rebuild the speaker state so that mappings from a
        # previous call cannot leak into this one.
        self._reset_speakers(diarization_segments or [])

        if not diarization_segments:
            log.warning("no_diarization_segments", message="Returning transcription without speakers")
            return transcription_segments

        log.info(
            "aligning_transcription",
            transcription_count=len(transcription_segments),
            diarization_count=len(diarization_segments)
        )

        # Align each transcription segment
        aligned_segments = []
        for trans_seg in transcription_segments:
            aligned_seg = trans_seg.copy()
            aligned_seg["speaker_id"] = self._find_speaker_for_segment(
                trans_seg,
                diarization_segments,
                min_overlap_ratio
            )
            aligned_segments.append(aligned_seg)

        # Log statistics. "speaker_id" is always present at this point,
        # so unmatched segments (None) are folded into "UNKNOWN" here.
        speaker_counts = defaultdict(int)
        for seg in aligned_segments:
            speaker_counts[seg.get("speaker_id") or "UNKNOWN"] += 1

        log.info(
            "alignment_complete",
            speakers=dict(speaker_counts),
            total_speakers=self._speaker_count
        )
        return aligned_segments

    def _reset_speakers(self, diarization_segments: List[Dict]) -> None:
        """Rebuild the speaker count and ID mapping from scratch."""
        unique_speakers = {seg["speaker_id"] for seg in diarization_segments}
        self._speaker_count = len(unique_speakers)
        # Sorting makes the friendly names deterministic for a given
        # set of diarization speaker IDs.
        self._speaker_map = {
            speaker: f"SPEAKER_{index:02d}"
            for index, speaker in enumerate(sorted(unique_speakers))
        }

    def _find_speaker_for_segment(
        self,
        trans_seg: Dict,
        diarization_segments: List[Dict],
        min_overlap_ratio: float
    ) -> Optional[str]:
        """
        Find the best matching speaker for a transcription segment.

        Overlap is summed per speaker across all diarization segments,
        so a speaker whose turns are split into several pieces is not
        beaten by another speaker with a single, slightly longer piece.

        Returns:
            The friendly speaker name (or the raw diarization ID if it
            is not in the mapping), or None when the segment has no
            positive duration, nothing overlaps it, or the best
            speaker covers less than min_overlap_ratio of it.
        """
        trans_start = trans_seg["start_time_ms"]
        trans_end = trans_seg["end_time_ms"]
        trans_duration = trans_end - trans_start
        if trans_duration <= 0:
            return None

        # Accumulate the overlapping time of every speaker
        overlap_by_speaker = defaultdict(int)
        for diar_seg in diarization_segments:
            overlap_start = max(trans_start, diar_seg["start_time_ms"])
            overlap_end = min(trans_end, diar_seg["end_time_ms"])
            overlap_duration = overlap_end - overlap_start
            if overlap_duration > 0:
                overlap_by_speaker[diar_seg["speaker_id"]] += overlap_duration

        if not overlap_by_speaker:
            return None

        # Speaker with the highest total overlap; on a tie the speaker
        # seen first in diarization_segments wins.
        best_speaker, best_overlap = max(
            overlap_by_speaker.items(),
            key=lambda item: item[1]
        )
        if best_overlap / trans_duration < min_overlap_ratio:
            return None
        return self._speaker_map.get(best_speaker, best_speaker)

    def get_speaker_count(self) -> int:
        """Get the number of unique speakers seen by the last align() call."""
        return self._speaker_count

    def get_speaker_mapping(self) -> Dict[str, str]:
        """Get a copy of the mapping from diarization IDs to friendly names."""
        return self._speaker_map.copy()

    def merge_consecutive_segments(
        self,
        segments: List[Dict],
        max_gap_ms: int = 1000,
        same_speaker_only: bool = True
    ) -> List[Dict]:
        """
        Merge consecutive segments that are close together.

        Useful for creating cleaner subtitle output. The input segments
        (including their "words" lists) are never modified.

        Args:
            segments: List of aligned segments, in playback order
            max_gap_ms: Maximum gap between segments to merge
            same_speaker_only: Only merge if same speaker

        Returns:
            List of merged segments
        """
        if not segments:
            return []

        merged = self._merge_segments(segments, max_gap_ms, same_speaker_only)
        log.info(
            "segments_merged",
            original_count=len(segments),
            merged_count=len(merged)
        )
        return merged

    @staticmethod
    def _copy_segment(segment: Dict) -> Dict:
        """Copy a segment so that extending its word list cannot touch the input."""
        copied = segment.copy()
        if "words" in copied:
            copied["words"] = list(copied["words"])
        return copied

    def _merge_segments(
        self,
        segments: List[Dict],
        max_gap_ms: int,
        same_speaker_only: bool
    ) -> List[Dict]:
        """Pure merge logic behind merge_consecutive_segments (no logging)."""
        if not segments:
            return []

        merged = []
        current = self._copy_segment(segments[0])
        for next_seg in segments[1:]:
            gap = next_seg["start_time_ms"] - current["end_time_ms"]
            same_speaker = (
                not same_speaker_only or
                current.get("speaker_id") == next_seg.get("speaker_id")
            )
            if gap <= max_gap_ms and same_speaker:
                # Merge segments. max() keeps the merged span from
                # shrinking when next_seg ends before current does.
                current["end_time_ms"] = max(
                    current["end_time_ms"],
                    next_seg["end_time_ms"]
                )
                current["text"] = current["text"] + " " + next_seg["text"]
                # Merge word timestamps if present
                if "words" in current and "words" in next_seg:
                    current["words"].extend(next_seg["words"])
            else:
                # Save current and start new
                merged.append(current)
                current = self._copy_segment(next_seg)

        # Don't forget the last segment
        merged.append(current)
        return merged