This repository has been archived on 2026-02-15. You can view files and clone it. You cannot open issues or pull requests or push a commit.
Files
Benjamin Admin 21a844cb8a fix: Restore all files lost during destructive rebase
A previous `git pull --rebase origin main` dropped 177 local commits,
losing 3400+ files across admin-v2, backend, studio-v2, website,
klausur-service, and many other services. The partial restore attempt
(660295e2) only recovered some files.

This commit restores all missing files from pre-rebase ref 98933f5e
while preserving post-rebase additions (night-scheduler, night-mode UI,
NightModeWidget dashboard integration).

Restored features include:
- AI Module Sidebar (FAB), OCR Labeling, OCR Compare
- GPU Dashboard, RAG Pipeline, Magic Help
- Klausur-Korrektur (8 files), Abitur-Archiv (5+ files)
- Companion, Zeugnisse-Crawler, Screen Flow
- Full backend, studio-v2, website, klausur-service
- All compliance SDKs, agent-core, voice-service
- CI/CD configs, documentation, scripts

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-09 09:51:32 +01:00

459 lines
14 KiB
Python

"""
AI Processor - Q&A Generator
Generate question-answer pairs with Leitner system for spaced repetition.
"""
from pathlib import Path
from datetime import datetime, timedelta
import json
import logging
import os
import requests
from ..config import VISION_API, BEREINIGT_DIR, get_openai_api_key
logger = logging.getLogger(__name__)
def _generate_qa_with_openai(analysis_data: dict, num_questions: int = 8) -> dict:
    """
    Generate question-answer pairs from a worksheet analysis via the OpenAI API.

    The prompt instructs the model to:
    - base questions almost verbatim on the existing material
    - allow only minimal rephrasing
    - mark key terms / technical terms as important
    - match the difficulty of the original (grade_level)

    Args:
        analysis_data: The analysis JSON of the worksheet. The keys read here are
            "title", "subject", "grade_level", "canonical_text",
            "printed_blocks" (entries with "text") and "tasks" (entries with
            "description" and "text_with_gaps").
        num_questions: Number of questions to request (default: 8).

    Returns:
        The JSON object returned by the model (requested to contain "qa_items"
        and "metadata"), with the Leitner fields initialized on every item.
        If the analysis contains no text, a dict with an empty "qa_items" list
        and an error entry in "metadata" is returned without calling the API.

    Raises:
        requests.HTTPError: If the API answers with an error status.
        RuntimeError: If the response has an unexpected shape or does not
            contain a valid JSON object.
    """
    api_key = get_openai_api_key()

    title = analysis_data.get("title") or "Arbeitsblatt"
    subject = analysis_data.get("subject") or "Allgemein"
    grade_level = analysis_data.get("grade_level") or "unbekannt"
    canonical_text = analysis_data.get("canonical_text") or ""
    printed_blocks = analysis_data.get("printed_blocks") or []
    tasks = analysis_data.get("tasks") or []

    # Collect every piece of worksheet text that questions may be derived from.
    content_parts = []
    if canonical_text:
        content_parts.append(canonical_text)
    for block in printed_blocks:
        # `or ""` guards against explicit nulls in the analysis JSON.
        text = (block.get("text") or "").strip()
        if text and text not in content_parts:
            content_parts.append(text)
    for task in tasks:
        desc = (task.get("description") or "").strip()
        text = (task.get("text_with_gaps") or "").strip()
        if desc:
            content_parts.append(f"Aufgabe: {desc}")
        if text:
            content_parts.append(text)

    worksheet_content = "\n\n".join(content_parts)
    if not worksheet_content.strip():
        logger.warning("Kein Textinhalt fuer Q&A-Generierung gefunden")
        return {"qa_items": [], "metadata": {"error": "Kein Textinhalt gefunden"}}

    url = "https://api.openai.com/v1/chat/completions"
    headers = {"Authorization": f"Bearer {api_key}", "Content-Type": "application/json"}

    system_prompt = f"""Du bist ein erfahrener Paedagoge, der Frage-Antwort-Paare fuer Schueler erstellt.
WICHTIGE REGELN:
1. INHALTE NUR AUS DEM TEXT:
- Verwende FAST WOERTLICH den vorhandenen Stoff
- KEINE neuen Fakten oder Inhalte einfuehren!
- Alles muss aus dem gegebenen Text ableitbar sein
2. SCHWIERIGKEITSGRAD:
- Niveau muss exakt "{grade_level}" entsprechen
3. SCHLUESSELWOERTER MARKIEREN:
- Identifiziere wichtige Fachbegriffe als "key_terms"
4. FRAGETYPEN:
- Wissensfragen: "Was ist...?", "Nenne..."
- Verstaendnisfragen: "Erklaere...", "Beschreibe..."
- Anwendungsfragen: "Warum...?", "Was passiert, wenn...?"
5. ANTWORT-FORMAT:
- Kurze, praezise Antworten (1-3 Saetze)
6. AUSGABE: Nur gueltiges JSON, kein Markdown."""

    user_prompt = f"""Erstelle {num_questions} Frage-Antwort-Paare aus diesem Arbeitsblatt:
TITEL: {title}
FACH: {subject}
KLASSENSTUFE: {grade_level}
TEXT:
{worksheet_content}
Gib das Ergebnis als JSON zurueck:
{{
"qa_items": [
{{
"id": "qa1",
"question": "Die Frage hier (fast woertlich aus dem Text)",
"answer": "Die korrekte Antwort (direkt aus dem Text)",
"question_type": "knowledge" | "understanding" | "application",
"key_terms": ["wichtiger Begriff 1", "wichtiger Begriff 2"],
"difficulty": 1-3,
"source_hint": "Kurzer Hinweis, wo im Text die Antwort steht",
"leitner_box": 0
}}
],
"metadata": {{
"subject": "{subject}",
"grade_level": "{grade_level}",
"source_title": "{title}",
"total_questions": {num_questions},
"key_terms_summary": ["alle", "wichtigen", "Fachbegriffe", "gesammelt"]
}}
}}
WICHTIG:
- Alle Antworten muessen aus dem Text ableitbar sein!
- "leitner_box": 0 bedeutet "neu" (noch nicht gelernt)
- "difficulty": 1=leicht, 2=mittel, 3=schwer"""

    payload = {
        "model": "gpt-4o-mini",
        "response_format": {"type": "json_object"},
        "messages": [
            {"role": "system", "content": system_prompt},
            {"role": "user", "content": user_prompt},
        ],
        "max_tokens": 3000,
        "temperature": 0.5,
    }

    # Without a timeout a stalled connection would block the caller forever.
    # (connect timeout, read timeout) in seconds.
    response = requests.post(url, headers=headers, json=payload, timeout=(10, 120))
    response.raise_for_status()
    data = response.json()

    try:
        content = data["choices"][0]["message"]["content"]
        qa_data = json.loads(content)
    except (KeyError, IndexError, TypeError, json.JSONDecodeError) as e:
        # IndexError: empty "choices"; TypeError: unexpected shape or null content.
        raise RuntimeError(f"Fehler bei Q&A-Generierung: {e}") from e

    if not isinstance(qa_data, dict):
        raise RuntimeError("Fehler bei Q&A-Generierung: Antwort ist kein JSON-Objekt")

    # Initialize Leitner-Box fields for all items
    _initialize_leitner_fields(qa_data)
    return qa_data
def _generate_qa_with_claude(analysis_data: dict, num_questions: int = 8) -> dict:
    """
    Generate question-answer pairs from a worksheet analysis via the Claude API.

    Args:
        analysis_data: The analysis JSON of the worksheet. The keys read here are
            "title", "subject", "grade_level", "canonical_text",
            "printed_blocks" (entries with "text") and "tasks" (entries with
            "description" and "text_with_gaps").
        num_questions: Number of questions to request (default: 8).

    Returns:
        The JSON object returned by the model (requested to contain "qa_items"
        and "metadata"), with the Leitner fields initialized on every item.
        If the analysis contains no text, a dict with an empty "qa_items" list
        and an error entry in "metadata" is returned without calling the API.

    Raises:
        RuntimeError: If ANTHROPIC_API_KEY is not set, the reply carries no
            text, or the reply is not a valid JSON object.
    """
    import anthropic

    api_key = os.getenv("ANTHROPIC_API_KEY")
    if not api_key:
        raise RuntimeError("ANTHROPIC_API_KEY ist nicht gesetzt.")
    client = anthropic.Anthropic(api_key=api_key)

    title = analysis_data.get("title") or "Arbeitsblatt"
    subject = analysis_data.get("subject") or "Allgemein"
    grade_level = analysis_data.get("grade_level") or "unbekannt"
    canonical_text = analysis_data.get("canonical_text") or ""
    printed_blocks = analysis_data.get("printed_blocks") or []
    tasks = analysis_data.get("tasks") or []

    # Collect every piece of worksheet text that questions may be derived from.
    content_parts = []
    if canonical_text:
        content_parts.append(canonical_text)
    for block in printed_blocks:
        # `or ""` guards against explicit nulls in the analysis JSON.
        text = (block.get("text") or "").strip()
        if text and text not in content_parts:
            content_parts.append(text)
    for task in tasks:
        desc = (task.get("description") or "").strip()
        text = (task.get("text_with_gaps") or "").strip()
        if desc:
            content_parts.append(f"Aufgabe: {desc}")
        # Gap texts are part of the worksheet material as well; include them
        # so both generator backends work from the same content.
        if text:
            content_parts.append(text)

    worksheet_content = "\n\n".join(content_parts)
    if not worksheet_content.strip():
        logger.warning("Kein Textinhalt fuer Q&A-Generierung gefunden")
        return {"qa_items": [], "metadata": {"error": "Kein Textinhalt gefunden"}}

    prompt = f"""Erstelle {num_questions} Frage-Antwort-Paare aus diesem Arbeitsblatt.
WICHTIGE REGELN:
1. Verwende FAST WOERTLICH den vorhandenen Stoff - KEINE neuen Fakten!
2. Schwierigkeitsgrad: exakt "{grade_level}"
3. Markiere wichtige Fachbegriffe als "key_terms"
TITEL: {title}
FACH: {subject}
KLASSENSTUFE: {grade_level}
TEXT:
{worksheet_content}
Antworte NUR mit diesem JSON:
{{
"qa_items": [
{{
"id": "qa1",
"question": "Frage (fast woertlich aus Text)",
"answer": "Antwort (direkt aus Text)",
"question_type": "knowledge",
"key_terms": ["Begriff1", "Begriff2"],
"difficulty": 1,
"source_hint": "Wo im Text",
"leitner_box": 0
}}
],
"metadata": {{
"subject": "{subject}",
"grade_level": "{grade_level}",
"source_title": "{title}",
"total_questions": {num_questions},
"key_terms_summary": ["alle", "Fachbegriffe"]
}}
}}"""

    message = client.messages.create(
        model="claude-3-5-sonnet-20241022",
        max_tokens=3000,
        messages=[{"role": "user", "content": prompt}],
    )

    try:
        content = message.content[0].text
    except (IndexError, AttributeError) as e:
        # Empty reply or a first content block that carries no text.
        raise RuntimeError(f"Claude hat keine Textantwort geliefert: {e}") from e

    # The model may wrap the JSON in a Markdown code fence despite the
    # instruction; keep only what is inside the first fence.
    if "```json" in content:
        content = content.split("```json", 1)[1].split("```", 1)[0]
    elif "```" in content:
        content = content.split("```", 1)[1].split("```", 1)[0]

    try:
        qa_data = json.loads(content.strip())
    except json.JSONDecodeError as e:
        raise RuntimeError(f"Claude hat ungueltiges JSON geliefert: {e}") from e

    if not isinstance(qa_data, dict):
        raise RuntimeError("Claude hat ungueltiges JSON geliefert: kein JSON-Objekt")

    # Initialize Leitner-Box fields
    _initialize_leitner_fields(qa_data)
    return qa_data
def _initialize_leitner_fields(qa_data: dict) -> None:
    """
    Fill in missing Leitner bookkeeping fields on every Q&A item, in place.

    Fields that an item already carries are left untouched; absent ones get
    their starting value (box 0, zero counters, no timestamps).

    Args:
        qa_data: Dict whose "qa_items" list (if present) holds the item dicts.
    """
    # (field name, starting value) in the order the fields are added.
    starting_state = (
        ("leitner_box", 0),
        ("correct_count", 0),
        ("incorrect_count", 0),
        ("last_seen", None),
        ("next_review", None),
    )
    for entry in qa_data.get("qa_items", []):
        for field_name, initial_value in starting_state:
            if field_name not in entry:
                entry[field_name] = initial_value
def generate_qa_from_analysis(analysis_path: Path, num_questions: int = 8) -> Path:
    """
    Generate question-answer pairs from an analysis JSON file and save them.

    The backend is chosen by VISION_API: with "claude" the Claude generator is
    tried first and any failure falls back to OpenAI; otherwise OpenAI is used
    directly. The result is written to BEREINIGT_DIR as "<stem>_qa.json",
    where "<stem>" is the input file stem with "_analyse" removed.

    Args:
        analysis_path: Path to *_analyse.json file.
        num_questions: Number of questions to generate.

    Returns:
        Path to the generated *_qa.json file.

    Raises:
        FileNotFoundError: If analysis_path does not exist.
        RuntimeError: If the analysis file is not valid JSON (the original
            JSONDecodeError is attached as the cause).
    """
    if not analysis_path.exists():
        raise FileNotFoundError(f"Analysedatei nicht gefunden: {analysis_path}")

    try:
        analysis_data = json.loads(analysis_path.read_text(encoding="utf-8"))
    except json.JSONDecodeError as e:
        # Chain the parser error so the traceback shows the real cause.
        raise RuntimeError(f"Ungueltige Analyse-JSON: {e}") from e

    logger.info("Generiere Q&A-Paare fuer: %s", analysis_path.name)

    # Generate Q&A (use configured API)
    if VISION_API == "claude":
        try:
            qa_data = _generate_qa_with_claude(analysis_data, num_questions)
        except Exception as e:
            # Deliberate best-effort: any Claude failure falls back to OpenAI.
            logger.warning("Claude Q&A-Generierung fehlgeschlagen, nutze OpenAI: %s", e)
            qa_data = _generate_qa_with_openai(analysis_data, num_questions)
    else:
        qa_data = _generate_qa_with_openai(analysis_data, num_questions)

    # Save Q&A data
    out_name = analysis_path.stem.replace("_analyse", "") + "_qa.json"
    out_path = BEREINIGT_DIR / out_name
    out_path.write_text(json.dumps(qa_data, ensure_ascii=False, indent=2), encoding="utf-8")
    logger.info("Q&A-Paare gespeichert: %s", out_path.name)
    return out_path
# ---------------------------------------------------------------------------
# Leitner-Box System for Spaced Repetition
# ---------------------------------------------------------------------------
def update_leitner_progress(qa_path: Path, item_id: str, correct: bool) -> dict:
    """
    Update the learning progress of a Q&A item using the Leitner system.

    Leitner boxes:
    - Box 0: New (not yet learned)
    - Box 1: Learned (on error -> back to Box 0)
    - Box 2: Consolidated (on error -> back to Box 1)

    On a correct answer the box is increased (max 2) and the next review is
    scheduled by the box the item was in when it was answered:
    Box 0 -> 1 after 1 day, Box 1 -> 2 after 3 days, Box 2 after 7 days.
    On a wrong answer the box is decreased (min 0) and the item is scheduled
    again after 4 hours. The updated data is written back to qa_path.

    Args:
        qa_path: Path to *_qa.json file.
        item_id: ID of the Q&A item.
        correct: True if answered correctly.

    Returns:
        Dict with status "OK" plus the item's new box, box name, counters and
        next review time, or a dict with status "NOT_FOUND" if no item has
        the given ID.

    Raises:
        FileNotFoundError: If qa_path does not exist.
    """
    if not qa_path.exists():
        raise FileNotFoundError(f"Q&A-Datei nicht gefunden: {qa_path}")

    qa_data = json.loads(qa_path.read_text(encoding="utf-8"))

    # Find the item
    item = None
    for qa_item in qa_data.get("qa_items") or []:
        if qa_item.get("id") == item_id:
            item = qa_item
            break
    if item is None:
        return {"status": "NOT_FOUND", "message": f"Item {item_id} nicht gefunden"}

    box_names = ["Neu", "Gelernt", "Gefestigt"]
    review_days = [1, 3, 7]  # interval after a correct answer, per box answered from
    max_box = len(box_names) - 1

    # The stored box may be missing, null or out of range (it can originate
    # from model output), so normalise it before using it as an index.
    try:
        current_box = int(item.get("leitner_box") or 0)
    except (TypeError, ValueError):
        current_box = 0
    current_box = max(0, min(current_box, max_box))

    # One timestamp for the whole update keeps last_seen/next_review consistent.
    now = datetime.now()
    item["last_seen"] = now.isoformat()

    # Make sure both counters exist so the result below never hits a KeyError
    # for items that were stored without Leitner fields.
    item["correct_count"] = item.get("correct_count") or 0
    item["incorrect_count"] = item.get("incorrect_count") or 0

    if correct:
        item["correct_count"] += 1
        item["leitner_box"] = min(current_box + 1, max_box)
        # Index by the box the item was answered from; indexing by the new
        # box would make the 1-day interval unreachable.
        item["next_review"] = (now + timedelta(days=review_days[current_box])).isoformat()
    else:
        item["incorrect_count"] += 1
        item["leitner_box"] = max(current_box - 1, 0)
        # On error: review soon
        item["next_review"] = (now + timedelta(hours=4)).isoformat()

    # Save updated data
    qa_path.write_text(json.dumps(qa_data, ensure_ascii=False, indent=2), encoding="utf-8")

    return {
        "status": "OK",
        "item_id": item_id,
        "correct": correct,
        "new_box": item["leitner_box"],
        "box_name": box_names[item["leitner_box"]],
        "correct_count": item["correct_count"],
        "incorrect_count": item["incorrect_count"],
        "next_review": item["next_review"],
    }
def get_next_review_items(qa_path: Path, limit: int = 5) -> list:
    """
    Get the next items to review.

    An item is a candidate if it is in Box 0 or its review is due (no or
    unparsable "next_review" counts as due). Candidates are ordered by a
    priority score where lower means more urgent:
    - box * 10 (Box 0 first)
    - minus 2 per recorded incorrect answer
    - minus the number of hours the review is overdue

    Args:
        qa_path: Path to *_qa.json file.
        limit: Maximum number of items; values <= 0 yield an empty list.

    Returns:
        List of copies of the items to review, most urgent first. Empty if
        qa_path does not exist.
    """
    if not qa_path.exists() or limit <= 0:
        return []

    def _as_int(value) -> int:
        # Stored values may be null or non-numeric (they can originate from
        # model output); treat anything unusable as 0.
        try:
            return int(value or 0)
        except (TypeError, ValueError):
            return 0

    qa_data = json.loads(qa_path.read_text(encoding="utf-8"))
    now = datetime.now()

    # (priority, item) pairs; keeping the score outside the item avoids
    # injecting temporary keys into the returned dicts.
    candidates = []
    for item in qa_data.get("qa_items") or []:
        box = _as_int(item.get("leitner_box"))
        incorrect = _as_int(item.get("incorrect_count"))
        next_review = item.get("next_review")

        # Calculate priority (lower = more important)
        priority = box * 10 - incorrect * 2

        # Check if review is due
        is_due = True
        if next_review:
            try:
                review_time = datetime.fromisoformat(next_review)
                is_due = now >= review_time
            except (ValueError, TypeError):
                # Unparsable or timezone-mismatched timestamp: treat as due.
                is_due = True
            else:
                # Overdue items get higher priority
                if is_due:
                    priority -= (now - review_time).total_seconds() / 3600

        # New items (Box 0) always included
        if box == 0 or is_due:
            candidates.append((priority, item))

    # Sort by priority (lowest first); the sort is stable, so ties keep
    # their file order.
    candidates.sort(key=lambda pair: pair[0])
    return [dict(item) for _, item in candidates[:limit]]