""" Gutter Repair — detects and fixes words truncated or blurred at the book gutter. When scanning double-page spreads, the binding area (gutter) causes: 1. Blurry/garbled trailing characters ("stammeli" → "stammeln") 2. Words split across lines with a hyphen lost in the gutter ("ve" + "künden" → "verkünden") This module analyses grid cells, identifies gutter-edge candidates, and proposes corrections using pyspellchecker (DE + EN). Lizenz: Apache 2.0 (kommerziell nutzbar) DATENSCHUTZ: Alle Verarbeitung erfolgt lokal. """ import itertools import logging import re import time import uuid from dataclasses import dataclass, field, asdict from typing import Any, Dict, List, Optional, Tuple logger = logging.getLogger(__name__) # --------------------------------------------------------------------------- # Spellchecker setup (lazy, cached) # --------------------------------------------------------------------------- _spell_de = None _spell_en = None _SPELL_AVAILABLE = False def _init_spellcheckers(): """Lazy-load DE + EN spellcheckers (cached across calls).""" global _spell_de, _spell_en, _SPELL_AVAILABLE if _spell_de is not None: return try: from spellchecker import SpellChecker _spell_de = SpellChecker(language='de', distance=1) _spell_en = SpellChecker(language='en', distance=1) _SPELL_AVAILABLE = True logger.info("Gutter repair: spellcheckers loaded (DE + EN)") except ImportError: logger.warning("pyspellchecker not installed — gutter repair unavailable") def _is_known(word: str) -> bool: """Check if a word is known in DE or EN dictionary.""" _init_spellcheckers() if not _SPELL_AVAILABLE: return False w = word.lower() return bool(_spell_de.known([w])) or bool(_spell_en.known([w])) def _spell_candidates(word: str, lang: str = "both") -> List[str]: """Get all plausible spellchecker candidates for a word (deduplicated).""" _init_spellcheckers() if not _SPELL_AVAILABLE: return [] w = word.lower() seen: set = set() results: List[str] = [] for checker in ([_spell_de, 
_spell_en] if lang == "both" else [_spell_de] if lang == "de" else [_spell_en]): if checker is None: continue cands = checker.candidates(w) if cands: for c in cands: if c and c != w and c not in seen: seen.add(c) results.append(c) return results # --------------------------------------------------------------------------- # Gutter position detection # --------------------------------------------------------------------------- # Minimum word length for spell-fix (very short words are often legitimate) _MIN_WORD_LEN_SPELL = 3 # Minimum word length for hyphen-join candidates (fragments at the gutter # can be as short as 1-2 chars, e.g. "ve" from "ver-künden") _MIN_WORD_LEN_HYPHEN = 2 # How close to the right column edge a word must be to count as "gutter-adjacent". # Expressed as fraction of column width (e.g. 0.75 = rightmost 25%). _GUTTER_EDGE_THRESHOLD = 0.70 # Small common words / abbreviations that should NOT be repaired _STOPWORDS = frozenset([ # German "ab", "an", "am", "da", "er", "es", "im", "in", "ja", "ob", "so", "um", "zu", "wo", "du", "eh", "ei", "je", "na", "nu", "oh", # English "a", "am", "an", "as", "at", "be", "by", "do", "go", "he", "if", "in", "is", "it", "me", "my", "no", "of", "on", "or", "so", "to", "up", "us", "we", ]) # IPA / phonetic patterns — skip these cells _IPA_RE = re.compile(r'[\[\]/ˈˌːʃʒθðŋɑɒæɔəɛɪʊʌ]') def _is_ipa_text(text: str) -> bool: """True if text looks like IPA transcription.""" return bool(_IPA_RE.search(text)) def _word_is_at_gutter_edge(word_bbox: Dict, col_x: float, col_width: float) -> bool: """Check if a word's right edge is near the right boundary of its column.""" if col_width <= 0: return False word_right = word_bbox.get("left", 0) + word_bbox.get("width", 0) col_right = col_x + col_width # Word's right edge within the rightmost portion of the column relative_pos = (word_right - col_x) / col_width return relative_pos >= _GUTTER_EDGE_THRESHOLD # --------------------------------------------------------------------------- 
# Suggestion types
# ---------------------------------------------------------------------------


@dataclass
class GutterSuggestion:
    """A single correction suggestion."""
    # Short random identifier used to accept/reject individual suggestions
    id: str = field(default_factory=lambda: str(uuid.uuid4())[:8])
    type: str = ""  # "hyphen_join" | "spell_fix"
    zone_index: int = 0
    row_index: int = 0
    col_index: int = 0
    col_type: str = ""
    cell_id: str = ""
    original_text: str = ""
    suggested_text: str = ""
    # For hyphen_join:
    next_row_index: int = -1
    next_row_cell_id: str = ""
    next_row_text: str = ""
    missing_chars: str = ""
    display_parts: List[str] = field(default_factory=list)
    # Alternatives (other plausible corrections the user can pick from)
    alternatives: List[str] = field(default_factory=list)
    # Meta:
    confidence: float = 0.0
    reason: str = ""  # "gutter_truncation" | "gutter_blur" | "hyphen_continuation"

    def to_dict(self) -> Dict[str, Any]:
        """Return a plain-dict representation (JSON-serialisable)."""
        return asdict(self)


# ---------------------------------------------------------------------------
# Core repair logic
# ---------------------------------------------------------------------------

_TRAILING_PUNCT_RE = re.compile(r'[.,;:!?\)\]]+$')

# Common letters tried when interpolating characters lost in the gutter,
# roughly ordered by frequency in German/English. Only the first 15 are
# used, to keep the combinatorial search small.
_COMMON_CHARS = "enristaldhgcmobwfkzpvjyxqu"


def _try_hyphen_join(
    word_text: str,
    next_word_text: str,
    max_missing: int = 3,
) -> Optional[Tuple[str, str, float]]:
    """Try joining two fragments with 0..max_missing interpolated chars.

    Strips trailing punctuation from the continuation word before testing
    (e.g. "künden," → "künden") so dictionary lookup succeeds.

    Returns:
        (joined_word, missing_chars, confidence) or None.
    """
    base = word_text.rstrip("-").rstrip()
    # Strip trailing punctuation from continuation (commas, periods, etc.)
    raw_continuation = next_word_text.lstrip()
    continuation = _TRAILING_PUNCT_RE.sub('', raw_continuation)
    if not base or not continuation:
        return None

    # 1. Direct join (no missing chars)
    direct = base + continuation
    if _is_known(direct):
        return (direct, "", 0.95)

    # 2. Try with 1..max_missing missing characters
    for n_missing in range(1, max_missing + 1):
        for chars in itertools.product(_COMMON_CHARS[:15], repeat=n_missing):
            candidate = base + "".join(chars) + continuation
            if _is_known(candidate):
                missing = "".join(chars)
                # Confidence decreases with more missing chars
                conf = 0.90 - (n_missing - 1) * 0.10
                return (candidate, missing, conf)
    return None


def _try_spell_fix(
    word_text: str,
    col_type: str = "",
) -> Optional[Tuple[str, float, List[str]]]:
    """Try to fix a single garbled gutter word via spellchecker.

    Returns:
        (best_correction, confidence, alternatives_list) or None.
        The alternatives list contains other plausible corrections the user
        can choose from (e.g. "stammelt" vs "stammeln").
    """
    if len(word_text) < _MIN_WORD_LEN_SPELL:
        return None

    # Determine language priority from column type
    if "en" in col_type:
        lang = "en"
    elif "de" in col_type:
        lang = "de"
    else:
        lang = "both"

    candidates = _spell_candidates(word_text, lang=lang)
    if not candidates and lang != "both":
        candidates = _spell_candidates(word_text, lang="both")
    if not candidates:
        return None

    # Preserve original casing
    is_upper = word_text[0].isupper()

    def _preserve_case(w: str) -> str:
        if is_upper and w:
            return w[0].upper() + w[1:]
        return w

    # Sort candidates by edit distance (closest first)
    scored = []
    for c in candidates:
        dist = _edit_distance(word_text.lower(), c.lower())
        scored.append((dist, c))
    scored.sort(key=lambda x: x[0])

    best_dist, best = scored[0]
    best = _preserve_case(best)
    conf = max(0.5, 1.0 - best_dist * 0.15)

    # Build alternatives (all other candidates, also case-preserved)
    alts = [_preserve_case(c) for _, c in scored[1:] if c.lower() != best.lower()]
    # Limit to top 5 alternatives
    alts = alts[:5]
    return (best, conf, alts)


def _edit_distance(a: str, b: str) -> int:
    """Simple Levenshtein distance (iterative, two-row DP)."""
    if len(a) < len(b):
        return _edit_distance(b, a)
    if len(b) == 0:
        return len(a)
    prev = list(range(len(b) + 1))
    for i, ca in enumerate(a):
        curr = [i + 1]
        for j, cb in enumerate(b):
            cost = 0 if ca == cb else 1
            curr.append(min(curr[j] + 1, prev[j + 1] + 1, prev[j] + cost))
        prev = curr
    return prev[len(b)]


# ---------------------------------------------------------------------------
# Grid analysis
# ---------------------------------------------------------------------------


def analyse_grid_for_gutter_repair(
    grid_data: Dict[str, Any],
    image_width: int = 0,
) -> Dict[str, Any]:
    """Analyse a structured grid and return gutter repair suggestions.

    Args:
        grid_data: The grid_editor_result from the session (zones→cells structure).
        image_width: Image width in pixels (reserved for determining the
            gutter side; currently unused).

    Returns:
        Dict with "suggestions" list, "stats" dict and "duration_seconds".
    """
    t0 = time.time()
    _init_spellcheckers()
    if not _SPELL_AVAILABLE:
        return {
            "suggestions": [],
            "stats": {"error": "pyspellchecker not installed"},
            "duration_seconds": 0,
        }

    zones = grid_data.get("zones", [])
    suggestions: List[GutterSuggestion] = []
    words_checked = 0
    gutter_candidates = 0

    for zi, zone in enumerate(zones):
        columns = zone.get("columns", [])
        cells = zone.get("cells", [])
        if not columns or not cells:
            continue

        # Build column lookup: col_index → {x, width, type}
        col_info: Dict[int, Dict] = {}
        for col in columns:
            ci = col.get("index", col.get("col_index", -1))
            x_min = col.get("x_min_px", col.get("x", 0))
            # BUGFIX: the old code always computed x_max - x_min, but when a
            # column only carries "x"/"width" keys the x_max fallback was the
            # WIDTH, producing width - x (usually negative). Only subtract
            # when x_max_px is actually present.
            if "x_max_px" in col:
                width = col["x_max_px"] - x_min
            else:
                width = col.get("width", 0)
            col_info[ci] = {
                "x": x_min,
                "width": width,
                "type": col.get("type", col.get("col_type", "")),
            }

        # Build row→col→cell lookup
        cell_map: Dict[Tuple[int, int], Dict] = {}
        for cell in cells:
            ri = cell.get("row_index", 0)
            ci = cell.get("col_index", 0)
            cell_map[(ri, ci)] = cell

        # Determine which columns are at the gutter edge.
        # For a left page: rightmost content columns.
        # For now, check ALL columns — a word is a candidate if it's at the
        # right edge of its column AND not a known word.
        for (ri, ci), cell in cell_map.items():
            text = (cell.get("text") or "").strip()
            if not text:
                continue
            if _is_ipa_text(text):
                continue
            words_checked += 1

            col = col_info.get(ci, {})
            col_type = col.get("type", "")

            # Get word boxes to check position
            word_boxes = cell.get("word_boxes", [])

            # Check the LAST word in the cell (rightmost, closest to gutter)
            cell_words = text.split()
            if not cell_words:
                continue
            last_word = cell_words[-1]

            # Skip stopwords
            if last_word.lower().rstrip(".,;:!?-") in _STOPWORDS:
                continue
            last_word_clean = last_word.rstrip(".,;:!?")
            if len(last_word_clean) < _MIN_WORD_LEN_HYPHEN:
                continue

            # Check if the last word is at the gutter edge
            if word_boxes:
                last_wb = word_boxes[-1]
                is_at_edge = _word_is_at_gutter_edge(
                    last_wb, col.get("x", 0), col.get("width", 1)
                )
            else:
                # No word boxes — fall back to the cell bbox
                bbox = cell.get("bbox_px", {})
                is_at_edge = _word_is_at_gutter_edge(
                    {"left": bbox.get("x", 0), "width": bbox.get("w", 0)},
                    col.get("x", 0), col.get("width", 1)
                )
            if not is_at_edge:
                continue

            # Word is at gutter edge — check if it's a known word
            if _is_known(last_word_clean):
                continue

            # Check if the word ends with "-" (explicit hyphen break)
            ends_with_hyphen = last_word.endswith("-")

            # If the word already ends with "-" and the stem (without the
            # hyphen) is a known word, this is a VALID line-break hyphenation
            # — not a gutter error. Gutter problems cause the hyphen to be
            # LOST ("ve" instead of "ver-"), so a visible hyphen + known stem
            # = intentional word-wrap. Example: "wunder-" → "wunder" → skip.
            if ends_with_hyphen:
                stem = last_word_clean.rstrip("-")
                if stem and _is_known(stem):
                    continue

            gutter_candidates += 1

            # --- Strategy 1: Hyphen join with next row ---
            next_cell = cell_map.get((ri + 1, ci))
            if next_cell:
                next_text = (next_cell.get("text") or "").strip()
                next_words = next_text.split()
                if next_words:
                    first_next = next_words[0]
                    first_next_clean = _TRAILING_PUNCT_RE.sub('', first_next)
                    first_alpha = next((c for c in first_next if c.isalpha()), "")

                    # Also skip if the joined word is known (covers compound
                    # words where the stem alone might not be in the dictionary)
                    if ends_with_hyphen and first_next_clean:
                        direct = last_word_clean.rstrip("-") + first_next_clean
                        if _is_known(direct):
                            continue

                    # Continuation likely if:
                    # - explicit hyphen, OR
                    # - next row starts lowercase (= not a new entry)
                    if ends_with_hyphen or (first_alpha and first_alpha.islower()):
                        result = _try_hyphen_join(last_word_clean, first_next)
                        if result:
                            joined, missing, conf = result
                            # Build display parts: show hyphenation for
                            # the original layout
                            if ends_with_hyphen:
                                display_p1 = last_word_clean.rstrip("-")
                                if missing:
                                    display_p1 += missing
                                display_p1 += "-"
                            else:
                                display_p1 = last_word_clean
                                if missing:
                                    display_p1 += missing + "-"
                                else:
                                    display_p1 += "-"
                            suggestion = GutterSuggestion(
                                type="hyphen_join",
                                zone_index=zi,
                                row_index=ri,
                                col_index=ci,
                                col_type=col_type,
                                cell_id=cell.get("cell_id", f"R{ri:02d}_C{ci}"),
                                original_text=last_word,
                                suggested_text=joined,
                                next_row_index=ri + 1,
                                next_row_cell_id=next_cell.get(
                                    "cell_id", f"R{ri+1:02d}_C{ci}"
                                ),
                                next_row_text=next_text,
                                missing_chars=missing,
                                display_parts=[display_p1, first_next],
                                confidence=conf,
                                reason=(
                                    "gutter_truncation" if missing
                                    else "hyphen_continuation"
                                ),
                            )
                            suggestions.append(suggestion)
                            continue  # skip spell_fix if hyphen_join found

            # --- Strategy 2: Single-word spell fix (only for longer words) ---
            fix_result = _try_spell_fix(last_word_clean, col_type)
            if fix_result:
                corrected, conf, alts = fix_result
                suggestion = GutterSuggestion(
                    type="spell_fix",
                    zone_index=zi,
                    row_index=ri,
                    col_index=ci,
                    col_type=col_type,
                    cell_id=cell.get("cell_id", f"R{ri:02d}_C{ci}"),
                    original_text=last_word,
                    suggested_text=corrected,
                    alternatives=alts,
                    confidence=conf,
                    reason="gutter_blur",
                )
                suggestions.append(suggestion)

    duration = round(time.time() - t0, 3)
    logger.info(
        "Gutter repair: checked %d words, %d gutter candidates, %d suggestions (%.2fs)",
        words_checked, gutter_candidates, len(suggestions), duration,
    )
    return {
        "suggestions": [s.to_dict() for s in suggestions],
        "stats": {
            "words_checked": words_checked,
            "gutter_candidates": gutter_candidates,
            "suggestions_found": len(suggestions),
        },
        "duration_seconds": duration,
    }


def apply_gutter_suggestions(
    grid_data: Dict[str, Any],
    accepted_ids: List[str],
    suggestions: List[Dict[str, Any]],
) -> Dict[str, Any]:
    """Apply accepted gutter repair suggestions to the grid data.

    Modifies cells in-place and returns a summary of changes.

    Args:
        grid_data: The grid_editor_result (zones→cells).
        accepted_ids: List of suggestion IDs the user accepted.
        suggestions: The full suggestions list (from analyse_grid_for_gutter_repair).

    Returns:
        Dict with "applied_count" (number of edits actually performed) and
        "changes" list.
    """
    accepted_set = set(accepted_ids)
    accepted_suggestions = [s for s in suggestions if s.get("id") in accepted_set]
    zones = grid_data.get("zones", [])
    changes: List[Dict[str, Any]] = []

    for s in accepted_suggestions:
        zi = s.get("zone_index", 0)
        ri = s.get("row_index", 0)
        ci = s.get("col_index", 0)
        stype = s.get("type", "")
        if zi >= len(zones):
            continue
        zone_cells = zones[zi].get("cells", [])

        # Find the target cell
        target_cell = None
        for cell in zone_cells:
            if cell.get("row_index") == ri and cell.get("col_index") == ci:
                target_cell = cell
                break
        if not target_cell:
            continue

        old_text = target_cell.get("text", "")

        if stype == "spell_fix":
            # Replace the last word in the cell text
            original_word = s.get("original_text", "")
            corrected = s.get("suggested_text", "")
            if original_word and corrected:
                # Replace from the right (last occurrence)
                idx = old_text.rfind(original_word)
                if idx >= 0:
                    new_text = (
                        old_text[:idx] + corrected
                        + old_text[idx + len(original_word):]
                    )
                    target_cell["text"] = new_text
                    changes.append({
                        "type": "spell_fix",
                        "zone_index": zi,
                        "row_index": ri,
                        "col_index": ci,
                        "cell_id": target_cell.get("cell_id", ""),
                        "old_text": old_text,
                        "new_text": new_text,
                    })

        elif stype == "hyphen_join":
            # Current cell: replace last word with the hyphenated first part
            original_word = s.get("original_text", "")
            joined = s.get("suggested_text", "")
            display_parts = s.get("display_parts", [])
            if not original_word or not joined or not display_parts:
                continue
            # The first display part is what goes in the current row
            first_part = display_parts[0] if display_parts else ""
            # Replace the last word in current cell with the restored form.
            # The next row is NOT modified — "künden" stays in its row
            # because the original book layout has it there. We only fix
            # the truncated word in the current row (e.g. "ve" → "ver-").
            idx = old_text.rfind(original_word)
            if idx >= 0:
                new_text = (
                    old_text[:idx] + first_part
                    + old_text[idx + len(original_word):]
                )
                target_cell["text"] = new_text
                changes.append({
                    "type": "hyphen_join",
                    "zone_index": zi,
                    "row_index": ri,
                    "col_index": ci,
                    "cell_id": target_cell.get("cell_id", ""),
                    "old_text": old_text,
                    "new_text": new_text,
                    "joined_word": joined,
                })

    logger.info(
        "Gutter repair applied: %d/%d suggestions",
        len(changes), len(accepted_suggestions),
    )
    # BUGFIX: report what was actually applied (len(changes)), not merely how
    # many suggestions were accepted — some may fail (missing cell, text no
    # longer matching). The log line above already used len(changes).
    return {
        "applied_count": len(changes),
        "changes": changes,
    }