Some checks failed
CI / go-lint (push) Has been skipped
CI / python-lint (push) Has been skipped
CI / nodejs-lint (push) Has been skipped
CI / test-go-school (push) Successful in 26s
CI / test-go-edu-search (push) Successful in 26s
CI / test-python-klausur (push) Failing after 1m53s
CI / test-python-agent-core (push) Successful in 15s
CI / test-nodejs-website (push) Successful in 16s
Two fixes: 1. Step 5d now only treats cells as continuation when text is entirely inside brackets (e.g. "[n, nn]"). Cells with headwords outside brackets (e.g. "employee [im'ploi:]") are no longer overwritten. 2. fix_ipa_continuation_cell no longer skips grammar words like "down" — they are part of the headword in phrasal verbs like "close sth. down". Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
1960 lines
73 KiB
Python
1960 lines
73 KiB
Python
"""
|
||
OCR engines (RapidOCR, TrOCR, LightOn), vocab postprocessing, and text cleaning.
|
||
|
||
Lizenz: Apache 2.0 (kommerziell nutzbar)
|
||
DATENSCHUTZ: Alle Verarbeitung erfolgt lokal.
|
||
"""
|
||
|
||
import difflib
import io
import logging
import os
import re
from typing import Any, Dict, List, Optional, Tuple
|
||
|
||
import numpy as np
|
||
|
||
from cv_vocab_types import (
|
||
IPA_AVAILABLE,
|
||
PageRegion,
|
||
RowGeometry,
|
||
_britfone_dict,
|
||
_ipa_convert_american,
|
||
)
|
||
|
||
logger = logging.getLogger(__name__)
|
||
|
||
try:
|
||
import cv2
|
||
except ImportError:
|
||
cv2 = None # type: ignore[assignment]
|
||
|
||
try:
|
||
from PIL import Image
|
||
except ImportError:
|
||
Image = None # type: ignore[assignment,misc]
|
||
|
||
|
||
# =============================================================================
|
||
# Pipeline Step 5: Word Grid from Columns × Rows
|
||
# =============================================================================
|
||
|
||
def _group_words_into_lines(words: List[Dict], y_tolerance_px: int = 20) -> List[List[Dict]]:
|
||
"""Group words by Y position into lines, sorted by X within each line."""
|
||
if not words:
|
||
return []
|
||
|
||
sorted_words = sorted(words, key=lambda w: (w['top'], w['left']))
|
||
lines: List[List[Dict]] = []
|
||
current_line: List[Dict] = [sorted_words[0]]
|
||
current_y = sorted_words[0]['top']
|
||
|
||
for word in sorted_words[1:]:
|
||
if abs(word['top'] - current_y) <= y_tolerance_px:
|
||
current_line.append(word)
|
||
else:
|
||
current_line.sort(key=lambda w: w['left'])
|
||
lines.append(current_line)
|
||
current_line = [word]
|
||
current_y = word['top']
|
||
|
||
if current_line:
|
||
current_line.sort(key=lambda w: w['left'])
|
||
lines.append(current_line)
|
||
|
||
return lines
|
||
|
||
|
||
def _words_to_reading_order_lines(words: List[Dict], y_tolerance_px: int = 15) -> List[str]:
    """Render OCR words as one string per visual line, in reading order.

    Words are clustered into lines via _group_words_into_lines; each
    line's word texts are joined with single spaces.

    Returns a list of line strings (one per visual line in the cell).
    """
    if not words:
        return []

    grouped = _group_words_into_lines(words, y_tolerance_px=y_tolerance_px)
    return [' '.join(item['text'] for item in row) for row in grouped]
|
||
|
||
|
||
def _rejoin_hyphenated(lines: List[str]) -> List[str]:
|
||
"""Rejoin words split by line-break hyphenation.
|
||
|
||
E.g. ['Fuß-', 'boden'] → ['Fußboden']
|
||
['some text-', 'thing here'] → ['something here']
|
||
"""
|
||
if len(lines) <= 1:
|
||
return lines
|
||
|
||
result = []
|
||
i = 0
|
||
while i < len(lines):
|
||
line = lines[i]
|
||
# If line ends with '-' and there's a next line, rejoin
|
||
if i + 1 < len(lines) and line.rstrip().endswith('-'):
|
||
stripped = line.rstrip()
|
||
# Get the word fragment before hyphen (last word)
|
||
prefix = stripped[:-1] # remove trailing hyphen
|
||
next_line = lines[i + 1]
|
||
# Join: last word of this line + first word of next line
|
||
prefix_words = prefix.rsplit(' ', 1)
|
||
next_words = next_line.split(' ', 1)
|
||
if len(prefix_words) > 1:
|
||
joined = prefix_words[0] + ' ' + prefix_words[1] + next_words[0]
|
||
else:
|
||
joined = prefix_words[0] + next_words[0]
|
||
remainder = next_words[1] if len(next_words) > 1 else ''
|
||
if remainder:
|
||
result.append(joined + ' ' + remainder)
|
||
else:
|
||
result.append(joined)
|
||
i += 2
|
||
else:
|
||
result.append(line)
|
||
i += 1
|
||
return result
|
||
|
||
|
||
def _words_to_reading_order_text(words: List[Dict], y_tolerance_px: int = 15) -> str:
    """Flatten OCR words into newline-separated text in correct reading order.

    Builds visual lines via _words_to_reading_order_lines, merges
    hyphenated fragments via _rejoin_hyphenated, then joins with '\\n'.
    """
    merged = _rejoin_hyphenated(
        _words_to_reading_order_lines(words, y_tolerance_px)
    )
    return '\n'.join(merged)
|
||
|
||
|
||
def _words_to_spaced_text(words: List[Dict], y_tolerance_px: int = 15) -> str:
    """Join OCR words, approximating the original horizontal spacing.

    Gaps between adjacent words are rendered as a proportional number of
    spaces (pixel gap divided by the line's average character width,
    minimum one space), so the spatial layout survives into plain text.
    Useful for box sub-sections where spatial layout matters.
    """
    rendered = []

    for row in _group_words_into_lines(words, y_tolerance_px=y_tolerance_px):
        if not row:
            continue
        ordered = sorted(row, key=lambda w: w['left'])

        # Average character width over the line drives the space scaling.
        char_count = sum(len(w['text']) for w in ordered if w.get('text'))
        pixel_width = sum(w['width'] for w in ordered if w.get('text'))
        char_w = pixel_width / char_count if char_count > 0 else 10

        pieces = []
        for cur, nxt in zip(ordered, ordered[1:]):
            pieces.append(cur.get('text', ''))
            gap_px = nxt['left'] - (cur['left'] + cur['width'])
            pieces.append(' ' * max(1, round(gap_px / char_w)))
        pieces.append(ordered[-1].get('text', ''))

        rendered.append(''.join(pieces))

    return '\n'.join(rendered)
|
||
|
||
|
||
# --- RapidOCR integration (PaddleOCR models on ONNX Runtime) ---
|
||
|
||
_rapid_engine = None
|
||
RAPIDOCR_AVAILABLE = False
|
||
|
||
try:
|
||
from rapidocr import RapidOCR as _RapidOCRClass
|
||
from rapidocr import LangRec as _LangRec, OCRVersion as _OCRVersion, ModelType as _ModelType
|
||
RAPIDOCR_AVAILABLE = True
|
||
logger.info("RapidOCR available — can be used as alternative to Tesseract")
|
||
except ImportError:
|
||
logger.info("RapidOCR not installed — using Tesseract only")
|
||
|
||
|
||
def _get_rapid_engine():
    """Lazy-init RapidOCR engine with PP-OCRv5 Latin model for German support.

    Returns:
        The module-level singleton RapidOCR instance, created on first
        call with detection/recognition parameters tuned for vocab scans.
    """
    global _rapid_engine
    if _rapid_engine is None:
        _rapid_engine = _RapidOCRClass(params={
            # PP-OCRv5 Latin model — supports German umlauts (ä, ö, ü, ß)
            "Rec.lang_type": _LangRec.LATIN,
            "Rec.model_type": _ModelType.SERVER,
            "Rec.ocr_version": _OCRVersion.PPOCRV5,
            # Tighter detection boxes to reduce word merging
            "Det.unclip_ratio": 1.3,
            # Lower threshold to detect small chars (periods, ellipsis, phonetics)
            "Det.box_thresh": 0.4,
            # Silence verbose logging
            "Global.log_level": "critical",
        })
        logger.info("RapidOCR engine initialized (PP-OCRv5 Latin, unclip_ratio=1.3)")
    return _rapid_engine
|
||
|
||
|
||
def ocr_region_rapid(
    img_bgr: np.ndarray,
    region: PageRegion,
) -> List[Dict[str, Any]]:
    """Run RapidOCR on a specific region, returning word dicts compatible with Tesseract format.

    Args:
        img_bgr: Full-page BGR image (NOT binarized — RapidOCR works on color/gray).
        region: Region to crop and OCR.

    Returns:
        List of word dicts with text, left, top, width, height, conf,
        region_type. Coordinates are absolute (page space), not
        crop-relative.
    """
    engine = _get_rapid_engine()

    # Crop region from BGR image
    crop = img_bgr[region.y:region.y + region.height,
                   region.x:region.x + region.width]

    if crop.size == 0:
        return []

    result = engine(crop)

    if result is None or result.boxes is None or result.txts is None:
        return []

    words = []
    # result.boxes: (N, 4, 2) — 4 corner points per detected text line;
    # result.txts / result.scores align with boxes by index.
    # (Idiom fix: the previous enumerate index was never used.)
    for box, txt, score in zip(result.boxes, result.txts, result.scores):
        if not txt or not txt.strip():
            continue

        # box is [[x1,y1],[x2,y2],[x3,y3],[x4,y4]] (clockwise from top-left)
        xs = [p[0] for p in box]
        ys = [p[1] for p in box]
        left = int(min(xs))
        top = int(min(ys))
        w = int(max(xs) - left)
        h = int(max(ys) - top)

        words.append({
            'text': txt.strip(),
            'left': left + region.x,  # Absolute coords
            'top': top + region.y,
            'width': w,
            'height': h,
            'conf': int(score * 100),  # 0-100 like Tesseract
            'region_type': region.type,
        })

    return words
|
||
|
||
|
||
def ocr_region_trocr(img_bgr: np.ndarray, region: PageRegion, handwritten: bool = False) -> List[Dict[str, Any]]:
    """Run TrOCR on a region. Returns line-level word dicts (same format as ocr_region_rapid).

    Uses trocr_service.get_trocr_model() + _split_into_lines() for line segmentation.
    Bboxes are approximated from equal line-height distribution within the region.
    Falls back to Tesseract if TrOCR is not available.

    Args:
        img_bgr: Full-page BGR image.
        region: Region to crop and OCR.
        handwritten: Select the handwritten TrOCR model variant.

    Returns:
        List of line-level word dicts (text, left, top, width, height,
        conf, region_type); empty list on failure.
    """
    # Imported lazily so the module loads even when the TrOCR service is absent.
    from services.trocr_service import get_trocr_model, _split_into_lines, _check_trocr_available

    if not _check_trocr_available():
        logger.warning("TrOCR not available, falling back to Tesseract")
        if region.height > 0 and region.width > 0:
            # Tesseract fallback path expects a grayscale image.
            ocr_img_crop = cv2.cvtColor(img_bgr, cv2.COLOR_BGR2GRAY) if img_bgr is not None else None
            if ocr_img_crop is not None:
                return ocr_region(ocr_img_crop, region, lang="eng+deu", psm=6)
        return []

    crop = img_bgr[region.y:region.y + region.height, region.x:region.x + region.width]
    if crop.size == 0:
        return []

    try:
        import torch
        from PIL import Image as _PILImage

        processor, model = get_trocr_model(handwritten=handwritten)
        if processor is None or model is None:
            logger.warning("TrOCR model not loaded, falling back to Tesseract")
            ocr_img_crop = cv2.cvtColor(img_bgr, cv2.COLOR_BGR2GRAY)
            return ocr_region(ocr_img_crop, region, lang="eng+deu", psm=6)

        # TrOCR expects RGB PIL images; OpenCV crops are BGR.
        pil_crop = _PILImage.fromarray(cv2.cvtColor(crop, cv2.COLOR_BGR2RGB))
        lines = _split_into_lines(pil_crop)
        if not lines:
            # Segmentation found nothing — treat the whole crop as one line.
            lines = [pil_crop]

        device = next(model.parameters()).device
        all_text = []
        confidences = []
        for line_img in lines:
            pixel_values = processor(images=line_img, return_tensors="pt").pixel_values.to(device)
            with torch.no_grad():
                generated_ids = model.generate(pixel_values, max_length=128)
            text_line = processor.batch_decode(generated_ids, skip_special_tokens=True)[0].strip()
            if text_line:
                all_text.append(text_line)
                # TrOCR exposes no confidence — heuristic: longer lines score higher.
                confidences.append(0.85 if len(text_line) > 3 else 0.5)

        if not all_text:
            return []

        avg_conf = int(sum(confidences) / len(confidences) * 100)
        # Approximate per-line bboxes by splitting the region height evenly.
        line_h = region.height // max(len(all_text), 1)
        words = []
        for i, line in enumerate(all_text):
            words.append({
                "text": line,
                "left": region.x,
                "top": region.y + i * line_h,
                "width": region.width,
                "height": line_h,
                "conf": avg_conf,
                "region_type": region.type,
            })
        return words

    except Exception as e:
        logger.error(f"ocr_region_trocr failed: {e}")
        return []
|
||
|
||
|
||
def ocr_region_lighton(img_bgr: np.ndarray, region: PageRegion) -> List[Dict[str, Any]]:
    """Run LightOnOCR-2-1B on a region. Returns line-level word dicts (same format as ocr_region_rapid).

    Falls back to RapidOCR or Tesseract if LightOnOCR is not available.

    Args:
        img_bgr: Full-page BGR image.
        region: Region to crop and OCR.

    Returns:
        Line-level word dicts with approximated bboxes (region height
        split evenly across decoded lines); empty list on failure.
    """
    # Imported lazily so the module loads even when the LightOn service is absent.
    from services.lighton_ocr_service import get_lighton_model, _check_lighton_available

    if not _check_lighton_available():
        logger.warning("LightOnOCR not available, falling back to RapidOCR/Tesseract")
        if RAPIDOCR_AVAILABLE and img_bgr is not None:
            return ocr_region_rapid(img_bgr, region)
        ocr_img_crop = cv2.cvtColor(img_bgr, cv2.COLOR_BGR2GRAY) if img_bgr is not None else None
        return ocr_region(ocr_img_crop, region, lang="eng+deu", psm=6) if ocr_img_crop is not None else []

    crop = img_bgr[region.y:region.y + region.height, region.x:region.x + region.width]
    if crop.size == 0:
        return []

    try:
        # (Fix: removed unused local `import io` — nothing here uses it.)
        import torch
        from PIL import Image as _PILImage

        processor, model = get_lighton_model()
        if processor is None or model is None:
            logger.warning("LightOnOCR model not loaded, falling back to RapidOCR/Tesseract")
            if RAPIDOCR_AVAILABLE and img_bgr is not None:
                return ocr_region_rapid(img_bgr, region)
            ocr_img_crop = cv2.cvtColor(img_bgr, cv2.COLOR_BGR2GRAY)
            return ocr_region(ocr_img_crop, region, lang="eng+deu", psm=6)

        # LightOn expects RGB PIL input fed through its chat template.
        pil_crop = _PILImage.fromarray(cv2.cvtColor(crop, cv2.COLOR_BGR2RGB))
        conversation = [{"role": "user", "content": [{"type": "image"}]}]
        inputs = processor.apply_chat_template(
            conversation, images=[pil_crop],
            add_generation_prompt=True, return_tensors="pt"
        ).to(model.device)

        with torch.no_grad():
            output_ids = model.generate(**inputs, max_new_tokens=1024)

        text = processor.decode(output_ids[0], skip_special_tokens=True).strip()
        if not text:
            return []

        # Approximate per-line bboxes by splitting the region height evenly.
        lines = [l.strip() for l in text.split("\n") if l.strip()]
        line_h = region.height // max(len(lines), 1)
        words = []
        for i, line in enumerate(lines):
            words.append({
                "text": line,
                "left": region.x,
                "top": region.y + i * line_h,
                "width": region.width,
                "height": line_h,
                "conf": 85,  # LightOn exposes no confidence; fixed heuristic value.
                "region_type": region.type,
            })
        return words

    except Exception as e:
        logger.error(f"ocr_region_lighton failed: {e}")
        return []
|
||
|
||
|
||
# --- Remote PaddleOCR (Hetzner x86_64) ---
|
||
|
||
|
||
async def ocr_region_paddle(
    img_bgr: np.ndarray,
    region: Optional["PageRegion"] = None,
) -> List[Dict[str, Any]]:
    """Run OCR via local RapidOCR (default) or remote PaddleOCR (fallback).

    Uses RapidOCR (same PP-OCRv5 ONNX models) locally for speed and reliability.
    Falls back to remote PaddleOCR service only if:
    - env FORCE_REMOTE_PADDLE=1 is set, or
    - RapidOCR fails or returns no words

    Args:
        img_bgr: Full-page BGR image.
        region: Optional region to restrict OCR to; None means full page.

    Returns:
        Word dicts in absolute page coordinates (text, left, top, width,
        height, conf, region_type).
    """
    force_remote = os.environ.get("FORCE_REMOTE_PADDLE", "").strip() == "1"

    if not force_remote:
        try:
            if region is None:
                # No region given → OCR the whole page.
                h, w = img_bgr.shape[:2]
                _region = PageRegion(type="full_page", x=0, y=0, width=w, height=h)
            else:
                _region = region

            words = ocr_region_rapid(img_bgr, _region)
            if words:
                logger.info("ocr_region_paddle: used local RapidOCR (%d words)", len(words))
                return words
            logger.warning("ocr_region_paddle: RapidOCR returned 0 words, trying remote")
        except Exception as e:
            logger.warning("ocr_region_paddle: RapidOCR failed (%s), trying remote", e)

    # --- Remote PaddleOCR fallback (Hetzner x86_64) ---
    # Imported lazily so the module loads without the remote client installed.
    from services.paddleocr_remote import ocr_remote_paddle

    if region is not None:
        crop = img_bgr[
            region.y : region.y + region.height,
            region.x : region.x + region.width,
        ]
        # Remember the crop origin to shift results back to page space.
        offset_x, offset_y = region.x, region.y
    else:
        crop = img_bgr
        offset_x, offset_y = 0, 0

    if crop.size == 0:
        return []

    # Downscale large images to fit within Traefik's 60s timeout.
    # PaddleOCR works well at ~1500px max dimension.
    h, w = crop.shape[:2]
    scale = 1.0
    _MAX_DIM = 1500
    if max(h, w) > _MAX_DIM:
        scale = _MAX_DIM / max(h, w)
        new_w, new_h = int(w * scale), int(h * scale)
        crop = cv2.resize(crop, (new_w, new_h), interpolation=cv2.INTER_AREA)
        logger.info("ocr_region_paddle: downscaled %dx%d → %dx%d (scale=%.2f)",
                    w, h, new_w, new_h, scale)

    # Encode as JPEG (smaller than PNG, faster upload)
    success, jpg_buf = cv2.imencode(".jpg", crop, [cv2.IMWRITE_JPEG_QUALITY, 90])
    if not success:
        logger.error("ocr_region_paddle: cv2.imencode failed")
        return []

    words, _w, _h = await ocr_remote_paddle(jpg_buf.tobytes(), filename="scan.jpg")
    logger.info("ocr_region_paddle: used remote PaddleOCR (%d words)", len(words))

    # Scale coordinates back to original size and shift to absolute image space
    inv_scale = 1.0 / scale if scale != 1.0 else 1.0
    for wd in words:
        wd["left"] = int(wd["left"] * inv_scale) + offset_x
        wd["top"] = int(wd["top"] * inv_scale) + offset_y
        wd["width"] = int(wd["width"] * inv_scale)
        wd["height"] = int(wd["height"] * inv_scale)
        if region is not None:
            # Tag words with the caller's region type for downstream routing.
            wd["region_type"] = region.type

    return words
|
||
|
||
|
||
# =============================================================================
|
||
# Post-Processing: Deterministic Quality Fixes
|
||
# =============================================================================
|
||
|
||
# --- A. Character Confusion Fix (I/1/l) ---
|
||
|
||
# Common OCR confusion pairs in vocabulary context
|
||
_CHAR_CONFUSION_RULES = [
|
||
# "1" at word start followed by lowercase → likely "I" or "l"
|
||
# Exception: NOT before "." or "," (numbered list prefix: "1. Kreuz", "1, 2, 3")
|
||
(re.compile(r'\b1([a-z])'), r'I\1'), # 1ch → Ich, 1want → Iwant
|
||
# Standalone "1" → "I" (English pronoun), but NOT "1." or "1," (list number)
|
||
(re.compile(r'(?<!\d)\b1\b(?![\d.,])'), 'I'), # "1 want" → "I want"
|
||
# "|" → "I", but NOT "|." or "|," (those are "1." list prefixes → spell-checker handles them)
|
||
(re.compile(r'(?<!\|)\|(?!\||[.,])'), 'I'), # |ch → Ich, | want → I want
|
||
]
|
||
|
||
# Cross-language indicators: if DE has these, EN "1" is almost certainly "I"
|
||
_DE_INDICATORS_FOR_EN_I = {'ich', 'mich', 'mir', 'mein', 'meine', 'meiner', 'meinem'}
|
||
|
||
|
||
def _fix_character_confusion(entries: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
|
||
"""Fix common OCR character confusions using context.
|
||
|
||
Deterministic rules:
|
||
- "1" at word start → "I" or "l" based on context
|
||
- Cross-reference EN↔DE: if DE contains "ich/mich/mir", EN "1" → "I"
|
||
- "y " artifact at word boundaries → remove (e.g. "y you" → "you")
|
||
"""
|
||
for entry in entries:
|
||
en = entry.get('english', '') or ''
|
||
de = entry.get('german', '') or ''
|
||
ex = entry.get('example', '') or ''
|
||
|
||
# Apply general rules to all fields
|
||
for pattern, replacement in _CHAR_CONFUSION_RULES:
|
||
en = pattern.sub(replacement, en)
|
||
de = pattern.sub(replacement, de)
|
||
ex = pattern.sub(replacement, ex)
|
||
|
||
# Cross-reference: if DE has "ich"/"mich" indicators, fix EN "1" → "I"
|
||
de_lower_words = set(de.lower().replace(',', ' ').split())
|
||
if de_lower_words & _DE_INDICATORS_FOR_EN_I:
|
||
# Any remaining "1" in EN that looks like "I"
|
||
en = re.sub(r'\b1\b(?![\d.,])', 'I', en)
|
||
|
||
# Fix "y " artifact before repeated word: "y you" → "you"
|
||
en = re.sub(r'\by\s+([a-z])', r'\1', en)
|
||
ex = re.sub(r'\by\s+([a-z])', r'\1', ex)
|
||
|
||
entry['english'] = en.strip()
|
||
entry['german'] = de.strip()
|
||
entry['example'] = ex.strip()
|
||
|
||
return entries
|
||
|
||
|
||
# --- B. Comma-Separated Word Form Splitting ---
|
||
|
||
def _is_singular_plural_pair(parts: List[str]) -> bool:
|
||
"""Detect if comma-separated parts are singular/plural forms of the same word.
|
||
|
||
E.g. "mouse, mice" or "Maus, Mäuse" → True (should NOT be split).
|
||
"break, broke, broken" → False (different verb forms, OK to split).
|
||
|
||
Heuristic: exactly 2 parts that share a common prefix of >= 50% length,
|
||
OR one part is a known plural suffix of the other (e.g. +s, +es, +en).
|
||
"""
|
||
if len(parts) != 2:
|
||
return False
|
||
|
||
a, b = parts[0].lower().strip(), parts[1].lower().strip()
|
||
if not a or not b:
|
||
return False
|
||
|
||
# Common prefix heuristic: if words share >= 50% of the shorter word,
|
||
# they are likely forms of the same word (Maus/Mäuse, child/children).
|
||
min_len = min(len(a), len(b))
|
||
common = 0
|
||
for ca, cb in zip(a, b):
|
||
if ca == cb:
|
||
common += 1
|
||
else:
|
||
break
|
||
if common >= max(2, min_len * 0.5):
|
||
return True
|
||
|
||
# Umlaut relation: one form adds umlaut (a→ä, o→ö, u→ü)
|
||
umlaut_map = str.maketrans('aou', 'äöü')
|
||
if a.translate(umlaut_map) == b or b.translate(umlaut_map) == a:
|
||
return True
|
||
|
||
return False
|
||
|
||
|
||
def _split_comma_entries(entries: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
    """Split entries with comma-separated word forms into individual entries.

    E.g. EN: "break, broke, broken" / DE: "brechen, brach, gebrochen"
    → 3 entries: break/brechen, broke/brach, broken/gebrochen

    A split happens only when EN and DE yield the same number of top-level
    comma-parts (more than one), every part is short (≤3 words — word
    forms, not sentences), and neither side is a singular/plural pair
    like "mouse, mice" / "Maus, Mäuse" (forms of one vocabulary entry).
    Split-off entries get an empty example (attached later) and a
    'split_from_comma' marker; row indices are renumbered at the end.
    """
    out: List[Dict[str, Any]] = []

    for item in entries:
        en_text = (item.get('english', '') or '').strip()
        de_text = (item.get('german', '') or '').strip()

        # Commas inside [...] or (...) do not separate word forms.
        en_forms = _split_by_comma(en_text)
        de_forms = _split_by_comma(de_text)

        splittable = (
            len(en_forms) > 1
            and len(en_forms) == len(de_forms)
            and all(len(p.split()) <= 3 for p in en_forms)
            and all(len(p.split()) <= 3 for p in de_forms)
            and not _is_singular_plural_pair(en_forms)
            and not _is_singular_plural_pair(de_forms)
        )

        if not splittable:
            out.append(item)
            continue

        # Fan out into one entry per aligned EN/DE form pair.
        for en_part, de_part in zip(en_forms, de_forms):
            piece = dict(item)  # shallow copy
            piece['english'] = en_part.strip()
            piece['german'] = de_part.strip()
            piece['example'] = ''  # examples get attached later
            piece['split_from_comma'] = True
            out.append(piece)

    # Re-number
    for pos, item in enumerate(out):
        item['row_index'] = pos

    return out
|
||
|
||
|
||
def _split_by_comma(text: str) -> List[str]:
|
||
"""Split text by commas, but not inside brackets [...] or parens (...)."""
|
||
if ',' not in text:
|
||
return [text]
|
||
|
||
parts = []
|
||
depth_bracket = 0
|
||
depth_paren = 0
|
||
current = []
|
||
|
||
for ch in text:
|
||
if ch == '[':
|
||
depth_bracket += 1
|
||
elif ch == ']':
|
||
depth_bracket = max(0, depth_bracket - 1)
|
||
elif ch == '(':
|
||
depth_paren += 1
|
||
elif ch == ')':
|
||
depth_paren = max(0, depth_paren - 1)
|
||
elif ch == ',' and depth_bracket == 0 and depth_paren == 0:
|
||
parts.append(''.join(current).strip())
|
||
current = []
|
||
continue
|
||
current.append(ch)
|
||
|
||
if current:
|
||
parts.append(''.join(current).strip())
|
||
|
||
# Filter empty parts
|
||
return [p for p in parts if p]
|
||
|
||
|
||
# --- C. Example Sentence Attachment ---
|
||
|
||
def _find_best_vocab_match(example_text: str, vocab_entries: List[Dict[str, Any]]) -> int:
|
||
"""Find the vocab entry whose English word(s) best match the example sentence.
|
||
|
||
Returns index into vocab_entries, or -1 if no match found.
|
||
Uses word stem overlap: "a broken arm" matches "broken" or "break".
|
||
"""
|
||
if not vocab_entries or not example_text:
|
||
return -1
|
||
|
||
example_lower = example_text.lower()
|
||
example_words = set(re.findall(r'[a-zäöüß]+', example_lower))
|
||
|
||
best_idx = -1
|
||
best_score = 0
|
||
|
||
for i, entry in enumerate(vocab_entries):
|
||
en = (entry.get('english', '') or '').lower()
|
||
if not en:
|
||
continue
|
||
|
||
# Extract vocab words (split on space, comma, newline)
|
||
vocab_words = set(re.findall(r'[a-zäöüß]+', en))
|
||
|
||
# Score: how many vocab words appear in the example?
|
||
# Also check if example words share a common stem (first 4 chars)
|
||
direct_matches = vocab_words & example_words
|
||
score = len(direct_matches) * 10
|
||
|
||
# Stem matching: "broken" matches "break" via shared prefix "bro"/"bre"
|
||
if score == 0:
|
||
for vw in vocab_words:
|
||
if len(vw) < 3:
|
||
continue
|
||
stem = vw[:4] if len(vw) >= 4 else vw[:3]
|
||
for ew in example_words:
|
||
if len(ew) >= len(stem) and ew[:len(stem)] == stem:
|
||
score += 5
|
||
break
|
||
|
||
if score > best_score:
|
||
best_score = score
|
||
best_idx = i
|
||
|
||
return best_idx if best_score > 0 else -1
|
||
|
||
|
||
def _attach_example_sentences(entries: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
    """Attach rows with EN text but no DE translation as examples to matching vocab entries.

    Vocabulary worksheets often have:
        Row 1: break, broke, broken / brechen, brach, gebrochen
        Row 2: a broken arm (no DE → example for "broken")
        Row 3: a broken plate (no DE → example for "broken")
        Row 4: egg / Ei (has DE → new vocab entry)

    Rules (deterministic, generic):
    - A row is an "example row" if it has EN text, no real DE text (DE of
      ≤1 char counts as OCR noise — "Ei" at 2 chars is a valid word, so
      the cutoff is 1), the EN looks like a sentence (≥4 words or
      sentence-final punctuation), and at least one vocab entry precedes it.
    - Each example attaches to the entry whose English words best overlap
      the sentence (_find_best_vocab_match), falling back to the nearest
      preceding entry; multiple examples are joined with " | ".
    - Surviving vocab entries are renumbered via 'row_index'.
    """
    if not entries:
        return entries

    kept: List[Dict[str, Any]] = []
    pending: Dict[int, List[str]] = {}  # vocab index → collected example texts

    for row in entries:
        en = (row.get('english', '') or '').strip()
        de = (row.get('german', '') or '').strip()

        sentence_like = len(en.split()) >= 4 or en.rstrip().endswith(('.', '!', '?'))
        is_example = bool(en) and len(de) <= 1 and sentence_like and bool(kept)

        if not is_example:
            kept.append(row)
            continue

        # Example sentence: attach to best word-overlap match, else the
        # nearest preceding vocab entry.
        target = _find_best_vocab_match(en, kept)
        if target < 0:
            target = len(kept) - 1
        pending.setdefault(target, []).append(en)

    # Merge collected examples into their matched vocab entries.
    for target, texts in pending.items():
        if 0 <= target < len(kept):
            entry = kept[target]
            prior = (entry.get('example', '') or '').strip()
            addition = ' | '.join(texts)
            entry['example'] = f"{prior} | {addition}" if prior else addition

    # Re-number
    for pos, entry in enumerate(kept):
        entry['row_index'] = pos

    return kept
|
||
|
||
|
||
# --- D. Phonetic Bracket IPA Replacement ---
|
||
|
||
# Pattern: word followed by any bracket type containing phonetic content.
|
||
# Tesseract often garbles IPA brackets: [ˈdɑːns] → {'tfatno] or (cy) etc.
|
||
# Match any opener ([, {, () with any closer (], }, )) — even mixed pairs.
|
||
# This intentionally matches mixed brackets (e.g. {content]) because
|
||
# Tesseract frequently misrecognizes bracket characters.
|
||
# Compiled matcher: an alphabetic word (incl. German umlauts), optional
# whitespace, then any opener ([, {, () paired with any closer (], }, ))
# around non-closer content — mixed pairs are intentional (see above).
_PHONETIC_BRACKET_RE = re.compile(
    r'(\b[a-zA-ZäöüÄÖÜß]+)\s*[\[\{\(]([^\]\}\)]*?)[\]\}\)]'
)

# Unicode IPA characters — used to distinguish correct IPA (from dictionary
# lookup) from garbled OCR content when stripping orphan brackets.
_IPA_CHARS = frozenset('ˈˌːɑɒæɛəɜɪɔʊʌðŋθʃʒɹɡɾʔɐ')

# Minimum word confidence for full-page Tesseract results (0-100).
# Words below this threshold are OCR noise (scanner shadows, borders).
_MIN_WORD_CONF = 30
|
||
|
||
|
||
def _lookup_ipa(word: str, pronunciation: str = 'british') -> Optional[str]:
    """Look up IPA for a word using the selected pronunciation dictionary.

    Args:
        word: English word to look up.
        pronunciation: 'british' (Britfone, MIT) or 'american' (eng_to_ipa, MIT).

    Returns:
        IPA string or None if not found. Each variant falls back to the
        other source on a miss; unknown values try both in order.
    """
    key = word.lower().strip()
    if not key:
        return None

    def from_britfone() -> Optional[str]:
        # Britfone is a plain dict lookup; None when missing/unavailable.
        if _britfone_dict:
            return _britfone_dict.get(key)
        return None

    def from_american() -> Optional[str]:
        # eng_to_ipa marks unknown words with '*' — treat those as misses.
        if _ipa_convert_american:
            converted = _ipa_convert_american(key)
            if converted and '*' not in converted:
                return converted
        return None

    if pronunciation == 'british' and _britfone_dict:
        # Britfone first, American as fallback.
        return from_britfone() or from_american()

    if pronunciation == 'american' and _ipa_convert_american:
        # CMU/eng_to_ipa first, Britfone as fallback.
        return from_american() or from_britfone()

    # Preferred source unavailable — try any available source.
    return from_britfone() or from_american()
|
||
|
||
|
||
def _fix_phonetic_brackets(
    entries: List[Dict[str, Any]],
    pronunciation: str = 'british',
) -> List[Dict[str, Any]]:
    """Replace OCR'd phonetic transcriptions with dictionary IPA.

    Detects patterns like "dance [du:ns]" and replaces with correct IPA:
    - British: "dance [dˈɑːns]" (Britfone, MIT)
    - American: "dance [dæns]" (eng_to_ipa/CMU, MIT)

    Only the ENGLISH field is touched — German and example fields carry
    meaningful parenthetical content ("Eis (gefrorenes Wasser)",
    "(sich beschweren)") that must never be treated as phonetics.
    Replacement happens only when the word before the brackets is found
    in the dictionary. No-op when IPA support is unavailable.
    """
    if not IPA_AVAILABLE:
        return entries

    fixed = 0
    for item in entries:
        original = item.get('english', '') or ''
        # Fast path: nothing bracket-like in the field, nothing to do.
        if not any(opener in original for opener in '[{('):
            continue
        updated = _replace_phonetics_in_text(original, pronunciation)
        if updated != original:
            logger.debug(f"_fix_phonetic_brackets: '{original}' → '{updated}'")
            fixed += 1
            item['english'] = updated

    if fixed:
        logger.info(f"_fix_phonetic_brackets: {fixed} IPA replacements in {len(entries)} entries")
    return entries
|
||
|
||
|
||
# Grammar particles that appear in brackets after English words:
|
||
# cross (with), complain (about/of), agree (on/with), look (sth) up
|
||
# These must NOT be replaced with IPA. Only used for the English field
|
||
# (German/example fields are never processed for IPA replacement).
|
||
_GRAMMAR_BRACKET_WORDS = frozenset({
|
||
# English prepositions/particles commonly in vocab tables
|
||
'with', 'about', 'of', 'for', 'to', 'from', 'in', 'on', 'at', 'by',
|
||
'up', 'out', 'off', 'into', 'over', 'down', 'away', 'back', 'through',
|
||
# English grammar abbreviations used in vocab tables
|
||
'sth', 'sb', 'adj', 'adv',
|
||
})
|
||
|
||
|
||
def _is_grammar_bracket_content(content: str) -> bool:
    """Return True if bracket content is grammar info in the ENGLISH field.

    Grammar info: cross (with), complain (about/of), agree (on/with)
    NOT grammar: [breik], [maus], {'tfatno], (cy), ['kju:kambo], [test]

    Since we only process the English field, we only need to recognize
    English grammar particles. Everything else is (garbled) IPA.
    """
    if not content:
        return False

    # Slash-separated alternatives like "(about/of)" are normalized to
    # individual lowercase tokens before checking.
    tokens = []
    for raw in content.split('/'):
        piece = raw.strip()
        if piece:
            tokens.append(piece.lower())

    if not tokens:
        return False

    # Every single token must be a known grammar particle; one unknown
    # token means the content is (possibly garbled) IPA.
    for token in tokens:
        if token not in _GRAMMAR_BRACKET_WORDS:
            return False
    return True
|
||
|
||
|
||
def _replace_phonetics_in_text(
    text: str,
    pronunciation: str = 'british',
    strip_orphans: bool = True,
) -> str:
    """Replace [phonetic] / {phonetic} / (phonetic) after words with dictionary IPA.

    Tesseract garbles IPA brackets, e.g. China [ˈtʃaɪnə] → China {'tfatno].
    We match any bracket type and replace with dictionary IPA if found.
    Legitimate parenthetical content like (zer)brechen or (veranstaltung) is preserved.

    Args:
        text: Cell text that may contain bracketed (possibly garbled) phonetics.
        pronunciation: ``'british'`` or ``'american'`` dictionary variant.
        strip_orphans: If True, strip orphan brackets that look like garbled IPA.
            Set to False for column_text where brackets may be German content.

    Returns:
        The text with phonetics normalized (and orphan brackets stripped
        when requested), with surrounding whitespace removed.
    """
    if not IPA_AVAILABLE:
        return text

    def replacer(match):
        # group(1) = the word before the bracket, group(2) = bracket content
        word = match.group(1)
        bracket_content = match.group(2).strip()
        full_match = match.group(0)

        # Skip if bracket content looks like regular text (multiple words)
        if len(bracket_content.split()) > 3:
            return full_match

        # Look up IPA for the word before brackets
        ipa = _lookup_ipa(word, pronunciation)

        if ipa:
            # Word has IPA → bracket content is phonetic (garbled or correct).
            # Exception: grammar particles like cross (with) — keep those.
            if _is_grammar_bracket_content(bracket_content):
                return full_match
            logger.debug(f"phonetic: '{full_match}' → '{word} [{ipa}]'")
            return f"{word} [{ipa}]"

        # No IPA for this word — keep as-is
        return full_match

    text = _PHONETIC_BRACKET_RE.sub(replacer, text)

    if strip_orphans:
        # Second pass: strip remaining orphan brackets that are garbled IPA.
        # These have no word before them (the main regex requires \b word \s* bracket).
        # Examples: "[mais]", "{'mani setva]", trailing "(kros]"
        # Keep: grammar parens "(sich beschweren)", correct IPA "[dˈɑːns]"
        def _strip_orphan_bracket(m):
            content = m.group(1).strip()
            # Keep grammar info: (sich beschweren), (about/of)
            if _is_grammar_bracket_content(content):
                return m.group(0)
            # Keep correct IPA (contains Unicode IPA characters)
            if any(ch in _IPA_CHARS for ch in content):
                return m.group(0)
            # Keep real-word parentheticals like (probieren), (Profit), (Geld).
            # Garbled IPA fragments are short nonsense like (kros), (cy), (mais)
            # — they never contain a real word ≥4 letters with proper casing.
            content_alpha = re.sub(r'[^a-zA-ZäöüÄÖÜßéèêëàâîïôûùç]', '', content)
            if len(content_alpha) >= 4:
                return m.group(0)
            logger.debug(f"phonetic: stripping orphan bracket '{m.group(0)}'")
            return ''

        # Matches any opening bracket ([, {, () up to the first closing one.
        text = re.sub(r'[\[\{\(]([^\]\}\)]*)[\]\}\)]', _strip_orphan_bracket, text)

    text = text.strip()

    return text
|
||
|
||
|
||
def _text_has_garbled_ipa(text: str) -> bool:
|
||
"""Check if text contains garbled IPA-like fragments from OCR.
|
||
|
||
Returns True if there is evidence of OCR-mangled phonetic
|
||
transcription, e.g. stress marks, length marks, or IPA special chars.
|
||
This is used to decide whether ``_insert_missing_ipa`` should run:
|
||
it must only insert IPA to *replace* garbled phonetics that are already
|
||
in the text — never to ADD phonetics where none existed on the page.
|
||
"""
|
||
# Bracketed text that doesn't contain valid IPA symbols is garbled OCR
|
||
# of a phonetic transcription, e.g. "[n, nn]" or "[1uedtX,1]".
|
||
stripped = text.strip()
|
||
if stripped.startswith('[') and stripped.endswith(']'):
|
||
inner = stripped[1:-1]
|
||
# Real IPA brackets contain IPA symbols (ˈˌəɪɛɒʊʌæɑɔʃʒθðŋ)
|
||
if not any(c in inner for c in 'ˈˌəɪɛɒʊʌæɑɔʃʒθðŋ'):
|
||
# Not a valid dictionary-style bracket like "(no pl)" — those
|
||
# use parentheses, not square brackets. Square brackets with
|
||
# no IPA chars are garbled phonetics.
|
||
return True
|
||
|
||
for w in text.strip().split():
|
||
# Skip delimiters and very short tokens
|
||
if len(w) <= 1 or w in ('–', '—', '-', '/', '|', ',', ';'):
|
||
continue
|
||
# Starts with stress mark (OCR read IPA stress ' as apostrophe)
|
||
if w.startswith("'") and len(w) > 1 and not w[1:].istitle():
|
||
return True
|
||
if w.startswith("\u02c8") or w.startswith("\u02cc"): # ˈ ˌ
|
||
return True
|
||
# Contains IPA length mark ':' in a short non-word fragment
|
||
if ':' in w and len(w) < 12:
|
||
# But not things like "3:00" (time) or common words
|
||
stripped = re.sub(r'[^a-zA-Z:]', '', w)
|
||
if ':' in stripped and not stripped.replace(':', '').isalpha():
|
||
continue
|
||
return True
|
||
# Contains IPA special characters
|
||
if any(c in w for c in 'əɪɛɒʊʌæɑɔʃʒθðŋ'):
|
||
return True
|
||
return False
|
||
|
||
|
||
def _insert_missing_ipa(text: str, pronunciation: str = 'british') -> str:
    """Insert IPA pronunciation for English words that have no brackets at all.

    OCR sometimes garbles the phonetic transcription into plain-text fragments
    (e.g. "scare skea" where "skea" is garbled /skɛə/). This scans the text
    for the headword, inserts correct [IPA], and strips the garbled fragments.

    Only inserts for words that:
    - are standalone (not already followed by a bracket)
    - have an IPA entry in the dictionary
    - appear to be English headwords (at the start of text or after common
      separators like ",", ";", "•")

    This is intentionally conservative: it only inserts at the END of each
    whitespace-separated token group to avoid breaking phrases.

    Args:
        text: Short cell text (≤6 tokens, no bracket characters).
        pronunciation: ``'british'`` or ``'american'`` dictionary variant.

    Returns:
        Text with ``[ipa]`` inserted after the headword and garbled trailing
        fragments removed, or the original text unchanged.
    """
    if not IPA_AVAILABLE:
        return text
    if not text or not text.strip():
        return text

    # Skip if already has brackets (IPA replacement handles those)
    if any(ch in text for ch in '[{('):
        return text

    # Only process short text fragments (typical vocab cells).
    # Long sentences / paragraphs should not get IPA insertions.
    words = text.strip().split()
    if len(words) > 6:
        return text

    # IPA marker characters for fallback 1 below. Hoisted out of the loop:
    # the set is loop-invariant and was previously rebuilt every iteration.
    ipa_split_chars = frozenset(':əɪɛɒʊʌæɑɔʃʒθðŋˈˌ')

    # Try to insert IPA for the first alphanumeric word
    # Typical patterns: "challenge", "profit", "film", "badge"
    for i, w in enumerate(words):
        # Clean punctuation for lookup
        clean = re.sub(r'[^a-zA-ZäöüÄÖÜß\'-]', '', w)
        if not clean or len(clean) < 2:
            continue
        # Skip German/grammar words
        if clean.lower() in _GRAMMAR_BRACKET_WORDS:
            continue
        ipa = _lookup_ipa(clean, pronunciation)
        # Fallback: try without hyphens (e.g. "second-hand" → "secondhand")
        if not ipa and '-' in clean:
            ipa = _lookup_ipa(clean.replace('-', ''), pronunciation)
        # Fallback 1: IPA-marker split for merged tokens where OCR
        # joined headword with its IPA (e.g. "schoolbagsku:lbæg").
        # Find the first IPA marker character (:, æ, ɪ, etc.), walk
        # backwards ≤3 chars for the onset consonant cluster, and
        # split into headword + OCR IPA.
        if not ipa:
            first_marker = next(
                (p for p, ch in enumerate(w) if ch in ipa_split_chars), -1,
            )
            if first_marker >= 3:
                split = first_marker
                while (split > 0
                       and split > first_marker - 3
                       and w[split - 1].isalpha()
                       and w[split - 1].islower()):
                    split -= 1
                if split >= 2:
                    headword = w[:split]
                    ocr_ipa = w[split:]
                    hw_ipa = _lookup_ipa(headword, pronunciation)
                    if hw_ipa:
                        words[i] = f"{headword} [{hw_ipa}]"
                    else:
                        # Word not in dictionary — use OCR IPA
                        words[i] = f"{headword} [{ocr_ipa}]"
                    # Tokens after the merged one were part of the garbled
                    # transcription — drop them.
                    words = words[:i + 1]
                    ipa = True  # signal that we handled it
                    break
        # Fallback 2: prefix matching for merged tokens WITHOUT IPA
        # markers (e.g. "Scotland'skotland"). Find longest dictionary
        # prefix using only alpha chars to avoid punctuation matches.
        if not ipa:
            alpha = re.sub(r'[^a-zA-Z]', '', clean)
            if len(alpha) > 5:  # need at least 6 chars for meaningful split
                for end in range(len(alpha), 3, -1):  # min prefix 4 chars
                    prefix = alpha[:end]
                    test_ipa = _lookup_ipa(prefix, pronunciation)
                    if test_ipa:
                        ipa = test_ipa
                        w = prefix
                        words[i] = prefix
                        break
        if ipa:
            words[i] = f"{w} [{ipa}]"
            # Strip garbled OCR phonetics after the IPA bracket.
            # On scanned vocab pages, printed IPA is read as garbled
            # text (e.g. "scare skea" where "skea" is garbled /skɛə/).
            # After inserting correct IPA, remove remaining words that
            # aren't real English words, delimiters, or German text.
            kept = words[:i + 1]
            for j in range(i + 1, len(words)):
                wj = words[j]
                # Delimiter — keep this and everything after
                if wj in ('–', '—', '-', '/', '|', ',', ';'):
                    kept.extend(words[j:])
                    break
                # Starts with uppercase — likely German or proper noun
                clean_j = re.sub(r'[^a-zA-Z]', '', wj)
                if clean_j and clean_j[0].isupper():
                    kept.extend(words[j:])
                    break
                # Known English word (≥2 chars) — keep it and rest
                if clean_j and len(clean_j) >= 2:
                    if _lookup_ipa(clean_j, pronunciation):
                        kept.extend(words[j:])
                        break
                # Otherwise — likely garbled phonetics, skip
            words = kept
            break

    return ' '.join(words)
|
||
|
||
|
||
def _has_non_dict_trailing(text: str, pronunciation: str = 'british') -> bool:
    """Check if text has a headword followed by non-dictionary trailing words.

    Used as an additional trigger for ``_insert_missing_ipa`` when
    ``_text_has_garbled_ipa`` returns False because the garbled IPA
    happens to look like plain ASCII (e.g. "skea" for /skɛə/).
    """
    if not IPA_AVAILABLE:
        return False

    tokens = text.strip().split()
    if not 2 <= len(tokens) <= 6:
        return False

    # Locate the first token that resolves in the IPA dictionary.
    headword_at = -1
    for idx, token in enumerate(tokens):
        core = re.sub(r'[^a-zA-Z\'-]', '', token)
        if len(core) < 2:
            continue
        if core.lower() in _GRAMMAR_BRACKET_WORDS:
            continue
        if _lookup_ipa(core, pronunciation):
            headword_at = idx
            break

    # No headword, or nothing trails it → nothing to inspect.
    if headword_at < 0 or headword_at == len(tokens) - 1:
        return False

    # Every trailing token must look garbled: any delimiter, any
    # capitalized (German) word, or any dictionary word clears it.
    for token in tokens[headword_at + 1:]:
        if token in ('–', '—', '-', '/', '|', ',', ';'):
            return False
        core = re.sub(r'[^a-zA-Z]', '', token)
        if core and core[0].isupper():
            return False
        if len(core) >= 2 and _lookup_ipa(core, pronunciation):
            return False
    return True
|
||
|
||
|
||
def _strip_post_bracket_garbled(
    text: str, pronunciation: str = 'british',
) -> str:
    """Strip garbled IPA fragments that trail after proper [IPA] brackets.

    E.g. ``sea [sˈiː] si:`` → ``sea [sˈiː]``
    ``seat [sˈiːt] si:t`` → ``seat [sˈiːt]``
    """
    if ']' not in text:
        return text
    cut = text.rfind(']')
    if cut >= len(text) - 1:
        return text

    head = text[:cut + 1].rstrip()
    tail = text[cut + 1:].strip()
    if not tail:
        return text

    tail_tokens = tail.split()
    survivors: List[str] = []
    for pos, token in enumerate(tail_tokens):
        # Delimiter — keep it and everything after
        if token in ('–', '—', '-', '/', '|', ',', ';'):
            survivors.extend(tail_tokens[pos:])
            break
        # IPA markers (length mark, IPA chars) — garbled fragment, drop
        if ':' in token or any(c in token for c in 'əɪɛɒʊʌæɑɔʃʒθðŋˈˌ'):
            continue
        letters = re.sub(r'[^a-zA-Z]', '', token)
        # Uppercase start — likely German, keep rest
        if letters and letters[0].isupper():
            survivors.extend(tail_tokens[pos:])
            break
        # Known English word — keep rest
        if len(letters) >= 2 and _lookup_ipa(letters, pronunciation):
            survivors.extend(tail_tokens[pos:])
            break
        # Unknown short fragment — likely garbled, drop

    if survivors:
        return head + ' ' + ' '.join(survivors)
    return head
|
||
|
||
|
||
def fix_ipa_continuation_cell(
    garbled_text: str,
    headword_text: str,
    pronunciation: str = 'british',
) -> str:
    """Replace garbled IPA in a continuation row with proper IPA.

    Continuation rows appear below the headword and contain only the
    printed phonetic transcription, which OCR garbles into fragments
    like ``ska:f – ska:vz`` (should be ``[skˈɑːf] – [skˈɑːvz]``).

    Args:
        garbled_text: The OCR-garbled IPA text from the continuation row.
        headword_text: The headword text from the previous row
            (e.g. ``scarf – scarves``).
        pronunciation: ``'british'`` or ``'american'``.

    Returns:
        Corrected IPA text, or the original if no fix could be applied.
    """
    if not IPA_AVAILABLE or not garbled_text or not headword_text:
        return garbled_text

    # Strip existing IPA brackets and parenthetical grammar annotations
    # like "(no pl)", "(sth)", "(sb)" from headword text
    clean_hw = re.sub(r'\[[^\]]*\]', '', headword_text)
    clean_hw = re.sub(r'\([^)]*\)', '', clean_hw).strip()
    if not clean_hw:
        return garbled_text

    # Split headword by delimiters (– — -)
    # "scarf – scarves" → ["scarf", "scarves"]
    # "see - saw - seen" → ["see", "saw", "seen"]
    parts = re.split(r'\s*[–—]\s*|\s+-\s+', clean_hw)
    parts = [p.strip() for p in parts if p.strip()]

    if not parts:
        return garbled_text

    # Look up IPA for each headword part.
    # Do NOT skip grammar words here — they are integral parts of the
    # headword (e.g. "close down", "the United Kingdom"). Grammar
    # annotations like "(sth)", "(no pl)" are already stripped above.
    ipa_parts: List[str] = []
    for part in parts:
        # A part may be multi-word like "secondary school"
        words = part.split()
        word_ipas: List[str] = []
        for w in words:
            clean_w = re.sub(r'[^a-zA-Z\'-]', '', w)
            if not clean_w or len(clean_w) < 2:
                continue
            ipa = _lookup_ipa(clean_w, pronunciation)
            if ipa:
                word_ipas.append(ipa)
        # One bracket per headword part; words without a dictionary
        # entry are silently omitted from the bracket.
        if word_ipas:
            ipa_parts.append('[' + ' '.join(word_ipas) + ']')

    if not ipa_parts:
        return garbled_text

    # Join with delimiter (mirrors the "scarf – scarves" layout)
    result = ' – '.join(ipa_parts)
    logger.debug(
        "fix_ipa_continuation: '%s' → '%s' (headwords: '%s')",
        garbled_text, result, headword_text,
    )
    return result
|
||
|
||
|
||
def _insert_headword_ipa(text: str, pronunciation: str = 'british') -> str:
    """Insert IPA for the first English headword in a long mixed-language line.

    Unlike _insert_missing_ipa (for short column_en cells), this handles
    column_text lines of any length. It only inserts IPA for the FIRST word
    if that word:
    - has no bracket following it already
    - has an IPA entry in the dictionary
    - is not a number/symbol prefix like "».55"

    Returns the text with [ipa] inserted after the first word, or unchanged.
    """
    if not IPA_AVAILABLE:
        return text
    if not text or not text.strip():
        return text

    tokens = text.strip().split()
    if not tokens:
        return text

    # A bracket right after the first token means IPA is already present.
    if len(tokens) > 1 and tokens[1].startswith(('[', '{', '(')):
        return text

    # Scan at most the first three tokens, skipping numeric prefixes
    # like "».55" or "0.56".
    for idx in range(min(3, len(tokens))):
        candidate = re.sub(r'[^a-zA-ZäöüÄÖÜß\'-]', '', tokens[idx])
        if len(candidate) < 2:
            continue
        if candidate.lower() in _GRAMMAR_BRACKET_WORDS:
            continue
        ipa = _lookup_ipa(candidate, pronunciation)
        if not ipa:
            # Stop at the first real word even if no IPA was found
            break
        tokens[idx] = f"{tokens[idx]} [{ipa}]"
        return ' '.join(tokens)

    return text
|
||
|
||
|
||
def fix_cell_phonetics(
    cells: List[Dict[str, Any]],
    pronunciation: str = 'british',
) -> List[Dict[str, Any]]:
    """Apply IPA phonetic fixes to cell texts for overlay mode.

    In the normal pipeline, _fix_phonetic_brackets operates on vocab entries
    (entry['english']). But the overlay reads cell['text'] directly, so
    phonetic fixes must be applied to cells too.

    Processing depends on column type:
    - column_en: Full processing (replace garbled IPA + strip orphan brackets
      + insert missing IPA). Safe because these cells contain only English
      headwords.
    - column_text: Light processing (replace garbled IPA ONLY). No orphan
      bracket stripping (brackets may be German content like "(probieren)")
      and no IPA insertion (would add tokens and break overlay positioning).

    Mutates the ``cells`` dicts in place and returns the same list.
    """
    if not IPA_AVAILABLE:
        return cells

    ipa_col_types = {'column_en', 'column_text'}
    replaced = 0

    for cell in cells:
        col_type = cell.get('col_type', '')
        if col_type not in ipa_col_types:
            continue
        text = cell.get('text', '') or ''
        if not text.strip():
            continue

        if col_type == 'column_en':
            # Full processing: replace garbled IPA, strip orphan brackets.
            new_text = _replace_phonetics_in_text(text, pronunciation, strip_orphans=True)
            if new_text == text:
                # Insert IPA when garbled phonetics exist OR when trailing
                # non-dictionary words suggest garbled IPA in plain ASCII.
                if _text_has_garbled_ipa(text) or _has_non_dict_trailing(text, pronunciation):
                    new_text = _insert_missing_ipa(text, pronunciation)
            # Strip trailing garbled fragments after proper [IPA] brackets
            # (e.g. "sea [sˈiː] si:" → "sea [sˈiː]")
            if ']' in new_text:
                new_text = _strip_post_bracket_garbled(new_text, pronunciation)
        else:
            # column_text: replace garbled IPA, no orphan stripping
            new_text = _replace_phonetics_in_text(text, pronunciation, strip_orphans=False)
            # Insert headword IPA ONLY if there's a gap in word_boxes
            # suggesting Tesseract missed an IPA bracket on the page.
            # Without gap evidence, the original page had no IPA.
            if new_text == text:
                wb = cell.get('word_boxes', [])
                if _has_ipa_gap(text, wb):
                    inserted = _insert_headword_ipa(text, pronunciation)
                    if inserted != text:
                        new_text = inserted
                        # Keep the 1:1 token-to-box mapping intact for
                        # the frontend overlay.
                        _sync_word_boxes_after_ipa_insert(cell, text, new_text)

        if new_text != text:
            logger.debug(f"fix_cell_phonetics: '{text}' → '{new_text}'")
            cell['text'] = new_text
            replaced += 1

    if replaced:
        logger.info(f"fix_cell_phonetics: {replaced} IPA fixes in {len(cells)} cells")
    return cells
|
||
|
||
|
||
def _has_ipa_gap(text: str, word_boxes: List[Dict]) -> bool:
|
||
"""Check if word_boxes show a gap where IPA brackets should be.
|
||
|
||
On a typical vocab page, the layout is:
|
||
headword [ipa] German translation
|
||
|
||
If Tesseract missed the IPA bracket, the gap between the headword
|
||
and the next word (German translation) is unusually large (>80px)
|
||
because the IPA occupied physical space on the page.
|
||
|
||
If no IPA was on the page (e.g. "be good at sth."), the words are
|
||
close together (<30px).
|
||
"""
|
||
if not word_boxes or len(word_boxes) < 2:
|
||
return False
|
||
|
||
tokens = text.split()
|
||
if not tokens:
|
||
return False
|
||
|
||
# Find the headword index: skip numeric prefixes like "».55", "0.56"
|
||
hw_box_idx = 0
|
||
for i, wb in enumerate(word_boxes):
|
||
wt = wb.get('text', '')
|
||
clean = re.sub(r'[^a-zA-ZäöüÄÖÜß]', '', wt)
|
||
if len(clean) >= 2:
|
||
hw_box_idx = i
|
||
break
|
||
|
||
if hw_box_idx >= len(word_boxes) - 1:
|
||
return False
|
||
|
||
# Check gap between headword and the next word_box
|
||
hw = word_boxes[hw_box_idx]
|
||
next_wb = word_boxes[hw_box_idx + 1]
|
||
gap = next_wb['left'] - (hw['left'] + hw['width'])
|
||
|
||
return gap > 80
|
||
|
||
|
||
def _sync_word_boxes_after_ipa_insert(
|
||
cell: Dict[str, Any],
|
||
old_text: str,
|
||
new_text: str,
|
||
) -> None:
|
||
"""Insert a synthetic word_box for an IPA token added by IPA insertion.
|
||
|
||
E.g. "challenge ..." → "challenge [tʃælɪndʒ] ..."
|
||
Adds a new word_box right after the headword's box so the 1:1
|
||
token-to-box mapping in the frontend overlay stays consistent.
|
||
"""
|
||
word_boxes = cell.get('word_boxes')
|
||
if not word_boxes:
|
||
return
|
||
|
||
old_tokens = old_text.split()
|
||
new_tokens = new_text.split()
|
||
|
||
if len(new_tokens) != len(old_tokens) + 1:
|
||
return # unexpected change, skip
|
||
|
||
# Find the inserted token by walking both lists in parallel.
|
||
# One token in new_tokens won't match — that's the inserted IPA.
|
||
insert_idx = -1
|
||
j = 0 # index into old_tokens
|
||
for i in range(len(new_tokens)):
|
||
if j < len(old_tokens) and new_tokens[i] == old_tokens[j]:
|
||
j += 1
|
||
else:
|
||
insert_idx = i
|
||
break
|
||
|
||
if insert_idx < 0 or insert_idx >= len(new_tokens):
|
||
return
|
||
|
||
ipa_token = new_tokens[insert_idx]
|
||
|
||
# The headword is at insert_idx - 1 in old_tokens (and word_boxes)
|
||
ref_idx = insert_idx - 1
|
||
if ref_idx < 0 or ref_idx >= len(word_boxes):
|
||
return
|
||
|
||
ref_box = word_boxes[ref_idx]
|
||
ipa_box = {
|
||
'text': ipa_token,
|
||
'left': ref_box['left'] + ref_box['width'] + 2,
|
||
'top': ref_box['top'],
|
||
'width': ref_box['width'],
|
||
'height': ref_box['height'],
|
||
'conf': ref_box.get('conf', 90),
|
||
}
|
||
word_boxes.insert(insert_idx, ipa_box)
|
||
|
||
|
||
def _assign_row_words_to_columns(
    row: RowGeometry,
    columns: List[PageRegion],
) -> Dict[int, List[Dict]]:
    """Assign each word in a row to exactly one column.

    Uses a two-pass strategy:
    1. Containment: if a word's center falls within a column's horizontal
       bounds (with padding), assign it to that column.
    2. Nearest center: for words not contained by any column, fall back to
       nearest column center distance.

    This prevents long sentences in wide columns (e.g. example) from having
    their rightmost words stolen by an adjacent column.

    Args:
        row: Row with words (relative coordinates).
        columns: Sorted list of columns (absolute coordinates).

    Returns:
        Dict mapping col_index → list of words assigned to that column.
    """
    result: Dict[int, List[Dict]] = {i: [] for i in range(len(columns))}

    if not row.words or not columns:
        return result

    left_x = row.x  # content ROI left (absolute)

    # Build non-overlapping column assignment ranges using midpoints.
    # For adjacent columns, the boundary is the midpoint between them.
    # This prevents words near column borders from being assigned to
    # the wrong column (e.g. "We" at the start of an example sentence
    # being stolen by the preceding DE column).
    n = len(columns)
    col_ranges_rel: List[Tuple[float, float]] = []  # (assign_left, assign_right) per column
    for ci, col in enumerate(columns):
        col_left_rel = col.x - left_x
        col_right_rel = col_left_rel + col.width

        # Left boundary: midpoint to previous column, or 0
        if ci == 0:
            assign_left = 0
        else:
            prev_right = columns[ci - 1].x - left_x + columns[ci - 1].width
            assign_left = (prev_right + col_left_rel) / 2

        # Right boundary: midpoint to next column, or infinity (row width)
        if ci == n - 1:
            assign_right = row.width + 100  # generous for last column
        else:
            next_left = columns[ci + 1].x - left_x
            assign_right = (col_right_rel + next_left) / 2

        col_ranges_rel.append((assign_left, assign_right))

    for w in row.words:
        w_left = w['left']
        w_right = w_left + w['width']
        w_center_x = w_left + w['width'] / 2

        # Primary: overlap-based matching — assign to column with most overlap.
        # This is more robust than center-based for narrow columns (page_ref)
        # where the last character's center may fall into the next column.
        best_col = -1
        best_overlap = 0
        for ci, col in enumerate(columns):
            col_left_rel = col.x - left_x
            col_right_rel = col_left_rel + col.width
            overlap = max(0, min(w_right, col_right_rel) - max(w_left, col_left_rel))
            if overlap > best_overlap:
                best_overlap = overlap
                best_col = ci

        if best_col >= 0 and best_overlap > 0:
            result[best_col].append(w)
        else:
            # Fallback: center-based range matching
            assigned = False
            for ci, (al, ar) in enumerate(col_ranges_rel):
                if al <= w_center_x < ar:
                    result[ci].append(w)
                    assigned = True
                    break

            if not assigned:
                # Last resort: nearest column center
                best_col = 0
                col_left_0 = columns[0].x - left_x
                best_dist = abs(w_center_x - (col_left_0 + columns[0].width / 2))
                for ci in range(1, n):
                    col_left = columns[ci].x - left_x
                    dist = abs(w_center_x - (col_left + columns[ci].width / 2))
                    if dist < best_dist:
                        best_dist = dist
                        best_col = ci
                result[best_col].append(w)

    return result
|
||
|
||
|
||
# Regex: at least 2 consecutive letters (Latin + umlauts + accents)
_RE_REAL_WORD = re.compile(r'[a-zA-ZäöüÄÖÜßéèêëàâîïôûùç]{2,}')
# Regex: one letter (used to test whether a token has ANY alpha content)
_RE_ALPHA = re.compile(r'[a-zA-ZäöüÄÖÜßéèêëàâîïôûùç]')

# Common short EN/DE words (2-3 chars). Tokens at the end of a cell
# that do NOT appear here are treated as trailing OCR noise.
_COMMON_SHORT_WORDS: set = {
    # EN 1-2 letter
    'a', 'i', 'am', 'an', 'as', 'at', 'be', 'by', 'do', 'go', 'he',
    'if', 'in', 'is', 'it', 'me', 'my', 'no', 'of', 'oh', 'ok', 'on',
    'or', 'so', 'to', 'up', 'us', 'we',
    # EN 3 letter
    'ace', 'act', 'add', 'age', 'ago', 'aid', 'aim', 'air', 'all',
    'and', 'ant', 'any', 'ape', 'arc', 'are', 'ark', 'arm', 'art',
    'ask', 'ate', 'axe', 'bad', 'bag', 'ban', 'bar', 'bat', 'bay',
    'bed', 'bee', 'bet', 'big', 'bin', 'bit', 'bow', 'box', 'boy',
    'bud', 'bug', 'bun', 'bus', 'but', 'buy', 'cab', 'can', 'cap',
    'car', 'cat', 'cop', 'cow', 'cry', 'cub', 'cup', 'cut', 'dad',
    'dam', 'day', 'den', 'dew', 'did', 'die', 'dig', 'dim', 'dip',
    'dog', 'dot', 'dry', 'due', 'dug', 'dye', 'ear', 'eat', 'eel',
    'egg', 'elm', 'end', 'era', 'eve', 'ewe', 'eye', 'fan', 'far',
    'fat', 'fax', 'fed', 'fee', 'few', 'fig', 'fin', 'fir', 'fit',
    'fix', 'fly', 'foe', 'fog', 'for', 'fox', 'fry', 'fun', 'fur',
    'gag', 'gap', 'gas', 'get', 'god', 'got', 'gum', 'gun', 'gut',
    'guy', 'gym', 'had', 'ham', 'has', 'hat', 'hay', 'hen', 'her',
    'hid', 'him', 'hip', 'his', 'hit', 'hog', 'hop', 'hot', 'how',
    'hue', 'hug', 'hum', 'hut', 'ice', 'icy', 'ill', 'imp', 'ink',
    'inn', 'ion', 'its', 'ivy', 'jam', 'jar', 'jaw', 'jay', 'jet',
    'jig', 'job', 'jog', 'joy', 'jug', 'key', 'kid', 'kin', 'kit',
    'lab', 'lad', 'lag', 'lap', 'law', 'lay', 'led', 'leg', 'let',
    'lid', 'lie', 'lip', 'lit', 'log', 'lot', 'low', 'mad', 'man',
    'map', 'mat', 'maw', 'may', 'men', 'met', 'mid', 'mix', 'mob',
    'mog', 'mom', 'mop', 'mow', 'mrs', 'mud', 'mug', 'mum', 'nag',
    'nap', 'net', 'new', 'nod', 'nor', 'not', 'now', 'nun', 'nut',
    'oak', 'oar', 'oat', 'odd', 'off', 'oft', 'oil', 'old', 'one',
    'opt', 'orb', 'ore', 'our', 'out', 'owe', 'owl', 'own', 'pad',
    'pal', 'pan', 'pat', 'paw', 'pay', 'pea', 'peg', 'pen', 'per',
    'pet', 'pie', 'pig', 'pin', 'pit', 'ply', 'pod', 'pop', 'pot',
    'pro', 'pry', 'pub', 'pug', 'pun', 'pup', 'put', 'rag', 'ram',
    'ran', 'rap', 'rat', 'raw', 'ray', 'red', 'ref', 'rib', 'rid',
    'rig', 'rim', 'rip', 'rob', 'rod', 'roe', 'rot', 'row', 'rub',
    'rug', 'rum', 'run', 'rut', 'rye', 'sac', 'sad', 'sag', 'sap',
    'sat', 'saw', 'say', 'sea', 'set', 'sew', 'she', 'shy', 'sin',
    'sip', 'sir', 'sis', 'sit', 'six', 'ski', 'sky', 'sly', 'sob',
    'sod', 'son', 'sop', 'sot', 'sow', 'soy', 'spa', 'spy', 'sty',
    'sub', 'sue', 'sum', 'sun', 'sup', 'tab', 'tad', 'tag', 'tan',
    'tap', 'tar', 'tax', 'tea', 'ten', 'the', 'tie', 'tin', 'tip',
    'toe', 'ton', 'too', 'top', 'tow', 'toy', 'try', 'tub', 'tug',
    'two', 'urn', 'use', 'van', 'vat', 'vet', 'via', 'vie', 'vim',
    'vow', 'wag', 'war', 'was', 'wax', 'way', 'web', 'wed', 'wet',
    'who', 'why', 'wig', 'win', 'wit', 'woe', 'wok', 'won', 'woo',
    'wow', 'yam', 'yap', 'yaw', 'yea', 'yes', 'yet', 'yew', 'you',
    'zap', 'zip', 'zoo',
    # DE 2-3 letter
    'ab', 'da', 'du', 'ei', 'er', 'es', 'ja', 'ob', 'um', 'zu',
    'als', 'alt', 'auf', 'aus', 'bei', 'bin', 'bis', 'das', 'dem',
    'den', 'der', 'des', 'die', 'dir', 'ehe', 'ein', 'eng', 'gar',
    'gib', 'gut', 'hat', 'her', 'ich', 'ihm', 'ihr', 'ins', 'ist',
    'mal', 'man', 'mir', 'mit', 'nah', 'neu', 'nie', 'nur', 'nun',
    'ort', 'rad', 'rat', 'rot', 'ruf', 'ruh', 'sei', 'sie', 'tag',
    'tal', 'tat', 'tee', 'tor', 'tun', 'tut', 'uns', 'vom', 'von',
    'vor', 'war', 'was', 'weg', 'wem', 'wen', 'wer', 'wie', 'wir',
    'wut', 'zum', 'zur',
}

# Known abbreviations found in EN/DE textbooks and dictionaries.
# Stored WITHOUT trailing period (the noise filter strips periods).
# These rescue tokens like "sth." / "sb." / "usw." from being deleted.
_KNOWN_ABBREVIATIONS: set = {
    # EN dictionary meta-words
    'sth', 'sb', 'smth', 'smb', 'sbd',
    # EN general
    'etc', 'eg', 'ie', 'esp', 'approx', 'dept', 'govt', 'corp',
    'inc', 'ltd', 'vs', 'cf', 'ibid', 'nb', 'ps', 'asap',
    # EN references / textbook
    'p', 'pp', 'ch', 'chap', 'fig', 'figs', 'no', 'nos', 'nr',
    'vol', 'vols', 'ed', 'eds', 'rev', 'repr', 'trans', 'ff',
    'fn', 'sec', 'par', 'para', 'app', 'abbr', 'ex', 'exs',
    'ans', 'wb', 'tb', 'vocab',
    # EN parts of speech / grammar
    'adj', 'adv', 'prep', 'conj', 'pron', 'det', 'art', 'interj',
    'aux', 'mod', 'inf', 'pt', 'pres', 'pret', 'ger',
    'sg', 'pl', 'sing', 'irreg', 'reg', 'intr', 'intrans',
    'refl', 'pass', 'imper', 'subj', 'ind', 'perf', 'fut',
    'attr', 'pred', 'comp', 'superl', 'pos', 'neg',
    'lit', 'colloq', 'sl', 'dial', 'arch', 'obs', 'fml', 'infml',
    'syn', 'ant', 'opp', 'var', 'orig',
    # EN titles
    'mr', 'mrs', 'ms', 'dr', 'prof', 'st', 'jr', 'sr',
    # EN pronunciation
    'br', 'am', 'brit', 'amer',
    # EN units
    'hr', 'hrs', 'min', 'km', 'cm', 'mm', 'kg', 'mg', 'ml',
    # DE general
    'usw', 'bzw', 'evtl', 'ggf', 'ggfs', 'sog', 'eigtl', 'allg',
    'bes', 'insb', 'insbes', 'bspw', 'ca',
    'od', 'ua', 'sa', 'vgl', 'zb', 'dh', 'zt', 'idr',
    'inkl', 'exkl', 'zzgl', 'abzgl',
    # DE references
    'abs', 'abschn', 'abt', 'anm', 'ausg', 'aufl', 'bd', 'bde',
    'bearb', 'ebd', 'hrsg', 'hg', 'jg', 'jh', 'jhd', 'kap',
    's', 'sp', 'zit', 'zs', 'vlg',
    # DE grammar
    'nom', 'akk', 'dat', 'gen', 'konj', 'subst', 'obj',
    'praet', 'imp', 'part', 'mask', 'fem', 'neutr',
    'trennb', 'untrennb', 'ugs', 'geh', 'pej',
    # DE regional
    'nordd', 'österr', 'schweiz',
    # Linguistic
    'lex', 'morph', 'phon', 'phonet', 'sem', 'synt', 'etym',
    'deriv', 'pref', 'suf', 'suff', 'dim', 'coll',
    'count', 'uncount', 'indef', 'def', 'poss', 'demon',
}
|
||
|
||
|
||
def _is_noise_tail_token(token: str) -> bool:
    """Check if a token at the END of cell text is trailing OCR noise.

    Trailing fragments are very common OCR artifacts from image edges,
    borders, and neighbouring cells. This is more aggressive than a
    general word filter: any short token that isn't in the dictionary
    of common EN/DE words is considered noise.

    Examples of noise: "Es)", "3", "ee", "B"
    Examples to keep: "sister.", "cupcakes.", "...", "mice", "[eg]"
    """
    t = token.strip()
    if not t:
        return True

    # Keep ellipsis
    if t in ('...', '…'):
        return False

    # Keep phonetic brackets: [eg], [maus], ["a:mand], serva], etc.
    # (A leading '[' already covers ["… and ['… variants.)
    if t.startswith('[') or t.endswith(']'):
        return False

    # Pure non-alpha → noise ("3", ")", "|")
    alpha_chars = _RE_ALPHA.findall(t)
    if not alpha_chars:
        return True

    # Extract only alpha characters for dictionary lookup
    cleaned = ''.join(alpha_chars)

    # Known abbreviations (e.g. "sth.", "usw.", "adj.") — always keep
    if cleaned.lower() in _KNOWN_ABBREVIATIONS:
        return False

    # Check for legitimate punctuation patterns vs. real noise.
    # Legitimate: "(auf)", "under-", "e.g.", "(on)", "selbst)", "(wir",
    # "(Salat-)Gurke", "Tanz(veranstaltung)", "(zer)brechen"
    # Noise: "3d", "B|", "x7"
    # Strategy: strip common dictionary punctuation (parens, hyphens,
    # slashes, dots, sentence punctuation — these are normal in entries
    # like "(Salat-)Gurke", "wir/uns", "e.g.", "cupcakes."), THEN check
    # whether the residual contains only alpha characters.
    t_inner = re.sub(r'[()\-/.,;:!?]', '', t)
    inner_alpha = ''.join(_RE_ALPHA.findall(t_inner))
    has_internal_noise = (len(t_inner) > len(inner_alpha)) if t_inner else False

    # Long alpha words (4+ chars) without internal noise are likely real
    if len(cleaned) >= 4 and not has_internal_noise:
        return False

    # Short words: check dictionary (uses only alpha chars)
    if cleaned.lower() in _COMMON_SHORT_WORDS and not has_internal_noise:
        return False

    # Default: short or suspicious → noise
    return True


def _is_garbage_text(text: str) -> bool:
    """Return True when the whole cell text looks like OCR garbage.

    Garbage text = no recognizable dictionary word; catches artifacts
    from image areas such as "(ci]oeu" or "uanoaain.".
    """
    candidates = _RE_REAL_WORD.findall(text)
    if not candidates:
        # No >=2-letter run at all — still rescue dotted abbreviations
        # like "e.g." by checking the bare letters.
        letters = ''.join(_RE_ALPHA.findall(text)).lower()
        return letters not in _KNOWN_ABBREVIATIONS

    for word in candidates:
        low = word.lower()
        # A known short word or abbreviation means the cell is real.
        if low in _COMMON_SHORT_WORDS or low in _KNOWN_ABBREVIATIONS:
            return False
        # Longer words (>= 4 chars): plausibility via vowel ratio.
        # Real EN/DE words sit roughly between 20-60% vowels; strings
        # like "uanoaain" or "cioeu" fall outside that band.
        if len(low) >= 4:
            n_vowels = sum(c in 'aeiouäöü' for c in low)
            if 0.15 <= n_vowels / len(low) <= 0.65:
                return False  # plausible vowel ratio → real word

    return True


def _clean_cell_text(text: str) -> str:
    """Remove OCR noise from cell text. Generic filters:

    1. If the entire text has no real alphabetic word (>= 2 letters), clear.
    2. If the entire text is garbage (no dictionary word), clear.
    3. Strip trailing noise tokens from the end of the text.
    """
    cell = text.strip()
    if not cell:
        return ''

    # Filter 1: no real word anywhere in the cell.
    if _RE_REAL_WORD.search(cell) is None:
        # Exception: dotted abbreviations like "e.g.", "z.B.", "i.e."
        letters = ''.join(_RE_ALPHA.findall(cell)).lower()
        if letters not in _KNOWN_ABBREVIATIONS:
            return ''

    # Filter 2: the whole cell is OCR garbage.
    if _is_garbage_text(cell):
        return ''

    # Filter 3: peel noise tokens off the tail.
    parts = cell.split()
    while parts and _is_noise_tail_token(parts[-1]):
        del parts[-1]

    return ' '.join(parts) if parts else ''


def _clean_cell_text_lite(text: str) -> str:
    """Simplified noise filter for cell-first OCR (isolated cell crops).

    Since each cell is OCR'd in isolation (no neighbour content visible),
    trailing-noise stripping is unnecessary. Only 2 filters remain:

    1. No real alphabetic word (>= 2 letters) and not a known abbreviation → empty.
    2. Entire text is garbage (no dictionary word) → empty.
    """
    cell = text.strip()
    if not cell:
        return ''

    # Filter 1: no real word anywhere in the cell.
    if _RE_REAL_WORD.search(cell) is None:
        letters = ''.join(_RE_ALPHA.findall(cell)).lower()
        if letters not in _KNOWN_ABBREVIATIONS:
            return ''

    # Filter 2: the whole cell is OCR garbage.
    return '' if _is_garbage_text(cell) else cell


# ---------------------------------------------------------------------------
# Bold detection via stroke-width analysis (relative / page-level)
# ---------------------------------------------------------------------------

def _measure_stroke_width(gray_crop: np.ndarray) -> float:
    """Measure mean stroke width in a binarised cell crop.

    Returns a DPI-normalised value (mean stroke width as % of crop height),
    or 0.0 if measurement is not possible.
    """
    if gray_crop is None or gray_crop.size == 0:
        return 0.0
    h, w = gray_crop.shape[:2]
    if min(h, w) < 10:
        return 0.0

    # Otsu binarisation, inverted: ink → white (255), paper → black (0)
    _, ink = cv2.threshold(gray_crop, 0, 255, cv2.THRESH_BINARY_INV | cv2.THRESH_OTSU)
    if cv2.countNonZero(ink) < 20:
        return 0.0

    # Distance transform: each ink pixel gets its distance to the
    # nearest background pixel (≈ half the local stroke width).
    dist = cv2.distanceTransform(ink, cv2.DIST_L2, 3)

    # Approximate skeleton via repeated morphological erosion; stop
    # before the glyphs vanish entirely.
    cross = cv2.getStructuringElement(cv2.MORPH_CROSS, (3, 3))
    skeleton = ink.copy()
    for _ in range(max(1, min(h, w) // 6)):
        shrunk = cv2.erode(skeleton, cross)
        if cv2.countNonZero(shrunk) < 5:
            break
        skeleton = shrunk

    on_skeleton = skeleton > 0
    if not np.any(on_skeleton):
        return 0.0
    avg = float(np.mean(dist[on_skeleton]))
    return avg / max(h, 1) * 100  # normalised: % of cell height


def _classify_bold_cells(cells: List[Dict[str, Any]], ocr_img: Optional[np.ndarray],
                         img_w: int, img_h: int) -> None:
    """Two-pass bold detection: measure all cells, then compare against median.

    Cells with stroke width > 1.4× the page median are marked as bold.
    This adapts automatically to font, DPI and scan quality.
    Modifies cells in-place (sets 'is_bold' key).
    """
    if ocr_img is None:
        return

    # Pass 1: stroke width per cell (0.0 for empty / degenerate crops)
    strokes: List[float] = []
    for cell in cells:
        width = 0.0
        if cell.get('text', '').strip():
            box = cell['bbox_px']
            top = max(0, box['y'])
            bottom = min(img_h, box['y'] + box['h'])
            left = max(0, box['x'])
            right = min(img_w, box['x'] + box['w'])
            if bottom > top and right > left:
                width = _measure_stroke_width(ocr_img[top:bottom, left:right])
        strokes.append(width)

    measured = [s for s in strokes if s > 0]
    if len(measured) < 3:
        # Too few cells to compare — leave all as non-bold
        return

    median_sw = float(np.median(measured))
    if median_sw <= 0:
        return

    # Pass 2: cells significantly above median → bold
    for cell, width in zip(cells, strokes):
        cell['is_bold'] = width > 0 and (width / median_sw) > 1.4


# ---------------------------------------------------------------------------