Some checks failed
Tests / Go Tests (push) Has been cancelled
Tests / Python Tests (push) Has been cancelled
Tests / Integration Tests (push) Has been cancelled
Tests / Go Lint (push) Has been cancelled
Tests / Python Lint (push) Has been cancelled
Tests / Security Scan (push) Has been cancelled
Tests / All Checks Passed (push) Has been cancelled
Security Scanning / Secret Scanning (push) Has been cancelled
Security Scanning / Dependency Vulnerability Scan (push) Has been cancelled
Security Scanning / Go Security Scan (push) Has been cancelled
Security Scanning / Python Security Scan (push) Has been cancelled
Security Scanning / Node.js Security Scan (push) Has been cancelled
Security Scanning / Docker Image Security (push) Has been cancelled
Security Scanning / Security Summary (push) Has been cancelled
CI/CD Pipeline / Go Tests (push) Has been cancelled
CI/CD Pipeline / Python Tests (push) Has been cancelled
CI/CD Pipeline / Website Tests (push) Has been cancelled
CI/CD Pipeline / Linting (push) Has been cancelled
CI/CD Pipeline / Security Scan (push) Has been cancelled
CI/CD Pipeline / Docker Build & Push (push) Has been cancelled
CI/CD Pipeline / Integration Tests (push) Has been cancelled
CI/CD Pipeline / Deploy to Staging (push) Has been cancelled
CI/CD Pipeline / Deploy to Production (push) Has been cancelled
CI/CD Pipeline / CI Summary (push) Has been cancelled
ci/woodpecker/manual/build-ci-image Pipeline was successful
ci/woodpecker/manual/main Pipeline failed
All services: admin-v2, studio-v2, website, ai-compliance-sdk, consent-service, klausur-service, voice-service, and infrastructure. Large PDFs and compiled binaries excluded via .gitignore.
130 lines
3.6 KiB
Python
130 lines
3.6 KiB
Python
"""
BreakPilot Transcription Worker - Main Entry Point

Runs as an RQ worker, processing transcription jobs from the queue.
"""
|
|
|
|
import logging
import os
import signal
import sys

import structlog
from redis import Redis
# NOTE(review): rq.Connection is deprecated in RQ 1.x and removed in RQ 2.0 —
# verify against the pinned rq version and drop it once nothing uses it.
from rq import Worker, Queue, Connection
|
|
|
|
# Configure logging.
#
# structlog is routed through the stdlib ``logging`` module here
# (``LoggerFactory`` + ``filter_by_level``), so stdlib logging needs a
# handler and a level. Nothing else in this file configures it: without the
# ``basicConfig`` call below the root logger keeps its default WARNING
# threshold and every ``log.info(...)`` event (worker_starting,
# redis_connected, worker_ready, ...) is silently dropped.
# ``basicConfig`` is a no-op if the root logger already has handlers.
# NOTE(review): once the root logger has a handler, RQ may skip installing
# its own handler for its worker logger and emit through this one instead
# (bare "%(message)s" format) — confirm that output is acceptable.
LOG_LEVEL = os.getenv("LOG_LEVEL", "INFO").upper()

logging.basicConfig(
    format="%(message)s",  # structlog's JSONRenderer already builds the full line
    stream=sys.stdout,
    level=LOG_LEVEL,
)

structlog.configure(
    processors=[
        # Drop events below the underlying stdlib logger's level first,
        # before any more expensive processing.
        structlog.stdlib.filter_by_level,
        structlog.stdlib.add_logger_name,
        structlog.stdlib.add_log_level,
        structlog.stdlib.PositionalArgumentsFormatter(),
        structlog.processors.TimeStamper(fmt="iso"),
        structlog.processors.StackInfoRenderer(),
        # Render exc_info (e.g. from log.exception) into the event as text.
        structlog.processors.format_exc_info,
        structlog.processors.UnicodeDecoder(),
        structlog.processors.JSONRenderer(),
    ],
    wrapper_class=structlog.stdlib.BoundLogger,
    context_class=dict,
    logger_factory=structlog.stdlib.LoggerFactory(),
    cache_logger_on_first_use=True,
)

log = structlog.get_logger(__name__)
|
|
|
|
# Worker configuration — every value can be overridden via the environment.

# Redis instance holding the job queue.
REDIS_URL: str = os.environ.get("REDIS_URL", "redis://localhost:6379/1")

# Name of the RQ queue this worker consumes.
QUEUE_NAME: str = os.environ.get("QUEUE_NAME", "transcription")

# The fallback name embeds the process id, presumably so that several
# workers on one host register under distinct names.
_DEFAULT_WORKER_NAME = f"transcription-worker-{os.getpid()}"
WORKER_NAME: str = os.environ.get("WORKER_NAME", _DEFAULT_WORKER_NAME)
|
|
|
|
|
|
def setup_signal_handlers(worker: Worker):
    """Route SIGINT and SIGTERM to a handler that logs and stops *worker*.

    A single shared handler is installed for both signals; it records the
    signal number and then forwards the call to ``worker.request_stop``.

    NOTE(review): RQ's ``Worker.work()`` appears to install its own
    SIGINT/SIGTERM handlers when it starts, which would replace the ones set
    here — confirm against the pinned rq version whether the
    "shutdown_signal_received" event is ever emitted.
    """

    def _on_shutdown(signum, frame):
        log.info("shutdown_signal_received", signal=signum)
        worker.request_stop(signum, frame)

    for signo in (signal.SIGINT, signal.SIGTERM):
        signal.signal(signo, _on_shutdown)
|
|
|
|
|
|
def preload_models():
    """Warm up the ML models before the worker starts taking jobs.

    Constructs a ``WhisperTranscriber`` (always) and a ``SpeakerDiarizer``
    (only when ``PYANNOTE_AUTH_TOKEN`` is set) so that any model download
    happens at startup rather than during the first job. The instances are
    not kept; presumably the benefit comes from the models' on-disk cache —
    TODO confirm against the transcriber/diarizer modules.

    The two models are preloaded independently. A failure in one is logged
    with its traceback, and startup continues, so a broken diarizer setup no
    longer prevents the Whisper preload and vice versa.

    Environment:
        WHISPER_MODEL: Whisper model name (default ``"large-v3"``).
        WHISPER_DEVICE: device for the transcriber (default ``"cpu"``).
        WHISPER_COMPUTE_TYPE: compute type (default ``"int8"``).
        PYANNOTE_AUTH_TOKEN: auth token for the diarization model; when
            unset, diarization preload is skipped and a warning is logged.
    """
    log.info("preloading_models")

    # Initialize transcriber (downloads model if needed)
    whisper_model = os.getenv("WHISPER_MODEL", "large-v3")
    device = os.getenv("WHISPER_DEVICE", "cpu")
    compute_type = os.getenv("WHISPER_COMPUTE_TYPE", "int8")
    try:
        from .transcriber import WhisperTranscriber

        WhisperTranscriber(
            model_name=whisper_model,
            device=device,
            compute_type=compute_type,
        )
    except Exception as e:
        # Best effort: don't fail startup, the model will be loaded on the
        # first job. log.exception keeps the traceback for diagnosis.
        log.exception("model_preload_failed", component="whisper", error=str(e))
    else:
        log.info("whisper_model_loaded", model=whisper_model, device=device)

    # Initialize diarizer (downloads model if needed)
    pyannote_token = os.getenv("PYANNOTE_AUTH_TOKEN")
    if not pyannote_token:
        log.warning("pyannote_token_missing", message="Speaker diarization disabled")
        return

    try:
        # Imported only when diarization is enabled, so missing diarizer
        # dependencies don't matter when it is switched off.
        from .diarizer import SpeakerDiarizer

        SpeakerDiarizer(auth_token=pyannote_token)
    except Exception as e:
        # Best effort, as above.
        log.exception("model_preload_failed", component="pyannote", error=str(e))
    else:
        log.info("pyannote_model_loaded")
|
|
|
|
|
|
def _redact_url(url: str) -> str:
    """Return *url* with any password component masked, for safe logging."""
    from urllib.parse import urlsplit, urlunsplit

    try:
        parts = urlsplit(url)
        has_password = parts.password is not None
    except ValueError:
        # Malformed URL: don't risk echoing embedded credentials.
        return "<unparseable>"
    if not has_password:
        return url
    # netloc is "[user]:password@host[:port]"; keep user and host, mask the rest.
    userinfo, _, hostport = parts.netloc.rpartition("@")
    username = userinfo.partition(":")[0]
    return urlunsplit(parts._replace(netloc=f"{username}:***@{hostport}"))


def main():
    """Start the transcription worker and block while it processes jobs.

    Connects to Redis at ``REDIS_URL`` and exits with status 1 if the
    connection check fails. It then preloads the models, builds an RQ
    ``Worker`` for ``QUEUE_NAME`` named ``WORKER_NAME``, and runs it with the
    built-in scheduler enabled.
    """
    log.info(
        "worker_starting",
        redis_url=_redact_url(REDIS_URL),  # the URL may embed a password
        queue=QUEUE_NAME,
        worker_name=WORKER_NAME,
    )

    # Connect to Redis
    redis_conn = Redis.from_url(REDIS_URL)

    # Fail fast at startup if Redis is unreachable.
    try:
        redis_conn.ping()
    except Exception as e:
        log.error("redis_connection_failed", error=str(e))
        sys.exit(1)
    log.info("redis_connected")

    preload_models()

    queue = Queue(QUEUE_NAME, connection=redis_conn)
    worker = Worker(
        queues=[queue],
        connection=redis_conn,
        name=WORKER_NAME,
    )

    setup_signal_handlers(worker)

    log.info("worker_ready", queues=[QUEUE_NAME])

    # The worker is already bound to redis_conn explicitly, so the
    # rq.Connection context manager (deprecated in RQ 1.x, removed in
    # RQ 2.0) is not needed around work().
    worker.work(with_scheduler=True)


if __name__ == "__main__":
    main()
|