diff --git a/admin-lehrer/app/(admin)/ai/ocr-kombi/page.tsx b/admin-lehrer/app/(admin)/ai/ocr-kombi/page.tsx index 45d587e..3e4699d 100644 --- a/admin-lehrer/app/(admin)/ai/ocr-kombi/page.tsx +++ b/admin-lehrer/app/(admin)/ai/ocr-kombi/page.tsx @@ -15,6 +15,7 @@ import { StepOcr } from '@/components/ocr-kombi/StepOcr' import { StepStructure } from '@/components/ocr-kombi/StepStructure' import { StepGridBuild } from '@/components/ocr-kombi/StepGridBuild' import { StepGridReview } from '@/components/ocr-kombi/StepGridReview' +import { StepGutterRepair } from '@/components/ocr-kombi/StepGutterRepair' import { StepGroundTruth } from '@/components/ocr-kombi/StepGroundTruth' import { useKombiPipeline } from './useKombiPipeline' @@ -93,6 +94,8 @@ function OcrKombiContent() { case 9: return case 10: + return + case 11: return ( = { 7: 9, // structure 8: 10, // grid-build 9: 11, // grid-review - 10: 12, // ground-truth + 10: 11, // gutter-repair (shares DB step with grid-review) + 11: 12, // ground-truth } /** Map from DB step to Kombi V2 UI step index */ @@ -68,7 +70,7 @@ export function dbStepToKombiV2Ui(dbStep: number): number { if (dbStep === 9) return 7 // structure if (dbStep === 10) return 8 // grid-build if (dbStep === 11) return 9 // grid-review - return 10 // ground-truth + return 11 // ground-truth } /** Document group: groups multiple sessions from a multi-page upload */ diff --git a/admin-lehrer/app/(admin)/ai/ocr-kombi/useKombiPipeline.ts b/admin-lehrer/app/(admin)/ai/ocr-kombi/useKombiPipeline.ts index 3f5c5fa..796ef0a 100644 --- a/admin-lehrer/app/(admin)/ai/ocr-kombi/useKombiPipeline.ts +++ b/admin-lehrer/app/(admin)/ai/ocr-kombi/useKombiPipeline.ts @@ -129,9 +129,12 @@ export function useKombiPipeline() { const hasGrid = !!data.grid_editor_result const hasStructure = !!data.structure_result const hasWords = !!data.word_result + const hasGutterRepair = !!(data.ground_truth?.gutter_repair) let uiStep: number - if (hasGrid) { + if (hasGrid && hasGutterRepair) { + uiStep = 10 // gutter-repair (already analysed) + } else if (hasGrid) { uiStep = 9 // grid-review } else if (hasStructure) { uiStep = 8 // grid-build diff --git a/admin-lehrer/components/ocr-kombi/StepGutterRepair.tsx b/admin-lehrer/components/ocr-kombi/StepGutterRepair.tsx new file mode 100644 index 0000000..5302237 --- /dev/null +++ b/admin-lehrer/components/ocr-kombi/StepGutterRepair.tsx @@ -0,0 +1,393 @@ +'use client' + +import { useState, useEffect, useCallback } from 'react' + +const KLAUSUR_API = '/klausur-api' + +interface GutterSuggestion { + id: string + type: 'hyphen_join' | 'spell_fix' + zone_index: number + row_index: number + col_index: number + col_type: string + cell_id: string + original_text: string + suggested_text: string + next_row_index: number + next_row_cell_id: string + next_row_text: string + missing_chars: string + display_parts: string[] + confidence: number + reason: string +} + +interface GutterRepairResult { + suggestions: GutterSuggestion[] + stats: { + words_checked: number + gutter_candidates: number + suggestions_found: number + error?: string + } + duration_seconds: number +} + +interface StepGutterRepairProps { + sessionId: string | null + onNext: () => void +} + +/** + * Step 11: Gutter Repair (Wortkorrektur). + * Detects words truncated at the book gutter and proposes corrections. + * User can accept/reject each suggestion individually or in batch. + */ +export function StepGutterRepair({ sessionId, onNext }: StepGutterRepairProps) { + const [loading, setLoading] = useState(false) + const [applying, setApplying] = useState(false) + const [result, setResult] = useState(null) + const [accepted, setAccepted] = useState>(new Set()) + const [rejected, setRejected] = useState>(new Set()) + const [applied, setApplied] = useState(false) + const [error, setError] = useState('') + const [applyMessage, setApplyMessage] = useState('') + + const analyse = useCallback(async () => { + if (!sessionId) return + setLoading(true) + setError('') + setApplied(false) + setApplyMessage('') + try { + const res = await fetch( + `${KLAUSUR_API}/api/v1/ocr-pipeline/sessions/${sessionId}/gutter-repair`, + { method: 'POST' }, + ) + if (!res.ok) { + const body = await res.json().catch(() => ({})) + throw new Error(body.detail || `Analyse fehlgeschlagen (${res.status})`) + } + const data: GutterRepairResult = await res.json() + setResult(data) + // Auto-accept all suggestions with high confidence + const autoAccept = new Set() + for (const s of data.suggestions) { + if (s.confidence >= 0.85) { + autoAccept.add(s.id) + } + } + setAccepted(autoAccept) + setRejected(new Set()) + } catch (e) { + setError(e instanceof Error ? e.message : String(e)) + } finally { + setLoading(false) + } + }, [sessionId]) + + // Auto-trigger analysis on mount + useEffect(() => { + if (sessionId) analyse() + }, [sessionId, analyse]) + + const toggleSuggestion = (id: string) => { + setAccepted(prev => { + const next = new Set(prev) + if (next.has(id)) { + next.delete(id) + setRejected(r => new Set(r).add(id)) + } else { + next.add(id) + setRejected(r => { const n = new Set(r); n.delete(id); return n }) + } + return next + }) + } + + const acceptAll = () => { + if (!result) return + setAccepted(new Set(result.suggestions.map(s => s.id))) + setRejected(new Set()) + } + + const rejectAll = () => { + if (!result) return + setRejected(new Set(result.suggestions.map(s => s.id))) + setAccepted(new Set()) + } + + const applyAccepted = async () => { + if (!sessionId || accepted.size === 0) return + setApplying(true) + setApplyMessage('') + try { + const res = await fetch( + `${KLAUSUR_API}/api/v1/ocr-pipeline/sessions/${sessionId}/gutter-repair/apply`, + { + method: 'POST', + headers: { 'Content-Type': 'application/json' }, + body: JSON.stringify({ accepted: Array.from(accepted) }), + }, + ) + if (!res.ok) { + const body = await res.json().catch(() => ({})) + throw new Error(body.detail || `Anwenden fehlgeschlagen (${res.status})`) + } + const data = await res.json() + setApplied(true) + setApplyMessage(`${data.applied_count} Korrektur(en) angewendet.`) + } catch (e) { + setApplyMessage(e instanceof Error ? e.message : String(e)) + } finally { + setApplying(false) + } + } + + const suggestions = result?.suggestions || [] + const hasSuggestions = suggestions.length > 0 + + return ( +
+ {/* Header */} +
+
+

+ Wortkorrektur (Buchfalz) +

+

+ Erkennt abgeschnittene oder unscharfe Woerter am Buchfalz und Bindestrich-Trennungen ueber Zeilen hinweg. +

+
+ {result && !loading && ( + + )} +
+ + {/* Loading */} + {loading && ( +
+
+ Analysiere Woerter am Buchfalz... +
+ )} + + {/* Error */} + {error && ( +
+
+ {error} +
+ +
+ )} + + {/* No suggestions */} + {result && !hasSuggestions && !loading && ( +
+
+ Keine Buchfalz-Fehler erkannt. +
+
+ {result.stats.words_checked} Woerter geprueft, {result.stats.gutter_candidates} Kandidaten am Rand analysiert. +
+
+ )} + + {/* Suggestions list */} + {hasSuggestions && !loading && ( + <> + {/* Stats bar */} +
+
+ {suggestions.length} Vorschlag/Vorschlaege ·{' '} + {result!.stats.words_checked} Woerter geprueft ·{' '} + {result!.duration_seconds}s +
+
+ + +
+
+ + {/* Suggestion cards */} +
+ {suggestions.map((s) => { + const isAccepted = accepted.has(s.id) + const isRejected = rejected.has(s.id) + + return ( +
+
+ {/* Left: suggestion details */} +
+ {/* Type badge */} +
+ + {s.type === 'hyphen_join' ? 'Zeilenumbruch' : 'Buchfalz-Korrektur'} + + + Zeile {s.row_index + 1}, Spalte {s.col_index + 1} + {s.col_type && ` (${s.col_type.replace('column_', '')})`} + + = 0.9 ? 'text-green-500' : + s.confidence >= 0.7 ? 'text-yellow-500' : 'text-red-500' + }`}> + {Math.round(s.confidence * 100)}% + +
+ + {/* Correction display */} + {s.type === 'hyphen_join' ? ( +
+
+ + {s.original_text} + + Z.{s.row_index + 1} + + + + {s.next_row_text.split(' ')[0]} + + Z.{s.next_row_index + 1} + + + {s.suggested_text} + +
+ {s.missing_chars && ( +
+ Fehlende Zeichen: {s.missing_chars} + {' '}· Darstellung: {s.display_parts.join(' | ')} +
+ )} +
+ ) : ( +
+ + {s.original_text} + + + + {s.suggested_text} + +
+ )} +
+ + {/* Right: accept/reject toggle */} + {!applied && ( + + )} +
+
+ ) + })} +
+ + {/* Apply / Next buttons */} +
+ {!applied ? ( + + ) : ( + + )} + {!applied && ( + + )} +
+ + {/* Apply result message */} + {applyMessage && ( +
+ {applyMessage} +
+ )} + + )} + + {/* Skip button when no suggestions */} + {result && !hasSuggestions && !loading && ( + + )} +
+ ) +} diff --git a/klausur-service/backend/cv_gutter_repair.py b/klausur-service/backend/cv_gutter_repair.py new file mode 100644 index 0000000..bc0780b --- /dev/null +++ b/klausur-service/backend/cv_gutter_repair.py @@ -0,0 +1,569 @@ +""" +Gutter Repair — detects and fixes words truncated or blurred at the book gutter. + +When scanning double-page spreads, the binding area (gutter) causes: + 1. Blurry/garbled trailing characters ("stammeli" → "stammeln") + 2. Words split across lines with a hyphen lost in the gutter + ("ve" + "künden" → "verkünden") + +This module analyses grid cells, identifies gutter-edge candidates, and +proposes corrections using pyspellchecker (DE + EN). + +Lizenz: Apache 2.0 (kommerziell nutzbar) +DATENSCHUTZ: Alle Verarbeitung erfolgt lokal. +""" + +import itertools +import logging +import re +import time +import uuid +from dataclasses import dataclass, field, asdict +from typing import Any, Dict, List, Optional, Tuple + +logger = logging.getLogger(__name__) + +# --------------------------------------------------------------------------- +# Spellchecker setup (lazy, cached) +# --------------------------------------------------------------------------- + +_spell_de = None +_spell_en = None +_SPELL_AVAILABLE = False + +def _init_spellcheckers(): + """Lazy-load DE + EN spellcheckers (cached across calls).""" + global _spell_de, _spell_en, _SPELL_AVAILABLE + if _spell_de is not None: + return + try: + from spellchecker import SpellChecker + _spell_de = SpellChecker(language='de', distance=1) + _spell_en = SpellChecker(language='en', distance=1) + _SPELL_AVAILABLE = True + logger.info("Gutter repair: spellcheckers loaded (DE + EN)") + except ImportError: + logger.warning("pyspellchecker not installed — gutter repair unavailable") + + +def _is_known(word: str) -> bool: + """Check if a word is known in DE or EN dictionary.""" + _init_spellcheckers() + if not _SPELL_AVAILABLE: + return False + w = word.lower() + return bool(_spell_de.known([w])) or bool(_spell_en.known([w])) + + +def _spell_correction(word: str, lang: str = "both") -> Optional[str]: + """Get best spellchecker correction for a word.""" + _init_spellcheckers() + if not _SPELL_AVAILABLE: + return None + w = word.lower() + result = None + if lang in ("de", "both") and _spell_de: + result = _spell_de.correction(w) + if result and result != w and _spell_de.known([result]): + return result + if lang in ("en", "both") and _spell_en: + result = _spell_en.correction(w) + if result and result != w and _spell_en.known([result]): + return result + return None + + +# --------------------------------------------------------------------------- +# Gutter position detection +# --------------------------------------------------------------------------- + +# Minimum word length to consider for repair (very short words are often +# legitimate: "a", "to", "in", etc.) +_MIN_WORD_LEN = 3 + +# How close to the right column edge a word must be to count as "gutter-adjacent". +# Expressed as fraction of column width (e.g. 0.75 = rightmost 25%). +_GUTTER_EDGE_THRESHOLD = 0.70 + +# Small common words / abbreviations that should NOT be repaired +_STOPWORDS = frozenset([ + # German + "ab", "an", "am", "da", "er", "es", "im", "in", "ja", "ob", "so", "um", + "zu", "wo", "du", "eh", "ei", "je", "na", "nu", "oh", + # English + "a", "am", "an", "as", "at", "be", "by", "do", "go", "he", "if", "in", + "is", "it", "me", "my", "no", "of", "on", "or", "so", "to", "up", "us", + "we", +]) + +# IPA / phonetic patterns — skip these cells +_IPA_RE = re.compile(r'[\[\]/ˈˌːʃʒθðŋɑɒæɔəɛɪʊʌ]') + + +def _is_ipa_text(text: str) -> bool: + """True if text looks like IPA transcription.""" + return bool(_IPA_RE.search(text)) + + +def _word_is_at_gutter_edge(word_bbox: Dict, col_x: float, col_width: float) -> bool: + """Check if a word's right edge is near the right boundary of its column.""" + if col_width <= 0: + return False + word_right = word_bbox.get("left", 0) + word_bbox.get("width", 0) + col_right = col_x + col_width + # Word's right edge within the rightmost portion of the column + relative_pos = (word_right - col_x) / col_width + return relative_pos >= _GUTTER_EDGE_THRESHOLD + + +# --------------------------------------------------------------------------- +# Suggestion types +# --------------------------------------------------------------------------- + +@dataclass +class GutterSuggestion: + """A single correction suggestion.""" + id: str = field(default_factory=lambda: str(uuid.uuid4())[:8]) + type: str = "" # "hyphen_join" | "spell_fix" + zone_index: int = 0 + row_index: int = 0 + col_index: int = 0 + col_type: str = "" + cell_id: str = "" + original_text: str = "" + suggested_text: str = "" + # For hyphen_join: + next_row_index: int = -1 + next_row_cell_id: str = "" + next_row_text: str = "" + missing_chars: str = "" + display_parts: List[str] = field(default_factory=list) + # Meta: + confidence: float = 0.0 + reason: str = "" # "gutter_truncation" | "gutter_blur" | "hyphen_continuation" + + def to_dict(self) -> Dict[str, Any]: + return asdict(self) + + +# --------------------------------------------------------------------------- +# Core repair logic +# --------------------------------------------------------------------------- + +def _try_hyphen_join( + word_text: str, + next_word_text: str, + max_missing: int = 3, +) -> Optional[Tuple[str, str, float]]: + """Try joining two fragments with 0..max_missing interpolated chars. + + Returns (joined_word, missing_chars, confidence) or None. + """ + base = word_text.rstrip("-").rstrip() + continuation = next_word_text.lstrip() + + if not base or not continuation: + return None + + # 1. Direct join (no missing chars) + direct = base + continuation + if _is_known(direct): + return (direct, "", 0.95) + + # 2. Try with 1..max_missing missing characters + # Use common letters, weighted by frequency in German/English + _COMMON_CHARS = "enristaldhgcmobwfkzpvjyxqu" + + for n_missing in range(1, max_missing + 1): + for chars in itertools.product(_COMMON_CHARS[:15], repeat=n_missing): + candidate = base + "".join(chars) + continuation + if _is_known(candidate): + missing = "".join(chars) + # Confidence decreases with more missing chars + conf = 0.90 - (n_missing - 1) * 0.10 + return (candidate, missing, conf) + + return None + + +def _try_spell_fix(word_text: str, col_type: str = "") -> Optional[Tuple[str, float]]: + """Try to fix a single garbled gutter word via spellchecker. + + Returns (corrected_word, confidence) or None. + """ + if len(word_text) < _MIN_WORD_LEN: + return None + + # Determine language priority from column type + if "en" in col_type: + lang = "en" + elif "de" in col_type: + lang = "de" + else: + lang = "both" + + correction = _spell_correction(word_text, lang=lang) + if not correction: + # Try the other language too + correction = _spell_correction(word_text, lang="both") + + if correction and correction.lower() != word_text.lower(): + # Preserve original casing of first letter + if word_text[0].isupper(): + correction = correction[0].upper() + correction[1:] + # Confidence based on edit distance + dist = _edit_distance(word_text.lower(), correction.lower()) + conf = max(0.5, 1.0 - dist * 0.15) + return (correction, conf) + + return None + + +def _edit_distance(a: str, b: str) -> int: + """Simple Levenshtein distance.""" + if len(a) < len(b): + return _edit_distance(b, a) + if len(b) == 0: + return len(a) + prev = list(range(len(b) + 1)) + for i, ca in enumerate(a): + curr = [i + 1] + for j, cb in enumerate(b): + cost = 0 if ca == cb else 1 + curr.append(min(curr[j] + 1, prev[j + 1] + 1, prev[j] + cost)) + prev = curr + return prev[len(b)] + + +# --------------------------------------------------------------------------- +# Grid analysis +# --------------------------------------------------------------------------- + +def analyse_grid_for_gutter_repair( + grid_data: Dict[str, Any], + image_width: int = 0, +) -> Dict[str, Any]: + """Analyse a structured grid and return gutter repair suggestions. + + Args: + grid_data: The grid_editor_result from the session (zones→cells structure). + image_width: Image width in pixels (for determining gutter side). + + Returns: + Dict with "suggestions" list and "stats". + """ + t0 = time.time() + _init_spellcheckers() + + if not _SPELL_AVAILABLE: + return { + "suggestions": [], + "stats": {"error": "pyspellchecker not installed"}, + "duration_seconds": 0, + } + + zones = grid_data.get("zones", []) + suggestions: List[GutterSuggestion] = [] + words_checked = 0 + gutter_candidates = 0 + + for zi, zone in enumerate(zones): + columns = zone.get("columns", []) + cells = zone.get("cells", []) + if not columns or not cells: + continue + + # Build column lookup: col_index → {x, width, type} + col_info: Dict[int, Dict] = {} + for col in columns: + ci = col.get("index", col.get("col_index", -1)) + col_info[ci] = { + "x": col.get("x_min_px", col.get("x", 0)), + "width": col.get("x_max_px", col.get("width", 0)) - col.get("x_min_px", col.get("x", 0)), + "type": col.get("type", col.get("col_type", "")), + } + + # Build row→col→cell lookup + cell_map: Dict[Tuple[int, int], Dict] = {} + max_row = 0 + for cell in cells: + ri = cell.get("row_index", 0) + ci = cell.get("col_index", 0) + cell_map[(ri, ci)] = cell + if ri > max_row: + max_row = ri + + # Determine which columns are at the gutter edge. + # For a left page: rightmost content columns. + # For now, check ALL columns — a word is a candidate if it's at the + # right edge of its column AND not a known word. + for (ri, ci), cell in cell_map.items(): + text = (cell.get("text") or "").strip() + if not text or len(text) < _MIN_WORD_LEN: + continue + if _is_ipa_text(text): + continue + + words_checked += 1 + col = col_info.get(ci, {}) + col_type = col.get("type", "") + + # Get word boxes to check position + word_boxes = cell.get("word_boxes", []) + + # Check the LAST word in the cell (rightmost, closest to gutter) + cell_words = text.split() + if not cell_words: + continue + + last_word = cell_words[-1] + + # Skip stopwords and very short words + if last_word.lower().rstrip(".,;:!?-") in _STOPWORDS: + continue + + last_word_clean = last_word.rstrip(".,;:!?") + if len(last_word_clean) < _MIN_WORD_LEN: + continue + + # Check if the last word is at the gutter edge + is_at_edge = False + if word_boxes: + last_wb = word_boxes[-1] + is_at_edge = _word_is_at_gutter_edge( + last_wb, col.get("x", 0), col.get("width", 1) + ) + else: + # No word boxes — use cell bbox + bbox = cell.get("bbox_px", {}) + is_at_edge = _word_is_at_gutter_edge( + {"left": bbox.get("x", 0), "width": bbox.get("w", 0)}, + col.get("x", 0), col.get("width", 1) + ) + + if not is_at_edge: + continue + + # Word is at gutter edge — check if it's a known word + if _is_known(last_word_clean): + continue + + gutter_candidates += 1 + + # Check if the word ends with "-" (explicit hyphen break) + ends_with_hyphen = last_word.endswith("-") + + # --- Strategy 1: Hyphen join with next row --- + next_cell = cell_map.get((ri + 1, ci)) + if next_cell: + next_text = (next_cell.get("text") or "").strip() + next_words = next_text.split() + if next_words: + first_next = next_words[0] + first_alpha = next((c for c in first_next if c.isalpha()), "") + + # Continuation likely if: + # - explicit hyphen, OR + # - next row starts lowercase (= not a new entry) + if ends_with_hyphen or (first_alpha and first_alpha.islower()): + result = _try_hyphen_join(last_word_clean, first_next) + if result: + joined, missing, conf = result + # Build display parts: show hyphenation for original layout + if ends_with_hyphen: + display_p1 = last_word_clean.rstrip("-") + if missing: + display_p1 += missing + display_p1 += "-" + else: + display_p1 = last_word_clean + if missing: + display_p1 += missing + "-" + else: + display_p1 += "-" + + # Reconstruct cell texts after join + # Current cell: replace last word with first part (hyphenated) + # Next cell: remove first word + remaining_next = " ".join(next_words[1:]) + + suggestion = GutterSuggestion( + type="hyphen_join", + zone_index=zi, + row_index=ri, + col_index=ci, + col_type=col_type, + cell_id=cell.get("cell_id", f"R{ri:02d}_C{ci}"), + original_text=last_word, + suggested_text=joined, + next_row_index=ri + 1, + next_row_cell_id=next_cell.get("cell_id", f"R{ri+1:02d}_C{ci}"), + next_row_text=next_text, + missing_chars=missing, + display_parts=[display_p1, first_next], + confidence=conf, + reason="gutter_truncation" if missing else "hyphen_continuation", + ) + suggestions.append(suggestion) + continue # skip spell_fix if hyphen_join found + + # --- Strategy 2: Single-word spell fix --- + fix_result = _try_spell_fix(last_word_clean, col_type) + if fix_result: + corrected, conf = fix_result + suggestion = GutterSuggestion( + type="spell_fix", + zone_index=zi, + row_index=ri, + col_index=ci, + col_type=col_type, + cell_id=cell.get("cell_id", f"R{ri:02d}_C{ci}"), + original_text=last_word, + suggested_text=corrected, + confidence=conf, + reason="gutter_blur", + ) + suggestions.append(suggestion) + + duration = round(time.time() - t0, 3) + + logger.info( + "Gutter repair: checked %d words, %d gutter candidates, %d suggestions (%.2fs)", + words_checked, gutter_candidates, len(suggestions), duration, + ) + + return { + "suggestions": [s.to_dict() for s in suggestions], + "stats": { + "words_checked": words_checked, + "gutter_candidates": gutter_candidates, + "suggestions_found": len(suggestions), + }, + "duration_seconds": duration, + } + + +def apply_gutter_suggestions( + grid_data: Dict[str, Any], + accepted_ids: List[str], + suggestions: List[Dict[str, Any]], +) -> Dict[str, Any]: + """Apply accepted gutter repair suggestions to the grid data. + + Modifies cells in-place and returns summary of changes. + + Args: + grid_data: The grid_editor_result (zones→cells). + accepted_ids: List of suggestion IDs the user accepted. + suggestions: The full suggestions list (from analyse_grid_for_gutter_repair). + + Returns: + Dict with "applied_count" and "changes" list. + """ + accepted_set = set(accepted_ids) + accepted_suggestions = [s for s in suggestions if s.get("id") in accepted_set] + + zones = grid_data.get("zones", []) + changes: List[Dict[str, Any]] = [] + + for s in accepted_suggestions: + zi = s.get("zone_index", 0) + ri = s.get("row_index", 0) + ci = s.get("col_index", 0) + stype = s.get("type", "") + + if zi >= len(zones): + continue + zone_cells = zones[zi].get("cells", []) + + # Find the target cell + target_cell = None + for cell in zone_cells: + if cell.get("row_index") == ri and cell.get("col_index") == ci: + target_cell = cell + break + + if not target_cell: + continue + + old_text = target_cell.get("text", "") + + if stype == "spell_fix": + # Replace the last word in the cell text + original_word = s.get("original_text", "") + corrected = s.get("suggested_text", "") + if original_word and corrected: + # Replace from the right (last occurrence) + idx = old_text.rfind(original_word) + if idx >= 0: + new_text = old_text[:idx] + corrected + old_text[idx + len(original_word):] + target_cell["text"] = new_text + changes.append({ + "type": "spell_fix", + "zone_index": zi, + "row_index": ri, + "col_index": ci, + "cell_id": target_cell.get("cell_id", ""), + "old_text": old_text, + "new_text": new_text, + }) + + elif stype == "hyphen_join": + # Current cell: replace last word with the hyphenated first part + original_word = s.get("original_text", "") + joined = s.get("suggested_text", "") + display_parts = s.get("display_parts", []) + next_ri = s.get("next_row_index", -1) + + if not original_word or not joined or not display_parts: + continue + + # The first display part is what goes in the current row + first_part = display_parts[0] if display_parts else "" + + # Replace the last word in current cell + idx = old_text.rfind(original_word) + if idx >= 0: + new_text = old_text[:idx] + first_part + old_text[idx + len(original_word):] + target_cell["text"] = new_text + changes.append({ + "type": "hyphen_join_current", + "zone_index": zi, + "row_index": ri, + "col_index": ci, + "cell_id": target_cell.get("cell_id", ""), + "old_text": old_text, + "new_text": new_text, + "joined_word": joined, + }) + + # Next row: remove the first word (it's now joined into current row) + if next_ri >= 0: + next_cell = None + for cell in zone_cells: + if cell.get("row_index") == next_ri and cell.get("col_index") == ci: + next_cell = cell + break + + if next_cell: + next_old = next_cell.get("text", "") + next_words = next_old.split() + if next_words: + next_new = " ".join(next_words[1:]) + next_cell["text"] = next_new + changes.append({ + "type": "hyphen_join_next", + "zone_index": zi, + "row_index": next_ri, + "col_index": ci, + "cell_id": next_cell.get("cell_id", ""), + "old_text": next_old, + "new_text": next_new, + }) + + logger.info("Gutter repair applied: %d/%d suggestions", len(changes), len(accepted_suggestions)) + + return { + "applied_count": len(accepted_suggestions), + "changes": changes, + } diff --git a/klausur-service/backend/grid_editor_api.py b/klausur-service/backend/grid_editor_api.py index 003de0d..33e16f8 100644 --- a/klausur-service/backend/grid_editor_api.py +++ b/klausur-service/backend/grid_editor_api.py @@ -1851,3 +1851,90 @@ async def get_grid(session_id: str): ) return result + + +# --------------------------------------------------------------------------- +# Gutter Repair endpoints +# --------------------------------------------------------------------------- + +@router.post("/sessions/{session_id}/gutter-repair") +async def gutter_repair(session_id: str): + """Analyse grid for gutter-edge OCR errors and return repair suggestions. + + Detects: + - Words truncated/blurred at the book binding (spell_fix) + - Words split across rows with missing hyphen chars (hyphen_join) + """ + session = await get_session_db(session_id) + if not session: + raise HTTPException(status_code=404, detail=f"Session {session_id} not found") + + grid_data = session.get("grid_editor_result") + if not grid_data: + raise HTTPException( + status_code=400, + detail="No grid data. Run build-grid first.", + ) + + from cv_gutter_repair import analyse_grid_for_gutter_repair + + image_width = grid_data.get("image_width", 0) + result = analyse_grid_for_gutter_repair(grid_data, image_width=image_width) + + # Persist suggestions in ground_truth.gutter_repair (avoids DB migration) + gt = session.get("ground_truth") or {} + gt["gutter_repair"] = result + await update_session_db(session_id, ground_truth=gt) + + logger.info( + "gutter-repair session %s: %d suggestions in %.2fs", + session_id, + result.get("stats", {}).get("suggestions_found", 0), + result.get("duration_seconds", 0), + ) + + return result + + +@router.post("/sessions/{session_id}/gutter-repair/apply") +async def gutter_repair_apply(session_id: str, request: Request): + """Apply accepted gutter repair suggestions to the grid. + + Body: { "accepted": ["suggestion_id_1", "suggestion_id_2", ...] } + """ + session = await get_session_db(session_id) + if not session: + raise HTTPException(status_code=404, detail=f"Session {session_id} not found") + + grid_data = session.get("grid_editor_result") + if not grid_data: + raise HTTPException(status_code=400, detail="No grid data.") + + gt = session.get("ground_truth") or {} + gutter_result = gt.get("gutter_repair") + if not gutter_result: + raise HTTPException( + status_code=400, + detail="No gutter repair data. Run gutter-repair first.", + ) + + body = await request.json() + accepted_ids = body.get("accepted", []) + if not accepted_ids: + return {"applied_count": 0, "changes": []} + + from cv_gutter_repair import apply_gutter_suggestions + + suggestions = gutter_result.get("suggestions", []) + result = apply_gutter_suggestions(grid_data, accepted_ids, suggestions) + + # Save updated grid back to session + await update_session_db(session_id, grid_editor_result=grid_data) + + logger.info( + "gutter-repair/apply session %s: %d changes applied", + session_id, + result.get("applied_count", 0), + ) + + return result diff --git a/klausur-service/backend/tests/test_gutter_repair.py b/klausur-service/backend/tests/test_gutter_repair.py new file mode 100644 index 0000000..b39bf82 --- /dev/null +++ b/klausur-service/backend/tests/test_gutter_repair.py @@ -0,0 +1,316 @@ +"""Tests for cv_gutter_repair: gutter-edge word detection and repair.""" + +import pytest +import sys +import os + +# Add parent directory to path so we can import the module +sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..')) + +from cv_gutter_repair import ( + _is_known, + _try_hyphen_join, + _try_spell_fix, + _edit_distance, + _word_is_at_gutter_edge, + analyse_grid_for_gutter_repair, + apply_gutter_suggestions, +) + + +# --------------------------------------------------------------------------- +# Helper function tests +# --------------------------------------------------------------------------- + +class TestEditDistance: + def test_identical(self): + assert _edit_distance("hello", "hello") == 0 + + def test_one_substitution(self): + assert _edit_distance("stammeli", "stammeln") == 1 + + def test_one_deletion(self): + assert _edit_distance("cat", "ca") == 1 + + def test_one_insertion(self): + assert _edit_distance("ca", "cat") == 1 + + def test_empty(self): + assert _edit_distance("", "abc") == 3 + assert _edit_distance("abc", "") == 3 + + def test_both_empty(self): + assert _edit_distance("", "") == 0 + + +class TestWordIsAtGutterEdge: + def test_word_at_right_edge(self): + # Word right edge at 90% of column = within gutter zone + word_bbox = {"left": 80, "width": 15} # right edge = 95 + assert _word_is_at_gutter_edge(word_bbox, col_x=0, col_width=100) + + def test_word_in_middle(self): + # Word right edge at 50% of column = NOT at gutter + word_bbox = {"left": 30, "width": 20} # right edge = 50 + assert not _word_is_at_gutter_edge(word_bbox, col_x=0, col_width=100) + + def test_word_at_left(self): + word_bbox = {"left": 5, "width": 20} # right edge = 25 + assert not _word_is_at_gutter_edge(word_bbox, col_x=0, col_width=100) + + def test_zero_width_column(self): + word_bbox = {"left": 0, "width": 10} + assert not _word_is_at_gutter_edge(word_bbox, col_x=0, col_width=0) + + +# --------------------------------------------------------------------------- +# Spellchecker-dependent tests (skip if not installed) +# --------------------------------------------------------------------------- + +try: + from spellchecker import SpellChecker + _HAS_SPELLCHECKER = True +except ImportError: + _HAS_SPELLCHECKER = False + +needs_spellchecker = pytest.mark.skipif( + not _HAS_SPELLCHECKER, reason="pyspellchecker not installed" +) + + +@needs_spellchecker +class TestIsKnown: + def test_known_english(self): + assert _is_known("hello") is True + assert _is_known("world") is True + + def test_known_german(self): + assert _is_known("verkünden") is True + assert _is_known("stammeln") is True + + def test_unknown_garbled(self): + assert _is_known("stammeli") is False + assert _is_known("xyzqwp") is False + + def test_short_word(self): + # Words < 3 chars are not checked + assert _is_known("a") is False + + +@needs_spellchecker +class TestTryHyphenJoin: + def test_direct_join(self): + # "ver" + "künden" = "verkünden" + result = _try_hyphen_join("ver-", "künden") + assert result is not None + joined, missing, conf = result + assert joined == "verkünden" + assert missing == "" + assert conf >= 0.9 + + def test_join_with_missing_chars(self): + # "ve" + "künden" → needs "r" in between → "verkünden" + result = _try_hyphen_join("ve", "künden", max_missing=2) + assert result is not None + joined, missing, conf = result + assert joined == "verkünden" + assert "r" in missing + + def test_no_valid_join(self): + result = _try_hyphen_join("xyz", "qwpgh") + assert result is None + + def test_empty_inputs(self): + assert _try_hyphen_join("", "word") is None + assert _try_hyphen_join("word", "") is None + + +@needs_spellchecker +class TestTrySpellFix: + def test_fix_garbled_ending(self): + # "stammeli" should suggest "stammeln" + result = _try_spell_fix("stammeli", col_type="column_de") + assert result is not None + corrected, conf = result + assert corrected == "stammeln" + + def test_known_word_not_fixed(self): + # "Haus" is correct — no fix needed + result = _try_spell_fix("Haus", col_type="column_de") + # Should be None since the word is correct + # (unless spellchecker suggests something else) + # Either None or same word is acceptable + if result is not None: + corrected, _ = result + assert corrected.lower() == "haus" + + def test_short_word_skipped(self): + result = _try_spell_fix("ab") + assert result is None + + +# --------------------------------------------------------------------------- +# Grid analysis tests +# --------------------------------------------------------------------------- + +def _make_grid(cells, columns=None): + """Helper to create a minimal grid_data structure.""" + if columns is None: + columns = [ + {"index": 0, "type": "column_en", "x_min_px": 0, "x_max_px": 200}, + {"index": 1, "type": "column_de", "x_min_px": 200, "x_max_px": 400}, + {"index": 2, "type": "column_text", "x_min_px": 400, "x_max_px": 600}, + ] + return { + "image_width": 600, + "image_height": 800, + "zones": [{ + "columns": columns, + "cells": cells, + }], + } + + +def _make_cell(row, col, text, left=0, width=50, col_width=200, col_x=0): + """Helper to create a cell dict with word_boxes at a specific position.""" + return { + "cell_id": f"R{row:02d}_C{col}", + "row_index": row, + "col_index": col, + "col_type": "column_text", + "text": text, + "confidence": 90.0, + "bbox_px": {"x": left, "y": row * 25, "w": width, "h": 20}, + "word_boxes": [ + {"text": text, "left": left, "top": row * 25, "width": width, "height": 20, "conf": 90}, + ], + } + + +@needs_spellchecker +class TestAnalyseGrid: + def test_empty_grid(self): + result = analyse_grid_for_gutter_repair({"zones": []}) + assert result["suggestions"] == [] + assert result["stats"]["words_checked"] == 0 + + def test_detects_spell_fix_at_edge(self): + # "stammeli" at position 160 in a column 0-200 wide = 80% = at gutter + cells = [ + _make_cell(29, 2, "stammeli", left=540, width=55, col_width=200, col_x=400), + ] + grid = _make_grid(cells) + result = analyse_grid_for_gutter_repair(grid) + suggestions = result["suggestions"] + assert len(suggestions) >= 1 + assert suggestions[0]["type"] == "spell_fix" + assert suggestions[0]["suggested_text"] == "stammeln" + + def test_detects_hyphen_join(self): + # Row 30: "ve" at gutter edge, Row 31: "künden" + cells = [ + _make_cell(30, 2, "ve", left=570, width=25, col_width=200, col_x=400), + _make_cell(31, 2, "künden", left=410, width=80, col_width=200, col_x=400), + ] + grid = _make_grid(cells) + result = analyse_grid_for_gutter_repair(grid) + suggestions = result["suggestions"] + # Should find hyphen_join or spell_fix + assert len(suggestions) >= 1 + + def test_ignores_known_words(self): + # "hello" is a known word — should not be suggested + cells = [ + _make_cell(0, 0, "hello", left=160, width=35), + ] + grid = _make_grid(cells) + result = analyse_grid_for_gutter_repair(grid) + # Should not suggest anything for known words + spell_fixes = [s for s in result["suggestions"] if s["original_text"] == "hello"] + assert len(spell_fixes) == 0 + + def test_ignores_words_not_at_edge(self): + # "stammeli" at position 10 = NOT at gutter edge + cells = [ + _make_cell(0, 0, "stammeli", left=10, width=50), + ] + grid = _make_grid(cells) + result = analyse_grid_for_gutter_repair(grid) + assert len(result["suggestions"]) == 0 + + +# --------------------------------------------------------------------------- +# Apply suggestions tests +# --------------------------------------------------------------------------- + +class TestApplySuggestions: + def test_apply_spell_fix(self): + cells = [ + {"cell_id": "R29_C2", "row_index": 29, "col_index": 2, + "text": "er stammeli", "word_boxes": []}, + ] + grid = _make_grid(cells) + suggestions = [{ + "id": "abc", + "type": "spell_fix", + "zone_index": 0, + "row_index": 29, + "col_index": 2, + "original_text": "stammeli", + "suggested_text": "stammeln", + }] + result = apply_gutter_suggestions(grid, ["abc"], suggestions) + assert result["applied_count"] == 1 + assert grid["zones"][0]["cells"][0]["text"] == "er stammeln" + + def test_apply_hyphen_join(self): + cells = [ + {"cell_id": "R30_C2", "row_index": 30, "col_index": 2, + "text": "ve", "word_boxes": []}, + {"cell_id": "R31_C2", "row_index": 31, "col_index": 2, + "text": "künden und", "word_boxes": []}, + ] + grid = _make_grid(cells) + suggestions = [{ + "id": "def", + "type": "hyphen_join", + "zone_index": 0, + "row_index": 30, + "col_index": 2, + "original_text": "ve", + "suggested_text": "verkünden", + "next_row_index": 31, + "display_parts": ["ver-", "künden"], + "missing_chars": "r", + }] + result = apply_gutter_suggestions(grid, ["def"], suggestions) + assert result["applied_count"] == 1 + # Current row: "ve" replaced with "ver-" + assert grid["zones"][0]["cells"][0]["text"] == "ver-" + # Next row: "künden" removed, "und" remains + assert grid["zones"][0]["cells"][1]["text"] == "und" + + def test_apply_nothing_when_no_accepted(self): + grid = _make_grid([]) + result = apply_gutter_suggestions(grid, [], []) + assert result["applied_count"] == 0 + + def test_skip_unknown_suggestion_id(self): + cells = [ + {"cell_id": "R0_C0", "row_index": 0, "col_index": 0, + "text": "test", "word_boxes": []}, + ] + grid = _make_grid(cells) + suggestions = [{ + "id": "abc", + "type": "spell_fix", + "zone_index": 0, + "row_index": 0, + "col_index": 0, + "original_text": "test", + "suggested_text": "test2", + }] + # Accept a non-existent ID + result = apply_gutter_suggestions(grid, ["nonexistent"], suggestions) + assert result["applied_count"] == 0 + assert grid["zones"][0]["cells"][0]["text"] == "test"