# ============================================== # Breakpilot Drive - Unit API # ============================================== # API-Endpunkte fuer kontextuelle Lerneinheiten: # - Unit-Sessions erstellen und verwalten # - Telemetrie-Events empfangen # - Unit-Definitionen abrufen # - Pre/Post-Check verarbeiten # # Mit PostgreSQL-Integration fuer persistente Speicherung. # Auth: Optional via GAME_REQUIRE_AUTH=true from fastapi import APIRouter, HTTPException, Query, Depends, Request from pydantic import BaseModel, Field from typing import List, Optional, Dict, Any from datetime import datetime, timedelta import uuid import os import logging import jwt logger = logging.getLogger(__name__) # Feature flags USE_DATABASE = os.getenv("GAME_USE_DATABASE", "true").lower() == "true" REQUIRE_AUTH = os.getenv("GAME_REQUIRE_AUTH", "false").lower() == "true" SECRET_KEY = os.getenv("JWT_SECRET_KEY", "dev-secret-key-change-in-production") router = APIRouter(prefix="/api/units", tags=["Breakpilot Units"]) # ============================================== # Auth Dependency (reuse from game_api) # ============================================== async def get_optional_current_user(request: Request) -> Optional[Dict[str, Any]]: """Optional auth dependency for Unit API.""" if not REQUIRE_AUTH: return None try: from auth import get_current_user return await get_current_user(request) except ImportError: logger.warning("Auth module not available") return None except HTTPException: raise except Exception as e: logger.error(f"Auth error: {e}") raise HTTPException(status_code=401, detail="Authentication failed") # ============================================== # Pydantic Models # ============================================== class UnitDefinitionResponse(BaseModel): """Unit definition response""" unit_id: str template: str version: str locale: List[str] grade_band: List[str] duration_minutes: int difficulty: str definition: Dict[str, Any] class CreateSessionRequest(BaseModel): """Request to create a unit session""" unit_id: str student_id: str locale: str = "de-DE" difficulty: str = "base" class SessionResponse(BaseModel): """Response after creating a session""" session_id: str unit_definition_url: str session_token: str telemetry_endpoint: str expires_at: datetime class TelemetryEvent(BaseModel): """Single telemetry event""" ts: Optional[str] = None type: str = Field(..., alias="type") stop_id: Optional[str] = None metrics: Optional[Dict[str, Any]] = None class Config: populate_by_name = True class TelemetryPayload(BaseModel): """Batch telemetry payload""" session_id: str events: List[TelemetryEvent] class TelemetryResponse(BaseModel): """Response after receiving telemetry""" accepted: int class PostcheckAnswer(BaseModel): """Single postcheck answer""" question_id: str answer: str class CompleteSessionRequest(BaseModel): """Request to complete a session""" postcheck_answers: Optional[List[PostcheckAnswer]] = None class SessionSummaryResponse(BaseModel): """Response with session summary""" summary: Dict[str, Any] next_recommendations: Dict[str, Any] class UnitListItem(BaseModel): """Unit list item""" unit_id: str template: str difficulty: str duration_minutes: int locale: List[str] grade_band: List[str] class RecommendedUnit(BaseModel): """Recommended unit with reason""" unit_id: str template: str difficulty: str reason: str class CreateUnitRequest(BaseModel): """Request to create a new unit definition""" unit_id: str = Field(..., description="Unique unit identifier") template: str = Field(..., description="Template type: flight_path or station_loop") version: str = Field(default="1.0.0", description="Version string") locale: List[str] = Field(default=["de-DE"], description="Supported locales") grade_band: List[str] = Field(default=["5", "6", "7"], description="Target grade levels") duration_minutes: int = Field(default=8, ge=3, le=20, description="Expected duration") difficulty: str = Field(default="base", description="Difficulty level: base or advanced") subject: Optional[str] = Field(default=None, description="Subject area") topic: Optional[str] = Field(default=None, description="Topic within subject") learning_objectives: List[str] = Field(default=[], description="Learning objectives") stops: List[Dict[str, Any]] = Field(default=[], description="Unit stops/stations") precheck: Optional[Dict[str, Any]] = Field(default=None, description="Pre-check configuration") postcheck: Optional[Dict[str, Any]] = Field(default=None, description="Post-check configuration") teacher_controls: Optional[Dict[str, Any]] = Field(default=None, description="Teacher control settings") assets: Optional[Dict[str, Any]] = Field(default=None, description="Asset configuration") metadata: Optional[Dict[str, Any]] = Field(default=None, description="Additional metadata") status: str = Field(default="draft", description="Publication status: draft or published") class UpdateUnitRequest(BaseModel): """Request to update an existing unit definition""" version: Optional[str] = None locale: Optional[List[str]] = None grade_band: Optional[List[str]] = None duration_minutes: Optional[int] = Field(default=None, ge=3, le=20) difficulty: Optional[str] = None subject: Optional[str] = None topic: Optional[str] = None learning_objectives: Optional[List[str]] = None stops: Optional[List[Dict[str, Any]]] = None precheck: Optional[Dict[str, Any]] = None postcheck: Optional[Dict[str, Any]] = None teacher_controls: Optional[Dict[str, Any]] = None assets: Optional[Dict[str, Any]] = None metadata: Optional[Dict[str, Any]] = None status: Optional[str] = None class ValidationError(BaseModel): """Single validation error""" field: str message: str severity: str = "error" # error or warning class ValidationResult(BaseModel): """Result of unit validation""" valid: bool errors: List[ValidationError] = [] warnings: List[ValidationError] = [] # ============================================== # Database Integration # ============================================== _unit_db = None async def get_unit_database(): """Get unit database instance with lazy initialization.""" global _unit_db if not USE_DATABASE: return None if _unit_db is None: try: from unit.database import get_unit_db _unit_db = await get_unit_db() logger.info("Unit database initialized") except ImportError: logger.warning("Unit database module not available") except Exception as e: logger.warning(f"Unit database not available: {e}") return _unit_db # ============================================== # Helper Functions # ============================================== def create_session_token(session_id: str, student_id: str, expires_hours: int = 4) -> str: """Create a JWT session token for telemetry authentication.""" payload = { "session_id": session_id, "student_id": student_id, "exp": datetime.utcnow() + timedelta(hours=expires_hours), "iat": datetime.utcnow(), } return jwt.encode(payload, SECRET_KEY, algorithm="HS256") def verify_session_token(token: str) -> Optional[Dict[str, Any]]: """Verify a session token and return payload.""" try: return jwt.decode(token, SECRET_KEY, algorithms=["HS256"]) except jwt.ExpiredSignatureError: return None except jwt.InvalidTokenError: return None async def get_session_from_token(request: Request) -> Optional[Dict[str, Any]]: """Extract and verify session from Authorization header.""" auth_header = request.headers.get("Authorization", "") if not auth_header.startswith("Bearer "): return None token = auth_header[7:] return verify_session_token(token) def validate_unit_definition(unit_data: Dict[str, Any]) -> ValidationResult: """ Validate a unit definition structure. Returns validation result with errors and warnings. """ errors = [] warnings = [] # Required fields if not unit_data.get("unit_id"): errors.append(ValidationError(field="unit_id", message="unit_id ist erforderlich")) if not unit_data.get("template"): errors.append(ValidationError(field="template", message="template ist erforderlich")) elif unit_data["template"] not in ["flight_path", "station_loop"]: errors.append(ValidationError( field="template", message="template muss 'flight_path' oder 'station_loop' sein" )) # Validate stops stops = unit_data.get("stops", []) if not stops: errors.append(ValidationError(field="stops", message="Mindestens 1 Stop erforderlich")) else: # Check minimum stops for flight_path if unit_data.get("template") == "flight_path" and len(stops) < 3: warnings.append(ValidationError( field="stops", message="FlightPath sollte mindestens 3 Stops haben", severity="warning" )) # Validate each stop stop_ids = set() for i, stop in enumerate(stops): if not stop.get("stop_id"): errors.append(ValidationError( field=f"stops[{i}].stop_id", message=f"Stop {i}: stop_id fehlt" )) else: if stop["stop_id"] in stop_ids: errors.append(ValidationError( field=f"stops[{i}].stop_id", message=f"Stop {i}: Doppelte stop_id '{stop['stop_id']}'" )) stop_ids.add(stop["stop_id"]) # Check interaction type interaction = stop.get("interaction", {}) if not interaction.get("type"): errors.append(ValidationError( field=f"stops[{i}].interaction.type", message=f"Stop {stop.get('stop_id', i)}: Interaktionstyp fehlt" )) elif interaction["type"] not in [ "aim_and_pass", "slider_adjust", "slider_equivalence", "sequence_arrange", "toggle_switch", "drag_match", "error_find", "transfer_apply" ]: warnings.append(ValidationError( field=f"stops[{i}].interaction.type", message=f"Stop {stop.get('stop_id', i)}: Unbekannter Interaktionstyp '{interaction['type']}'", severity="warning" )) # Check for label if not stop.get("label"): warnings.append(ValidationError( field=f"stops[{i}].label", message=f"Stop {stop.get('stop_id', i)}: Label fehlt", severity="warning" )) # Validate duration duration = unit_data.get("duration_minutes", 0) if duration < 3 or duration > 20: warnings.append(ValidationError( field="duration_minutes", message="Dauer sollte zwischen 3 und 20 Minuten liegen", severity="warning" )) # Validate difficulty if unit_data.get("difficulty") and unit_data["difficulty"] not in ["base", "advanced"]: warnings.append(ValidationError( field="difficulty", message="difficulty sollte 'base' oder 'advanced' sein", severity="warning" )) return ValidationResult( valid=len(errors) == 0, errors=errors, warnings=warnings ) # ============================================== # API Endpoints # ============================================== @router.get("/definitions", response_model=List[UnitListItem]) async def list_unit_definitions( template: Optional[str] = Query(None, description="Filter by template: flight_path, station_loop"), grade: Optional[str] = Query(None, description="Filter by grade level"), locale: str = Query("de-DE", description="Filter by locale"), user: Optional[Dict[str, Any]] = Depends(get_optional_current_user) ) -> List[UnitListItem]: """ List available unit definitions. Returns published units matching the filter criteria. """ db = await get_unit_database() if db: try: units = await db.list_units( template=template, grade=grade, locale=locale, published_only=True ) return [ UnitListItem( unit_id=u["unit_id"], template=u["template"], difficulty=u["difficulty"], duration_minutes=u["duration_minutes"], locale=u["locale"], grade_band=u["grade_band"], ) for u in units ] except Exception as e: logger.error(f"Failed to list units: {e}") # Fallback: return demo unit return [ UnitListItem( unit_id="demo_unit_v1", template="flight_path", difficulty="base", duration_minutes=5, locale=["de-DE"], grade_band=["5", "6", "7"], ) ] @router.get("/definitions/{unit_id}", response_model=UnitDefinitionResponse) async def get_unit_definition( unit_id: str, user: Optional[Dict[str, Any]] = Depends(get_optional_current_user) ) -> UnitDefinitionResponse: """ Get a specific unit definition. Returns the full unit configuration including stops, interactions, etc. """ db = await get_unit_database() if db: try: unit = await db.get_unit_definition(unit_id) if unit: return UnitDefinitionResponse( unit_id=unit["unit_id"], template=unit["template"], version=unit["version"], locale=unit["locale"], grade_band=unit["grade_band"], duration_minutes=unit["duration_minutes"], difficulty=unit["difficulty"], definition=unit["definition"], ) except Exception as e: logger.error(f"Failed to get unit definition: {e}") # Demo unit fallback if unit_id == "demo_unit_v1": return UnitDefinitionResponse( unit_id="demo_unit_v1", template="flight_path", version="1.0.0", locale=["de-DE"], grade_band=["5", "6", "7"], duration_minutes=5, difficulty="base", definition={ "unit_id": "demo_unit_v1", "template": "flight_path", "version": "1.0.0", "learning_objectives": ["Demo: Grundfunktion testen"], "stops": [ {"stop_id": "stop_1", "label": {"de-DE": "Start"}, "interaction": {"type": "aim_and_pass"}}, {"stop_id": "stop_2", "label": {"de-DE": "Mitte"}, "interaction": {"type": "aim_and_pass"}}, {"stop_id": "stop_3", "label": {"de-DE": "Ende"}, "interaction": {"type": "aim_and_pass"}}, ], "teacher_controls": {"allow_skip": True, "allow_replay": True}, }, ) raise HTTPException(status_code=404, detail=f"Unit not found: {unit_id}") @router.post("/definitions", response_model=UnitDefinitionResponse) async def create_unit_definition( request_data: CreateUnitRequest, user: Optional[Dict[str, Any]] = Depends(get_optional_current_user) ) -> UnitDefinitionResponse: """ Create a new unit definition. - Validates unit structure - Saves to database or JSON file - Returns created unit """ import json from pathlib import Path # Build full definition definition = { "unit_id": request_data.unit_id, "template": request_data.template, "version": request_data.version, "locale": request_data.locale, "grade_band": request_data.grade_band, "duration_minutes": request_data.duration_minutes, "difficulty": request_data.difficulty, "subject": request_data.subject, "topic": request_data.topic, "learning_objectives": request_data.learning_objectives, "stops": request_data.stops, "precheck": request_data.precheck or { "question_set_id": f"{request_data.unit_id}_precheck", "required": True, "time_limit_seconds": 120 }, "postcheck": request_data.postcheck or { "question_set_id": f"{request_data.unit_id}_postcheck", "required": True, "time_limit_seconds": 180 }, "teacher_controls": request_data.teacher_controls or { "allow_skip": True, "allow_replay": True, "max_time_per_stop_sec": 90, "show_hints": True, "require_precheck": True, "require_postcheck": True }, "assets": request_data.assets or {}, "metadata": request_data.metadata or { "author": user.get("email", "Unknown") if user else "Unknown", "created": datetime.utcnow().isoformat(), "curriculum_reference": "" } } # Validate validation = validate_unit_definition(definition) if not validation.valid: error_msgs = [f"{e.field}: {e.message}" for e in validation.errors] raise HTTPException(status_code=400, detail=f"Validierung fehlgeschlagen: {'; '.join(error_msgs)}") # Check if unit_id already exists db = await get_unit_database() if db: try: existing = await db.get_unit_definition(request_data.unit_id) if existing: raise HTTPException(status_code=409, detail=f"Unit existiert bereits: {request_data.unit_id}") # Save to database await db.create_unit_definition( unit_id=request_data.unit_id, template=request_data.template, version=request_data.version, locale=request_data.locale, grade_band=request_data.grade_band, duration_minutes=request_data.duration_minutes, difficulty=request_data.difficulty, definition=definition, status=request_data.status ) logger.info(f"Unit created in database: {request_data.unit_id}") except HTTPException: raise except Exception as e: logger.warning(f"Database save failed, using JSON fallback: {e}") # Fallback to JSON units_dir = Path(__file__).parent / "data" / "units" units_dir.mkdir(parents=True, exist_ok=True) json_path = units_dir / f"{request_data.unit_id}.json" if json_path.exists(): raise HTTPException(status_code=409, detail=f"Unit existiert bereits: {request_data.unit_id}") with open(json_path, "w", encoding="utf-8") as f: json.dump(definition, f, ensure_ascii=False, indent=2) logger.info(f"Unit created as JSON: {json_path}") else: # JSON only mode units_dir = Path(__file__).parent / "data" / "units" units_dir.mkdir(parents=True, exist_ok=True) json_path = units_dir / f"{request_data.unit_id}.json" if json_path.exists(): raise HTTPException(status_code=409, detail=f"Unit existiert bereits: {request_data.unit_id}") with open(json_path, "w", encoding="utf-8") as f: json.dump(definition, f, ensure_ascii=False, indent=2) logger.info(f"Unit created as JSON: {json_path}") return UnitDefinitionResponse( unit_id=request_data.unit_id, template=request_data.template, version=request_data.version, locale=request_data.locale, grade_band=request_data.grade_band, duration_minutes=request_data.duration_minutes, difficulty=request_data.difficulty, definition=definition ) @router.put("/definitions/{unit_id}", response_model=UnitDefinitionResponse) async def update_unit_definition( unit_id: str, request_data: UpdateUnitRequest, user: Optional[Dict[str, Any]] = Depends(get_optional_current_user) ) -> UnitDefinitionResponse: """ Update an existing unit definition. - Merges updates with existing definition - Re-validates - Saves updated version """ import json from pathlib import Path # Get existing unit db = await get_unit_database() existing = None if db: try: existing = await db.get_unit_definition(unit_id) except Exception as e: logger.warning(f"Database read failed: {e}") if not existing: # Try JSON file json_path = Path(__file__).parent / "data" / "units" / f"{unit_id}.json" if json_path.exists(): with open(json_path, "r", encoding="utf-8") as f: file_data = json.load(f) existing = { "unit_id": file_data.get("unit_id"), "template": file_data.get("template"), "version": file_data.get("version", "1.0.0"), "locale": file_data.get("locale", ["de-DE"]), "grade_band": file_data.get("grade_band", []), "duration_minutes": file_data.get("duration_minutes", 8), "difficulty": file_data.get("difficulty", "base"), "definition": file_data } if not existing: raise HTTPException(status_code=404, detail=f"Unit nicht gefunden: {unit_id}") # Merge updates into existing definition definition = existing.get("definition", {}) update_dict = request_data.model_dump(exclude_unset=True) for key, value in update_dict.items(): if value is not None: definition[key] = value # Validate updated definition validation = validate_unit_definition(definition) if not validation.valid: error_msgs = [f"{e.field}: {e.message}" for e in validation.errors] raise HTTPException(status_code=400, detail=f"Validierung fehlgeschlagen: {'; '.join(error_msgs)}") # Save if db: try: await db.update_unit_definition( unit_id=unit_id, version=definition.get("version"), locale=definition.get("locale"), grade_band=definition.get("grade_band"), duration_minutes=definition.get("duration_minutes"), difficulty=definition.get("difficulty"), definition=definition, status=update_dict.get("status") ) logger.info(f"Unit updated in database: {unit_id}") except Exception as e: logger.warning(f"Database update failed, using JSON: {e}") json_path = Path(__file__).parent / "data" / "units" / f"{unit_id}.json" with open(json_path, "w", encoding="utf-8") as f: json.dump(definition, f, ensure_ascii=False, indent=2) else: json_path = Path(__file__).parent / "data" / "units" / f"{unit_id}.json" with open(json_path, "w", encoding="utf-8") as f: json.dump(definition, f, ensure_ascii=False, indent=2) logger.info(f"Unit updated as JSON: {json_path}") return UnitDefinitionResponse( unit_id=unit_id, template=definition.get("template", existing.get("template")), version=definition.get("version", existing.get("version", "1.0.0")), locale=definition.get("locale", existing.get("locale", ["de-DE"])), grade_band=definition.get("grade_band", existing.get("grade_band", [])), duration_minutes=definition.get("duration_minutes", existing.get("duration_minutes", 8)), difficulty=definition.get("difficulty", existing.get("difficulty", "base")), definition=definition ) @router.delete("/definitions/{unit_id}") async def delete_unit_definition( unit_id: str, force: bool = Query(False, description="Force delete even if published"), user: Optional[Dict[str, Any]] = Depends(get_optional_current_user) ) -> Dict[str, Any]: """ Delete a unit definition. - By default, only drafts can be deleted - Use force=true to delete published units """ import json from pathlib import Path db = await get_unit_database() deleted = False if db: try: existing = await db.get_unit_definition(unit_id) if existing: status = existing.get("status", "draft") if status == "published" and not force: raise HTTPException( status_code=400, detail="Veroeffentlichte Units koennen nicht geloescht werden. Verwende force=true." ) await db.delete_unit_definition(unit_id) deleted = True logger.info(f"Unit deleted from database: {unit_id}") except HTTPException: raise except Exception as e: logger.warning(f"Database delete failed: {e}") # Also check JSON file json_path = Path(__file__).parent / "data" / "units" / f"{unit_id}.json" if json_path.exists(): json_path.unlink() deleted = True logger.info(f"Unit JSON deleted: {json_path}") if not deleted: raise HTTPException(status_code=404, detail=f"Unit nicht gefunden: {unit_id}") return {"success": True, "unit_id": unit_id, "message": "Unit geloescht"} @router.post("/definitions/validate", response_model=ValidationResult) async def validate_unit( unit_data: Dict[str, Any], user: Optional[Dict[str, Any]] = Depends(get_optional_current_user) ) -> ValidationResult: """ Validate a unit definition without saving. Returns validation result with errors and warnings. """ return validate_unit_definition(unit_data) @router.post("/sessions", response_model=SessionResponse) async def create_unit_session( request_data: CreateSessionRequest, request: Request, user: Optional[Dict[str, Any]] = Depends(get_optional_current_user) ) -> SessionResponse: """ Create a new unit session. - Validates unit exists - Creates session record - Returns session token for telemetry """ session_id = str(uuid.uuid4()) expires_at = datetime.utcnow() + timedelta(hours=4) # Validate unit exists db = await get_unit_database() if db: try: unit = await db.get_unit_definition(request_data.unit_id) if not unit: raise HTTPException(status_code=404, detail=f"Unit not found: {request_data.unit_id}") # Create session in database total_stops = len(unit.get("definition", {}).get("stops", [])) await db.create_session( session_id=session_id, unit_id=request_data.unit_id, student_id=request_data.student_id, locale=request_data.locale, difficulty=request_data.difficulty, total_stops=total_stops, ) except HTTPException: raise except Exception as e: logger.error(f"Failed to create session: {e}") # Continue with in-memory fallback # Create session token session_token = create_session_token(session_id, request_data.student_id) # Build definition URL base_url = str(request.base_url).rstrip("/") definition_url = f"{base_url}/api/units/definitions/{request_data.unit_id}" return SessionResponse( session_id=session_id, unit_definition_url=definition_url, session_token=session_token, telemetry_endpoint="/api/units/telemetry", expires_at=expires_at, ) @router.post("/telemetry", response_model=TelemetryResponse) async def receive_telemetry( payload: TelemetryPayload, request: Request, ) -> TelemetryResponse: """ Receive batched telemetry events from Unity client. - Validates session token - Stores events in database - Returns count of accepted events """ # Verify session token session_data = await get_session_from_token(request) if session_data is None: # Allow without auth in dev mode if REQUIRE_AUTH: raise HTTPException(status_code=401, detail="Invalid or expired session token") logger.warning("Telemetry received without valid token (dev mode)") # Verify session_id matches if session_data and session_data.get("session_id") != payload.session_id: raise HTTPException(status_code=403, detail="Session ID mismatch") accepted = 0 db = await get_unit_database() for event in payload.events: try: # Set timestamp if not provided timestamp = event.ts or datetime.utcnow().isoformat() if db: await db.store_telemetry_event( session_id=payload.session_id, event_type=event.type, stop_id=event.stop_id, timestamp=timestamp, metrics=event.metrics, ) accepted += 1 logger.debug(f"Telemetry: {event.type} for session {payload.session_id}") except Exception as e: logger.error(f"Failed to store telemetry event: {e}") return TelemetryResponse(accepted=accepted) @router.post("/sessions/{session_id}/complete", response_model=SessionSummaryResponse) async def complete_session( session_id: str, request_data: CompleteSessionRequest, request: Request, ) -> SessionSummaryResponse: """ Complete a unit session. - Processes postcheck answers if provided - Calculates learning gain - Returns summary and recommendations """ # Verify session token session_data = await get_session_from_token(request) if REQUIRE_AUTH and session_data is None: raise HTTPException(status_code=401, detail="Invalid or expired session token") db = await get_unit_database() summary = {} recommendations = {} if db: try: # Get session data session = await db.get_session(session_id) if not session: raise HTTPException(status_code=404, detail="Session not found") # Calculate postcheck score if answers provided postcheck_score = None if request_data.postcheck_answers: # Simple scoring: count correct answers # In production, would validate against question bank postcheck_score = len(request_data.postcheck_answers) * 0.2 # Placeholder postcheck_score = min(postcheck_score, 1.0) # Complete session in database await db.complete_session( session_id=session_id, postcheck_score=postcheck_score, ) # Get updated session summary session = await db.get_session(session_id) # Calculate learning gain pre_score = session.get("precheck_score") post_score = session.get("postcheck_score") learning_gain = None if pre_score is not None and post_score is not None: learning_gain = post_score - pre_score summary = { "session_id": session_id, "unit_id": session.get("unit_id"), "duration_seconds": session.get("duration_seconds"), "completion_rate": session.get("completion_rate"), "precheck_score": pre_score, "postcheck_score": post_score, "pre_to_post_gain": learning_gain, "stops_completed": session.get("stops_completed"), "total_stops": session.get("total_stops"), } # Get recommendations recommendations = await db.get_recommendations( student_id=session.get("student_id"), completed_unit_id=session.get("unit_id"), ) except HTTPException: raise except Exception as e: logger.error(f"Failed to complete session: {e}") summary = {"session_id": session_id, "error": str(e)} else: # Fallback summary summary = { "session_id": session_id, "duration_seconds": 0, "completion_rate": 1.0, "message": "Database not available", } return SessionSummaryResponse( summary=summary, next_recommendations=recommendations or { "h5p_activity_ids": [], "worksheet_pdf_url": None, }, ) @router.get("/sessions/{session_id}") async def get_session( session_id: str, request: Request, ) -> Dict[str, Any]: """ Get session details. Returns current state of a session including progress. """ # Verify session token session_data = await get_session_from_token(request) if REQUIRE_AUTH and session_data is None: raise HTTPException(status_code=401, detail="Invalid or expired session token") db = await get_unit_database() if db: try: session = await db.get_session(session_id) if session: return session except Exception as e: logger.error(f"Failed to get session: {e}") raise HTTPException(status_code=404, detail="Session not found") @router.get("/recommendations/{student_id}", response_model=List[RecommendedUnit]) async def get_recommendations( student_id: str, grade: Optional[str] = Query(None, description="Grade level filter"), locale: str = Query("de-DE", description="Locale filter"), limit: int = Query(5, ge=1, le=20), user: Optional[Dict[str, Any]] = Depends(get_optional_current_user) ) -> List[RecommendedUnit]: """ Get recommended units for a student. Based on completion status and performance. """ db = await get_unit_database() if db: try: recommendations = await db.get_student_recommendations( student_id=student_id, grade=grade, locale=locale, limit=limit, ) return [ RecommendedUnit( unit_id=r["unit_id"], template=r["template"], difficulty=r["difficulty"], reason=r["reason"], ) for r in recommendations ] except Exception as e: logger.error(f"Failed to get recommendations: {e}") # Fallback: recommend demo unit return [ RecommendedUnit( unit_id="demo_unit_v1", template="flight_path", difficulty="base", reason="Neu: Noch nicht gespielt", ) ] @router.get("/analytics/student/{student_id}") async def get_student_analytics( student_id: str, user: Optional[Dict[str, Any]] = Depends(get_optional_current_user) ) -> Dict[str, Any]: """ Get unit analytics for a student. Includes completion rates, learning gains, time spent. """ db = await get_unit_database() if db: try: analytics = await db.get_student_unit_analytics(student_id) return analytics except Exception as e: logger.error(f"Failed to get analytics: {e}") return { "student_id": student_id, "units_attempted": 0, "units_completed": 0, "avg_completion_rate": 0.0, "avg_learning_gain": None, "total_minutes": 0, } @router.get("/analytics/unit/{unit_id}") async def get_unit_analytics( unit_id: str, user: Optional[Dict[str, Any]] = Depends(get_optional_current_user) ) -> Dict[str, Any]: """ Get analytics for a specific unit. Shows aggregate performance across all students. """ db = await get_unit_database() if db: try: analytics = await db.get_unit_performance(unit_id) return analytics except Exception as e: logger.error(f"Failed to get unit analytics: {e}") return { "unit_id": unit_id, "total_sessions": 0, "completed_sessions": 0, "completion_percent": 0.0, "avg_duration_minutes": 0, "avg_learning_gain": None, } # ============================================== # Content Generation Endpoints # ============================================== @router.get("/content/{unit_id}/h5p") async def generate_h5p_content( unit_id: str, locale: str = Query("de-DE", description="Target locale"), user: Optional[Dict[str, Any]] = Depends(get_optional_current_user) ) -> Dict[str, Any]: """ Generate H5P content items for a unit. Returns H5P-compatible content structures for: - Drag and Drop (vocabulary matching) - Fill in the Blanks (concept texts) - Multiple Choice (misconception targeting) """ from content_generators import generate_h5p_for_unit, H5PGenerator, generate_h5p_manifest # Get unit definition db = await get_unit_database() unit_def = None if db: try: unit = await db.get_unit_definition(unit_id) if unit: unit_def = unit.get("definition", {}) except Exception as e: logger.error(f"Failed to get unit for H5P generation: {e}") if not unit_def: raise HTTPException(status_code=404, detail=f"Unit not found: {unit_id}") try: generator = H5PGenerator(locale=locale) contents = generator.generate_from_unit(unit_def) manifest = generate_h5p_manifest(contents, unit_id) return { "unit_id": unit_id, "locale": locale, "generated_count": len(contents), "manifest": manifest, "contents": [c.to_h5p_structure() for c in contents] } except Exception as e: logger.error(f"H5P generation failed: {e}") raise HTTPException(status_code=500, detail=f"H5P generation failed: {str(e)}") @router.get("/content/{unit_id}/worksheet") async def generate_worksheet_html( unit_id: str, locale: str = Query("de-DE", description="Target locale"), user: Optional[Dict[str, Any]] = Depends(get_optional_current_user) ) -> Dict[str, Any]: """ Generate worksheet HTML for a unit. Returns HTML that can be: - Displayed in browser - Converted to PDF using weasyprint - Printed directly """ from content_generators import PDFGenerator # Get unit definition db = await get_unit_database() unit_def = None if db: try: unit = await db.get_unit_definition(unit_id) if unit: unit_def = unit.get("definition", {}) except Exception as e: logger.error(f"Failed to get unit for worksheet generation: {e}") if not unit_def: raise HTTPException(status_code=404, detail=f"Unit not found: {unit_id}") try: generator = PDFGenerator(locale=locale) worksheet = generator.generate_from_unit(unit_def) return { "unit_id": unit_id, "locale": locale, "title": worksheet.title, "sections": len(worksheet.sections), "html": worksheet.to_html() } except Exception as e: logger.error(f"Worksheet generation failed: {e}") raise HTTPException(status_code=500, detail=f"Worksheet generation failed: {str(e)}") @router.get("/content/{unit_id}/worksheet.pdf") async def download_worksheet_pdf( unit_id: str, locale: str = Query("de-DE", description="Target locale"), user: Optional[Dict[str, Any]] = Depends(get_optional_current_user) ): """ Generate and download worksheet as PDF. Requires weasyprint to be installed on the server. """ from fastapi.responses import Response # Get unit definition db = await get_unit_database() unit_def = None if db: try: unit = await db.get_unit_definition(unit_id) if unit: unit_def = unit.get("definition", {}) except Exception as e: logger.error(f"Failed to get unit for PDF generation: {e}") if not unit_def: raise HTTPException(status_code=404, detail=f"Unit not found: {unit_id}") try: from content_generators import generate_worksheet_pdf pdf_bytes = generate_worksheet_pdf(unit_def, locale) return Response( content=pdf_bytes, media_type="application/pdf", headers={ "Content-Disposition": f'attachment; filename="{unit_id}_worksheet.pdf"' } ) except ImportError: raise HTTPException( status_code=501, detail="PDF generation not available. Install weasyprint: pip install weasyprint" ) except Exception as e: logger.error(f"PDF generation failed: {e}") raise HTTPException(status_code=500, detail=f"PDF generation failed: {str(e)}") @router.get("/health") async def health_check() -> Dict[str, Any]: """Health check for unit API.""" db = await get_unit_database() db_status = "connected" if db else "disconnected" return { "status": "healthy", "service": "breakpilot-units", "database": db_status, "auth_required": REQUIRE_AUTH, }