Services: Admin-Lehrer, Backend-Lehrer, Studio v2, Website, Klausur-Service, School-Service, Voice-Service, Geo-Service, BreakPilot Drive, Agent-Core Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
156 lines
4.8 KiB
Python
156 lines
4.8 KiB
Python
"""
|
|
AI Processing - Leitner System.
|
|
|
|
Spaced Repetition System für Q&A-Paare.
|
|
"""
|
|
|
|
from pathlib import Path
|
|
from datetime import datetime, timedelta
|
|
import json
|
|
import logging
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
def update_leitner_progress(qa_path: Path, item_id: str, correct: bool) -> dict:
|
|
"""
|
|
Aktualisiert den Lernfortschritt eines Q&A-Items nach dem Leitner-System.
|
|
|
|
Leitner-Boxen:
|
|
- Box 0: Neu (noch nicht gelernt)
|
|
- Box 1: Gelernt (bei Fehler → zurück zu Box 0)
|
|
- Box 2: Gefestigt (bei Fehler → zurück zu Box 1)
|
|
|
|
Bei korrekter Antwort: Box erhöhen (max 2)
|
|
Bei falscher Antwort: Box verringern (min 0)
|
|
|
|
Args:
|
|
qa_path: Pfad zur *_qa.json Datei
|
|
item_id: ID des Q&A-Items
|
|
correct: True wenn korrekt beantwortet
|
|
|
|
Returns:
|
|
Dict mit aktualisiertem Item und Status
|
|
"""
|
|
if not qa_path.exists():
|
|
raise FileNotFoundError(f"Q&A-Datei nicht gefunden: {qa_path}")
|
|
|
|
qa_data = json.loads(qa_path.read_text(encoding="utf-8"))
|
|
|
|
# Finde das Item
|
|
item = None
|
|
for qa_item in qa_data.get("qa_items", []):
|
|
if qa_item.get("id") == item_id:
|
|
item = qa_item
|
|
break
|
|
|
|
if not item:
|
|
return {"status": "NOT_FOUND", "message": f"Item {item_id} nicht gefunden"}
|
|
|
|
# Aktualisiere Statistiken
|
|
now = datetime.now().isoformat()
|
|
item["last_seen"] = now
|
|
|
|
if correct:
|
|
item["correct_count"] = item.get("correct_count", 0) + 1
|
|
# Box erhöhen (max 2)
|
|
current_box = item.get("leitner_box", 0)
|
|
if current_box < 2:
|
|
item["leitner_box"] = current_box + 1
|
|
# Nächste Wiederholung basierend auf Box
|
|
# Box 0→1: Nach 1 Tag, Box 1→2: Nach 3 Tagen, Box 2: Nach 7 Tagen
|
|
days = [1, 3, 7][item["leitner_box"]]
|
|
item["next_review"] = (datetime.now() + timedelta(days=days)).isoformat()
|
|
else:
|
|
item["incorrect_count"] = item.get("incorrect_count", 0) + 1
|
|
# Box verringern (min 0)
|
|
current_box = item.get("leitner_box", 0)
|
|
if current_box > 0:
|
|
item["leitner_box"] = current_box - 1
|
|
# Bei Fehler: Bald wiederholen
|
|
item["next_review"] = (datetime.now() + timedelta(hours=4)).isoformat()
|
|
|
|
# Speichere aktualisierte Daten
|
|
qa_path.write_text(json.dumps(qa_data, ensure_ascii=False, indent=2), encoding="utf-8")
|
|
|
|
box_names = ["Neu", "Gelernt", "Gefestigt"]
|
|
return {
|
|
"status": "OK",
|
|
"item_id": item_id,
|
|
"correct": correct,
|
|
"new_box": item["leitner_box"],
|
|
"box_name": box_names[item["leitner_box"]],
|
|
"correct_count": item["correct_count"],
|
|
"incorrect_count": item["incorrect_count"],
|
|
"next_review": item["next_review"]
|
|
}
|
|
|
|
|
|
def get_next_review_items(qa_path: Path, limit: int = 5) -> list:
|
|
"""
|
|
Gibt die nächsten zu wiederholenden Items zurück.
|
|
|
|
Priorisierung:
|
|
1. Falsch beantwortete Items (Box 0) - häufiger
|
|
2. Gelernte Items (Box 1) deren Review fällig ist
|
|
3. Gefestigte Items (Box 2) zur gelegentlichen Auffrischung
|
|
|
|
Args:
|
|
qa_path: Pfad zur *_qa.json Datei
|
|
limit: Maximale Anzahl Items
|
|
|
|
Returns:
|
|
Liste der zu wiederholenden Items (sortiert nach Priorität)
|
|
"""
|
|
if not qa_path.exists():
|
|
return []
|
|
|
|
qa_data = json.loads(qa_path.read_text(encoding="utf-8"))
|
|
items = qa_data.get("qa_items", [])
|
|
|
|
now = datetime.now()
|
|
review_items = []
|
|
|
|
for item in items:
|
|
box = item.get("leitner_box", 0)
|
|
next_review = item.get("next_review")
|
|
incorrect = item.get("incorrect_count", 0)
|
|
|
|
# Priorität berechnen (niedriger = wichtiger)
|
|
priority = box * 10 # Box 0 hat höchste Priorität
|
|
|
|
# Bonus für häufig falsch beantwortete
|
|
priority -= incorrect * 2
|
|
|
|
# Prüfe ob Review fällig
|
|
is_due = True
|
|
if next_review:
|
|
try:
|
|
review_time = datetime.fromisoformat(next_review)
|
|
is_due = now >= review_time
|
|
# Überfällige Items bekommen höhere Priorität
|
|
if is_due:
|
|
overdue_hours = (now - review_time).total_seconds() / 3600
|
|
priority -= overdue_hours
|
|
except (ValueError, TypeError):
|
|
is_due = True
|
|
|
|
# Neue Items (Box 0) immer einschließen
|
|
if box == 0 or is_due:
|
|
review_items.append({
|
|
**item,
|
|
"_priority": priority,
|
|
"_is_due": is_due
|
|
})
|
|
|
|
# Sortiere nach Priorität (niedrigste zuerst)
|
|
review_items.sort(key=lambda x: x["_priority"])
|
|
|
|
# Entferne interne Felder und limitiere
|
|
result = []
|
|
for item in review_items[:limit]:
|
|
clean_item = {k: v for k, v in item.items() if not k.startswith("_")}
|
|
result.append(clean_item)
|
|
|
|
return result
|