This repository has been archived on 2026-02-15. You can view files and clone it. You cannot open issues or pull requests or push a commit.
Files
breakpilot-pwa/klausur-service/backend/tesseract_vocab_extractor.py
BreakPilot Dev 53219e3eaf feat(klausur-service): Add Tesseract OCR, DSFA RAG, TrOCR, grid detection and vocab session store
New modules:
- tesseract_vocab_extractor.py: Bounding-box OCR with multi-PSM pipeline
- grid_detection_service.py: CV-based grid/table detection for worksheets
- vocab_session_store.py: PostgreSQL persistence for vocab sessions
- trocr_api.py: TrOCR handwriting recognition endpoint
- dsfa_rag_api.py + dsfa_corpus_ingestion.py: DSFA RAG corpus search

Changes:
- Dockerfile: Install tesseract-ocr + deu/eng language packs
- requirements.txt: Add PyMuPDF, pytesseract, Pillow
- main.py: Register new routers, init DB pools + Qdrant collections

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-10 00:00:19 +01:00

347 lines
11 KiB
Python

"""
Tesseract-based OCR extraction with word-level bounding boxes.
Uses Tesseract for spatial information (WHERE text is) while
the Vision LLM handles semantic understanding (WHAT the text means).
Tesseract runs natively on ARM64 via Debian's apt package.
Lizenz: Apache 2.0 (kommerziell nutzbar)
"""
import io
import logging
from typing import List, Dict, Any, Optional
from difflib import SequenceMatcher
logger = logging.getLogger(__name__)
try:
import pytesseract
from PIL import Image
TESSERACT_AVAILABLE = True
except ImportError:
TESSERACT_AVAILABLE = False
logger.warning("pytesseract or Pillow not installed - Tesseract OCR unavailable")
async def extract_bounding_boxes(image_bytes: bytes, lang: str = "eng+deu") -> dict:
    """Run Tesseract OCR and return word-level bounding boxes.

    Args:
        image_bytes: PNG/JPEG image as bytes.
        lang: Tesseract language string (e.g. "eng+deu").

    Returns:
        Dict with 'words' list and 'image_width'/'image_height'.
        When Tesseract is unavailable, an 'error' key is set and
        'words' is empty.
    """
    if not TESSERACT_AVAILABLE:
        return {"words": [], "image_width": 0, "image_height": 0,
                "error": "Tesseract not available"}
    image = Image.open(io.BytesIO(image_bytes))
    data = pytesseract.image_to_data(image, lang=lang, output_type=pytesseract.Output.DICT)
    words = []
    for i, raw_text in enumerate(data['text']):
        text = raw_text.strip()
        # Tesseract reports confidence as -1 for non-word rows and, depending
        # on version, as a float string like "96.5" — int() alone would raise
        # ValueError on those, so go through float() first.
        conf = int(float(data['conf'][i]))
        # Drop empty tokens and low-confidence noise (< 20).
        if not text or conf < 20:
            continue
        words.append({
            "text": text,
            "left": data['left'][i],
            "top": data['top'][i],
            "width": data['width'][i],
            "height": data['height'][i],
            "conf": conf,
            "block_num": data['block_num'][i],
            "par_num": data['par_num'][i],
            "line_num": data['line_num'][i],
            "word_num": data['word_num'][i],
        })
    return {
        "words": words,
        "image_width": image.width,
        "image_height": image.height,
    }
def group_words_into_lines(words: List[dict], y_tolerance_px: int = 15) -> List[List[dict]]:
    """Cluster words into text lines based on vertical proximity.

    Args:
        words: Word dicts as produced by extract_bounding_boxes.
        y_tolerance_px: Maximum vertical distance in pixels for a word to
            join the current line (measured against the line's first word).

    Returns:
        Lines in top-to-bottom order; each line's words sorted left-to-right.
    """
    if not words:
        return []
    ordered = sorted(words, key=lambda w: (w['top'], w['left']))
    lines: List[List[dict]] = []
    line: List[dict] = []
    anchor_y = None
    for word in ordered:
        if line and abs(word['top'] - anchor_y) <= y_tolerance_px:
            line.append(word)
            continue
        # Vertical gap too large: flush the current line, start a new one.
        if line:
            lines.append(sorted(line, key=lambda w: w['left']))
        line = [word]
        anchor_y = word['top']
    if line:
        lines.append(sorted(line, key=lambda w: w['left']))
    return lines
def detect_columns(lines: List[List[dict]], image_width: int) -> Dict[str, Any]:
    """Infer column boundaries from word X positions.

    Typical vocab table layout: left = English, middle = German,
    right = example sentences.

    Args:
        lines: Grouped word lines from group_words_into_lines.
        image_width: Image width in pixels.

    Returns:
        Dict with 'columns' (start position, percentage, word count)
        and 'column_types' (left-to-right type labels).
    """
    if not lines or image_width == 0:
        return {"columns": [], "column_types": []}
    xs = sorted(word['left'] for line in lines for word in line)
    if not xs:
        return {"columns": [], "column_types": []}
    # A horizontal gap wider than 8% of the page width separates columns.
    min_gap = image_width * 0.08
    clusters: List[List[int]] = [[xs[0]]]
    for x in xs[1:]:
        if x - clusters[-1][-1] > min_gap:
            clusters.append([x])
        else:
            clusters[-1].append(x)
    # Each cluster of X positions marks one column's start.
    columns = [
        {
            "x_start": min(cluster),
            "x_start_pct": min(cluster) / image_width * 100,
            "word_count": len(cluster),
        }
        for cluster in clusters
    ]
    # Label columns left-to-right; anything past the third is unknown.
    type_map = ["english", "german", "example"]
    column_types = [
        type_map[i] if i < len(type_map) else "unknown"
        for i in range(len(columns))
    ]
    return {"columns": columns, "column_types": column_types}
def words_to_vocab_entries(lines: List[List[dict]], columns: List[dict],
column_types: List[str], image_width: int,
image_height: int) -> List[dict]:
"""Convert grouped words into vocabulary entries using column positions.
Args:
lines: Grouped word lines from group_words_into_lines.
columns: Column boundaries from detect_columns.
column_types: Column type assignments.
image_width: Image width in pixels.
image_height: Image height in pixels.
Returns:
List of vocabulary entry dicts with english/german/example fields.
"""
if not columns or not lines:
return []
# Build column boundaries for word assignment
col_boundaries = []
for i, col in enumerate(columns):
start = col['x_start']
if i + 1 < len(columns):
end = columns[i + 1]['x_start']
else:
end = image_width
col_boundaries.append((start, end, column_types[i] if i < len(column_types) else "unknown"))
entries = []
for line in lines:
entry = {"english": "", "german": "", "example": ""}
line_words_by_col: Dict[str, List[str]] = {"english": [], "german": [], "example": []}
line_bbox: Dict[str, Optional[dict]] = {}
for word in line:
word_center_x = word['left'] + word['width'] / 2
assigned_type = "unknown"
for start, end, col_type in col_boundaries:
if start <= word_center_x < end:
assigned_type = col_type
break
if assigned_type in line_words_by_col:
line_words_by_col[assigned_type].append(word['text'])
# Track bounding box for the column
if assigned_type not in line_bbox or line_bbox[assigned_type] is None:
line_bbox[assigned_type] = {
"left": word['left'],
"top": word['top'],
"right": word['left'] + word['width'],
"bottom": word['top'] + word['height'],
}
else:
bb = line_bbox[assigned_type]
bb['left'] = min(bb['left'], word['left'])
bb['top'] = min(bb['top'], word['top'])
bb['right'] = max(bb['right'], word['left'] + word['width'])
bb['bottom'] = max(bb['bottom'], word['top'] + word['height'])
for col_type in ["english", "german", "example"]:
if line_words_by_col[col_type]:
entry[col_type] = " ".join(line_words_by_col[col_type])
if line_bbox.get(col_type):
bb = line_bbox[col_type]
entry[f"{col_type}_bbox"] = {
"x_pct": bb['left'] / image_width * 100,
"y_pct": bb['top'] / image_height * 100,
"w_pct": (bb['right'] - bb['left']) / image_width * 100,
"h_pct": (bb['bottom'] - bb['top']) / image_height * 100,
}
# Only add if at least one column has content
if entry["english"] or entry["german"]:
entries.append(entry)
return entries
def match_positions_to_vocab(tess_words: List[dict], llm_vocab: List[dict],
                             image_w: int, image_h: int,
                             threshold: float = 0.6) -> List[dict]:
    """Match Tesseract bounding boxes to LLM vocabulary entries.

    For each LLM vocab entry, find the best-matching Tesseract word by
    fuzzy string similarity and attach its bounding box as percentages
    of the image dimensions. Entries are mutated in place.

    Args:
        tess_words: Word list from Tesseract with pixel coordinates.
        llm_vocab: Vocabulary list from Vision LLM.
        image_w: Image width in pixels.
        image_h: Image height in pixels.
        threshold: Minimum similarity ratio for a match.

    Returns:
        llm_vocab list with bbox_x_pct/bbox_y_pct/bbox_w_pct/bbox_h_pct added.
    """
    if not tess_words or not llm_vocab or image_w == 0 or image_h == 0:
        return llm_vocab
    # Hoist the per-word lowercasing out of the O(entries x fields x words) loop.
    lowered = [(word, word['text'].lower()) for word in tess_words]
    for entry in llm_vocab:
        # Try to match the English text first, then fall back to German.
        for field in ("english", "german"):
            search_text = entry.get(field, "").lower().strip()
            if not search_text:
                continue
            best_word = None
            best_ratio = 0.0
            for word, word_text in lowered:
                ratio = SequenceMatcher(None, search_text, word_text).ratio()
                if ratio > best_ratio:
                    best_ratio = ratio
                    best_word = word
            if best_word and best_ratio >= threshold:
                # Plain string keys (the original used f-strings with no
                # placeholders, which was misleading).
                entry["bbox_x_pct"] = best_word['left'] / image_w * 100
                entry["bbox_y_pct"] = best_word['top'] / image_h * 100
                entry["bbox_w_pct"] = best_word['width'] / image_w * 100
                entry["bbox_h_pct"] = best_word['height'] / image_h * 100
                entry["bbox_match_field"] = field
                entry["bbox_match_ratio"] = round(best_ratio, 3)
                break  # Found a match, no need to try the other field
    return llm_vocab
async def run_tesseract_pipeline(image_bytes: bytes, lang: str = "eng+deu") -> dict:
    """Full Tesseract pipeline: extract words, group lines, detect columns, build vocab.

    Args:
        image_bytes: PNG/JPEG image as bytes.
        lang: Tesseract language string.

    Returns:
        Dict with 'vocabulary', 'words', 'lines_count', 'columns',
        'column_types', 'image_width', 'image_height', 'word_count'.
        If OCR fails, the error dict from extract_bounding_boxes is
        returned unchanged.
    """
    # Step 1: word-level bounding boxes from Tesseract.
    bbox_data = await extract_bounding_boxes(image_bytes, lang=lang)
    if bbox_data.get("error"):
        return bbox_data
    words = bbox_data["words"]
    width = bbox_data["image_width"]
    height = bbox_data["image_height"]
    # Step 2: cluster words into lines by vertical proximity.
    grouped = group_words_into_lines(words)
    # Step 3: infer column layout from horizontal word positions.
    layout = detect_columns(grouped, width)
    # Step 4: combine lines + columns into vocabulary entries.
    vocabulary = words_to_vocab_entries(
        grouped,
        layout["columns"],
        layout["column_types"],
        width,
        height,
    )
    return {
        "vocabulary": vocabulary,
        "words": words,
        "lines_count": len(grouped),
        "columns": layout["columns"],
        "column_types": layout["column_types"],
        "image_width": width,
        "image_height": height,
        "word_count": len(words),
    }