""" Vocabulary Unit API — Create learning units, translate words, manage language pairs. Endpoints for teachers to build vocabulary learning units with custom words, auto-translation via Kaikki dictionary, and flexible language pair support. """ import json import logging import os from typing import Any, Dict, List, Optional from fastapi import APIRouter, HTTPException, Query from pydantic import BaseModel from .db import get_word, VocabularyWord, get_pool from units.learning import LearningUnitCreate, create_learning_unit logger = logging.getLogger(__name__) router = APIRouter(prefix="/vocabulary", tags=["vocabulary"]) # All supported language codes SUPPORTED_LANGS = { "en", "de", "fr", "es", "it", "pt", "nl", "tr", "ru", "ar", "uk", "pl", "sv", "fi", "da", "ro", "el", "hu", "cs", "bg", "lv", "lt", "sk", "et", "sl", "hr", } # --------------------------------------------------------------------------- # Translation Lookup (auto-suggest) # --------------------------------------------------------------------------- @router.get("/lookup-translation") async def api_lookup_translation( word: str = Query("", min_length=1, description="Word to translate"), source: str = Query("en", description="Source language code"), target: str = Query("de", description="Target language code"), limit: int = Query(5, ge=1, le=20), ): """Look up translations between any two languages via Kaikki dictionary. Uses EN entries as a hub: all EN words have translations to 24 languages. - EN → X: direct lookup (word in EN, translation from JSONB) - X → EN: reverse lookup (search EN entries where translations.X matches) - X → Y: bridge via EN (find EN word via X, then get Y translation) """ if source not in SUPPORTED_LANGS or target not in SUPPORTED_LANGS: raise HTTPException(status_code=400, detail="Sprache nicht unterstuetzt") if source == target: return {"results": [], "word": word, "source": source, "target": target} pool = await get_pool() q = word.strip() results = [] async with pool.acquire() as conn: if source == "en": # Direct: search EN word, return target translation rows = await conn.fetch( """SELECT word, pos, ipa, translations FROM vocabulary_kaikki WHERE lang = 'en' AND lower(word) LIKE $1 ORDER BY length(word), lower(word) LIMIT $2""", f"{q.lower()}%", limit, ) for r in rows: tr = _parse_translations(r["translations"]) target_text = tr.get(target, {}).get("text", "") if target_text: results.append({ "source_text": r["word"], "target_text": target_text, "pos": r["pos"], "ipa": r["ipa"] or "", }) elif target == "en": # Reverse: search EN entries where translations.source matches rows = await conn.fetch( """SELECT word, pos, ipa, translations->'%s'->>'text' as src_text FROM vocabulary_kaikki WHERE lang = 'en' AND translations->'%s'->>'text' ILIKE $1 ORDER BY length(word) LIMIT $2""" % (source, source), f"{q}%", limit, ) for r in rows: results.append({ "source_text": r["src_text"], "target_text": r["word"], "pos": r["pos"], "ipa": r["ipa"] or "", }) else: # Bridge via EN: find EN word via source, then get target translation rows = await conn.fetch( """SELECT word, pos, ipa, translations FROM vocabulary_kaikki WHERE lang = 'en' AND translations->'%s'->>'text' ILIKE $1 ORDER BY length(word) LIMIT $2""" % source, f"{q}%", limit, ) for r in rows: tr = _parse_translations(r["translations"]) src_text = tr.get(source, {}).get("text", "") target_text = tr.get(target, {}).get("text", "") if src_text and target_text: results.append({ "source_text": src_text, "target_text": target_text, "pos": r["pos"], "ipa": "", }) return {"results": results, "word": q, "source": source, "target": target} def _parse_translations(tr) -> dict: """Parse translations field (may be JSONB dict or JSON string).""" if isinstance(tr, str): return json.loads(tr) return tr or {} # --------------------------------------------------------------------------- # Unit Creation (with custom words + language pair) # --------------------------------------------------------------------------- class CustomWord(BaseModel): source_text: str target_text: str class CreateUnitPayload(BaseModel): title: str word_ids: List[str] = [] custom_words: List[CustomWord] = [] source_lang: str = "en" target_lang: str = "de" grade: Optional[str] = None @router.post("/units") async def api_create_unit_from_words(payload: CreateUnitPayload): """Create a learning unit from dictionary words and/or custom word pairs. Supports any language pair. Words can come from: 1. word_ids — looked up in Kaikki dictionary 2. custom_words — manually entered source/target pairs """ if not payload.word_ids and not payload.custom_words: raise HTTPException(status_code=400, detail="Keine Woerter ausgewaehlt") qa_items = [] vocab_data = [] idx = 0 # 1. Process dictionary words for wid in payload.word_ids: word = await get_word(wid) if not word: # Try Kaikki lookup kaikki_word = await _get_kaikki_word(wid, payload.source_lang, payload.target_lang) if kaikki_word: qa_items.append(_make_qa_item(idx, kaikki_word, payload.source_lang, payload.target_lang)) vocab_data.append(kaikki_word) idx += 1 continue # Manual vocabulary_words entry source_text, target_text = _get_word_pair(word, payload.source_lang, payload.target_lang) qa_items.append({ "id": f"qa_{idx+1}", "question": source_text, "answer": target_text, "question_type": "knowledge", "key_terms": [source_text], "difficulty": word.difficulty, "source_hint": word.part_of_speech, "leitner_box": 0, "correct_count": 0, "incorrect_count": 0, "last_seen": None, "next_review": None, "ipa_en": word.ipa_en, "ipa_de": word.ipa_de, "syllables_en": word.syllables_en, "syllables_de": word.syllables_de, "example_en": word.example_en, "example_de": word.example_de, "image_url": word.image_url, "audio_url_en": word.audio_url_en, "audio_url_de": word.audio_url_de, "part_of_speech": word.part_of_speech, "translations": word.translations, }) vocab_data.append(word.to_dict()) idx += 1 # 2. Process custom words (manually entered by teacher) for cw in payload.custom_words: qa_items.append({ "id": f"qa_{idx+1}", "question": cw.source_text, "answer": cw.target_text, "question_type": "knowledge", "key_terms": [cw.source_text], "difficulty": 1, "source_hint": "", "leitner_box": 0, "correct_count": 0, "incorrect_count": 0, "last_seen": None, "next_review": None, "part_of_speech": "", "translations": {}, }) vocab_data.append({ "english": cw.source_text if payload.source_lang == "en" else cw.target_text if payload.target_lang == "en" else "", "german": cw.source_text if payload.source_lang == "de" else cw.target_text if payload.target_lang == "de" else "", "word": cw.source_text, "translation": cw.target_text, "source_lang": payload.source_lang, "target_lang": payload.target_lang, }) idx += 1 if not qa_items: raise HTTPException(status_code=400, detail="Keine gültigen Woerter") # Create learning unit lang_label = f"{payload.source_lang.upper()}→{payload.target_lang.upper()}" lu = create_learning_unit(LearningUnitCreate( title=payload.title, topic="Vocabulary", grade_level=payload.grade or "5-8", language=payload.target_lang, status="raw", )) # Save files analysis_dir = os.path.expanduser("~/Arbeitsblaetter/Lerneinheiten") os.makedirs(analysis_dir, exist_ok=True) with open(os.path.join(analysis_dir, f"{lu.id}_vocab.json"), "w", encoding="utf-8") as f: json.dump({"words": vocab_data, "title": payload.title}, f, ensure_ascii=False, indent=2) with open(os.path.join(analysis_dir, f"{lu.id}_qa.json"), "w", encoding="utf-8") as f: json.dump({ "qa_items": qa_items, "metadata": { "subject": f"Vocabulary {lang_label}", "grade_level": payload.grade or "5-8", "source_title": payload.title, "total_questions": len(qa_items), "source_lang": payload.source_lang, "target_lang": payload.target_lang, }, }, f, ensure_ascii=False, indent=2) # Auto-enrich images for dictionary words dict_ids = [wid for wid in payload.word_ids] if dict_ids: try: from services.image_service import enrich_words_with_images await enrich_words_with_images(dict_ids) except Exception as e: logger.warning(f"Image enrichment failed (non-critical): {e}") logger.info(f"Created vocab unit {lu.id} ({lang_label}) with {len(qa_items)} words") return { "unit_id": lu.id, "title": payload.title, "word_count": len(qa_items), "source_lang": payload.source_lang, "target_lang": payload.target_lang, "status": "created", } def _get_word_pair(word: VocabularyWord, source_lang: str, target_lang: str): """Extract source/target text from a VocabularyWord for the given language pair.""" lang_map = {"en": word.english, "de": word.german} # Check translations for other languages if source_lang not in lang_map: tr = word.translations or {} lang_map[source_lang] = tr.get(source_lang, {}).get("text", word.english) if target_lang not in lang_map: tr = word.translations or {} lang_map[target_lang] = tr.get(target_lang, {}).get("text", word.german) return lang_map.get(source_lang, word.english), lang_map.get(target_lang, word.german) async def _get_kaikki_word(word_id: str, source_lang: str, target_lang: str) -> Optional[dict]: """Look up a word by ID in the Kaikki table and return a vocab dict.""" pool = await get_pool() async with pool.acquire() as conn: row = await conn.fetchrow( "SELECT id, word, lang, pos, ipa, translations, example FROM vocabulary_kaikki WHERE id = $1", _to_uuid(word_id), ) if not row: return None tr = _parse_translations(row["translations"]) src = row["word"] if row["lang"] == source_lang else tr.get(source_lang, {}).get("text", "") tgt = tr.get(target_lang, {}).get("text", "") if row["lang"] != target_lang else row["word"] return { "id": str(row["id"]), "word": row["word"], "lang": row["lang"], "source_text": src or row["word"], "target_text": tgt, "pos": row["pos"], "ipa": row["ipa"] or "", "example": row["example"] or "", "translations": tr, } def _make_qa_item(idx: int, kw: dict, source_lang: str, target_lang: str) -> dict: """Create a QA item from a Kaikki word dict.""" return { "id": f"qa_{idx+1}", "question": kw.get("source_text", kw.get("word", "")), "answer": kw.get("target_text", ""), "question_type": "knowledge", "key_terms": [kw.get("source_text", kw.get("word", ""))], "difficulty": 0, "source_hint": kw.get("pos", ""), "leitner_box": 0, "correct_count": 0, "incorrect_count": 0, "last_seen": None, "next_review": None, "ipa_en": kw.get("ipa", "") if source_lang == "en" else "", "ipa_de": kw.get("ipa", "") if source_lang == "de" else "", "part_of_speech": kw.get("pos", ""), "translations": kw.get("translations", {}), } def _to_uuid(s: str): """Convert string to UUID, return as-is if already valid.""" import uuid try: return uuid.UUID(s) except (ValueError, AttributeError): return s