Some checks failed
Tests / Go Tests (push) Has been cancelled
Tests / Python Tests (push) Has been cancelled
Tests / Integration Tests (push) Has been cancelled
Tests / Go Lint (push) Has been cancelled
Tests / Python Lint (push) Has been cancelled
Tests / Security Scan (push) Has been cancelled
Tests / All Checks Passed (push) Has been cancelled
Security Scanning / Secret Scanning (push) Has been cancelled
Security Scanning / Dependency Vulnerability Scan (push) Has been cancelled
Security Scanning / Go Security Scan (push) Has been cancelled
Security Scanning / Python Security Scan (push) Has been cancelled
Security Scanning / Node.js Security Scan (push) Has been cancelled
Security Scanning / Docker Image Security (push) Has been cancelled
Security Scanning / Security Summary (push) Has been cancelled
CI/CD Pipeline / Go Tests (push) Has been cancelled
CI/CD Pipeline / Python Tests (push) Has been cancelled
CI/CD Pipeline / Website Tests (push) Has been cancelled
CI/CD Pipeline / Linting (push) Has been cancelled
CI/CD Pipeline / Security Scan (push) Has been cancelled
CI/CD Pipeline / Docker Build & Push (push) Has been cancelled
CI/CD Pipeline / Integration Tests (push) Has been cancelled
CI/CD Pipeline / Deploy to Staging (push) Has been cancelled
CI/CD Pipeline / Deploy to Production (push) Has been cancelled
CI/CD Pipeline / CI Summary (push) Has been cancelled
ci/woodpecker/manual/build-ci-image Pipeline was successful
ci/woodpecker/manual/main Pipeline failed
All services: admin-v2, studio-v2, website, ai-compliance-sdk, consent-service, klausur-service, voice-service, and infrastructure. Large PDFs and compiled binaries excluded via .gitignore.
198 lines
5.8 KiB
Python
198 lines
5.8 KiB
Python
"""
|
|
BreakPilot Speaker Diarizer
|
|
|
|
Uses pyannote.audio (MIT License) for speaker diarization.
|
|
Identifies who spoke when in an audio recording.
|
|
"""
|
|
|
|
import os
|
|
import structlog
|
|
from typing import List, Dict, Optional
|
|
|
|
log = structlog.get_logger(__name__)
|
|
|
|
|
|
class SpeakerDiarizer:
    """
    Speaker diarization using pyannote.audio.

    Identifies distinct speakers in an audio recording and provides
    timestamp information for when each speaker is talking.

    License: MIT
    Source: https://github.com/pyannote/pyannote-audio

    Note: Requires a HuggingFace token with access to pyannote models.
    Accept the conditions at: https://huggingface.co/pyannote/speaker-diarization

    The pipeline itself is loaded lazily on the first call to ``diarize()``
    and cached on the instance afterwards.
    """

    def __init__(
        self,
        auth_token: Optional[str] = None,
        device: str = "auto"
    ):
        """
        Initialize the diarizer.

        Only stores configuration; no model is loaded here.

        Args:
            auth_token: HuggingFace token with pyannote access. When empty
                or None, the PYANNOTE_AUTH_TOKEN environment variable is
                used instead.
            device: Device to run on ("cpu", "cuda", "auto")
        """
        self.auth_token = auth_token or os.getenv("PYANNOTE_AUTH_TOKEN")
        self.device = device
        self._pipeline = None

        if not self.auth_token:
            # Not fatal at construction time: the error is raised later,
            # when the pipeline is actually needed.
            log.warning(
                "pyannote_token_missing",
                message="Speaker diarization requires a HuggingFace token"
            )

    def _load_pipeline(self):
        """
        Lazy load the diarization pipeline.

        Does nothing when a pipeline is already cached on the instance.

        Raises:
            ValueError: No HuggingFace token is configured.
            ImportError: pyannote.audio (or torch) cannot be imported.
            RuntimeError: The pretrained pipeline could not be obtained.
        """
        if self._pipeline is not None:
            return

        if not self.auth_token:
            raise ValueError(
                "HuggingFace token required for pyannote.audio. "
                "Set PYANNOTE_AUTH_TOKEN environment variable."
            )

        # Keep the try body limited to the imports so that an ImportError
        # raised later (while loading the model) is not misreported as
        # "pyannote.audio is not installed".
        try:
            from pyannote.audio import Pipeline
            import torch
        except ImportError as err:
            log.error("pyannote_not_installed")
            raise ImportError(
                "pyannote.audio is not installed. "
                "Install with: pip install pyannote.audio"
            ) from err

        log.info("loading_pyannote_pipeline", device=self.device)

        # Load pre-trained speaker diarization pipeline
        pipeline = Pipeline.from_pretrained(
            "pyannote/speaker-diarization-3.1",
            use_auth_token=self.auth_token
        )

        # NOTE(review): pyannote.audio 3.x is understood to return None
        # (rather than raise) when the gated model cannot be downloaded
        # with the given token; fail with a clear message in that case.
        if pipeline is None:
            log.error("pyannote_pipeline_unavailable")
            raise RuntimeError(
                "Could not load 'pyannote/speaker-diarization-3.1'. "
                "Check that the HuggingFace token is valid and that the "
                "model conditions have been accepted."
            )

        # Move to appropriate device
        if self.device == "auto":
            device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
        else:
            device = torch.device(self.device)

        pipeline.to(device)

        # Cache only after every step succeeded, so a failed device move
        # never leaves a half-configured pipeline behind.
        self._pipeline = pipeline

        log.info("pyannote_pipeline_loaded", device=str(device))

    @staticmethod
    def _build_segment(speaker, start_s: float, end_s: float) -> Dict:
        """Convert one speaker turn (times in seconds) to a segment dict in ms."""
        start_ms = int(start_s * 1000)
        end_ms = int(end_s * 1000)
        return {
            "speaker_id": speaker,
            "start_time_ms": start_ms,
            "end_time_ms": end_ms,
            # Derived from the truncated bounds so that
            # start_time_ms + duration_ms == end_time_ms always holds.
            "duration_ms": end_ms - start_ms
        }

    def diarize(
        self,
        audio_path: str,
        num_speakers: Optional[int] = None,
        min_speakers: Optional[int] = None,
        max_speakers: Optional[int] = None
    ) -> List[Dict]:
        """
        Perform speaker diarization on an audio file.

        Args:
            audio_path: Path to audio file (WAV recommended)
            num_speakers: Exact number of speakers (if known)
            min_speakers: Minimum number of speakers
            max_speakers: Maximum number of speakers

        Returns:
            List of speaker segments, each a dict with the keys
            "speaker_id", "start_time_ms", "end_time_ms" and "duration_ms".

        Raises:
            ValueError: No HuggingFace token is configured.
            ImportError: pyannote.audio cannot be imported.
            RuntimeError: The pretrained pipeline could not be obtained.
            FileNotFoundError: audio_path is not an existing regular file.
        """
        self._load_pipeline()

        # isfile (not exists): a directory path must be rejected here
        # instead of failing obscurely inside the pipeline.
        if not os.path.isfile(audio_path):
            raise FileNotFoundError(f"Audio file not found: {audio_path}")

        log.info(
            "starting_diarization",
            audio_path=audio_path,
            num_speakers=num_speakers
        )

        # Run diarization
        diarization = self._pipeline(
            audio_path,
            num_speakers=num_speakers,
            min_speakers=min_speakers,
            max_speakers=max_speakers
        )

        # Convert to list of segments
        segments = [
            self._build_segment(speaker, turn.start, turn.end)
            for turn, _, speaker in diarization.itertracks(yield_label=True)
        ]

        # Unique speakers, sorted so the log line is deterministic
        unique_speakers = sorted({s["speaker_id"] for s in segments}, key=str)

        log.info(
            "diarization_complete",
            segments_count=len(segments),
            speakers_count=len(unique_speakers),
            speakers=unique_speakers
        )

        return segments

    def get_speaker_stats(self, segments: List[Dict]) -> Dict:
        """
        Calculate speaking statistics per speaker.

        Args:
            segments: List of speaker segments from diarize(). Each entry
                must provide "speaker_id" and "duration_ms".

        Returns:
            dict with the keys "speakers" (per-speaker "total_time_ms",
            "total_time_seconds" and "percentage"), "total_speakers" and
            "total_duration_ms". The percentage is relative to the summed
            speaking time of all speakers and is 0.0 when that sum is 0.
        """
        speaker_times: Dict = {}
        for seg in segments:
            speaker = seg["speaker_id"]
            speaker_times[speaker] = speaker_times.get(speaker, 0) + seg["duration_ms"]

        total_time = sum(speaker_times.values())

        stats = {
            speaker: {
                "total_time_ms": time_ms,
                "total_time_seconds": round(time_ms / 1000, 1),
                # Guard against division by zero (no or zero-length segments)
                "percentage": (
                    round((time_ms / total_time) * 100, 1) if total_time > 0 else 0.0
                )
            }
            for speaker, time_ms in speaker_times.items()
        }

        return {
            "speakers": stats,
            "total_speakers": len(stats),
            "total_duration_ms": total_time
        }

    def is_available(self) -> bool:
        """Check if diarization is available (token configured)."""
        return bool(self.auth_token)

    def get_pipeline_info(self) -> dict:
        """Get information about the pipeline (availability, device, load state)."""
        return {
            "available": self.is_available(),
            "device": self.device,
            "loaded": self._pipeline is not None
        }
|