Files
breakpilot-lehrer/backend-lehrer/vocabulary_db.py
Benjamin Admin 0dbfa87058
Some checks failed
CI / go-lint (push) Has been skipped
CI / python-lint (push) Has been skipped
CI / nodejs-lint (push) Has been skipped
CI / test-go-school (push) Successful in 1m9s
CI / test-go-edu-search (push) Successful in 1m4s
CI / test-python-klausur (push) Failing after 2m59s
CI / test-python-agent-core (push) Successful in 33s
CI / test-nodejs-website (push) Successful in 28s
Fix: pg_trgm optional, table creation no longer fails without it
Trigram extension and index are now created in a separate try/catch
so table creation succeeds even without pg_trgm. Search falls back
to ILIKE when trigram functions are not available.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-24 13:51:09 +02:00

297 lines
10 KiB
Python

"""
Vocabulary Database — PostgreSQL storage for the vocabulary word catalog.
Stores 160k+ words with translations, IPA, syllables, examples, images, audio.
Uses asyncpg for async PostgreSQL access (same pattern as game/database.py).
Schema: lehrer.vocabulary_words (search_path set in main.py; get_pool() also applies it to every pooled connection)
"""
import logging
import os
import uuid
from dataclasses import dataclass, field, asdict
from typing import Any, Dict, List, Optional
logger = logging.getLogger(__name__)

# Connection string comes from the environment, with a built-in fallback.
_RAW_DB_URL = os.getenv(
    "DATABASE_URL",
    "postgresql://breakpilot:breakpilot@postgres:5432/breakpilot",
)

# asyncpg needs the plain scheme, so drop the SQLAlchemy dialect suffix.
DATABASE_URL = _RAW_DB_URL.replace("postgresql+asyncpg://", "postgresql://")

# When the last "?"-separated segment carries an "options=" entry, cut the URL
# at its first "?". The search_path is passed through server_settings when the
# pool is created instead of through the URL.
if "options=" in DATABASE_URL.rsplit("?", 1)[-1]:
    DATABASE_URL = DATABASE_URL.partition("?")[0]

# Lazily created asyncpg pool shared by the helpers in this module.
_pool = None
async def get_pool():
    """Return the module-wide asyncpg pool, creating it on first use.

    The pool is built from DATABASE_URL with 2 to 10 connections, and every
    connection gets the search_path "lehrer,core,public".
    """
    global _pool
    if _pool is not None:
        return _pool

    # Imported lazily so that merely importing this module does not need asyncpg.
    import asyncpg

    _pool = await asyncpg.create_pool(
        DATABASE_URL,
        min_size=2,
        max_size=10,
        server_settings={"search_path": "lehrer,core,public"},
    )
    return _pool
async def init_vocabulary_tables():
    """Create the vocabulary_words table and its indexes if they are missing.

    The table and its regular indexes are created in one multi-statement call.
    The pg_trgm extension and the trigram index are created afterwards in a
    separate best-effort step: a failure there is logged together with its
    cause and otherwise ignored, so table creation still succeeds.
    """
    pool = await get_pool()
    async with pool.acquire() as conn:
        await conn.execute("""
            CREATE TABLE IF NOT EXISTS vocabulary_words (
                id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
                english TEXT NOT NULL,
                german TEXT NOT NULL DEFAULT '',
                ipa_en TEXT NOT NULL DEFAULT '',
                ipa_de TEXT NOT NULL DEFAULT '',
                part_of_speech TEXT NOT NULL DEFAULT '',
                syllables_en TEXT[] NOT NULL DEFAULT '{}',
                syllables_de TEXT[] NOT NULL DEFAULT '{}',
                example_en TEXT NOT NULL DEFAULT '',
                example_de TEXT NOT NULL DEFAULT '',
                image_url TEXT NOT NULL DEFAULT '',
                audio_url_en TEXT NOT NULL DEFAULT '',
                audio_url_de TEXT NOT NULL DEFAULT '',
                difficulty INT NOT NULL DEFAULT 1,
                tags TEXT[] NOT NULL DEFAULT '{}',
                translations JSONB NOT NULL DEFAULT '{}',
                created_at TIMESTAMPTZ NOT NULL DEFAULT now()
            );
            CREATE INDEX IF NOT EXISTS idx_vocab_english
                ON vocabulary_words (lower(english));
            CREATE INDEX IF NOT EXISTS idx_vocab_german
                ON vocabulary_words (lower(german));
            CREATE INDEX IF NOT EXISTS idx_vocab_pos
                ON vocabulary_words (part_of_speech);
            CREATE INDEX IF NOT EXISTS idx_vocab_difficulty
                ON vocabulary_words (difficulty);
            CREATE INDEX IF NOT EXISTS idx_vocab_tags
                ON vocabulary_words USING GIN (tags);
        """)
        # Enable trigram extension for fuzzy search (optional).
        try:
            await conn.execute("CREATE EXTENSION IF NOT EXISTS pg_trgm;")
            await conn.execute("""
                CREATE INDEX IF NOT EXISTS idx_vocab_english_trgm
                    ON vocabulary_words USING GIN (english gin_trgm_ops);
            """)
        except Exception as exc:
            # Deliberately broad: this step must never block table creation.
            # The cause is logged so the actual failure can be diagnosed.
            logger.info(
                "pg_trgm not available — trigram search disabled, "
                "using LIKE fallback (%s)",
                exc,
            )
    logger.info("vocabulary_words table initialized")
@dataclass
class VocabularyWord:
    """One catalog entry: an English/German word pair plus its metadata.

    Every field has a default, so an instance can be built from any subset of
    keyword arguments. List and dict fields get a fresh container per instance.
    """

    # Identifier and the two word forms.
    id: str = ""
    english: str = ""
    german: str = ""
    # Pronunciation (IPA) and word class.
    ipa_en: str = ""
    ipa_de: str = ""
    part_of_speech: str = ""
    # Syllable breakdown per language.
    syllables_en: List[str] = field(default_factory=list)
    syllables_de: List[str] = field(default_factory=list)
    # Example sentences per language.
    example_en: str = ""
    example_de: str = ""
    # Media references.
    image_url: str = ""
    audio_url_en: str = ""
    audio_url_de: str = ""
    # Classification.
    difficulty: int = 1
    tags: List[str] = field(default_factory=list)
    # Additional translations as a string-to-string mapping.
    translations: Dict[str, str] = field(default_factory=dict)

    def to_dict(self) -> Dict[str, Any]:
        """Return the word as a plain dict keyed by field name."""
        as_mapping = asdict(self)
        return as_mapping
def _row_to_word(row) -> VocabularyWord:
    """Build a VocabularyWord from a database row indexed by column name.

    The id is converted to a string, array columns become lists (an empty
    list when the value is falsy), and translations are JSON-decoded when
    they arrive as a string.
    """
    import json

    decoded_translations = row["translations"]
    if isinstance(decoded_translations, str):
        decoded_translations = json.loads(decoded_translations)

    # Columns that need list() normalisation; all others are copied as-is.
    array_columns = {"syllables_en", "syllables_de", "tags"}
    copied_columns = (
        "english", "german", "ipa_en", "ipa_de", "part_of_speech",
        "syllables_en", "syllables_de", "example_en", "example_de",
        "image_url", "audio_url_en", "audio_url_de", "difficulty", "tags",
    )

    attrs = {"id": str(row["id"])}
    for column in copied_columns:
        value = row[column]
        if column in array_columns:
            value = list(value or [])
        attrs[column] = value
    attrs["translations"] = decoded_translations or {}
    return VocabularyWord(**attrs)
async def search_words(
    query: str, lang: str = "en", limit: int = 20, offset: int = 0,
) -> List[VocabularyWord]:
    """Search words by case-insensitive substring, ranked by trigram similarity.

    Args:
        query: Text to look for. It is matched literally: "%", "_" and "\\"
            have no wildcard or escape meaning.
        lang: "en" searches the english column; any other value searches
            the german column.
        limit: Maximum number of rows to return.
        offset: Number of rows to skip.

    Returns:
        Matching words. The first attempt also matches via the "%" similarity
        operator and orders by similarity(). If that query fails, a plain
        lower(...) LIKE query ordered alphabetically is used instead.
    """
    pool = await get_pool()
    # Only these two fixed column names are ever interpolated into the SQL text.
    col = "english" if lang == "en" else "german"

    # Escape LIKE metacharacters so user input is matched literally. Backslash
    # is PostgreSQL's default LIKE escape character and must be escaped first;
    # an unescaped trailing backslash would make the pattern invalid.
    escaped = (
        query.lower()
        .replace("\\", "\\\\")
        .replace("%", "\\%")
        .replace("_", "\\_")
    )
    pattern = f"%{escaped}%"

    async with pool.acquire() as conn:
        # Try trigram search first, fall back to LIKE.
        try:
            rows = await conn.fetch(
                f"""
                SELECT * FROM vocabulary_words
                WHERE lower({col}) LIKE $1 OR {col} % $2
                ORDER BY similarity({col}, $2) DESC, lower({col}), id
                LIMIT $3 OFFSET $4
                """,
                pattern, query, limit, offset,
            )
        except Exception as exc:
            # Deliberately broad best-effort fallback; record why it was taken.
            logger.debug("Trigram search failed, falling back to LIKE: %s", exc)
            rows = await conn.fetch(
                f"""
                SELECT * FROM vocabulary_words
                WHERE lower({col}) LIKE $1
                ORDER BY lower({col}), id
                LIMIT $2 OFFSET $3
                """,
                pattern, limit, offset,
            )
    # "id" is the final sort key in both queries so LIMIT/OFFSET paging is
    # stable when several rows tie on the preceding keys.
    return [_row_to_word(r) for r in rows]
async def get_word(word_id: str) -> Optional[VocabularyWord]:
    """Fetch one word by its ID.

    Returns None when no row matches. word_id is parsed with uuid.UUID, so a
    string that is not a valid UUID raises ValueError.
    """
    pool = await get_pool()
    async with pool.acquire() as conn:
        record = await conn.fetchrow(
            "SELECT * FROM vocabulary_words WHERE id = $1", uuid.UUID(word_id),
        )
    if not record:
        return None
    return _row_to_word(record)
async def browse_words(
    pos: str = "", difficulty: int = 0, tag: str = "",
    limit: int = 50, offset: int = 0,
) -> List[VocabularyWord]:
    """Browse words with optional filters, in a stable order.

    Args:
        pos: Keep only rows with this part_of_speech; ignored when empty.
        difficulty: Keep only rows with this difficulty; ignored unless > 0.
        tag: Keep only rows whose tags contain this value; ignored when empty.
        limit: Maximum number of rows to return.
        offset: Number of rows to skip.

    Returns:
        Words matching all active filters, ordered by english and then id.
    """
    pool = await get_pool()
    conditions: List[str] = []
    params: List[Any] = []

    def bind(value: Any) -> str:
        """Register a query argument and return its $n placeholder."""
        params.append(value)
        return f"${len(params)}"

    if pos:
        conditions.append(f"part_of_speech = {bind(pos)}")
    if difficulty > 0:
        conditions.append(f"difficulty = {bind(difficulty)}")
    if tag:
        conditions.append(f"{bind(tag)} = ANY(tags)")

    where = ("WHERE " + " AND ".join(conditions)) if conditions else ""
    limit_ref = bind(limit)
    offset_ref = bind(offset)

    # "id" breaks ties between rows sharing the same english value; without a
    # unique sort order LIMIT/OFFSET pages may repeat or skip rows.
    sql = (
        f"SELECT * FROM vocabulary_words {where} "
        f"ORDER BY english, id LIMIT {limit_ref} OFFSET {offset_ref}"
    )
    async with pool.acquire() as conn:
        rows = await conn.fetch(sql, *params)
    return [_row_to_word(r) for r in rows]
async def insert_word(word: VocabularyWord) -> str:
    """Store a single word and return its ID.

    A random UUID is generated when word.id is empty. If a row with the same
    ID already exists the insert is skipped (ON CONFLICT DO NOTHING); the ID
    is returned either way.
    """
    pool = await get_pool()
    import json

    if word.id:
        word_id = word.id
    else:
        word_id = str(uuid.uuid4())

    statement = """
        INSERT INTO vocabulary_words
            (id, english, german, ipa_en, ipa_de, part_of_speech,
             syllables_en, syllables_de, example_en, example_de,
             image_url, audio_url_en, audio_url_de, difficulty, tags, translations)
        VALUES ($1,$2,$3,$4,$5,$6,$7,$8,$9,$10,$11,$12,$13,$14,$15,$16)
        ON CONFLICT (id) DO NOTHING
    """
    async with pool.acquire() as conn:
        # Argument order mirrors the column list in the statement above.
        arguments = (
            uuid.UUID(word_id), word.english, word.german,
            word.ipa_en, word.ipa_de, word.part_of_speech,
            word.syllables_en, word.syllables_de,
            word.example_en, word.example_de,
            word.image_url, word.audio_url_en, word.audio_url_de,
            word.difficulty, word.tags, json.dumps(word.translations),
        )
        await conn.execute(statement, *arguments)
    return word_id
async def insert_words_bulk(words: List[VocabularyWord]) -> int:
    """Insert many words with a single executemany call.

    Words without an id get a random UUID. Rows whose id already exists are
    skipped (ON CONFLICT DO NOTHING). The return value is the number of
    records submitted, so skipped rows are still counted.
    """
    pool = await get_pool()
    import json

    # One argument tuple per word, in the column order of the statement below.
    batch = [
        (
            uuid.UUID(entry.id or str(uuid.uuid4())), entry.english, entry.german,
            entry.ipa_en, entry.ipa_de, entry.part_of_speech,
            entry.syllables_en, entry.syllables_de,
            entry.example_en, entry.example_de,
            entry.image_url, entry.audio_url_en, entry.audio_url_de,
            entry.difficulty, entry.tags, json.dumps(entry.translations),
        )
        for entry in words
    ]
    statement = """
        INSERT INTO vocabulary_words
            (id, english, german, ipa_en, ipa_de, part_of_speech,
             syllables_en, syllables_de, example_en, example_de,
             image_url, audio_url_en, audio_url_de, difficulty, tags, translations)
        VALUES ($1,$2,$3,$4,$5,$6,$7,$8,$9,$10,$11,$12,$13,$14,$15,$16)
        ON CONFLICT (id) DO NOTHING
    """
    async with pool.acquire() as conn:
        await conn.executemany(statement, batch)
    return len(batch)
async def count_words() -> int:
    """Return the number of rows in vocabulary_words."""
    pool = await get_pool()
    async with pool.acquire() as conn:
        total = await conn.fetchval("SELECT COUNT(*) FROM vocabulary_words")
        return total
async def get_all_tags() -> List[str]:
    """Return every distinct tag used by any word, sorted by the database."""
    distinct_tags_sql = (
        "SELECT DISTINCT unnest(tags) AS tag FROM vocabulary_words ORDER BY tag"
    )
    pool = await get_pool()
    async with pool.acquire() as conn:
        records = await conn.fetch(distinct_tags_sql)
    tags: List[str] = []
    for record in records:
        tags.append(record["tag"])
    return tags
async def get_all_pos() -> List[str]:
    """Return every distinct non-empty part of speech, sorted by the database."""
    distinct_pos_sql = (
        "SELECT DISTINCT part_of_speech FROM vocabulary_words WHERE part_of_speech != '' ORDER BY part_of_speech"
    )
    pool = await get_pool()
    async with pool.acquire() as conn:
        records = await conn.fetch(distinct_pos_sql)
    parts: List[str] = []
    for record in records:
        parts.append(record["part_of_speech"])
    return parts