"""
AI Processor - Q&A Generator

Generate question-answer pairs with Leitner system for spaced repetition.
"""

from pathlib import Path
from datetime import datetime, timedelta
import json
import logging
import os

import requests

from ..config import VISION_API, BEREINIGT_DIR, get_openai_api_key

logger = logging.getLogger(__name__)

# Upper bound (seconds) for the OpenAI HTTP call. Without a timeout,
# ``requests`` waits forever on a stalled connection.
_OPENAI_TIMEOUT_SECONDS = 120

# Leitner boxes are 0 (new), 1 (learned) and 2 (consolidated).
_MAX_LEITNER_BOX = 2
_BOX_NAMES = ("Neu", "Gelernt", "Gefestigt")

# Days until the next review after a correct answer, indexed by the box the
# item lands in. A correct answer always lands in box >= 1, so the first
# entry is only a placeholder for box 0.
_REVIEW_INTERVAL_DAYS = (1, 3, 7)


def _collect_worksheet_content(analysis_data: dict) -> str:
    """
    Build the plain-text worksheet content that is sent to the model.

    Reads ``canonical_text``, the ``text`` of every entry in
    ``printed_blocks`` (skipping exact duplicates) and, for every entry in
    ``tasks``, its ``description`` (prefixed with "Aufgabe: ") and its
    ``text_with_gaps``. Missing or ``null`` values are treated as empty.

    Args:
        analysis_data: The analysis JSON of the worksheet

    Returns:
        All parts joined by blank lines (may be empty)
    """
    canonical_text = analysis_data.get("canonical_text") or ""
    printed_blocks = analysis_data.get("printed_blocks") or []
    tasks = analysis_data.get("tasks") or []

    content_parts = []
    if canonical_text:
        content_parts.append(canonical_text)

    for block in printed_blocks:
        # "or" instead of a .get() default: the key may exist with value null.
        text = (block.get("text") or "").strip()
        if text and text not in content_parts:
            content_parts.append(text)

    for task in tasks:
        desc = (task.get("description") or "").strip()
        text = (task.get("text_with_gaps") or "").strip()
        if desc:
            content_parts.append(f"Aufgabe: {desc}")
        if text:
            content_parts.append(text)

    return "\n\n".join(content_parts)


def _normalized_box(value) -> int:
    """Coerce a stored ``leitner_box`` value to an int in 0.._MAX_LEITNER_BOX."""
    try:
        box = int(value)
    except (TypeError, ValueError, OverflowError):
        # Missing, null or non-numeric values count as "new".
        return 0
    return max(0, min(box, _MAX_LEITNER_BOX))


def _generate_qa_with_openai(analysis_data: dict, num_questions: int = 8) -> dict:
    """
    Generate question-answer pairs based on worksheet analysis.

    Important didactic requirements:
    - Questions based almost verbatim on the existing material
    - Only minimal rephrasing allowed
    - Key terms/technical terms marked as important
    - Difficulty level matches the original (grade_level)

    Args:
        analysis_data: The analysis JSON of the worksheet
        num_questions: Number of questions to generate (default: 8)

    Returns:
        Dict with qa_items and metadata. If the analysis contains no text,
        ``qa_items`` is empty and ``metadata`` carries an ``error`` entry.

    Raises:
        requests.RequestException: On HTTP errors or timeout
        RuntimeError: If the response cannot be parsed into a JSON object
    """
    api_key = get_openai_api_key()

    title = analysis_data.get("title") or "Arbeitsblatt"
    subject = analysis_data.get("subject") or "Allgemein"
    grade_level = analysis_data.get("grade_level") or "unbekannt"

    worksheet_content = _collect_worksheet_content(analysis_data)
    if not worksheet_content.strip():
        logger.warning("Kein Textinhalt fuer Q&A-Generierung gefunden")
        return {"qa_items": [], "metadata": {"error": "Kein Textinhalt gefunden"}}

    url = "https://api.openai.com/v1/chat/completions"
    headers = {"Authorization": f"Bearer {api_key}", "Content-Type": "application/json"}

    system_prompt = f"""Du bist ein erfahrener Paedagoge, der Frage-Antwort-Paare fuer Schueler erstellt.

WICHTIGE REGELN:

1. INHALTE NUR AUS DEM TEXT:
   - Verwende FAST WOERTLICH den vorhandenen Stoff
   - KEINE neuen Fakten oder Inhalte einfuehren!
   - Alles muss aus dem gegebenen Text ableitbar sein

2. SCHWIERIGKEITSGRAD:
   - Niveau muss exakt "{grade_level}" entsprechen

3. SCHLUESSELWOERTER MARKIEREN:
   - Identifiziere wichtige Fachbegriffe als "key_terms"

4. FRAGETYPEN:
   - Wissensfragen: "Was ist...?", "Nenne..."
   - Verstaendnisfragen: "Erklaere...", "Beschreibe..."
   - Anwendungsfragen: "Warum...?", "Was passiert, wenn...?"

5. ANTWORT-FORMAT:
   - Kurze, praezise Antworten (1-3 Saetze)

6. AUSGABE: Nur gueltiges JSON, kein Markdown."""

    user_prompt = f"""Erstelle {num_questions} Frage-Antwort-Paare aus diesem Arbeitsblatt:

TITEL: {title}
FACH: {subject}
KLASSENSTUFE: {grade_level}

TEXT:
{worksheet_content}

Gib das Ergebnis als JSON zurueck:
{{
  "qa_items": [
    {{
      "id": "qa1",
      "question": "Die Frage hier (fast woertlich aus dem Text)",
      "answer": "Die korrekte Antwort (direkt aus dem Text)",
      "question_type": "knowledge" | "understanding" | "application",
      "key_terms": ["wichtiger Begriff 1", "wichtiger Begriff 2"],
      "difficulty": 1-3,
      "source_hint": "Kurzer Hinweis, wo im Text die Antwort steht",
      "leitner_box": 0
    }}
  ],
  "metadata": {{
    "subject": "{subject}",
    "grade_level": "{grade_level}",
    "source_title": "{title}",
    "total_questions": {num_questions},
    "key_terms_summary": ["alle", "wichtigen", "Fachbegriffe", "gesammelt"]
  }}
}}

WICHTIG:
- Alle Antworten muessen aus dem Text ableitbar sein!
- "leitner_box": 0 bedeutet "neu" (noch nicht gelernt)
- "difficulty": 1=leicht, 2=mittel, 3=schwer"""

    payload = {
        "model": "gpt-4o-mini",
        "response_format": {"type": "json_object"},
        "messages": [
            {"role": "system", "content": system_prompt},
            {"role": "user", "content": user_prompt},
        ],
        "max_tokens": 3000,
        "temperature": 0.5,
    }

    response = requests.post(
        url, headers=headers, json=payload, timeout=_OPENAI_TIMEOUT_SECONDS
    )
    response.raise_for_status()
    data = response.json()

    try:
        content = data["choices"][0]["message"]["content"]
        qa_data = json.loads(content)
    except (KeyError, IndexError, TypeError, json.JSONDecodeError) as e:
        # IndexError: empty "choices"; TypeError: "content" is null.
        raise RuntimeError(f"Fehler bei Q&A-Generierung: {e}") from e

    if not isinstance(qa_data, dict):
        raise RuntimeError("Fehler bei Q&A-Generierung: Antwort ist kein JSON-Objekt")

    # Initialize Leitner-Box fields for all items
    _initialize_leitner_fields(qa_data)

    return qa_data


def _generate_qa_with_claude(analysis_data: dict, num_questions: int = 8) -> dict:
    """
    Generate question-answer pairs with Claude API.

    Uses the same worksheet content as the OpenAI path, including the
    gap texts of tasks, so both providers see identical material.

    Args:
        analysis_data: The analysis JSON of the worksheet
        num_questions: Number of questions to generate (default: 8)

    Returns:
        Dict with qa_items and metadata. If the analysis contains no text,
        ``qa_items`` is empty and ``metadata`` carries an ``error`` entry.

    Raises:
        RuntimeError: If ANTHROPIC_API_KEY is not set, the reply is empty,
            or the reply is not a valid JSON object
    """
    # Imported lazily so the module loads without the anthropic package.
    import anthropic

    api_key = os.getenv("ANTHROPIC_API_KEY")
    if not api_key:
        raise RuntimeError("ANTHROPIC_API_KEY ist nicht gesetzt.")

    client = anthropic.Anthropic(api_key=api_key)

    title = analysis_data.get("title") or "Arbeitsblatt"
    subject = analysis_data.get("subject") or "Allgemein"
    grade_level = analysis_data.get("grade_level") or "unbekannt"

    worksheet_content = _collect_worksheet_content(analysis_data)
    if not worksheet_content.strip():
        logger.warning("Kein Textinhalt fuer Q&A-Generierung gefunden")
        return {"qa_items": [], "metadata": {"error": "Kein Textinhalt gefunden"}}

    prompt = f"""Erstelle {num_questions} Frage-Antwort-Paare aus diesem Arbeitsblatt.

WICHTIGE REGELN:
1. Verwende FAST WOERTLICH den vorhandenen Stoff - KEINE neuen Fakten!
2. Schwierigkeitsgrad: exakt "{grade_level}"
3. Markiere wichtige Fachbegriffe als "key_terms"

TITEL: {title}
FACH: {subject}
KLASSENSTUFE: {grade_level}

TEXT:
{worksheet_content}

Antworte NUR mit diesem JSON:
{{
  "qa_items": [
    {{
      "id": "qa1",
      "question": "Frage (fast woertlich aus Text)",
      "answer": "Antwort (direkt aus Text)",
      "question_type": "knowledge",
      "key_terms": ["Begriff1", "Begriff2"],
      "difficulty": 1,
      "source_hint": "Wo im Text",
      "leitner_box": 0
    }}
  ],
  "metadata": {{
    "subject": "{subject}",
    "grade_level": "{grade_level}",
    "source_title": "{title}",
    "total_questions": {num_questions},
    "key_terms_summary": ["alle", "Fachbegriffe"]
  }}
}}"""

    message = client.messages.create(
        model="claude-3-5-sonnet-20241022",
        max_tokens=3000,
        messages=[{"role": "user", "content": prompt}],
    )

    if not message.content:
        raise RuntimeError("Claude hat eine leere Antwort geliefert.")
    content = message.content[0].text

    try:
        # The model may wrap the JSON in a Markdown code fence; unwrap it.
        if "```json" in content:
            content = content.split("```json")[1].split("```")[0]
        elif "```" in content:
            content = content.split("```")[1].split("```")[0]
        qa_data = json.loads(content.strip())
    except json.JSONDecodeError as e:
        raise RuntimeError(f"Claude hat ungueltiges JSON geliefert: {e}") from e

    if not isinstance(qa_data, dict):
        raise RuntimeError("Claude hat ungueltiges JSON geliefert: kein JSON-Objekt")

    # Initialize Leitner-Box fields
    _initialize_leitner_fields(qa_data)

    return qa_data


def _initialize_leitner_fields(qa_data: dict) -> None:
    """
    Initialize Leitner-Box fields for all Q&A items (in place).

    Only missing keys are filled; existing values are left untouched.
    Entries of ``qa_items`` that are not dicts are skipped.
    """
    for item in qa_data.get("qa_items") or []:
        if not isinstance(item, dict):
            continue
        item.setdefault("leitner_box", 0)
        item.setdefault("correct_count", 0)
        item.setdefault("incorrect_count", 0)
        item.setdefault("last_seen", None)
        item.setdefault("next_review", None)


def generate_qa_from_analysis(analysis_path: Path, num_questions: int = 8) -> Path:
    """
    Generate question-answer pairs from an analysis JSON file.

    The Q&A pairs will:
    - Be based almost verbatim on the original text
    - Be prepared with Leitner-Box system for repetition
    - Have key terms marked for reinforcement

    If VISION_API is "claude", Claude is tried first and any failure falls
    back to OpenAI; otherwise OpenAI is used directly.

    Args:
        analysis_path: Path to *_analyse.json file
        num_questions: Number of questions to generate

    Returns:
        Path to generated *_qa.json file (inside BEREINIGT_DIR)

    Raises:
        FileNotFoundError: If the analysis file does not exist
        RuntimeError: If the analysis file is not valid JSON
    """
    if not analysis_path.exists():
        raise FileNotFoundError(f"Analysedatei nicht gefunden: {analysis_path}")

    try:
        analysis_data = json.loads(analysis_path.read_text(encoding="utf-8"))
    except json.JSONDecodeError as e:
        raise RuntimeError(f"Ungueltige Analyse-JSON: {e}") from e

    logger.info("Generiere Q&A-Paare fuer: %s", analysis_path.name)

    # Generate Q&A (use configured API)
    if VISION_API == "claude":
        try:
            qa_data = _generate_qa_with_claude(analysis_data, num_questions)
        except Exception as e:
            # Deliberate best-effort: any Claude failure falls back to OpenAI.
            logger.warning(
                "Claude Q&A-Generierung fehlgeschlagen, nutze OpenAI: %s", e
            )
            qa_data = _generate_qa_with_openai(analysis_data, num_questions)
    else:
        qa_data = _generate_qa_with_openai(analysis_data, num_questions)

    # Save Q&A data
    out_name = analysis_path.stem.replace("_analyse", "") + "_qa.json"
    out_path = BEREINIGT_DIR / out_name
    # Make sure the target directory exists before writing.
    out_path.parent.mkdir(parents=True, exist_ok=True)
    out_path.write_text(
        json.dumps(qa_data, ensure_ascii=False, indent=2), encoding="utf-8"
    )

    logger.info("Q&A-Paare gespeichert: %s", out_path.name)
    return out_path


# ---------------------------------------------------------------------------
# Leitner-Box System for Spaced Repetition
# ---------------------------------------------------------------------------

def update_leitner_progress(qa_path: Path, item_id: str, correct: bool) -> dict:
    """
    Update the learning progress of a Q&A item using the Leitner system.

    Leitner Boxes:
    - Box 0: New (not yet learned)
    - Box 1: Learned (on error → back to Box 0)
    - Box 2: Consolidated (on error → back to Box 1)

    On correct answer: Increase box (max 2); next review in 3 days when the
    item lands in Box 1 and in 7 days when it lands in (or stays in) Box 2.
    On wrong answer: Decrease box (min 0); next review in 4 hours.

    Missing, null or out-of-range progress fields in the stored item are
    treated as box 0 / count 0 (boxes above 2 are clamped to 2).

    Args:
        qa_path: Path to *_qa.json file
        item_id: ID of the Q&A item
        correct: True if answered correctly

    Returns:
        Dict with updated item and status ("OK" or "NOT_FOUND")

    Raises:
        FileNotFoundError: If the Q&A file does not exist
    """
    if not qa_path.exists():
        raise FileNotFoundError(f"Q&A-Datei nicht gefunden: {qa_path}")

    qa_data = json.loads(qa_path.read_text(encoding="utf-8"))

    # Find the item
    item = next(
        (
            candidate
            for candidate in qa_data.get("qa_items") or []
            if isinstance(candidate, dict) and candidate.get("id") == item_id
        ),
        None,
    )
    if item is None:
        return {"status": "NOT_FOUND", "message": f"Item {item_id} nicht gefunden"}

    # Update statistics
    now = datetime.now()
    current_box = _normalized_box(item.get("leitner_box"))
    correct_count = item.get("correct_count") or 0
    incorrect_count = item.get("incorrect_count") or 0

    if correct:
        correct_count += 1
        new_box = min(current_box + 1, _MAX_LEITNER_BOX)
        next_review = now + timedelta(days=_REVIEW_INTERVAL_DAYS[new_box])
    else:
        incorrect_count += 1
        new_box = max(current_box - 1, 0)
        # On error: review soon
        next_review = now + timedelta(hours=4)

    # Always write every progress field so the stored item is complete,
    # even if it was created without them.
    item["last_seen"] = now.isoformat()
    item["leitner_box"] = new_box
    item["correct_count"] = correct_count
    item["incorrect_count"] = incorrect_count
    item["next_review"] = next_review.isoformat()

    # Save updated data
    qa_path.write_text(
        json.dumps(qa_data, ensure_ascii=False, indent=2), encoding="utf-8"
    )

    return {
        "status": "OK",
        "item_id": item_id,
        "correct": correct,
        "new_box": new_box,
        "box_name": _BOX_NAMES[new_box],
        "correct_count": correct_count,
        "incorrect_count": incorrect_count,
        "next_review": item["next_review"],
    }


def get_next_review_items(qa_path: Path, limit: int = 5) -> list:
    """
    Get the next items to review.

    Prioritization (lower score = earlier in the result):
    1. Box number times 10, so Box 0 comes first
    2. Minus 2 for every recorded wrong answer
    3. Minus the number of hours a due review is overdue

    Items in Box 0 are always included; items in higher boxes only when
    their ``next_review`` is due, missing or unparseable.

    Args:
        qa_path: Path to *_qa.json file
        limit: Maximum number of items (values below 0 are treated as 0)

    Returns:
        List of items to review (sorted by priority). Items are copies;
        keys starting with "_" are removed from them.
    """
    if not qa_path.exists():
        return []

    qa_data = json.loads(qa_path.read_text(encoding="utf-8"))
    items = qa_data.get("qa_items") or []

    now = datetime.now()
    review_items = []

    for item in items:
        if not isinstance(item, dict):
            continue

        box = _normalized_box(item.get("leitner_box"))
        next_review = item.get("next_review")
        incorrect = item.get("incorrect_count") or 0

        # Calculate priority (lower = more important)
        priority = box * 10  # Box 0 has highest priority

        # Bonus for frequently wrong answers
        priority -= incorrect * 2

        # Check if review is due
        is_due = True
        if next_review:
            try:
                review_time = datetime.fromisoformat(next_review)
                is_due = now >= review_time
                # Overdue items get higher priority
                if is_due:
                    overdue_hours = (now - review_time).total_seconds() / 3600
                    priority -= overdue_hours
            except (ValueError, TypeError):
                # Unparseable or timezone-aware timestamp: treat as due.
                is_due = True

        # New items (Box 0) always included
        if box == 0 or is_due:
            review_items.append({**item, "_priority": priority, "_is_due": is_due})

    # Sort by priority (lowest first)
    review_items.sort(key=lambda entry: entry["_priority"])

    # Remove internal fields and limit
    return [
        {key: value for key, value in entry.items() if not key.startswith("_")}
        for entry in review_items[: max(limit, 0)]
    ]