Add gutter repair step to OCR Kombi pipeline
Some checks failed
CI / go-lint (push) Has been skipped
CI / python-lint (push) Has been skipped
CI / nodejs-lint (push) Has been skipped
CI / test-go-school (push) Successful in 41s
CI / test-go-edu-search (push) Successful in 36s
CI / test-python-klausur (push) Failing after 2m31s
CI / test-python-agent-core (push) Successful in 28s
CI / test-nodejs-website (push) Successful in 29s
Some checks failed
CI / go-lint (push) Has been skipped
CI / python-lint (push) Has been skipped
CI / nodejs-lint (push) Has been skipped
CI / test-go-school (push) Successful in 41s
CI / test-go-edu-search (push) Successful in 36s
CI / test-python-klausur (push) Failing after 2m31s
CI / test-python-agent-core (push) Successful in 28s
CI / test-nodejs-website (push) Successful in 29s
New step "Wortkorrektur" between Grid-Review and Ground Truth that detects and fixes words truncated or blurred at the book gutter (binding area) of double-page scans. Uses pyspellchecker (DE+EN) for validation. Two repair strategies: - hyphen_join: words split across rows with missing chars (ve + künden → verkünden) - spell_fix: garbled trailing chars from gutter blur (stammeli → stammeln) Interactive frontend with per-suggestion accept/reject and batch controls. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
569
klausur-service/backend/cv_gutter_repair.py
Normal file
569
klausur-service/backend/cv_gutter_repair.py
Normal file
@@ -0,0 +1,569 @@
|
||||
"""
|
||||
Gutter Repair — detects and fixes words truncated or blurred at the book gutter.
|
||||
|
||||
When scanning double-page spreads, the binding area (gutter) causes:
|
||||
1. Blurry/garbled trailing characters ("stammeli" → "stammeln")
|
||||
2. Words split across lines with a hyphen lost in the gutter
|
||||
("ve" + "künden" → "verkünden")
|
||||
|
||||
This module analyses grid cells, identifies gutter-edge candidates, and
|
||||
proposes corrections using pyspellchecker (DE + EN).
|
||||
|
||||
Lizenz: Apache 2.0 (kommerziell nutzbar)
|
||||
DATENSCHUTZ: Alle Verarbeitung erfolgt lokal.
|
||||
"""
|
||||
|
||||
import itertools
|
||||
import logging
|
||||
import re
|
||||
import time
|
||||
import uuid
|
||||
from dataclasses import dataclass, field, asdict
|
||||
from typing import Any, Dict, List, Optional, Tuple
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Spellchecker setup (lazy, cached)
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
_spell_de = None
|
||||
_spell_en = None
|
||||
_SPELL_AVAILABLE = False
|
||||
|
||||
def _init_spellcheckers():
|
||||
"""Lazy-load DE + EN spellcheckers (cached across calls)."""
|
||||
global _spell_de, _spell_en, _SPELL_AVAILABLE
|
||||
if _spell_de is not None:
|
||||
return
|
||||
try:
|
||||
from spellchecker import SpellChecker
|
||||
_spell_de = SpellChecker(language='de', distance=1)
|
||||
_spell_en = SpellChecker(language='en', distance=1)
|
||||
_SPELL_AVAILABLE = True
|
||||
logger.info("Gutter repair: spellcheckers loaded (DE + EN)")
|
||||
except ImportError:
|
||||
logger.warning("pyspellchecker not installed — gutter repair unavailable")
|
||||
|
||||
|
||||
def _is_known(word: str) -> bool:
|
||||
"""Check if a word is known in DE or EN dictionary."""
|
||||
_init_spellcheckers()
|
||||
if not _SPELL_AVAILABLE:
|
||||
return False
|
||||
w = word.lower()
|
||||
return bool(_spell_de.known([w])) or bool(_spell_en.known([w]))
|
||||
|
||||
|
||||
def _spell_correction(word: str, lang: str = "both") -> Optional[str]:
|
||||
"""Get best spellchecker correction for a word."""
|
||||
_init_spellcheckers()
|
||||
if not _SPELL_AVAILABLE:
|
||||
return None
|
||||
w = word.lower()
|
||||
result = None
|
||||
if lang in ("de", "both") and _spell_de:
|
||||
result = _spell_de.correction(w)
|
||||
if result and result != w and _spell_de.known([result]):
|
||||
return result
|
||||
if lang in ("en", "both") and _spell_en:
|
||||
result = _spell_en.correction(w)
|
||||
if result and result != w and _spell_en.known([result]):
|
||||
return result
|
||||
return None
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Gutter position detection
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
# Minimum word length to consider for repair (very short words are often
|
||||
# legitimate: "a", "to", "in", etc.)
|
||||
_MIN_WORD_LEN = 3
|
||||
|
||||
# How close to the right column edge a word must be to count as "gutter-adjacent".
|
||||
# Expressed as fraction of column width (e.g. 0.75 = rightmost 25%).
|
||||
_GUTTER_EDGE_THRESHOLD = 0.70
|
||||
|
||||
# Small common words / abbreviations that should NOT be repaired
|
||||
_STOPWORDS = frozenset([
|
||||
# German
|
||||
"ab", "an", "am", "da", "er", "es", "im", "in", "ja", "ob", "so", "um",
|
||||
"zu", "wo", "du", "eh", "ei", "je", "na", "nu", "oh",
|
||||
# English
|
||||
"a", "am", "an", "as", "at", "be", "by", "do", "go", "he", "if", "in",
|
||||
"is", "it", "me", "my", "no", "of", "on", "or", "so", "to", "up", "us",
|
||||
"we",
|
||||
])
|
||||
|
||||
# IPA / phonetic patterns — skip these cells
|
||||
_IPA_RE = re.compile(r'[\[\]/ˈˌːʃʒθðŋɑɒæɔəɛɪʊʌ]')
|
||||
|
||||
|
||||
def _is_ipa_text(text: str) -> bool:
|
||||
"""True if text looks like IPA transcription."""
|
||||
return bool(_IPA_RE.search(text))
|
||||
|
||||
|
||||
def _word_is_at_gutter_edge(word_bbox: Dict, col_x: float, col_width: float) -> bool:
|
||||
"""Check if a word's right edge is near the right boundary of its column."""
|
||||
if col_width <= 0:
|
||||
return False
|
||||
word_right = word_bbox.get("left", 0) + word_bbox.get("width", 0)
|
||||
col_right = col_x + col_width
|
||||
# Word's right edge within the rightmost portion of the column
|
||||
relative_pos = (word_right - col_x) / col_width
|
||||
return relative_pos >= _GUTTER_EDGE_THRESHOLD
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Suggestion types
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
@dataclass
|
||||
class GutterSuggestion:
|
||||
"""A single correction suggestion."""
|
||||
id: str = field(default_factory=lambda: str(uuid.uuid4())[:8])
|
||||
type: str = "" # "hyphen_join" | "spell_fix"
|
||||
zone_index: int = 0
|
||||
row_index: int = 0
|
||||
col_index: int = 0
|
||||
col_type: str = ""
|
||||
cell_id: str = ""
|
||||
original_text: str = ""
|
||||
suggested_text: str = ""
|
||||
# For hyphen_join:
|
||||
next_row_index: int = -1
|
||||
next_row_cell_id: str = ""
|
||||
next_row_text: str = ""
|
||||
missing_chars: str = ""
|
||||
display_parts: List[str] = field(default_factory=list)
|
||||
# Meta:
|
||||
confidence: float = 0.0
|
||||
reason: str = "" # "gutter_truncation" | "gutter_blur" | "hyphen_continuation"
|
||||
|
||||
def to_dict(self) -> Dict[str, Any]:
|
||||
return asdict(self)
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Core repair logic
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def _try_hyphen_join(
|
||||
word_text: str,
|
||||
next_word_text: str,
|
||||
max_missing: int = 3,
|
||||
) -> Optional[Tuple[str, str, float]]:
|
||||
"""Try joining two fragments with 0..max_missing interpolated chars.
|
||||
|
||||
Returns (joined_word, missing_chars, confidence) or None.
|
||||
"""
|
||||
base = word_text.rstrip("-").rstrip()
|
||||
continuation = next_word_text.lstrip()
|
||||
|
||||
if not base or not continuation:
|
||||
return None
|
||||
|
||||
# 1. Direct join (no missing chars)
|
||||
direct = base + continuation
|
||||
if _is_known(direct):
|
||||
return (direct, "", 0.95)
|
||||
|
||||
# 2. Try with 1..max_missing missing characters
|
||||
# Use common letters, weighted by frequency in German/English
|
||||
_COMMON_CHARS = "enristaldhgcmobwfkzpvjyxqu"
|
||||
|
||||
for n_missing in range(1, max_missing + 1):
|
||||
for chars in itertools.product(_COMMON_CHARS[:15], repeat=n_missing):
|
||||
candidate = base + "".join(chars) + continuation
|
||||
if _is_known(candidate):
|
||||
missing = "".join(chars)
|
||||
# Confidence decreases with more missing chars
|
||||
conf = 0.90 - (n_missing - 1) * 0.10
|
||||
return (candidate, missing, conf)
|
||||
|
||||
return None
|
||||
|
||||
|
||||
def _try_spell_fix(word_text: str, col_type: str = "") -> Optional[Tuple[str, float]]:
|
||||
"""Try to fix a single garbled gutter word via spellchecker.
|
||||
|
||||
Returns (corrected_word, confidence) or None.
|
||||
"""
|
||||
if len(word_text) < _MIN_WORD_LEN:
|
||||
return None
|
||||
|
||||
# Determine language priority from column type
|
||||
if "en" in col_type:
|
||||
lang = "en"
|
||||
elif "de" in col_type:
|
||||
lang = "de"
|
||||
else:
|
||||
lang = "both"
|
||||
|
||||
correction = _spell_correction(word_text, lang=lang)
|
||||
if not correction:
|
||||
# Try the other language too
|
||||
correction = _spell_correction(word_text, lang="both")
|
||||
|
||||
if correction and correction.lower() != word_text.lower():
|
||||
# Preserve original casing of first letter
|
||||
if word_text[0].isupper():
|
||||
correction = correction[0].upper() + correction[1:]
|
||||
# Confidence based on edit distance
|
||||
dist = _edit_distance(word_text.lower(), correction.lower())
|
||||
conf = max(0.5, 1.0 - dist * 0.15)
|
||||
return (correction, conf)
|
||||
|
||||
return None
|
||||
|
||||
|
||||
def _edit_distance(a: str, b: str) -> int:
|
||||
"""Simple Levenshtein distance."""
|
||||
if len(a) < len(b):
|
||||
return _edit_distance(b, a)
|
||||
if len(b) == 0:
|
||||
return len(a)
|
||||
prev = list(range(len(b) + 1))
|
||||
for i, ca in enumerate(a):
|
||||
curr = [i + 1]
|
||||
for j, cb in enumerate(b):
|
||||
cost = 0 if ca == cb else 1
|
||||
curr.append(min(curr[j] + 1, prev[j + 1] + 1, prev[j] + cost))
|
||||
prev = curr
|
||||
return prev[len(b)]
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Grid analysis
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def analyse_grid_for_gutter_repair(
|
||||
grid_data: Dict[str, Any],
|
||||
image_width: int = 0,
|
||||
) -> Dict[str, Any]:
|
||||
"""Analyse a structured grid and return gutter repair suggestions.
|
||||
|
||||
Args:
|
||||
grid_data: The grid_editor_result from the session (zones→cells structure).
|
||||
image_width: Image width in pixels (for determining gutter side).
|
||||
|
||||
Returns:
|
||||
Dict with "suggestions" list and "stats".
|
||||
"""
|
||||
t0 = time.time()
|
||||
_init_spellcheckers()
|
||||
|
||||
if not _SPELL_AVAILABLE:
|
||||
return {
|
||||
"suggestions": [],
|
||||
"stats": {"error": "pyspellchecker not installed"},
|
||||
"duration_seconds": 0,
|
||||
}
|
||||
|
||||
zones = grid_data.get("zones", [])
|
||||
suggestions: List[GutterSuggestion] = []
|
||||
words_checked = 0
|
||||
gutter_candidates = 0
|
||||
|
||||
for zi, zone in enumerate(zones):
|
||||
columns = zone.get("columns", [])
|
||||
cells = zone.get("cells", [])
|
||||
if not columns or not cells:
|
||||
continue
|
||||
|
||||
# Build column lookup: col_index → {x, width, type}
|
||||
col_info: Dict[int, Dict] = {}
|
||||
for col in columns:
|
||||
ci = col.get("index", col.get("col_index", -1))
|
||||
col_info[ci] = {
|
||||
"x": col.get("x_min_px", col.get("x", 0)),
|
||||
"width": col.get("x_max_px", col.get("width", 0)) - col.get("x_min_px", col.get("x", 0)),
|
||||
"type": col.get("type", col.get("col_type", "")),
|
||||
}
|
||||
|
||||
# Build row→col→cell lookup
|
||||
cell_map: Dict[Tuple[int, int], Dict] = {}
|
||||
max_row = 0
|
||||
for cell in cells:
|
||||
ri = cell.get("row_index", 0)
|
||||
ci = cell.get("col_index", 0)
|
||||
cell_map[(ri, ci)] = cell
|
||||
if ri > max_row:
|
||||
max_row = ri
|
||||
|
||||
# Determine which columns are at the gutter edge.
|
||||
# For a left page: rightmost content columns.
|
||||
# For now, check ALL columns — a word is a candidate if it's at the
|
||||
# right edge of its column AND not a known word.
|
||||
for (ri, ci), cell in cell_map.items():
|
||||
text = (cell.get("text") or "").strip()
|
||||
if not text or len(text) < _MIN_WORD_LEN:
|
||||
continue
|
||||
if _is_ipa_text(text):
|
||||
continue
|
||||
|
||||
words_checked += 1
|
||||
col = col_info.get(ci, {})
|
||||
col_type = col.get("type", "")
|
||||
|
||||
# Get word boxes to check position
|
||||
word_boxes = cell.get("word_boxes", [])
|
||||
|
||||
# Check the LAST word in the cell (rightmost, closest to gutter)
|
||||
cell_words = text.split()
|
||||
if not cell_words:
|
||||
continue
|
||||
|
||||
last_word = cell_words[-1]
|
||||
|
||||
# Skip stopwords and very short words
|
||||
if last_word.lower().rstrip(".,;:!?-") in _STOPWORDS:
|
||||
continue
|
||||
|
||||
last_word_clean = last_word.rstrip(".,;:!?")
|
||||
if len(last_word_clean) < _MIN_WORD_LEN:
|
||||
continue
|
||||
|
||||
# Check if the last word is at the gutter edge
|
||||
is_at_edge = False
|
||||
if word_boxes:
|
||||
last_wb = word_boxes[-1]
|
||||
is_at_edge = _word_is_at_gutter_edge(
|
||||
last_wb, col.get("x", 0), col.get("width", 1)
|
||||
)
|
||||
else:
|
||||
# No word boxes — use cell bbox
|
||||
bbox = cell.get("bbox_px", {})
|
||||
is_at_edge = _word_is_at_gutter_edge(
|
||||
{"left": bbox.get("x", 0), "width": bbox.get("w", 0)},
|
||||
col.get("x", 0), col.get("width", 1)
|
||||
)
|
||||
|
||||
if not is_at_edge:
|
||||
continue
|
||||
|
||||
# Word is at gutter edge — check if it's a known word
|
||||
if _is_known(last_word_clean):
|
||||
continue
|
||||
|
||||
gutter_candidates += 1
|
||||
|
||||
# Check if the word ends with "-" (explicit hyphen break)
|
||||
ends_with_hyphen = last_word.endswith("-")
|
||||
|
||||
# --- Strategy 1: Hyphen join with next row ---
|
||||
next_cell = cell_map.get((ri + 1, ci))
|
||||
if next_cell:
|
||||
next_text = (next_cell.get("text") or "").strip()
|
||||
next_words = next_text.split()
|
||||
if next_words:
|
||||
first_next = next_words[0]
|
||||
first_alpha = next((c for c in first_next if c.isalpha()), "")
|
||||
|
||||
# Continuation likely if:
|
||||
# - explicit hyphen, OR
|
||||
# - next row starts lowercase (= not a new entry)
|
||||
if ends_with_hyphen or (first_alpha and first_alpha.islower()):
|
||||
result = _try_hyphen_join(last_word_clean, first_next)
|
||||
if result:
|
||||
joined, missing, conf = result
|
||||
# Build display parts: show hyphenation for original layout
|
||||
if ends_with_hyphen:
|
||||
display_p1 = last_word_clean.rstrip("-")
|
||||
if missing:
|
||||
display_p1 += missing
|
||||
display_p1 += "-"
|
||||
else:
|
||||
display_p1 = last_word_clean
|
||||
if missing:
|
||||
display_p1 += missing + "-"
|
||||
else:
|
||||
display_p1 += "-"
|
||||
|
||||
# Reconstruct cell texts after join
|
||||
# Current cell: replace last word with first part (hyphenated)
|
||||
# Next cell: remove first word
|
||||
remaining_next = " ".join(next_words[1:])
|
||||
|
||||
suggestion = GutterSuggestion(
|
||||
type="hyphen_join",
|
||||
zone_index=zi,
|
||||
row_index=ri,
|
||||
col_index=ci,
|
||||
col_type=col_type,
|
||||
cell_id=cell.get("cell_id", f"R{ri:02d}_C{ci}"),
|
||||
original_text=last_word,
|
||||
suggested_text=joined,
|
||||
next_row_index=ri + 1,
|
||||
next_row_cell_id=next_cell.get("cell_id", f"R{ri+1:02d}_C{ci}"),
|
||||
next_row_text=next_text,
|
||||
missing_chars=missing,
|
||||
display_parts=[display_p1, first_next],
|
||||
confidence=conf,
|
||||
reason="gutter_truncation" if missing else "hyphen_continuation",
|
||||
)
|
||||
suggestions.append(suggestion)
|
||||
continue # skip spell_fix if hyphen_join found
|
||||
|
||||
# --- Strategy 2: Single-word spell fix ---
|
||||
fix_result = _try_spell_fix(last_word_clean, col_type)
|
||||
if fix_result:
|
||||
corrected, conf = fix_result
|
||||
suggestion = GutterSuggestion(
|
||||
type="spell_fix",
|
||||
zone_index=zi,
|
||||
row_index=ri,
|
||||
col_index=ci,
|
||||
col_type=col_type,
|
||||
cell_id=cell.get("cell_id", f"R{ri:02d}_C{ci}"),
|
||||
original_text=last_word,
|
||||
suggested_text=corrected,
|
||||
confidence=conf,
|
||||
reason="gutter_blur",
|
||||
)
|
||||
suggestions.append(suggestion)
|
||||
|
||||
duration = round(time.time() - t0, 3)
|
||||
|
||||
logger.info(
|
||||
"Gutter repair: checked %d words, %d gutter candidates, %d suggestions (%.2fs)",
|
||||
words_checked, gutter_candidates, len(suggestions), duration,
|
||||
)
|
||||
|
||||
return {
|
||||
"suggestions": [s.to_dict() for s in suggestions],
|
||||
"stats": {
|
||||
"words_checked": words_checked,
|
||||
"gutter_candidates": gutter_candidates,
|
||||
"suggestions_found": len(suggestions),
|
||||
},
|
||||
"duration_seconds": duration,
|
||||
}
|
||||
|
||||
|
||||
def apply_gutter_suggestions(
|
||||
grid_data: Dict[str, Any],
|
||||
accepted_ids: List[str],
|
||||
suggestions: List[Dict[str, Any]],
|
||||
) -> Dict[str, Any]:
|
||||
"""Apply accepted gutter repair suggestions to the grid data.
|
||||
|
||||
Modifies cells in-place and returns summary of changes.
|
||||
|
||||
Args:
|
||||
grid_data: The grid_editor_result (zones→cells).
|
||||
accepted_ids: List of suggestion IDs the user accepted.
|
||||
suggestions: The full suggestions list (from analyse_grid_for_gutter_repair).
|
||||
|
||||
Returns:
|
||||
Dict with "applied_count" and "changes" list.
|
||||
"""
|
||||
accepted_set = set(accepted_ids)
|
||||
accepted_suggestions = [s for s in suggestions if s.get("id") in accepted_set]
|
||||
|
||||
zones = grid_data.get("zones", [])
|
||||
changes: List[Dict[str, Any]] = []
|
||||
|
||||
for s in accepted_suggestions:
|
||||
zi = s.get("zone_index", 0)
|
||||
ri = s.get("row_index", 0)
|
||||
ci = s.get("col_index", 0)
|
||||
stype = s.get("type", "")
|
||||
|
||||
if zi >= len(zones):
|
||||
continue
|
||||
zone_cells = zones[zi].get("cells", [])
|
||||
|
||||
# Find the target cell
|
||||
target_cell = None
|
||||
for cell in zone_cells:
|
||||
if cell.get("row_index") == ri and cell.get("col_index") == ci:
|
||||
target_cell = cell
|
||||
break
|
||||
|
||||
if not target_cell:
|
||||
continue
|
||||
|
||||
old_text = target_cell.get("text", "")
|
||||
|
||||
if stype == "spell_fix":
|
||||
# Replace the last word in the cell text
|
||||
original_word = s.get("original_text", "")
|
||||
corrected = s.get("suggested_text", "")
|
||||
if original_word and corrected:
|
||||
# Replace from the right (last occurrence)
|
||||
idx = old_text.rfind(original_word)
|
||||
if idx >= 0:
|
||||
new_text = old_text[:idx] + corrected + old_text[idx + len(original_word):]
|
||||
target_cell["text"] = new_text
|
||||
changes.append({
|
||||
"type": "spell_fix",
|
||||
"zone_index": zi,
|
||||
"row_index": ri,
|
||||
"col_index": ci,
|
||||
"cell_id": target_cell.get("cell_id", ""),
|
||||
"old_text": old_text,
|
||||
"new_text": new_text,
|
||||
})
|
||||
|
||||
elif stype == "hyphen_join":
|
||||
# Current cell: replace last word with the hyphenated first part
|
||||
original_word = s.get("original_text", "")
|
||||
joined = s.get("suggested_text", "")
|
||||
display_parts = s.get("display_parts", [])
|
||||
next_ri = s.get("next_row_index", -1)
|
||||
|
||||
if not original_word or not joined or not display_parts:
|
||||
continue
|
||||
|
||||
# The first display part is what goes in the current row
|
||||
first_part = display_parts[0] if display_parts else ""
|
||||
|
||||
# Replace the last word in current cell
|
||||
idx = old_text.rfind(original_word)
|
||||
if idx >= 0:
|
||||
new_text = old_text[:idx] + first_part + old_text[idx + len(original_word):]
|
||||
target_cell["text"] = new_text
|
||||
changes.append({
|
||||
"type": "hyphen_join_current",
|
||||
"zone_index": zi,
|
||||
"row_index": ri,
|
||||
"col_index": ci,
|
||||
"cell_id": target_cell.get("cell_id", ""),
|
||||
"old_text": old_text,
|
||||
"new_text": new_text,
|
||||
"joined_word": joined,
|
||||
})
|
||||
|
||||
# Next row: remove the first word (it's now joined into current row)
|
||||
if next_ri >= 0:
|
||||
next_cell = None
|
||||
for cell in zone_cells:
|
||||
if cell.get("row_index") == next_ri and cell.get("col_index") == ci:
|
||||
next_cell = cell
|
||||
break
|
||||
|
||||
if next_cell:
|
||||
next_old = next_cell.get("text", "")
|
||||
next_words = next_old.split()
|
||||
if next_words:
|
||||
next_new = " ".join(next_words[1:])
|
||||
next_cell["text"] = next_new
|
||||
changes.append({
|
||||
"type": "hyphen_join_next",
|
||||
"zone_index": zi,
|
||||
"row_index": next_ri,
|
||||
"col_index": ci,
|
||||
"cell_id": next_cell.get("cell_id", ""),
|
||||
"old_text": next_old,
|
||||
"new_text": next_new,
|
||||
})
|
||||
|
||||
logger.info("Gutter repair applied: %d/%d suggestions", len(changes), len(accepted_suggestions))
|
||||
|
||||
return {
|
||||
"applied_count": len(accepted_suggestions),
|
||||
"changes": changes,
|
||||
}
|
||||
@@ -1851,3 +1851,90 @@ async def get_grid(session_id: str):
|
||||
)
|
||||
|
||||
return result
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Gutter Repair endpoints
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
@router.post("/sessions/{session_id}/gutter-repair")
|
||||
async def gutter_repair(session_id: str):
|
||||
"""Analyse grid for gutter-edge OCR errors and return repair suggestions.
|
||||
|
||||
Detects:
|
||||
- Words truncated/blurred at the book binding (spell_fix)
|
||||
- Words split across rows with missing hyphen chars (hyphen_join)
|
||||
"""
|
||||
session = await get_session_db(session_id)
|
||||
if not session:
|
||||
raise HTTPException(status_code=404, detail=f"Session {session_id} not found")
|
||||
|
||||
grid_data = session.get("grid_editor_result")
|
||||
if not grid_data:
|
||||
raise HTTPException(
|
||||
status_code=400,
|
||||
detail="No grid data. Run build-grid first.",
|
||||
)
|
||||
|
||||
from cv_gutter_repair import analyse_grid_for_gutter_repair
|
||||
|
||||
image_width = grid_data.get("image_width", 0)
|
||||
result = analyse_grid_for_gutter_repair(grid_data, image_width=image_width)
|
||||
|
||||
# Persist suggestions in ground_truth.gutter_repair (avoids DB migration)
|
||||
gt = session.get("ground_truth") or {}
|
||||
gt["gutter_repair"] = result
|
||||
await update_session_db(session_id, ground_truth=gt)
|
||||
|
||||
logger.info(
|
||||
"gutter-repair session %s: %d suggestions in %.2fs",
|
||||
session_id,
|
||||
result.get("stats", {}).get("suggestions_found", 0),
|
||||
result.get("duration_seconds", 0),
|
||||
)
|
||||
|
||||
return result
|
||||
|
||||
|
||||
@router.post("/sessions/{session_id}/gutter-repair/apply")
|
||||
async def gutter_repair_apply(session_id: str, request: Request):
|
||||
"""Apply accepted gutter repair suggestions to the grid.
|
||||
|
||||
Body: { "accepted": ["suggestion_id_1", "suggestion_id_2", ...] }
|
||||
"""
|
||||
session = await get_session_db(session_id)
|
||||
if not session:
|
||||
raise HTTPException(status_code=404, detail=f"Session {session_id} not found")
|
||||
|
||||
grid_data = session.get("grid_editor_result")
|
||||
if not grid_data:
|
||||
raise HTTPException(status_code=400, detail="No grid data.")
|
||||
|
||||
gt = session.get("ground_truth") or {}
|
||||
gutter_result = gt.get("gutter_repair")
|
||||
if not gutter_result:
|
||||
raise HTTPException(
|
||||
status_code=400,
|
||||
detail="No gutter repair data. Run gutter-repair first.",
|
||||
)
|
||||
|
||||
body = await request.json()
|
||||
accepted_ids = body.get("accepted", [])
|
||||
if not accepted_ids:
|
||||
return {"applied_count": 0, "changes": []}
|
||||
|
||||
from cv_gutter_repair import apply_gutter_suggestions
|
||||
|
||||
suggestions = gutter_result.get("suggestions", [])
|
||||
result = apply_gutter_suggestions(grid_data, accepted_ids, suggestions)
|
||||
|
||||
# Save updated grid back to session
|
||||
await update_session_db(session_id, grid_editor_result=grid_data)
|
||||
|
||||
logger.info(
|
||||
"gutter-repair/apply session %s: %d changes applied",
|
||||
session_id,
|
||||
result.get("applied_count", 0),
|
||||
)
|
||||
|
||||
return result
|
||||
|
||||
316
klausur-service/backend/tests/test_gutter_repair.py
Normal file
316
klausur-service/backend/tests/test_gutter_repair.py
Normal file
@@ -0,0 +1,316 @@
|
||||
"""Tests for cv_gutter_repair: gutter-edge word detection and repair."""
|
||||
|
||||
import pytest
|
||||
import sys
|
||||
import os
|
||||
|
||||
# Add parent directory to path so we can import the module
|
||||
sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..'))
|
||||
|
||||
from cv_gutter_repair import (
|
||||
_is_known,
|
||||
_try_hyphen_join,
|
||||
_try_spell_fix,
|
||||
_edit_distance,
|
||||
_word_is_at_gutter_edge,
|
||||
analyse_grid_for_gutter_repair,
|
||||
apply_gutter_suggestions,
|
||||
)
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Helper function tests
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
class TestEditDistance:
|
||||
def test_identical(self):
|
||||
assert _edit_distance("hello", "hello") == 0
|
||||
|
||||
def test_one_substitution(self):
|
||||
assert _edit_distance("stammeli", "stammeln") == 1
|
||||
|
||||
def test_one_deletion(self):
|
||||
assert _edit_distance("cat", "ca") == 1
|
||||
|
||||
def test_one_insertion(self):
|
||||
assert _edit_distance("ca", "cat") == 1
|
||||
|
||||
def test_empty(self):
|
||||
assert _edit_distance("", "abc") == 3
|
||||
assert _edit_distance("abc", "") == 3
|
||||
|
||||
def test_both_empty(self):
|
||||
assert _edit_distance("", "") == 0
|
||||
|
||||
|
||||
class TestWordIsAtGutterEdge:
|
||||
def test_word_at_right_edge(self):
|
||||
# Word right edge at 90% of column = within gutter zone
|
||||
word_bbox = {"left": 80, "width": 15} # right edge = 95
|
||||
assert _word_is_at_gutter_edge(word_bbox, col_x=0, col_width=100)
|
||||
|
||||
def test_word_in_middle(self):
|
||||
# Word right edge at 50% of column = NOT at gutter
|
||||
word_bbox = {"left": 30, "width": 20} # right edge = 50
|
||||
assert not _word_is_at_gutter_edge(word_bbox, col_x=0, col_width=100)
|
||||
|
||||
def test_word_at_left(self):
|
||||
word_bbox = {"left": 5, "width": 20} # right edge = 25
|
||||
assert not _word_is_at_gutter_edge(word_bbox, col_x=0, col_width=100)
|
||||
|
||||
def test_zero_width_column(self):
|
||||
word_bbox = {"left": 0, "width": 10}
|
||||
assert not _word_is_at_gutter_edge(word_bbox, col_x=0, col_width=0)
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Spellchecker-dependent tests (skip if not installed)
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
try:
|
||||
from spellchecker import SpellChecker
|
||||
_HAS_SPELLCHECKER = True
|
||||
except ImportError:
|
||||
_HAS_SPELLCHECKER = False
|
||||
|
||||
needs_spellchecker = pytest.mark.skipif(
|
||||
not _HAS_SPELLCHECKER, reason="pyspellchecker not installed"
|
||||
)
|
||||
|
||||
|
||||
@needs_spellchecker
|
||||
class TestIsKnown:
|
||||
def test_known_english(self):
|
||||
assert _is_known("hello") is True
|
||||
assert _is_known("world") is True
|
||||
|
||||
def test_known_german(self):
|
||||
assert _is_known("verkünden") is True
|
||||
assert _is_known("stammeln") is True
|
||||
|
||||
def test_unknown_garbled(self):
|
||||
assert _is_known("stammeli") is False
|
||||
assert _is_known("xyzqwp") is False
|
||||
|
||||
def test_short_word(self):
|
||||
# Words < 3 chars are not checked
|
||||
assert _is_known("a") is False
|
||||
|
||||
|
||||
@needs_spellchecker
|
||||
class TestTryHyphenJoin:
|
||||
def test_direct_join(self):
|
||||
# "ver" + "künden" = "verkünden"
|
||||
result = _try_hyphen_join("ver-", "künden")
|
||||
assert result is not None
|
||||
joined, missing, conf = result
|
||||
assert joined == "verkünden"
|
||||
assert missing == ""
|
||||
assert conf >= 0.9
|
||||
|
||||
def test_join_with_missing_chars(self):
|
||||
# "ve" + "künden" → needs "r" in between → "verkünden"
|
||||
result = _try_hyphen_join("ve", "künden", max_missing=2)
|
||||
assert result is not None
|
||||
joined, missing, conf = result
|
||||
assert joined == "verkünden"
|
||||
assert "r" in missing
|
||||
|
||||
def test_no_valid_join(self):
|
||||
result = _try_hyphen_join("xyz", "qwpgh")
|
||||
assert result is None
|
||||
|
||||
def test_empty_inputs(self):
|
||||
assert _try_hyphen_join("", "word") is None
|
||||
assert _try_hyphen_join("word", "") is None
|
||||
|
||||
|
||||
@needs_spellchecker
|
||||
class TestTrySpellFix:
|
||||
def test_fix_garbled_ending(self):
|
||||
# "stammeli" should suggest "stammeln"
|
||||
result = _try_spell_fix("stammeli", col_type="column_de")
|
||||
assert result is not None
|
||||
corrected, conf = result
|
||||
assert corrected == "stammeln"
|
||||
|
||||
def test_known_word_not_fixed(self):
|
||||
# "Haus" is correct — no fix needed
|
||||
result = _try_spell_fix("Haus", col_type="column_de")
|
||||
# Should be None since the word is correct
|
||||
# (unless spellchecker suggests something else)
|
||||
# Either None or same word is acceptable
|
||||
if result is not None:
|
||||
corrected, _ = result
|
||||
assert corrected.lower() == "haus"
|
||||
|
||||
def test_short_word_skipped(self):
|
||||
result = _try_spell_fix("ab")
|
||||
assert result is None
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Grid analysis tests
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def _make_grid(cells, columns=None):
|
||||
"""Helper to create a minimal grid_data structure."""
|
||||
if columns is None:
|
||||
columns = [
|
||||
{"index": 0, "type": "column_en", "x_min_px": 0, "x_max_px": 200},
|
||||
{"index": 1, "type": "column_de", "x_min_px": 200, "x_max_px": 400},
|
||||
{"index": 2, "type": "column_text", "x_min_px": 400, "x_max_px": 600},
|
||||
]
|
||||
return {
|
||||
"image_width": 600,
|
||||
"image_height": 800,
|
||||
"zones": [{
|
||||
"columns": columns,
|
||||
"cells": cells,
|
||||
}],
|
||||
}
|
||||
|
||||
|
||||
def _make_cell(row, col, text, left=0, width=50, col_width=200, col_x=0):
|
||||
"""Helper to create a cell dict with word_boxes at a specific position."""
|
||||
return {
|
||||
"cell_id": f"R{row:02d}_C{col}",
|
||||
"row_index": row,
|
||||
"col_index": col,
|
||||
"col_type": "column_text",
|
||||
"text": text,
|
||||
"confidence": 90.0,
|
||||
"bbox_px": {"x": left, "y": row * 25, "w": width, "h": 20},
|
||||
"word_boxes": [
|
||||
{"text": text, "left": left, "top": row * 25, "width": width, "height": 20, "conf": 90},
|
||||
],
|
||||
}
|
||||
|
||||
|
||||
@needs_spellchecker
|
||||
class TestAnalyseGrid:
|
||||
def test_empty_grid(self):
|
||||
result = analyse_grid_for_gutter_repair({"zones": []})
|
||||
assert result["suggestions"] == []
|
||||
assert result["stats"]["words_checked"] == 0
|
||||
|
||||
def test_detects_spell_fix_at_edge(self):
|
||||
# "stammeli" at position 160 in a column 0-200 wide = 80% = at gutter
|
||||
cells = [
|
||||
_make_cell(29, 2, "stammeli", left=540, width=55, col_width=200, col_x=400),
|
||||
]
|
||||
grid = _make_grid(cells)
|
||||
result = analyse_grid_for_gutter_repair(grid)
|
||||
suggestions = result["suggestions"]
|
||||
assert len(suggestions) >= 1
|
||||
assert suggestions[0]["type"] == "spell_fix"
|
||||
assert suggestions[0]["suggested_text"] == "stammeln"
|
||||
|
||||
def test_detects_hyphen_join(self):
|
||||
# Row 30: "ve" at gutter edge, Row 31: "künden"
|
||||
cells = [
|
||||
_make_cell(30, 2, "ve", left=570, width=25, col_width=200, col_x=400),
|
||||
_make_cell(31, 2, "künden", left=410, width=80, col_width=200, col_x=400),
|
||||
]
|
||||
grid = _make_grid(cells)
|
||||
result = analyse_grid_for_gutter_repair(grid)
|
||||
suggestions = result["suggestions"]
|
||||
# Should find hyphen_join or spell_fix
|
||||
assert len(suggestions) >= 1
|
||||
|
||||
def test_ignores_known_words(self):
|
||||
# "hello" is a known word — should not be suggested
|
||||
cells = [
|
||||
_make_cell(0, 0, "hello", left=160, width=35),
|
||||
]
|
||||
grid = _make_grid(cells)
|
||||
result = analyse_grid_for_gutter_repair(grid)
|
||||
# Should not suggest anything for known words
|
||||
spell_fixes = [s for s in result["suggestions"] if s["original_text"] == "hello"]
|
||||
assert len(spell_fixes) == 0
|
||||
|
||||
def test_ignores_words_not_at_edge(self):
|
||||
# "stammeli" at position 10 = NOT at gutter edge
|
||||
cells = [
|
||||
_make_cell(0, 0, "stammeli", left=10, width=50),
|
||||
]
|
||||
grid = _make_grid(cells)
|
||||
result = analyse_grid_for_gutter_repair(grid)
|
||||
assert len(result["suggestions"]) == 0
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Apply suggestions tests
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
class TestApplySuggestions:
|
||||
def test_apply_spell_fix(self):
|
||||
cells = [
|
||||
{"cell_id": "R29_C2", "row_index": 29, "col_index": 2,
|
||||
"text": "er stammeli", "word_boxes": []},
|
||||
]
|
||||
grid = _make_grid(cells)
|
||||
suggestions = [{
|
||||
"id": "abc",
|
||||
"type": "spell_fix",
|
||||
"zone_index": 0,
|
||||
"row_index": 29,
|
||||
"col_index": 2,
|
||||
"original_text": "stammeli",
|
||||
"suggested_text": "stammeln",
|
||||
}]
|
||||
result = apply_gutter_suggestions(grid, ["abc"], suggestions)
|
||||
assert result["applied_count"] == 1
|
||||
assert grid["zones"][0]["cells"][0]["text"] == "er stammeln"
|
||||
|
||||
def test_apply_hyphen_join(self):
|
||||
cells = [
|
||||
{"cell_id": "R30_C2", "row_index": 30, "col_index": 2,
|
||||
"text": "ve", "word_boxes": []},
|
||||
{"cell_id": "R31_C2", "row_index": 31, "col_index": 2,
|
||||
"text": "künden und", "word_boxes": []},
|
||||
]
|
||||
grid = _make_grid(cells)
|
||||
suggestions = [{
|
||||
"id": "def",
|
||||
"type": "hyphen_join",
|
||||
"zone_index": 0,
|
||||
"row_index": 30,
|
||||
"col_index": 2,
|
||||
"original_text": "ve",
|
||||
"suggested_text": "verkünden",
|
||||
"next_row_index": 31,
|
||||
"display_parts": ["ver-", "künden"],
|
||||
"missing_chars": "r",
|
||||
}]
|
||||
result = apply_gutter_suggestions(grid, ["def"], suggestions)
|
||||
assert result["applied_count"] == 1
|
||||
# Current row: "ve" replaced with "ver-"
|
||||
assert grid["zones"][0]["cells"][0]["text"] == "ver-"
|
||||
# Next row: "künden" removed, "und" remains
|
||||
assert grid["zones"][0]["cells"][1]["text"] == "und"
|
||||
|
||||
def test_apply_nothing_when_no_accepted(self):
|
||||
grid = _make_grid([])
|
||||
result = apply_gutter_suggestions(grid, [], [])
|
||||
assert result["applied_count"] == 0
|
||||
|
||||
def test_skip_unknown_suggestion_id(self):
|
||||
cells = [
|
||||
{"cell_id": "R0_C0", "row_index": 0, "col_index": 0,
|
||||
"text": "test", "word_boxes": []},
|
||||
]
|
||||
grid = _make_grid(cells)
|
||||
suggestions = [{
|
||||
"id": "abc",
|
||||
"type": "spell_fix",
|
||||
"zone_index": 0,
|
||||
"row_index": 0,
|
||||
"col_index": 0,
|
||||
"original_text": "test",
|
||||
"suggested_text": "test2",
|
||||
}]
|
||||
# Accept a non-existent ID
|
||||
result = apply_gutter_suggestions(grid, ["nonexistent"], suggestions)
|
||||
assert result["applied_count"] == 0
|
||||
assert grid["zones"][0]["cells"][0]["text"] == "test"
|
||||
Reference in New Issue
Block a user