New modules: - tesseract_vocab_extractor.py: Bounding-box OCR with multi-PSM pipeline - grid_detection_service.py: CV-based grid/table detection for worksheets - vocab_session_store.py: PostgreSQL persistence for vocab sessions - trocr_api.py: TrOCR handwriting recognition endpoint - dsfa_rag_api.py + dsfa_corpus_ingestion.py: DSFA RAG corpus search Changes: - Dockerfile: Install tesseract-ocr + deu/eng language packs - requirements.txt: Add PyMuPDF, pytesseract, Pillow - main.py: Register new routers, init DB pools + Qdrant collections Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
347 lines
11 KiB
Python
347 lines
11 KiB
Python
"""
|
|
Tesseract-based OCR extraction with word-level bounding boxes.
|
|
|
|
Uses Tesseract for spatial information (WHERE text is) while
|
|
the Vision LLM handles semantic understanding (WHAT the text means).
|
|
|
|
Tesseract runs natively on ARM64 via Debian's apt package.
|
|
|
|
Lizenz: Apache 2.0 (kommerziell nutzbar)
|
|
"""
|
|
|
|
import io
|
|
import logging
|
|
from typing import List, Dict, Any, Optional
|
|
from difflib import SequenceMatcher
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
try:
|
|
import pytesseract
|
|
from PIL import Image
|
|
TESSERACT_AVAILABLE = True
|
|
except ImportError:
|
|
TESSERACT_AVAILABLE = False
|
|
logger.warning("pytesseract or Pillow not installed - Tesseract OCR unavailable")
|
|
|
|
|
|
async def extract_bounding_boxes(image_bytes: bytes, lang: str = "eng+deu") -> dict:
    """Run Tesseract OCR and return word-level bounding boxes.

    NOTE(review): pytesseract.image_to_data is a blocking subprocess call;
    despite being declared async this blocks the event loop — consider
    running it in a thread executor if latency matters.

    Args:
        image_bytes: PNG/JPEG image as bytes.
        lang: Tesseract language string (e.g. "eng+deu").

    Returns:
        Dict with 'words' list and 'image_width'/'image_height'.
        When Tesseract is unavailable, an 'error' key is set and
        'words' is empty.
    """
    if not TESSERACT_AVAILABLE:
        return {"words": [], "image_width": 0, "image_height": 0, "error": "Tesseract not available"}

    image = Image.open(io.BytesIO(image_bytes))
    data = pytesseract.image_to_data(image, lang=lang, output_type=pytesseract.Output.DICT)

    words = []
    for i in range(len(data['text'])):
        text = data['text'][i].strip()
        # Tesseract 5 reports confidences as float strings (e.g. "96.3");
        # int("96.3") raises ValueError, so parse through float() first.
        conf = int(float(data['conf'][i]))
        # Skip empty tokens and low-confidence noise (conf == -1 marks
        # non-word layout boxes).
        if not text or conf < 20:
            continue
        words.append({
            "text": text,
            "left": data['left'][i],
            "top": data['top'][i],
            "width": data['width'][i],
            "height": data['height'][i],
            "conf": conf,
            "block_num": data['block_num'][i],
            "par_num": data['par_num'][i],
            "line_num": data['line_num'][i],
            "word_num": data['word_num'][i],
        })

    return {
        "words": words,
        "image_width": image.width,
        "image_height": image.height,
    }
|
|
|
|
|
|
def group_words_into_lines(words: List[dict], y_tolerance_px: int = 15) -> List[List[dict]]:
    """Cluster OCR words into visual text lines by vertical proximity.

    Args:
        words: Word dicts as produced by extract_bounding_boxes.
        y_tolerance_px: Maximum vertical distance in pixels for a word to
            belong to the current line.

    Returns:
        List of lines; each line is its words ordered left-to-right.
    """
    if not words:
        return []

    # Reading order: top-to-bottom, then left-to-right.
    ordered = sorted(words, key=lambda w: (w['top'], w['left']))

    grouped: List[List[dict]] = []
    bucket: List[dict] = [ordered[0]]
    anchor_y = ordered[0]['top']  # Y of the first word of the current line

    for word in ordered[1:]:
        if abs(word['top'] - anchor_y) > y_tolerance_px:
            # Vertical jump: flush the current line (X-sorted) and start anew.
            grouped.append(sorted(bucket, key=lambda w: w['left']))
            bucket = [word]
            anchor_y = word['top']
        else:
            bucket.append(word)

    grouped.append(sorted(bucket, key=lambda w: w['left']))
    return grouped
|
|
|
|
|
|
def detect_columns(lines: List[List[dict]], image_width: int) -> Dict[str, Any]:
    """Infer column boundaries from word X positions.

    Typical vocab table: Left=English, Middle=German, Right=Example sentences.

    Returns:
        Dict with column boundaries and type assignments.
    """
    empty: Dict[str, Any] = {"columns": [], "column_types": []}
    if not lines or image_width == 0:
        return empty

    # Every word's left edge, sorted, across all lines.
    x_positions = sorted(w['left'] for line in lines for w in line)
    if not x_positions:
        return empty

    # A horizontal gap wider than 8% of the page width separates columns.
    gap_threshold = image_width * 0.08
    clusters: List[List[int]] = [[x_positions[0]]]
    for x in x_positions[1:]:
        if x - clusters[-1][-1] > gap_threshold:
            clusters.append([x])
        else:
            clusters[-1].append(x)

    # Each cluster of left edges marks one column start.
    columns = [
        {
            "x_start": min(cluster),
            "x_start_pct": min(cluster) / image_width * 100,
            "word_count": len(cluster),
        }
        for cluster in clusters
    ]

    # Left-to-right semantic roles; any extra columns become "unknown".
    type_map = ["english", "german", "example"]
    column_types = [
        type_map[i] if i < len(type_map) else "unknown"
        for i in range(len(columns))
    ]

    return {"columns": columns, "column_types": column_types}
|
|
|
|
|
|
def words_to_vocab_entries(lines: List[List[dict]], columns: List[dict],
                           column_types: List[str], image_width: int,
                           image_height: int) -> List[dict]:
    """Convert grouped word lines into vocabulary entries via column layout.

    Args:
        lines: Grouped word lines from group_words_into_lines.
        columns: Column boundaries from detect_columns.
        column_types: Column type assignments.
        image_width: Image width in pixels.
        image_height: Image height in pixels.

    Returns:
        List of vocabulary entry dicts with english/german/example fields,
        plus "<field>_bbox" dicts in percent coordinates where available.
    """
    if not columns or not lines:
        return []

    # (x_start, x_end, semantic type) per column; the last column extends
    # to the right edge of the image.
    spans = []
    for idx, col in enumerate(columns):
        x_end = columns[idx + 1]['x_start'] if idx + 1 < len(columns) else image_width
        col_type = column_types[idx] if idx < len(column_types) else "unknown"
        spans.append((col['x_start'], x_end, col_type))

    def classify(word: dict) -> str:
        # Assign the word to the column whose span contains its horizontal center.
        center = word['left'] + word['width'] / 2
        for x_start, x_end, col_type in spans:
            if x_start <= center < x_end:
                return col_type
        return "unknown"

    result = []
    for line in lines:
        texts: Dict[str, List[str]] = {"english": [], "german": [], "example": []}
        boxes: Dict[str, Optional[dict]] = {}

        for word in line:
            col_type = classify(word)
            if col_type not in texts:
                continue
            texts[col_type].append(word['text'])
            right = word['left'] + word['width']
            bottom = word['top'] + word['height']
            box = boxes.get(col_type)
            if box is None:
                boxes[col_type] = {
                    "left": word['left'],
                    "top": word['top'],
                    "right": right,
                    "bottom": bottom,
                }
            else:
                # Grow the column's box to also cover this word.
                box['left'] = min(box['left'], word['left'])
                box['top'] = min(box['top'], word['top'])
                box['right'] = max(box['right'], right)
                box['bottom'] = max(box['bottom'], bottom)

        entry = {"english": "", "german": "", "example": ""}
        for col_type in ("english", "german", "example"):
            if not texts[col_type]:
                continue
            entry[col_type] = " ".join(texts[col_type])
            box = boxes.get(col_type)
            if box:
                entry[f"{col_type}_bbox"] = {
                    "x_pct": box['left'] / image_width * 100,
                    "y_pct": box['top'] / image_height * 100,
                    "w_pct": (box['right'] - box['left']) / image_width * 100,
                    "h_pct": (box['bottom'] - box['top']) / image_height * 100,
                }

        # Drop lines that carry neither an English nor a German term.
        if entry["english"] or entry["german"]:
            result.append(entry)

    return result
|
|
|
|
|
|
def match_positions_to_vocab(tess_words: List[dict], llm_vocab: List[dict],
                             image_w: int, image_h: int,
                             threshold: float = 0.6) -> List[dict]:
    """Match Tesseract bounding boxes to LLM vocabulary entries.

    For each LLM vocab entry, find the Tesseract word with the highest
    fuzzy-string similarity and attach its bounding box as percentage
    coordinates. Entries are mutated in place.

    Args:
        tess_words: Word list from Tesseract with pixel coordinates.
        llm_vocab: Vocabulary list from Vision LLM.
        image_w: Image width in pixels.
        image_h: Image height in pixels.
        threshold: Minimum SequenceMatcher ratio for a match.

    Returns:
        llm_vocab list with bbox_x_pct/bbox_y_pct/bbox_w_pct/bbox_h_pct
        (plus bbox_match_field/bbox_match_ratio) added to matched entries.
    """
    if not tess_words or not llm_vocab or image_w == 0 or image_h == 0:
        return llm_vocab

    # Lowercase each OCR token once instead of once per entry per field.
    lowered = [(word, word['text'].lower()) for word in tess_words]

    for entry in llm_vocab:
        # Try the English term first, then German. `or ""` guards against
        # entries where a field is explicitly None.
        for field in ("english", "german"):
            search_text = (entry.get(field) or "").lower().strip()
            if not search_text:
                continue

            best_word = None
            best_ratio = 0.0
            for word, word_text in lowered:
                ratio = SequenceMatcher(None, search_text, word_text).ratio()
                if ratio > best_ratio:
                    best_ratio = ratio
                    best_word = word

            if best_word and best_ratio >= threshold:
                entry["bbox_x_pct"] = best_word['left'] / image_w * 100
                entry["bbox_y_pct"] = best_word['top'] / image_h * 100
                entry["bbox_w_pct"] = best_word['width'] / image_w * 100
                entry["bbox_h_pct"] = best_word['height'] / image_h * 100
                entry["bbox_match_field"] = field
                entry["bbox_match_ratio"] = round(best_ratio, 3)
                break  # Found a match, no need to try the other field

    return llm_vocab
|
|
|
|
|
|
async def run_tesseract_pipeline(image_bytes: bytes, lang: str = "eng+deu") -> dict:
    """Full Tesseract pipeline: words -> lines -> columns -> vocab entries.

    Args:
        image_bytes: PNG/JPEG image as bytes.
        lang: Tesseract language string.

    Returns:
        Dict with 'vocabulary', 'words', 'lines_count', 'columns',
        'column_types', 'image_width', 'image_height', 'word_count' —
        or the error dict from extract_bounding_boxes on failure.
    """
    bbox_data = await extract_bounding_boxes(image_bytes, lang=lang)
    if bbox_data.get("error"):
        # Propagate the failure payload unchanged.
        return bbox_data

    words = bbox_data["words"]
    width = bbox_data["image_width"]
    height = bbox_data["image_height"]

    grouped_lines = group_words_into_lines(words)
    layout = detect_columns(grouped_lines, width)
    vocabulary = words_to_vocab_entries(
        grouped_lines,
        layout["columns"],
        layout["column_types"],
        width,
        height,
    )

    return {
        "vocabulary": vocabulary,
        "words": words,
        "lines_count": len(grouped_lines),
        "columns": layout["columns"],
        "column_types": layout["column_types"],
        "image_width": width,
        "image_height": height,
        "word_count": len(words),
    }
|