Files
breakpilot-lehrer/klausur-service/backend/cv_gutter_repair.py
Benjamin Admin 50bfd6e902
Some checks failed
CI / go-lint (push) Has been skipped
CI / python-lint (push) Has been skipped
CI / nodejs-lint (push) Has been skipped
CI / test-go-school (push) Successful in 50s
CI / test-go-edu-search (push) Successful in 50s
CI / test-python-klausur (push) Failing after 2m37s
CI / test-python-agent-core (push) Successful in 40s
CI / test-nodejs-website (push) Successful in 31s
Fix gutter repair: don't suggest corrections for words with parentheses
Words like "probieren)" or "Englisch)" were incorrectly flagged as
gutter OCR errors because the closing parenthesis wasn't stripped
before dictionary lookup. The spellchecker then suggested "probierend"
(replacing ) with d, edit distance 1).

Two fixes:
1. Strip trailing/leading parentheses in _try_spell_fix before checking
   if the bare word is valid — skip correction if it is
2. Add )( to the rstrip characters in the analysis phase so
   "probieren)" becomes "probieren" for the known-word check

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-15 22:38:22 +02:00

611 lines
22 KiB
Python

"""
Gutter Repair — detects and fixes words truncated or blurred at the book gutter.
When scanning double-page spreads, the binding area (gutter) causes:
1. Blurry/garbled trailing characters ("stammeli" → "stammeln")
2. Words split across lines with a hyphen lost in the gutter
("ve" + "künden" → "verkünden")
This module analyses grid cells, identifies gutter-edge candidates, and
proposes corrections using pyspellchecker (DE + EN).
Lizenz: Apache 2.0 (kommerziell nutzbar)
DATENSCHUTZ: Alle Verarbeitung erfolgt lokal.
"""
import itertools
import logging
import re
import time
import uuid
from dataclasses import dataclass, field, asdict
from typing import Any, Dict, List, Optional, Tuple
logger = logging.getLogger(__name__)
# ---------------------------------------------------------------------------
# Spellchecker setup (lazy, cached)
# ---------------------------------------------------------------------------
# Module-level spellchecker handles. They stay None / False until
# _init_spellcheckers() manages to import pyspellchecker.
_spell_de = None
_spell_en = None
_SPELL_AVAILABLE = False


def _init_spellcheckers():
    """Create the German and English spellcheckers on first use.

    Once the German checker exists, further calls do nothing.  If
    pyspellchecker cannot be imported, a warning is logged and
    ``_SPELL_AVAILABLE`` remains ``False``.
    """
    global _spell_de, _spell_en, _SPELL_AVAILABLE
    if _spell_de is None:
        try:
            # Imported here so the module still loads without the package.
            from spellchecker import SpellChecker
            _spell_de = SpellChecker(language='de', distance=1)
            _spell_en = SpellChecker(language='en', distance=1)
            _SPELL_AVAILABLE = True
            logger.info("Gutter repair: spellcheckers loaded (DE + EN)")
        except ImportError:
            logger.warning("pyspellchecker not installed — gutter repair unavailable")
def _is_known(word: str) -> bool:
    """Return True if the lower-cased *word* is in the DE or EN dictionary.

    Always returns False when the spellcheckers could not be loaded.
    """
    _init_spellcheckers()
    if not _SPELL_AVAILABLE:
        return False
    lookup = [word.lower()]
    # German is consulted first; English only if German does not know it.
    return any(checker.known(lookup) for checker in (_spell_de, _spell_en))
def _spell_candidates(word: str, lang: str = "both") -> List[str]:
    """Get all plausible spellchecker candidates for a word (deduplicated).

    Args:
        word: The word to correct; it is lower-cased before lookup.
        lang: ``"de"``, ``"en"`` or ``"both"``.  Any value other than
            ``"both"`` / ``"de"`` selects the English checker only.

    Returns:
        Candidate words that differ from the lower-cased input.  With
        ``lang="both"`` German candidates come before English ones.
        Within one language the list is ordered by descending dictionary
        frequency, ties broken alphabetically.  Empty when the
        spellcheckers are unavailable.
    """
    _init_spellcheckers()
    if not _SPELL_AVAILABLE:
        return []
    w = word.lower()
    if lang == "both":
        checkers = [_spell_de, _spell_en]
    elif lang == "de":
        checkers = [_spell_de]
    else:
        checkers = [_spell_en]

    seen: set = set()
    results: List[str] = []
    for checker in checkers:
        if checker is None:
            continue
        cands = checker.candidates(w)
        if not cands:
            continue
        # candidates() hands back a set, whose iteration order depends on
        # per-process string hashing.  Callers stable-sort by edit distance
        # only, so without a fixed order here the "best" suggestion among
        # equally distant candidates would vary from run to run.  Rank by
        # dictionary frequency (checker[word]) and then alphabetically.
        ranked = sorted(cands, key=lambda c: (-checker[c], c))
        for c in ranked:
            if c and c != w and c not in seen:
                seen.add(c)
                results.append(c)
    return results
# ---------------------------------------------------------------------------
# Gutter position detection
# ---------------------------------------------------------------------------
# Minimum word length for spell-fix (very short words are often legitimate).
# Words shorter than this are never passed to the spellchecker.
_MIN_WORD_LEN_SPELL = 3
# Minimum length of the (punctuation-stripped) last word for it to be
# considered at all; gutter fragments can be very short, e.g. "ve" from
# "ver-künden".
_MIN_WORD_LEN_HYPHEN = 2
# Fraction of the column width that a word's right edge must reach to count
# as "gutter-adjacent" (0.70 = the word ends in the rightmost 30% of the column).
_GUTTER_EDGE_THRESHOLD = 0.70
# Small common words / abbreviations that should NOT be repaired.
# Some entries appear in both language groups; the frozenset de-duplicates them.
_STOPWORDS = frozenset([
# German
"ab", "an", "am", "da", "er", "es", "im", "in", "ja", "ob", "so", "um",
"zu", "wo", "du", "eh", "ei", "je", "na", "nu", "oh",
# English
"a", "am", "an", "as", "at", "be", "by", "do", "go", "he", "if", "in",
"is", "it", "me", "my", "no", "of", "on", "or", "so", "to", "up", "us",
"we",
])
# Brackets, slashes and phonetic symbols that mark a cell as IPA
# transcription; such cells are skipped by the analysis.
_IPA_RE = re.compile(r'[\[\]/ˈˌːʃʒθðŋɑɒæɔəɛɪʊʌ]')


def _is_ipa_text(text: str) -> bool:
    """Report whether *text* contains at least one IPA marker character."""
    return _IPA_RE.search(text) is not None
def _word_is_at_gutter_edge(
    word_bbox: Dict,
    col_x: float,
    col_width: float,
    threshold: Optional[float] = None,
) -> bool:
    """Check if a word's right edge is near the right boundary of its column.

    Args:
        word_bbox: Mapping with ``"left"`` and ``"width"`` entries; missing
            or ``None`` values are treated as 0.
        col_x: Left x-coordinate of the column.
        col_width: Column width; a non-positive width yields ``False``.
        threshold: Minimum relative position (0..1) of the word's right
            edge inside the column.  Defaults to ``_GUTTER_EDGE_THRESHOLD``.

    Returns:
        True when the word's right edge lies at or beyond *threshold* of
        the column width, measured from the column's left edge.
    """
    if col_width <= 0:
        return False
    if threshold is None:
        threshold = _GUTTER_EDGE_THRESHOLD
    # `or 0` also covers keys that are present but explicitly None.
    word_right = (word_bbox.get("left") or 0) + (word_bbox.get("width") or 0)
    relative_pos = (word_right - col_x) / col_width
    return relative_pos >= threshold
# ---------------------------------------------------------------------------
# Suggestion types
# ---------------------------------------------------------------------------
@dataclass
class GutterSuggestion:
    """One proposed correction for a word at the gutter edge."""

    # Short identifier: the first 8 hex digits of a random UUID.
    id: str = field(default_factory=lambda: uuid.uuid4().hex[:8])
    # "hyphen_join" | "spell_fix"
    type: str = ""

    # Location of the affected cell.
    zone_index: int = 0
    row_index: int = 0
    col_index: int = 0
    col_type: str = ""
    cell_id: str = ""

    # The word as found and the proposed replacement.
    original_text: str = ""
    suggested_text: str = ""

    # Only filled for hyphen_join suggestions.
    next_row_index: int = -1
    next_row_cell_id: str = ""
    next_row_text: str = ""
    missing_chars: str = ""
    display_parts: List[str] = field(default_factory=list)

    # Other plausible corrections the user can pick from.
    alternatives: List[str] = field(default_factory=list)

    # Meta information.
    confidence: float = 0.0
    # "gutter_truncation" | "gutter_blur" | "hyphen_continuation"
    reason: str = ""

    def to_dict(self) -> Dict[str, Any]:
        """Return all fields as a plain dict, in declaration order."""
        return asdict(self)
# ---------------------------------------------------------------------------
# Core repair logic
# ---------------------------------------------------------------------------
_TRAILING_PUNCT_RE = re.compile(r'[.,;:!?\)\]]+$')
def _try_hyphen_join(
word_text: str,
next_word_text: str,
max_missing: int = 3,
) -> Optional[Tuple[str, str, float]]:
"""Try joining two fragments with 0..max_missing interpolated chars.
Strips trailing punctuation from the continuation word before testing
(e.g. "künden,""künden") so dictionary lookup succeeds.
Returns (joined_word, missing_chars, confidence) or None.
"""
base = word_text.rstrip("-").rstrip()
# Strip trailing punctuation from continuation (commas, periods, etc.)
raw_continuation = next_word_text.lstrip()
continuation = _TRAILING_PUNCT_RE.sub('', raw_continuation)
if not base or not continuation:
return None
# 1. Direct join (no missing chars)
direct = base + continuation
if _is_known(direct):
return (direct, "", 0.95)
# 2. Try with 1..max_missing missing characters
# Use common letters, weighted by frequency in German/English
_COMMON_CHARS = "enristaldhgcmobwfkzpvjyxqu"
for n_missing in range(1, max_missing + 1):
for chars in itertools.product(_COMMON_CHARS[:15], repeat=n_missing):
candidate = base + "".join(chars) + continuation
if _is_known(candidate):
missing = "".join(chars)
# Confidence decreases with more missing chars
conf = 0.90 - (n_missing - 1) * 0.10
return (candidate, missing, conf)
return None
def _try_spell_fix(
    word_text: str, col_type: str = "",
) -> Optional[Tuple[str, float, List[str]]]:
    """Ask the spellchecker to repair a single garbled gutter word.

    Returns:
        ``(best_correction, confidence, alternatives)`` or ``None``.
        ``alternatives`` holds up to five further candidates the user
        may pick instead (e.g. "stammelt" vs "stammeln").  ``None`` is
        returned for words that are too short, for words that are valid
        once surrounding parentheses are removed, and when the
        spellchecker has no candidates.
    """
    if len(word_text) < _MIN_WORD_LEN_SPELL:
        return None

    # "probieren)" or "(Englisch" are correct words carrying punctuation,
    # not OCR damage, so no correction is offered for them.
    bare = word_text.strip("()")
    if bare and _is_known(bare):
        return None

    # The column type decides which dictionary is consulted first.
    lang = "en" if "en" in col_type else "de" if "de" in col_type else "both"
    candidates = _spell_candidates(word_text, lang=lang)
    if not candidates and lang != "both":
        candidates = _spell_candidates(word_text, lang="both")
    if not candidates:
        return None

    capitalised = word_text[0].isupper()

    def _recase(w: str) -> str:
        # Carry the original's leading capital over to a candidate.
        return w[0].upper() + w[1:] if capitalised and w else w

    # Closest candidates first; the sort is stable, so candidates at the
    # same distance keep the order in which they were returned.
    lowered = word_text.lower()
    ranked = sorted(
        ((_edit_distance(lowered, cand.lower()), cand) for cand in candidates),
        key=lambda pair: pair[0],
    )
    top_dist, top_word = ranked[0]
    best = _recase(top_word)
    conf = max(0.5, 1.0 - top_dist * 0.15)

    # Remaining candidates, re-cased the same way, capped at five.
    alts = [
        _recase(cand)
        for _, cand in ranked[1:]
        if cand.lower() != best.lower()
    ]
    return (best, conf, alts[:5])
def _edit_distance(a: str, b: str) -> int:
    """Return the Levenshtein distance between *a* and *b*.

    Counts the minimum number of single-character insertions, deletions
    and substitutions needed to turn one string into the other.
    """
    # Keep the shorter string on the inner axis so the rows stay short.
    longer, shorter = (a, b) if len(a) >= len(b) else (b, a)
    if not shorter:
        return len(longer)

    previous = list(range(len(shorter) + 1))
    for row, ch_long in enumerate(longer, start=1):
        current = [row]
        for col, ch_short in enumerate(shorter, start=1):
            insertion = current[col - 1] + 1
            deletion = previous[col] + 1
            substitution = previous[col - 1] + (0 if ch_long == ch_short else 1)
            current.append(min(insertion, deletion, substitution))
        previous = current
    return previous[-1]
# ---------------------------------------------------------------------------
# Grid analysis
# ---------------------------------------------------------------------------
def analyse_grid_for_gutter_repair(
    grid_data: Dict[str, Any],
    image_width: int = 0,
) -> Dict[str, Any]:
    """Analyse a structured grid and return gutter repair suggestions.

    In every zone, the last word of each non-empty, non-IPA cell is
    inspected.  If that word reaches the right edge of its column and is
    not a dictionary word, two strategies are tried in order:

    1. ``hyphen_join`` — combine it with the first word of the cell one
       row further down in the same column.
    2. ``spell_fix`` — ask the spellchecker for a correction.

    Args:
        grid_data: The grid_editor_result from the session (zones→cells structure).
        image_width: Image width in pixels.  Accepted for interface
            compatibility; the current implementation does not read it.

    Returns:
        Dict with "suggestions" (list of suggestion dicts), "stats" and
        "duration_seconds".  When pyspellchecker is missing, the
        suggestions list is empty and "stats" carries an "error" entry.
    """
    t0 = time.time()
    _init_spellcheckers()
    if not _SPELL_AVAILABLE:
        return {
            "suggestions": [],
            "stats": {"error": "pyspellchecker not installed"},
            "duration_seconds": 0,
        }

    # Punctuation removed from the end of a word before dictionary lookup.
    strip_chars = ".,;:!?)("

    zones = grid_data.get("zones", [])
    suggestions: List[GutterSuggestion] = []
    words_checked = 0
    gutter_candidates = 0

    for zi, zone in enumerate(zones):
        columns = zone.get("columns", [])
        cells = zone.get("cells", [])
        if not columns or not cells:
            continue

        # Build column lookup: col_index → {x, width, type}
        col_info: Dict[int, Dict] = {}
        for col in columns:
            ci = col.get("index", col.get("col_index", -1))
            x_min = col.get("x_min_px", col.get("x", 0))
            # Columns are described either by x_min_px/x_max_px or by
            # x/width.  Only the first form needs a subtraction; a plain
            # "width" entry already is the width and must not have x
            # subtracted from it.
            if "x_max_px" in col:
                width = col["x_max_px"] - x_min
            else:
                width = col.get("width", 0)
            col_info[ci] = {
                "x": x_min,
                "width": width,
                "type": col.get("type", col.get("col_type", "")),
            }

        # Build (row, col) → cell lookup
        cell_map: Dict[Tuple[int, int], Dict] = {}
        for cell in cells:
            key = (cell.get("row_index", 0), cell.get("col_index", 0))
            cell_map[key] = cell

        # Determine which columns are at the gutter edge.
        # For a left page: rightmost content columns.
        # For now, check ALL columns — a word is a candidate if it's at the
        # right edge of its column AND not a known word.
        for (ri, ci), cell in cell_map.items():
            text = (cell.get("text") or "").strip()
            if not text:
                continue
            if _is_ipa_text(text):
                continue
            words_checked += 1

            col = col_info.get(ci, {})
            col_type = col.get("type", "")

            # Get word boxes to check position
            word_boxes = cell.get("word_boxes", [])

            # Check the LAST word in the cell (rightmost, closest to gutter)
            cell_words = text.split()
            if not cell_words:
                continue
            last_word = cell_words[-1]

            # Skip stopwords, ignoring the same trailing punctuation as
            # the clean-word step below plus a trailing hyphen.
            if last_word.lower().rstrip(strip_chars + "-") in _STOPWORDS:
                continue

            last_word_clean = last_word.rstrip(strip_chars)
            if len(last_word_clean) < _MIN_WORD_LEN_HYPHEN:
                continue
            # Punctuation that was stripped; re-attached to spell fixes so
            # accepting a suggestion does not delete it from the cell.
            trailing_punct = last_word[len(last_word_clean):]

            # Check if the last word is at the gutter edge
            if word_boxes:
                is_at_edge = _word_is_at_gutter_edge(
                    word_boxes[-1], col.get("x", 0), col.get("width", 1)
                )
            else:
                # No word boxes — use cell bbox
                bbox = cell.get("bbox_px", {})
                is_at_edge = _word_is_at_gutter_edge(
                    {"left": bbox.get("x", 0), "width": bbox.get("w", 0)},
                    col.get("x", 0), col.get("width", 1)
                )
            if not is_at_edge:
                continue

            # Word is at gutter edge — check if it's a known word
            if _is_known(last_word_clean):
                continue

            # Check if the word ends with "-" (explicit hyphen break)
            ends_with_hyphen = last_word.endswith("-")

            # If the word already ends with "-" and the stem (without
            # the hyphen) is a known word, this is a VALID line-break
            # hyphenation — not a gutter error.  Gutter problems cause
            # the hyphen to be LOST ("ve" instead of "ver-"), so a
            # visible hyphen + known stem = intentional word-wrap.
            # Example: "wunder-" → "wunder" is known → skip.
            if ends_with_hyphen:
                stem = last_word_clean.rstrip("-")
                if stem and _is_known(stem):
                    continue

            gutter_candidates += 1

            # --- Strategy 1: Hyphen join with next row ---
            next_cell = cell_map.get((ri + 1, ci))
            if next_cell:
                next_text = (next_cell.get("text") or "").strip()
                next_words = next_text.split()
                if next_words:
                    first_next = next_words[0]
                    first_next_clean = _TRAILING_PUNCT_RE.sub('', first_next)
                    first_alpha = next((c for c in first_next if c.isalpha()), "")

                    # Also skip if the joined word is known (covers compound
                    # words where the stem alone might not be in the dictionary)
                    if ends_with_hyphen and first_next_clean:
                        direct = last_word_clean.rstrip("-") + first_next_clean
                        if _is_known(direct):
                            continue

                    # Continuation likely if:
                    # - explicit hyphen, OR
                    # - next row starts lowercase (= not a new entry)
                    if ends_with_hyphen or (first_alpha and first_alpha.islower()):
                        result = _try_hyphen_join(last_word_clean, first_next)
                        if result:
                            joined, missing, conf = result
                            # Build display parts: the first row shows the
                            # restored fragment followed by a hyphen.
                            if ends_with_hyphen:
                                fragment = last_word_clean.rstrip("-")
                            else:
                                fragment = last_word_clean
                            display_p1 = fragment + missing + "-"

                            suggestions.append(GutterSuggestion(
                                type="hyphen_join",
                                zone_index=zi,
                                row_index=ri,
                                col_index=ci,
                                col_type=col_type,
                                cell_id=cell.get("cell_id", f"R{ri:02d}_C{ci}"),
                                original_text=last_word,
                                suggested_text=joined,
                                next_row_index=ri + 1,
                                next_row_cell_id=next_cell.get("cell_id", f"R{ri+1:02d}_C{ci}"),
                                next_row_text=next_text,
                                missing_chars=missing,
                                display_parts=[display_p1, first_next],
                                confidence=conf,
                                reason="gutter_truncation" if missing else "hyphen_continuation",
                            ))
                            continue  # skip spell_fix if hyphen_join found

            # --- Strategy 2: Single-word spell fix (only for longer words) ---
            fix_result = _try_spell_fix(last_word_clean, col_type)
            if fix_result:
                corrected, conf, alts = fix_result
                # original_text keeps its trailing punctuation, so every
                # proposed replacement must carry it as well.
                suggestions.append(GutterSuggestion(
                    type="spell_fix",
                    zone_index=zi,
                    row_index=ri,
                    col_index=ci,
                    col_type=col_type,
                    cell_id=cell.get("cell_id", f"R{ri:02d}_C{ci}"),
                    original_text=last_word,
                    suggested_text=corrected + trailing_punct,
                    alternatives=[alt + trailing_punct for alt in alts],
                    confidence=conf,
                    reason="gutter_blur",
                ))

    duration = round(time.time() - t0, 3)
    logger.info(
        "Gutter repair: checked %d words, %d gutter candidates, %d suggestions (%.2fs)",
        words_checked, gutter_candidates, len(suggestions), duration,
    )
    return {
        "suggestions": [s.to_dict() for s in suggestions],
        "stats": {
            "words_checked": words_checked,
            "gutter_candidates": gutter_candidates,
            "suggestions_found": len(suggestions),
        },
        "duration_seconds": duration,
    }
def _replace_last_occurrence(text: str, old: str, new: str) -> Optional[str]:
    """Return *text* with its last occurrence of *old* replaced by *new*, or None if absent."""
    idx = text.rfind(old)
    if idx < 0:
        return None
    return text[:idx] + new + text[idx + len(old):]


def apply_gutter_suggestions(
    grid_data: Dict[str, Any],
    accepted_ids: List[str],
    suggestions: List[Dict[str, Any]],
) -> Dict[str, Any]:
    """Apply accepted gutter repair suggestions to the grid data.

    Modifies cells in-place and returns a summary of changes.  A
    suggestion is skipped when its zone or cell cannot be found, when
    required fields are empty, or when the original word no longer
    occurs in the cell text.

    Args:
        grid_data: The grid_editor_result (zones→cells).
        accepted_ids: List of suggestion IDs the user accepted.
        suggestions: The full suggestions list (from analyse_grid_for_gutter_repair).

    Returns:
        Dict with "applied_count" (number of cells actually changed) and
        the "changes" list describing each change.
    """
    accepted_set = set(accepted_ids)
    accepted_suggestions = [s for s in suggestions if s.get("id") in accepted_set]
    zones = grid_data.get("zones", [])
    changes: List[Dict[str, Any]] = []

    for s in accepted_suggestions:
        zi = s.get("zone_index", 0)
        ri = s.get("row_index", 0)
        ci = s.get("col_index", 0)
        stype = s.get("type", "")

        # Reject negative indices too; they would wrap around to a zone
        # counted from the end of the list.
        if not 0 <= zi < len(zones):
            continue

        # Find the target cell
        target_cell = next(
            (
                cell for cell in zones[zi].get("cells", [])
                if cell.get("row_index") == ri and cell.get("col_index") == ci
            ),
            None,
        )
        if not target_cell:
            continue

        # A cell may carry an explicit None as text.
        old_text = target_cell.get("text") or ""
        original_word = s.get("original_text", "")
        extra: Dict[str, Any] = {}

        if stype == "spell_fix":
            # Replace the last word in the cell text
            replacement = s.get("suggested_text", "")
            if not original_word or not replacement:
                continue
        elif stype == "hyphen_join":
            joined = s.get("suggested_text", "")
            display_parts = s.get("display_parts", [])
            if not original_word or not joined or not display_parts:
                continue
            # Only the current row is rewritten, using the first display
            # part (e.g. "ve" → "ver-").  The next row is NOT modified —
            # "künden" stays in its row because the original book layout
            # has it there.
            replacement = display_parts[0]
            if not replacement:
                continue
            extra["joined_word"] = joined
        else:
            continue

        # Replace from the right (last occurrence)
        new_text = _replace_last_occurrence(old_text, original_word, replacement)
        if new_text is None:
            continue
        target_cell["text"] = new_text
        change = {
            "type": stype,
            "zone_index": zi,
            "row_index": ri,
            "col_index": ci,
            "cell_id": target_cell.get("cell_id", ""),
            "old_text": old_text,
            "new_text": new_text,
        }
        change.update(extra)
        changes.append(change)

    logger.info("Gutter repair applied: %d/%d suggestions", len(changes), len(accepted_suggestions))
    # Report what was actually applied, not merely what was accepted.
    return {
        "applied_count": len(changes),
        "changes": changes,
    }