Files
breakpilot-lehrer/klausur-service/backend/cv_ocr_engines.py
Benjamin Admin be86a7d14d fix: preserve pipe syllable dividers + detect alphabet sidebar columns
1. Pipe divider fix: Changed OCR char-confusion regex so | between
   letters (Ka|me|rad) is NOT converted to I. Only standalone/
   word-boundary pipes are converted (|ch → Ich, | want → I want).

2. Alphabet sidebar detection improvements:
   - _filter_decorative_margin() now considers 2-char words (OCR reads
     "Aa", "Bb" from sidebars), lowered min strip from 8→6
   - _filter_border_strip_words() lowered decorative threshold from 50%→45%
   - New step 4f: grid-level thin-edge-column filter as safety net —
     removes edge columns with <35% fill rate and >60% short text

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-24 13:52:11 +01:00

2069 lines
78 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
OCR engines (RapidOCR, TrOCR, LightOn), vocab postprocessing, and text cleaning.
Lizenz: Apache 2.0 (kommerziell nutzbar)
DATENSCHUTZ: Alle Verarbeitung erfolgt lokal.
"""
import io
import logging
import os
import re
from typing import Any, Dict, List, Optional, Tuple
import numpy as np
from cv_vocab_types import (
IPA_AVAILABLE,
PageRegion,
RowGeometry,
_britfone_dict,
_ipa_convert_american,
)
logger = logging.getLogger(__name__)
try:
import cv2
except ImportError:
cv2 = None # type: ignore[assignment]
try:
from PIL import Image
except ImportError:
Image = None # type: ignore[assignment,misc]
# =============================================================================
# Pipeline Step 5: Word Grid from Columns × Rows
# =============================================================================
def _group_words_into_lines(words: List[Dict], y_tolerance_px: int = 20) -> List[List[Dict]]:
"""Group words by Y position into lines, sorted by X within each line."""
if not words:
return []
sorted_words = sorted(words, key=lambda w: (w['top'], w['left']))
lines: List[List[Dict]] = []
current_line: List[Dict] = [sorted_words[0]]
current_y = sorted_words[0]['top']
for word in sorted_words[1:]:
if abs(word['top'] - current_y) <= y_tolerance_px:
current_line.append(word)
else:
current_line.sort(key=lambda w: w['left'])
lines.append(current_line)
current_line = [word]
current_y = word['top']
if current_line:
current_line.sort(key=lambda w: w['left'])
lines.append(current_line)
return lines
def _words_to_reading_order_lines(words: List[Dict], y_tolerance_px: int = 15) -> List[str]:
    """Render OCR words as one string per visual line, in reading order.

    Returns a list of line strings (one per visual line in the cell).
    """
    if not words:
        return []
    grouped = _group_words_into_lines(words, y_tolerance_px=y_tolerance_px)
    return [' '.join(entry['text'] for entry in row) for row in grouped]
def _rejoin_hyphenated(lines: List[str]) -> List[str]:
"""Rejoin words split by line-break hyphenation.
E.g. ['Fuß-', 'boden'] → ['Fußboden']
['some text-', 'thing here'] → ['something here']
"""
if len(lines) <= 1:
return lines
result = []
i = 0
while i < len(lines):
line = lines[i]
# If line ends with '-' and there's a next line, rejoin
if i + 1 < len(lines) and line.rstrip().endswith('-'):
stripped = line.rstrip()
# Get the word fragment before hyphen (last word)
prefix = stripped[:-1] # remove trailing hyphen
next_line = lines[i + 1]
# Join: last word of this line + first word of next line
prefix_words = prefix.rsplit(' ', 1)
next_words = next_line.split(' ', 1)
if len(prefix_words) > 1:
joined = prefix_words[0] + ' ' + prefix_words[1] + next_words[0]
else:
joined = prefix_words[0] + next_words[0]
remainder = next_words[1] if len(next_words) > 1 else ''
if remainder:
result.append(joined + ' ' + remainder)
else:
result.append(joined)
i += 2
else:
result.append(line)
i += 1
return result
def _words_to_reading_order_text(words: List[Dict], y_tolerance_px: int = 15) -> str:
    """Join OCR words into newline-separated text in reading order.

    Pipeline: group into visual lines (Y tolerance, X-sorted), rejoin
    hyphenated fragments, then concatenate with '\\n'.
    """
    visual_lines = _words_to_reading_order_lines(words, y_tolerance_px)
    return '\n'.join(_rejoin_hyphenated(visual_lines))
def _words_to_spaced_text(words: List[Dict], y_tolerance_px: int = 15) -> str:
    """Join OCR words, approximating horizontal layout with repeated spaces.

    The pixel gap between adjacent words is divided by the line's average
    character width to decide how many spaces to emit (at least one).
    Useful for box sub-sessions where spatial layout matters.
    """
    rendered: List[str] = []
    for row in _group_words_into_lines(words, y_tolerance_px=y_tolerance_px):
        if not row:
            continue
        ordered = sorted(row, key=lambda w: w['left'])
        # Average character width over the whole line (fallback 10 px)
        n_chars = sum(len(w['text']) for w in ordered if w.get('text'))
        px_width = sum(w['width'] for w in ordered if w.get('text'))
        char_w = px_width / n_chars if n_chars > 0 else 10
        pieces: List[str] = []
        prev = None
        for w in ordered:
            if prev is not None:
                gap_px = w['left'] - (prev['left'] + prev['width'])
                pieces.append(' ' * max(1, round(gap_px / char_w)))
            pieces.append(w.get('text', ''))
            prev = w
        rendered.append(''.join(pieces))
    return '\n'.join(rendered)
# --- RapidOCR integration (PaddleOCR models on ONNX Runtime) ---
# Lazily-created singleton engine (see _get_rapid_engine) and availability flag.
_rapid_engine = None
RAPIDOCR_AVAILABLE = False
try:
    # rapidocr is an optional dependency: the pipeline degrades to Tesseract
    # when it is missing, so ImportError is expected and non-fatal.
    from rapidocr import RapidOCR as _RapidOCRClass
    from rapidocr import LangRec as _LangRec, OCRVersion as _OCRVersion, ModelType as _ModelType
    RAPIDOCR_AVAILABLE = True
    logger.info("RapidOCR available — can be used as alternative to Tesseract")
except ImportError:
    logger.info("RapidOCR not installed — using Tesseract only")
def _get_rapid_engine():
    """Return the process-wide RapidOCR engine, creating it on first use.

    Configured with the PP-OCRv5 Latin server model so German umlauts
    (ä, ö, ü, ß) are recognized.
    """
    global _rapid_engine
    if _rapid_engine is not None:
        return _rapid_engine
    _rapid_engine = _RapidOCRClass(params={
        # PP-OCRv5 Latin model — supports German umlauts (ä, ö, ü, ß)
        "Rec.lang_type": _LangRec.LATIN,
        "Rec.model_type": _ModelType.SERVER,
        "Rec.ocr_version": _OCRVersion.PPOCRV5,
        # Tighter detection boxes to reduce word merging
        "Det.unclip_ratio": 1.3,
        # Lower threshold to detect small chars (periods, ellipsis, phonetics)
        "Det.box_thresh": 0.4,
        # Silence verbose logging
        "Global.log_level": "critical",
    })
    logger.info("RapidOCR engine initialized (PP-OCRv5 Latin, unclip_ratio=1.3)")
    return _rapid_engine
def ocr_region_rapid(
    img_bgr: np.ndarray,
    region: PageRegion,
) -> List[Dict[str, Any]]:
    """Run RapidOCR on one region, emitting Tesseract-style word dicts.

    Args:
        img_bgr: Full-page BGR image (NOT binarized — RapidOCR works on color/gray).
        region: Region to crop and OCR.

    Returns:
        Word dicts with text, left, top, width, height, conf, region_type;
        coordinates are absolute page coordinates.
    """
    engine = _get_rapid_engine()
    # Crop the region out of the full-page image
    crop = img_bgr[region.y:region.y + region.height,
                   region.x:region.x + region.width]
    if crop.size == 0:
        return []
    result = engine(crop)
    if result is None or result.boxes is None or result.txts is None:
        return []
    out: List[Dict[str, Any]] = []
    # result.boxes: (N, 4, 2) corner points; txts/scores: parallel tuples
    for quad, raw_text, score in zip(result.boxes, result.txts, result.scores):
        cleaned = (raw_text or '').strip()
        if not cleaned:
            continue
        # quad is [[x1,y1],[x2,y2],[x3,y3],[x4,y4]] (clockwise from top-left)
        xs = [pt[0] for pt in quad]
        ys = [pt[1] for pt in quad]
        x0, y0 = int(min(xs)), int(min(ys))
        out.append({
            'text': cleaned,
            'left': x0 + region.x,      # shift to absolute coords
            'top': y0 + region.y,
            'width': int(max(xs)) - x0,
            'height': int(max(ys)) - y0,
            'conf': int(score * 100),   # 0-100 like Tesseract
            'region_type': region.type,
        })
    return out
def ocr_region_trocr(img_bgr: np.ndarray, region: PageRegion, handwritten: bool = False) -> List[Dict[str, Any]]:
    """Run TrOCR on a region. Returns line-level word dicts (same format as ocr_region_rapid).

    Uses trocr_service.get_trocr_model() + _split_into_lines() for line segmentation.
    Bboxes are approximated from equal line-height distribution within the region.
    Falls back to Tesseract if TrOCR is not available.

    Args:
        img_bgr: Full-page BGR image.
        region: Region to crop and recognize.
        handwritten: If True, use the handwriting TrOCR checkpoint.

    Returns:
        One word dict per recognized text line, or [] on failure.
    """
    # Lazy import: keeps this module importable when the TrOCR service is absent.
    from services.trocr_service import get_trocr_model, _split_into_lines, _check_trocr_available
    if not _check_trocr_available():
        logger.warning("TrOCR not available, falling back to Tesseract")
        if region.height > 0 and region.width > 0:
            # NOTE(review): ocr_region (Tesseract path) is defined elsewhere in this module.
            ocr_img_crop = cv2.cvtColor(img_bgr, cv2.COLOR_BGR2GRAY) if img_bgr is not None else None
            if ocr_img_crop is not None:
                return ocr_region(ocr_img_crop, region, lang="eng+deu", psm=6)
        return []
    crop = img_bgr[region.y:region.y + region.height, region.x:region.x + region.width]
    if crop.size == 0:
        return []
    try:
        import torch
        from PIL import Image as _PILImage
        processor, model = get_trocr_model(handwritten=handwritten)
        if processor is None or model is None:
            logger.warning("TrOCR model not loaded, falling back to Tesseract")
            ocr_img_crop = cv2.cvtColor(img_bgr, cv2.COLOR_BGR2GRAY)
            return ocr_region(ocr_img_crop, region, lang="eng+deu", psm=6)
        pil_crop = _PILImage.fromarray(cv2.cvtColor(crop, cv2.COLOR_BGR2RGB))
        lines = _split_into_lines(pil_crop)
        if not lines:
            # No segmentation result — treat the whole crop as one line
            lines = [pil_crop]
        device = next(model.parameters()).device
        all_text = []
        confidences = []
        for line_img in lines:
            pixel_values = processor(images=line_img, return_tensors="pt").pixel_values.to(device)
            with torch.no_grad():
                generated_ids = model.generate(pixel_values, max_length=128)
            text_line = processor.batch_decode(generated_ids, skip_special_tokens=True)[0].strip()
            if text_line:
                all_text.append(text_line)
                # TrOCR reports no confidence; heuristic: longer lines are more reliable
                confidences.append(0.85 if len(text_line) > 3 else 0.5)
        if not all_text:
            return []
        avg_conf = int(sum(confidences) / len(confidences) * 100)
        # Approximate bboxes: distribute lines evenly over the region height
        line_h = region.height // max(len(all_text), 1)
        words = []
        for i, line in enumerate(all_text):
            words.append({
                "text": line,
                "left": region.x,
                "top": region.y + i * line_h,
                "width": region.width,
                "height": line_h,
                "conf": avg_conf,
                "region_type": region.type,
            })
        return words
    except Exception as e:
        logger.error(f"ocr_region_trocr failed: {e}")
        return []
def ocr_region_lighton(img_bgr: np.ndarray, region: PageRegion) -> List[Dict[str, Any]]:
    """Run LightOnOCR-2-1B on a region. Returns line-level word dicts (same format as ocr_region_rapid).

    Falls back to RapidOCR or Tesseract if LightOnOCR is not available.

    Args:
        img_bgr: Full-page BGR image.
        region: Region to crop and recognize.

    Returns:
        One word dict per recognized line (bboxes approximated by even
        vertical distribution), or [] on failure.
    """
    # Lazy import: keeps this module importable when the LightOn service is absent.
    from services.lighton_ocr_service import get_lighton_model, _check_lighton_available
    if not _check_lighton_available():
        logger.warning("LightOnOCR not available, falling back to RapidOCR/Tesseract")
        if RAPIDOCR_AVAILABLE and img_bgr is not None:
            return ocr_region_rapid(img_bgr, region)
        # NOTE(review): ocr_region (Tesseract path) is defined elsewhere in this module.
        ocr_img_crop = cv2.cvtColor(img_bgr, cv2.COLOR_BGR2GRAY) if img_bgr is not None else None
        return ocr_region(ocr_img_crop, region, lang="eng+deu", psm=6) if ocr_img_crop is not None else []
    crop = img_bgr[region.y:region.y + region.height, region.x:region.x + region.width]
    if crop.size == 0:
        return []
    try:
        import io
        import torch
        from PIL import Image as _PILImage
        processor, model = get_lighton_model()
        if processor is None or model is None:
            logger.warning("LightOnOCR model not loaded, falling back to RapidOCR/Tesseract")
            if RAPIDOCR_AVAILABLE and img_bgr is not None:
                return ocr_region_rapid(img_bgr, region)
            ocr_img_crop = cv2.cvtColor(img_bgr, cv2.COLOR_BGR2GRAY)
            return ocr_region(ocr_img_crop, region, lang="eng+deu", psm=6)
        pil_crop = _PILImage.fromarray(cv2.cvtColor(crop, cv2.COLOR_BGR2RGB))
        # Vision-LLM chat prompt: a single image, no text instruction needed
        conversation = [{"role": "user", "content": [{"type": "image"}]}]
        inputs = processor.apply_chat_template(
            conversation, images=[pil_crop],
            add_generation_prompt=True, return_tensors="pt"
        ).to(model.device)
        with torch.no_grad():
            output_ids = model.generate(**inputs, max_new_tokens=1024)
        text = processor.decode(output_ids[0], skip_special_tokens=True).strip()
        if not text:
            return []
        lines = [l.strip() for l in text.split("\n") if l.strip()]
        # Approximate per-line bboxes: split the region height evenly
        line_h = region.height // max(len(lines), 1)
        words = []
        for i, line in enumerate(lines):
            words.append({
                "text": line,
                "left": region.x,
                "top": region.y + i * line_h,
                "width": region.width,
                "height": line_h,
                "conf": 85,  # fixed value — the model reports no confidence
                "region_type": region.type,
            })
        return words
    except Exception as e:
        logger.error(f"ocr_region_lighton failed: {e}")
        return []
# --- Remote PaddleOCR (Hetzner x86_64) ---
async def ocr_region_paddle(
    img_bgr: np.ndarray,
    region: Optional["PageRegion"] = None,
) -> List[Dict[str, Any]]:
    """Run OCR via local RapidOCR (default) or remote PaddleOCR (fallback).

    Uses RapidOCR (same PP-OCRv5 ONNX models) locally for speed and reliability.
    Falls back to remote PaddleOCR service only if:
      - env FORCE_REMOTE_PADDLE=1 is set, or
      - RapidOCR fails or returns no words

    Args:
        img_bgr: Full-page BGR image.
        region: Optional region; None means OCR the whole page.

    Returns:
        Word dicts in absolute page coordinates (same format as ocr_region_rapid).
    """
    force_remote = os.environ.get("FORCE_REMOTE_PADDLE", "").strip() == "1"
    if not force_remote:
        try:
            if region is None:
                # No region given — synthesize one covering the whole page
                h, w = img_bgr.shape[:2]
                _region = PageRegion(type="full_page", x=0, y=0, width=w, height=h)
            else:
                _region = region
            words = ocr_region_rapid(img_bgr, _region)
            if words:
                logger.info("ocr_region_paddle: used local RapidOCR (%d words)", len(words))
                return words
            logger.warning("ocr_region_paddle: RapidOCR returned 0 words, trying remote")
        except Exception as e:
            logger.warning("ocr_region_paddle: RapidOCR failed (%s), trying remote", e)
    # --- Remote PaddleOCR fallback (Hetzner x86_64) ---
    # Lazy import: the remote client is only needed on this path.
    from services.paddleocr_remote import ocr_remote_paddle
    if region is not None:
        crop = img_bgr[
            region.y : region.y + region.height,
            region.x : region.x + region.width,
        ]
        offset_x, offset_y = region.x, region.y
    else:
        crop = img_bgr
        offset_x, offset_y = 0, 0
    if crop.size == 0:
        return []
    # Downscale large images to fit within Traefik's 60s timeout.
    # PaddleOCR works well at ~1500px max dimension.
    h, w = crop.shape[:2]
    scale = 1.0
    _MAX_DIM = 1500
    if max(h, w) > _MAX_DIM:
        scale = _MAX_DIM / max(h, w)
        new_w, new_h = int(w * scale), int(h * scale)
        crop = cv2.resize(crop, (new_w, new_h), interpolation=cv2.INTER_AREA)
        logger.info("ocr_region_paddle: downscaled %dx%d%dx%d (scale=%.2f)",
                    w, h, new_w, new_h, scale)
    # Encode as JPEG (smaller than PNG, faster upload)
    success, jpg_buf = cv2.imencode(".jpg", crop, [cv2.IMWRITE_JPEG_QUALITY, 90])
    if not success:
        logger.error("ocr_region_paddle: cv2.imencode failed")
        return []
    words, _w, _h = await ocr_remote_paddle(jpg_buf.tobytes(), filename="scan.jpg")
    logger.info("ocr_region_paddle: used remote PaddleOCR (%d words)", len(words))
    # Scale coordinates back to original size and shift to absolute image space
    inv_scale = 1.0 / scale if scale != 1.0 else 1.0
    for wd in words:
        wd["left"] = int(wd["left"] * inv_scale) + offset_x
        wd["top"] = int(wd["top"] * inv_scale) + offset_y
        wd["width"] = int(wd["width"] * inv_scale)
        wd["height"] = int(wd["height"] * inv_scale)
        if region is not None:
            wd["region_type"] = region.type
    return words
# =============================================================================
# Post-Processing: Deterministic Quality Fixes
# =============================================================================
# --- A. Character Confusion Fix (I/1/l) ---
# Common OCR confusion pairs in vocabulary context
_CHAR_CONFUSION_RULES = [
# "1" at word start followed by lowercase → likely "I" or "l"
# Exception: NOT before "." or "," (numbered list prefix: "1. Kreuz", "1, 2, 3")
(re.compile(r'\b1([a-z])'), r'I\1'), # 1ch → Ich, 1want → Iwant
# Standalone "1" → "I" (English pronoun), but NOT "1." or "1," (list number)
(re.compile(r'(?<!\d)\b1\b(?![\d.,])'), 'I'), # "1 want" → "I want"
# "|" → "I", but NOT when embedded between letters (syllable divider: Ka|me|rad)
# and NOT "|." or "|," (those are "1." list prefixes → spell-checker handles them)
(re.compile(r'(?<![a-zA-ZäöüÄÖÜß])\|(?!\||[.,])'), 'I'), # |ch → Ich, | want → I want
]
# Cross-language indicators: if DE has these, EN "1" is almost certainly "I"
_DE_INDICATORS_FOR_EN_I = {'ich', 'mich', 'mir', 'mein', 'meine', 'meiner', 'meinem'}
def _fix_character_confusion(entries: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
"""Fix common OCR character confusions using context.
Deterministic rules:
- "1" at word start → "I" or "l" based on context
- Cross-reference EN↔DE: if DE contains "ich/mich/mir", EN "1""I"
- "y " artifact at word boundaries → remove (e.g. "y you""you")
"""
for entry in entries:
en = entry.get('english', '') or ''
de = entry.get('german', '') or ''
ex = entry.get('example', '') or ''
# Apply general rules to all fields
for pattern, replacement in _CHAR_CONFUSION_RULES:
en = pattern.sub(replacement, en)
de = pattern.sub(replacement, de)
ex = pattern.sub(replacement, ex)
# Cross-reference: if DE has "ich"/"mich" indicators, fix EN "1" → "I"
de_lower_words = set(de.lower().replace(',', ' ').split())
if de_lower_words & _DE_INDICATORS_FOR_EN_I:
# Any remaining "1" in EN that looks like "I"
en = re.sub(r'\b1\b(?![\d.,])', 'I', en)
# Fix "y " artifact before repeated word: "y you" → "you"
en = re.sub(r'\by\s+([a-z])', r'\1', en)
ex = re.sub(r'\by\s+([a-z])', r'\1', ex)
entry['english'] = en.strip()
entry['german'] = de.strip()
entry['example'] = ex.strip()
return entries
# --- B. Comma-Separated Word Form Splitting ---
def _is_singular_plural_pair(parts: List[str]) -> bool:
"""Detect if comma-separated parts are singular/plural forms of the same word.
E.g. "mouse, mice" or "Maus, Mäuse" → True (should NOT be split).
"break, broke, broken" → False (different verb forms, OK to split).
Heuristic: exactly 2 parts that share a common prefix of >= 50% length,
OR one part is a known plural suffix of the other (e.g. +s, +es, +en).
"""
if len(parts) != 2:
return False
a, b = parts[0].lower().strip(), parts[1].lower().strip()
if not a or not b:
return False
# Common prefix heuristic: if words share >= 50% of the shorter word,
# they are likely forms of the same word (Maus/Mäuse, child/children).
min_len = min(len(a), len(b))
common = 0
for ca, cb in zip(a, b):
if ca == cb:
common += 1
else:
break
if common >= max(2, min_len * 0.5):
return True
# Umlaut relation: one form adds umlaut (a→ä, o→ö, u→ü)
umlaut_map = str.maketrans('aou', 'äöü')
if a.translate(umlaut_map) == b or b.translate(umlaut_map) == a:
return True
return False
def _split_comma_entries(entries: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
    """Expand rows whose EN and DE cells list matching comma-separated forms.

    E.g. EN: "break, broke, broken" / DE: "brechen, brach, gebrochen"
    → 3 entries: break/brechen, broke/brach, broken/gebrochen

    Splitting only happens when EN and DE yield the same number of comma
    parts, every part is short (≤ 3 words — word forms, not sentences), and
    neither side is a singular/plural pair like "mouse, mice" / "Maus, Mäuse"
    (those are forms of one vocabulary entry and stay together).
    """
    expanded: List[Dict[str, Any]] = []
    for entry in entries:
        en_text = (entry.get('english', '') or '').strip()
        de_text = (entry.get('german', '') or '').strip()
        # Comma split that ignores commas inside [...] and (...)
        en_parts = _split_by_comma(en_text)
        de_parts = _split_by_comma(de_text)
        splittable = (
            len(en_parts) > 1
            and len(en_parts) == len(de_parts)
            and all(len(p.split()) <= 3 for p in en_parts)
            and all(len(p.split()) <= 3 for p in de_parts)
            and not _is_singular_plural_pair(en_parts)
            and not _is_singular_plural_pair(de_parts)
        )
        if not splittable:
            expanded.append(entry)
            continue
        for en_part, de_part in zip(en_parts, de_parts):
            piece = dict(entry)  # shallow copy
            piece['english'] = en_part.strip()
            piece['german'] = de_part.strip()
            piece['example'] = ''  # examples get attached later
            piece['split_from_comma'] = True
            expanded.append(piece)
    # Re-number
    for idx, e in enumerate(expanded):
        e['row_index'] = idx
    return expanded
def _split_by_comma(text: str) -> List[str]:
"""Split text by commas, but not inside brackets [...] or parens (...)."""
if ',' not in text:
return [text]
parts = []
depth_bracket = 0
depth_paren = 0
current = []
for ch in text:
if ch == '[':
depth_bracket += 1
elif ch == ']':
depth_bracket = max(0, depth_bracket - 1)
elif ch == '(':
depth_paren += 1
elif ch == ')':
depth_paren = max(0, depth_paren - 1)
elif ch == ',' and depth_bracket == 0 and depth_paren == 0:
parts.append(''.join(current).strip())
current = []
continue
current.append(ch)
if current:
parts.append(''.join(current).strip())
# Filter empty parts
return [p for p in parts if p]
# --- C. Example Sentence Attachment ---
def _find_best_vocab_match(example_text: str, vocab_entries: List[Dict[str, Any]]) -> int:
"""Find the vocab entry whose English word(s) best match the example sentence.
Returns index into vocab_entries, or -1 if no match found.
Uses word stem overlap: "a broken arm" matches "broken" or "break".
"""
if not vocab_entries or not example_text:
return -1
example_lower = example_text.lower()
example_words = set(re.findall(r'[a-zäöüß]+', example_lower))
best_idx = -1
best_score = 0
for i, entry in enumerate(vocab_entries):
en = (entry.get('english', '') or '').lower()
if not en:
continue
# Extract vocab words (split on space, comma, newline)
vocab_words = set(re.findall(r'[a-zäöüß]+', en))
# Score: how many vocab words appear in the example?
# Also check if example words share a common stem (first 4 chars)
direct_matches = vocab_words & example_words
score = len(direct_matches) * 10
# Stem matching: "broken" matches "break" via shared prefix "bro"/"bre"
if score == 0:
for vw in vocab_words:
if len(vw) < 3:
continue
stem = vw[:4] if len(vw) >= 4 else vw[:3]
for ew in example_words:
if len(ew) >= len(stem) and ew[:len(stem)] == stem:
score += 5
break
if score > best_score:
best_score = score
best_idx = i
return best_idx if best_score > 0 else -1
def _attach_example_sentences(entries: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
    """Attach rows with EN text but no DE translation as examples to matching vocab entries.

    Vocabulary worksheets often have:
        Row 1: break, broke, broken / brechen, brach, gebrochen
        Row 2: a broken arm (no DE → example for "broken")
        Row 3: a broken plate (no DE → example for "broken")
        Row 4: egg / Ei (has DE → new vocab entry)

    Rules (deterministic, generic):
    - A row is an "example row" if it has EN text but NO DE text (or very short DE ≤2 chars)
    - Find the best matching vocab entry by checking which entry's English words
      appear in the example sentence (semantic matching via word overlap)
    - Fall back to the nearest preceding entry if no word match found
    - Multiple examples get joined with " | "

    Returns:
        The vocab entries (example rows removed, 'example' fields filled,
        'row_index' renumbered).
    """
    if not entries:
        return entries
    # Separate into vocab entries (have DE) and example candidates (no DE)
    vocab_entries: List[Dict[str, Any]] = []
    examples_for: Dict[int, List[str]] = {}  # vocab_index → list of example texts
    for entry in entries:
        en = (entry.get('english', '') or '').strip()
        de = (entry.get('german', '') or '').strip()
        # NOTE(review): 'ex' is computed but never read in this loop.
        ex = (entry.get('example', '') or '').strip()
        # Treat single-char DE as OCR noise, not real translation.
        # "Ei" (2 chars) is a valid German word, so threshold is 1.
        has_de = len(de) > 1
        has_en = bool(en)
        # Heuristic: a row without DE is an "example sentence" only if
        # the EN text looks like a sentence (>= 4 words, or contains
        # typical sentence punctuation). Short EN text (1-3 words) is
        # more likely a vocab entry whose DE was missed by OCR.
        _looks_like_sentence = (
            len(en.split()) >= 4
            or en.rstrip().endswith(('.', '!', '?'))
        )
        # 'and vocab_entries' guards against an example appearing before any
        # vocab entry exists (there would be nothing to attach it to).
        is_example_candidate = (
            has_en and not has_de and _looks_like_sentence and vocab_entries
        )
        if is_example_candidate:
            # This is an example sentence — find best matching vocab entry
            example_text = en
            match_idx = _find_best_vocab_match(en, vocab_entries)
            if match_idx < 0:
                # No word match → fall back to last entry
                match_idx = len(vocab_entries) - 1
            if match_idx not in examples_for:
                examples_for[match_idx] = []
            examples_for[match_idx].append(example_text)
        else:
            vocab_entries.append(entry)
    # Attach examples to their matched vocab entries
    for idx, example_list in examples_for.items():
        if 0 <= idx < len(vocab_entries):
            entry = vocab_entries[idx]
            existing_ex = (entry.get('example', '') or '').strip()
            new_examples = ' | '.join(example_list)
            entry['example'] = f"{existing_ex} | {new_examples}" if existing_ex else new_examples
    # Re-number
    for i, e in enumerate(vocab_entries):
        e['row_index'] = i
    return vocab_entries
# --- D. Phonetic Bracket IPA Replacement ---
# Pattern: word followed by any bracket type containing phonetic content.
# Tesseract often garbles IPA brackets: [ˈdɑːns] → {'tfatno] or (cy) etc.
# Match any opener ([, {, () with any closer (], }, )) — even mixed pairs.
# This intentionally matches mixed brackets (e.g. {content]) because
# Tesseract frequently misrecognizes bracket characters.
# Group 1 = the word before the bracket, group 2 = the bracket content.
_PHONETIC_BRACKET_RE = re.compile(
    r'(\b[a-zA-ZäöüÄÖÜß]+)\s*[\[\{\(]([^\]\}\)]*?)[\]\}\)]'
)
# Unicode IPA characters — used to distinguish correct IPA (from dictionary
# lookup) from garbled OCR content when stripping orphan brackets.
_IPA_CHARS = frozenset('ˈˌːɑɒæɛəɜɪɔʊʌðŋθʃʒɹɡɾʔɐ')
# Minimum word confidence for full-page Tesseract results (0-100).
# Words below this threshold are OCR noise (scanner shadows, borders).
_MIN_WORD_CONF: int = 30
def _lookup_ipa(word: str, pronunciation: str = 'british') -> Optional[str]:
    """Resolve a word's IPA via the preferred dictionary, with fallbacks.

    Args:
        word: English word to look up.
        pronunciation: 'british' (Britfone, MIT) or 'american' (eng_to_ipa, MIT).

    Returns:
        IPA string, or None when no available source knows the word.
    """
    key = word.lower().strip()
    if not key:
        return None

    def from_britfone() -> Optional[str]:
        return _britfone_dict.get(key) if _britfone_dict else None

    def from_american() -> Optional[str]:
        if not _ipa_convert_american:
            return None
        candidate = _ipa_convert_american(key)
        # eng_to_ipa marks unknown words with '*' — treat those as misses
        return candidate if candidate and '*' not in candidate else None

    if pronunciation == 'british' and _britfone_dict:
        # Prefer Britfone, fall back to the American converter
        return from_britfone() or from_american()
    if pronunciation == 'american' and _ipa_convert_american:
        # Prefer CMU-based converter, fall back to Britfone
        return from_american() or from_britfone()
    # Preferred source unavailable (or unknown preference): try anything
    return from_britfone() or from_american()
def _fix_phonetic_brackets(
    entries: List[Dict[str, Any]],
    pronunciation: str = 'british',
) -> List[Dict[str, Any]]:
    """Swap OCR'd phonetic transcriptions for dictionary IPA.

    Patterns like "dance [du:ns]" become correct IPA:
    - British: "dance [dˈɑːns]" (Britfone, MIT)
    - American: "dance [dæns]" (eng_to_ipa/CMU, MIT)
    A word is only rewritten when the dictionary actually knows it.

    Only the ENGLISH field is processed: the German and example fields carry
    meaningful parenthetical content ("Eis (gefrorenes Wasser)",
    "(sich beschweren)") that must NEVER be treated as phonetics.
    """
    if not IPA_AVAILABLE:
        return entries
    replaced_count = 0
    for entry in entries:
        original = entry.get('english', '') or ''
        # Cheap pre-filter: no opener character → nothing to replace
        if not any(opener in original for opener in '[{('):
            continue
        fixed = _replace_phonetics_in_text(original, pronunciation)
        if fixed == original:
            continue
        logger.debug(f"_fix_phonetic_brackets: '{original}''{fixed}'")
        replaced_count += 1
        entry['english'] = fixed
    if replaced_count:
        logger.info(f"_fix_phonetic_brackets: {replaced_count} IPA replacements in {len(entries)} entries")
    return entries
# Grammar particles that appear in brackets after English words:
# cross (with), complain (about/of), agree (on/with), look (sth) up
# These must NOT be replaced with IPA. Only used for the English field
# (German/example fields are never processed for IPA replacement).
_GRAMMAR_BRACKET_WORDS = frozenset({
# English prepositions/particles commonly in vocab tables
'with', 'about', 'of', 'for', 'to', 'from', 'in', 'on', 'at', 'by',
'up', 'out', 'off', 'into', 'over', 'down', 'away', 'back', 'through',
# English grammar abbreviations used in vocab tables
'sth', 'sb', 'adj', 'adv',
# Number/plural/grammar annotations
'pl', 'sg', 'sing', 'no', 'also', 'auch',
# Regional English markers
'ae', 'be', 'ame', 'bre',
})
def _is_grammar_bracket_content(content: str) -> bool:
"""Return True if bracket content is grammar info in the ENGLISH field.
Grammar info: cross (with), complain (about/of), agree (on/with)
NOT grammar: [breik], [maus], {'tfatno], (cy), ['kju:kambo], [test]
Since we only process the English field, we only need to recognize
English grammar particles. Everything else is (garbled) IPA.
"""
if not content:
return False
# Split on / and spaces for patterns like (about/of), (no pl)
tokens = re.split(r'[/\s]+', content.strip().lower())
tokens = [t for t in tokens if t]
if not tokens:
return False
# ALL tokens must be known grammar words
return all(token in _GRAMMAR_BRACKET_WORDS for token in tokens)
def _replace_phonetics_in_text(
    text: str,
    pronunciation: str = 'british',
    strip_orphans: bool = True,
) -> str:
    """Replace [phonetic] / {phonetic} / (phonetic) after words with dictionary IPA.

    Tesseract garbles IPA brackets, e.g. China [ˈtʃaɪnə] → China {'tfatno].
    We match any bracket type and replace with dictionary IPA if found.
    Legitimate parenthetical content like (zer)brechen or (veranstaltung) is preserved.

    Args:
        text: Text to process (expected: the English field of a vocab row).
        pronunciation: 'british' or 'american' — passed to _lookup_ipa.
        strip_orphans: If True, strip orphan brackets that look like garbled IPA.
            Set to False for column_text where brackets may be German content.

    Returns:
        Text with phonetic brackets normalized (and orphans stripped if requested).
    """
    if not IPA_AVAILABLE:
        return text

    def replacer(match):
        # group 1 = word before the bracket, group 2 = bracket interior
        word = match.group(1)
        bracket_content = match.group(2).strip()
        full_match = match.group(0)
        # Skip if bracket content looks like regular text (multiple words)
        if len(bracket_content.split()) > 3:
            return full_match
        # Look up IPA for the word before brackets
        ipa = _lookup_ipa(word, pronunciation)
        if ipa:
            # Word has IPA → bracket content is phonetic (garbled or correct).
            # Exception: grammar particles like cross (with) — keep those.
            if _is_grammar_bracket_content(bracket_content):
                return full_match
            logger.debug(f"phonetic: '{full_match}''{word} [{ipa}]'")
            return f"{word} [{ipa}]"
        # No IPA for this word — keep as-is
        return full_match

    text = _PHONETIC_BRACKET_RE.sub(replacer, text)
    if strip_orphans:
        # Second pass: strip remaining orphan brackets that are garbled IPA.
        # These have no word before them (the main regex requires \b word \s* bracket).
        # Examples: "[mais]", "{'mani setva]", trailing "(kros]"
        # Keep: grammar parens "(sich beschweren)", correct IPA "[dˈɑːns]"
        def _strip_orphan_bracket(m):
            content = m.group(1).strip()
            # Keep grammar info: (sich beschweren), (about/of)
            if _is_grammar_bracket_content(content):
                return m.group(0)
            # Keep correct IPA (contains Unicode IPA characters)
            if any(ch in _IPA_CHARS for ch in content):
                return m.group(0)
            # Keep real-word parentheticals like (probieren), (Profit), (Geld).
            # Garbled IPA fragments are short nonsense like (kros), (cy), (mais)
            # — they never contain a real word ≥4 letters with proper casing.
            content_alpha = re.sub(r'[^a-zA-ZäöüÄÖÜßéèêëàâîïôûùç]', '', content)
            if len(content_alpha) >= 4:
                return m.group(0)
            logger.debug(f"phonetic: stripping orphan bracket '{m.group(0)}'")
            return ''
        text = re.sub(r'[\[\{\(]([^\]\}\)]*)[\]\}\)]', _strip_orphan_bracket, text)
        # Removing brackets can leave leading/trailing whitespace behind
        text = text.strip()
    return text
def _text_has_garbled_ipa(text: str) -> bool:
"""Check if text contains garbled IPA-like fragments from OCR.
Returns True if there is evidence of OCR-mangled phonetic
transcription, e.g. stress marks, length marks, or IPA special chars.
This is used to decide whether ``_insert_missing_ipa`` should run:
it must only insert IPA to *replace* garbled phonetics that are already
in the text — never to ADD phonetics where none existed on the page.
"""
# Bracketed text that doesn't contain valid IPA symbols is garbled OCR
# of a phonetic transcription, e.g. "[n, nn]" or "[1uedtX,1]".
stripped = text.strip()
if stripped.startswith('[') and stripped.endswith(']'):
inner = stripped[1:-1]
# Real IPA brackets contain IPA symbols (ˈˌəɪɛɒʊʌæɑɔʃʒθðŋ)
if not any(c in inner for c in 'ˈˌəɪɛɒʊʌæɑɔʃʒθðŋ'):
# Not a valid dictionary-style bracket like "(no pl)" — those
# use parentheses, not square brackets. Square brackets with
# no IPA chars are garbled phonetics.
return True
for w in text.strip().split():
# Skip delimiters and very short tokens
if len(w) <= 1 or w in ('', '', '-', '/', '|', ',', ';'):
continue
# Starts with stress mark (OCR read IPA stress ' as apostrophe)
if w.startswith("'") and len(w) > 1 and not w[1:].istitle():
return True
if w.startswith("\u02c8") or w.startswith("\u02cc"): # ˈ ˌ
return True
# Contains IPA length mark ':' in a short non-word fragment
if ':' in w and len(w) < 12:
# But not things like "3:00" (time) or common words
stripped = re.sub(r'[^a-zA-Z:]', '', w)
if ':' in stripped and not stripped.replace(':', '').isalpha():
continue
return True
# Contains IPA special characters
if any(c in w for c in 'əɪɛɒʊʌæɑɔʃʒθðŋ'):
return True
return False
def _decompose_compound(word: str, pronunciation: str = 'british') -> Optional[str]:
    """Try to decompose a compound word and concatenate IPA for each part.

    E.g. "schoolbag" splits into "school" + "bag" and the two IPA strings
    are concatenated. IPA is only returned when BOTH parts resolve in the
    dictionary. All split positions are tried (min 3 chars per part) and
    the split with the longest first part wins.
    """
    if not IPA_AVAILABLE:
        return None
    normalized = word.lower().strip()
    # A compound needs room for two 3-char halves.
    if len(normalized) < 6:
        return None
    chosen: Optional[str] = None
    longest_head = 0
    for cut in range(3, len(normalized) - 2):  # min 3 chars each part
        head_ipa = _lookup_ipa(normalized[:cut], pronunciation)
        tail_ipa = _lookup_ipa(normalized[cut:], pronunciation)
        if head_ipa and tail_ipa and cut > longest_head:
            longest_head = cut
            chosen = head_ipa + tail_ipa
    return chosen
def _insert_missing_ipa(text: str, pronunciation: str = 'british') -> str:
    """Insert IPA pronunciation for English words that have no brackets at all.

    OCR sometimes garbles the phonetic transcription into plain-text fragments
    (e.g. "scare skea" where "skea" is garbled /skɛə/). This scans the text
    for the headword, inserts correct [IPA], and strips the garbled fragments.

    Only inserts for words that:
    - are standalone (not already followed by a bracket)
    - have an IPA entry in the dictionary
    - appear to be English headwords (at the start of text or after common
      separators like ",", ";", "")

    This is intentionally conservative: it only inserts at the END of each
    whitespace-separated token group to avoid breaking phrases.

    Args:
        text: Cell text (typically a short vocab fragment, <= 6 tokens).
        pronunciation: 'british' or 'american' — forwarded to _lookup_ipa.

    Returns:
        Text with "[ipa]" inserted after the headword (garbled trailing
        fragments removed), or the input unchanged if no fix applied.
    """
    if not IPA_AVAILABLE:
        return text
    if not text or not text.strip():
        return text
    # Skip if already has brackets (IPA replacement handles those)
    if any(ch in text for ch in '[{('):
        return text
    # Only process short text fragments (typical vocab cells).
    # Long sentences / paragraphs should not get IPA insertions.
    words = text.strip().split()
    if len(words) > 6:
        return text
    # Try to insert IPA for the first alphanumeric word
    # Typical patterns: "challenge", "profit", "film", "badge"
    for i, w in enumerate(words):
        # Clean punctuation for lookup
        clean = re.sub(r'[^a-zA-ZäöüÄÖÜß\'-]', '', w)
        if not clean or len(clean) < 2:
            continue
        # Skip German/grammar words
        if clean.lower() in _GRAMMAR_BRACKET_WORDS:
            continue
        ipa = _lookup_ipa(clean, pronunciation)
        # Fallback: try without hyphens (e.g. "second-hand" → "secondhand")
        if not ipa and '-' in clean:
            ipa = _lookup_ipa(clean.replace('-', ''), pronunciation)
        # Fallback 0b: compound word decomposition
        # E.g. "schoolbag" → "school"+"bag" → concatenated IPA
        if not ipa:
            ipa = _decompose_compound(clean, pronunciation)
        # Fallback 1: IPA-marker split for merged tokens where OCR
        # joined headword with its IPA (e.g. "schoolbagsku:lbæg").
        # Find the first IPA marker character (:, æ, ɪ, etc.), walk
        # backwards ≤3 chars for the onset consonant cluster, and
        # split into headword + OCR IPA.
        # NOTE(review): this set literal is rebuilt on every loop pass;
        # harmless for <= 6 tokens, but it could live at module level.
        _IPA_SPLIT_CHARS = set(':əɪɛɒʊʌæɑɔʃʒθðŋˈˌ')
        if not ipa:
            first_marker = next(
                (p for p, ch in enumerate(w) if ch in _IPA_SPLIT_CHARS), -1,
            )
            if first_marker >= 3:
                split = first_marker
                # Back up over the lowercase onset cluster (max 3 chars)
                # so "sku:l..." splits before the "sku", not inside it.
                while (split > 0
                       and split > first_marker - 3
                       and w[split - 1].isalpha()
                       and w[split - 1].islower()):
                    split -= 1
                if split >= 2:
                    headword = w[:split]
                    ocr_ipa = w[split:]
                    hw_ipa = _lookup_ipa(headword, pronunciation)
                    if not hw_ipa:
                        # Try compound decomposition for the headword part
                        hw_ipa = _decompose_compound(headword, pronunciation)
                    if hw_ipa:
                        words[i] = f"{headword} [{hw_ipa}]"
                    else:
                        # Word not in dictionary — use OCR IPA
                        words[i] = f"{headword} [{ocr_ipa}]"
                    # Everything after the merged token was part of the
                    # garbled transcription — drop it.
                    words = words[:i + 1]
                    ipa = True  # signal that we handled it
                    break
        # Fallback 2: prefix matching for merged tokens WITHOUT IPA
        # markers (e.g. "Scotland'skotland"). Find longest dictionary
        # prefix using only alpha chars to avoid punctuation matches.
        if not ipa:
            alpha = re.sub(r'[^a-zA-Z]', '', clean)
            if len(alpha) > 5:  # need at least 6 chars for meaningful split
                for end in range(len(alpha), 3, -1):  # min prefix 4 chars
                    prefix = alpha[:end]
                    test_ipa = _lookup_ipa(prefix, pronunciation)
                    if test_ipa:
                        ipa = test_ipa
                        # Rebind w so the f-string below renders the prefix;
                        # the words[i] assignment here is then overwritten.
                        w = prefix
                        words[i] = prefix
                        break
        if ipa:
            words[i] = f"{w} [{ipa}]"
            # Strip garbled OCR phonetics after the IPA bracket.
            # On scanned vocab pages, printed IPA is read as garbled
            # text (e.g. "scare skea" where "skea" is garbled /skɛə/).
            # After inserting correct IPA, remove remaining words that
            # aren't real English words, delimiters, or German text.
            kept = words[:i + 1]
            for j in range(i + 1, len(words)):
                wj = words[j]
                # Delimiter — keep this and everything after
                if wj in ('', '', '-', '/', '|', ',', ';'):
                    kept.extend(words[j:])
                    break
                # Starts with uppercase — likely German or proper noun
                clean_j = re.sub(r'[^a-zA-Z]', '', wj)
                if clean_j and clean_j[0].isupper():
                    kept.extend(words[j:])
                    break
                # Known English word (≥2 chars) — keep it and rest
                if clean_j and len(clean_j) >= 2:
                    if _lookup_ipa(clean_j, pronunciation):
                        kept.extend(words[j:])
                        break
                # Otherwise — likely garbled phonetics, skip
            words = kept
            break
    return ' '.join(words)
def _has_non_dict_trailing(text: str, pronunciation: str = 'british') -> bool:
    """Check if text has a headword followed by non-dictionary trailing words.

    Used as an additional trigger for ``_insert_missing_ipa`` when
    ``_text_has_garbled_ipa`` returns False because the garbled IPA
    happens to look like plain ASCII (e.g. "skea" for /skɛə/).
    """
    if not IPA_AVAILABLE:
        return False
    tokens = text.strip().split()
    if not 2 <= len(tokens) <= 6:
        return False
    # Locate the first token that resolves in the IPA dictionary.
    head_idx = -1
    for idx, tok in enumerate(tokens):
        stripped = re.sub(r'[^a-zA-Z\'-]', '', tok)
        if len(stripped) < 2:
            continue
        if stripped.lower() in _GRAMMAR_BRACKET_WORDS:
            continue
        if _lookup_ipa(stripped, pronunciation):
            head_idx = idx
            break
    # No headword, or nothing trails it — nothing to flag.
    if head_idx < 0 or head_idx >= len(tokens) - 1:
        return False
    # Every trailing token must fail the dictionary/delimiter/German
    # checks for the tail to count as garbled IPA.
    for tok in tokens[head_idx + 1:]:
        if tok in ('', '', '-', '/', '|', ',', ';'):
            return False
        letters = re.sub(r'[^a-zA-Z]', '', tok)
        if letters and letters[0].isupper():
            return False
        if len(letters) >= 2 and _lookup_ipa(letters, pronunciation):
            return False
    return True
def _strip_post_bracket_garbled(
    text: str, pronunciation: str = 'british',
) -> str:
    """Strip garbled IPA fragments that trail after proper [IPA] brackets.

    E.g. ``sea [sˈiː] si:`` → ``sea [sˈiː]``
         ``seat [sˈiːt] si:t`` → ``seat [sˈiːt]``
         ``seat [sˈiːt] belt si:t belt`` → ``seat [sˈiːt] belt``

    For multi-word headwords like "seat belt", a real English word ("belt")
    may be followed by garbled IPA duplicates. We detect this by checking
    whether the sequence after a real word contains IPA markers (`:`, `ə`,
    etc.) — if so, everything from the first garbled token onward is stripped.

    Args:
        text: Cell text that may contain one or more [IPA] brackets.
        pronunciation: 'british' or 'american' — forwarded to _lookup_ipa.

    Returns:
        Text with garbled trailing fragments removed, or unchanged input.
    """
    if ']' not in text:
        return text
    last_bracket = text.rfind(']')
    # Bracket is the final character — nothing trails it.
    if last_bracket >= len(text) - 1:
        return text
    before = text[:last_bracket + 1].rstrip()
    after = text[last_bracket + 1:].strip()
    if not after:
        return text
    _IPA_MARKER_CHARS = set(':əɪɛɒʊʌæɑɔʃʒθðŋˈˌ')
    after_words = after.split()
    kept: List[str] = []
    for idx, w in enumerate(after_words):
        # Delimiter — keep rest
        if w in ('', '', '-', '/', '|', ',', ';'):
            kept.extend(after_words[idx:])
            break
        # Contains IPA markers (length mark, IPA chars) — garbled, skip
        if any(c in w for c in _IPA_MARKER_CHARS):
            # Everything from here is garbled IPA — stop scanning
            # but look ahead: if any remaining words are real English
            # words WITHOUT IPA markers, they might be a different headword
            # following. Only skip the contiguous garbled run.
            continue
        clean = re.sub(r'[^a-zA-Z]', '', w)
        # Uppercase — likely German, keep rest
        if clean and clean[0].isupper():
            kept.extend(after_words[idx:])
            break
        # Known English word — keep it, but check if followed by garbled IPA
        # (multi-word headword case like "seat [siːt] belt si:t belt")
        if clean and len(clean) >= 2 and _lookup_ipa(clean, pronunciation):
            # Peek ahead: if next word has IPA markers, the rest is garbled
            remaining = after_words[idx + 1:]
            has_garbled_after = any(
                any(c in rw for c in _IPA_MARKER_CHARS)
                for rw in remaining
            )
            if has_garbled_after:
                # Keep this real word but stop — rest is garbled duplication
                kept.append(w)
                # Still scan for delimiters/German in the remaining words
                for ridx, rw in enumerate(remaining):
                    if rw in ('', '', '-', '/', '|', ',', ';'):
                        kept.extend(remaining[ridx:])
                        break
                    rclean = re.sub(r'[^a-zA-Z]', '', rw)
                    if rclean and rclean[0].isupper():
                        kept.extend(remaining[ridx:])
                        break
                break
            else:
                # No garbled tail — the rest is genuine content, keep it all.
                kept.extend(after_words[idx:])
                break
        # Unknown short word — likely garbled, skip
    if kept:
        return before + ' ' + ' '.join(kept)
    return before
def fix_ipa_continuation_cell(
    garbled_text: str,
    headword_text: str,
    pronunciation: str = 'british',
) -> str:
    """Replace garbled IPA in a continuation row with proper IPA.

    Continuation rows appear below the headword and contain only the
    printed phonetic transcription, which OCR garbles into fragments
    like ``ska:f ska:vz`` (should be ``[skˈɑːf] [skˈɑːvz]``).

    Args:
        garbled_text: The OCR-garbled IPA text from the continuation row.
        headword_text: The headword text from the previous row
            (e.g. ``scarf scarves``).
        pronunciation: ``'british'`` or ``'american'``.

    Returns:
        Corrected IPA text, or the original if no fix could be applied.
    """
    if not IPA_AVAILABLE or not garbled_text or not headword_text:
        return garbled_text
    # If headword already has inline IPA like "beat [bˈiːt] , beat, beaten",
    # only generate continuation IPA for words NOT already covered.
    covered_words: set = set()
    has_inline_ipa = bool(re.search(r'\[[^\]]*\]', headword_text))
    if has_inline_ipa:
        # Words before the first bracket already have their IPA shown
        # ('[' is guaranteed to exist because the regex above matched).
        first_bracket = headword_text.index('[')
        pre_bracket = headword_text[:first_bracket].strip()
        for w in pre_bracket.split():
            clean = re.sub(r'[^a-zA-Z\'-]', '', w).lower()
            if clean and len(clean) >= 2:
                covered_words.add(clean)
        last_bracket_end = headword_text.rfind(']')
        tail = headword_text[last_bracket_end + 1:].strip()
        if not tail or not re.search(r'[a-zA-Z]{2,}', tail):
            # Bracket is at the end (e.g. "the Highlands [ˈhaɪləndz]")
            # — return the inline IPA directly (continuation duplicates it)
            last_bracket_start = headword_text.rfind('[')
            inline_ipa = headword_text[last_bracket_start:last_bracket_end + 1]
            return inline_ipa
        # Only the tail words need continuation IPA
        headword_text = tail
    # Strip existing IPA brackets and parenthetical grammar annotations
    # like "(no pl)", "(sth)", "(sb)" from headword text
    clean_hw = re.sub(r'\[[^\]]*\]', '', headword_text)
    clean_hw = re.sub(r'\([^)]*\)', '', clean_hw).strip()
    if not clean_hw:
        return garbled_text
    # Split headword by delimiters ( — -)
    # "scarf scarves" → ["scarf", "scarves"]
    # "see - saw - seen" → ["see", "saw", "seen"]
    parts = re.split(r'\s*[–—]\s*|\s+-\s+', clean_hw)
    parts = [p.strip() for p in parts if p.strip()]
    if not parts:
        return garbled_text
    # Look up IPA for each headword part.
    # Skip articles (the, a, an) — they never get IPA in vocab books.
    # Other function words like "down", "up" are kept because they are
    # integral parts of phrasal verbs (e.g. "close down").
    # Skip words that already have inline IPA in the headword row.
    _ARTICLES = {'the', 'a', 'an'}
    ipa_parts: List[str] = []
    for part in parts:
        # A part may be multi-word like "secondary school"
        words = part.split()
        word_ipas: List[str] = []
        for w in words:
            clean_w = re.sub(r'[^a-zA-Z\'-]', '', w)
            if not clean_w or len(clean_w) < 2:
                continue
            if covered_words and clean_w.lower() in covered_words:
                continue  # Already has IPA inline in the headword
            if clean_w.lower() in _ARTICLES:
                continue  # Articles never get IPA in vocab books
            ipa = _lookup_ipa(clean_w, pronunciation)
            if ipa:
                word_ipas.append(ipa)
        if word_ipas:
            ipa_parts.append('[' + ' '.join(word_ipas) + ']')
    if not ipa_parts:
        return garbled_text
    # Join with delimiter
    result = ' '.join(ipa_parts)
    logger.debug(
        "fix_ipa_continuation: '%s''%s' (headwords: '%s')",
        garbled_text, result, headword_text,
    )
    return result
def _insert_headword_ipa(text: str, pronunciation: str = 'british') -> str:
    """Insert IPA for the first English headword in a long mixed-language line.

    Unlike _insert_missing_ipa (for short column_en cells), this handles
    column_text lines of any length. It only inserts IPA for the FIRST word
    if that word:
    - has no bracket following it already
    - has an IPA entry in the dictionary
    - is not a number/symbol prefix like "».55"

    Returns the text with [ipa] inserted after the first word, or unchanged.
    """
    if not IPA_AVAILABLE:
        return text
    if not text or not text.strip():
        return text
    tokens = text.strip().split()
    if not tokens:
        return text
    # A bracket right after the first token means IPA is already present.
    if len(tokens) > 1 and tokens[1].startswith(('[', '{', '(')):
        return text
    # Scan the first few tokens, skipping numeric prefixes ("».55", "0.56").
    for idx in range(min(3, len(tokens))):
        raw = tokens[idx]
        letters = re.sub(r'[^a-zA-ZäöüÄÖÜß\'-]', '', raw)
        if len(letters) < 2:
            continue
        if letters.lower() in _GRAMMAR_BRACKET_WORDS:
            continue
        ipa = _lookup_ipa(letters, pronunciation)
        if not ipa:
            # Stop at the first real word even when no IPA is found.
            break
        tokens[idx] = f"{raw} [{ipa}]"
        return ' '.join(tokens)
    return text
def fix_cell_phonetics(
    cells: List[Dict[str, Any]],
    pronunciation: str = 'british',
) -> List[Dict[str, Any]]:
    """Apply IPA phonetic fixes to cell texts for overlay mode.

    In the normal pipeline, _fix_phonetic_brackets operates on vocab entries
    (entry['english']). But the overlay reads cell['text'] directly, so
    phonetic fixes must be applied to cells too.

    Processing depends on column type:
    - column_en: Full processing (replace garbled IPA + strip orphan brackets
      + insert missing IPA). Safe because these cells contain only English
      headwords.
    - column_text: Light processing (replace garbled IPA ONLY). No orphan
      bracket stripping (brackets may be German content like "(probieren)")
      and no IPA insertion (would add tokens and break overlay positioning).

    Args:
        cells: Cell dicts with 'col_type', 'text', and 'word_boxes' keys;
            mutated in place.
        pronunciation: 'british' or 'american'.

    Returns:
        The same list of cells (texts possibly updated).
    """
    if not IPA_AVAILABLE:
        return cells
    eligible = {'column_en', 'column_text'}
    fix_count = 0
    for cell in cells:
        ctype = cell.get('col_type', '')
        if ctype not in eligible:
            continue
        original = cell.get('text', '') or ''
        if not original.strip():
            continue
        if ctype == 'column_en':
            # Full processing: replace garbled IPA, strip orphan brackets.
            updated = _replace_phonetics_in_text(original, pronunciation, strip_orphans=True)
            if updated == original and (
                _text_has_garbled_ipa(original)
                or _has_non_dict_trailing(original, pronunciation)
            ):
                # Insert IPA when garbled phonetics exist OR when trailing
                # non-dictionary words suggest garbled IPA in plain ASCII.
                updated = _insert_missing_ipa(original, pronunciation)
            # Strip trailing garbled fragments after proper [IPA] brackets
            # (e.g. "sea [sˈiː] si:" → "sea [sˈiː]")
            if ']' in updated:
                updated = _strip_post_bracket_garbled(updated, pronunciation)
        else:
            # column_text: replace garbled IPA, no orphan stripping.
            updated = _replace_phonetics_in_text(original, pronunciation, strip_orphans=False)
            # Insert headword IPA ONLY if there's a gap in word_boxes
            # suggesting Tesseract missed an IPA bracket on the page.
            # Without gap evidence, the original page had no IPA.
            if updated == original and _has_ipa_gap(original, cell.get('word_boxes', [])):
                inserted = _insert_headword_ipa(original, pronunciation)
                if inserted != original:
                    updated = inserted
                    _sync_word_boxes_after_ipa_insert(cell, original, updated)
        if updated != original:
            logger.debug(f"fix_cell_phonetics: '{original}''{updated}'")
            cell['text'] = updated
            fix_count += 1
    if fix_count:
        logger.info(f"fix_cell_phonetics: {fix_count} IPA fixes in {len(cells)} cells")
    return cells
def _has_ipa_gap(text: str, word_boxes: List[Dict]) -> bool:
"""Check if word_boxes show a gap where IPA brackets should be.
On a typical vocab page, the layout is:
headword [ipa] German translation
If Tesseract missed the IPA bracket, the gap between the headword
and the next word (German translation) is unusually large (>80px)
because the IPA occupied physical space on the page.
If no IPA was on the page (e.g. "be good at sth."), the words are
close together (<30px).
"""
if not word_boxes or len(word_boxes) < 2:
return False
tokens = text.split()
if not tokens:
return False
# Find the headword index: skip numeric prefixes like "».55", "0.56"
hw_box_idx = 0
for i, wb in enumerate(word_boxes):
wt = wb.get('text', '')
clean = re.sub(r'[^a-zA-ZäöüÄÖÜß]', '', wt)
if len(clean) >= 2:
hw_box_idx = i
break
if hw_box_idx >= len(word_boxes) - 1:
return False
# Check gap between headword and the next word_box
hw = word_boxes[hw_box_idx]
next_wb = word_boxes[hw_box_idx + 1]
gap = next_wb['left'] - (hw['left'] + hw['width'])
return gap > 80
def _sync_word_boxes_after_ipa_insert(
cell: Dict[str, Any],
old_text: str,
new_text: str,
) -> None:
"""Insert a synthetic word_box for an IPA token added by IPA insertion.
E.g. "challenge ...""challenge [tʃælɪndʒ] ..."
Adds a new word_box right after the headword's box so the 1:1
token-to-box mapping in the frontend overlay stays consistent.
"""
word_boxes = cell.get('word_boxes')
if not word_boxes:
return
old_tokens = old_text.split()
new_tokens = new_text.split()
if len(new_tokens) != len(old_tokens) + 1:
return # unexpected change, skip
# Find the inserted token by walking both lists in parallel.
# One token in new_tokens won't match — that's the inserted IPA.
insert_idx = -1
j = 0 # index into old_tokens
for i in range(len(new_tokens)):
if j < len(old_tokens) and new_tokens[i] == old_tokens[j]:
j += 1
else:
insert_idx = i
break
if insert_idx < 0 or insert_idx >= len(new_tokens):
return
ipa_token = new_tokens[insert_idx]
# The headword is at insert_idx - 1 in old_tokens (and word_boxes)
ref_idx = insert_idx - 1
if ref_idx < 0 or ref_idx >= len(word_boxes):
return
ref_box = word_boxes[ref_idx]
ipa_box = {
'text': ipa_token,
'left': ref_box['left'] + ref_box['width'] + 2,
'top': ref_box['top'],
'width': ref_box['width'],
'height': ref_box['height'],
'conf': ref_box.get('conf', 90),
}
word_boxes.insert(insert_idx, ipa_box)
def _assign_row_words_to_columns(
    row: RowGeometry,
    columns: List[PageRegion],
) -> Dict[int, List[Dict]]:
    """Assign each word in a row to exactly one column.

    Assignment runs in three passes per word:
    1. Maximum horizontal overlap with a column wins (robust for narrow
       columns like page_ref where a character's center can spill over).
    2. Otherwise, midpoint-delimited center ranges: the boundary between
       adjacent columns is the midpoint of the gap, so border words are
       not stolen by the neighbouring column.
    3. Last resort: nearest column center.

    Args:
        row: Row with words (relative coordinates).
        columns: Sorted list of columns (absolute coordinates).

    Returns:
        Dict mapping col_index → list of words assigned to that column.
    """
    buckets: Dict[int, List[Dict]] = {idx: [] for idx in range(len(columns))}
    if not row.words or not columns:
        return buckets
    origin = row.x  # content ROI left edge (absolute)
    num_cols = len(columns)
    # Precompute midpoint-delimited assignment ranges in row-relative
    # coordinates, one (lo, hi) pair per column.
    ranges = []
    for idx, col in enumerate(columns):
        rel_left = col.x - origin
        rel_right = rel_left + col.width
        if idx == 0:
            lo = 0
        else:
            prev = columns[idx - 1]
            lo = ((prev.x - origin + prev.width) + rel_left) / 2
        if idx == num_cols - 1:
            hi = row.width + 100  # generous margin for the last column
        else:
            hi = (rel_right + (columns[idx + 1].x - origin)) / 2
        ranges.append((lo, hi))
    for word in row.words:
        w_left = word['left']
        w_right = w_left + word['width']
        w_center = w_left + word['width'] / 2
        # Pass 1: pick the column with the largest horizontal overlap.
        winner = -1
        winner_overlap = 0
        for idx, col in enumerate(columns):
            rel_left = col.x - origin
            rel_right = rel_left + col.width
            overlap = max(0, min(w_right, rel_right) - max(w_left, rel_left))
            if overlap > winner_overlap:
                winner_overlap = overlap
                winner = idx
        if winner >= 0 and winner_overlap > 0:
            buckets[winner].append(word)
            continue
        # Pass 2: word center inside a midpoint-delimited range.
        placed = False
        for idx, (lo, hi) in enumerate(ranges):
            if lo <= w_center < hi:
                buckets[idx].append(word)
                placed = True
                break
        if placed:
            continue
        # Pass 3: nearest column center.
        nearest = 0
        nearest_dist = abs(w_center - (columns[0].x - origin + columns[0].width / 2))
        for idx in range(1, num_cols):
            dist = abs(w_center - (columns[idx].x - origin + columns[idx].width / 2))
            if dist < nearest_dist:
                nearest_dist = dist
                nearest = idx
        buckets[nearest].append(word)
    return buckets
# Regex: at least 2 consecutive letters (Latin + umlauts + accents)
_RE_REAL_WORD = re.compile(r'[a-zA-ZäöüÄÖÜßéèêëàâîïôûùç]{2,}')
# Regex: a single letter (Latin + umlauts + accents) — used to extract
# the alphabetic residue of a token for dictionary lookups.
_RE_ALPHA = re.compile(r'[a-zA-ZäöüÄÖÜßéèêëàâîïôûùç]')
# Common short EN/DE words (2-3 chars). Tokens at the end of a cell
# that do NOT appear here are treated as trailing OCR noise.
# Lookups are done on lowercased, letters-only token residues.
_COMMON_SHORT_WORDS: set = {
    # EN 1-2 letter
    'a', 'i', 'am', 'an', 'as', 'at', 'be', 'by', 'do', 'go', 'he',
    'if', 'in', 'is', 'it', 'me', 'my', 'no', 'of', 'oh', 'ok', 'on',
    'or', 'so', 'to', 'up', 'us', 'we',
    # EN 3 letter
    'ace', 'act', 'add', 'age', 'ago', 'aid', 'aim', 'air', 'all',
    'and', 'ant', 'any', 'ape', 'arc', 'are', 'ark', 'arm', 'art',
    'ask', 'ate', 'axe', 'bad', 'bag', 'ban', 'bar', 'bat', 'bay',
    'bed', 'bee', 'bet', 'big', 'bin', 'bit', 'bow', 'box', 'boy',
    'bud', 'bug', 'bun', 'bus', 'but', 'buy', 'cab', 'can', 'cap',
    'car', 'cat', 'cop', 'cow', 'cry', 'cub', 'cup', 'cut', 'dad',
    'dam', 'day', 'den', 'dew', 'did', 'die', 'dig', 'dim', 'dip',
    'dog', 'dot', 'dry', 'due', 'dug', 'dye', 'ear', 'eat', 'eel',
    'egg', 'elm', 'end', 'era', 'eve', 'ewe', 'eye', 'fan', 'far',
    'fat', 'fax', 'fed', 'fee', 'few', 'fig', 'fin', 'fir', 'fit',
    'fix', 'fly', 'foe', 'fog', 'for', 'fox', 'fry', 'fun', 'fur',
    'gag', 'gap', 'gas', 'get', 'god', 'got', 'gum', 'gun', 'gut',
    'guy', 'gym', 'had', 'ham', 'has', 'hat', 'hay', 'hen', 'her',
    'hid', 'him', 'hip', 'his', 'hit', 'hog', 'hop', 'hot', 'how',
    'hue', 'hug', 'hum', 'hut', 'ice', 'icy', 'ill', 'imp', 'ink',
    'inn', 'ion', 'its', 'ivy', 'jam', 'jar', 'jaw', 'jay', 'jet',
    'jig', 'job', 'jog', 'joy', 'jug', 'key', 'kid', 'kin', 'kit',
    'lab', 'lad', 'lag', 'lap', 'law', 'lay', 'led', 'leg', 'let',
    'lid', 'lie', 'lip', 'lit', 'log', 'lot', 'low', 'mad', 'man',
    'map', 'mat', 'maw', 'may', 'men', 'met', 'mid', 'mix', 'mob',
    'mog', 'mom', 'mop', 'mow', 'mrs', 'mud', 'mug', 'mum', 'nag',
    'nap', 'net', 'new', 'nod', 'nor', 'not', 'now', 'nun', 'nut',
    'oak', 'oar', 'oat', 'odd', 'off', 'oft', 'oil', 'old', 'one',
    'opt', 'orb', 'ore', 'our', 'out', 'owe', 'owl', 'own', 'pad',
    'pal', 'pan', 'pat', 'paw', 'pay', 'pea', 'peg', 'pen', 'per',
    'pet', 'pie', 'pig', 'pin', 'pit', 'ply', 'pod', 'pop', 'pot',
    'pro', 'pry', 'pub', 'pug', 'pun', 'pup', 'put', 'rag', 'ram',
    'ran', 'rap', 'rat', 'raw', 'ray', 'red', 'ref', 'rib', 'rid',
    'rig', 'rim', 'rip', 'rob', 'rod', 'roe', 'rot', 'row', 'rub',
    'rug', 'rum', 'run', 'rut', 'rye', 'sac', 'sad', 'sag', 'sap',
    'sat', 'saw', 'say', 'sea', 'set', 'sew', 'she', 'shy', 'sin',
    'sip', 'sir', 'sis', 'sit', 'six', 'ski', 'sky', 'sly', 'sob',
    'sod', 'son', 'sop', 'sot', 'sow', 'soy', 'spa', 'spy', 'sty',
    'sub', 'sue', 'sum', 'sun', 'sup', 'tab', 'tad', 'tag', 'tan',
    'tap', 'tar', 'tax', 'tea', 'ten', 'the', 'tie', 'tin', 'tip',
    'toe', 'ton', 'too', 'top', 'tow', 'toy', 'try', 'tub', 'tug',
    'two', 'urn', 'use', 'van', 'vat', 'vet', 'via', 'vie', 'vim',
    'vow', 'wag', 'war', 'was', 'wax', 'way', 'web', 'wed', 'wet',
    'who', 'why', 'wig', 'win', 'wit', 'woe', 'wok', 'won', 'woo',
    'wow', 'yam', 'yap', 'yaw', 'yea', 'yes', 'yet', 'yew', 'you',
    'zap', 'zip', 'zoo',
    # DE 2-3 letter
    'ab', 'da', 'du', 'ei', 'er', 'es', 'ja', 'ob', 'um', 'zu',
    'als', 'alt', 'auf', 'aus', 'bei', 'bin', 'bis', 'das', 'dem',
    'den', 'der', 'des', 'die', 'dir', 'ehe', 'ein', 'eng', 'gar',
    'gib', 'gut', 'hat', 'her', 'ich', 'ihm', 'ihr', 'ins', 'ist',
    'mal', 'man', 'mir', 'mit', 'nah', 'neu', 'nie', 'nur', 'nun',
    'ort', 'rad', 'rat', 'rot', 'ruf', 'ruh', 'sei', 'sie', 'tag',
    'tal', 'tat', 'tee', 'tor', 'tun', 'tut', 'uns', 'vom', 'von',
    'vor', 'war', 'was', 'weg', 'wem', 'wen', 'wer', 'wie', 'wir',
    'wut', 'zum', 'zur',
}
# Known abbreviations found in EN/DE textbooks and dictionaries.
# Stored WITHOUT trailing period (the noise filter strips periods).
# These rescue tokens like "sth." / "sb." / "usw." from being deleted.
_KNOWN_ABBREVIATIONS: set = {
    # EN dictionary meta-words
    'sth', 'sb', 'smth', 'smb', 'sbd',
    # EN general
    'etc', 'eg', 'ie', 'esp', 'approx', 'dept', 'govt', 'corp',
    'inc', 'ltd', 'vs', 'cf', 'ibid', 'nb', 'ps', 'asap',
    # EN references / textbook
    'p', 'pp', 'ch', 'chap', 'fig', 'figs', 'no', 'nos', 'nr',
    'vol', 'vols', 'ed', 'eds', 'rev', 'repr', 'trans', 'ff',
    'fn', 'sec', 'par', 'para', 'app', 'abbr', 'ex', 'exs',
    'ans', 'wb', 'tb', 'vocab',
    # EN parts of speech / grammar
    'adj', 'adv', 'prep', 'conj', 'pron', 'det', 'art', 'interj',
    'aux', 'mod', 'inf', 'pt', 'pres', 'pret', 'ger',
    'sg', 'pl', 'sing', 'irreg', 'reg', 'intr', 'intrans',
    'refl', 'pass', 'imper', 'subj', 'ind', 'perf', 'fut',
    'attr', 'pred', 'comp', 'superl', 'pos', 'neg',
    'lit', 'colloq', 'sl', 'dial', 'arch', 'obs', 'fml', 'infml',
    'syn', 'ant', 'opp', 'var', 'orig',
    # EN titles
    'mr', 'mrs', 'ms', 'dr', 'prof', 'st', 'jr', 'sr',
    # EN pronunciation
    'br', 'am', 'brit', 'amer',
    # EN units
    'hr', 'hrs', 'min', 'km', 'cm', 'mm', 'kg', 'mg', 'ml',
    # DE general
    'usw', 'bzw', 'evtl', 'ggf', 'ggfs', 'sog', 'eigtl', 'allg',
    'bes', 'insb', 'insbes', 'bspw', 'ca',
    'od', 'ua', 'sa', 'vgl', 'zb', 'dh', 'zt', 'idr',
    'inkl', 'exkl', 'zzgl', 'abzgl',
    # DE references
    'abs', 'abschn', 'abt', 'anm', 'ausg', 'aufl', 'bd', 'bde',
    'bearb', 'ebd', 'hrsg', 'hg', 'jg', 'jh', 'jhd', 'kap',
    's', 'sp', 'zit', 'zs', 'vlg',
    # DE grammar
    'nom', 'akk', 'dat', 'gen', 'konj', 'subst', 'obj',
    'praet', 'imp', 'part', 'mask', 'fem', 'neutr',
    'trennb', 'untrennb', 'ugs', 'geh', 'pej',
    # DE regional
    'nordd', 'österr', 'schweiz',
    # Linguistic
    'lex', 'morph', 'phon', 'phonet', 'sem', 'synt', 'etym',
    'deriv', 'pref', 'suf', 'suff', 'dim', 'coll',
    'count', 'uncount', 'indef', 'def', 'poss', 'demon',
}
def _is_noise_tail_token(token: str) -> bool:
    """Check if a token at the END of cell text is trailing OCR noise.

    Trailing fragments are very common OCR artifacts from image edges,
    borders, and neighbouring cells. This is more aggressive than a
    general word filter: any short token that isn't in the dictionary
    of common EN/DE words is considered noise.

    Examples of noise: "Es)", "3", "ee", "B"
    Examples to keep: "sister.", "cupcakes.", "...", "mice", "[eg]"

    Args:
        token: A single whitespace-separated token from the cell text.

    Returns:
        True if the token should be dropped as trailing noise.
    """
    t = token.strip()
    if not t:
        return True
    # Keep ellipsis
    if t in ('...', ''):
        return False
    # Keep phonetic brackets: [eg], [maus], ["a:mand], serva], etc.
    # (startswith('[') subsumes the previously separate '["' and "['"
    # prefix checks — those conditions were redundant.)
    if t.startswith('[') or t.endswith(']'):
        return False
    # Pure non-alpha → noise ("3", ")", "|")
    alpha_chars = _RE_ALPHA.findall(t)
    if not alpha_chars:
        return True
    # Extract only alpha characters for dictionary lookup
    cleaned = ''.join(alpha_chars)
    # Known abbreviations (e.g. "sth.", "usw.", "adj.") — always keep
    if cleaned.lower() in _KNOWN_ABBREVIATIONS:
        return False
    # Strip normal trailing punctuation before checking for internal noise.
    stripped_punct = re.sub(r'[.,;:!?]+$', '', t)  # "cupcakes." → "cupcakes"
    t_check = stripped_punct if stripped_punct else t
    # Check for legitimate punctuation patterns vs. real noise.
    # Legitimate: "(auf)", "under-", "e.g.", "(on)", "selbst)", "(wir",
    #             "(Salat-)Gurke", "Tanz(veranstaltung)", "(zer)brechen"
    # Noise: "3d", "B|", "x7"
    # Strategy: strip common dictionary punctuation (parens, hyphens,
    # slashes, dots), THEN check if the residual is purely alphabetic.
    t_inner = re.sub(r'[()\-/.,;:!?]', '', t_check)
    inner_alpha = ''.join(_RE_ALPHA.findall(t_inner))
    has_internal_noise = (len(t_inner) > len(inner_alpha)) if t_inner else False
    # Long alpha words (4+ chars) without internal noise are likely real
    if len(cleaned) >= 4 and not has_internal_noise:
        return False
    # Short words: check dictionary (uses only alpha chars)
    if cleaned.lower() in _COMMON_SHORT_WORDS and not has_internal_noise:
        return False
    # Default: short or suspicious → noise
    return True
def _is_garbage_text(text: str) -> bool:
    """Check if entire cell text is OCR garbage from image areas.

    Garbage text = no recognizable dictionary word. Catches
    "(ci]oeu", "uanoaain." etc.
    """
    real_words = _RE_REAL_WORD.findall(text)
    if not real_words:
        # No 2+-letter run at all; only known abbreviations (e.g. "e.g.")
        # survive this case.
        letters = ''.join(_RE_ALPHA.findall(text)).lower()
        return letters not in _KNOWN_ABBREVIATIONS
    for word in real_words:
        lowered = word.lower()
        # A known short word or abbreviation anywhere → not garbage.
        if lowered in _COMMON_SHORT_WORDS or lowered in _KNOWN_ABBREVIATIONS:
            return False
        # Long word (>= 4 chars): real EN/DE words have a plausible vowel
        # share; garbage like "uanoaain" or "cioeu" falls outside it.
        if len(lowered) >= 4:
            vowel_share = sum(1 for ch in lowered if ch in 'aeiouäöü') / len(lowered)
            if 0.15 <= vowel_share <= 0.65:
                return False
    return True
def _clean_cell_text(text: str) -> str:
    """Remove OCR noise from cell text. Generic filters:
    1. If the entire text has no real alphabetic word (>= 2 letters), clear.
    2. If the entire text is garbage (no dictionary word), clear.
    3. Strip trailing noise tokens from the end of the text.
    """
    content = text.strip()
    if not content:
        return ''
    # --- Filter 1: no real word anywhere ---
    if _RE_REAL_WORD.search(content) is None:
        # Exception: dotted abbreviations like "e.g.", "z.B.", "i.e."
        letters = ''.join(_RE_ALPHA.findall(content)).lower()
        if letters not in _KNOWN_ABBREVIATIONS:
            return ''
    # --- Filter 2: whole text is garbage ---
    if _is_garbage_text(content):
        return ''
    # --- Filter 3: drop noise tokens from the tail ---
    parts = content.split()
    while parts and _is_noise_tail_token(parts[-1]):
        del parts[-1]
    return ' '.join(parts) if parts else ''
def _clean_cell_text_lite(text: str) -> str:
    """Simplified noise filter for cell-first OCR (isolated cell crops).
    Since each cell is OCR'd in isolation (no neighbour content visible),
    trailing-noise stripping is unnecessary. Only 2 filters remain:
    1. No real alphabetic word (>= 2 letters) and not a known abbreviation → empty.
    2. Entire text is garbage (no dictionary word) → empty.
    """
    content = text.strip()
    if not content:
        return ''
    # --- Filter 1: no real word anywhere ---
    if _RE_REAL_WORD.search(content) is None:
        # Dotted abbreviations ("e.g.", "z.B.") are kept
        letters = ''.join(_RE_ALPHA.findall(content)).lower()
        if letters not in _KNOWN_ABBREVIATIONS:
            return ''
    # --- Filter 2: whole text is garbage ---
    return '' if _is_garbage_text(content) else content
# ---------------------------------------------------------------------------
# Bold detection via stroke-width analysis (relative / page-level)
# ---------------------------------------------------------------------------
def _measure_stroke_width(gray_crop: np.ndarray) -> float:
"""Measure mean stroke width in a binarised cell crop.
Returns a DPI-normalised value (mean stroke width as % of crop height),
or 0.0 if measurement is not possible.
"""
if gray_crop is None or gray_crop.size == 0:
return 0.0
h, w = gray_crop.shape[:2]
if h < 10 or w < 10:
return 0.0
# Binarise: text = white (255), background = black (0)
_, bw = cv2.threshold(gray_crop, 0, 255, cv2.THRESH_BINARY_INV | cv2.THRESH_OTSU)
if cv2.countNonZero(bw) < 20:
return 0.0
# Distance transform: value at each white pixel = distance to nearest black
dist = cv2.distanceTransform(bw, cv2.DIST_L2, 3)
# Skeleton via morphological thinning
kernel = cv2.getStructuringElement(cv2.MORPH_CROSS, (3, 3))
thin = bw.copy()
for _ in range(max(1, min(h, w) // 6)):
eroded = cv2.erode(thin, kernel)
if cv2.countNonZero(eroded) < 5:
break
thin = eroded
skeleton_pts = thin > 0
if not np.any(skeleton_pts):
return 0.0
mean_stroke = float(np.mean(dist[skeleton_pts]))
return mean_stroke / max(h, 1) * 100 # normalised: % of cell height
def _classify_bold_cells(cells: List[Dict[str, Any]], ocr_img: Optional[np.ndarray],
img_w: int, img_h: int) -> None:
"""Two-pass bold detection: measure all cells, then compare against median.
Cells with stroke width > 1.4× the page median are marked as bold.
This adapts automatically to font, DPI and scan quality.
Modifies cells in-place (sets 'is_bold' key).
"""
if ocr_img is None:
return
# Pass 1: measure stroke width for every cell with text
metrics: List[float] = []
cell_strokes: List[float] = []
for cell in cells:
sw = 0.0
if cell.get('text', '').strip():
bp = cell['bbox_px']
y1 = max(0, bp['y'])
y2 = min(img_h, bp['y'] + bp['h'])
x1 = max(0, bp['x'])
x2 = min(img_w, bp['x'] + bp['w'])
if y2 > y1 and x2 > x1:
sw = _measure_stroke_width(ocr_img[y1:y2, x1:x2])
cell_strokes.append(sw)
if sw > 0:
metrics.append(sw)
if len(metrics) < 3:
# Too few cells to compare — leave all as non-bold
return
median_sw = float(np.median(metrics))
if median_sw <= 0:
return
# Pass 2: cells significantly above median → bold
for cell, sw in zip(cells, cell_strokes):
cell['is_bold'] = sw > 0 and (sw / median_sw) > 1.4
# ---------------------------------------------------------------------------