Some checks failed
CI / go-lint (push) Has been skipped
CI / python-lint (push) Has been skipped
CI / nodejs-lint (push) Has been skipped
CI / test-go-school (push) Successful in 51s
CI / test-go-edu-search (push) Successful in 47s
CI / test-python-klausur (push) Failing after 2m54s
CI / test-python-agent-core (push) Successful in 35s
CI / test-nodejs-website (push) Successful in 35s
New features: - Boundary repair: "ats th." → "at sth." (shifted OCR word boundaries) Tries shifting 1-2 chars between adjacent words, accepts if result includes a known abbreviation or produces better dictionary matches - Context split: "anew book" → "a new book" (ambiguous word merges) Explicit allow/deny list for article+word patterns (alive, alone, etc.) - Abbreviation awareness: 120+ known abbreviations (sth, sb, adj, etc.) are now recognized as valid words, preventing false corrections - Quality gate: boundary repairs only accepted when result scores higher than original (known words + abbreviations) 40 tests passing, all edge cases covered. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
545 lines
21 KiB
Python
545 lines
21 KiB
Python
"""
|
|
SmartSpellChecker — Language-aware OCR post-correction without LLMs.
|
|
|
|
Uses pyspellchecker (MIT) with dual EN+DE dictionaries for:
|
|
- Automatic language detection per word (dual-dictionary heuristic)
|
|
- OCR error correction (digit↔letter, umlauts, transpositions)
|
|
- Context-based disambiguation (a/I, l/I) via bigram lookup
|
|
- Mixed-language support for example sentences
|
|
|
|
License: Apache 2.0 (commercial use permitted)
|
|
"""
|
|
|
|
import logging
|
|
import re
|
|
from dataclasses import dataclass, field
|
|
from typing import Dict, List, Literal, Optional, Set, Tuple
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Init
|
|
# ---------------------------------------------------------------------------
|
|
|
|
# pyspellchecker is an optional dependency. The two dictionaries are built
# once at import time and shared by every SmartSpellChecker instance.
# When the import fails the module still loads, _AVAILABLE is False and
# SmartSpellChecker.__init__ raises RuntimeError.
try:
    from spellchecker import SpellChecker as _SpellChecker
    # distance=1: presumably limits suggestions to edit distance 1
    # (see the pyspellchecker documentation) — TODO confirm.
    _en_spell = _SpellChecker(language='en', distance=1)
    _de_spell = _SpellChecker(language='de', distance=1)
    _AVAILABLE = True
except ImportError:
    # NOTE: _en_spell and _de_spell are left undefined on this path.
    _AVAILABLE = False
    logger.warning("pyspellchecker not installed — SmartSpellChecker disabled")
|
|
|
|
# Language label for a word or a text. "both" means the EN and DE evidence
# is balanced (word found in both dictionaries / equal word counts);
# "unknown" means there was no usable evidence at all.
Lang = Literal["en", "de", "both", "unknown"]
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Bigram context for a/I disambiguation
|
|
# ---------------------------------------------------------------------------
|
|
|
|
# Words that commonly follow "I" (subject pronoun → verb/modal).
# Entries are lowercase; the lookup in _disambiguate_a_I lowercases the
# neighbouring word and strips trailing punctuation before testing membership.
_I_FOLLOWERS: frozenset = frozenset({
    "am", "was", "have", "had", "do", "did", "will", "would", "can",
    "could", "should", "shall", "may", "might", "must",
    "think", "know", "see", "want", "need", "like", "love", "hate",
    "go", "went", "come", "came", "say", "said", "get", "got",
    "make", "made", "take", "took", "give", "gave", "tell", "told",
    "feel", "felt", "find", "found", "believe", "hope", "wish",
    "remember", "forget", "understand", "mean", "meant",
    "don't", "didn't", "can't", "won't", "couldn't", "wouldn't",
    "shouldn't", "haven't", "hadn't", "isn't", "wasn't",
    "really", "just", "also", "always", "never", "often", "sometimes",
})
|
|
|
|
# Words that commonly follow "a" (article → noun/adjective).
# Entries are lowercase. Note that "really" is also listed in _I_FOLLOWERS.
_A_FOLLOWERS: frozenset = frozenset({
    "lot", "few", "little", "bit", "good", "bad", "great", "new", "old",
    "long", "short", "big", "small", "large", "huge", "tiny",
    "nice", "beautiful", "wonderful", "terrible", "horrible",
    "man", "woman", "boy", "girl", "child", "dog", "cat", "bird",
    "book", "car", "house", "room", "school", "teacher", "student",
    "day", "week", "month", "year", "time", "place", "way",
    "friend", "family", "person", "problem", "question", "story",
    "very", "really", "quite", "rather", "pretty", "single",
})
|
|
|
|
# Digit→letter substitutions (OCR confusion).
# Maps a misread character to the letters it may stand for, in the order
# they are tried. Besides digits the table also covers the pipe character.
_DIGIT_SUBS: Dict[str, List[str]] = {
    '0': ['o', 'O'],
    '1': ['l', 'I'],
    '5': ['s', 'S'],
    '6': ['g', 'G'],
    '8': ['b', 'B'],
    '|': ['I', 'l'],
}
# Characters whose presence in a token marks it as a digit/pipe OCR suspect.
_SUSPICIOUS_CHARS = frozenset(_DIGIT_SUBS.keys())
|
|
|
|
# Umlaut confusion: OCR drops dots (ü→u, ä→a, ö→o).
# Maps the plain letter seen in the OCR output to the umlaut to try instead.
# NOTE(review): 'i'/'I' also map to 'ü'/'Ü' — presumably a misread ü that
# came out as i; confirm this is intended.
_UMLAUT_MAP = {
    'a': 'ä', 'o': 'ö', 'u': 'ü', 'i': 'ü',
    'A': 'Ä', 'O': 'Ö', 'U': 'Ü', 'I': 'Ü',
}
|
|
|
|
# Tokenizer.
# Group 1: a run of word characters (ASCII letters, German umlauts/ß,
# apostrophe, and the pipe so that OCR-misread "|" stays part of a word).
# Group 2: the run of non-word characters that follows it (may be empty).
# Characters before the first word character are not part of any match.
_TOKEN_RE = re.compile(r"([A-Za-zÄÖÜäöüß'|]+)([^A-Za-zÄÖÜäöüß'|]*)")
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Data types
|
|
# ---------------------------------------------------------------------------
|
|
|
|
@dataclass
class CorrectionResult:
    """Outcome of one text-correction run (see SmartSpellChecker.correct_text)."""

    # Input text exactly as it was passed in.
    original: str
    # Text after correction; equal to `original` when nothing was changed.
    corrected: str
    # Language label for the run: the caller's value, or the detected
    # label when the caller asked for "auto".
    lang_detected: Lang
    # True when `corrected` differs from `original`.
    changed: bool
    # Log of applied fixes, each formatted as "old→new".
    changes: List[str] = field(default_factory=list)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Core class
|
|
# ---------------------------------------------------------------------------
|
|
|
|
class SmartSpellChecker:
|
|
"""Language-aware OCR spell checker using pyspellchecker (no LLM)."""
|
|
|
|
def __init__(self):
|
|
if not _AVAILABLE:
|
|
raise RuntimeError("pyspellchecker not installed")
|
|
self.en = _en_spell
|
|
self.de = _de_spell
|
|
|
|
# --- Language detection ---
|
|
|
|
def detect_word_lang(self, word: str) -> Lang:
|
|
"""Detect language of a single word using dual-dict heuristic."""
|
|
w = word.lower().strip(".,;:!?\"'()")
|
|
if not w:
|
|
return "unknown"
|
|
in_en = bool(self.en.known([w]))
|
|
in_de = bool(self.de.known([w]))
|
|
if in_en and in_de:
|
|
return "both"
|
|
if in_en:
|
|
return "en"
|
|
if in_de:
|
|
return "de"
|
|
return "unknown"
|
|
|
|
def detect_text_lang(self, text: str) -> Lang:
|
|
"""Detect dominant language of a text string (sentence/phrase)."""
|
|
words = re.findall(r"[A-Za-zÄÖÜäöüß]+", text)
|
|
if not words:
|
|
return "unknown"
|
|
|
|
en_count = 0
|
|
de_count = 0
|
|
for w in words:
|
|
lang = self.detect_word_lang(w)
|
|
if lang == "en":
|
|
en_count += 1
|
|
elif lang == "de":
|
|
de_count += 1
|
|
# "both" doesn't count for either
|
|
|
|
if en_count > de_count:
|
|
return "en"
|
|
if de_count > en_count:
|
|
return "de"
|
|
if en_count == de_count and en_count > 0:
|
|
return "both"
|
|
return "unknown"
|
|
|
|
# --- Single-word correction ---
|
|
|
|
def _known(self, word: str) -> bool:
|
|
"""True if word is known in EN or DE dictionary, or is a known abbreviation."""
|
|
w = word.lower()
|
|
if bool(self.en.known([w])) or bool(self.de.known([w])):
|
|
return True
|
|
# Also accept known abbreviations (sth, sb, adj, etc.)
|
|
try:
|
|
from cv_ocr_engines import _KNOWN_ABBREVIATIONS
|
|
if w in _KNOWN_ABBREVIATIONS:
|
|
return True
|
|
except ImportError:
|
|
pass
|
|
return False
|
|
|
|
def _known_in(self, word: str, lang: str) -> bool:
|
|
"""True if word is known in a specific language dictionary."""
|
|
w = word.lower()
|
|
spell = self.en if lang == "en" else self.de
|
|
return bool(spell.known([w]))
|
|
|
|
def correct_word(self, word: str, lang: str = "en",
|
|
prev_word: str = "", next_word: str = "") -> Optional[str]:
|
|
"""Correct a single word for the given language.
|
|
|
|
Returns None if no correction needed, or the corrected string.
|
|
|
|
Args:
|
|
word: The word to check/correct
|
|
lang: Expected language ("en" or "de")
|
|
prev_word: Previous word (for context)
|
|
next_word: Next word (for context)
|
|
"""
|
|
if not word or not word.strip():
|
|
return None
|
|
|
|
# Skip numbers, abbreviations with dots, very short tokens
|
|
if word.isdigit() or '.' in word:
|
|
return None
|
|
|
|
has_suspicious = any(ch in _SUSPICIOUS_CHARS for ch in word)
|
|
|
|
# 1. Already known → no fix
|
|
if self._known(word):
|
|
# But check a/I disambiguation for single-char words
|
|
if word.lower() in ('l', '|') and next_word:
|
|
return self._disambiguate_a_I(word, next_word)
|
|
return None
|
|
|
|
# 2. Digit/pipe substitution
|
|
if has_suspicious:
|
|
if word == '|':
|
|
return 'I'
|
|
# Try single-char substitutions
|
|
for i, ch in enumerate(word):
|
|
if ch not in _DIGIT_SUBS:
|
|
continue
|
|
for replacement in _DIGIT_SUBS[ch]:
|
|
candidate = word[:i] + replacement + word[i + 1:]
|
|
if self._known(candidate):
|
|
return candidate
|
|
# Try multi-char substitution (e.g., "sch00l" → "school")
|
|
multi = self._try_multi_digit_sub(word)
|
|
if multi:
|
|
return multi
|
|
|
|
# 3. Umlaut correction (German)
|
|
if lang == "de" and len(word) >= 3 and word.isalpha():
|
|
umlaut_fix = self._try_umlaut_fix(word)
|
|
if umlaut_fix:
|
|
return umlaut_fix
|
|
|
|
# 4. General spell correction
|
|
if not has_suspicious and len(word) >= 3 and word.isalpha():
|
|
# Safety: don't correct if the word is valid in the OTHER language
|
|
# (either directly or via umlaut fix)
|
|
other_lang = "de" if lang == "en" else "en"
|
|
if self._known_in(word, other_lang):
|
|
return None
|
|
if other_lang == "de" and self._try_umlaut_fix(word):
|
|
return None # has a valid DE umlaut variant → don't touch
|
|
|
|
spell = self.en if lang == "en" else self.de
|
|
correction = spell.correction(word.lower())
|
|
if correction and correction != word.lower():
|
|
if word[0].isupper():
|
|
correction = correction[0].upper() + correction[1:]
|
|
if self._known(correction):
|
|
return correction
|
|
|
|
return None
|
|
|
|
# --- Multi-digit substitution ---
|
|
|
|
def _try_multi_digit_sub(self, word: str) -> Optional[str]:
|
|
"""Try replacing multiple digits simultaneously."""
|
|
positions = [(i, ch) for i, ch in enumerate(word) if ch in _DIGIT_SUBS]
|
|
if len(positions) < 1 or len(positions) > 4:
|
|
return None
|
|
|
|
# Try all combinations (max 2^4 = 16 for 4 positions)
|
|
chars = list(word)
|
|
best = None
|
|
self._multi_sub_recurse(chars, positions, 0, best_result=[None])
|
|
return self._multi_sub_recurse_result
|
|
|
|
_multi_sub_recurse_result: Optional[str] = None
|
|
|
|
def _try_multi_digit_sub(self, word: str) -> Optional[str]:
|
|
"""Try replacing multiple digits simultaneously using BFS."""
|
|
positions = [(i, ch) for i, ch in enumerate(word) if ch in _DIGIT_SUBS]
|
|
if not positions or len(positions) > 4:
|
|
return None
|
|
|
|
# BFS over substitution combinations
|
|
queue = [list(word)]
|
|
for pos, ch in positions:
|
|
next_queue = []
|
|
for current in queue:
|
|
# Keep original
|
|
next_queue.append(current[:])
|
|
# Try each substitution
|
|
for repl in _DIGIT_SUBS[ch]:
|
|
variant = current[:]
|
|
variant[pos] = repl
|
|
next_queue.append(variant)
|
|
queue = next_queue
|
|
|
|
# Check which combinations produce known words
|
|
for combo in queue:
|
|
candidate = "".join(combo)
|
|
if candidate != word and self._known(candidate):
|
|
return candidate
|
|
|
|
return None
|
|
|
|
# --- Umlaut fix ---
|
|
|
|
def _try_umlaut_fix(self, word: str) -> Optional[str]:
|
|
"""Try single-char umlaut substitutions for German words."""
|
|
for i, ch in enumerate(word):
|
|
if ch in _UMLAUT_MAP:
|
|
candidate = word[:i] + _UMLAUT_MAP[ch] + word[i + 1:]
|
|
if self._known(candidate):
|
|
return candidate
|
|
return None
|
|
|
|
# --- Boundary repair (shifted word boundaries) ---
|
|
|
|
def _try_boundary_repair(self, word1: str, word2: str) -> Optional[Tuple[str, str]]:
|
|
"""Fix shifted word boundaries between adjacent tokens.
|
|
|
|
OCR sometimes shifts the boundary: "at sth." → "ats th."
|
|
Try moving 1-2 chars from end of word1 to start of word2 and vice versa.
|
|
Returns (fixed_word1, fixed_word2) or None.
|
|
"""
|
|
# Import known abbreviations for vocabulary context
|
|
try:
|
|
from cv_ocr_engines import _KNOWN_ABBREVIATIONS
|
|
except ImportError:
|
|
_KNOWN_ABBREVIATIONS = set()
|
|
|
|
# Strip trailing punctuation for checking, preserve for result
|
|
w2_stripped = word2.rstrip(".,;:!?")
|
|
w2_punct = word2[len(w2_stripped):]
|
|
|
|
# Try shifting 1-2 chars from word1 → word2
|
|
for shift in (1, 2):
|
|
if len(word1) <= shift:
|
|
continue
|
|
new_w1 = word1[:-shift]
|
|
new_w2_base = word1[-shift:] + w2_stripped
|
|
|
|
w1_ok = self._known(new_w1) or new_w1.lower() in _KNOWN_ABBREVIATIONS
|
|
w2_ok = self._known(new_w2_base) or new_w2_base.lower() in _KNOWN_ABBREVIATIONS
|
|
|
|
if w1_ok and w2_ok:
|
|
return (new_w1, new_w2_base + w2_punct)
|
|
|
|
# Try shifting 1-2 chars from word2 → word1
|
|
for shift in (1, 2):
|
|
if len(w2_stripped) <= shift:
|
|
continue
|
|
new_w1 = word1 + w2_stripped[:shift]
|
|
new_w2_base = w2_stripped[shift:]
|
|
|
|
w1_ok = self._known(new_w1) or new_w1.lower() in _KNOWN_ABBREVIATIONS
|
|
w2_ok = self._known(new_w2_base) or new_w2_base.lower() in _KNOWN_ABBREVIATIONS
|
|
|
|
if w1_ok and w2_ok:
|
|
return (new_w1, new_w2_base + w2_punct)
|
|
|
|
return None
|
|
|
|
# --- Context-based word split for ambiguous merges ---
|
|
|
|
# Patterns where a valid word is actually "a" + adjective/noun.
# Keys are lowercase. A tuple value is the (article, remainder) to split
# into; None marks a word that must never be split.
_ARTICLE_SPLIT_CANDIDATES: Dict[str, Optional[Tuple[str, str]]] = {
    # word → (article, remainder) — only when followed by a compatible word
    "anew": ("a", "new"),
    "areal": ("a", "real"),
    "alive": None,  # genuinely one word, never split
    "alone": None,
    "aware": None,
    "alike": None,
    "apart": None,
    "aside": None,
    "above": None,
    "about": None,
    "among": None,
    "along": None,
}
|
|
|
|
def _try_context_split(self, word: str, next_word: str,
|
|
prev_word: str) -> Optional[str]:
|
|
"""Split words like 'anew' → 'a new' when context indicates a merge.
|
|
|
|
Only splits when:
|
|
- The word is in the split candidates list
|
|
- The following word makes sense as a noun (for "a + adj + noun" pattern)
|
|
- OR the word is unknown and can be split into article + known word
|
|
"""
|
|
w_lower = word.lower()
|
|
|
|
# Check explicit candidates
|
|
if w_lower in self._ARTICLE_SPLIT_CANDIDATES:
|
|
split = self._ARTICLE_SPLIT_CANDIDATES[w_lower]
|
|
if split is None:
|
|
return None # explicitly marked as "don't split"
|
|
article, remainder = split
|
|
# Only split if followed by a word (noun pattern)
|
|
if next_word and next_word[0].islower():
|
|
return f"{article} {remainder}"
|
|
# Also split if remainder + next_word makes a common phrase
|
|
if next_word and self._known(next_word):
|
|
return f"{article} {remainder}"
|
|
|
|
# Generic: if word starts with 'a' and rest is a known adjective/word
|
|
if (len(word) >= 4 and word[0].lower() == 'a'
|
|
and not self._known(word) # only for UNKNOWN words
|
|
and self._known(word[1:])):
|
|
return f"a {word[1:]}"
|
|
|
|
return None
|
|
|
|
# --- a/I disambiguation ---
|
|
|
|
def _disambiguate_a_I(self, token: str, next_word: str) -> Optional[str]:
|
|
"""Disambiguate 'a' vs 'I' (and OCR variants like 'l', '|')."""
|
|
nw = next_word.lower().strip(".,;:!?")
|
|
if nw in _I_FOLLOWERS:
|
|
return "I"
|
|
if nw in _A_FOLLOWERS:
|
|
return "a"
|
|
# Fallback: check if next word is more commonly a verb (→I) or noun/adj (→a)
|
|
# Simple heuristic: if next word starts with uppercase (and isn't first in sentence)
|
|
# it's likely a German noun following "I"... but in English context, uppercase
|
|
# after "I" is unusual.
|
|
return None # uncertain, don't change
|
|
|
|
# --- Full text correction ---
|
|
|
|
def correct_text(self, text: str, lang: str = "en") -> CorrectionResult:
|
|
"""Correct a full text string (field value).
|
|
|
|
Three passes:
|
|
1. Boundary repair — fix shifted word boundaries between adjacent tokens
|
|
2. Context split — split ambiguous merges (anew → a new)
|
|
3. Per-word correction — spell check individual words
|
|
|
|
Args:
|
|
text: The text to correct
|
|
lang: Expected language ("en" or "de")
|
|
"""
|
|
if not text or not text.strip():
|
|
return CorrectionResult(text, text, "unknown", False)
|
|
|
|
detected = self.detect_text_lang(text) if lang == "auto" else lang
|
|
effective_lang = detected if detected in ("en", "de") else "en"
|
|
|
|
changes: List[str] = []
|
|
tokens = list(_TOKEN_RE.finditer(text))
|
|
|
|
# Extract token list: [(word, separator), ...]
|
|
token_list: List[List[str]] = [] # [[word, sep], ...]
|
|
for m in tokens:
|
|
token_list.append([m.group(1), m.group(2)])
|
|
|
|
# --- Pass 1: Boundary repair between adjacent unknown words ---
|
|
# Import abbreviations for the heuristic below
|
|
try:
|
|
from cv_ocr_engines import _KNOWN_ABBREVIATIONS as _ABBREVS
|
|
except ImportError:
|
|
_ABBREVS = set()
|
|
|
|
for i in range(len(token_list) - 1):
|
|
w1 = token_list[i][0]
|
|
w2_raw = token_list[i + 1][0]
|
|
# Include trailing punct from separator in w2 for abbreviation matching
|
|
# e.g., "ats" + " " + "th" + "." → try repair("ats", "th.")
|
|
w2_with_punct = w2_raw + token_list[i + 1][1].rstrip(" ")
|
|
# Skip if both are known AND neither is suspiciously short (≤3 chars)
|
|
# Short known words like "ats", "th" may be OCR boundary errors
|
|
both_known = self._known(w1) and self._known(w2_raw)
|
|
both_long = len(w1) > 3 and len(w2_raw) > 3
|
|
if both_known and both_long:
|
|
continue
|
|
# Try with punctuation first (for abbreviations like "sth.")
|
|
repair = self._try_boundary_repair(w1, w2_with_punct)
|
|
if not repair and w2_with_punct != w2_raw:
|
|
repair = self._try_boundary_repair(w1, w2_raw)
|
|
if repair:
|
|
new_w1, new_w2_full = repair
|
|
# Quality gate: only accept if repair is actually better
|
|
# Better = at least one result is a known abbreviation, or
|
|
# both results are longer/more common than originals
|
|
new_w2_base = new_w2_full.rstrip(".,;:!?")
|
|
old_score = (len(w1) >= 3) + (len(w2_raw) >= 3)
|
|
new_score = (
|
|
(self._known(new_w1) or new_w1.lower() in _ABBREVS)
|
|
+ (self._known(new_w2_base) or new_w2_base.lower() in _ABBREVS)
|
|
)
|
|
# Accept if new pair scores higher, or if it includes an abbreviation
|
|
has_abbrev = new_w1.lower() in _ABBREVS or new_w2_base.lower() in _ABBREVS
|
|
if new_score >= old_score or has_abbrev:
|
|
new_w2_punct = new_w2_full[len(new_w2_base):]
|
|
changes.append(f"{w1} {w2_raw}→{new_w1} {new_w2_base}")
|
|
token_list[i][0] = new_w1
|
|
token_list[i + 1][0] = new_w2_base
|
|
if new_w2_punct:
|
|
token_list[i + 1][1] = new_w2_punct + token_list[i + 1][1].lstrip(".,;:!?")
|
|
|
|
# --- Pass 2: Context split (anew → a new) ---
|
|
expanded: List[List[str]] = []
|
|
for i, (word, sep) in enumerate(token_list):
|
|
next_word = token_list[i + 1][0] if i + 1 < len(token_list) else ""
|
|
prev_word = token_list[i - 1][0] if i > 0 else ""
|
|
split = self._try_context_split(word, next_word, prev_word)
|
|
if split and split != word:
|
|
changes.append(f"{word}→{split}")
|
|
expanded.append([split, sep])
|
|
else:
|
|
expanded.append([word, sep])
|
|
token_list = expanded
|
|
|
|
# --- Pass 3: Per-word correction ---
|
|
parts: List[str] = []
|
|
for i, (word, sep) in enumerate(token_list):
|
|
next_word = token_list[i + 1][0] if i + 1 < len(token_list) else ""
|
|
prev_word = token_list[i - 1][0] if i > 0 else ""
|
|
|
|
correction = self.correct_word(
|
|
word, lang=effective_lang,
|
|
prev_word=prev_word, next_word=next_word,
|
|
)
|
|
if correction and correction != word:
|
|
changes.append(f"{word}→{correction}")
|
|
parts.append(correction)
|
|
else:
|
|
parts.append(word)
|
|
parts.append(sep)
|
|
|
|
# Append any trailing text
|
|
last_end = tokens[-1].end() if tokens else 0
|
|
if last_end < len(text):
|
|
parts.append(text[last_end:])
|
|
|
|
corrected = "".join(parts)
|
|
return CorrectionResult(
|
|
original=text,
|
|
corrected=corrected,
|
|
lang_detected=detected,
|
|
changed=corrected != text,
|
|
changes=changes,
|
|
)
|
|
|
|
# --- Vocabulary entry correction ---
|
|
|
|
def correct_vocab_entry(self, english: str, german: str,
|
|
example: str = "") -> Dict[str, CorrectionResult]:
|
|
"""Correct a full vocabulary entry (EN + DE + example).
|
|
|
|
Uses column position to determine language — the most reliable signal.
|
|
"""
|
|
results = {}
|
|
results["english"] = self.correct_text(english, lang="en")
|
|
results["german"] = self.correct_text(german, lang="de")
|
|
if example:
|
|
# For examples, auto-detect language
|
|
results["example"] = self.correct_text(example, lang="auto")
|
|
return results
|