Some checks failed
Tests / Go Tests (push) Has been cancelled
Tests / Python Tests (push) Has been cancelled
Tests / Integration Tests (push) Has been cancelled
Tests / Go Lint (push) Has been cancelled
Tests / Python Lint (push) Has been cancelled
Tests / Security Scan (push) Has been cancelled
Tests / All Checks Passed (push) Has been cancelled
Security Scanning / Secret Scanning (push) Has been cancelled
Security Scanning / Dependency Vulnerability Scan (push) Has been cancelled
Security Scanning / Go Security Scan (push) Has been cancelled
Security Scanning / Python Security Scan (push) Has been cancelled
Security Scanning / Node.js Security Scan (push) Has been cancelled
Security Scanning / Docker Image Security (push) Has been cancelled
Security Scanning / Security Summary (push) Has been cancelled
CI/CD Pipeline / Go Tests (push) Has been cancelled
CI/CD Pipeline / Python Tests (push) Has been cancelled
CI/CD Pipeline / Website Tests (push) Has been cancelled
CI/CD Pipeline / Linting (push) Has been cancelled
CI/CD Pipeline / Security Scan (push) Has been cancelled
CI/CD Pipeline / Docker Build & Push (push) Has been cancelled
CI/CD Pipeline / Integration Tests (push) Has been cancelled
CI/CD Pipeline / Deploy to Staging (push) Has been cancelled
CI/CD Pipeline / Deploy to Production (push) Has been cancelled
CI/CD Pipeline / CI Summary (push) Has been cancelled
ci/woodpecker/manual/build-ci-image Pipeline was successful
ci/woodpecker/manual/main Pipeline failed
All services: admin-v2, studio-v2, website, ai-compliance-sdk, consent-service, klausur-service, voice-service, and infrastructure. Large PDFs and compiled binaries excluded via .gitignore.
752 lines
24 KiB
Python
752 lines
24 KiB
Python
# ==============================================
|
|
# Breakpilot Drive - Unit Analytics API
|
|
# ==============================================
|
|
# Erweiterte Analytics fuer Lernfortschritt:
|
|
# - Pre/Post Gain Visualisierung
|
|
# - Misconception-Tracking
|
|
# - Stop-Level Analytics
|
|
# - Aggregierte Klassen-Statistiken
|
|
# - Export-Funktionen
|
|
|
|
from fastapi import APIRouter, HTTPException, Query, Depends, Request
|
|
from pydantic import BaseModel, Field
|
|
from typing import List, Optional, Dict, Any
|
|
from datetime import datetime, timedelta
|
|
from enum import Enum
|
|
import os
|
|
import logging
|
|
import statistics
|
|
|
|
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# Feature flags
# Database access is enabled unless GAME_USE_DATABASE is set to something
# other than "true" (compared case-insensitively).
USE_DATABASE = os.environ.get("GAME_USE_DATABASE", "true").lower() == "true"

# Every route declared in this module is served under /api/analytics.
router = APIRouter(prefix="/api/analytics", tags=["Unit Analytics"])
|
|
|
|
|
|
# ==============================================
|
|
# Pydantic Models
|
|
# ==============================================
|
|
|
|
class TimeRange(str, Enum):
    """Look-back windows a caller can select for an analytics query.

    Members are also ``str`` instances; their ``.value`` is what gets passed
    on to the database layer.
    """

    WEEK = "week"
    MONTH = "month"
    QUARTER = "quarter"
    ALL = "all"
|
|
|
|
|
|
class LearningGainData(BaseModel):
    """One student's pre-check / post-check result for a single unit."""

    student_id: str
    student_name: str
    unit_id: str
    precheck_score: float
    postcheck_score: float
    # Post-check score minus pre-check score, as computed by the
    # learning-gain endpoint.
    learning_gain: float
    # Assigned by the learning-gain endpoint once all gains are known
    # (rank / count * 100); stays None until then.
    percentile: Optional[float] = None
|
|
|
|
|
|
class LearningGainSummary(BaseModel):
    """Aggregate learning-gain statistics for one unit, plus per-student rows."""

    unit_id: str
    unit_title: str
    total_students: int
    avg_precheck: float
    avg_postcheck: float
    avg_gain: float
    median_gain: float
    std_deviation: float
    # The learning-gain endpoint counts gains above +0.01 as positive, below
    # -0.01 as negative, and everything in between as "no change".
    positive_gain_count: int
    negative_gain_count: int
    no_change_count: int
    # Bucket label -> number of students. calculate_gain_distribution() emits
    # the keys "< -20%", "-20% to -10%", "-10% to 0%", "0% to 10%",
    # "10% to 20%" and "> 20%"; the empty summary uses an empty dict.
    gain_distribution: Dict[str, int]
    individual_gains: List[LearningGainData]
|
|
|
|
|
|
class StopPerformance(BaseModel):
    """Aggregated performance figures for one stop of a unit."""

    stop_id: str
    stop_label: str
    attempts_total: int
    success_rate: float
    avg_time_seconds: float
    avg_attempts_before_success: float
    common_errors: List[str]
    # Rating produced by calculate_difficulty_rating(); intended range is 1-5,
    # higher meaning harder.
    difficulty_rating: float
|
|
|
|
|
|
class UnitPerformanceDetail(BaseModel):
    """Unit-level performance overview with a per-stop breakdown."""

    unit_id: str
    unit_title: str
    template: str
    total_sessions: int
    completed_sessions: int
    completion_rate: float
    avg_duration_minutes: float
    stops: List[StopPerformance]
    # IDs of the stops where students struggle most; the stop-analytics
    # endpoint selects stops with difficulty > 3.5 or success rate < 0.6.
    bottleneck_stops: List[str]
|
|
|
|
|
|
class MisconceptionEntry(BaseModel):
    """A single tracked misconception and where/when it was observed."""

    concept_id: str
    concept_label: str
    misconception_text: str
    frequency: int
    affected_student_ids: List[str]
    unit_id: str
    stop_id: str
    # Expected values: "precheck", "postcheck", "interaction". The report
    # endpoint falls back to "unknown" when the source row has no value.
    detected_via: str
    first_detected: datetime
    last_detected: datetime
|
|
|
|
|
|
class MisconceptionReport(BaseModel):
    """Misconception overview for a class (or all classes) over a time range."""

    # No default is given, so the field must be supplied; None is accepted.
    class_id: Optional[str]
    time_range: str
    total_misconceptions: int
    unique_concepts: int
    most_common: List[MisconceptionEntry]
    # Entries grouped by their unit_id.
    by_unit: Dict[str, List[MisconceptionEntry]]
    # Misconceptions that are getting more frequent.
    trending_up: List[MisconceptionEntry]
    # Misconceptions that no longer appear.
    resolved: List[MisconceptionEntry]
|
|
|
|
|
|
class StudentProgressTimeline(BaseModel):
    """A student's session history together with summary figures."""

    student_id: str
    student_name: str
    units_completed: int
    total_time_minutes: int
    avg_score: float
    # "improving", "stable" or "declining"; the timeline endpoint also emits
    # "insufficient_data" when there are too few scores to judge.
    trend: str
    # One dict per session. The timeline endpoint fills the keys "date",
    # "unit_id", "completed", "precheck", "postcheck" and "duration_minutes".
    timeline: List[Dict[str, Any]]
|
|
|
|
|
|
class ClassComparisonData(BaseModel):
    """Per-class aggregate figures used when comparing classes side by side."""

    class_id: str
    class_name: str
    student_count: int
    units_assigned: int
    avg_completion_rate: float
    # The class-comparison endpoint sorts its result by this field, descending.
    avg_learning_gain: float
    avg_time_per_unit: float
|
|
|
|
|
|
class ExportFormat(str, Enum):
    """Output formats supported by the export endpoints."""

    JSON = "json"
    CSV = "csv"
|
|
|
|
|
|
# ==============================================
|
|
# Database Integration
|
|
# ==============================================
|
|
|
|
# Cached database handle; set by get_analytics_database() on first success.
_analytics_db = None


async def get_analytics_database():
    """Return the cached analytics database handle, creating it on first use.

    Returns:
        None when the USE_DATABASE flag is off or initialisation failed,
        otherwise whatever ``unit.database.get_analytics_db()`` returned.

    Initialisation failures are only logged as warnings; because nothing is
    cached in that case, the next call tries again.
    """
    global _analytics_db

    if not USE_DATABASE:
        return None

    # Fast path: a previous call already produced a handle.
    if _analytics_db is not None:
        return _analytics_db

    try:
        # Imported lazily so this module still loads when the database
        # package is missing.
        from unit.database import get_analytics_db

        _analytics_db = await get_analytics_db()
        logger.info("Analytics database initialized")
    except ImportError:
        logger.warning("Analytics database module not available")
    except Exception as exc:
        logger.warning(f"Analytics database not available: {exc}")

    return _analytics_db
|
|
|
|
|
|
# ==============================================
|
|
# Helper Functions
|
|
# ==============================================
|
|
|
|
def calculate_gain_distribution(gains: List[float]) -> Dict[str, int]:
    """Count learning gains per percentage bucket.

    Each gain is multiplied by 100 and placed in the first bucket whose
    exclusive upper bound it is below; anything at or above 20 lands in
    "> 20%".

    Args:
        gains: Learning gains expressed as fractions (0.1 is treated as 10%).

    Returns:
        Mapping of bucket label to count. All six buckets are always present,
        ordered from lowest to highest.
    """
    # (exclusive upper bound in percent, bucket label), ascending.
    bounded_buckets = (
        (-20, "< -20%"),
        (-10, "-20% to -10%"),
        (0, "-10% to 0%"),
        (10, "0% to 10%"),
        (20, "10% to 20%"),
    )
    overflow_bucket = "> 20%"

    distribution = {label: 0 for _, label in bounded_buckets}
    distribution[overflow_bucket] = 0

    for gain in gains:
        percent = gain * 100
        for upper_bound, label in bounded_buckets:
            if percent < upper_bound:
                distribution[label] += 1
                break
        else:
            # No bounded bucket matched.
            distribution[overflow_bucket] += 1

    return distribution
|
|
|
|
|
|
def calculate_trend(scores: List[float]) -> str:
    """Classify a score series as improving, declining or stable.

    Fits a least-squares line through the points (index, score) and looks at
    its slope.

    Args:
        scores: Scores in chronological order.

    Returns:
        "insufficient_data" for fewer than three scores, "improving" when the
        slope exceeds 0.05 per step, "declining" when it is below -0.05, and
        "stable" otherwise.
    """
    n = len(scores)
    if n < 3:
        return "insufficient_data"

    # Simple linear regression over x = 0..n-1.
    x_mean = (n - 1) / 2
    y_mean = sum(scores) / n

    numerator = sum(
        (i - x_mean) * (score - y_mean) for i, score in enumerate(scores)
    )
    denominator = sum((i - x_mean) ** 2 for i in range(n))

    if denominator == 0:
        return "stable"

    slope = numerator / denominator

    if slope > 0.05:
        return "improving"
    if slope < -0.05:
        return "declining"
    return "stable"
|
|
|
|
|
|
def calculate_difficulty_rating(success_rate: float, avg_attempts: float) -> float:
    """Calculate a difficulty rating in the range 1-5 from success metrics.

    A lower success rate and more attempts both raise the rating.

    Args:
        success_rate: Fraction of successful attempts, expected in 0..1.
        avg_attempts: Average number of attempts before success.

    Returns:
        A rating between 1.0 and 5.0 inclusive.
    """
    # Success rate contributes 1 (always succeeds) up to 4 (never succeeds).
    base_difficulty = (1 - success_rate) * 3 + 1

    # Extra attempts add at most one point. Clamp at zero so that an average
    # below one attempt (e.g. 0 when nothing was recorded) cannot lower the
    # rating.
    attempt_modifier = max(0.0, min(avg_attempts - 1, 1))

    # Final clamp keeps out-of-range inputs inside the documented 1-5 scale.
    return max(1.0, min(5.0, base_difficulty + attempt_modifier))
|
|
|
|
|
|
# ==============================================
|
|
# API Endpoints - Learning Gain
|
|
# ==============================================
|
|
|
|
# NOTE: Static routes must come BEFORE dynamic routes like /{unit_id}
|
|
@router.get("/learning-gain/compare")
async def compare_learning_gains(
    unit_ids: str = Query(..., description="Comma-separated unit IDs"),
    class_id: Optional[str] = Query(None),
    time_range: TimeRange = Query(TimeRange.MONTH),
) -> Dict[str, Any]:
    """
    Compare learning gains across multiple units.

    Args:
        unit_ids: Comma-separated unit IDs; surrounding whitespace is
            stripped and blank entries (e.g. from "a,,b" or a trailing
            comma) are ignored.
        class_id: Optional class filter, passed through to the per-unit
            analysis.
        time_range: Time window, passed through to the per-unit analysis.

    Returns:
        A dict with the applied filters and a "comparisons" list sorted by
        average gain, highest first. Units whose analysis raised are logged
        and left out.
    """
    # Drop blank IDs so they are not analysed as the empty unit "".
    unit_list = [u.strip() for u in unit_ids.split(",") if u.strip()]
    comparisons = []

    for unit_id in unit_list:
        try:
            summary = await get_learning_gain_analysis(unit_id, class_id, time_range)
            comparisons.append({
                "unit_id": unit_id,
                "avg_gain": summary.avg_gain,
                "median_gain": summary.median_gain,
                "total_students": summary.total_students,
                # max(..., 1) avoids dividing by zero for units without data.
                "positive_rate": summary.positive_gain_count / max(summary.total_students, 1),
            })
        except Exception as e:
            logger.error(f"Failed to get comparison for {unit_id}: {e}")

    return {
        "time_range": time_range.value,
        "class_id": class_id,
        "comparisons": sorted(comparisons, key=lambda x: x["avg_gain"], reverse=True),
    }
|
|
|
|
|
|
@router.get("/learning-gain/{unit_id}", response_model=LearningGainSummary)
async def get_learning_gain_analysis(
    unit_id: str,
    class_id: Optional[str] = Query(None, description="Filter by class"),
    time_range: TimeRange = Query(TimeRange.MONTH, description="Time range for analysis"),
) -> LearningGainSummary:
    """
    Get detailed pre/post learning gain analysis for a unit.

    Shows individual gains, aggregated statistics, and distribution.

    Args:
        unit_id: Unit to analyse.
        class_id: Optional class filter, forwarded to the database query.
        time_range: Time window, forwarded to the database query.

    Returns:
        A LearningGainSummary. When the database is unavailable, the query
        fails before any row is processed, or no session has both scores,
        an all-zero summary with no individual gains is returned.
    """
    db = await get_analytics_database()
    individual_gains = []

    if db:
        try:
            # Get all sessions with pre/post scores for this unit
            sessions = await db.get_unit_sessions_with_scores(
                unit_id=unit_id,
                class_id=class_id,
                time_range=time_range.value,
            )

            for session in sessions:
                precheck = session.get("precheck_score")
                postcheck = session.get("postcheck_score")
                # A gain needs both scores; skip sessions lacking either one.
                if precheck is None or postcheck is None:
                    continue

                student_id = session["student_id"]
                individual_gains.append(LearningGainData(
                    student_id=student_id,
                    # `or` also covers a name that is present but null/empty,
                    # which would otherwise fail validation and abort the
                    # whole loop via the except below.
                    student_name=session.get("student_name") or student_id[:8],
                    unit_id=unit_id,
                    precheck_score=precheck,
                    postcheck_score=postcheck,
                    learning_gain=postcheck - precheck,
                ))
        except Exception as e:
            # Best effort: keep whatever was collected before the failure.
            logger.error(f"Failed to get learning gain data: {e}")

    # Calculate statistics
    if not individual_gains:
        # Return empty summary
        return LearningGainSummary(
            unit_id=unit_id,
            unit_title=f"Unit {unit_id}",
            total_students=0,
            avg_precheck=0.0,
            avg_postcheck=0.0,
            avg_gain=0.0,
            median_gain=0.0,
            std_deviation=0.0,
            positive_gain_count=0,
            negative_gain_count=0,
            no_change_count=0,
            gain_distribution={},
            individual_gains=[],
        )

    gains = [g.learning_gain for g in individual_gains]
    prechecks = [g.precheck_score for g in individual_gains]
    postchecks = [g.postcheck_score for g in individual_gains]

    avg_gain = statistics.mean(gains)
    median_gain = statistics.median(gains)
    # Sample standard deviation needs at least two data points.
    std_dev = statistics.stdev(gains) if len(gains) > 1 else 0.0

    # Calculate percentiles. Each gain gets the 1-based rank of its first
    # occurrence in ascending order (ties share the lowest rank); the ranks
    # are precomputed in one pass instead of scanning the list per student.
    sorted_gains = sorted(gains)
    first_rank: Dict[float, int] = {}
    for position, value in enumerate(sorted_gains, start=1):
        first_rank.setdefault(value, position)

    for data in individual_gains:
        data.percentile = first_rank[data.learning_gain] / len(sorted_gains) * 100

    return LearningGainSummary(
        unit_id=unit_id,
        unit_title=f"Unit {unit_id}",
        total_students=len(individual_gains),
        avg_precheck=statistics.mean(prechecks),
        avg_postcheck=statistics.mean(postchecks),
        avg_gain=avg_gain,
        median_gain=median_gain,
        std_deviation=std_dev,
        # Gains within +/-0.01 count as "no change".
        positive_gain_count=sum(1 for g in gains if g > 0.01),
        negative_gain_count=sum(1 for g in gains if g < -0.01),
        no_change_count=sum(1 for g in gains if -0.01 <= g <= 0.01),
        gain_distribution=calculate_gain_distribution(gains),
        individual_gains=sorted(individual_gains, key=lambda x: x.learning_gain, reverse=True),
    )
|
|
|
|
|
|
# ==============================================
|
|
# API Endpoints - Stop-Level Analytics
|
|
# ==============================================
|
|
|
|
@router.get("/unit/{unit_id}/stops", response_model=UnitPerformanceDetail)
async def get_unit_stop_analytics(
    unit_id: str,
    class_id: Optional[str] = Query(None),
    time_range: TimeRange = Query(TimeRange.MONTH),
) -> UnitPerformanceDetail:
    """
    Get detailed stop-level performance analytics.

    Identifies bottleneck stops where students struggle most.

    Args:
        unit_id: Unit to analyse.
        class_id: Optional class filter, forwarded to the database queries.
        time_range: Time window, forwarded to the database queries.

    Returns:
        A UnitPerformanceDetail. Unit-level figures fall back to zero /
        "unknown" when the database is unavailable or a query fails.
    """
    db = await get_analytics_database()
    stops_data = []
    unit_stats = {}

    if db:
        try:
            # Get stop-level telemetry
            stop_stats = await db.get_stop_performance(
                unit_id=unit_id,
                class_id=class_id,
                time_range=time_range.value,
            )

            for stop in stop_stats:
                # NOTE(review): a missing success_rate is rated as 0.5 here
                # but reported as 0.0 below - confirm which default is meant.
                difficulty = calculate_difficulty_rating(
                    stop.get("success_rate", 0.5),
                    stop.get("avg_attempts", 1.0),
                )
                stops_data.append(StopPerformance(
                    stop_id=stop["stop_id"],
                    stop_label=stop.get("stop_label", stop["stop_id"]),
                    attempts_total=stop.get("total_attempts", 0),
                    success_rate=stop.get("success_rate", 0.0),
                    avg_time_seconds=stop.get("avg_time_seconds", 0.0),
                    avg_attempts_before_success=stop.get("avg_attempts", 1.0),
                    common_errors=stop.get("common_errors", []),
                    difficulty_rating=difficulty,
                ))

            # Get overall unit stats. `or {}` guards against a None result,
            # which would otherwise crash on `.get` below, outside this try.
            unit_stats = (
                await db.get_unit_overall_stats(unit_id, class_id, time_range.value)
            ) or {}
        except Exception as e:
            logger.error(f"Failed to get stop analytics: {e}")
            unit_stats = {}

    # Identify bottleneck stops (difficulty > 3.5 or success rate < 0.6)
    bottlenecks = [
        s.stop_id for s in stops_data
        if s.difficulty_rating > 3.5 or s.success_rate < 0.6
    ]

    return UnitPerformanceDetail(
        unit_id=unit_id,
        unit_title=f"Unit {unit_id}",
        template=unit_stats.get("template", "unknown"),
        total_sessions=unit_stats.get("total_sessions", 0),
        completed_sessions=unit_stats.get("completed_sessions", 0),
        completion_rate=unit_stats.get("completion_rate", 0.0),
        avg_duration_minutes=unit_stats.get("avg_duration_minutes", 0.0),
        stops=stops_data,
        bottleneck_stops=bottlenecks,
    )
|
|
|
|
|
|
# ==============================================
|
|
# API Endpoints - Misconception Tracking
|
|
# ==============================================
|
|
|
|
@router.get("/misconceptions", response_model=MisconceptionReport)
async def get_misconception_report(
    class_id: Optional[str] = Query(None),
    unit_id: Optional[str] = Query(None),
    time_range: TimeRange = Query(TimeRange.MONTH),
    limit: int = Query(20, ge=1, le=100),
) -> MisconceptionReport:
    """
    Build the misconception report for the given filters.

    Lists the recorded misconceptions with their frequencies, grouped by
    unit and ranked by how often they occur.

    Args:
        class_id: Optional class filter.
        unit_id: Optional unit filter.
        time_range: Time window for the query.
        limit: Maximum number of rows requested from the database.

    Returns:
        A MisconceptionReport; it is empty when the database is unavailable
        or the query fails before any row is processed.
    """
    db = await get_analytics_database()
    misconceptions = []

    if db:
        try:
            rows = await db.get_misconceptions(
                class_id=class_id,
                unit_id=unit_id,
                time_range=time_range.value,
                limit=limit,
            )

            # Entries are appended one at a time so that rows converted
            # before a failure are kept.
            for row in rows:
                entry = MisconceptionEntry(
                    concept_id=row["concept_id"],
                    concept_label=row["concept_label"],
                    misconception_text=row["misconception_text"],
                    frequency=row["frequency"],
                    affected_student_ids=row.get("student_ids", []),
                    unit_id=row["unit_id"],
                    stop_id=row["stop_id"],
                    detected_via=row.get("detected_via", "unknown"),
                    first_detected=row.get("first_detected", datetime.utcnow()),
                    last_detected=row.get("last_detected", datetime.utcnow()),
                )
                misconceptions.append(entry)
        except Exception as exc:
            logger.error(f"Failed to get misconceptions: {exc}")

    # Group by unit
    by_unit = {}
    for entry in misconceptions:
        by_unit.setdefault(entry.unit_id, []).append(entry)

    # Placeholder until historical comparison exists: the first three entries
    # stand in for "trending up", and nothing is ever reported as resolved.
    trending_up = misconceptions[:3]
    resolved = []

    ranked = sorted(misconceptions, key=lambda item: item.frequency, reverse=True)
    concept_ids = {entry.concept_id for entry in misconceptions}

    return MisconceptionReport(
        class_id=class_id,
        time_range=time_range.value,
        total_misconceptions=sum(entry.frequency for entry in misconceptions),
        unique_concepts=len(concept_ids),
        most_common=ranked[:10],
        by_unit=by_unit,
        trending_up=trending_up,
        resolved=resolved,
    )
|
|
|
|
|
|
@router.get("/misconceptions/student/{student_id}")
async def get_student_misconceptions(
    student_id: str,
    time_range: TimeRange = Query(TimeRange.ALL),
) -> Dict[str, Any]:
    """
    Return one student's misconceptions plus remediation suggestions.

    Intended for personalised remediation: the first five misconceptions are
    each turned into a "Review <unit>/<stop>" activity.

    Args:
        student_id: Student to look up.
        time_range: Time window for the query.

    Returns:
        A dict with "student_id", "misconceptions" and
        "recommended_remediation"; both lists are empty when the database is
        unavailable or the lookup fails.
    """
    db = await get_analytics_database()

    if db:
        try:
            found = await db.get_student_misconceptions(
                student_id=student_id,
                time_range=time_range.value,
            )
            remediation = [
                {"concept": m["concept_label"], "activity": f"Review {m['unit_id']}/{m['stop_id']}"}
                for m in found[:5]
            ]
            return {
                "student_id": student_id,
                "misconceptions": found,
                "recommended_remediation": remediation,
            }
        except Exception as exc:
            logger.error(f"Failed to get student misconceptions: {exc}")

    # Fallback: no database, or the lookup above failed.
    return {
        "student_id": student_id,
        "misconceptions": [],
        "recommended_remediation": [],
    }
|
|
|
|
|
|
# ==============================================
|
|
# API Endpoints - Student Progress Timeline
|
|
# ==============================================
|
|
|
|
@router.get("/student/{student_id}/timeline", response_model=StudentProgressTimeline)
async def get_student_timeline(
    student_id: str,
    time_range: TimeRange = Query(TimeRange.ALL),
) -> StudentProgressTimeline:
    """
    Get detailed progress timeline for a student.

    Shows all unit sessions and performance trend.

    Args:
        student_id: Student to look up.
        time_range: Time window for the query.

    Returns:
        A StudentProgressTimeline. The timeline is empty and the trend is
        "insufficient_data" when the database is unavailable or the query
        fails before any session is processed.
    """
    db = await get_analytics_database()
    timeline = []
    scores = []

    if db:
        try:
            sessions = await db.get_student_sessions(
                student_id=student_id,
                time_range=time_range.value,
            )

            for session in sessions:
                # `or 0` also covers a duration that is present but null;
                # a null would raise TypeError on `//` and the except below
                # would silently drop this and every later session.
                duration_seconds = session.get("duration_seconds") or 0
                postcheck = session.get("postcheck_score")

                timeline.append({
                    "date": session.get("started_at"),
                    "unit_id": session.get("unit_id"),
                    "completed": session.get("completed_at") is not None,
                    "precheck": session.get("precheck_score"),
                    "postcheck": postcheck,
                    "duration_minutes": duration_seconds // 60,
                })

                # Only sessions with a post-check feed the average and trend.
                if postcheck is not None:
                    scores.append(postcheck)
        except Exception as e:
            logger.error(f"Failed to get student timeline: {e}")

    trend = calculate_trend(scores) if scores else "insufficient_data"

    return StudentProgressTimeline(
        student_id=student_id,
        student_name=f"Student {student_id[:8]}",  # Would load actual name
        units_completed=sum(1 for t in timeline if t["completed"]),
        total_time_minutes=sum(t["duration_minutes"] for t in timeline),
        avg_score=statistics.mean(scores) if scores else 0.0,
        trend=trend,
        timeline=timeline,
    )
|
|
|
|
|
|
# ==============================================
|
|
# API Endpoints - Class Comparison
|
|
# ==============================================
|
|
|
|
@router.get("/compare/classes", response_model=List[ClassComparisonData])
async def compare_classes(
    class_ids: str = Query(..., description="Comma-separated class IDs"),
    time_range: TimeRange = Query(TimeRange.MONTH),
) -> List[ClassComparisonData]:
    """
    Compare performance across multiple classes.

    Args:
        class_ids: Comma-separated class IDs; surrounding whitespace is
            stripped and blank entries (e.g. from "a,,b" or a trailing
            comma) are ignored.
        time_range: Time window, forwarded to the database query.

    Returns:
        One ClassComparisonData per class whose stats could be loaded,
        sorted by average learning gain, highest first. The list is empty
        when the database is unavailable.
    """
    # Drop blank IDs so they are not looked up as the empty class "".
    class_list = [c.strip() for c in class_ids.split(",") if c.strip()]
    comparisons = []

    db = await get_analytics_database()
    if db:
        for class_id in class_list:
            try:
                stats = await db.get_class_aggregate_stats(class_id, time_range.value)
                comparisons.append(ClassComparisonData(
                    class_id=class_id,
                    class_name=stats.get("class_name", f"Klasse {class_id[:8]}"),
                    student_count=stats.get("student_count", 0),
                    units_assigned=stats.get("units_assigned", 0),
                    avg_completion_rate=stats.get("avg_completion_rate", 0.0),
                    avg_learning_gain=stats.get("avg_learning_gain", 0.0),
                    avg_time_per_unit=stats.get("avg_time_per_unit", 0.0),
                ))
            except Exception as e:
                # One failing class must not prevent the others from loading.
                logger.error(f"Failed to get stats for class {class_id}: {e}")

    return sorted(comparisons, key=lambda x: x.avg_learning_gain, reverse=True)
|
|
|
|
|
|
# ==============================================
|
|
# API Endpoints - Export
|
|
# ==============================================
|
|
|
|
@router.get("/export/learning-gains")
async def export_learning_gains(
    unit_id: Optional[str] = Query(None),
    class_id: Optional[str] = Query(None),
    time_range: TimeRange = Query(TimeRange.ALL),
    format: ExportFormat = Query(ExportFormat.JSON),
) -> Any:
    """
    Export learning gain data.

    Args:
        unit_id: Optional unit filter.
        class_id: Optional class filter.
        time_range: Time window for the export.
        format: ExportFormat.CSV for a downloadable CSV file, otherwise JSON.

    Returns:
        For CSV, a text/csv Response with the columns student_id, unit_id,
        precheck, postcheck, gain. For JSON, a dict with the export date,
        the applied filters and the rows. When the database is unavailable
        or the query fails, the export contains no rows.
    """
    import csv
    import io

    from fastapi.responses import Response

    db = await get_analytics_database()
    data = []

    if db:
        try:
            data = await db.export_learning_gains(
                unit_id=unit_id,
                class_id=class_id,
                time_range=time_range.value,
            )
        except Exception as e:
            logger.error(f"Failed to export data: {e}")

    if format == ExportFormat.CSV:
        # csv.writer quotes values containing commas, quotes or newlines, so
        # such values cannot break the column layout. "\n" keeps the line
        # endings of the previous hand-built output; missing or null scores
        # are written as empty cells.
        buffer = io.StringIO()
        writer = csv.writer(buffer, lineterminator="\n")
        writer.writerow(["student_id", "unit_id", "precheck", "postcheck", "gain"])
        for row in data:
            writer.writerow([
                row["student_id"],
                row["unit_id"],
                row.get("precheck", ""),
                row.get("postcheck", ""),
                row.get("gain", ""),
            ])

        return Response(
            content=buffer.getvalue(),
            media_type="text/csv",
            headers={"Content-Disposition": "attachment; filename=learning_gains.csv"},
        )

    return {
        "export_date": datetime.utcnow().isoformat(),
        "filters": {
            "unit_id": unit_id,
            "class_id": class_id,
            "time_range": time_range.value,
        },
        "data": data,
    }
|
|
|
|
|
|
@router.get("/export/misconceptions")
async def export_misconceptions(
    class_id: Optional[str] = Query(None),
    format: ExportFormat = Query(ExportFormat.JSON),
) -> Any:
    """
    Export misconception data for further analysis.

    The underlying report is requested for the last month with a limit of
    100 rows, across all units.

    Args:
        class_id: Optional class filter.
        format: ExportFormat.CSV for a downloadable CSV file, otherwise JSON.

    Returns:
        For CSV, a text/csv Response with the columns concept_id,
        concept_label, misconception, frequency, unit_id, stop_id. For JSON,
        a dict with the export date, the class filter, the entry count and
        the entries. Entries are ordered by frequency, highest first.
    """
    report = await get_misconception_report(
        class_id=class_id,
        unit_id=None,
        time_range=TimeRange.MONTH,
        limit=100,
    )

    # report.most_common is capped at ten entries, which would silently
    # truncate the export. by_unit holds every entry of the report, so the
    # full list is rebuilt from it.
    entries = sorted(
        (entry for group in report.by_unit.values() for entry in group),
        key=lambda entry: entry.frequency,
        reverse=True,
    )

    if format == ExportFormat.CSV:
        import csv
        import io

        from fastapi.responses import Response

        buffer = io.StringIO()
        # Header stays unquoted, exactly as before.
        buffer.write("concept_id,concept_label,misconception,frequency,unit_id,stop_id\n")
        # QUOTE_NONNUMERIC quotes every text field and leaves the integer
        # frequency bare; embedded quotes are doubled instead of corrupting
        # the row.
        writer = csv.writer(buffer, quoting=csv.QUOTE_NONNUMERIC, lineterminator="\n")
        for m in entries:
            writer.writerow([
                m.concept_id,
                m.concept_label,
                m.misconception_text,
                m.frequency,
                m.unit_id,
                m.stop_id,
            ])

        return Response(
            content=buffer.getvalue(),
            media_type="text/csv",
            headers={"Content-Disposition": "attachment; filename=misconceptions.csv"},
        )

    return {
        "export_date": datetime.utcnow().isoformat(),
        "class_id": class_id,
        "total_entries": len(entries),
        "data": [m.model_dump() for m in entries],
    }
|
|
|
|
|
|
# ==============================================
|
|
# API Endpoints - Dashboard Aggregates
|
|
# ==============================================
|
|
|
|
@router.get("/dashboard/overview")
async def get_analytics_overview(
    time_range: TimeRange = Query(TimeRange.MONTH),
) -> Dict[str, Any]:
    """
    Return the high-level figures shown on the analytics dashboard.

    Args:
        time_range: Time window for the overview.

    Returns:
        Whatever the database layer returns for the overview; when the
        database is unavailable or the query fails, a dict with the same
        time range and zeroed / empty figures.
    """
    db = await get_analytics_database()

    if db:
        try:
            return await db.get_analytics_overview(time_range.value)
        except Exception as exc:
            logger.error(f"Failed to get analytics overview: {exc}")

    # Fallback payload for "no database" and "query failed".
    return {
        "time_range": time_range.value,
        "total_sessions": 0,
        "unique_students": 0,
        "avg_completion_rate": 0.0,
        "avg_learning_gain": 0.0,
        "most_played_units": [],
        "struggling_concepts": [],
        "active_classes": 0,
    }
|
|
|
|
|
|
@router.get("/health")
async def health_check() -> Dict[str, Any]:
    """Report service health and whether a database handle is available.

    The "status" field is always "healthy"; only the "database" field
    reflects the outcome of the database lookup.
    """
    db = await get_analytics_database()
    database_state = "connected" if db else "disconnected"

    return {
        "status": "healthy",
        "service": "unit-analytics",
        "database": database_state,
    }
|