Files
breakpilot-lehrer/backend-lehrer/recording/transcription.py
Benjamin Admin dde45b29db
Some checks failed
CI / go-lint (push) Has been skipped
CI / python-lint (push) Has been skipped
CI / nodejs-lint (push) Has been skipped
CI / test-go-school (push) Successful in 27s
CI / test-go-edu-search (push) Successful in 40s
CI / test-python-klausur (push) Failing after 2m30s
CI / test-python-agent-core (push) Successful in 28s
CI / test-nodejs-website (push) Successful in 20s
Restructure: Move 43 files into 8 domain packages (backend-lehrer)
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-25 22:32:45 +02:00

251 lines
7.8 KiB
Python

"""
Recording API - Transcription Routes.
Start transcription, get status, download VTT/SRT subtitle files.
"""
import uuid
from datetime import datetime
from typing import Optional
from fastapi import APIRouter, HTTPException
from fastapi.responses import PlainTextResponse
from .models import (
TranscriptionRequest,
TranscriptionStatusResponse,
)
from .helpers import (
_recordings_store,
_transcriptions_store,
log_audit,
format_vtt_time,
format_srt_time,
)
router = APIRouter(tags=["Recordings"])
# ==========================================
# TRANSCRIPTION ENDPOINTS
# ==========================================
@router.post("/{recording_id}/transcribe", response_model=TranscriptionStatusResponse)
async def start_transcription(recording_id: str, request: TranscriptionRequest):
    """
    Register a new pending transcription for a recording.

    The recording is looked up in the recordings store. A new transcription
    entry with status "pending" is stored, the recording is marked as
    "processing", and an audit record is written. Handing the job to a
    worker queue is not implemented yet (see the TODO below).

    Raises:
        HTTPException: 404 if the recording is unknown, 400 if it is
            marked as deleted, 409 if a transcription that has not
            failed already exists for it.
    """
    recording = _recordings_store.get(recording_id)
    if not recording:
        raise HTTPException(status_code=404, detail="Recording not found")
    if recording["status"] == "deleted":
        raise HTTPException(status_code=400, detail="Cannot transcribe deleted recording")

    # Only failed attempts may be retried; any other existing entry blocks
    # the creation of a second transcription.
    for candidate in _transcriptions_store.values():
        if candidate["recording_id"] == recording_id and candidate["status"] != "failed":
            raise HTTPException(
                status_code=409,
                detail=f"Transcription already exists with status: {candidate['status']}"
            )

    transcription_id = str(uuid.uuid4())
    now = datetime.utcnow()
    timestamp = now.isoformat()
    language = request.language
    model = request.model

    # Result and processing fields start out empty and are expected to be
    # filled in later by whatever processes the transcription.
    _transcriptions_store[transcription_id] = {
        "id": transcription_id,
        "recording_id": recording_id,
        "language": language,
        "model": model,
        "status": "pending",
        **dict.fromkeys(
            (
                "full_text",
                "word_count",
                "confidence_score",
                "vtt_path",
                "srt_path",
                "json_path",
                "error_message",
                "processing_started_at",
                "processing_completed_at",
                "processing_duration_seconds",
            ),
            None,
        ),
        "created_at": timestamp,
        "updated_at": timestamp,
    }

    # Reflect the new state on the recording itself.
    recording["status"] = "processing"
    recording["updated_at"] = timestamp

    log_audit(
        action="transcription_started",
        recording_id=recording_id,
        transcription_id=transcription_id,
        metadata={"language": language, "model": model}
    )

    # TODO: Queue job to Redis/Valkey for transcription worker

    return TranscriptionStatusResponse(
        id=transcription_id,
        recording_id=recording_id,
        status="pending",
        language=language,
        model=model,
        word_count=None,
        confidence_score=None,
        processing_duration_seconds=None,
        error_message=None,
        created_at=now,
        completed_at=None
    )
@router.get("/{recording_id}/transcription", response_model=TranscriptionStatusResponse)
async def get_transcription_status(recording_id: str):
    """
    Get transcription status for a recording.

    A recording can have more than one transcription entry, because a new
    attempt may be started once earlier attempts have failed. The most
    recently added entry is reported, so a retry is never hidden behind
    the failed attempt that preceded it.

    Raises:
        HTTPException: 404 if no transcription exists for the recording.
    """
    matches = [
        t for t in _transcriptions_store.values()
        if t["recording_id"] == recording_id
    ]
    if not matches:
        raise HTTPException(status_code=404, detail="No transcription found for this recording")

    # Assumes the store iterates in insertion order (as a plain dict does),
    # which makes the last match the newest attempt.
    transcription = matches[-1]

    # Timestamps are stored as ISO strings; the completion time stays None
    # until it has been recorded.
    completed_raw = transcription.get("processing_completed_at")
    completed_at = datetime.fromisoformat(completed_raw) if completed_raw else None

    return TranscriptionStatusResponse(
        id=transcription["id"],
        recording_id=transcription["recording_id"],
        status=transcription["status"],
        language=transcription["language"],
        model=transcription["model"],
        word_count=transcription.get("word_count"),
        confidence_score=transcription.get("confidence_score"),
        processing_duration_seconds=transcription.get("processing_duration_seconds"),
        error_message=transcription.get("error_message"),
        created_at=datetime.fromisoformat(transcription["created_at"]),
        completed_at=completed_at
    )
@router.get("/{recording_id}/transcription/text")
async def get_transcription_text(recording_id: str):
    """
    Get the full transcription text.

    When a recording has several transcription entries (a retry after a
    failed attempt), the most recently added one is used, so a completed
    retry is not masked by the earlier failure.

    Returns:
        A dict with the transcription id, recording id, language, text and
        word count. Text and word count fall back to "" and 0 when the
        stored value is missing or None.

    Raises:
        HTTPException: 404 if no transcription exists for the recording,
            400 if the selected transcription is not completed.
    """
    matches = [
        t for t in _transcriptions_store.values()
        if t["recording_id"] == recording_id
    ]
    if not matches:
        raise HTTPException(status_code=404, detail="No transcription found for this recording")

    # Assumes the store iterates in insertion order (as a plain dict does),
    # which makes the last match the newest attempt.
    transcription = matches[-1]

    if transcription["status"] != "completed":
        raise HTTPException(
            status_code=400,
            detail=f"Transcription not ready. Status: {transcription['status']}"
        )

    # The keys may be present with a None value, in which case a .get()
    # default would not apply; coalesce None explicitly instead.
    full_text = transcription.get("full_text")
    word_count = transcription.get("word_count")

    return {
        "transcription_id": transcription["id"],
        "recording_id": recording_id,
        "language": transcription["language"],
        "text": full_text if full_text is not None else "",
        "word_count": word_count if word_count is not None else 0
    }
@router.get("/{recording_id}/transcription/vtt")
async def get_transcription_vtt(recording_id: str):
    """
    Download transcription as WebVTT subtitle file.

    When a recording has several transcription entries (a retry after a
    failed attempt), the most recently added one is used, so a completed
    retry is not masked by the earlier failure.

    The cues are synthesized from the full text: it is split after every
    period and on line breaks, and each non-empty piece gets a fixed-length
    slot. No real timing information is used.

    Raises:
        HTTPException: 404 if no transcription exists for the recording,
            400 if the selected transcription is not completed.
    """
    matches = [
        t for t in _transcriptions_store.values()
        if t["recording_id"] == recording_id
    ]
    if not matches:
        raise HTTPException(status_code=404, detail="No transcription found for this recording")

    # Assumes the store iterates in insertion order (as a plain dict does),
    # which makes the last match the newest attempt.
    transcription = matches[-1]

    if transcription["status"] != "completed":
        raise HTTPException(
            status_code=400,
            detail=f"Transcription not ready. Status: {transcription['status']}"
        )

    # Slot length per cue, in the unit format_vtt_time expects
    # (presumably milliseconds - confirm against the helper).
    cue_length = 3000

    text = transcription.get("full_text") or ""
    pieces = (piece.strip() for piece in text.replace(".", ".\n").split("\n"))

    # Collect the parts and join once instead of growing a string in a loop.
    parts = ["WEBVTT\n\n"]
    for position, sentence in enumerate(piece for piece in pieces if piece):
        start = format_vtt_time(position * cue_length)
        end = format_vtt_time((position + 1) * cue_length)
        parts.append(f"{start} --> {end}\n{sentence}\n\n")
    vtt_content = "".join(parts)

    return PlainTextResponse(
        content=vtt_content,
        media_type="text/vtt",
        headers={"Content-Disposition": f"attachment; filename=transcript_{recording_id}.vtt"}
    )
@router.get("/{recording_id}/transcription/srt")
async def get_transcription_srt(recording_id: str):
    """
    Download transcription as SRT subtitle file.

    When a recording has several transcription entries (a retry after a
    failed attempt), the most recently added one is used, so a completed
    retry is not masked by the earlier failure.

    The entries are synthesized from the full text: it is split after every
    period and on line breaks, and each non-empty piece gets a sequence
    number (starting at 1) and a fixed-length slot. No real timing
    information is used.

    Raises:
        HTTPException: 404 if no transcription exists for the recording,
            400 if the selected transcription is not completed.
    """
    matches = [
        t for t in _transcriptions_store.values()
        if t["recording_id"] == recording_id
    ]
    if not matches:
        raise HTTPException(status_code=404, detail="No transcription found for this recording")

    # Assumes the store iterates in insertion order (as a plain dict does),
    # which makes the last match the newest attempt.
    transcription = matches[-1]

    if transcription["status"] != "completed":
        raise HTTPException(
            status_code=400,
            detail=f"Transcription not ready. Status: {transcription['status']}"
        )

    # Slot length per entry, in the unit format_srt_time expects
    # (presumably milliseconds - confirm against the helper).
    cue_length = 3000

    text = transcription.get("full_text") or ""
    pieces = (piece.strip() for piece in text.replace(".", ".\n").split("\n"))

    # Collect the entries and join once instead of growing a string in a loop.
    entries = []
    for position, sentence in enumerate(piece for piece in pieces if piece):
        start = format_srt_time(position * cue_length)
        end = format_srt_time((position + 1) * cue_length)
        entries.append(f"{position + 1}\n{start} --> {end}\n{sentence}\n\n")
    srt_content = "".join(entries)

    return PlainTextResponse(
        content=srt_content,
        media_type="text/plain",
        headers={"Content-Disposition": f"attachment; filename=transcript_{recording_id}.srt"}
    )