This repository was archived on 2026-02-15. You can view and clone its files, but you cannot open issues, create pull requests, or push commits.
Files
breakpilot-pwa/backend/unit_analytics_api.py
Benjamin Admin bfdaf63ba9 fix: Restore all files lost during destructive rebase
A previous `git pull --rebase origin main` dropped 177 local commits,
losing 3400+ files across admin-v2, backend, studio-v2, website,
klausur-service, and many other services. The partial restore attempt
(660295e2) only recovered some files.

This commit restores all missing files from pre-rebase ref 98933f5e
while preserving post-rebase additions (night-scheduler, night-mode UI,
NightModeWidget dashboard integration).

Restored features include:
- AI Module Sidebar (FAB), OCR Labeling, OCR Compare
- GPU Dashboard, RAG Pipeline, Magic Help
- Klausur-Korrektur (8 files), Abitur-Archiv (5+ files)
- Companion, Zeugnisse-Crawler, Screen Flow
- Full backend, studio-v2, website, klausur-service
- All compliance SDKs, agent-core, voice-service
- CI/CD configs, documentation, scripts

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-09 09:51:32 +01:00

752 lines
24 KiB
Python

# ==============================================
# Breakpilot Drive - Unit Analytics API
# ==============================================
# Erweiterte Analytics fuer Lernfortschritt:
# - Pre/Post Gain Visualisierung
# - Misconception-Tracking
# - Stop-Level Analytics
# - Aggregierte Klassen-Statistiken
# - Export-Funktionen
from fastapi import APIRouter, HTTPException, Query, Depends, Request
from pydantic import BaseModel, Field
from typing import List, Optional, Dict, Any
from datetime import datetime, timedelta
from enum import Enum
import os
import logging
import statistics
# Module-level logger for this analytics API module.
logger = logging.getLogger(__name__)
# Feature flags
# When "false", every endpoint skips the database and serves its
# empty/zeroed fallback payload instead.
USE_DATABASE = os.getenv("GAME_USE_DATABASE", "true").lower() == "true"
# All routes below are mounted under /api/analytics.
router = APIRouter(prefix="/api/analytics", tags=["Unit Analytics"])
# ==============================================
# Pydantic Models
# ==============================================
class TimeRange(str, Enum):
    """Time range filter for analytics queries.

    The string value is passed through to the database layer as-is
    (see the ``time_range=time_range.value`` call sites below).
    """
    WEEK = "week"
    MONTH = "month"
    QUARTER = "quarter"
    ALL = "all"
class LearningGainData(BaseModel):
    """Pre/post learning gain data point for one student on one unit."""
    student_id: str
    student_name: str
    unit_id: str
    precheck_score: float   # score before working through the unit
    postcheck_score: float  # score after completing the unit
    learning_gain: float    # postcheck_score - precheck_score
    # Rank-based percentile (0-100) within the queried cohort; left as
    # None until filled in by get_learning_gain_analysis.
    percentile: Optional[float] = None
class LearningGainSummary(BaseModel):
    """Aggregated learning gain statistics for a single unit."""
    unit_id: str
    unit_title: str
    total_students: int      # students with both pre- and post-scores recorded
    avg_precheck: float
    avg_postcheck: float
    avg_gain: float          # mean of the individual learning gains
    median_gain: float
    std_deviation: float     # sample stdev of gains; 0.0 when fewer than 2 students
    positive_gain_count: int  # gains > +0.01
    negative_gain_count: int  # gains < -0.01
    no_change_count: int      # gains within +/-0.01 of zero
    # Histogram keyed by percentage-bucket label, e.g. "< -20%",
    # "-10% to 0%", "> 20%" (see calculate_gain_distribution).
    gain_distribution: Dict[str, int]
    individual_gains: List[LearningGainData]  # sorted by gain, descending
class StopPerformance(BaseModel):
    """Performance metrics for a single stop within a unit."""
    stop_id: str
    stop_label: str
    attempts_total: int
    success_rate: float  # fraction in [0, 1] (consumed as 1 - rate by the difficulty formula)
    avg_time_seconds: float
    avg_attempts_before_success: float
    common_errors: List[str]
    # Derived 1-5 score; higher means harder (see calculate_difficulty_rating).
    difficulty_rating: float
class UnitPerformanceDetail(BaseModel):
    """Detailed per-stop performance breakdown for a unit."""
    unit_id: str
    unit_title: str
    template: str
    total_sessions: int
    completed_sessions: int
    completion_rate: float
    avg_duration_minutes: float
    stops: List[StopPerformance]
    # Stop IDs where students struggle most: difficulty > 3.5 or
    # success rate < 0.6 (see get_unit_stop_analytics).
    bottleneck_stops: List[str]
class MisconceptionEntry(BaseModel):
    """A single tracked misconception, tied to a concept, unit and stop."""
    concept_id: str
    concept_label: str
    misconception_text: str
    frequency: int                  # number of recorded detections
    affected_student_ids: List[str]
    unit_id: str
    stop_id: str
    detected_via: str               # "precheck", "postcheck", or "interaction"
    first_detected: datetime
    last_detected: datetime
class MisconceptionReport(BaseModel):
    """Comprehensive misconception report, optionally scoped to one class."""
    class_id: Optional[str]   # None means all classes
    time_range: str
    total_misconceptions: int  # sum of entry frequencies
    unique_concepts: int       # count of distinct concept_ids
    most_common: List[MisconceptionEntry]         # top entries by frequency
    by_unit: Dict[str, List[MisconceptionEntry]]  # entries grouped by unit_id
    trending_up: List[MisconceptionEntry]  # getting more frequent (placeholder heuristic)
    resolved: List[MisconceptionEntry]     # no longer appearing (not yet populated)
class StudentProgressTimeline(BaseModel):
    """Chronological progress summary for one student."""
    student_id: str
    student_name: str
    units_completed: int
    total_time_minutes: int
    avg_score: float  # mean of recorded postcheck scores (0.0 if none)
    trend: str        # "improving" | "stable" | "declining" | "insufficient_data"
    timeline: List[Dict[str, Any]]  # per-session events (date, unit_id, scores, duration)
class ClassComparisonData(BaseModel):
    """Aggregate performance figures used to compare classes side by side."""
    class_id: str
    class_name: str
    student_count: int
    units_assigned: int
    avg_completion_rate: float
    avg_learning_gain: float
    avg_time_per_unit: float
class ExportFormat(str, Enum):
    """Supported export formats for the /export endpoints."""
    JSON = "json"  # structured response body (default)
    CSV = "csv"    # downloadable attachment
# ==============================================
# Database Integration
# ==============================================
# Module-level cache for the lazily created analytics DB handle.
_analytics_db = None

async def get_analytics_database():
    """Return the shared analytics database handle, creating it on first use.

    Returns None when the feature flag disables the database, when the
    ``unit.database`` module is missing, or when initialization fails;
    failed attempts are retried on the next call.
    """
    global _analytics_db
    if not USE_DATABASE:
        return None
    if _analytics_db is not None:
        return _analytics_db
    try:
        from unit.database import get_analytics_db
        _analytics_db = await get_analytics_db()
    except ImportError:
        logger.warning("Analytics database module not available")
    except Exception as e:
        logger.warning(f"Analytics database not available: {e}")
    else:
        logger.info("Analytics database initialized")
    return _analytics_db
# ==============================================
# Helper Functions
# ==============================================
def calculate_gain_distribution(gains: List[float]) -> Dict[str, int]:
    """Bucket learning gains (fractions, e.g. 0.15) into labeled percent ranges.

    Returns a dict with all six bucket labels present (zero counts included),
    in ascending bucket order.
    """
    # Exclusive upper bounds (in percent) paired with their bucket labels.
    bounds = [
        (-20, "< -20%"),
        (-10, "-20% to -10%"),
        (0, "-10% to 0%"),
        (10, "0% to 10%"),
        (20, "10% to 20%"),
    ]
    overflow_label = "> 20%"
    counts = {label: 0 for _, label in bounds}
    counts[overflow_label] = 0
    for gain in gains:
        pct = gain * 100
        for upper, label in bounds:
            if pct < upper:
                counts[label] += 1
                break
        else:
            # Did not fall under any bound: 20% or more.
            counts[overflow_label] += 1
    return counts
def calculate_trend(scores: List[float]) -> str:
    """Classify a score series as improving/stable/declining.

    Fits a least-squares line over (index, score) pairs and thresholds the
    slope at +/-0.05. Fewer than three points yields "insufficient_data".
    """
    count = len(scores)
    if count < 3:
        return "insufficient_data"
    mean_x = (count - 1) / 2
    mean_y = sum(scores) / count
    # Covariance numerator and x-variance denominator of the slope estimate.
    covariance = sum((idx - mean_x) * (score - mean_y) for idx, score in enumerate(scores))
    variance_x = sum((idx - mean_x) ** 2 for idx in range(count))
    if variance_x == 0:
        return "stable"
    slope = covariance / variance_x
    if slope > 0.05:
        return "improving"
    if slope < -0.05:
        return "declining"
    return "stable"
def calculate_difficulty_rating(success_rate: float, avg_attempts: float) -> float:
    """Calculate a difficulty rating in [1, 5] from success metrics.

    Lower success rate and more attempts mean higher difficulty.

    Args:
        success_rate: fraction of successful attempts, expected in [0, 1].
        avg_attempts: average attempts before success.

    Returns:
        A float clamped to the documented 1-5 range.
    """
    # (1 - success_rate) maps [0, 1] success onto a 1-4 base difficulty.
    base_difficulty = (1 - success_rate) * 3 + 1
    # Extra attempts add up to +1. Clamp below at 0: the previous version
    # let avg_attempts < 1 (e.g. a DB default of 0) drive the modifier
    # negative and push the rating under 1 — outside the 1-5 contract.
    attempt_modifier = max(0.0, min(avg_attempts - 1, 1))
    # Clamp the final value into [1, 5] so out-of-range inputs can't escape it.
    return max(1.0, min(5.0, base_difficulty + attempt_modifier))
# ==============================================
# API Endpoints - Learning Gain
# ==============================================
# NOTE: Static routes must come BEFORE dynamic routes like /{unit_id}
@router.get("/learning-gain/compare")
async def compare_learning_gains(
    unit_ids: str = Query(..., description="Comma-separated unit IDs"),
    class_id: Optional[str] = Query(None),
    time_range: TimeRange = Query(TimeRange.MONTH),
) -> Dict[str, Any]:
    """
    Compare learning gains across multiple units.

    Units whose analysis fails are logged and skipped; the survivors are
    returned sorted by average gain, best first.
    """
    comparisons: List[Dict[str, Any]] = []
    for unit_id in [part.strip() for part in unit_ids.split(",")]:
        try:
            summary = await get_learning_gain_analysis(unit_id, class_id, time_range)
        except Exception as e:
            logger.error(f"Failed to get comparison for {unit_id}: {e}")
            continue
        student_total = max(summary.total_students, 1)  # avoid div-by-zero
        comparisons.append({
            "unit_id": unit_id,
            "avg_gain": summary.avg_gain,
            "median_gain": summary.median_gain,
            "total_students": summary.total_students,
            "positive_rate": summary.positive_gain_count / student_total,
        })
    comparisons.sort(key=lambda entry: entry["avg_gain"], reverse=True)
    return {
        "time_range": time_range.value,
        "class_id": class_id,
        "comparisons": comparisons,
    }
@router.get("/learning-gain/{unit_id}", response_model=LearningGainSummary)
async def get_learning_gain_analysis(
    unit_id: str,
    class_id: Optional[str] = Query(None, description="Filter by class"),
    time_range: TimeRange = Query(TimeRange.MONTH, description="Time range for analysis"),
) -> LearningGainSummary:
    """
    Get detailed pre/post learning gain analysis for a unit.

    Shows individual gains, aggregated statistics, and distribution.
    Sessions missing either score are skipped; when no usable sessions
    exist (or the database is unavailable) an all-zero summary is
    returned so dashboards can render an empty state.
    """
    db = await get_analytics_database()
    individual_gains: List[LearningGainData] = []
    if db:
        try:
            # Get all sessions with pre/post scores for this unit.
            sessions = await db.get_unit_sessions_with_scores(
                unit_id=unit_id,
                class_id=class_id,
                time_range=time_range.value
            )
            for session in sessions:
                if session.get("precheck_score") is not None and session.get("postcheck_score") is not None:
                    gain = session["postcheck_score"] - session["precheck_score"]
                    individual_gains.append(LearningGainData(
                        student_id=session["student_id"],
                        # Fall back to a truncated ID when no name is stored.
                        student_name=session.get("student_name", session["student_id"][:8]),
                        unit_id=unit_id,
                        precheck_score=session["precheck_score"],
                        postcheck_score=session["postcheck_score"],
                        learning_gain=gain,
                    ))
        except Exception as e:
            logger.error(f"Failed to get learning gain data: {e}")
    # Calculate statistics
    if not individual_gains:
        # Return empty summary
        return LearningGainSummary(
            unit_id=unit_id,
            unit_title=f"Unit {unit_id}",
            total_students=0,
            avg_precheck=0.0,
            avg_postcheck=0.0,
            avg_gain=0.0,
            median_gain=0.0,
            std_deviation=0.0,
            positive_gain_count=0,
            negative_gain_count=0,
            no_change_count=0,
            gain_distribution={},
            individual_gains=[],
        )
    gains = [g.learning_gain for g in individual_gains]
    prechecks = [g.precheck_score for g in individual_gains]
    postchecks = [g.postcheck_score for g in individual_gains]
    avg_gain = statistics.mean(gains)
    median_gain = statistics.median(gains)
    # Sample stdev requires at least two data points.
    std_dev = statistics.stdev(gains) if len(gains) > 1 else 0.0
    # Calculate percentiles: rank of the first occurrence of each gain value
    # in the sorted list. Built once as a dict (O(n)) instead of calling
    # sorted_gains.index() per student (O(n^2)); ties share the lowest rank,
    # exactly as .index() produced.
    sorted_gains = sorted(gains)
    first_rank: Dict[float, int] = {}
    for idx, value in enumerate(sorted_gains):
        first_rank.setdefault(value, idx + 1)
    for data in individual_gains:
        data.percentile = first_rank[data.learning_gain] / len(sorted_gains) * 100
    return LearningGainSummary(
        unit_id=unit_id,
        unit_title=f"Unit {unit_id}",
        total_students=len(individual_gains),
        avg_precheck=statistics.mean(prechecks),
        avg_postcheck=statistics.mean(postchecks),
        avg_gain=avg_gain,
        median_gain=median_gain,
        std_deviation=std_dev,
        # +/-0.01 dead zone separates real movement from noise.
        positive_gain_count=sum(1 for g in gains if g > 0.01),
        negative_gain_count=sum(1 for g in gains if g < -0.01),
        no_change_count=sum(1 for g in gains if -0.01 <= g <= 0.01),
        gain_distribution=calculate_gain_distribution(gains),
        individual_gains=sorted(individual_gains, key=lambda x: x.learning_gain, reverse=True),
    )
# ==============================================
# API Endpoints - Stop-Level Analytics
# ==============================================
@router.get("/unit/{unit_id}/stops", response_model=UnitPerformanceDetail)
async def get_unit_stop_analytics(
    unit_id: str,
    class_id: Optional[str] = Query(None),
    time_range: TimeRange = Query(TimeRange.MONTH),
) -> UnitPerformanceDetail:
    """
    Get detailed stop-level performance analytics.
    Identifies bottleneck stops where students struggle most.

    Falls back to an empty stop list and zeroed unit stats when the
    analytics database is unavailable or its queries fail.
    """
    db = await get_analytics_database()
    stops_data = []
    if db:
        try:
            # Get stop-level telemetry
            stop_stats = await db.get_stop_performance(
                unit_id=unit_id,
                class_id=class_id,
                time_range=time_range.value
            )
            for stop in stop_stats:
                # Derive a 1-5 difficulty score from success rate and attempts.
                difficulty = calculate_difficulty_rating(
                    stop.get("success_rate", 0.5),
                    stop.get("avg_attempts", 1.0)
                )
                stops_data.append(StopPerformance(
                    stop_id=stop["stop_id"],
                    stop_label=stop.get("stop_label", stop["stop_id"]),
                    attempts_total=stop.get("total_attempts", 0),
                    success_rate=stop.get("success_rate", 0.0),
                    avg_time_seconds=stop.get("avg_time_seconds", 0.0),
                    avg_attempts_before_success=stop.get("avg_attempts", 1.0),
                    common_errors=stop.get("common_errors", []),
                    difficulty_rating=difficulty,
                ))
            # Get overall unit stats
            unit_stats = await db.get_unit_overall_stats(unit_id, class_id, time_range.value)
        except Exception as e:
            # NOTE: a failure mid-loop leaves stops_data partially filled;
            # we keep what we have and zero the unit-level stats.
            logger.error(f"Failed to get stop analytics: {e}")
            unit_stats = {}
    else:
        unit_stats = {}
    # Identify bottleneck stops (difficulty > 3.5 or success rate < 0.6)
    bottlenecks = [
        s.stop_id for s in stops_data
        if s.difficulty_rating > 3.5 or s.success_rate < 0.6
    ]
    return UnitPerformanceDetail(
        unit_id=unit_id,
        unit_title=f"Unit {unit_id}",
        template=unit_stats.get("template", "unknown"),
        total_sessions=unit_stats.get("total_sessions", 0),
        completed_sessions=unit_stats.get("completed_sessions", 0),
        completion_rate=unit_stats.get("completion_rate", 0.0),
        avg_duration_minutes=unit_stats.get("avg_duration_minutes", 0.0),
        stops=stops_data,
        bottleneck_stops=bottlenecks,
    )
# ==============================================
# API Endpoints - Misconception Tracking
# ==============================================
@router.get("/misconceptions", response_model=MisconceptionReport)
async def get_misconception_report(
    class_id: Optional[str] = Query(None),
    unit_id: Optional[str] = Query(None),
    time_range: TimeRange = Query(TimeRange.MONTH),
    limit: int = Query(20, ge=1, le=100),
) -> MisconceptionReport:
    """
    Build a comprehensive misconception report: the most frequent
    misconceptions, a per-unit grouping, and placeholder trending data.
    """
    db = await get_analytics_database()
    misconceptions: List[MisconceptionEntry] = []
    if db:
        try:
            raw_misconceptions = await db.get_misconceptions(
                class_id=class_id,
                unit_id=unit_id,
                time_range=time_range.value,
                limit=limit
            )
            # Append one entry at a time so a bad row leaves earlier
            # entries usable.
            for m in raw_misconceptions:
                misconceptions.append(MisconceptionEntry(
                    concept_id=m["concept_id"],
                    concept_label=m["concept_label"],
                    misconception_text=m["misconception_text"],
                    frequency=m["frequency"],
                    affected_student_ids=m.get("student_ids", []),
                    unit_id=m["unit_id"],
                    stop_id=m["stop_id"],
                    detected_via=m.get("detected_via", "unknown"),
                    first_detected=m.get("first_detected", datetime.utcnow()),
                    last_detected=m.get("last_detected", datetime.utcnow()),
                ))
        except Exception as e:
            logger.error(f"Failed to get misconceptions: {e}")
    # Group entries per unit.
    by_unit: Dict[str, List[MisconceptionEntry]] = {}
    for entry in misconceptions:
        by_unit.setdefault(entry.unit_id, []).append(entry)
    # Trending/resolved would need historical comparison in production;
    # for now the first three entries stand in, and resolved is empty.
    trending_up = misconceptions[:3]
    resolved: List[MisconceptionEntry] = []
    return MisconceptionReport(
        class_id=class_id,
        time_range=time_range.value,
        total_misconceptions=sum(m.frequency for m in misconceptions),
        unique_concepts=len(set(m.concept_id for m in misconceptions)),
        most_common=sorted(misconceptions, key=lambda x: x.frequency, reverse=True)[:10],
        by_unit=by_unit,
        trending_up=trending_up,
        resolved=resolved,
    )
@router.get("/misconceptions/student/{student_id}")
async def get_student_misconceptions(
    student_id: str,
    time_range: TimeRange = Query(TimeRange.ALL),
) -> Dict[str, Any]:
    """
    Return misconceptions recorded for one student plus simple
    remediation pointers (review the relevant unit/stop).

    Falls back to empty lists when the analytics database is
    unavailable or the lookup fails.
    """
    db = await get_analytics_database()
    if db:
        try:
            misconceptions = await db.get_student_misconceptions(
                student_id=student_id,
                time_range=time_range.value
            )
            # Suggest reviewing the source unit/stop for the top entries.
            remediation = [
                {"concept": m["concept_label"], "activity": f"Review {m['unit_id']}/{m['stop_id']}"}
                for m in misconceptions[:5]
            ]
            return {
                "student_id": student_id,
                "misconceptions": misconceptions,
                "recommended_remediation": remediation,
            }
        except Exception as e:
            logger.error(f"Failed to get student misconceptions: {e}")
    return {
        "student_id": student_id,
        "misconceptions": [],
        "recommended_remediation": [],
    }
# ==============================================
# API Endpoints - Student Progress Timeline
# ==============================================
@router.get("/student/{student_id}/timeline", response_model=StudentProgressTimeline)
async def get_student_timeline(
    student_id: str,
    time_range: TimeRange = Query(TimeRange.ALL),
) -> StudentProgressTimeline:
    """
    Return a per-session progress timeline for one student, together
    with aggregate totals and a score-trend classification.
    """
    db = await get_analytics_database()
    timeline: List[Dict[str, Any]] = []
    scores: List[float] = []
    if db:
        try:
            sessions = await db.get_student_sessions(
                student_id=student_id,
                time_range=time_range.value
            )
            for session in sessions:
                event = {
                    "date": session.get("started_at"),
                    "unit_id": session.get("unit_id"),
                    "completed": session.get("completed_at") is not None,
                    "precheck": session.get("precheck_score"),
                    "postcheck": session.get("postcheck_score"),
                    "duration_minutes": session.get("duration_seconds", 0) // 60,
                }
                timeline.append(event)
                # Only sessions with a postcheck contribute to the trend.
                if event["postcheck"] is not None:
                    scores.append(session["postcheck_score"])
        except Exception as e:
            logger.error(f"Failed to get student timeline: {e}")
    trend = calculate_trend(scores) if scores else "insufficient_data"
    completed_count = sum(1 for event in timeline if event["completed"])
    minutes_total = sum(event["duration_minutes"] for event in timeline)
    return StudentProgressTimeline(
        student_id=student_id,
        student_name=f"Student {student_id[:8]}",  # Would load actual name
        units_completed=completed_count,
        total_time_minutes=minutes_total,
        avg_score=statistics.mean(scores) if scores else 0.0,
        trend=trend,
        timeline=timeline,
    )
# ==============================================
# API Endpoints - Class Comparison
# ==============================================
@router.get("/compare/classes", response_model=List[ClassComparisonData])
async def compare_classes(
    class_ids: str = Query(..., description="Comma-separated class IDs"),
    time_range: TimeRange = Query(TimeRange.MONTH),
) -> List[ClassComparisonData]:
    """
    Compare aggregate performance across several classes; the result is
    sorted by average learning gain, best first. Classes whose stats
    lookup fails are logged and skipped.
    """
    comparisons: List[ClassComparisonData] = []
    db = await get_analytics_database()
    if db:
        for class_id in (token.strip() for token in class_ids.split(",")):
            try:
                stats = await db.get_class_aggregate_stats(class_id, time_range.value)
                comparisons.append(ClassComparisonData(
                    class_id=class_id,
                    class_name=stats.get("class_name", f"Klasse {class_id[:8]}"),
                    student_count=stats.get("student_count", 0),
                    units_assigned=stats.get("units_assigned", 0),
                    avg_completion_rate=stats.get("avg_completion_rate", 0.0),
                    avg_learning_gain=stats.get("avg_learning_gain", 0.0),
                    avg_time_per_unit=stats.get("avg_time_per_unit", 0.0),
                ))
            except Exception as e:
                logger.error(f"Failed to get stats for class {class_id}: {e}")
    comparisons.sort(key=lambda c: c.avg_learning_gain, reverse=True)
    return comparisons
# ==============================================
# API Endpoints - Export
# ==============================================
@router.get("/export/learning-gains")
async def export_learning_gains(
    unit_id: Optional[str] = Query(None),
    class_id: Optional[str] = Query(None),
    time_range: TimeRange = Query(TimeRange.ALL),
    format: ExportFormat = Query(ExportFormat.JSON),
) -> Any:
    """
    Export learning gain data as JSON (default) or a CSV download.

    Rows come straight from the analytics DB export; when the database
    is unavailable or the export fails, an empty dataset is returned
    rather than an error.
    """
    from fastapi.responses import Response
    db = await get_analytics_database()
    data = []
    if db:
        try:
            data = await db.export_learning_gains(
                unit_id=unit_id,
                class_id=class_id,
                time_range=time_range.value
            )
        except Exception as e:
            logger.error(f"Failed to export data: {e}")
    if format == ExportFormat.CSV:
        # Collect all rows and join once. The previous version duplicated
        # the header in two identical if/else branches and grew the string
        # with += per row (quadratic).
        # NOTE(review): fields are interpolated without CSV escaping;
        # assumes IDs and scores contain no commas/quotes -- confirm
        # upstream if that ever changes.
        lines = ["student_id,unit_id,precheck,postcheck,gain"]
        for row in data:
            lines.append(
                f"{row['student_id']},{row['unit_id']},"
                f"{row.get('precheck', '')},{row.get('postcheck', '')},{row.get('gain', '')}"
            )
        csv_content = "\n".join(lines) + "\n"
        return Response(
            content=csv_content,
            media_type="text/csv",
            headers={"Content-Disposition": "attachment; filename=learning_gains.csv"}
        )
    return {
        "export_date": datetime.utcnow().isoformat(),
        "filters": {
            "unit_id": unit_id,
            "class_id": class_id,
            "time_range": time_range.value,
        },
        "data": data,
    }
@router.get("/export/misconceptions")
async def export_misconceptions(
    class_id: Optional[str] = Query(None),
    format: ExportFormat = Query(ExportFormat.JSON),
) -> Any:
    """
    Export misconception data for further analysis (JSON or CSV).

    Reuses get_misconception_report over the last month with up to 100
    entries; only the most_common entries are exported.
    """
    report = await get_misconception_report(
        class_id=class_id,
        unit_id=None,
        time_range=TimeRange.MONTH,
        limit=100
    )
    if format == ExportFormat.CSV:
        from fastapi.responses import Response

        def quote(value: str) -> str:
            # RFC 4180 quoting: double embedded quotes, then wrap the
            # field. The previous version interpolated raw text into
            # "..." fields, so a quote inside misconception_text
            # produced a malformed CSV row.
            return '"' + value.replace('"', '""') + '"'

        lines = ["concept_id,concept_label,misconception,frequency,unit_id,stop_id"]
        for m in report.most_common:
            lines.append(
                f"{quote(m.concept_id)},{quote(m.concept_label)},{quote(m.misconception_text)},"
                f"{m.frequency},{quote(m.unit_id)},{quote(m.stop_id)}"
            )
        csv_content = "\n".join(lines) + "\n"
        return Response(
            content=csv_content,
            media_type="text/csv",
            headers={"Content-Disposition": "attachment; filename=misconceptions.csv"}
        )
    return {
        "export_date": datetime.utcnow().isoformat(),
        "class_id": class_id,
        "total_entries": len(report.most_common),
        "data": [m.model_dump() for m in report.most_common],
    }
# ==============================================
# API Endpoints - Dashboard Aggregates
# ==============================================
@router.get("/dashboard/overview")
async def get_analytics_overview(
    time_range: TimeRange = Query(TimeRange.MONTH),
) -> Dict[str, Any]:
    """
    High-level analytics summary for the dashboard.

    Serves a zeroed fallback payload when the analytics database is
    unavailable or the overview query fails.
    """
    db = await get_analytics_database()
    if db:
        try:
            return await db.get_analytics_overview(time_range.value)
        except Exception as e:
            logger.error(f"Failed to get analytics overview: {e}")
    return {
        "time_range": time_range.value,
        "total_sessions": 0,
        "unique_students": 0,
        "avg_completion_rate": 0.0,
        "avg_learning_gain": 0.0,
        "most_played_units": [],
        "struggling_concepts": [],
        "active_classes": 0,
    }
@router.get("/health")
async def health_check() -> Dict[str, Any]:
    """Report service health and whether the analytics DB handle is available."""
    db = await get_analytics_database()
    db_state = "connected" if db else "disconnected"
    return {
        "status": "healthy",
        "service": "unit-analytics",
        "database": db_state,
    }