This repository has been archived on 2026-02-15. You can view files and clone it. You cannot open issues or pull requests or push a commit.
Files
BreakPilot Dev 19855efacc
Some checks failed
Tests / Go Tests (push) Has been cancelled
Tests / Python Tests (push) Has been cancelled
Tests / Integration Tests (push) Has been cancelled
Tests / Go Lint (push) Has been cancelled
Tests / Python Lint (push) Has been cancelled
Tests / Security Scan (push) Has been cancelled
Tests / All Checks Passed (push) Has been cancelled
Security Scanning / Secret Scanning (push) Has been cancelled
Security Scanning / Dependency Vulnerability Scan (push) Has been cancelled
Security Scanning / Go Security Scan (push) Has been cancelled
Security Scanning / Python Security Scan (push) Has been cancelled
Security Scanning / Node.js Security Scan (push) Has been cancelled
Security Scanning / Docker Image Security (push) Has been cancelled
Security Scanning / Security Summary (push) Has been cancelled
CI/CD Pipeline / Go Tests (push) Has been cancelled
CI/CD Pipeline / Python Tests (push) Has been cancelled
CI/CD Pipeline / Website Tests (push) Has been cancelled
CI/CD Pipeline / Linting (push) Has been cancelled
CI/CD Pipeline / Security Scan (push) Has been cancelled
CI/CD Pipeline / Docker Build & Push (push) Has been cancelled
CI/CD Pipeline / Integration Tests (push) Has been cancelled
CI/CD Pipeline / Deploy to Staging (push) Has been cancelled
CI/CD Pipeline / Deploy to Production (push) Has been cancelled
CI/CD Pipeline / CI Summary (push) Has been cancelled
ci/woodpecker/manual/build-ci-image Pipeline was successful
ci/woodpecker/manual/main Pipeline failed
feat: BreakPilot PWA - Full codebase (clean push without large binaries)
All services: admin-v2, studio-v2, website, ai-compliance-sdk,
consent-service, klausur-service, voice-service, and infrastructure.
Large PDFs and compiled binaries excluded via .gitignore.
2026-02-11 13:25:58 +01:00

231 lines
7.2 KiB
Python

"""
BreakPilot Transcription Tasks
RQ task definitions for transcription processing.
"""
import os
import tempfile
import time
from datetime import datetime, timezone
from typing import Optional

import structlog

from .aligner import TranscriptAligner
from .diarizer import SpeakerDiarizer
from .export import export_to_json, export_to_srt, export_to_vtt
from .storage import MinIOStorage
from .transcriber import WhisperTranscriber
# Module-level structured logger; all task events are emitted through it.
log = structlog.get_logger(__name__)
# Configuration
# Whisper model/runtime knobs; defaults favor CPU-only int8 inference.
WHISPER_MODEL = os.getenv("WHISPER_MODEL", "large-v3")
WHISPER_DEVICE = os.getenv("WHISPER_DEVICE", "cpu")
WHISPER_COMPUTE_TYPE = os.getenv("WHISPER_COMPUTE_TYPE", "int8")
# Required for speaker diarization; when unset, diarization is skipped.
PYANNOTE_AUTH_TOKEN = os.getenv("PYANNOTE_AUTH_TOKEN")
# Scratch directory for downloaded audio files.
TEMP_DIR = os.getenv("TEMP_DIR", "/tmp/transcriptions")
# MinIO Configuration
# NOTE(review): the access/secret key defaults below are development
# credentials baked into the code — make sure production deployments
# always override them via environment variables.
MINIO_ENDPOINT = os.getenv("MINIO_ENDPOINT", "minio:9000")
MINIO_ACCESS_KEY = os.getenv("MINIO_ACCESS_KEY", "breakpilot")
MINIO_SECRET_KEY = os.getenv("MINIO_SECRET_KEY", "breakpilot123")
MINIO_BUCKET = os.getenv("MINIO_BUCKET", "breakpilot-recordings")
MINIO_SECURE = os.getenv("MINIO_SECURE", "false").lower() == "true"
# Database URL for status updates
# NOTE(review): read but currently unused — update_transcription_status is a stub.
DATABASE_URL = os.getenv("DATABASE_URL")
def update_transcription_status(
    transcription_id: str,
    status: str,
    error_message: Optional[str] = None,
    **kwargs
):
    """Record a status change for a transcription.

    Currently a stub: it only emits a structured log event carrying the
    new status plus any extra fields. The database write is still to be
    implemented.

    Args:
        transcription_id: UUID of the transcription record.
        status: New status value (e.g. "processing", "completed", "failed").
        error_message: Optional failure description, logged as ``error``.
        **kwargs: Additional fields to attach to the log event.
    """
    # TODO: Implement database update
    extra_fields = dict(kwargs)
    log.info(
        "status_update",
        transcription_id=transcription_id,
        status=status,
        error=error_message,
        **extra_fields,
    )
def transcribe_recording(
    transcription_id: str,
    recording_id: str,
    audio_path: str,
    language: str = "de",
    enable_diarization: bool = True
) -> dict:
    """
    Main transcription task.

    Downloads audio from MinIO, transcribes with Whisper,
    optionally performs speaker diarization, and uploads results
    (VTT, SRT, and JSON) back to the bucket alongside the recording.

    Args:
        transcription_id: UUID of the transcription record
        recording_id: UUID of the source recording
        audio_path: Path to audio file in MinIO bucket
        language: Language code (de, en, etc.)
        enable_diarization: Whether to perform speaker diarization

    Returns:
        dict with transcription results and paths

    Raises:
        Exception: any failure is logged, recorded via
            update_transcription_status(status="failed"), and re-raised
            so the queue marks the job as failed.
    """
    start_time = time.time()
    log.info(
        "transcription_started",
        transcription_id=transcription_id,
        recording_id=recording_id,
        audio_path=audio_path,
        language=language
    )
    # Mark the record as in-progress before doing any heavy work.
    # Timezone-aware UTC timestamp (datetime.utcnow() is deprecated
    # and returns a naive datetime).
    update_transcription_status(
        transcription_id,
        status="processing",
        processing_started_at=datetime.now(timezone.utc).isoformat()
    )
    # Compute the temp path up front so the finally-block below can
    # clean it up regardless of where a failure occurs.
    local_audio_path = os.path.join(TEMP_DIR, f"{transcription_id}_audio.wav")
    try:
        # Initialize storage
        storage = MinIOStorage(
            endpoint=MINIO_ENDPOINT,
            access_key=MINIO_ACCESS_KEY,
            secret_key=MINIO_SECRET_KEY,
            bucket=MINIO_BUCKET,
            secure=MINIO_SECURE
        )
        # Create temp directory
        os.makedirs(TEMP_DIR, exist_ok=True)
        # Download audio file
        storage.download_file(audio_path, local_audio_path)
        log.info("audio_downloaded", path=local_audio_path)
        # Initialize transcriber
        transcriber = WhisperTranscriber(
            model_name=WHISPER_MODEL,
            device=WHISPER_DEVICE,
            compute_type=WHISPER_COMPUTE_TYPE
        )
        # Transcribe audio
        log.info("transcription_starting", model=WHISPER_MODEL)
        segments = transcriber.transcribe(
            audio_path=local_audio_path,
            language=language
        )
        log.info("transcription_complete", segments_count=len(segments))
        # Speaker diarization (requires both the flag and a pyannote token)
        if enable_diarization and PYANNOTE_AUTH_TOKEN:
            log.info("diarization_starting")
            diarizer = SpeakerDiarizer(auth_token=PYANNOTE_AUTH_TOKEN)
            speaker_segments = diarizer.diarize(local_audio_path)
            # Align transcription with speakers
            aligner = TranscriptAligner()
            segments = aligner.align(segments, speaker_segments)
            log.info("diarization_complete", speakers=aligner.get_speaker_count())
        else:
            log.info("diarization_skipped", reason="disabled or no token")
        # Calculate statistics (guard against an empty segment list)
        full_text = " ".join(s["text"] for s in segments)
        word_count = len(full_text.split())
        avg_confidence = sum(s.get("confidence", 0) for s in segments) / len(segments) if segments else 0
        # Export to different formats, stored next to the source audio
        base_path = audio_path.rsplit("/", 1)[0]  # recordings/{recording_name}
        # WebVTT
        vtt_content = export_to_vtt(segments)
        vtt_path = f"{base_path}/transcript.vtt"
        storage.upload_content(vtt_content, vtt_path, content_type="text/vtt")
        # SRT
        srt_content = export_to_srt(segments)
        srt_path = f"{base_path}/transcript.srt"
        storage.upload_content(srt_content, srt_path, content_type="text/plain")
        # JSON (full data with speakers)
        json_content = export_to_json(segments, {
            "transcription_id": transcription_id,
            "recording_id": recording_id,
            "language": language,
            "model": WHISPER_MODEL,
            "word_count": word_count,
            "confidence": avg_confidence
        })
        json_path = f"{base_path}/transcript.json"
        storage.upload_content(json_content, json_path, content_type="application/json")
        # Calculate processing time
        processing_duration = int(time.time() - start_time)
        # Update status to completed
        result = {
            "transcription_id": transcription_id,
            "recording_id": recording_id,
            "status": "completed",
            "full_text": full_text,
            "word_count": word_count,
            "confidence_score": round(avg_confidence, 3),
            "segments_count": len(segments),
            "vtt_path": vtt_path,
            "srt_path": srt_path,
            "json_path": json_path,
            "processing_duration_seconds": processing_duration
        }
        update_transcription_status(
            transcription_id,
            status="completed",
            full_text=full_text,
            word_count=word_count,
            confidence_score=avg_confidence,
            vtt_path=vtt_path,
            srt_path=srt_path,
            json_path=json_path,
            processing_duration_seconds=processing_duration,
            processing_completed_at=datetime.now(timezone.utc).isoformat()
        )
        log.info(
            "transcription_completed",
            transcription_id=transcription_id,
            word_count=word_count,
            duration_seconds=processing_duration
        )
        return result
    except Exception as e:
        log.error(
            "transcription_failed",
            transcription_id=transcription_id,
            error=str(e)
        )
        update_transcription_status(
            transcription_id,
            status="failed",
            error_message=str(e)
        )
        raise
    finally:
        # BUGFIX: the temp file was previously removed only on the
        # success path, leaking one audio file per failed job. Clean up
        # unconditionally.
        if os.path.exists(local_audio_path):
            os.remove(local_audio_path)