Some checks failed
Tests / Go Tests (push) Has been cancelled
Tests / Python Tests (push) Has been cancelled
Tests / Integration Tests (push) Has been cancelled
Tests / Go Lint (push) Has been cancelled
Tests / Python Lint (push) Has been cancelled
Tests / Security Scan (push) Has been cancelled
Tests / All Checks Passed (push) Has been cancelled
Security Scanning / Secret Scanning (push) Has been cancelled
Security Scanning / Dependency Vulnerability Scan (push) Has been cancelled
Security Scanning / Go Security Scan (push) Has been cancelled
Security Scanning / Python Security Scan (push) Has been cancelled
Security Scanning / Node.js Security Scan (push) Has been cancelled
Security Scanning / Docker Image Security (push) Has been cancelled
Security Scanning / Security Summary (push) Has been cancelled
CI/CD Pipeline / Go Tests (push) Has been cancelled
CI/CD Pipeline / Python Tests (push) Has been cancelled
CI/CD Pipeline / Website Tests (push) Has been cancelled
CI/CD Pipeline / Linting (push) Has been cancelled
CI/CD Pipeline / Security Scan (push) Has been cancelled
CI/CD Pipeline / Docker Build & Push (push) Has been cancelled
CI/CD Pipeline / Integration Tests (push) Has been cancelled
CI/CD Pipeline / Deploy to Staging (push) Has been cancelled
CI/CD Pipeline / Deploy to Production (push) Has been cancelled
CI/CD Pipeline / CI Summary (push) Has been cancelled
ci/woodpecker/manual/build-ci-image Pipeline was successful
ci/woodpecker/manual/main Pipeline failed
All services: admin-v2, studio-v2, website, ai-compliance-sdk, consent-service, klausur-service, voice-service, and infrastructure. Large PDFs and compiled binaries excluded via .gitignore.
231 lines
7.2 KiB
Python
231 lines
7.2 KiB
Python
"""
|
|
BreakPilot Transcription Tasks
|
|
|
|
RQ task definitions for transcription processing.
|
|
"""
|
|
|
|
import os
|
|
import time
|
|
import tempfile
|
|
import structlog
|
|
from typing import Optional
|
|
from datetime import datetime
|
|
|
|
from .transcriber import WhisperTranscriber
|
|
from .diarizer import SpeakerDiarizer
|
|
from .aligner import TranscriptAligner
|
|
from .storage import MinIOStorage
|
|
from .export import export_to_vtt, export_to_srt, export_to_json
|
|
|
|
# Module-level structured logger used by the task functions below.
log = structlog.get_logger(__name__)
|
|
|
|
# --- Whisper / diarization settings (all overridable via environment) -------
WHISPER_MODEL: str = os.getenv("WHISPER_MODEL", "large-v3")
WHISPER_DEVICE: str = os.getenv("WHISPER_DEVICE", "cpu")
WHISPER_COMPUTE_TYPE: str = os.getenv("WHISPER_COMPUTE_TYPE", "int8")
# When unset (None), transcribe_recording skips speaker diarization.
PYANNOTE_AUTH_TOKEN: Optional[str] = os.getenv("PYANNOTE_AUTH_TOKEN")
# Local scratch directory where audio is downloaded before transcription.
TEMP_DIR: str = os.getenv("TEMP_DIR", "/tmp/transcriptions")

# --- MinIO object storage ---------------------------------------------------
MINIO_ENDPOINT: str = os.getenv("MINIO_ENDPOINT", "minio:9000")
# NOTE(review): these are hard-coded fallback credentials; make sure
# MINIO_ACCESS_KEY and MINIO_SECRET_KEY are set explicitly in real deployments.
MINIO_ACCESS_KEY: str = os.getenv("MINIO_ACCESS_KEY", "breakpilot")
MINIO_SECRET_KEY: str = os.getenv("MINIO_SECRET_KEY", "breakpilot123")
MINIO_BUCKET: str = os.getenv("MINIO_BUCKET", "breakpilot-recordings")
# TLS is enabled only when the variable equals "true" (case-insensitive).
MINIO_SECURE: bool = "true" == os.getenv("MINIO_SECURE", "false").lower()

# --- Status persistence -----------------------------------------------------
# Database URL for status updates; None when the variable is not set.
DATABASE_URL: Optional[str] = os.getenv("DATABASE_URL")
|
|
|
|
|
|
def update_transcription_status(
    transcription_id: str,
    status: str,
    error_message: Optional[str] = None,
    **kwargs
):
    """Record a status change for a transcription.

    Persistence is not implemented yet; for now the change is only emitted as
    a ``status_update`` log event.

    Args:
        transcription_id: ID of the transcription whose status changed.
        status: New status value (this module uses "processing", "completed"
            and "failed").
        error_message: Optional error text, logged under the ``error`` key.
        **kwargs: Additional fields forwarded verbatim into the log event.
    """
    # TODO: persist the change to the database (DATABASE_URL is the module-level
    # setting intended for status updates).
    core_fields = {
        "transcription_id": transcription_id,
        "status": status,
        "error": error_message,
    }
    log.info("status_update", **core_fields, **kwargs)
|
|
|
|
|
|
def _sibling_object_path(object_path: str, filename: str) -> str:
    """Return the object key for *filename* in the same "directory" as *object_path*.

    ``"recordings/abc/audio.wav"`` maps to ``"recordings/abc/<filename>"``. A key
    without any "/" maps to plain ``filename``; the previous
    ``rsplit("/", 1)[0]`` approach treated the whole key as a directory there
    (e.g. ``"audio.wav/transcript.vtt"``).
    """
    directory, separator, _ = object_path.rpartition("/")
    return f"{directory}{separator}{filename}"


def _summarize_segments(segments: list) -> tuple:
    """Return ``(full_text, word_count, avg_confidence)`` for transcript segments."""
    full_text = " ".join(s["text"] for s in segments)
    word_count = len(full_text.split())
    # Segments without a "confidence" key count as 0; an empty transcript averages to 0.
    avg_confidence = (
        sum(s.get("confidence", 0) for s in segments) / len(segments) if segments else 0
    )
    return full_text, word_count, avg_confidence


def _upload_exports(storage, segments: list, audio_path: str, metadata: dict) -> tuple:
    """Render VTT/SRT/JSON transcripts, upload them next to the audio, return their keys."""
    vtt_path = _sibling_object_path(audio_path, "transcript.vtt")
    storage.upload_content(export_to_vtt(segments), vtt_path, content_type="text/vtt")

    srt_path = _sibling_object_path(audio_path, "transcript.srt")
    storage.upload_content(export_to_srt(segments), srt_path, content_type="text/plain")

    # JSON carries the full segment data (including speakers) plus run metadata.
    json_path = _sibling_object_path(audio_path, "transcript.json")
    storage.upload_content(
        export_to_json(segments, metadata), json_path, content_type="application/json"
    )

    return vtt_path, srt_path, json_path


def _remove_temp_file(path: str) -> None:
    """Best-effort delete of a local temp file; a missing file is fine, other errors are logged."""
    try:
        os.remove(path)
    except FileNotFoundError:
        pass
    except OSError as exc:
        # Never let cleanup failure mask the task's real outcome.
        log.warning("temp_file_cleanup_failed", path=path, error=str(exc))


def transcribe_recording(
    transcription_id: str,
    recording_id: str,
    audio_path: str,
    language: str = "de",
    enable_diarization: bool = True
) -> dict:
    """
    Main transcription task.

    Downloads audio from MinIO, transcribes it with Whisper, optionally performs
    speaker diarization, and uploads VTT/SRT/JSON transcripts next to the source
    audio object. The local copy of the audio is removed whether the task
    succeeds or fails.

    Args:
        transcription_id: UUID of the transcription record
        recording_id: UUID of the source recording
        audio_path: Object key of the audio file in the MinIO bucket
        language: Language code (de, en, etc.)
        enable_diarization: Whether to perform speaker diarization (also
            requires PYANNOTE_AUTH_TOKEN to be set)

    Returns:
        dict with transcription results (full text, word count, rounded
        confidence, segment count), the MinIO keys of the exported
        transcripts, and the processing duration in whole seconds.

    Raises:
        Any exception from download, transcription, diarization or upload is
        re-raised after the status has been set to "failed".
    """
    start_time = time.time()

    log.info(
        "transcription_started",
        transcription_id=transcription_id,
        recording_id=recording_id,
        audio_path=audio_path,
        language=language
    )

    # NOTE: naive UTC timestamps (datetime.utcnow is deprecated since Python 3.12).
    update_transcription_status(
        transcription_id,
        status="processing",
        processing_started_at=datetime.utcnow().isoformat()
    )

    # Computed before the try so the finally clause can always clean it up.
    local_audio_path = os.path.join(TEMP_DIR, f"{transcription_id}_audio.wav")

    try:
        storage = MinIOStorage(
            endpoint=MINIO_ENDPOINT,
            access_key=MINIO_ACCESS_KEY,
            secret_key=MINIO_SECRET_KEY,
            bucket=MINIO_BUCKET,
            secure=MINIO_SECURE
        )

        os.makedirs(TEMP_DIR, exist_ok=True)
        storage.download_file(audio_path, local_audio_path)
        log.info("audio_downloaded", path=local_audio_path)

        transcriber = WhisperTranscriber(
            model_name=WHISPER_MODEL,
            device=WHISPER_DEVICE,
            compute_type=WHISPER_COMPUTE_TYPE
        )

        log.info("transcription_starting", model=WHISPER_MODEL)
        segments = transcriber.transcribe(
            audio_path=local_audio_path,
            language=language
        )
        log.info("transcription_complete", segments_count=len(segments))

        # Speaker diarization runs only if requested AND a pyannote token exists.
        if enable_diarization and PYANNOTE_AUTH_TOKEN:
            log.info("diarization_starting")
            diarizer = SpeakerDiarizer(auth_token=PYANNOTE_AUTH_TOKEN)
            speaker_segments = diarizer.diarize(local_audio_path)

            # Attach speaker labels to the Whisper segments.
            aligner = TranscriptAligner()
            segments = aligner.align(segments, speaker_segments)
            log.info("diarization_complete", speakers=aligner.get_speaker_count())
        else:
            log.info("diarization_skipped", reason="disabled or no token")

        full_text, word_count, avg_confidence = _summarize_segments(segments)

        vtt_path, srt_path, json_path = _upload_exports(
            storage,
            segments,
            audio_path,
            {
                "transcription_id": transcription_id,
                "recording_id": recording_id,
                "language": language,
                "model": WHISPER_MODEL,
                "word_count": word_count,
                "confidence": avg_confidence
            }
        )

        processing_duration = int(time.time() - start_time)

        result = {
            "transcription_id": transcription_id,
            "recording_id": recording_id,
            "status": "completed",
            "full_text": full_text,
            "word_count": word_count,
            "confidence_score": round(avg_confidence, 3),
            "segments_count": len(segments),
            "vtt_path": vtt_path,
            "srt_path": srt_path,
            "json_path": json_path,
            "processing_duration_seconds": processing_duration
        }

        update_transcription_status(
            transcription_id,
            status="completed",
            full_text=full_text,
            word_count=word_count,
            confidence_score=avg_confidence,
            vtt_path=vtt_path,
            srt_path=srt_path,
            json_path=json_path,
            processing_duration_seconds=processing_duration,
            processing_completed_at=datetime.utcnow().isoformat()
        )

        log.info(
            "transcription_completed",
            transcription_id=transcription_id,
            word_count=word_count,
            duration_seconds=processing_duration
        )

        return result

    except Exception as e:
        # log.exception keeps the traceback, which log.error(str(e)) discarded.
        log.exception(
            "transcription_failed",
            transcription_id=transcription_id,
            error=str(e)
        )

        update_transcription_status(
            transcription_id,
            status="failed",
            error_message=str(e)
        )

        raise

    finally:
        # Runs on both paths so a failed job does not leave audio behind in TEMP_DIR.
        _remove_temp_file(local_audio_path)
|