Files
Benjamin Boenisch 5a31f52310 Initial commit: breakpilot-lehrer - Lehrer KI Platform
Services: Admin-Lehrer, Backend-Lehrer, Studio v2, Website,
Klausur-Service, School-Service, Voice-Service, Geo-Service,
BreakPilot Drive, Agent-Core

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-11 23:47:26 +01:00

786 lines
29 KiB
Python

# ==============================================
# Breakpilot Drive - Game Database
# ==============================================
# Async PostgreSQL database access for game sessions
# and student learning state.
import os
import json
import logging
from datetime import datetime, timezone
from typing import Optional, List, Dict, Any
from dataclasses import dataclass, field
from enum import IntEnum
logger = logging.getLogger(__name__)
# Database URL, read from the DATABASE_URL environment variable; when the
# variable is unset the local default below is used.
# NOTE(review): the fallback URL embeds a username and password - presumably
# development-only credentials; confirm DATABASE_URL is always set in
# deployed environments.
GAME_DB_URL = os.getenv(
    "DATABASE_URL",
    "postgresql://breakpilot:breakpilot123@localhost:5432/breakpilot"
)
class LearningLevel(IntEnum):
    """Difficulty tiers, each annotated with the school grades it targets.

    Members are plain integers (1-5), so they compare and serialize like
    the numeric level values used elsewhere in this module.
    """

    BEGINNER = 1      # grades 2-3
    ELEMENTARY = 2    # grades 3-4
    INTERMEDIATE = 3  # grades 4-5
    ADVANCED = 4      # grades 5-6
    EXPERT = 5        # grade 6 and above
@dataclass
class StudentLearningState:
    """Aggregated learning progress of a single student.

    Holds the per-subject levels plus lifetime counters (play time,
    sessions, questions answered/correct). All fields have defaults, so an
    empty instance can be created and filled in later.
    """

    id: Optional[str] = None
    student_id: str = ""
    overall_level: int = 3
    math_level: float = 3.0
    german_level: float = 3.0
    english_level: float = 3.0
    total_play_time_minutes: int = 0
    total_sessions: int = 0
    questions_answered: int = 0
    questions_correct: int = 0
    created_at: Optional[datetime] = None
    updated_at: Optional[datetime] = None

    def to_dict(self) -> Dict[str, Any]:
        """Return the state as a plain dictionary.

        Timestamps are rendered with ``datetime.isoformat()``; a missing
        timestamp is emitted as ``None``.
        """
        plain_fields = (
            "id",
            "student_id",
            "overall_level",
            "math_level",
            "german_level",
            "english_level",
            "total_play_time_minutes",
            "total_sessions",
            "questions_answered",
            "questions_correct",
        )
        payload: Dict[str, Any] = {name: getattr(self, name) for name in plain_fields}
        for stamp_name in ("created_at", "updated_at"):
            stamp = getattr(self, stamp_name)
            payload[stamp_name] = stamp.isoformat() if stamp else None
        return payload

    @property
    def accuracy(self) -> float:
        """Fraction of answered questions that were correct.

        The value is a ratio between 0.0 and 1.0 (not a 0-100 percentage);
        it is 0.0 when no question has been answered yet.
        """
        answered = self.questions_answered
        return self.questions_correct / answered if answered != 0 else 0.0
@dataclass
class GameSessionRecord:
    """One played game session, shaped for storage in the database.

    Every field is optional or defaulted; ``metadata`` carries arbitrary
    extra key/value data for the session.
    """

    # Identity
    id: Optional[str] = None
    student_id: str = ""

    # What was played and how it went
    game_mode: str = "video"
    duration_seconds: int = 0
    distance_traveled: float = 0.0
    score: int = 0
    questions_answered: int = 0
    questions_correct: int = 0
    difficulty_level: int = 3

    # Timing and free-form extras
    started_at: Optional[datetime] = None
    ended_at: Optional[datetime] = None
    metadata: Optional[Dict[str, Any]] = None
@dataclass
class GameQuizAnswer:
    """A single answer given to a quiz question during a game session."""

    # Identity and owning session
    id: Optional[str] = None
    session_id: Optional[str] = None

    # The question that was asked
    question_id: str = ""
    subject: str = ""
    difficulty: int = 3

    # Outcome of the answer
    is_correct: bool = False
    answer_time_ms: int = 0
    created_at: Optional[datetime] = None
@dataclass
class Achievement:
    """An achievement definition together with a student's unlock status.

    ``id``, ``name`` and ``description`` are required; the remaining fields
    default to a locked achievement with no progress.
    """

    # Static definition
    id: str
    name: str
    description: str
    icon: str = "star"
    category: str = "general"  # general, streak, accuracy, time, score
    threshold: int = 1

    # Per-student status
    unlocked: bool = False
    unlocked_at: Optional[datetime] = None
    progress: int = 0
# Achievement definitions (static, not in DB).
# Each entry is a template with default (locked, zero-progress) status;
# `threshold` is the value the category's metric must reach to unlock it.
ACHIEVEMENTS = [
    # First steps (number of games played)
    Achievement(id="first_game", name="Erste Fahrt", description="Spiele dein erstes Spiel", icon="rocket", category="general", threshold=1),
    Achievement(id="five_games", name="Regelmaessiger Fahrer", description="Spiele 5 Spiele", icon="car", category="general", threshold=5),
    Achievement(id="twenty_games", name="Erfahrener Pilot", description="Spiele 20 Spiele", icon="trophy", category="general", threshold=20),
    # Streaks (consecutive correct answers)
    Achievement(id="streak_3", name="Guter Start", description="3 richtige Antworten hintereinander", icon="fire", category="streak", threshold=3),
    Achievement(id="streak_5", name="Auf Feuer", description="5 richtige Antworten hintereinander", icon="fire", category="streak", threshold=5),
    Achievement(id="streak_10", name="Unaufhaltsam", description="10 richtige Antworten hintereinander", icon="fire", category="streak", threshold=10),
    # Accuracy
    Achievement(id="perfect_game", name="Perfektes Spiel", description="100% richtig in einem Spiel (min. 5 Fragen)", icon="star", category="accuracy", threshold=100),
    Achievement(id="accuracy_80", name="Scharfschuetze", description="80% Gesamtgenauigkeit (min. 50 Fragen)", icon="target", category="accuracy", threshold=80),
    # Time (total play time in minutes)
    Achievement(id="play_30min", name="Ausdauer", description="30 Minuten Gesamtspielzeit", icon="clock", category="time", threshold=30),
    Achievement(id="play_60min", name="Marathon", description="60 Minuten Gesamtspielzeit", icon="clock", category="time", threshold=60),
    # Score (points in a single game)
    Achievement(id="score_5000", name="Punktejaeger", description="5.000 Punkte in einem Spiel", icon="gem", category="score", threshold=5000),
    Achievement(id="score_10000", name="Highscore Hero", description="10.000 Punkte in einem Spiel", icon="crown", category="score", threshold=10000),
    # Level
    Achievement(id="level_up", name="Aufsteiger", description="Erreiche Level 2", icon="arrow-up", category="level", threshold=2),
    Achievement(id="master", name="Meister", description="Erreiche Level 5", icon="medal", category="level", threshold=5),
]
class GameDatabase:
    """
    Async database access for Breakpilot Drive game data.

    Uses asyncpg for PostgreSQL access with connection pooling. The pool is
    created lazily: every query method first tries to connect and, when no
    pool is available, returns a neutral value instead of raising.

    All query methods are best-effort: database errors are logged and a
    neutral value (``None``, ``False``, ``[]`` or ``{}``) is returned.
    """

    # Whitelisted SQL predicates for the leaderboard time window. Only these
    # fixed fragments are ever interpolated into SQL text; an unknown
    # timeframe falls back to the unfiltered "1=1".
    _TIMEFRAME_FILTERS = {
        "day": "ended_at > NOW() - INTERVAL '1 day'",
        "week": "ended_at > NOW() - INTERVAL '7 days'",
        "month": "ended_at > NOW() - INTERVAL '30 days'",
        "all": "1=1",
    }

    def __init__(self, database_url: Optional[str] = None):
        """Store the connection URL; no connection is opened yet.

        Args:
            database_url: PostgreSQL URL. Falls back to the module-level
                ``GAME_DB_URL`` when omitted or empty.
        """
        self.database_url = database_url or GAME_DB_URL
        self._pool = None
        self._connected = False

    async def connect(self):
        """Initialize the connection pool (no-op when already connected).

        Failures are logged, not raised: a missing asyncpg package or a
        failed connection leaves the instance without a pool.
        """
        if self._connected:
            return
        try:
            # Imported lazily so the module can be loaded without asyncpg.
            import asyncpg

            self._pool = await asyncpg.create_pool(
                self.database_url,
                min_size=2,
                max_size=10,
            )
            self._connected = True
            logger.info("Game database connected")
        except ImportError:
            logger.warning("asyncpg not installed, database features disabled")
        except Exception as e:
            logger.error("Game database connection failed: %s", e)

    async def close(self):
        """Close the connection pool and forget it.

        The pool reference is cleared before closing so that a later failed
        reconnect cannot leave the instance holding a closed pool.
        """
        pool, self._pool = self._pool, None
        self._connected = False
        if pool:
            await pool.close()

    async def _ensure_connected(self):
        """Connect on first use (or after ``close()``)."""
        if not self._connected:
            await self.connect()

    @staticmethod
    def _row_to_learning_state(row) -> "StudentLearningState":
        """Map a ``student_learning_state`` row to a StudentLearningState."""
        return StudentLearningState(
            id=str(row["id"]),
            student_id=str(row["student_id"]),
            overall_level=row["overall_level"],
            math_level=float(row["math_level"]),
            german_level=float(row["german_level"]),
            english_level=float(row["english_level"]),
            total_play_time_minutes=row["total_play_time_minutes"],
            total_sessions=row["total_sessions"],
            # The question counters may be NULL in the database.
            questions_answered=row["questions_answered"] or 0,
            questions_correct=row["questions_correct"] or 0,
            created_at=row["created_at"],
            updated_at=row["updated_at"],
        )

    @staticmethod
    def _affected_rows(status) -> Optional[int]:
        """Extract the row count from a command status such as ``"UPDATE 1"``.

        Returns ``None`` when the status does not end in an integer.
        """
        try:
            return int(str(status).rsplit(" ", 1)[-1])
        except ValueError:
            return None

    # ==============================================
    # Learning State Methods
    # ==============================================

    async def get_learning_state(self, student_id: str) -> Optional["StudentLearningState"]:
        """Get the learning state for a student.

        Returns:
            The stored state, or ``None`` when the student has no row, the
            database is unavailable, or the query fails.
        """
        await self._ensure_connected()
        if not self._pool:
            return None
        try:
            async with self._pool.acquire() as conn:
                row = await conn.fetchrow(
                    """
                    SELECT id, student_id, overall_level, math_level, german_level,
                           english_level, total_play_time_minutes, total_sessions,
                           questions_answered, questions_correct, created_at, updated_at
                    FROM student_learning_state
                    WHERE student_id = $1
                    """,
                    student_id,
                )
                if row:
                    return self._row_to_learning_state(row)
        except Exception as e:
            logger.error("Failed to get learning state: %s", e)
        return None

    async def create_or_update_learning_state(
        self,
        student_id: str,
        overall_level: int = 3,
        math_level: float = 3.0,
        german_level: float = 3.0,
        english_level: float = 3.0,
    ) -> Optional["StudentLearningState"]:
        """Create or update the learning state for a student (upsert).

        Inserts a row for ``student_id`` or, when one exists, overwrites its
        four level columns and refreshes ``updated_at``.

        Returns:
            The resulting state, or ``None`` when the database is
            unavailable or the statement fails.
        """
        await self._ensure_connected()
        if not self._pool:
            return None
        try:
            async with self._pool.acquire() as conn:
                row = await conn.fetchrow(
                    """
                    INSERT INTO student_learning_state (
                        student_id, overall_level, math_level, german_level, english_level
                    ) VALUES ($1, $2, $3, $4, $5)
                    ON CONFLICT (student_id) DO UPDATE SET
                        overall_level = EXCLUDED.overall_level,
                        math_level = EXCLUDED.math_level,
                        german_level = EXCLUDED.german_level,
                        english_level = EXCLUDED.english_level,
                        updated_at = NOW()
                    RETURNING id, student_id, overall_level, math_level, german_level,
                              english_level, total_play_time_minutes, total_sessions,
                              questions_answered, questions_correct, created_at, updated_at
                    """,
                    student_id, overall_level, math_level, german_level, english_level,
                )
                if row:
                    return self._row_to_learning_state(row)
        except Exception as e:
            logger.error("Failed to create/update learning state: %s", e)
        return None

    async def update_learning_stats(
        self,
        student_id: str,
        duration_minutes: int,
        questions_answered: int,
        questions_correct: int,
        new_level: Optional[int] = None,
    ) -> bool:
        """Add one session's numbers to a student's lifetime counters.

        Args:
            student_id: Student whose row is updated.
            duration_minutes: Minutes added to the total play time.
            questions_answered: Added to the answered counter.
            questions_correct: Added to the correct counter.
            new_level: When given, replaces ``overall_level``; when ``None``
                the stored level is kept.

        Returns:
            ``True`` when the update ran; ``False`` when the database is
            unavailable, the statement fails, or no row exists for the
            student (in which case nothing was recorded).
        """
        await self._ensure_connected()
        if not self._pool:
            return False
        try:
            async with self._pool.acquire() as conn:
                # COALESCE keeps the stored level when new_level is NULL, so
                # a single statement covers both cases.
                status = await conn.execute(
                    """
                    UPDATE student_learning_state SET
                        total_play_time_minutes = total_play_time_minutes + $2,
                        total_sessions = total_sessions + 1,
                        questions_answered = COALESCE(questions_answered, 0) + $3,
                        questions_correct = COALESCE(questions_correct, 0) + $4,
                        overall_level = COALESCE($5, overall_level),
                        updated_at = NOW()
                    WHERE student_id = $1
                    """,
                    student_id, duration_minutes, questions_answered,
                    questions_correct, new_level,
                )
            if self._affected_rows(status) == 0:
                # An UPDATE that matches nothing is not an error for the
                # database, but the stats were not stored - report that.
                logger.warning(
                    "Learning stats not updated: no learning state row for student"
                )
                return False
            return True
        except Exception as e:
            logger.error("Failed to update learning stats: %s", e)
        return False

    # ==============================================
    # Game Session Methods
    # ==============================================

    async def save_game_session(
        self,
        student_id: str,
        game_mode: str,
        duration_seconds: int,
        distance_traveled: float,
        score: int,
        questions_answered: int,
        questions_correct: int,
        difficulty_level: int,
        metadata: Optional[Dict[str, Any]] = None,
    ) -> Optional[str]:
        """Save a finished game session and return its ID.

        ``ended_at`` is set to the database's current time and
        ``started_at`` to that time minus ``duration_seconds``. ``metadata``
        is stored as a JSON string; an empty or missing dict is stored as
        NULL.

        Returns:
            The new session ID as a string, or ``None`` when the database
            is unavailable or the insert fails.
        """
        await self._ensure_connected()
        if not self._pool:
            return None
        try:
            async with self._pool.acquire() as conn:
                # The interval gets its own parameter ($10): reusing $3 for
                # both the duration column and make_interval() makes
                # PostgreSQL deduce two different types for one parameter.
                row = await conn.fetchrow(
                    """
                    INSERT INTO game_sessions (
                        student_id, game_mode, duration_seconds, distance_traveled,
                        score, questions_answered, questions_correct, difficulty_level,
                        started_at, ended_at, metadata
                    ) VALUES ($1, $2, $3, $4, $5, $6, $7, $8,
                              NOW() - make_interval(secs => $10), NOW(), $9)
                    RETURNING id
                    """,
                    student_id, game_mode, duration_seconds, distance_traveled,
                    score, questions_answered, questions_correct, difficulty_level,
                    json.dumps(metadata) if metadata else None,
                    float(duration_seconds),
                )
                if row:
                    return str(row["id"])
        except Exception as e:
            logger.error("Failed to save game session: %s", e)
        return None

    async def get_user_sessions(
        self,
        student_id: str,
        limit: int = 10
    ) -> List[Dict[str, Any]]:
        """Get the most recently ended game sessions of a student.

        Returns:
            Up to ``limit`` session dicts, newest first, with timestamps as
            ISO strings; ``[]`` when unavailable or on error.
        """
        await self._ensure_connected()
        if not self._pool:
            return []
        try:
            async with self._pool.acquire() as conn:
                rows = await conn.fetch(
                    """
                    SELECT id, student_id, game_mode, duration_seconds, distance_traveled,
                           score, questions_answered, questions_correct, difficulty_level,
                           started_at, ended_at, metadata
                    FROM game_sessions
                    WHERE student_id = $1
                    ORDER BY ended_at DESC
                    LIMIT $2
                    """,
                    student_id, limit,
                )
                return [
                    {
                        "session_id": str(row["id"]),
                        "user_id": str(row["student_id"]),
                        "game_mode": row["game_mode"],
                        "duration_seconds": row["duration_seconds"],
                        "distance_traveled": float(row["distance_traveled"]) if row["distance_traveled"] else 0.0,
                        "score": row["score"],
                        "questions_answered": row["questions_answered"],
                        "questions_correct": row["questions_correct"],
                        "difficulty_level": row["difficulty_level"],
                        "started_at": row["started_at"].isoformat() if row["started_at"] else None,
                        "ended_at": row["ended_at"].isoformat() if row["ended_at"] else None,
                    }
                    for row in rows
                ]
        except Exception as e:
            logger.error("Failed to get user sessions: %s", e)
        return []

    async def get_leaderboard(
        self,
        timeframe: str = "day",
        limit: int = 10
    ) -> List[Dict[str, Any]]:
        """Get students ranked by their summed score.

        Args:
            timeframe: ``"day"``, ``"week"``, ``"month"`` or ``"all"``; any
                other value is treated like ``"all"``.
            limit: Maximum number of entries.

        Returns:
            Dicts with ``rank`` (1-based), ``user_id`` and ``total_score``;
            ``[]`` when unavailable or on error.
        """
        await self._ensure_connected()
        if not self._pool:
            return []
        # Safe to interpolate: the fragment comes from the fixed whitelist.
        timeframe_sql = self._TIMEFRAME_FILTERS.get(timeframe, "1=1")
        try:
            async with self._pool.acquire() as conn:
                rows = await conn.fetch(
                    f"""
                    SELECT student_id, SUM(score) as total_score
                    FROM game_sessions
                    WHERE {timeframe_sql}
                    GROUP BY student_id
                    ORDER BY total_score DESC
                    LIMIT $1
                    """,
                    limit,
                )
                return [
                    {
                        "rank": position,
                        "user_id": str(row["student_id"]),
                        # SUM() is NULL when every score in the group is
                        # NULL; count that as 0 rather than failing.
                        "total_score": int(row["total_score"] or 0),
                    }
                    for position, row in enumerate(rows, start=1)
                ]
        except Exception as e:
            logger.error("Failed to get leaderboard: %s", e)
        return []

    # ==============================================
    # Quiz Answer Methods
    # ==============================================

    async def save_quiz_answer(
        self,
        session_id: str,
        question_id: str,
        subject: str,
        difficulty: int,
        is_correct: bool,
        answer_time_ms: int,
    ) -> bool:
        """Save an individual quiz answer.

        Returns:
            ``True`` when the insert ran, ``False`` when the database is
            unavailable or the insert fails.
        """
        await self._ensure_connected()
        if not self._pool:
            return False
        try:
            async with self._pool.acquire() as conn:
                await conn.execute(
                    """
                    INSERT INTO game_quiz_answers (
                        session_id, question_id, subject, difficulty,
                        is_correct, answer_time_ms
                    ) VALUES ($1, $2, $3, $4, $5, $6)
                    """,
                    session_id, question_id, subject, difficulty,
                    is_correct, answer_time_ms,
                )
                return True
        except Exception as e:
            logger.error("Failed to save quiz answer: %s", e)
        return False

    async def get_subject_stats(
        self,
        student_id: str
    ) -> Dict[str, Dict[str, Any]]:
        """Get per-subject answer statistics for a student.

        Returns:
            A mapping of subject to ``total``, ``correct``, ``accuracy``
            (fraction 0-1) and ``avg_time_ms``; ``{}`` when unavailable or
            on error.
        """
        await self._ensure_connected()
        if not self._pool:
            return {}
        try:
            async with self._pool.acquire() as conn:
                rows = await conn.fetch(
                    """
                    SELECT
                        qa.subject,
                        COUNT(*) as total,
                        SUM(CASE WHEN qa.is_correct THEN 1 ELSE 0 END) as correct,
                        AVG(qa.answer_time_ms) as avg_time_ms
                    FROM game_quiz_answers qa
                    JOIN game_sessions gs ON qa.session_id = gs.id
                    WHERE gs.student_id = $1
                    GROUP BY qa.subject
                    """,
                    student_id,
                )
                return {
                    row["subject"]: {
                        "total": row["total"],
                        "correct": row["correct"],
                        "accuracy": row["correct"] / row["total"] if row["total"] > 0 else 0.0,
                        "avg_time_ms": int(row["avg_time_ms"]) if row["avg_time_ms"] else 0,
                    }
                    for row in rows
                }
        except Exception as e:
            logger.error("Failed to get subject stats: %s", e)
        return {}

    # ==============================================
    # Extended Leaderboard Methods
    # ==============================================

    async def get_class_leaderboard(
        self,
        class_id: str,
        timeframe: str = "week",
        limit: int = 10
    ) -> List[Dict[str, Any]]:
        """
        Get leaderboard filtered by class.

        Note: Requires class_id to be stored in user metadata or a separate
        class_memberships table. Until then this is a placeholder:
        ``class_id`` is ignored and the global leaderboard is returned.
        """
        # TODO: Join with class_memberships table when available
        return await self.get_leaderboard(timeframe, limit)

    async def get_leaderboard_with_names(
        self,
        timeframe: str = "day",
        limit: int = 10,
        anonymize: bool = True
    ) -> List[Dict[str, Any]]:
        """Get the leaderboard with a ``display_name`` added to each entry.

        Names are anonymized as "Spieler <rank>". ``anonymize`` currently
        has no effect: real names would require a join with a users table,
        which is not implemented, so both settings yield the same names.
        """
        leaderboard = await self.get_leaderboard(timeframe, limit)
        for entry in leaderboard:
            entry["display_name"] = f"Spieler {entry['rank']}"
        return leaderboard

    # ==============================================
    # Parent Dashboard Methods
    # ==============================================

    async def get_children_stats(
        self,
        children_ids: List[str]
    ) -> List[Dict[str, Any]]:
        """Get stats for multiple children (parent dashboard).

        Returns:
            One dict per child, in input order, with the learning state (or
            ``None``), the five most recent sessions and a ``has_played``
            flag.
        """
        if not children_ids:
            return []
        results = []
        for child_id in children_ids:
            state = await self.get_learning_state(child_id)
            sessions = await self.get_user_sessions(child_id, limit=5)
            results.append({
                "student_id": child_id,
                "learning_state": state.to_dict() if state else None,
                "recent_sessions": sessions,
                "has_played": state is not None and state.total_sessions > 0,
            })
        return results

    async def get_progress_over_time(
        self,
        student_id: str,
        days: int = 30
    ) -> List[Dict[str, Any]]:
        """Get per-day learning progress for charts.

        Args:
            student_id: Student to report on.
            days: Size of the look-back window in days.

        Returns:
            One dict per day with sessions, ascending by date; ``[]`` when
            unavailable or on error. ``accuracy`` is a fraction (0-1).
        """
        await self._ensure_connected()
        if not self._pool:
            return []
        try:
            async with self._pool.acquire() as conn:
                rows = await conn.fetch(
                    """
                    SELECT
                        DATE(ended_at) as date,
                        COUNT(*) as sessions,
                        SUM(score) as total_score,
                        SUM(questions_answered) as questions,
                        SUM(questions_correct) as correct,
                        AVG(difficulty_level) as avg_difficulty
                    FROM game_sessions
                    WHERE student_id = $1
                      AND ended_at > NOW() - make_interval(days => $2)
                    GROUP BY DATE(ended_at)
                    ORDER BY date ASC
                    """,
                    student_id, days,
                )
                progress = []
                for row in rows:
                    # SUM() is NULL when all inputs for the day are NULL;
                    # treat that as 0 so one such day cannot fail the call.
                    questions = row["questions"] or 0
                    correct = row["correct"] or 0
                    progress.append({
                        "date": row["date"].isoformat(),
                        "sessions": row["sessions"],
                        "total_score": int(row["total_score"] or 0),
                        "questions": questions,
                        "correct": correct,
                        "accuracy": correct / questions if questions > 0 else 0,
                        "avg_difficulty": float(row["avg_difficulty"]) if row["avg_difficulty"] else 3.0,
                    })
                return progress
        except Exception as e:
            logger.error("Failed to get progress over time: %s", e)
        return []

    # ==============================================
    # Achievement Methods
    # ==============================================

    @staticmethod
    def _apply_state_progress(achievement, state) -> None:
        """Fill progress/unlocked for achievements derived from the learning state."""
        category = achievement.category
        if category == "general":
            achievement.progress = state.total_sessions
            achievement.unlocked = state.total_sessions >= achievement.threshold
        elif category == "time":
            achievement.progress = state.total_play_time_minutes
            achievement.unlocked = state.total_play_time_minutes >= achievement.threshold
        elif category == "level":
            achievement.progress = state.overall_level
            achievement.unlocked = state.overall_level >= achievement.threshold
        elif category == "accuracy":
            # Overall accuracy only counts once at least 50 questions exist.
            if achievement.id == "accuracy_80" and state.questions_answered >= 50:
                achievement.progress = int(state.accuracy * 100)
                achievement.unlocked = state.accuracy >= 0.8

    async def get_student_achievements(
        self,
        student_id: str
    ) -> List["Achievement"]:
        """Get all achievements with progress and unlock status for a student.

        Session-count, play-time, level and overall-accuracy achievements
        are derived from the learning state; score and perfect-game
        achievements from the stored sessions. Streak achievements are not
        evaluated here and stay locked.

        Returns:
            A fresh list of Achievement objects (the module-level
            definitions are not modified).
        """
        await self._ensure_connected()
        state = await self.get_learning_state(student_id)

        achievements = []
        for definition in ACHIEVEMENTS:
            achievement = Achievement(
                id=definition.id,
                name=definition.name,
                description=definition.description,
                icon=definition.icon,
                category=definition.category,
                threshold=definition.threshold,
            )
            if state:
                self._apply_state_progress(achievement, state)
            achievements.append(achievement)

        # Score and perfect-game status come from the stored sessions.
        if self._pool:
            try:
                async with self._pool.acquire() as conn:
                    max_score = await conn.fetchval(
                        "SELECT MAX(score) FROM game_sessions WHERE student_id = $1",
                        student_id,
                    )
                    if max_score:
                        for achievement in achievements:
                            if achievement.category == "score":
                                achievement.progress = max_score
                                achievement.unlocked = max_score >= achievement.threshold

                    # A perfect game needs at least 5 questions, all correct.
                    perfect = await conn.fetchval(
                        """
                        SELECT COUNT(*) FROM game_sessions
                        WHERE student_id = $1
                          AND questions_answered >= 5
                          AND questions_correct = questions_answered
                        """,
                        student_id,
                    )
                    has_perfect = perfect is not None and perfect > 0
                    for achievement in achievements:
                        if achievement.id == "perfect_game":
                            achievement.progress = 100 if has_perfect else 0
                            achievement.unlocked = has_perfect
            except Exception as e:
                logger.error("Failed to check achievements: %s", e)
        return achievements

    async def check_new_achievements(
        self,
        student_id: str,
        session_score: int,
        session_accuracy: float,
        streak: int
    ) -> List["Achievement"]:
        """
        Check for newly unlocked achievements after a session.

        Compares the just-finished session's score, accuracy (1.0 means all
        correct) and best streak against every achievement that is still
        locked according to ``get_student_achievements``.

        Returns list of newly unlocked achievements.
        """
        newly_unlocked = []
        for achievement in await self.get_student_achievements(student_id):
            if achievement.unlocked:
                continue
            earned = (
                (achievement.category == "streak" and streak >= achievement.threshold)
                or (achievement.category == "score" and session_score >= achievement.threshold)
                or (achievement.id == "perfect_game" and session_accuracy == 1.0)
            )
            if earned:
                achievement.unlocked = True
                newly_unlocked.append(achievement)
        return newly_unlocked
# Process-wide GameDatabase singleton, created lazily by get_game_db().
_game_db: Optional[GameDatabase] = None


async def get_game_db() -> GameDatabase:
    """Return the shared GameDatabase, creating it on first use.

    The first call instantiates the database with default settings and
    attempts to connect; later calls return the same instance.
    """
    global _game_db
    if _game_db is not None:
        return _game_db
    # Publish the instance before awaiting connect(), as callers arriving
    # during the connection attempt should reuse it rather than create
    # another one.
    _game_db = GameDatabase()
    await _game_db.connect()
    return _game_db