This repository has been archived on 2026-02-15. You can view files and clone it. You cannot open issues or pull requests or push a commit.
Files
breakpilot-pwa/backend/unit_api.py
BreakPilot Dev 19855efacc
Some checks failed
Tests / Go Tests (push) Has been cancelled
Tests / Python Tests (push) Has been cancelled
Tests / Integration Tests (push) Has been cancelled
Tests / Go Lint (push) Has been cancelled
Tests / Python Lint (push) Has been cancelled
Tests / Security Scan (push) Has been cancelled
Tests / All Checks Passed (push) Has been cancelled
Security Scanning / Secret Scanning (push) Has been cancelled
Security Scanning / Dependency Vulnerability Scan (push) Has been cancelled
Security Scanning / Go Security Scan (push) Has been cancelled
Security Scanning / Python Security Scan (push) Has been cancelled
Security Scanning / Node.js Security Scan (push) Has been cancelled
Security Scanning / Docker Image Security (push) Has been cancelled
Security Scanning / Security Summary (push) Has been cancelled
CI/CD Pipeline / Go Tests (push) Has been cancelled
CI/CD Pipeline / Python Tests (push) Has been cancelled
CI/CD Pipeline / Website Tests (push) Has been cancelled
CI/CD Pipeline / Linting (push) Has been cancelled
CI/CD Pipeline / Security Scan (push) Has been cancelled
CI/CD Pipeline / Docker Build & Push (push) Has been cancelled
CI/CD Pipeline / Integration Tests (push) Has been cancelled
CI/CD Pipeline / Deploy to Staging (push) Has been cancelled
CI/CD Pipeline / Deploy to Production (push) Has been cancelled
CI/CD Pipeline / CI Summary (push) Has been cancelled
ci/woodpecker/manual/build-ci-image Pipeline was successful
ci/woodpecker/manual/main Pipeline failed
feat: BreakPilot PWA - Full codebase (clean push without large binaries)
All services: admin-v2, studio-v2, website, ai-compliance-sdk,
consent-service, klausur-service, voice-service, and infrastructure.
Large PDFs and compiled binaries excluded via .gitignore.
2026-02-11 13:25:58 +01:00

1227 lines
41 KiB
Python

# ==============================================
# Breakpilot Drive - Unit API
# ==============================================
# API-Endpunkte fuer kontextuelle Lerneinheiten:
# - Unit-Sessions erstellen und verwalten
# - Telemetrie-Events empfangen
# - Unit-Definitionen abrufen
# - Pre/Post-Check verarbeiten
#
# Mit PostgreSQL-Integration fuer persistente Speicherung.
# Auth: Optional via GAME_REQUIRE_AUTH=true
from fastapi import APIRouter, HTTPException, Query, Depends, Request
from pydantic import BaseModel, Field
from typing import List, Optional, Dict, Any
from datetime import datetime, timedelta
import uuid
import os
import logging
import jwt
logger = logging.getLogger(__name__)
# Feature flags
USE_DATABASE = os.getenv("GAME_USE_DATABASE", "true").lower() == "true"
REQUIRE_AUTH = os.getenv("GAME_REQUIRE_AUTH", "false").lower() == "true"
SECRET_KEY = os.getenv("JWT_SECRET_KEY", "dev-secret-key-change-in-production")
router = APIRouter(prefix="/api/units", tags=["Breakpilot Units"])
# ==============================================
# Auth Dependency (reuse from game_api)
# ==============================================
async def get_optional_current_user(request: Request) -> Optional[Dict[str, Any]]:
"""Optional auth dependency for Unit API."""
if not REQUIRE_AUTH:
return None
try:
from auth import get_current_user
return await get_current_user(request)
except ImportError:
logger.warning("Auth module not available")
return None
except HTTPException:
raise
except Exception as e:
logger.error(f"Auth error: {e}")
raise HTTPException(status_code=401, detail="Authentication failed")
# ==============================================
# Pydantic Models
# ==============================================
class UnitDefinitionResponse(BaseModel):
"""Unit definition response"""
unit_id: str
template: str
version: str
locale: List[str]
grade_band: List[str]
duration_minutes: int
difficulty: str
definition: Dict[str, Any]
class CreateSessionRequest(BaseModel):
"""Request to create a unit session"""
unit_id: str
student_id: str
locale: str = "de-DE"
difficulty: str = "base"
class SessionResponse(BaseModel):
"""Response after creating a session"""
session_id: str
unit_definition_url: str
session_token: str
telemetry_endpoint: str
expires_at: datetime
class TelemetryEvent(BaseModel):
"""Single telemetry event"""
ts: Optional[str] = None
type: str = Field(..., alias="type")
stop_id: Optional[str] = None
metrics: Optional[Dict[str, Any]] = None
class Config:
populate_by_name = True
class TelemetryPayload(BaseModel):
"""Batch telemetry payload"""
session_id: str
events: List[TelemetryEvent]
class TelemetryResponse(BaseModel):
"""Response after receiving telemetry"""
accepted: int
class PostcheckAnswer(BaseModel):
"""Single postcheck answer"""
question_id: str
answer: str
class CompleteSessionRequest(BaseModel):
"""Request to complete a session"""
postcheck_answers: Optional[List[PostcheckAnswer]] = None
class SessionSummaryResponse(BaseModel):
"""Response with session summary"""
summary: Dict[str, Any]
next_recommendations: Dict[str, Any]
class UnitListItem(BaseModel):
"""Unit list item"""
unit_id: str
template: str
difficulty: str
duration_minutes: int
locale: List[str]
grade_band: List[str]
class RecommendedUnit(BaseModel):
"""Recommended unit with reason"""
unit_id: str
template: str
difficulty: str
reason: str
class CreateUnitRequest(BaseModel):
"""Request to create a new unit definition"""
unit_id: str = Field(..., description="Unique unit identifier")
template: str = Field(..., description="Template type: flight_path or station_loop")
version: str = Field(default="1.0.0", description="Version string")
locale: List[str] = Field(default=["de-DE"], description="Supported locales")
grade_band: List[str] = Field(default=["5", "6", "7"], description="Target grade levels")
duration_minutes: int = Field(default=8, ge=3, le=20, description="Expected duration")
difficulty: str = Field(default="base", description="Difficulty level: base or advanced")
subject: Optional[str] = Field(default=None, description="Subject area")
topic: Optional[str] = Field(default=None, description="Topic within subject")
learning_objectives: List[str] = Field(default=[], description="Learning objectives")
stops: List[Dict[str, Any]] = Field(default=[], description="Unit stops/stations")
precheck: Optional[Dict[str, Any]] = Field(default=None, description="Pre-check configuration")
postcheck: Optional[Dict[str, Any]] = Field(default=None, description="Post-check configuration")
teacher_controls: Optional[Dict[str, Any]] = Field(default=None, description="Teacher control settings")
assets: Optional[Dict[str, Any]] = Field(default=None, description="Asset configuration")
metadata: Optional[Dict[str, Any]] = Field(default=None, description="Additional metadata")
status: str = Field(default="draft", description="Publication status: draft or published")
class UpdateUnitRequest(BaseModel):
"""Request to update an existing unit definition"""
version: Optional[str] = None
locale: Optional[List[str]] = None
grade_band: Optional[List[str]] = None
duration_minutes: Optional[int] = Field(default=None, ge=3, le=20)
difficulty: Optional[str] = None
subject: Optional[str] = None
topic: Optional[str] = None
learning_objectives: Optional[List[str]] = None
stops: Optional[List[Dict[str, Any]]] = None
precheck: Optional[Dict[str, Any]] = None
postcheck: Optional[Dict[str, Any]] = None
teacher_controls: Optional[Dict[str, Any]] = None
assets: Optional[Dict[str, Any]] = None
metadata: Optional[Dict[str, Any]] = None
status: Optional[str] = None
class ValidationError(BaseModel):
"""Single validation error"""
field: str
message: str
severity: str = "error" # error or warning
class ValidationResult(BaseModel):
"""Result of unit validation"""
valid: bool
errors: List[ValidationError] = []
warnings: List[ValidationError] = []
# ==============================================
# Database Integration
# ==============================================
_unit_db = None
async def get_unit_database():
"""Get unit database instance with lazy initialization."""
global _unit_db
if not USE_DATABASE:
return None
if _unit_db is None:
try:
from unit.database import get_unit_db
_unit_db = await get_unit_db()
logger.info("Unit database initialized")
except ImportError:
logger.warning("Unit database module not available")
except Exception as e:
logger.warning(f"Unit database not available: {e}")
return _unit_db
# ==============================================
# Helper Functions
# ==============================================
def create_session_token(session_id: str, student_id: str, expires_hours: int = 4) -> str:
"""Create a JWT session token for telemetry authentication."""
payload = {
"session_id": session_id,
"student_id": student_id,
"exp": datetime.utcnow() + timedelta(hours=expires_hours),
"iat": datetime.utcnow(),
}
return jwt.encode(payload, SECRET_KEY, algorithm="HS256")
def verify_session_token(token: str) -> Optional[Dict[str, Any]]:
"""Verify a session token and return payload."""
try:
return jwt.decode(token, SECRET_KEY, algorithms=["HS256"])
except jwt.ExpiredSignatureError:
return None
except jwt.InvalidTokenError:
return None
async def get_session_from_token(request: Request) -> Optional[Dict[str, Any]]:
"""Extract and verify session from Authorization header."""
auth_header = request.headers.get("Authorization", "")
if not auth_header.startswith("Bearer "):
return None
token = auth_header[7:]
return verify_session_token(token)
def validate_unit_definition(unit_data: Dict[str, Any]) -> ValidationResult:
"""
Validate a unit definition structure.
Returns validation result with errors and warnings.
"""
errors = []
warnings = []
# Required fields
if not unit_data.get("unit_id"):
errors.append(ValidationError(field="unit_id", message="unit_id ist erforderlich"))
if not unit_data.get("template"):
errors.append(ValidationError(field="template", message="template ist erforderlich"))
elif unit_data["template"] not in ["flight_path", "station_loop"]:
errors.append(ValidationError(
field="template",
message="template muss 'flight_path' oder 'station_loop' sein"
))
# Validate stops
stops = unit_data.get("stops", [])
if not stops:
errors.append(ValidationError(field="stops", message="Mindestens 1 Stop erforderlich"))
else:
# Check minimum stops for flight_path
if unit_data.get("template") == "flight_path" and len(stops) < 3:
warnings.append(ValidationError(
field="stops",
message="FlightPath sollte mindestens 3 Stops haben",
severity="warning"
))
# Validate each stop
stop_ids = set()
for i, stop in enumerate(stops):
if not stop.get("stop_id"):
errors.append(ValidationError(
field=f"stops[{i}].stop_id",
message=f"Stop {i}: stop_id fehlt"
))
else:
if stop["stop_id"] in stop_ids:
errors.append(ValidationError(
field=f"stops[{i}].stop_id",
message=f"Stop {i}: Doppelte stop_id '{stop['stop_id']}'"
))
stop_ids.add(stop["stop_id"])
# Check interaction type
interaction = stop.get("interaction", {})
if not interaction.get("type"):
errors.append(ValidationError(
field=f"stops[{i}].interaction.type",
message=f"Stop {stop.get('stop_id', i)}: Interaktionstyp fehlt"
))
elif interaction["type"] not in [
"aim_and_pass", "slider_adjust", "slider_equivalence",
"sequence_arrange", "toggle_switch", "drag_match",
"error_find", "transfer_apply"
]:
warnings.append(ValidationError(
field=f"stops[{i}].interaction.type",
message=f"Stop {stop.get('stop_id', i)}: Unbekannter Interaktionstyp '{interaction['type']}'",
severity="warning"
))
# Check for label
if not stop.get("label"):
warnings.append(ValidationError(
field=f"stops[{i}].label",
message=f"Stop {stop.get('stop_id', i)}: Label fehlt",
severity="warning"
))
# Validate duration
duration = unit_data.get("duration_minutes", 0)
if duration < 3 or duration > 20:
warnings.append(ValidationError(
field="duration_minutes",
message="Dauer sollte zwischen 3 und 20 Minuten liegen",
severity="warning"
))
# Validate difficulty
if unit_data.get("difficulty") and unit_data["difficulty"] not in ["base", "advanced"]:
warnings.append(ValidationError(
field="difficulty",
message="difficulty sollte 'base' oder 'advanced' sein",
severity="warning"
))
return ValidationResult(
valid=len(errors) == 0,
errors=errors,
warnings=warnings
)
# ==============================================
# API Endpoints
# ==============================================
@router.get("/definitions", response_model=List[UnitListItem])
async def list_unit_definitions(
template: Optional[str] = Query(None, description="Filter by template: flight_path, station_loop"),
grade: Optional[str] = Query(None, description="Filter by grade level"),
locale: str = Query("de-DE", description="Filter by locale"),
user: Optional[Dict[str, Any]] = Depends(get_optional_current_user)
) -> List[UnitListItem]:
"""
List available unit definitions.
Returns published units matching the filter criteria.
"""
db = await get_unit_database()
if db:
try:
units = await db.list_units(
template=template,
grade=grade,
locale=locale,
published_only=True
)
return [
UnitListItem(
unit_id=u["unit_id"],
template=u["template"],
difficulty=u["difficulty"],
duration_minutes=u["duration_minutes"],
locale=u["locale"],
grade_band=u["grade_band"],
)
for u in units
]
except Exception as e:
logger.error(f"Failed to list units: {e}")
# Fallback: return demo unit
return [
UnitListItem(
unit_id="demo_unit_v1",
template="flight_path",
difficulty="base",
duration_minutes=5,
locale=["de-DE"],
grade_band=["5", "6", "7"],
)
]
@router.get("/definitions/{unit_id}", response_model=UnitDefinitionResponse)
async def get_unit_definition(
unit_id: str,
user: Optional[Dict[str, Any]] = Depends(get_optional_current_user)
) -> UnitDefinitionResponse:
"""
Get a specific unit definition.
Returns the full unit configuration including stops, interactions, etc.
"""
db = await get_unit_database()
if db:
try:
unit = await db.get_unit_definition(unit_id)
if unit:
return UnitDefinitionResponse(
unit_id=unit["unit_id"],
template=unit["template"],
version=unit["version"],
locale=unit["locale"],
grade_band=unit["grade_band"],
duration_minutes=unit["duration_minutes"],
difficulty=unit["difficulty"],
definition=unit["definition"],
)
except Exception as e:
logger.error(f"Failed to get unit definition: {e}")
# Demo unit fallback
if unit_id == "demo_unit_v1":
return UnitDefinitionResponse(
unit_id="demo_unit_v1",
template="flight_path",
version="1.0.0",
locale=["de-DE"],
grade_band=["5", "6", "7"],
duration_minutes=5,
difficulty="base",
definition={
"unit_id": "demo_unit_v1",
"template": "flight_path",
"version": "1.0.0",
"learning_objectives": ["Demo: Grundfunktion testen"],
"stops": [
{"stop_id": "stop_1", "label": {"de-DE": "Start"}, "interaction": {"type": "aim_and_pass"}},
{"stop_id": "stop_2", "label": {"de-DE": "Mitte"}, "interaction": {"type": "aim_and_pass"}},
{"stop_id": "stop_3", "label": {"de-DE": "Ende"}, "interaction": {"type": "aim_and_pass"}},
],
"teacher_controls": {"allow_skip": True, "allow_replay": True},
},
)
raise HTTPException(status_code=404, detail=f"Unit not found: {unit_id}")
@router.post("/definitions", response_model=UnitDefinitionResponse)
async def create_unit_definition(
request_data: CreateUnitRequest,
user: Optional[Dict[str, Any]] = Depends(get_optional_current_user)
) -> UnitDefinitionResponse:
"""
Create a new unit definition.
- Validates unit structure
- Saves to database or JSON file
- Returns created unit
"""
import json
from pathlib import Path
# Build full definition
definition = {
"unit_id": request_data.unit_id,
"template": request_data.template,
"version": request_data.version,
"locale": request_data.locale,
"grade_band": request_data.grade_band,
"duration_minutes": request_data.duration_minutes,
"difficulty": request_data.difficulty,
"subject": request_data.subject,
"topic": request_data.topic,
"learning_objectives": request_data.learning_objectives,
"stops": request_data.stops,
"precheck": request_data.precheck or {
"question_set_id": f"{request_data.unit_id}_precheck",
"required": True,
"time_limit_seconds": 120
},
"postcheck": request_data.postcheck or {
"question_set_id": f"{request_data.unit_id}_postcheck",
"required": True,
"time_limit_seconds": 180
},
"teacher_controls": request_data.teacher_controls or {
"allow_skip": True,
"allow_replay": True,
"max_time_per_stop_sec": 90,
"show_hints": True,
"require_precheck": True,
"require_postcheck": True
},
"assets": request_data.assets or {},
"metadata": request_data.metadata or {
"author": user.get("email", "Unknown") if user else "Unknown",
"created": datetime.utcnow().isoformat(),
"curriculum_reference": ""
}
}
# Validate
validation = validate_unit_definition(definition)
if not validation.valid:
error_msgs = [f"{e.field}: {e.message}" for e in validation.errors]
raise HTTPException(status_code=400, detail=f"Validierung fehlgeschlagen: {'; '.join(error_msgs)}")
# Check if unit_id already exists
db = await get_unit_database()
if db:
try:
existing = await db.get_unit_definition(request_data.unit_id)
if existing:
raise HTTPException(status_code=409, detail=f"Unit existiert bereits: {request_data.unit_id}")
# Save to database
await db.create_unit_definition(
unit_id=request_data.unit_id,
template=request_data.template,
version=request_data.version,
locale=request_data.locale,
grade_band=request_data.grade_band,
duration_minutes=request_data.duration_minutes,
difficulty=request_data.difficulty,
definition=definition,
status=request_data.status
)
logger.info(f"Unit created in database: {request_data.unit_id}")
except HTTPException:
raise
except Exception as e:
logger.warning(f"Database save failed, using JSON fallback: {e}")
# Fallback to JSON
units_dir = Path(__file__).parent / "data" / "units"
units_dir.mkdir(parents=True, exist_ok=True)
json_path = units_dir / f"{request_data.unit_id}.json"
if json_path.exists():
raise HTTPException(status_code=409, detail=f"Unit existiert bereits: {request_data.unit_id}")
with open(json_path, "w", encoding="utf-8") as f:
json.dump(definition, f, ensure_ascii=False, indent=2)
logger.info(f"Unit created as JSON: {json_path}")
else:
# JSON only mode
units_dir = Path(__file__).parent / "data" / "units"
units_dir.mkdir(parents=True, exist_ok=True)
json_path = units_dir / f"{request_data.unit_id}.json"
if json_path.exists():
raise HTTPException(status_code=409, detail=f"Unit existiert bereits: {request_data.unit_id}")
with open(json_path, "w", encoding="utf-8") as f:
json.dump(definition, f, ensure_ascii=False, indent=2)
logger.info(f"Unit created as JSON: {json_path}")
return UnitDefinitionResponse(
unit_id=request_data.unit_id,
template=request_data.template,
version=request_data.version,
locale=request_data.locale,
grade_band=request_data.grade_band,
duration_minutes=request_data.duration_minutes,
difficulty=request_data.difficulty,
definition=definition
)
@router.put("/definitions/{unit_id}", response_model=UnitDefinitionResponse)
async def update_unit_definition(
unit_id: str,
request_data: UpdateUnitRequest,
user: Optional[Dict[str, Any]] = Depends(get_optional_current_user)
) -> UnitDefinitionResponse:
"""
Update an existing unit definition.
- Merges updates with existing definition
- Re-validates
- Saves updated version
"""
import json
from pathlib import Path
# Get existing unit
db = await get_unit_database()
existing = None
if db:
try:
existing = await db.get_unit_definition(unit_id)
except Exception as e:
logger.warning(f"Database read failed: {e}")
if not existing:
# Try JSON file
json_path = Path(__file__).parent / "data" / "units" / f"{unit_id}.json"
if json_path.exists():
with open(json_path, "r", encoding="utf-8") as f:
file_data = json.load(f)
existing = {
"unit_id": file_data.get("unit_id"),
"template": file_data.get("template"),
"version": file_data.get("version", "1.0.0"),
"locale": file_data.get("locale", ["de-DE"]),
"grade_band": file_data.get("grade_band", []),
"duration_minutes": file_data.get("duration_minutes", 8),
"difficulty": file_data.get("difficulty", "base"),
"definition": file_data
}
if not existing:
raise HTTPException(status_code=404, detail=f"Unit nicht gefunden: {unit_id}")
# Merge updates into existing definition
definition = existing.get("definition", {})
update_dict = request_data.model_dump(exclude_unset=True)
for key, value in update_dict.items():
if value is not None:
definition[key] = value
# Validate updated definition
validation = validate_unit_definition(definition)
if not validation.valid:
error_msgs = [f"{e.field}: {e.message}" for e in validation.errors]
raise HTTPException(status_code=400, detail=f"Validierung fehlgeschlagen: {'; '.join(error_msgs)}")
# Save
if db:
try:
await db.update_unit_definition(
unit_id=unit_id,
version=definition.get("version"),
locale=definition.get("locale"),
grade_band=definition.get("grade_band"),
duration_minutes=definition.get("duration_minutes"),
difficulty=definition.get("difficulty"),
definition=definition,
status=update_dict.get("status")
)
logger.info(f"Unit updated in database: {unit_id}")
except Exception as e:
logger.warning(f"Database update failed, using JSON: {e}")
json_path = Path(__file__).parent / "data" / "units" / f"{unit_id}.json"
with open(json_path, "w", encoding="utf-8") as f:
json.dump(definition, f, ensure_ascii=False, indent=2)
else:
json_path = Path(__file__).parent / "data" / "units" / f"{unit_id}.json"
with open(json_path, "w", encoding="utf-8") as f:
json.dump(definition, f, ensure_ascii=False, indent=2)
logger.info(f"Unit updated as JSON: {json_path}")
return UnitDefinitionResponse(
unit_id=unit_id,
template=definition.get("template", existing.get("template")),
version=definition.get("version", existing.get("version", "1.0.0")),
locale=definition.get("locale", existing.get("locale", ["de-DE"])),
grade_band=definition.get("grade_band", existing.get("grade_band", [])),
duration_minutes=definition.get("duration_minutes", existing.get("duration_minutes", 8)),
difficulty=definition.get("difficulty", existing.get("difficulty", "base")),
definition=definition
)
@router.delete("/definitions/{unit_id}")
async def delete_unit_definition(
unit_id: str,
force: bool = Query(False, description="Force delete even if published"),
user: Optional[Dict[str, Any]] = Depends(get_optional_current_user)
) -> Dict[str, Any]:
"""
Delete a unit definition.
- By default, only drafts can be deleted
- Use force=true to delete published units
"""
import json
from pathlib import Path
db = await get_unit_database()
deleted = False
if db:
try:
existing = await db.get_unit_definition(unit_id)
if existing:
status = existing.get("status", "draft")
if status == "published" and not force:
raise HTTPException(
status_code=400,
detail="Veroeffentlichte Units koennen nicht geloescht werden. Verwende force=true."
)
await db.delete_unit_definition(unit_id)
deleted = True
logger.info(f"Unit deleted from database: {unit_id}")
except HTTPException:
raise
except Exception as e:
logger.warning(f"Database delete failed: {e}")
# Also check JSON file
json_path = Path(__file__).parent / "data" / "units" / f"{unit_id}.json"
if json_path.exists():
json_path.unlink()
deleted = True
logger.info(f"Unit JSON deleted: {json_path}")
if not deleted:
raise HTTPException(status_code=404, detail=f"Unit nicht gefunden: {unit_id}")
return {"success": True, "unit_id": unit_id, "message": "Unit geloescht"}
@router.post("/definitions/validate", response_model=ValidationResult)
async def validate_unit(
unit_data: Dict[str, Any],
user: Optional[Dict[str, Any]] = Depends(get_optional_current_user)
) -> ValidationResult:
"""
Validate a unit definition without saving.
Returns validation result with errors and warnings.
"""
return validate_unit_definition(unit_data)
@router.post("/sessions", response_model=SessionResponse)
async def create_unit_session(
request_data: CreateSessionRequest,
request: Request,
user: Optional[Dict[str, Any]] = Depends(get_optional_current_user)
) -> SessionResponse:
"""
Create a new unit session.
- Validates unit exists
- Creates session record
- Returns session token for telemetry
"""
session_id = str(uuid.uuid4())
expires_at = datetime.utcnow() + timedelta(hours=4)
# Validate unit exists
db = await get_unit_database()
if db:
try:
unit = await db.get_unit_definition(request_data.unit_id)
if not unit:
raise HTTPException(status_code=404, detail=f"Unit not found: {request_data.unit_id}")
# Create session in database
total_stops = len(unit.get("definition", {}).get("stops", []))
await db.create_session(
session_id=session_id,
unit_id=request_data.unit_id,
student_id=request_data.student_id,
locale=request_data.locale,
difficulty=request_data.difficulty,
total_stops=total_stops,
)
except HTTPException:
raise
except Exception as e:
logger.error(f"Failed to create session: {e}")
# Continue with in-memory fallback
# Create session token
session_token = create_session_token(session_id, request_data.student_id)
# Build definition URL
base_url = str(request.base_url).rstrip("/")
definition_url = f"{base_url}/api/units/definitions/{request_data.unit_id}"
return SessionResponse(
session_id=session_id,
unit_definition_url=definition_url,
session_token=session_token,
telemetry_endpoint="/api/units/telemetry",
expires_at=expires_at,
)
@router.post("/telemetry", response_model=TelemetryResponse)
async def receive_telemetry(
payload: TelemetryPayload,
request: Request,
) -> TelemetryResponse:
"""
Receive batched telemetry events from Unity client.
- Validates session token
- Stores events in database
- Returns count of accepted events
"""
# Verify session token
session_data = await get_session_from_token(request)
if session_data is None:
# Allow without auth in dev mode
if REQUIRE_AUTH:
raise HTTPException(status_code=401, detail="Invalid or expired session token")
logger.warning("Telemetry received without valid token (dev mode)")
# Verify session_id matches
if session_data and session_data.get("session_id") != payload.session_id:
raise HTTPException(status_code=403, detail="Session ID mismatch")
accepted = 0
db = await get_unit_database()
for event in payload.events:
try:
# Set timestamp if not provided
timestamp = event.ts or datetime.utcnow().isoformat()
if db:
await db.store_telemetry_event(
session_id=payload.session_id,
event_type=event.type,
stop_id=event.stop_id,
timestamp=timestamp,
metrics=event.metrics,
)
accepted += 1
logger.debug(f"Telemetry: {event.type} for session {payload.session_id}")
except Exception as e:
logger.error(f"Failed to store telemetry event: {e}")
return TelemetryResponse(accepted=accepted)
@router.post("/sessions/{session_id}/complete", response_model=SessionSummaryResponse)
async def complete_session(
session_id: str,
request_data: CompleteSessionRequest,
request: Request,
) -> SessionSummaryResponse:
"""
Complete a unit session.
- Processes postcheck answers if provided
- Calculates learning gain
- Returns summary and recommendations
"""
# Verify session token
session_data = await get_session_from_token(request)
if REQUIRE_AUTH and session_data is None:
raise HTTPException(status_code=401, detail="Invalid or expired session token")
db = await get_unit_database()
summary = {}
recommendations = {}
if db:
try:
# Get session data
session = await db.get_session(session_id)
if not session:
raise HTTPException(status_code=404, detail="Session not found")
# Calculate postcheck score if answers provided
postcheck_score = None
if request_data.postcheck_answers:
# Simple scoring: count correct answers
# In production, would validate against question bank
postcheck_score = len(request_data.postcheck_answers) * 0.2 # Placeholder
postcheck_score = min(postcheck_score, 1.0)
# Complete session in database
await db.complete_session(
session_id=session_id,
postcheck_score=postcheck_score,
)
# Get updated session summary
session = await db.get_session(session_id)
# Calculate learning gain
pre_score = session.get("precheck_score")
post_score = session.get("postcheck_score")
learning_gain = None
if pre_score is not None and post_score is not None:
learning_gain = post_score - pre_score
summary = {
"session_id": session_id,
"unit_id": session.get("unit_id"),
"duration_seconds": session.get("duration_seconds"),
"completion_rate": session.get("completion_rate"),
"precheck_score": pre_score,
"postcheck_score": post_score,
"pre_to_post_gain": learning_gain,
"stops_completed": session.get("stops_completed"),
"total_stops": session.get("total_stops"),
}
# Get recommendations
recommendations = await db.get_recommendations(
student_id=session.get("student_id"),
completed_unit_id=session.get("unit_id"),
)
except HTTPException:
raise
except Exception as e:
logger.error(f"Failed to complete session: {e}")
summary = {"session_id": session_id, "error": str(e)}
else:
# Fallback summary
summary = {
"session_id": session_id,
"duration_seconds": 0,
"completion_rate": 1.0,
"message": "Database not available",
}
return SessionSummaryResponse(
summary=summary,
next_recommendations=recommendations or {
"h5p_activity_ids": [],
"worksheet_pdf_url": None,
},
)
@router.get("/sessions/{session_id}")
async def get_session(
session_id: str,
request: Request,
) -> Dict[str, Any]:
"""
Get session details.
Returns current state of a session including progress.
"""
# Verify session token
session_data = await get_session_from_token(request)
if REQUIRE_AUTH and session_data is None:
raise HTTPException(status_code=401, detail="Invalid or expired session token")
db = await get_unit_database()
if db:
try:
session = await db.get_session(session_id)
if session:
return session
except Exception as e:
logger.error(f"Failed to get session: {e}")
raise HTTPException(status_code=404, detail="Session not found")
@router.get("/recommendations/{student_id}", response_model=List[RecommendedUnit])
async def get_recommendations(
student_id: str,
grade: Optional[str] = Query(None, description="Grade level filter"),
locale: str = Query("de-DE", description="Locale filter"),
limit: int = Query(5, ge=1, le=20),
user: Optional[Dict[str, Any]] = Depends(get_optional_current_user)
) -> List[RecommendedUnit]:
"""
Get recommended units for a student.
Based on completion status and performance.
"""
db = await get_unit_database()
if db:
try:
recommendations = await db.get_student_recommendations(
student_id=student_id,
grade=grade,
locale=locale,
limit=limit,
)
return [
RecommendedUnit(
unit_id=r["unit_id"],
template=r["template"],
difficulty=r["difficulty"],
reason=r["reason"],
)
for r in recommendations
]
except Exception as e:
logger.error(f"Failed to get recommendations: {e}")
# Fallback: recommend demo unit
return [
RecommendedUnit(
unit_id="demo_unit_v1",
template="flight_path",
difficulty="base",
reason="Neu: Noch nicht gespielt",
)
]
@router.get("/analytics/student/{student_id}")
async def get_student_analytics(
student_id: str,
user: Optional[Dict[str, Any]] = Depends(get_optional_current_user)
) -> Dict[str, Any]:
"""
Get unit analytics for a student.
Includes completion rates, learning gains, time spent.
"""
db = await get_unit_database()
if db:
try:
analytics = await db.get_student_unit_analytics(student_id)
return analytics
except Exception as e:
logger.error(f"Failed to get analytics: {e}")
return {
"student_id": student_id,
"units_attempted": 0,
"units_completed": 0,
"avg_completion_rate": 0.0,
"avg_learning_gain": None,
"total_minutes": 0,
}
@router.get("/analytics/unit/{unit_id}")
async def get_unit_analytics(
unit_id: str,
user: Optional[Dict[str, Any]] = Depends(get_optional_current_user)
) -> Dict[str, Any]:
"""
Get analytics for a specific unit.
Shows aggregate performance across all students.
"""
db = await get_unit_database()
if db:
try:
analytics = await db.get_unit_performance(unit_id)
return analytics
except Exception as e:
logger.error(f"Failed to get unit analytics: {e}")
return {
"unit_id": unit_id,
"total_sessions": 0,
"completed_sessions": 0,
"completion_percent": 0.0,
"avg_duration_minutes": 0,
"avg_learning_gain": None,
}
# ==============================================
# Content Generation Endpoints
# ==============================================
@router.get("/content/{unit_id}/h5p")
async def generate_h5p_content(
unit_id: str,
locale: str = Query("de-DE", description="Target locale"),
user: Optional[Dict[str, Any]] = Depends(get_optional_current_user)
) -> Dict[str, Any]:
"""
Generate H5P content items for a unit.
Returns H5P-compatible content structures for:
- Drag and Drop (vocabulary matching)
- Fill in the Blanks (concept texts)
- Multiple Choice (misconception targeting)
"""
from content_generators import generate_h5p_for_unit, H5PGenerator, generate_h5p_manifest
# Get unit definition
db = await get_unit_database()
unit_def = None
if db:
try:
unit = await db.get_unit_definition(unit_id)
if unit:
unit_def = unit.get("definition", {})
except Exception as e:
logger.error(f"Failed to get unit for H5P generation: {e}")
if not unit_def:
raise HTTPException(status_code=404, detail=f"Unit not found: {unit_id}")
try:
generator = H5PGenerator(locale=locale)
contents = generator.generate_from_unit(unit_def)
manifest = generate_h5p_manifest(contents, unit_id)
return {
"unit_id": unit_id,
"locale": locale,
"generated_count": len(contents),
"manifest": manifest,
"contents": [c.to_h5p_structure() for c in contents]
}
except Exception as e:
logger.error(f"H5P generation failed: {e}")
raise HTTPException(status_code=500, detail=f"H5P generation failed: {str(e)}")
@router.get("/content/{unit_id}/worksheet")
async def generate_worksheet_html(
unit_id: str,
locale: str = Query("de-DE", description="Target locale"),
user: Optional[Dict[str, Any]] = Depends(get_optional_current_user)
) -> Dict[str, Any]:
"""
Generate worksheet HTML for a unit.
Returns HTML that can be:
- Displayed in browser
- Converted to PDF using weasyprint
- Printed directly
"""
from content_generators import PDFGenerator
# Get unit definition
db = await get_unit_database()
unit_def = None
if db:
try:
unit = await db.get_unit_definition(unit_id)
if unit:
unit_def = unit.get("definition", {})
except Exception as e:
logger.error(f"Failed to get unit for worksheet generation: {e}")
if not unit_def:
raise HTTPException(status_code=404, detail=f"Unit not found: {unit_id}")
try:
generator = PDFGenerator(locale=locale)
worksheet = generator.generate_from_unit(unit_def)
return {
"unit_id": unit_id,
"locale": locale,
"title": worksheet.title,
"sections": len(worksheet.sections),
"html": worksheet.to_html()
}
except Exception as e:
logger.error(f"Worksheet generation failed: {e}")
raise HTTPException(status_code=500, detail=f"Worksheet generation failed: {str(e)}")
@router.get("/content/{unit_id}/worksheet.pdf")
async def download_worksheet_pdf(
unit_id: str,
locale: str = Query("de-DE", description="Target locale"),
user: Optional[Dict[str, Any]] = Depends(get_optional_current_user)
):
"""
Generate and download worksheet as PDF.
Requires weasyprint to be installed on the server.
"""
from fastapi.responses import Response
# Get unit definition
db = await get_unit_database()
unit_def = None
if db:
try:
unit = await db.get_unit_definition(unit_id)
if unit:
unit_def = unit.get("definition", {})
except Exception as e:
logger.error(f"Failed to get unit for PDF generation: {e}")
if not unit_def:
raise HTTPException(status_code=404, detail=f"Unit not found: {unit_id}")
try:
from content_generators import generate_worksheet_pdf
pdf_bytes = generate_worksheet_pdf(unit_def, locale)
return Response(
content=pdf_bytes,
media_type="application/pdf",
headers={
"Content-Disposition": f'attachment; filename="{unit_id}_worksheet.pdf"'
}
)
except ImportError:
raise HTTPException(
status_code=501,
detail="PDF generation not available. Install weasyprint: pip install weasyprint"
)
except Exception as e:
logger.error(f"PDF generation failed: {e}")
raise HTTPException(status_code=500, detail=f"PDF generation failed: {str(e)}")
@router.get("/health")
async def health_check() -> Dict[str, Any]:
"""Health check for unit API."""
db = await get_unit_database()
db_status = "connected" if db else "disconnected"
return {
"status": "healthy",
"service": "breakpilot-units",
"database": db_status,
"auth_required": REQUIRE_AUTH,
}