""" CV Review Spell — Rule-based OCR spell correction (no LLM). Provides dictionary-backed digit-to-letter substitution, umlaut correction, general spell correction, merged-word splitting, and page-ref normalization. Lizenz: Apache 2.0 (kommerziell nutzbar) DATENSCHUTZ: Alle Verarbeitung erfolgt lokal. """ import logging import re import time from typing import Dict, List, Optional, Tuple logger = logging.getLogger(__name__) try: from spellchecker import SpellChecker as _SpellChecker _en_spell = _SpellChecker(language='en', distance=1) _de_spell = _SpellChecker(language='de', distance=1) _SPELL_AVAILABLE = True logger.info("pyspellchecker loaded (EN+DE)") except ImportError: _SPELL_AVAILABLE = False _en_spell = None # type: ignore[assignment] _de_spell = None # type: ignore[assignment] logger.warning("pyspellchecker not installed") # ---- Page-Ref Normalization ---- # Normalizes OCR variants like "p-60", "p 61", "p60" -> "p.60" _PAGE_REF_RE = re.compile(r'\bp[\s\-]?(\d+)', re.IGNORECASE) def _normalize_page_ref(text: str) -> str: """Normalize page references: 'p-60' / 'p 61' / 'p60' -> 'p.60'.""" if not text: return text return _PAGE_REF_RE.sub(lambda m: f"p.{m.group(1)}", text) # Suspicious OCR chars -> ordered list of most-likely correct replacements _SPELL_SUBS: Dict[str, List[str]] = { '0': ['O', 'o'], '1': ['l', 'I'], '5': ['S', 's'], '6': ['G', 'g'], '8': ['B', 'b'], '|': ['I', 'l', '1'], } _SPELL_SUSPICIOUS = frozenset(_SPELL_SUBS.keys()) # Tokenizer: word tokens (letters + pipe) alternating with separators _SPELL_TOKEN_RE = re.compile(r'([A-Za-z\u00c4\u00d6\u00dc\u00e4\u00f6\u00fc\u00df|]+)([^A-Za-z\u00c4\u00d6\u00dc\u00e4\u00f6\u00fc\u00df|]*)') def _spell_dict_knows(word: str) -> bool: """True if word is known in EN or DE dictionary.""" if not _SPELL_AVAILABLE: return False w = word.lower() return bool(_en_spell.known([w])) or bool(_de_spell.known([w])) def _try_split_merged_word(token: str) -> Optional[str]: """Try to split a merged word like 'atmyschool' into 'at my school'. Uses dynamic programming to find the shortest sequence of dictionary words that covers the entire token. Only returns a result when the split produces at least 2 words and ALL parts are known dictionary words. Preserves original capitalisation by mapping back to the input string. """ if not _SPELL_AVAILABLE or len(token) < 4: return None lower = token.lower() n = len(lower) # dp[i] = (word_lengths_list, score) for best split of lower[:i], or None dp: list = [None] * (n + 1) dp[0] = ([], 0) for i in range(1, n + 1): for j in range(max(0, i - 20), i): if dp[j] is None: continue candidate = lower[j:i] word_len = i - j if word_len == 1 and candidate not in ('a', 'i'): continue if _spell_dict_knows(candidate): prev_words, prev_sq = dp[j] new_words = prev_words + [word_len] new_sq = prev_sq + word_len * word_len new_key = (-len(new_words), new_sq) if dp[i] is None: dp[i] = (new_words, new_sq) else: old_key = (-len(dp[i][0]), dp[i][1]) if new_key >= old_key: dp[i] = (new_words, new_sq) if dp[n] is None or len(dp[n][0]) < 2: return None result = [] pos = 0 for wlen in dp[n][0]: result.append(token[pos:pos + wlen]) pos += wlen logger.debug("Split merged word: %r -> %r", token, " ".join(result)) return " ".join(result) def _spell_fix_token(token: str, field: str = "") -> Optional[str]: """Return corrected form of token, or None if no fix needed/possible. *field* is 'english' or 'german' -- used to pick the right dictionary. """ has_suspicious = any(ch in _SPELL_SUSPICIOUS for ch in token) # 1. Already known word -> no fix needed if _spell_dict_knows(token): return None # 2. Digit/pipe substitution if has_suspicious: if token == '|': return 'I' for i, ch in enumerate(token): if ch not in _SPELL_SUBS: continue for replacement in _SPELL_SUBS[ch]: candidate = token[:i] + replacement + token[i + 1:] if _spell_dict_knows(candidate): return candidate first = token[0] if first in _SPELL_SUBS and len(token) >= 2: rest = token[1:] if rest.isalpha() and rest.islower(): candidate = _SPELL_SUBS[first][0] + rest if not candidate[0].isdigit(): return candidate # 3. OCR umlaut confusion if len(token) >= 3 and token.isalpha() and field == "german": _UMLAUT_SUBS = {'a': '\u00e4', 'o': '\u00f6', 'u': '\u00fc', 'i': '\u00fc', 'A': '\u00c4', 'O': '\u00d6', 'U': '\u00dc', 'I': '\u00dc'} for i, ch in enumerate(token): if ch in _UMLAUT_SUBS: candidate = token[:i] + _UMLAUT_SUBS[ch] + token[i + 1:] if _spell_dict_knows(candidate): return candidate # 4. General spell correction for unknown words (no digits/pipes) if not has_suspicious and len(token) >= 3 and token.isalpha(): spell = _en_spell if field == "english" else _de_spell if field == "german" else None if spell is not None: correction = spell.correction(token.lower()) if correction and correction != token.lower(): if token[0].isupper(): correction = correction[0].upper() + correction[1:] if _spell_dict_knows(correction): return correction # 5. Merged-word split if len(token) >= 4 and token.isalpha(): split = _try_split_merged_word(token) if split: return split return None def _spell_fix_field(text: str, field: str = "") -> Tuple[str, bool]: """Apply OCR corrections to a text field. Returns (fixed_text, was_changed).""" if not text: return text, False has_suspicious = any(ch in text for ch in _SPELL_SUSPICIOUS) if not has_suspicious and not any(c.isalpha() for c in text): return text, False # Pattern: | immediately before . or , -> numbered list prefix fixed = re.sub(r'(? Dict: """Rule-based OCR correction: spell-checker + structural heuristics. Deterministic -- never translates, never touches IPA, never hallucinates. Uses SmartSpellChecker for language-aware corrections with context-based disambiguation (a/I), multi-digit substitution, and cross-language guard. """ from cv_review_llm import _entry_needs_review t0 = time.time() changes: List[Dict] = [] all_corrected: List[Dict] = [] # Use SmartSpellChecker if available _smart = None try: from smart_spell import SmartSpellChecker _smart = SmartSpellChecker() logger.debug("spell_review: using SmartSpellChecker") except Exception: logger.debug("spell_review: SmartSpellChecker not available, using legacy") _LANG_MAP = {"english": "en", "german": "de", "example": "auto"} for i, entry in enumerate(entries): e = dict(entry) # Page-ref normalization old_ref = (e.get("source_page") or "").strip() if old_ref: new_ref = _normalize_page_ref(old_ref) if new_ref != old_ref: changes.append({ "row_index": e.get("row_index", i), "field": "source_page", "old": old_ref, "new": new_ref, }) e["source_page"] = new_ref e["llm_corrected"] = True if not _entry_needs_review(e): all_corrected.append(e) continue for field_name in ("english", "german", "example"): old_val = (e.get(field_name) or "").strip() if not old_val: continue if _smart: lang_code = _LANG_MAP.get(field_name, "en") result = _smart.correct_text(old_val, lang=lang_code) new_val = result.corrected was_changed = result.changed else: lang = "german" if field_name in ("german", "example") else "english" new_val, was_changed = _spell_fix_field(old_val, field=lang) if was_changed and new_val != old_val: changes.append({ "row_index": e.get("row_index", i), "field": field_name, "old": old_val, "new": new_val, }) e[field_name] = new_val e["llm_corrected"] = True all_corrected.append(e) duration_ms = int((time.time() - t0) * 1000) model_name = "smart-spell-checker" if _smart else "spell-checker" return { "entries_original": entries, "entries_corrected": all_corrected, "changes": changes, "skipped_count": 0, "model_used": model_name, "duration_ms": duration_ms, } async def spell_review_entries_streaming(entries: List[Dict], batch_size: int = 50): """Async generator yielding SSE-compatible events for spell-checker review.""" total = len(entries) yield { "type": "meta", "total_entries": total, "to_review": total, "skipped": 0, "model": "spell-checker", "batch_size": batch_size, } result = spell_review_entries_sync(entries) changes = result["changes"] yield { "type": "batch", "batch_index": 0, "entries_reviewed": [e.get("row_index", i) for i, e in enumerate(entries)], "changes": changes, "duration_ms": result["duration_ms"], "progress": {"current": total, "total": total}, } yield { "type": "complete", "changes": changes, "model_used": "spell-checker", "duration_ms": result["duration_ms"], "total_entries": total, "reviewed": total, "skipped": 0, "corrections_found": len(changes), "entries_corrected": result["entries_corrected"], }