Files
breakpilot-lehrer/voice-service/services/audio_processor.py
Benjamin Boenisch 5a31f52310 Initial commit: breakpilot-lehrer - Lehrer KI Platform
Services: Admin-Lehrer, Backend-Lehrer, Studio v2, Website,
Klausur-Service, School-Service, Voice-Service, Geo-Service,
BreakPilot Drive, Agent-Core

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-11 23:47:26 +01:00

304 lines
8.2 KiB
Python

"""
Audio Processor - Mimi Codec Compatible
Handles audio encoding/decoding for voice streaming
Mimi Codec specifications:
- Sample rate: 24kHz
- Frame size: 80ms
- Format: Int16 PCM
- Channels: Mono
IMPORTANT: Audio is NEVER persisted to disk.
All processing happens in RAM only.
"""
import structlog
import numpy as np
from typing import Optional, Iterator, Tuple
from dataclasses import dataclass
from config import settings
logger = structlog.get_logger(__name__)
@dataclass
class AudioFrame:
    """A single fixed-duration chunk of decoded audio."""
    # Float32 samples in [-1.0, 1.0] (see AudioProcessor.bytes_to_samples).
    samples: np.ndarray
    # Position of this frame within the stream, in milliseconds.
    timestamp_ms: int
    # Frame length in milliseconds (Mimi codec uses 80 ms frames).
    duration_ms: int = 80


class AudioProcessor:
    """
    Processes audio for the Mimi codec.

    Interchange format is mono Int16 PCM at the configured sample rate.
    All audio processing is transient - data exists only in RAM and is
    discarded after processing; nothing is ever written to disk.
    """

    def __init__(self):
        # Codec parameters come from service configuration
        # (Mimi spec: 24 kHz sample rate, 80 ms frames).
        self.sample_rate = settings.audio_sample_rate
        self.frame_size_ms = settings.audio_frame_size_ms
        self.samples_per_frame = int(self.sample_rate * self.frame_size_ms / 1000)

    def bytes_to_samples(self, audio_bytes: bytes) -> np.ndarray:
        """
        Convert raw Int16 PCM bytes to normalized float samples.

        Args:
            audio_bytes: Int16 PCM audio data (mono).

        Returns:
            numpy array of float32 samples in [-1.0, 1.0).
        """
        samples_int16 = np.frombuffer(audio_bytes, dtype=np.int16)
        # Divide by 32768 so the most negative Int16 (-32768) maps exactly to -1.0.
        return samples_int16.astype(np.float32) / 32768.0

    def samples_to_bytes(self, samples: np.ndarray) -> bytes:
        """
        Convert float samples to raw Int16 PCM bytes.

        Args:
            samples: float samples nominally in [-1.0, 1.0]; any array-like
                is accepted and out-of-range values are clipped.

        Returns:
            Int16 PCM audio data.
        """
        # Accept any array-like input; clip to the representable range.
        samples = np.clip(np.asarray(samples, dtype=np.float32), -1.0, 1.0)
        # Scale by 32767 (not 32768) so +1.0 does not overflow Int16.
        return (samples * 32767).astype(np.int16).tobytes()

    def extract_frames(
        self,
        audio_bytes: bytes,
        start_timestamp_ms: int = 0,
    ) -> Iterator[AudioFrame]:
        """
        Split raw audio into fixed-size frames.

        The trailing partial frame (if any) is zero-padded to a full
        frame so every yielded frame has `samples_per_frame` samples.

        Args:
            audio_bytes: Raw Int16 PCM audio data.
            start_timestamp_ms: Timestamp assigned to the first frame.

        Yields:
            AudioFrame objects, one per `frame_size_ms` of input.
        """
        samples = self.bytes_to_samples(audio_bytes)
        timestamp = start_timestamp_ms
        for start in range(0, len(samples), self.samples_per_frame):
            frame_samples = samples[start:start + self.samples_per_frame]
            # Zero-pad a short final frame to the full frame size.
            shortfall = self.samples_per_frame - len(frame_samples)
            if shortfall > 0:
                frame_samples = np.pad(frame_samples, (0, shortfall))
            yield AudioFrame(
                samples=frame_samples,
                timestamp_ms=timestamp,
                duration_ms=self.frame_size_ms,
            )
            timestamp += self.frame_size_ms

    def combine_frames(self, frames: list[AudioFrame]) -> bytes:
        """
        Combine multiple frames into continuous audio.

        Frames are ordered by timestamp before concatenation, so the
        input list may be unordered.

        Args:
            frames: List of AudioFrame objects.

        Returns:
            Combined Int16 PCM bytes; b"" for an empty list.
        """
        if not frames:
            return b""
        ordered = sorted(frames, key=lambda f: f.timestamp_ms)
        return self.samples_to_bytes(np.concatenate([f.samples for f in ordered]))

    def detect_voice_activity(
        self,
        audio_bytes: bytes,
        threshold: float = 0.02,
        min_duration_ms: int = 100,
    ) -> Tuple[bool, float]:
        """
        Simple energy-based voice activity detection.

        Args:
            audio_bytes: Raw Int16 PCM audio data.
            threshold: RMS energy threshold for speech detection.
            min_duration_ms: Minimum duration for valid speech.

        Returns:
            (is_speech, energy_level). Energy is always reported, even
            when the clip is too short to qualify as speech.
        """
        samples = self.bytes_to_samples(audio_bytes)
        if samples.size == 0:
            # np.mean of an empty array yields NaN with a warning;
            # treat empty input as silence.
            return False, 0.0
        energy = float(np.sqrt(np.mean(samples ** 2)))
        duration_ms = len(samples) / self.sample_rate * 1000
        if duration_ms < min_duration_ms:
            return False, energy
        return energy > threshold, energy

    def resample(
        self,
        audio_bytes: bytes,
        source_rate: int,
        target_rate: Optional[int] = None,
    ) -> bytes:
        """
        Resample audio to a target sample rate.

        Uses simple linear interpolation; in production prefer
        scipy.signal.resample or librosa for better quality.

        Args:
            audio_bytes: Raw Int16 PCM audio data.
            source_rate: Source sample rate in Hz.
            target_rate: Target sample rate (default: codec rate, 24kHz).

        Returns:
            Resampled Int16 PCM bytes.
        """
        target_rate = target_rate or self.sample_rate
        if source_rate == target_rate:
            return audio_bytes
        samples = self.bytes_to_samples(audio_bytes)
        if samples.size == 0:
            # np.interp rejects an empty source; nothing to resample.
            return audio_bytes
        new_length = int(len(samples) * target_rate / source_rate)
        # Map both old and new sample positions onto [0, 1] and
        # linearly interpolate.
        x_old = np.linspace(0, 1, len(samples))
        x_new = np.linspace(0, 1, new_length)
        return self.samples_to_bytes(np.interp(x_new, x_old, samples))

    def normalize_audio(
        self,
        audio_bytes: bytes,
        target_db: float = -3.0,
    ) -> bytes:
        """
        Normalize audio so its peak sits at the target dB level.

        Args:
            audio_bytes: Raw Int16 PCM audio data.
            target_db: Target peak level in dBFS.

        Returns:
            Normalized Int16 PCM bytes; input returned unchanged for
            empty or (near-)silent audio.
        """
        samples = self.bytes_to_samples(audio_bytes)
        if samples.size == 0:
            # np.max raises on empty arrays; nothing to normalize.
            return audio_bytes
        peak = np.max(np.abs(samples))
        if peak < 0.001:
            # Effectively silence - amplifying would only boost noise.
            return audio_bytes
        target_linear = 10 ** (target_db / 20)
        return self.samples_to_bytes(samples * (target_linear / peak))

    def apply_noise_gate(
        self,
        audio_bytes: bytes,
        threshold_db: float = -40.0,
        attack_ms: float = 5.0,
        release_ms: float = 50.0,
    ) -> bytes:
        """
        Apply a noise gate to reduce background noise.

        Args:
            audio_bytes: Raw Int16 PCM audio data.
            threshold_db: Gate threshold in dBFS.
            attack_ms: Attack time in ms.
            release_ms: Release time in ms.

        Returns:
            Gated Int16 PCM bytes.
        """
        samples = self.bytes_to_samples(audio_bytes)
        if samples.size == 0:
            # np.convolve rejects empty arrays; nothing to gate.
            return audio_bytes
        threshold = 10 ** (threshold_db / 20)
        envelope = np.abs(samples)
        # Hard gate: pass (1.0) where the envelope exceeds the threshold.
        gate = np.where(envelope > threshold, 1.0, 0.0)
        attack_samples = int(attack_ms * self.sample_rate / 1000)
        release_samples = int(release_ms * self.sample_rate / 1000)
        # Smooth on/off transitions with a moving average sized to the
        # longer of the attack/release windows.
        kernel_size = max(attack_samples, release_samples)
        if kernel_size > 1:
            kernel = np.ones(kernel_size) / kernel_size
            gate = np.convolve(gate, kernel, mode='same')
        return self.samples_to_bytes(samples * gate)

    def get_audio_stats(self, audio_bytes: bytes) -> dict:
        """
        Get statistics about audio data.

        Args:
            audio_bytes: Raw Int16 PCM audio data.

        Returns:
            Dictionary with duration, sample count, RMS/peak levels in
            dB, and the configured sample rate. Empty input reports
            zero duration and the -200 dB silence floor.
        """
        samples = self.bytes_to_samples(audio_bytes)
        count = samples.size
        # Guard empty input: np.max raises and np.mean warns on empties.
        rms = float(np.sqrt(np.mean(samples ** 2))) if count else 0.0
        peak = float(np.max(np.abs(samples))) if count else 0.0
        duration_ms = count / self.sample_rate * 1000
        # Epsilon avoids log10(0) for silence (gives a -200 dB floor).
        rms_db = 20 * np.log10(rms + 1e-10)
        peak_db = 20 * np.log10(peak + 1e-10)
        return {
            "duration_ms": duration_ms,
            "sample_count": count,
            "rms_db": round(float(rms_db), 1),
            "peak_db": round(float(peak_db), 1),
            "sample_rate": self.sample_rate,
        }