Files
breakpilot-lehrer/backend-lehrer/recording_api.py
Benjamin Boenisch 5a31f52310 Initial commit: breakpilot-lehrer - Lehrer KI Platform
Services: Admin-Lehrer, Backend-Lehrer, Studio v2, Website,
Klausur-Service, School-Service, Voice-Service, Geo-Service,
BreakPilot Drive, Agent-Core

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-11 23:47:26 +01:00

849 lines
27 KiB
Python

"""
BreakPilot Recording API
Verwaltet Jibri Meeting-Aufzeichnungen und deren Metadaten.
Empfaengt Webhooks von Jibri nach Upload zu MinIO.
"""
import os
import uuid
from datetime import datetime, timedelta
from typing import Optional, List
from pydantic import BaseModel, Field
from fastapi import APIRouter, HTTPException, Query, Depends, Request
from fastapi.responses import JSONResponse
router = APIRouter(prefix="/api/recordings", tags=["Recordings"])
# ==========================================
# ENVIRONMENT CONFIGURATION
# ==========================================
# MinIO object-storage connection settings. Defaults target the local
# docker-compose dev stack; override via environment in production.
MINIO_ENDPOINT = os.getenv("MINIO_ENDPOINT", "minio:9000")
MINIO_ACCESS_KEY = os.getenv("MINIO_ACCESS_KEY", "breakpilot")
MINIO_SECRET_KEY = os.getenv("MINIO_SECRET_KEY", "breakpilot123")
MINIO_BUCKET = os.getenv("MINIO_BUCKET", "breakpilot-recordings")
# String flag ("true"/"false") converted to bool: use TLS for MinIO?
MINIO_SECURE = os.getenv("MINIO_SECURE", "false").lower() == "true"
# Default retention period in days (DSGVO compliance)
DEFAULT_RETENTION_DAYS = int(os.getenv("RECORDING_RETENTION_DAYS", "365"))
# ==========================================
# PYDANTIC MODELS
# ==========================================
class JibriWebhookPayload(BaseModel):
    """Payload that Jibri's finalize.sh posts after uploading a recording."""

    event: str = Field(..., description="Event type: recording_completed")
    recording_name: str = Field(..., description="Unique recording identifier")
    storage_path: str = Field(..., description="Path in MinIO bucket")
    audio_path: Optional[str] = Field(None, description="Extracted audio path")
    file_size_bytes: int = Field(..., description="Video file size in bytes")
    timestamp: str = Field(..., description="ISO timestamp of upload")
class RecordingCreate(BaseModel):
    """Manually register a recording (testing/dev helper)."""

    meeting_id: str
    title: Optional[str] = None
    storage_path: str
    audio_path: Optional[str] = None
    duration_seconds: Optional[int] = None
    participant_count: Optional[int] = 0
    retention_days: Optional[int] = DEFAULT_RETENTION_DAYS
class RecordingResponse(BaseModel):
    """API view of a single recording, including retention window and the
    linked transcription's id/status when one exists."""

    id: str
    meeting_id: str
    title: Optional[str]
    storage_path: str
    audio_path: Optional[str]
    file_size_bytes: Optional[int]
    duration_seconds: Optional[int]
    participant_count: int
    status: str
    recorded_at: datetime
    retention_days: int
    retention_expires_at: datetime
    transcription_status: Optional[str] = None
    transcription_id: Optional[str] = None
class RecordingListResponse(BaseModel):
    """One page of recordings plus pagination metadata."""

    recordings: List[RecordingResponse]
    total: int
    page: int
    page_size: int
class TranscriptionRequest(BaseModel):
    """Parameters for queueing a Whisper transcription job."""

    language: str = Field(default="de", description="Language code: de, en, etc.")
    model: str = Field(default="large-v3", description="Whisper model to use")
    priority: int = Field(default=0, description="Queue priority (higher = sooner)")
class TranscriptionStatusResponse(BaseModel):
    """Status, progress, and result metadata of a transcription job."""

    id: str
    recording_id: str
    status: str
    language: str
    model: str
    word_count: Optional[int]
    confidence_score: Optional[float]
    processing_duration_seconds: Optional[int]
    error_message: Optional[str]
    created_at: datetime
    completed_at: Optional[datetime]
# ==========================================
# IN-MEMORY STORAGE (Dev Mode)
# ==========================================
# In production, these would be database queries
_recordings_store: dict = {}
_transcriptions_store: dict = {}
_audit_log: list = []
def log_audit(
action: str,
recording_id: Optional[str] = None,
transcription_id: Optional[str] = None,
user_id: Optional[str] = None,
metadata: Optional[dict] = None
):
"""Log audit event for DSGVO compliance."""
_audit_log.append({
"id": str(uuid.uuid4()),
"recording_id": recording_id,
"transcription_id": transcription_id,
"user_id": user_id,
"action": action,
"metadata": metadata or {},
"created_at": datetime.utcnow().isoformat()
})
# ==========================================
# WEBHOOK ENDPOINT (Jibri)
# ==========================================
@router.post("/webhook")
async def jibri_webhook(payload: JibriWebhookPayload, request: Request):
    """
    Webhook called by Jibri's finalize.sh once a recording has been
    uploaded to MinIO. Registers the recording in the store and writes
    an audit entry; only ``recording_completed`` events are accepted.
    """
    if payload.event != "recording_completed":
        return JSONResponse(
            status_code=400,
            content={"error": f"Unknown event type: {payload.event}"},
        )

    # recording_name is expected to look like "<meetingId>_<timestamp>".
    # NOTE(review): a meeting id that itself contains "_" would be
    # truncated here -- confirm the id charset against finalize.sh.
    meeting_id = payload.recording_name.split("_")[0]

    recording_id = str(uuid.uuid4())
    recorded_at = datetime.utcnow()
    _recordings_store[recording_id] = {
        "id": recording_id,
        "meeting_id": meeting_id,
        "jibri_session_id": payload.recording_name,
        "title": f"Recording {meeting_id}",
        "storage_path": payload.storage_path,
        "audio_path": payload.audio_path,
        "file_size_bytes": payload.file_size_bytes,
        "duration_seconds": None,  # filled in later by media analysis
        "participant_count": 0,
        "status": "uploaded",
        "recorded_at": recorded_at.isoformat(),
        "retention_days": DEFAULT_RETENTION_DAYS,
        "created_at": datetime.utcnow().isoformat(),
        "updated_at": datetime.utcnow().isoformat(),
    }

    log_audit(
        action="created",
        recording_id=recording_id,
        metadata={
            "source": "jibri_webhook",
            "storage_path": payload.storage_path,
            "file_size_bytes": payload.file_size_bytes,
        },
    )

    return {
        "success": True,
        "recording_id": recording_id,
        "meeting_id": meeting_id,
        "status": "uploaded",
        "message": "Recording registered successfully",
    }
# ==========================================
# HEALTH & AUDIT ENDPOINTS (must be before parameterized routes)
# ==========================================
@router.get("/health")
async def recordings_health():
    """Liveness probe: report store sizes and the configured MinIO target."""
    return {
        "status": "healthy",
        "recordings_count": len(_recordings_store),
        "transcriptions_count": len(_transcriptions_store),
        "minio_endpoint": MINIO_ENDPOINT,
        "bucket": MINIO_BUCKET,
    }
@router.get("/audit/log")
async def get_audit_log(
    recording_id: Optional[str] = Query(None),
    action: Optional[str] = Query(None),
    limit: int = Query(100, ge=1, le=1000),
):
    """
    Return audit-log entries (DSGVO compliance), newest first.
    Admin-only endpoint for reviewing recording access history.
    """
    # Apply both optional filters in one pass over a snapshot.
    entries = [
        e for e in _audit_log
        if (not recording_id or e.get("recording_id") == recording_id)
        and (not action or e.get("action") == action)
    ]
    entries.sort(key=lambda e: e["created_at"], reverse=True)
    return {
        "entries": entries[:limit],
        "total": len(entries),
    }
# ==========================================
# RECORDING MANAGEMENT ENDPOINTS
# ==========================================
@router.get("", response_model=RecordingListResponse)
async def list_recordings(
    status: Optional[str] = Query(None, description="Filter by status"),
    meeting_id: Optional[str] = Query(None, description="Filter by meeting ID"),
    page: int = Query(1, ge=1, description="Page number"),
    page_size: int = Query(20, ge=1, le=100, description="Items per page"),
):
    """
    List recordings, newest first, with optional status / meeting-ID
    filters and page-based pagination.
    """

    def to_response(rec: dict) -> RecordingResponse:
        """Build the API model for one stored recording dict."""
        recorded_at = datetime.fromisoformat(rec["recorded_at"])
        # Attach the first transcription linked to this recording, if any.
        trans = next(
            (t for t in _transcriptions_store.values()
             if t["recording_id"] == rec["id"]),
            None,
        )
        return RecordingResponse(
            id=rec["id"],
            meeting_id=rec["meeting_id"],
            title=rec.get("title"),
            storage_path=rec["storage_path"],
            audio_path=rec.get("audio_path"),
            file_size_bytes=rec.get("file_size_bytes"),
            duration_seconds=rec.get("duration_seconds"),
            participant_count=rec.get("participant_count", 0),
            status=rec["status"],
            recorded_at=recorded_at,
            retention_days=rec["retention_days"],
            retention_expires_at=recorded_at + timedelta(days=rec["retention_days"]),
            transcription_status=trans["status"] if trans else None,
            transcription_id=trans["id"] if trans else None,
        )

    items = [
        r for r in _recordings_store.values()
        if (not status or r["status"] == status)
        and (not meeting_id or r["meeting_id"] == meeting_id)
    ]
    items.sort(key=lambda r: r["recorded_at"], reverse=True)

    total = len(items)
    offset = (page - 1) * page_size
    window = items[offset:offset + page_size]

    return RecordingListResponse(
        recordings=[to_response(rec) for rec in window],
        total=total,
        page=page,
        page_size=page_size,
    )
@router.get("/{recording_id}", response_model=RecordingResponse)
async def get_recording(recording_id: str):
    """Return full details for one recording; logs a 'viewed' audit event."""
    rec = _recordings_store.get(recording_id)
    if rec is None:
        raise HTTPException(status_code=404, detail="Recording not found")

    log_audit(action="viewed", recording_id=recording_id)

    recorded_at = datetime.fromisoformat(rec["recorded_at"])

    # Find the transcription linked to this recording, if any.
    trans = None
    for t in _transcriptions_store.values():
        if t["recording_id"] == recording_id:
            trans = t
            break

    return RecordingResponse(
        id=rec["id"],
        meeting_id=rec["meeting_id"],
        title=rec.get("title"),
        storage_path=rec["storage_path"],
        audio_path=rec.get("audio_path"),
        file_size_bytes=rec.get("file_size_bytes"),
        duration_seconds=rec.get("duration_seconds"),
        participant_count=rec.get("participant_count", 0),
        status=rec["status"],
        recorded_at=recorded_at,
        retention_days=rec["retention_days"],
        retention_expires_at=recorded_at + timedelta(days=rec["retention_days"]),
        transcription_status=trans["status"] if trans else None,
        transcription_id=trans["id"] if trans else None,
    )
@router.delete("/{recording_id}")
async def delete_recording(
    recording_id: str,
    reason: str = Query(..., description="Reason for deletion (DSGVO audit)"),
):
    """
    Soft-delete a recording (DSGVO compliance).

    The entry is only marked as deleted and kept for audit purposes; the
    actual file removal happens after the audit retention period.
    """
    recording = _recordings_store.get(recording_id)
    if recording is None:
        raise HTTPException(status_code=404, detail="Recording not found")

    recording.update(
        status="deleted",
        deleted_at=datetime.utcnow().isoformat(),
        updated_at=datetime.utcnow().isoformat(),
    )

    log_audit(
        action="deleted",
        recording_id=recording_id,
        metadata={"reason": reason},
    )

    return {
        "success": True,
        "recording_id": recording_id,
        "status": "deleted",
        "message": "Recording marked for deletion",
    }
# ==========================================
# TRANSCRIPTION ENDPOINTS
# ==========================================
@router.post("/{recording_id}/transcribe", response_model=TranscriptionStatusResponse)
async def start_transcription(recording_id: str, request: TranscriptionRequest):
    """
    Queue a recording for processing by the transcription worker.

    Rejects deleted recordings (400) and refuses to queue a second job
    while a non-failed transcription already exists (409).
    """
    recording = _recordings_store.get(recording_id)
    if recording is None:
        raise HTTPException(status_code=404, detail="Recording not found")
    if recording["status"] == "deleted":
        raise HTTPException(status_code=400, detail="Cannot transcribe deleted recording")

    # A failed transcription may be retried; anything else blocks a new job.
    for t in _transcriptions_store.values():
        if t["recording_id"] == recording_id and t["status"] != "failed":
            raise HTTPException(
                status_code=409,
                detail=f"Transcription already exists with status: {t['status']}",
            )

    transcription_id = str(uuid.uuid4())
    now = datetime.utcnow()
    # Result fields start out as None; the worker fills them in later.
    placeholders = dict.fromkeys((
        "full_text",
        "word_count",
        "confidence_score",
        "vtt_path",
        "srt_path",
        "json_path",
        "error_message",
        "processing_started_at",
        "processing_completed_at",
        "processing_duration_seconds",
    ))
    _transcriptions_store[transcription_id] = {
        "id": transcription_id,
        "recording_id": recording_id,
        "language": request.language,
        "model": request.model,
        "status": "pending",
        **placeholders,
        "created_at": now.isoformat(),
        "updated_at": now.isoformat(),
    }

    # Mark the parent recording as being processed.
    recording["status"] = "processing"
    recording["updated_at"] = now.isoformat()

    log_audit(
        action="transcription_started",
        recording_id=recording_id,
        transcription_id=transcription_id,
        metadata={"language": request.language, "model": request.model},
    )

    # TODO: Queue job to Redis/Valkey for transcription worker
    # from redis import Redis
    # from rq import Queue
    # q = Queue(connection=Redis.from_url(os.getenv("REDIS_URL")))
    # q.enqueue('transcription_worker.tasks.transcribe', transcription_id, ...)

    return TranscriptionStatusResponse(
        id=transcription_id,
        recording_id=recording_id,
        status="pending",
        language=request.language,
        model=request.model,
        word_count=None,
        confidence_score=None,
        processing_duration_seconds=None,
        error_message=None,
        created_at=now,
        completed_at=None,
    )
@router.get("/{recording_id}/transcription", response_model=TranscriptionStatusResponse)
async def get_transcription_status(recording_id: str):
    """Return the status and progress of a recording's transcription."""
    transcription = None
    for t in _transcriptions_store.values():
        if t["recording_id"] == recording_id:
            transcription = t
            break
    if transcription is None:
        raise HTTPException(status_code=404, detail="No transcription found for this recording")

    completed_raw = transcription.get("processing_completed_at")
    return TranscriptionStatusResponse(
        id=transcription["id"],
        recording_id=transcription["recording_id"],
        status=transcription["status"],
        language=transcription["language"],
        model=transcription["model"],
        word_count=transcription.get("word_count"),
        confidence_score=transcription.get("confidence_score"),
        processing_duration_seconds=transcription.get("processing_duration_seconds"),
        error_message=transcription.get("error_message"),
        created_at=datetime.fromisoformat(transcription["created_at"]),
        completed_at=datetime.fromisoformat(completed_raw) if completed_raw else None,
    )
@router.get("/{recording_id}/transcription/text")
async def get_transcription_text(recording_id: str):
    """Return the full transcript text once processing has completed."""
    transcription = None
    for t in _transcriptions_store.values():
        if t["recording_id"] == recording_id:
            transcription = t
            break
    if transcription is None:
        raise HTTPException(status_code=404, detail="No transcription found for this recording")
    if transcription["status"] != "completed":
        raise HTTPException(
            status_code=400,
            detail=f"Transcription not ready. Status: {transcription['status']}",
        )
    return {
        "transcription_id": transcription["id"],
        "recording_id": recording_id,
        "language": transcription["language"],
        "text": transcription.get("full_text", ""),
        "word_count": transcription.get("word_count", 0),
    }
@router.get("/{recording_id}/transcription/vtt")
async def get_transcription_vtt(recording_id: str):
    """Download the transcription as a WebVTT subtitle file."""
    from fastapi.responses import PlainTextResponse

    transcription = next(
        (t for t in _transcriptions_store.values() if t["recording_id"] == recording_id),
        None,
    )
    if transcription is None:
        raise HTTPException(status_code=404, detail="No transcription found for this recording")
    if transcription["status"] != "completed":
        raise HTTPException(
            status_code=400,
            detail=f"Transcription not ready. Status: {transcription['status']}",
        )

    # Naive cue generation: split on sentence ends, ~3 s per cue.
    # (Production would serve the worker-generated VTT file instead.)
    vtt_content = "WEBVTT\n\n"
    text = transcription.get("full_text", "")
    offset_ms = 0
    for raw in text.replace(".", ".\n").split("\n"):
        sentence = raw.strip()
        if not sentence:
            continue
        start = format_vtt_time(offset_ms)
        offset_ms += 3000  # rough estimate: three seconds per sentence
        end = format_vtt_time(offset_ms)
        vtt_content += f"{start} --> {end}\n{sentence}\n\n"

    return PlainTextResponse(
        content=vtt_content,
        media_type="text/vtt",
        headers={"Content-Disposition": f"attachment; filename=transcript_{recording_id}.vtt"},
    )
@router.get("/{recording_id}/transcription/srt")
async def get_transcription_srt(recording_id: str):
    """Download the transcription as an SRT subtitle file."""
    from fastapi.responses import PlainTextResponse

    transcription = next(
        (t for t in _transcriptions_store.values() if t["recording_id"] == recording_id),
        None,
    )
    if transcription is None:
        raise HTTPException(status_code=404, detail="No transcription found for this recording")
    if transcription["status"] != "completed":
        raise HTTPException(
            status_code=400,
            detail=f"Transcription not ready. Status: {transcription['status']}",
        )

    # Naive cue generation: numbered cues, one per sentence, ~3 s each.
    # (Production would serve the worker-generated SRT file instead.)
    srt_content = ""
    text = transcription.get("full_text", "")
    offset_ms = 0
    cue_number = 1
    for raw in text.replace(".", ".\n").split("\n"):
        sentence = raw.strip()
        if not sentence:
            continue
        start = format_srt_time(offset_ms)
        offset_ms += 3000
        end = format_srt_time(offset_ms)
        srt_content += f"{cue_number}\n{start} --> {end}\n{sentence}\n\n"
        cue_number += 1

    return PlainTextResponse(
        content=srt_content,
        media_type="text/plain",
        headers={"Content-Disposition": f"attachment; filename=transcript_{recording_id}.srt"},
    )
def format_vtt_time(ms: int) -> str:
    """Format milliseconds as a WebVTT timestamp (HH:MM:SS.mmm)."""
    total_seconds, millis = divmod(ms, 1000)
    total_minutes, seconds = divmod(total_seconds, 60)
    hours, minutes = divmod(total_minutes, 60)
    return f"{hours:02d}:{minutes:02d}:{seconds:02d}.{millis:03d}"
def format_srt_time(ms: int) -> str:
    """Format milliseconds as an SRT timestamp (HH:MM:SS,mmm)."""
    total_seconds, millis = divmod(ms, 1000)
    total_minutes, seconds = divmod(total_seconds, 60)
    hours, minutes = divmod(total_minutes, 60)
    return f"{hours:02d}:{minutes:02d}:{seconds:02d},{millis:03d}"
@router.get("/{recording_id}/download")
async def download_recording(recording_id: str):
    """
    Download the recording file.

    Dev-mode stub: returns storage metadata and logs the access; in
    production this would generate a presigned URL to MinIO.
    """
    recording = _recordings_store.get(recording_id)
    if recording is None:
        raise HTTPException(status_code=404, detail="Recording not found")
    if recording["status"] == "deleted":
        raise HTTPException(status_code=410, detail="Recording has been deleted")

    log_audit(action="downloaded", recording_id=recording_id)

    return {
        "recording_id": recording_id,
        "storage_path": recording["storage_path"],
        "file_size_bytes": recording.get("file_size_bytes"),
        "message": "In production, this would redirect to a presigned MinIO URL",
    }
# ==========================================
# MEETING MINUTES ENDPOINTS
# ==========================================
# In-memory store for meeting minutes (dev mode)
# Maps recording_id -> serialized MeetingMinutes dict (generated_at as ISO text).
_minutes_store: dict = {}
@router.post("/{recording_id}/minutes")
async def generate_meeting_minutes(
    recording_id: str,
    title: Optional[str] = Query(None, description="Meeting-Titel"),
    model: str = Query("breakpilot-teacher-8b", description="LLM Modell"),
):
    """
    Generate AI-based meeting minutes from the completed transcription.

    Uses the LLM gateway (Ollama/vLLM) for on-premise processing; a
    completed result is cached per recording and returned on repeat calls.
    """
    from meeting_minutes_generator import get_minutes_generator, MeetingMinutes

    recording = _recordings_store.get(recording_id)
    if recording is None:
        raise HTTPException(status_code=404, detail="Recording not found")

    transcription = next(
        (t for t in _transcriptions_store.values() if t["recording_id"] == recording_id),
        None,
    )
    if transcription is None:
        raise HTTPException(status_code=400, detail="No transcription found. Please transcribe first.")
    if transcription["status"] != "completed":
        raise HTTPException(
            status_code=400,
            detail=f"Transcription not ready. Status: {transcription['status']}",
        )

    # Serve cached minutes when a completed set already exists.
    cached = _minutes_store.get(recording_id)
    if cached and cached.get("status") == "completed":
        return cached

    transcript_text = transcription.get("full_text", "")
    if not transcript_text:
        raise HTTPException(status_code=400, detail="Transcription has no text content")

    recorded_at = recording.get("recorded_at")
    duration_seconds = recording.get("duration_seconds")
    generator = get_minutes_generator()
    try:
        minutes = await generator.generate(
            transcript=transcript_text,
            recording_id=recording_id,
            transcription_id=transcription["id"],
            title=title,
            date=recorded_at[:10] if recorded_at else None,
            duration_minutes=duration_seconds // 60 if duration_seconds else None,
            participant_count=recording.get("participant_count", 0),
            model=model,
        )

        # Persist a JSON-safe copy (generated_at serialized to ISO text).
        minutes_dict = minutes.model_dump()
        minutes_dict["generated_at"] = minutes.generated_at.isoformat()
        _minutes_store[recording_id] = minutes_dict

        log_audit(
            action="minutes_generated",
            recording_id=recording_id,
            metadata={"model": model, "generation_time": minutes.generation_time_seconds},
        )
        return minutes_dict
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Minutes generation failed: {str(e)}")
@router.get("/{recording_id}/minutes")
async def get_meeting_minutes(recording_id: str):
    """Return previously generated meeting minutes for a recording."""
    minutes = _minutes_store.get(recording_id)
    if not minutes:
        raise HTTPException(status_code=404, detail="No meeting minutes found. Generate them first with POST.")
    return minutes
@router.get("/{recording_id}/minutes/markdown")
async def get_minutes_markdown(recording_id: str):
    """Export the stored meeting minutes as a downloadable Markdown file."""
    from fastapi.responses import PlainTextResponse
    from meeting_minutes_generator import minutes_to_markdown, MeetingMinutes

    stored = _minutes_store.get(recording_id)
    if not stored:
        raise HTTPException(status_code=404, detail="No meeting minutes found")

    # Rehydrate the pydantic model (generated_at was serialized to ISO text).
    data = dict(stored)
    generated_at = data.get("generated_at")
    if isinstance(generated_at, str):
        data["generated_at"] = datetime.fromisoformat(generated_at)
    markdown = minutes_to_markdown(MeetingMinutes(**data))

    return PlainTextResponse(
        content=markdown,
        media_type="text/markdown",
        headers={"Content-Disposition": f"attachment; filename=protokoll_{recording_id}.md"},
    )
@router.get("/{recording_id}/minutes/html")
async def get_minutes_html(recording_id: str):
    """Export the stored meeting minutes as a rendered HTML page."""
    from fastapi.responses import HTMLResponse
    from meeting_minutes_generator import minutes_to_html, MeetingMinutes

    stored = _minutes_store.get(recording_id)
    if not stored:
        raise HTTPException(status_code=404, detail="No meeting minutes found")

    # Rehydrate the pydantic model (generated_at was serialized to ISO text).
    data = dict(stored)
    generated_at = data.get("generated_at")
    if isinstance(generated_at, str):
        data["generated_at"] = datetime.fromisoformat(generated_at)

    return HTMLResponse(content=minutes_to_html(MeetingMinutes(**data)))
@router.get("/{recording_id}/minutes/pdf")
async def get_minutes_pdf(recording_id: str):
    """
    Export the stored meeting minutes as a PDF.

    Requires WeasyPrint (pip install weasyprint); answers 501 when the
    optional dependency is missing.
    """
    from meeting_minutes_generator import minutes_to_html, MeetingMinutes

    stored = _minutes_store.get(recording_id)
    if not stored:
        raise HTTPException(status_code=404, detail="No meeting minutes found")

    # Rehydrate the pydantic model (generated_at was serialized to ISO text).
    data = dict(stored)
    generated_at = data.get("generated_at")
    if isinstance(generated_at, str):
        data["generated_at"] = datetime.fromisoformat(generated_at)
    html = minutes_to_html(MeetingMinutes(**data))

    try:
        from weasyprint import HTML
        from fastapi.responses import Response

        pdf_bytes = HTML(string=html).write_pdf()
        return Response(
            content=pdf_bytes,
            media_type="application/pdf",
            headers={"Content-Disposition": f"attachment; filename=protokoll_{recording_id}.pdf"},
        )
    except ImportError:
        raise HTTPException(
            status_code=501,
            detail="PDF export not available. Install weasyprint: pip install weasyprint",
        )