klausur-service (11 files): - cv_gutter_repair, ocr_pipeline_regression, upload_api - ocr_pipeline_sessions, smart_spell, nru_worksheet_generator - ocr_pipeline_overlays, mail/aggregator, zeugnis_api - cv_syllable_detect, self_rag backend-lehrer (17 files): - classroom_engine/suggestions, generators/quiz_generator - worksheets_api, llm_gateway/comparison, state_engine_api - classroom/models (→ 4 submodules), services/file_processor - alerts_agent/api/wizard+digests+routes, content_generators/pdf - classroom/routes/sessions, llm_gateway/inference - classroom_engine/analytics, auth/keycloak_auth - alerts_agent/processing/rule_engine, ai_processor/print_versions agent-core (5 files): - brain/memory_store, brain/knowledge_graph, brain/context_manager - orchestrator/supervisor, sessions/session_manager admin-lehrer (5 components): - GridOverlay, StepGridReview, DevOpsPipelineSidebar - DataFlowDiagram, sbom/wizard/page website (2 files): - DependencyMap, lehrer/abitur-archiv Other: nibis_ingestion, grid_detection_service, export-doclayout-onnx Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
290 lines
11 KiB
Python
290 lines
11 KiB
Python
"""
|
|
SmartSpellChecker Text — full text correction, boundary repair, context split.
|
|
|
|
Extracted from smart_spell.py for modularity.
|
|
|
|
Lizenz: Apache 2.0 (kommerziell nutzbar)
|
|
"""
|
|
|
|
import re
|
|
from typing import Dict, List, Optional, Tuple
|
|
|
|
from smart_spell_core import (
|
|
_SmartSpellCoreBase,
|
|
_TOKEN_RE,
|
|
CorrectionResult,
|
|
Lang,
|
|
)
|
|
|
|
|
|
class SmartSpellChecker(_SmartSpellCoreBase):
    """Language-aware OCR spell checker using pyspellchecker (no LLM).

    Inherits single-word correction from _SmartSpellCoreBase.
    Adds text-level passes: boundary repair, context split, full correction.
    """

    # Trailing punctuation that is set aside while a word is checked and
    # re-attached afterwards.
    _BOUNDARY_PUNCT = ".,;:!?"

    # A boundary repair must make the word pair at least this many times more
    # frequent than the original pair to be accepted (avoids false positives).
    _BOUNDARY_MIN_GAIN = 5

    # Original words whose frequency is above this value count as "common".
    _BOUNDARY_RARE_FREQ = 1e-6

    @staticmethod
    def _load_known_abbreviations():
        """Return the abbreviation vocabulary from ``cv_ocr_engines``.

        The import is attempted on each call and allowed to fail; an empty
        set is returned when the module cannot be imported, so abbreviation
        lookups simply never match.
        """
        try:
            from cv_ocr_engines import _KNOWN_ABBREVIATIONS
        except ImportError:
            return set()
        return _KNOWN_ABBREVIATIONS

    # --- Boundary repair (shifted word boundaries) ---

    @staticmethod
    def _boundary_candidates(word1: str, word2_base: str) -> List[Tuple[str, str]]:
        """List re-split candidates for two adjacent words, in trial order.

        First 1 then 2 characters are moved from the end of ``word1`` to the
        start of ``word2_base``, then 1 and 2 characters the other way round.
        A shift that would leave the donating word empty is skipped.
        """
        candidates: List[Tuple[str, str]] = []
        for shift in (1, 2):
            if len(word1) > shift:
                candidates.append((word1[:-shift], word1[-shift:] + word2_base))
        for shift in (1, 2):
            if len(word2_base) > shift:
                candidates.append((word1 + word2_base[:shift], word2_base[shift:]))
        return candidates

    def _try_boundary_repair(self, word1: str, word2: str) -> Optional[Tuple[str, str]]:
        """Fix shifted word boundaries between adjacent tokens.

        OCR sometimes shifts the boundary: "at sth." -> "ats th."
        Try moving 1-2 chars from end of word1 to start of word2 and vice versa.

        Trailing ``.,;:!?`` on ``word2`` is ignored while checking and is
        re-attached to the second word of the result.

        Returns:
            ``(fixed_word1, fixed_word2)`` for the first candidate whose two
            parts are each accepted by ``self._known`` or found (lower-cased)
            in the known abbreviations; ``None`` if no candidate qualifies.
        """
        abbreviations = self._load_known_abbreviations()

        def acceptable(candidate: str) -> bool:
            return bool(self._known(candidate) or candidate.lower() in abbreviations)

        # Strip trailing punctuation for checking, preserve it for the result.
        w2_stripped = word2.rstrip(self._BOUNDARY_PUNCT)
        w2_punct = word2[len(w2_stripped):]

        for new_w1, new_w2_base in self._boundary_candidates(word1, w2_stripped):
            if acceptable(new_w1) and acceptable(new_w2_base):
                return new_w1, new_w2_base + w2_punct
        return None

    # --- Context-based word split for ambiguous merges ---

    # Patterns where a valid word is actually "a" + adjective/noun.
    # word -> (article, remainder), applied only when followed by a compatible
    # word; None marks a genuine single word that must never be split.
    _ARTICLE_SPLIT_CANDIDATES: Dict[str, Optional[Tuple[str, str]]] = {
        "anew": ("a", "new"),
        "areal": ("a", "real"),
        "alive": None,
        "alone": None,
        "aware": None,
        "alike": None,
        "apart": None,
        "aside": None,
        "above": None,
        "about": None,
        "among": None,
        "along": None,
    }

    def _try_context_split(self, word: str, next_word: str,
                           prev_word: str) -> Optional[str]:
        """Split words like 'anew' -> 'a new' when context indicates a merge.

        Rules, in order:
          1. If the lower-cased word is listed in ``_ARTICLE_SPLIT_CANDIDATES``
             with ``None``, it is never split.
          2. If it is listed with an ``(article, remainder)`` pair, it is split
             when a following word exists and that word either starts with a
             lower-case letter or is accepted by ``self._known``.
          3. Otherwise, a word of at least 4 characters that starts with
             'a'/'A', is not known itself, and whose remainder after the first
             character is known, is split after that first character.

        The article keeps the case of the word's first character, so a
        capitalised merge such as "Anew" yields "A new" rather than "a new".

        Args:
            word: Token to examine.
            next_word: The following token, or "" if there is none.
            prev_word: The preceding token; accepted for interface
                compatibility, currently not consulted.

        Returns:
            The split text (article, space, remainder) or ``None``.
        """
        w_lower = word.lower()

        # Explicit candidates take precedence over the generic rule.
        if w_lower in self._ARTICLE_SPLIT_CANDIDATES:
            split = self._ARTICLE_SPLIT_CANDIDATES[w_lower]
            if split is None:
                return None  # explicitly marked as "don't split"
            article, remainder = split
            if next_word and (next_word[0].islower() or self._known(next_word)):
                if word[:1].isupper():
                    article = article.capitalize()
                return f"{article} {remainder}"

        # Generic: unknown word = 'a' + known word.
        if (len(word) >= 4 and word[0].lower() == 'a'
                and not self._known(word)
                and self._known(word[1:])):
            return f"{word[0]} {word[1:]}"

        return None

    # --- Full text correction ---

    def _repair_boundaries(self, token_list: List[List[str]],
                           changes: List[str]) -> None:
        """Pass 1: repair shifted boundaries between adjacent tokens in place.

        ``token_list`` holds ``[word, separator]`` pairs and is mutated;
        a description of every accepted repair is appended to ``changes``.
        """
        abbreviations = self._load_known_abbreviations()
        punct = self._BOUNDARY_PUNCT

        for i in range(len(token_list) - 1):
            # Read per iteration: the previous iteration may have rewritten
            # token i while repairing the pair (i - 1, i).
            w1, sep_after_w1 = token_list[i]
            w2_raw, sep_after_w2 = token_list[i + 1]
            sep_before_w1 = token_list[i - 1][1] if i > 0 else ""

            # Skip IPA/bracket content. Brackets may be in the token itself
            # or in the adjacent separators.
            if (
                '[' in w1 or ']' in w1 or '[' in w2_raw or ']' in w2_raw
                or ']' in sep_after_w1    # w1 text was inside [brackets]
                or '[' in sep_after_w1    # w2 starts a bracket
                or ']' in sep_after_w2    # w2 text was inside [brackets]
                or '[' in sep_before_w1   # w1 starts a bracket
            ):
                continue

            # Include trailing punct from the separator in w2 so that
            # abbreviations written with punctuation can match.
            w2_with_punct = w2_raw + sep_after_w2.rstrip(" ")

            # Try the repair even if both words are valid; word-frequency
            # scoring below decides whether the repair is actually better.
            repair = self._try_boundary_repair(w1, w2_with_punct)
            if not repair and w2_with_punct != w2_raw:
                repair = self._try_boundary_repair(w1, w2_raw)
            if not repair:
                continue

            new_w1, new_w2_full = repair
            new_w2_base = new_w2_full.rstrip(punct)

            # Score a pair by the product of its word frequencies:
            # higher product = more common pair = better.
            freq_w1 = self._word_freq(w1)
            freq_w2 = self._word_freq(w2_raw)
            old_freq = freq_w1 * freq_w2
            new_freq = self._word_freq(new_w1) * self._word_freq(new_w2_base)

            accept = new_freq > old_freq * self._BOUNDARY_MIN_GAIN
            if not accept:
                # Abbreviation rule: a repair that produces a known
                # abbreviation is accepted if at least one original word is
                # rare/unknown (prevents "Can I" -> "Ca nI", where both
                # originals are common and correct). This is an explicit
                # accept rather than a frequency boost so it also applies
                # when the original pair's frequency product is 0.
                yields_abbrev = (new_w1.lower() in abbreviations
                                 or new_w2_base.lower() in abbreviations)
                orig_both_common = (freq_w1 > self._BOUNDARY_RARE_FREQ
                                    and freq_w2 > self._BOUNDARY_RARE_FREQ)
                accept = yields_abbrev and not orig_both_common
            if not accept:
                continue

            new_w2_punct = new_w2_full[len(new_w2_base):]
            changes.append(f"{w1} {w2_raw}\u2192{new_w1} {new_w2_base}")
            token_list[i][0] = new_w1
            token_list[i + 1][0] = new_w2_base
            if new_w2_punct:
                # Punctuation lives in the separator, not in the word.
                token_list[i + 1][1] = new_w2_punct + sep_after_w2.lstrip(punct)

    def _apply_context_splits(self, token_list: List[List[str]],
                              changes: List[str]) -> List[List[str]]:
        """Pass 2: return a new token list with merged articles split.

        NOTE(review): a split such as "a new" is stored as ONE token that
        contains a space, so pass 3 hands that text to ``correct_word`` as a
        single word -- confirm ``correct_word`` leaves such tokens alone.
        """
        expanded: List[List[str]] = []
        last = len(token_list) - 1
        for i, (word, sep) in enumerate(token_list):
            next_word = token_list[i + 1][0] if i < last else ""
            prev_word = token_list[i - 1][0] if i > 0 else ""
            split = self._try_context_split(word, next_word, prev_word)
            if split and split != word:
                changes.append(f"{word}\u2192{split}")
                word = split
            expanded.append([word, sep])
        return expanded

    def _correct_tokens(self, token_list: List[List[str]], lang: str,
                        changes: List[str]) -> List[str]:
        """Pass 3: spell-check each token; return the words and separators in order."""
        parts: List[str] = []
        last = len(token_list) - 1
        for i, (word, sep) in enumerate(token_list):
            # Leave words inside IPA brackets untouched (the brackets
            # themselves land in the separators).
            prev_sep = token_list[i - 1][1] if i > 0 else ""
            if '[' not in prev_sep and ']' not in sep:
                next_word = token_list[i + 1][0] if i < last else ""
                prev_word = token_list[i - 1][0] if i > 0 else ""
                correction = self.correct_word(
                    word, lang=lang,
                    prev_word=prev_word, next_word=next_word,
                )
                if correction and correction != word:
                    changes.append(f"{word}\u2192{correction}")
                    word = correction
            parts.append(word)
            parts.append(sep)
        return parts

    def correct_text(self, text: str, lang: str = "en") -> CorrectionResult:
        """Correct a full text string (field value).

        Three passes:
          1. Boundary repair -- fix shifted word boundaries between adjacent tokens
          2. Context split -- split ambiguous merges (anew -> a new)
          3. Per-word correction -- spell check individual words

        Args:
            text: The text to correct.
            lang: Language code. ``"auto"`` runs ``detect_text_lang`` on the
                text; any resulting value other than ``"en"``/``"de"`` is
                spell-checked as ``"en"``.

        Returns:
            A ``CorrectionResult`` with the original and corrected text, the
            detected (or given) language, a changed flag and the list of
            individual changes. Empty or whitespace-only input is returned
            unchanged with language ``"unknown"``.
        """
        if not text or not text.strip():
            return CorrectionResult(text, text, "unknown", False)

        detected = self.detect_text_lang(text) if lang == "auto" else lang
        effective_lang = detected if detected in ("en", "de") else "en"

        changes: List[str] = []
        matches = list(_TOKEN_RE.finditer(text))
        # Mutable [word, separator] pairs so the passes can rewrite them.
        token_list: List[List[str]] = [[m.group(1), m.group(2)] for m in matches]

        self._repair_boundaries(token_list, changes)
        token_list = self._apply_context_splits(token_list, changes)

        parts: List[str] = []
        # Preserve any leading text before the first token match.
        first_start = matches[0].start() if matches else 0
        if first_start > 0:
            parts.append(text[:first_start])

        parts.extend(self._correct_tokens(token_list, effective_lang, changes))

        # Preserve any trailing text after the last token match.
        last_end = matches[-1].end() if matches else 0
        if last_end < len(text):
            parts.append(text[last_end:])

        corrected = "".join(parts)
        return CorrectionResult(
            original=text,
            corrected=corrected,
            lang_detected=detected,
            changed=corrected != text,
            changes=changes,
        )

    # --- Vocabulary entry correction ---

    def correct_vocab_entry(self, english: str, german: str,
                            example: str = "") -> Dict[str, CorrectionResult]:
        """Correct a full vocabulary entry (EN + DE + example).

        Uses column position to determine language -- the most reliable
        signal: ``english`` is corrected as "en" and ``german`` as "de".
        A non-empty ``example`` is corrected with automatic language
        detection.

        Returns:
            A dict with keys ``"english"`` and ``"german"``, plus
            ``"example"`` when an example was given.
        """
        results: Dict[str, CorrectionResult] = {
            "english": self.correct_text(english, lang="en"),
            "german": self.correct_text(german, lang="de"),
        }
        if example:
            # For examples, auto-detect language.
            results["example"] = self.correct_text(example, lang="auto")
        return results
|