# ==============================================
# Breakpilot Drive - Unit Analytics API
# ==============================================
# Extended analytics for learning progress:
# - Pre/post gain visualisation
# - Misconception tracking
# - Stop-level analytics
# - Aggregated class statistics
# - Export functions

from fastapi import APIRouter, HTTPException, Query, Depends, Request
from pydantic import BaseModel, Field
from typing import List, Optional, Dict, Any
from datetime import datetime, timedelta
from enum import Enum
import os
import logging
import statistics

logger = logging.getLogger(__name__)

# Feature flags
USE_DATABASE = os.getenv("GAME_USE_DATABASE", "true").lower() == "true"

router = APIRouter(prefix="/api/analytics", tags=["Unit Analytics"])


# ==============================================
# Pydantic Models
# ==============================================

class TimeRange(str, Enum):
    """Time range for analytics queries"""
    WEEK = "week"
    MONTH = "month"
    QUARTER = "quarter"
    ALL = "all"


class LearningGainData(BaseModel):
    """Pre/Post learning gain data point"""
    student_id: str
    student_name: str
    unit_id: str
    precheck_score: float
    postcheck_score: float
    learning_gain: float
    percentile: Optional[float] = None


class LearningGainSummary(BaseModel):
    """Aggregated learning gain statistics"""
    unit_id: str
    unit_title: str
    total_students: int
    avg_precheck: float
    avg_postcheck: float
    avg_gain: float
    median_gain: float
    std_deviation: float
    positive_gain_count: int
    negative_gain_count: int
    no_change_count: int
    # Bucket label -> count, with the labels produced by
    # calculate_gain_distribution: "< -20%", "-20% to -10%", "-10% to 0%",
    # "0% to 10%", "10% to 20%", "> 20%".
    gain_distribution: Dict[str, int]
    individual_gains: List[LearningGainData]


class StopPerformance(BaseModel):
    """Performance data for a single stop"""
    stop_id: str
    stop_label: str
    attempts_total: int
    success_rate: float
    avg_time_seconds: float
    avg_attempts_before_success: float
    common_errors: List[str]
    difficulty_rating: float  # 1-5 based on performance


class UnitPerformanceDetail(BaseModel):
    """Detailed unit performance breakdown"""
    unit_id: str
    unit_title: str
    template: str
    total_sessions: int
    completed_sessions: int
    completion_rate: float
    avg_duration_minutes: float
    stops: List[StopPerformance]
    bottleneck_stops: List[str]  # Stops where students struggle most


class MisconceptionEntry(BaseModel):
    """Individual misconception tracking"""
    concept_id: str
    concept_label: str
    misconception_text: str
    frequency: int
    affected_student_ids: List[str]
    unit_id: str
    stop_id: str
    detected_via: str  # "precheck", "postcheck", "interaction"
    first_detected: datetime
    last_detected: datetime


class MisconceptionReport(BaseModel):
    """Comprehensive misconception report"""
    class_id: Optional[str]
    time_range: str
    total_misconceptions: int
    unique_concepts: int
    most_common: List[MisconceptionEntry]
    by_unit: Dict[str, List[MisconceptionEntry]]
    trending_up: List[MisconceptionEntry]  # Getting more frequent
    resolved: List[MisconceptionEntry]  # No longer appearing


class StudentProgressTimeline(BaseModel):
    """Timeline of student progress"""
    student_id: str
    student_name: str
    units_completed: int
    total_time_minutes: int
    avg_score: float
    trend: str  # "improving", "stable", "declining"
    timeline: List[Dict[str, Any]]  # List of session events


class ClassComparisonData(BaseModel):
    """Data for comparing class performance"""
    class_id: str
    class_name: str
    student_count: int
    units_assigned: int
    avg_completion_rate: float
    avg_learning_gain: float
    avg_time_per_unit: float


class ExportFormat(str, Enum):
    """Export format options"""
    JSON = "json"
    CSV = "csv"


# ==============================================
# Database Integration
# ==============================================

# Lazily initialised singleton; stays None until the first successful init.
_analytics_db = None


async def get_analytics_database():
    """Get analytics database instance.

    Returns None when GAME_USE_DATABASE is disabled or the database module
    is unavailable; every caller must handle the None case. A failed init
    is only logged, so the next call retries.
    """
    global _analytics_db
    if not USE_DATABASE:
        return None
    if _analytics_db is None:
        try:
            from unit.database import get_analytics_db
            _analytics_db = await get_analytics_db()
            logger.info("Analytics database initialized")
        except ImportError:
            logger.warning("Analytics database module not available")
        except Exception as e:
            logger.warning(f"Analytics database not available: {e}")
    return _analytics_db


# ==============================================
# Helper Functions
# ==============================================

def calculate_gain_distribution(gains: List[float]) -> Dict[str, int]:
    """Calculate distribution of learning gains into buckets.

    Gains are fractions (0.15 == 15 percentage points) and are bucketed
    on percentage boundaries; every input falls into exactly one bucket.
    """
    distribution = {
        "< -20%": 0,
        "-20% to -10%": 0,
        "-10% to 0%": 0,
        "0% to 10%": 0,
        "10% to 20%": 0,
        "> 20%": 0,
    }

    for gain in gains:
        gain_percent = gain * 100
        if gain_percent < -20:
            distribution["< -20%"] += 1
        elif gain_percent < -10:
            distribution["-20% to -10%"] += 1
        elif gain_percent < 0:
            distribution["-10% to 0%"] += 1
        elif gain_percent < 10:
            distribution["0% to 10%"] += 1
        elif gain_percent < 20:
            distribution["10% to 20%"] += 1
        else:
            distribution["> 20%"] += 1

    return distribution


def calculate_trend(scores: List[float]) -> str:
    """Calculate trend from a series of scores.

    Fits a least-squares line over (index, score) pairs and classifies
    the slope: > 0.05 "improving", < -0.05 "declining", otherwise
    "stable". Fewer than 3 scores -> "insufficient_data".
    """
    if len(scores) < 3:
        return "insufficient_data"

    # Simple linear regression with x = 0..n-1.
    n = len(scores)
    x_mean = (n - 1) / 2
    y_mean = sum(scores) / n

    numerator = sum((i - x_mean) * (scores[i] - y_mean) for i in range(n))
    denominator = sum((i - x_mean) ** 2 for i in range(n))

    if denominator == 0:
        return "stable"

    slope = numerator / denominator

    if slope > 0.05:
        return "improving"
    elif slope < -0.05:
        return "declining"
    else:
        return "stable"


def calculate_difficulty_rating(success_rate: float, avg_attempts: float) -> float:
    """Calculate difficulty rating 1-5 based on success metrics.

    Lower success rate and more attempts mean higher difficulty. The
    result is clamped to the documented [1.0, 5.0] range; previously,
    out-of-range inputs (avg_attempts < 1 or success_rate > 1) could
    push the rating below 1 because the attempt modifier had no lower
    bound. Normal inputs (success_rate in [0, 1], avg_attempts >= 1)
    are unaffected by the clamps.
    """
    base_difficulty = (1 - success_rate) * 3 + 1  # 1-4 for success_rate in [0, 1]
    attempt_modifier = max(0.0, min(avg_attempts - 1, 1))  # clamped to 0-1
    return max(1.0, min(5.0, base_difficulty + attempt_modifier))


# ==============================================
# API Endpoints - Learning Gain
# ==============================================

# NOTE: Static routes must come BEFORE dynamic routes like /{unit_id}
@router.get("/learning-gain/compare")
async def compare_learning_gains(
    unit_ids: str = Query(..., description="Comma-separated unit IDs"),
    class_id: Optional[str] = Query(None),
    time_range: TimeRange = Query(TimeRange.MONTH),
) -> Dict[str, Any]:
    """
    Compare learning gains across multiple units.

    Delegates to get_learning_gain_analysis per unit; units whose lookup
    fails are logged and skipped so one bad unit cannot fail the whole
    comparison. Results are sorted by average gain, best first.
    """
    unit_list = [u.strip() for u in unit_ids.split(",")]
    comparisons = []

    for unit_id in unit_list:
        try:
            summary = await get_learning_gain_analysis(unit_id, class_id, time_range)
            comparisons.append({
                "unit_id": unit_id,
                "avg_gain": summary.avg_gain,
                "median_gain": summary.median_gain,
                "total_students": summary.total_students,
                # max(..., 1) guards against division by zero for empty units.
                "positive_rate": summary.positive_gain_count / max(summary.total_students, 1),
            })
        except Exception as e:
            logger.error(f"Failed to get comparison for {unit_id}: {e}")

    return {
        "time_range": time_range.value,
        "class_id": class_id,
        "comparisons": sorted(comparisons, key=lambda x: x["avg_gain"], reverse=True),
    }


@router.get("/learning-gain/{unit_id}", response_model=LearningGainSummary)
async def get_learning_gain_analysis(
    unit_id: str,
    class_id: Optional[str] = Query(None, description="Filter by class"),
    time_range: TimeRange = Query(TimeRange.MONTH, description="Time range for analysis"),
) -> LearningGainSummary:
    """
    Get detailed pre/post learning gain analysis for a unit.

    Shows individual gains, aggregated statistics, and distribution.
    Sessions missing either score are skipped; when no usable sessions
    exist (or the database is unavailable) an all-zero summary with the
    same shape is returned.
    """
    db = await get_analytics_database()

    individual_gains: List[LearningGainData] = []

    if db:
        try:
            # All sessions with pre/post scores for this unit.
            sessions = await db.get_unit_sessions_with_scores(
                unit_id=unit_id, class_id=class_id, time_range=time_range.value
            )

            for session in sessions:
                pre = session.get("precheck_score")
                post = session.get("postcheck_score")
                if pre is not None and post is not None:
                    individual_gains.append(LearningGainData(
                        student_id=session["student_id"],
                        student_name=session.get("student_name", session["student_id"][:8]),
                        unit_id=unit_id,
                        precheck_score=pre,
                        postcheck_score=post,
                        learning_gain=post - pre,
                    ))
        except Exception as e:
            logger.error(f"Failed to get learning gain data: {e}")

    if not individual_gains:
        # Empty summary keeps the response shape stable for the frontend.
        return LearningGainSummary(
            unit_id=unit_id,
            unit_title=f"Unit {unit_id}",
            total_students=0,
            avg_precheck=0.0,
            avg_postcheck=0.0,
            avg_gain=0.0,
            median_gain=0.0,
            std_deviation=0.0,
            positive_gain_count=0,
            negative_gain_count=0,
            no_change_count=0,
            gain_distribution={},
            individual_gains=[],
        )

    gains = [g.learning_gain for g in individual_gains]
    prechecks = [g.precheck_score for g in individual_gains]
    postchecks = [g.postcheck_score for g in individual_gains]

    # stdev needs at least two samples.
    std_dev = statistics.stdev(gains) if len(gains) > 1 else 0.0

    # Percentiles: rank of the first occurrence of each gain value in the
    # sorted cohort. Same tie semantics as the previous list.index-in-a-loop
    # version, but O(n log n) instead of O(n^2).
    sorted_gains = sorted(gains)
    first_rank: Dict[float, int] = {}
    for idx, value in enumerate(sorted_gains):
        first_rank.setdefault(value, idx + 1)
    cohort_size = len(sorted_gains)
    for data in individual_gains:
        data.percentile = first_rank[data.learning_gain] / cohort_size * 100

    # Gains within +/-1 percentage point count as "no change".
    return LearningGainSummary(
        unit_id=unit_id,
        unit_title=f"Unit {unit_id}",
        total_students=len(individual_gains),
        avg_precheck=statistics.mean(prechecks),
        avg_postcheck=statistics.mean(postchecks),
        avg_gain=statistics.mean(gains),
        median_gain=statistics.median(gains),
        std_deviation=std_dev,
        positive_gain_count=sum(1 for g in gains if g > 0.01),
        negative_gain_count=sum(1 for g in gains if g < -0.01),
        no_change_count=sum(1 for g in gains if -0.01 <= g <= 0.01),
        gain_distribution=calculate_gain_distribution(gains),
        individual_gains=sorted(individual_gains, key=lambda x: x.learning_gain, reverse=True),
    )


# ==============================================
# API Endpoints - Stop-Level Analytics
# ==============================================

@router.get("/unit/{unit_id}/stops", response_model=UnitPerformanceDetail)
async def get_unit_stop_analytics(
    unit_id: str,
    class_id: Optional[str] = Query(None),
    time_range: TimeRange = Query(TimeRange.MONTH),
) -> UnitPerformanceDetail:
    """
    Get detailed stop-level performance analytics.

    Identifies bottleneck stops where students struggle most
    (difficulty rating > 3.5 or success rate < 60%).
    """
    db = await get_analytics_database()

    stops_data: List[StopPerformance] = []
    unit_stats: Dict[str, Any] = {}

    if db:
        try:
            # Stop-level telemetry.
            stop_stats = await db.get_stop_performance(
                unit_id=unit_id, class_id=class_id, time_range=time_range.value
            )

            for stop in stop_stats:
                difficulty = calculate_difficulty_rating(
                    stop.get("success_rate", 0.5),
                    stop.get("avg_attempts", 1.0)
                )
                stops_data.append(StopPerformance(
                    stop_id=stop["stop_id"],
                    stop_label=stop.get("stop_label", stop["stop_id"]),
                    attempts_total=stop.get("total_attempts", 0),
                    success_rate=stop.get("success_rate", 0.0),
                    avg_time_seconds=stop.get("avg_time_seconds", 0.0),
                    avg_attempts_before_success=stop.get("avg_attempts", 1.0),
                    common_errors=stop.get("common_errors", []),
                    difficulty_rating=difficulty,
                ))

            # Overall unit stats.
            unit_stats = await db.get_unit_overall_stats(unit_id, class_id, time_range.value)
        except Exception as e:
            logger.error(f"Failed to get stop analytics: {e}")
            unit_stats = {}

    # Bottlenecks: stops rated hard or passed by fewer than 60% of attempts.
    bottlenecks = [
        s.stop_id for s in stops_data
        if s.difficulty_rating > 3.5 or s.success_rate < 0.6
    ]

    return UnitPerformanceDetail(
        unit_id=unit_id,
        unit_title=f"Unit {unit_id}",
        template=unit_stats.get("template", "unknown"),
        total_sessions=unit_stats.get("total_sessions", 0),
        completed_sessions=unit_stats.get("completed_sessions", 0),
        completion_rate=unit_stats.get("completion_rate", 0.0),
        avg_duration_minutes=unit_stats.get("avg_duration_minutes", 0.0),
        stops=stops_data,
        bottleneck_stops=bottlenecks,
    )


# ==============================================
# API Endpoints - Misconception Tracking
# ==============================================

@router.get("/misconceptions", response_model=MisconceptionReport)
async def get_misconception_report(
    class_id: Optional[str] = Query(None),
    unit_id: Optional[str] = Query(None),
    time_range: TimeRange = Query(TimeRange.MONTH),
    limit: int = Query(20, ge=1, le=100),
) -> MisconceptionReport:
    """
    Get comprehensive misconception report.

    Shows most common misconceptions and their frequency, grouped by unit.
    """
    db = await get_analytics_database()

    misconceptions: List[MisconceptionEntry] = []

    if db:
        try:
            raw_misconceptions = await db.get_misconceptions(
                class_id=class_id, unit_id=unit_id,
                time_range=time_range.value, limit=limit
            )

            for m in raw_misconceptions:
                misconceptions.append(MisconceptionEntry(
                    concept_id=m["concept_id"],
                    concept_label=m["concept_label"],
                    misconception_text=m["misconception_text"],
                    frequency=m["frequency"],
                    affected_student_ids=m.get("student_ids", []),
                    unit_id=m["unit_id"],
                    stop_id=m["stop_id"],
                    detected_via=m.get("detected_via", "unknown"),
                    first_detected=m.get("first_detected", datetime.utcnow()),
                    last_detected=m.get("last_detected", datetime.utcnow()),
                ))
        except Exception as e:
            logger.error(f"Failed to get misconceptions: {e}")

    # Group by unit.
    by_unit: Dict[str, List[MisconceptionEntry]] = {}
    for m in misconceptions:
        by_unit.setdefault(m.unit_id, []).append(m)

    # Trending/resolved would need historical comparison in production;
    # for now the top entries stand in for "trending up".
    trending_up = misconceptions[:3] if misconceptions else []
    resolved: List[MisconceptionEntry] = []  # Would identify from historical data

    return MisconceptionReport(
        class_id=class_id,
        time_range=time_range.value,
        total_misconceptions=sum(m.frequency for m in misconceptions),
        unique_concepts=len(set(m.concept_id for m in misconceptions)),
        most_common=sorted(misconceptions, key=lambda x: x.frequency, reverse=True)[:10],
        by_unit=by_unit,
        trending_up=trending_up,
        resolved=resolved,
    )


@router.get("/misconceptions/student/{student_id}")
async def get_student_misconceptions(
    student_id: str,
    time_range: TimeRange = Query(TimeRange.ALL),
) -> Dict[str, Any]:
    """
    Get misconceptions for a specific student.

    Useful for personalized remediation; suggests a review activity for
    each of the five most relevant misconceptions.
    """
    db = await get_analytics_database()

    if db:
        try:
            misconceptions = await db.get_student_misconceptions(
                student_id=student_id, time_range=time_range.value
            )
            return {
                "student_id": student_id,
                "misconceptions": misconceptions,
                "recommended_remediation": [
                    {"concept": m["concept_label"], "activity": f"Review {m['unit_id']}/{m['stop_id']}"}
                    for m in misconceptions[:5]
                ]
            }
        except Exception as e:
            logger.error(f"Failed to get student misconceptions: {e}")

    # Fallback when the database is unavailable or the query failed.
    return {
        "student_id": student_id,
        "misconceptions": [],
        "recommended_remediation": [],
    }


# ==============================================
# API Endpoints - Student Progress Timeline
# ==============================================

@router.get("/student/{student_id}/timeline", response_model=StudentProgressTimeline)
async def get_student_timeline(
    student_id: str,
    time_range: TimeRange = Query(TimeRange.ALL),
) -> StudentProgressTimeline:
    """
    Get detailed progress timeline for a student.

    Shows all unit sessions and the performance trend derived from
    postcheck scores.
    """
    db = await get_analytics_database()

    timeline: List[Dict[str, Any]] = []
    scores: List[float] = []

    if db:
        try:
            sessions = await db.get_student_sessions(
                student_id=student_id, time_range=time_range.value
            )

            for session in sessions:
                timeline.append({
                    "date": session.get("started_at"),
                    "unit_id": session.get("unit_id"),
                    "completed": session.get("completed_at") is not None,
                    "precheck": session.get("precheck_score"),
                    "postcheck": session.get("postcheck_score"),
                    # "or 0" also covers an explicit None value, which
                    # .get(..., 0) alone would pass through to // and crash.
                    "duration_minutes": (session.get("duration_seconds") or 0) // 60,
                })
                if session.get("postcheck_score") is not None:
                    scores.append(session["postcheck_score"])
        except Exception as e:
            logger.error(f"Failed to get student timeline: {e}")

    trend = calculate_trend(scores) if scores else "insufficient_data"

    return StudentProgressTimeline(
        student_id=student_id,
        student_name=f"Student {student_id[:8]}",  # Would load actual name
        units_completed=sum(1 for t in timeline if t["completed"]),
        total_time_minutes=sum(t["duration_minutes"] for t in timeline),
        avg_score=statistics.mean(scores) if scores else 0.0,
        trend=trend,
        timeline=timeline,
    )


# ==============================================
# API Endpoints - Class Comparison
# ==============================================

@router.get("/compare/classes", response_model=List[ClassComparisonData])
async def compare_classes(
    class_ids: str = Query(..., description="Comma-separated class IDs"),
    time_range: TimeRange = Query(TimeRange.MONTH),
) -> List[ClassComparisonData]:
    """
    Compare performance across multiple classes.

    Classes whose stats lookup fails are logged and skipped. The result
    is sorted by average learning gain, best first.
    """
    class_list = [c.strip() for c in class_ids.split(",")]
    comparisons: List[ClassComparisonData] = []

    db = await get_analytics_database()
    if db:
        for class_id in class_list:
            try:
                stats = await db.get_class_aggregate_stats(class_id, time_range.value)
                comparisons.append(ClassComparisonData(
                    class_id=class_id,
                    class_name=stats.get("class_name", f"Klasse {class_id[:8]}"),
                    student_count=stats.get("student_count", 0),
                    units_assigned=stats.get("units_assigned", 0),
                    avg_completion_rate=stats.get("avg_completion_rate", 0.0),
                    avg_learning_gain=stats.get("avg_learning_gain", 0.0),
                    avg_time_per_unit=stats.get("avg_time_per_unit", 0.0),
                ))
            except Exception as e:
                logger.error(f"Failed to get stats for class {class_id}: {e}")

    return sorted(comparisons, key=lambda x: x.avg_learning_gain, reverse=True)


# ==============================================
# API Endpoints - Export
# ==============================================

@router.get("/export/learning-gains")
async def export_learning_gains(
    unit_id: Optional[str] = Query(None),
    class_id: Optional[str] = Query(None),
    time_range: TimeRange = Query(TimeRange.ALL),
    format: ExportFormat = Query(ExportFormat.JSON),
) -> Any:
    """
    Export learning gain data as JSON (default) or a CSV attachment.
    """
    from fastapi.responses import Response

    db = await get_analytics_database()

    data = []
    if db:
        try:
            data = await db.export_learning_gains(
                unit_id=unit_id, class_id=class_id, time_range=time_range.value
            )
        except Exception as e:
            logger.error(f"Failed to export data: {e}")

    if format == ExportFormat.CSV:
        # Build the CSV with the csv module so values containing commas,
        # quotes or newlines are escaped correctly; the previous f-string
        # concatenation produced malformed rows for such values and was
        # quadratic in the number of rows.
        import csv
        import io

        buffer = io.StringIO()
        writer = csv.writer(buffer, lineterminator="\n")
        writer.writerow(["student_id", "unit_id", "precheck", "postcheck", "gain"])
        for row in data:
            writer.writerow([
                row["student_id"],
                row["unit_id"],
                row.get("precheck", ""),
                row.get("postcheck", ""),
                row.get("gain", ""),
            ])
        return Response(
            content=buffer.getvalue(),
            media_type="text/csv",
            headers={"Content-Disposition": "attachment; filename=learning_gains.csv"}
        )

    return {
        "export_date": datetime.utcnow().isoformat(),
        "filters": {
            "unit_id": unit_id,
            "class_id": class_id,
            "time_range": time_range.value,
        },
        "data": data,
    }


@router.get("/export/misconceptions")
async def export_misconceptions(
    class_id: Optional[str] = Query(None),
    format: ExportFormat = Query(ExportFormat.JSON),
) -> Any:
    """
    Export misconception data for further analysis (JSON or CSV).

    Reuses get_misconception_report with a fixed one-month window and
    the maximum entry limit.
    """
    report = await get_misconception_report(
        class_id=class_id, unit_id=None, time_range=TimeRange.MONTH, limit=100
    )

    if format == ExportFormat.CSV:
        from fastapi.responses import Response
        import csv
        import io

        # QUOTE_NONNUMERIC keeps the original row shape (strings quoted,
        # frequency bare) while also escaping embedded quotes, which the
        # previous hand-built f-string rows did not.
        buffer = io.StringIO()
        writer = csv.writer(buffer, quoting=csv.QUOTE_NONNUMERIC, lineterminator="\n")
        writer.writerow(["concept_id", "concept_label", "misconception", "frequency", "unit_id", "stop_id"])
        for m in report.most_common:
            writer.writerow([
                m.concept_id, m.concept_label, m.misconception_text,
                m.frequency, m.unit_id, m.stop_id,
            ])
        return Response(
            content=buffer.getvalue(),
            media_type="text/csv",
            headers={"Content-Disposition": "attachment; filename=misconceptions.csv"}
        )

    return {
        "export_date": datetime.utcnow().isoformat(),
        "class_id": class_id,
        "total_entries": len(report.most_common),
        "data": [m.model_dump() for m in report.most_common],
    }


# ==============================================
# API Endpoints - Dashboard Aggregates
# ==============================================

@router.get("/dashboard/overview")
async def get_analytics_overview(
    time_range: TimeRange = Query(TimeRange.MONTH),
) -> Dict[str, Any]:
    """
    Get high-level analytics overview for dashboard.

    Falls back to an all-zero payload when the database is unavailable
    or the query fails, so the dashboard can always render.
    """
    db = await get_analytics_database()

    if db:
        try:
            return await db.get_analytics_overview(time_range.value)
        except Exception as e:
            logger.error(f"Failed to get analytics overview: {e}")

    return {
        "time_range": time_range.value,
        "total_sessions": 0,
        "unique_students": 0,
        "avg_completion_rate": 0.0,
        "avg_learning_gain": 0.0,
        "most_played_units": [],
        "struggling_concepts": [],
        "active_classes": 0,
    }


@router.get("/health")
async def health_check() -> Dict[str, Any]:
    """Health check for analytics API."""
    db = await get_analytics_database()
    return {
        "status": "healthy",
        "service": "unit-analytics",
        "database": "connected" if db else "disconnected",
    }