feat(klausur-service): Add Tesseract OCR, DSFA RAG, TrOCR, grid detection and vocab session store
New modules: - tesseract_vocab_extractor.py: Bounding-box OCR with multi-PSM pipeline - grid_detection_service.py: CV-based grid/table detection for worksheets - vocab_session_store.py: PostgreSQL persistence for vocab sessions - trocr_api.py: TrOCR handwriting recognition endpoint - dsfa_rag_api.py + dsfa_corpus_ingestion.py: DSFA RAG corpus search Changes: - Dockerfile: Install tesseract-ocr + deu/eng language packs - requirements.txt: Add PyMuPDF, pytesseract, Pillow - main.py: Register new routers, init DB pools + Qdrant collections Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
509
klausur-service/backend/services/grid_detection_service.py
Normal file
509
klausur-service/backend/services/grid_detection_service.py
Normal file
@@ -0,0 +1,509 @@
|
||||
"""
Grid Detection Service v4

Detects table/grid structure from OCR bounding-box data.
Converts pixel coordinates to percentage and mm coordinates (A4 format).
Supports deskew correction, column detection, and multi-line cell handling.

License: Apache 2.0 (commercial use permitted)
"""
|
||||
|
||||
import math
|
||||
import logging
|
||||
from enum import Enum
|
||||
from dataclasses import dataclass, field
|
||||
from typing import List, Optional, Dict, Any, Tuple
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
# Physical size of an A4 sheet in millimetres (portrait orientation).
A4_WIDTH_MM = 210.0
A4_HEIGHT_MM = 297.0

# Margin kept around a column: 1 mm, plus the same distance expressed as a
# percentage of the A4 page width.
COLUMN_MARGIN_MM = 1.0
COLUMN_MARGIN_PCT = COLUMN_MARGIN_MM / A4_WIDTH_MM * 100
|
||||
|
||||
|
||||
class CellStatus(str, Enum):
    """Recognition state of a single grid cell.

    Members are also ``str`` instances, so they compare equal to their
    plain string values.
    """

    # No OCR region was assigned to the cell.
    EMPTY = "empty"
    # OCR text present with sufficient confidence.
    RECOGNIZED = "recognized"
    # OCR text present but with low confidence.
    PROBLEMATIC = "problematic"
    # Not produced by the detection code in this module; available to callers.
    MANUAL = "manual"
|
||||
|
||||
|
||||
class ColumnType(str, Enum):
    """Semantic role of a detected grid column.

    Members are also ``str`` instances, so they compare equal to their
    plain string values.
    """

    ENGLISH = "english"
    GERMAN = "german"
    EXAMPLE = "example"
    # Fallback for columns without an assigned role.
    UNKNOWN = "unknown"
|
||||
|
||||
|
||||
@dataclass
class OCRRegion:
    """Text fragment reported by OCR together with its bounding box.

    Positions and sizes are percentages (0-100) of the page: ``x`` and
    ``width`` relative to the page width, ``y`` and ``height`` relative to
    the page height. The ``*_mm`` properties project them onto an A4 sheet.
    """

    text: str
    confidence: float
    x: float       # left edge, percent of page width
    y: float       # top edge, percent of page height
    width: float   # box width, percent of page width
    height: float  # box height, percent of page height

    @staticmethod
    def _to_mm(percent: float, extent_mm: float, digits: int) -> float:
        """Scale a percentage onto ``extent_mm`` and round to ``digits``."""
        return round(percent / 100 * extent_mm, digits)

    # --- derived edges and centre (percent) -------------------------------

    @property
    def right(self) -> float:
        """Right edge in percent of page width."""
        return self.x + self.width

    @property
    def bottom(self) -> float:
        """Bottom edge in percent of page height."""
        return self.y + self.height

    @property
    def center_x(self) -> float:
        """Horizontal centre in percent of page width."""
        half_width = self.width / 2
        return self.x + half_width

    @property
    def center_y(self) -> float:
        """Vertical centre in percent of page height."""
        half_height = self.height / 2
        return self.y + half_height

    # --- A4 millimetre projections ---------------------------------------

    @property
    def x_mm(self) -> float:
        """Left edge in mm on A4, rounded to 1 decimal."""
        return self._to_mm(self.x, A4_WIDTH_MM, 1)

    @property
    def y_mm(self) -> float:
        """Top edge in mm on A4, rounded to 1 decimal."""
        return self._to_mm(self.y, A4_HEIGHT_MM, 1)

    @property
    def width_mm(self) -> float:
        """Width in mm on A4, rounded to 1 decimal."""
        return self._to_mm(self.width, A4_WIDTH_MM, 1)

    @property
    def height_mm(self) -> float:
        """Height in mm on A4, rounded to 2 decimals."""
        return self._to_mm(self.height, A4_HEIGHT_MM, 2)
|
||||
|
||||
|
||||
@dataclass
class GridCell:
    """One cell of the detected grid; geometry is in percent (0-100) of the page."""

    row: int
    col: int
    x: float
    y: float
    width: float
    height: float
    text: str = ""
    confidence: float = 0.0
    status: CellStatus = CellStatus.EMPTY
    column_type: ColumnType = ColumnType.UNKNOWN
    logical_row: int = 0
    logical_col: int = 0
    is_continuation: bool = False

    @staticmethod
    def _to_mm(percent: float, extent_mm: float, digits: int) -> float:
        """Scale a percentage onto ``extent_mm`` and round to ``digits``."""
        return round(percent / 100 * extent_mm, digits)

    @property
    def x_mm(self) -> float:
        """Left edge in mm on A4, rounded to 1 decimal."""
        return self._to_mm(self.x, A4_WIDTH_MM, 1)

    @property
    def y_mm(self) -> float:
        """Top edge in mm on A4, rounded to 1 decimal."""
        return self._to_mm(self.y, A4_HEIGHT_MM, 1)

    @property
    def width_mm(self) -> float:
        """Width in mm on A4, rounded to 1 decimal."""
        return self._to_mm(self.width, A4_WIDTH_MM, 1)

    @property
    def height_mm(self) -> float:
        """Height in mm on A4, rounded to 2 decimals."""
        return self._to_mm(self.height, A4_HEIGHT_MM, 2)

    def to_dict(self) -> dict:
        """Return a plain-dict view of the cell.

        Percent geometry is rounded to 2 decimals, the mm projections are
        included, and the enum fields are emitted as their string values.
        """
        payload = {"row": self.row, "col": self.col}

        # Percent geometry, rounded.
        for name in ("x", "y", "width", "height"):
            payload[name] = round(getattr(self, name), 2)

        # Millimetre geometry (already rounded by the properties).
        for name in ("x_mm", "y_mm", "width_mm", "height_mm"):
            payload[name] = getattr(self, name)

        payload["text"] = self.text
        payload["confidence"] = self.confidence
        payload["status"] = self.status.value
        payload["column_type"] = self.column_type.value
        payload["logical_row"] = self.logical_row
        payload["logical_col"] = self.logical_col
        payload["is_continuation"] = self.is_continuation
        return payload
|
||||
|
||||
|
||||
@dataclass
class GridResult:
    """Outcome of a grid detection run."""

    rows: int = 0
    columns: int = 0
    cells: List[List[GridCell]] = field(default_factory=list)
    column_types: List[str] = field(default_factory=list)
    column_boundaries: List[float] = field(default_factory=list)
    row_boundaries: List[float] = field(default_factory=list)
    deskew_angle: float = 0.0
    stats: Dict[str, Any] = field(default_factory=dict)

    def to_dict(self) -> dict:
        """Return a plain-dict view of the result.

        Cells are converted row by row via ``GridCell.to_dict``; boundaries
        and the deskew angle are rounded to 2 decimals; the fixed A4 page
        dimensions are appended under ``page_dimensions``.
        """
        serialised_rows = [
            [cell.to_dict() for cell in row_cells]
            for row_cells in self.cells
        ]
        rounded_columns = [round(edge, 2) for edge in self.column_boundaries]
        rounded_rows = [round(edge, 2) for edge in self.row_boundaries]
        page = {
            "width_mm": A4_WIDTH_MM,
            "height_mm": A4_HEIGHT_MM,
            "format": "A4",
        }

        return {
            "rows": self.rows,
            "columns": self.columns,
            "cells": serialised_rows,
            "column_types": self.column_types,
            "column_boundaries": rounded_columns,
            "row_boundaries": rounded_rows,
            "deskew_angle": round(self.deskew_angle, 2),
            "stats": self.stats,
            "page_dimensions": page,
        }
|
||||
|
||||
|
||||
class GridDetectionService:
    """Detect grid/table structure from OCR bounding-box regions.

    All region coordinates handled by this class are percentages (0-100):
    x/width relative to the page width, y/height relative to the page height.
    """

    def __init__(self, y_tolerance_pct: float = 1.5, padding_pct: float = 0.3,
                 column_margin_mm: float = COLUMN_MARGIN_MM):
        """Store the detection tunables.

        Args:
            y_tolerance_pct: Maximum vertical-centre distance (percent) for two
                regions to be placed in the same row.
            padding_pct: Padding (percent) added around row and column
                boundaries.
            column_margin_mm: Column margin in mm; stored on the instance but
                not read by the methods of this class.
        """
        self.y_tolerance_pct = y_tolerance_pct
        self.padding_pct = padding_pct
        self.column_margin_mm = column_margin_mm

    def calculate_deskew_angle(self, regions: List[OCRRegion]) -> float:
        """Estimate the page skew angle from OCR region positions.

        Regions whose left edges lie within 3 percent of each other are
        treated as one column. A least-squares line (x as a function of y) is
        fitted through the largest such group and its slope is converted to
        an angle, corrected for the A4 aspect ratio.

        Returns:
            Angle in degrees, rounded to 2 decimals and clamped to +/-5.
            A positive angle means left edges drift to the right further down
            the page. Returns 0.0 when fewer than three regions are given, no
            group of at least three aligned regions exists, or all regions of
            the chosen group share the same y.
        """
        if len(regions) < 3:
            return 0.0

        sorted_by_x = sorted(regions, key=lambda r: r.x)

        # Sweep left to right; a group is anchored at its first (left-most)
        # region and collects everything within the tolerance of that anchor.
        x_tolerance = 3.0  # percent
        aligned_groups: List[List[OCRRegion]] = []
        current_group = [sorted_by_x[0]]

        for r in sorted_by_x[1:]:
            if abs(r.x - current_group[0].x) <= x_tolerance:
                current_group.append(r)
            else:
                if len(current_group) >= 3:
                    aligned_groups.append(current_group)
                current_group = [r]

        if len(current_group) >= 3:
            aligned_groups.append(current_group)

        if not aligned_groups:
            return 0.0

        # The largest group gives the most reliable fit.
        best_group = max(aligned_groups, key=len)
        best_group.sort(key=lambda r: r.y)

        # Ordinary least squares for x = slope * y + intercept.
        n = len(best_group)
        sum_y = sum(r.y for r in best_group)
        sum_x = sum(r.x for r in best_group)
        sum_xy = sum(r.x * r.y for r in best_group)
        sum_y2 = sum(r.y ** 2 for r in best_group)

        denom = n * sum_y2 - sum_y ** 2
        if denom == 0:
            # All regions share the same y: the slope is undefined.
            return 0.0

        slope = (n * sum_xy - sum_y * sum_x) / denom

        # slope is dx/dy in percent space; one percent of width and one
        # percent of height are different physical lengths on A4, so scale by
        # width/height (~0.707) to obtain the physical slope.
        aspect = A4_WIDTH_MM / A4_HEIGHT_MM
        angle_deg = math.degrees(math.atan(slope * aspect))

        return max(-5.0, min(5.0, round(angle_deg, 2)))

    def apply_deskew_to_regions(self, regions: List[OCRRegion], angle: float) -> List[OCRRegion]:
        """Remove a skew of ``angle`` degrees from region coordinates.

        ``angle`` follows the convention of ``calculate_deskew_angle``: it is
        atan(dx/dy) of the drifting left edges. Each region centre is rotated
        around the page centre so that such a drifting edge becomes vertical.
        The rotation is carried out in millimetres (A4) rather than in percent
        space, because percent units differ between the two axes. Box widths
        and heights are left unchanged.

        Args:
            regions: Regions with percent-based coordinates.
            angle: Skew angle in degrees.

        Returns:
            The input list itself when ``abs(angle) < 0.01``; otherwise a new
            list of regions with corrected ``x``/``y`` rounded to 2 decimals.
        """
        if abs(angle) < 0.01:
            return regions

        # For points on a line dx = dy * tan(angle), the rotated x coordinate
        # dx*cos(phi) - dy*sin(phi) is constant exactly when phi == angle, so
        # the correction rotates by +angle (not -angle, which would double
        # the skew).
        angle_rad = math.radians(angle)
        cos_a = math.cos(angle_rad)
        sin_a = math.sin(angle_rad)

        mm_per_pct_x = A4_WIDTH_MM / 100
        mm_per_pct_y = A4_HEIGHT_MM / 100

        # Page centre in percent.
        cx, cy = 50.0, 50.0

        result = []
        for r in regions:
            # Offset of the region centre from the page centre, in mm.
            dx_mm = (r.center_x - cx) * mm_per_pct_x
            dy_mm = (r.center_y - cy) * mm_per_pct_y

            rot_x_mm = dx_mm * cos_a - dy_mm * sin_a
            rot_y_mm = dx_mm * sin_a + dy_mm * cos_a

            # Back to percent, then from centre to top-left corner.
            new_x = rot_x_mm / mm_per_pct_x + cx - r.width / 2
            new_y = rot_y_mm / mm_per_pct_y + cy - r.height / 2

            result.append(OCRRegion(
                text=r.text,
                confidence=r.confidence,
                x=round(new_x, 2),
                y=round(new_y, 2),
                width=r.width,
                height=r.height,
            ))

        return result

    def _group_regions_into_rows(self, regions: List[OCRRegion]) -> List[List[OCRRegion]]:
        """Group regions into rows by vertical position.

        Regions are visited top to bottom. A region joins the current row
        when its vertical centre is within ``y_tolerance_pct`` of the centre
        of the row's first region; otherwise it starts a new row. Each
        returned row is sorted left to right and is never empty.
        """
        if not regions:
            return []

        sorted_regions = sorted(regions, key=lambda r: r.y)
        rows: List[List[OCRRegion]] = []
        current_row = [sorted_regions[0]]
        current_y = sorted_regions[0].center_y

        for region in sorted_regions[1:]:
            if abs(region.center_y - current_y) <= self.y_tolerance_pct:
                current_row.append(region)
            else:
                current_row.sort(key=lambda r: r.x)
                rows.append(current_row)
                current_row = [region]
                current_y = region.center_y

        current_row.sort(key=lambda r: r.x)
        rows.append(current_row)

        return rows

    def _detect_column_boundaries(self, rows: List[List[OCRRegion]]) -> List[float]:
        """Derive column boundaries from the left edges of all regions.

        Left edges are sorted and split into clusters wherever two
        consecutive values are more than 5 percent apart. Each cluster yields
        one boundary at its smallest left edge minus ``padding_pct``; a final
        boundary at 100.0 closes the last column.

        Returns:
            Ascending boundary list (one more entry than there are columns),
            or an empty list when there are no regions.
        """
        all_x = sorted(r.x for row in rows for r in row)
        if not all_x:
            return []

        # Gap-based clustering.
        min_gap = 5.0  # percent
        clusters: List[List[float]] = []
        current = [all_x[0]]

        for x in all_x[1:]:
            if x - current[-1] > min_gap:
                clusters.append(current)
                current = [x]
            else:
                current.append(x)
        clusters.append(current)

        # Column starts, then the right page edge.
        boundaries = [min(c) - self.padding_pct for c in clusters]
        boundaries.append(100.0)

        return boundaries

    def _assign_column_types(self, boundaries: List[float]) -> List[str]:
        """Assign a column type (as string value) to each column by position.

        The first three columns are english, german and example, in that
        order; any further column is unknown.
        """
        num_cols = max(0, len(boundaries) - 1)
        type_map = [ColumnType.ENGLISH, ColumnType.GERMAN, ColumnType.EXAMPLE]
        return [
            (type_map[i] if i < len(type_map) else ColumnType.UNKNOWN).value
            for i in range(num_cols)
        ]

    def detect_grid(self, regions: List[OCRRegion]) -> GridResult:
        """Detect grid structure from OCR regions.

        Steps: estimate and remove skew, group regions into rows, derive
        column boundaries, then build one cell per (row, column). A region
        belongs to the column that contains its horizontal centre; the last
        column is open-ended on the right so a region whose centre reaches or
        exceeds 100 percent is kept rather than silently dropped.

        Args:
            regions: List of OCR regions with percentage-based coordinates.

        Returns:
            GridResult with detected rows, columns, cells and statistics.
            Cells with text are RECOGNIZED when the mean confidence of their
            regions is at least 0.5, otherwise PROBLEMATIC; cells without
            regions are EMPTY and span the full row/column slot.
        """
        if not regions:
            return GridResult(stats={"recognized": 0, "problematic": 0, "empty": 0, "manual": 0, "total": 0, "coverage": 0.0})

        # Step 1: Calculate and apply deskew
        deskew_angle = self.calculate_deskew_angle(regions)
        corrected_regions = self.apply_deskew_to_regions(regions, deskew_angle)

        # Step 2: Group into rows
        rows = self._group_regions_into_rows(corrected_regions)

        # Step 3: Detect column boundaries
        col_boundaries = self._detect_column_boundaries(rows)
        column_types = self._assign_column_types(col_boundaries)
        num_cols = max(1, len(col_boundaries) - 1)
        last_col = num_cols - 1

        # Step 4: Build cell grid
        num_rows = len(rows)
        row_boundaries = []
        cells = []

        recognized = 0
        problematic = 0
        empty = 0

        for row_idx, row_regions in enumerate(rows):
            # Row extent: tight around the row's regions plus padding.
            if row_regions:
                row_y = min(r.y for r in row_regions) - self.padding_pct
                row_bottom = max(r.bottom for r in row_regions) + self.padding_pct
            else:
                # Defensive fallback: distribute rows evenly over the page.
                row_y = row_idx / num_rows * 100
                row_bottom = (row_idx + 1) / num_rows * 100

            row_boundaries.append(row_y)
            row_height = row_bottom - row_y

            row_cells = []
            for col_idx in range(num_cols):
                col_x = col_boundaries[col_idx]
                col_right = col_boundaries[col_idx + 1] if col_idx + 1 < len(col_boundaries) else 100.0
                col_width = col_right - col_x
                if col_idx < len(column_types):
                    column_type = ColumnType(column_types[col_idx])
                else:
                    column_type = ColumnType.UNKNOWN

                # Regions whose horizontal centre falls into this column.
                cell_regions = [
                    r for r in row_regions
                    if col_x <= r.center_x and (col_idx == last_col or r.center_x < col_right)
                ]

                if cell_regions:
                    text = " ".join(r.text for r in cell_regions)
                    avg_conf = sum(r.confidence for r in cell_regions) / len(cell_regions)
                    status = CellStatus.RECOGNIZED if avg_conf >= 0.5 else CellStatus.PROBLEMATIC

                    # Use the actual bounding box of the contained regions.
                    actual_x = min(r.x for r in cell_regions)
                    actual_y = min(r.y for r in cell_regions)
                    actual_right = max(r.right for r in cell_regions)
                    actual_bottom = max(r.bottom for r in cell_regions)

                    cell = GridCell(
                        row=row_idx,
                        col=col_idx,
                        x=actual_x,
                        y=actual_y,
                        width=actual_right - actual_x,
                        height=actual_bottom - actual_y,
                        text=text,
                        confidence=round(avg_conf, 3),
                        status=status,
                        column_type=column_type,
                        logical_row=row_idx,
                        logical_col=col_idx,
                    )

                    if status == CellStatus.RECOGNIZED:
                        recognized += 1
                    else:
                        problematic += 1
                else:
                    # No text: the cell covers the whole row/column slot.
                    cell = GridCell(
                        row=row_idx,
                        col=col_idx,
                        x=col_x,
                        y=row_y,
                        width=col_width,
                        height=row_height,
                        status=CellStatus.EMPTY,
                        column_type=column_type,
                        logical_row=row_idx,
                        logical_col=col_idx,
                    )
                    empty += 1

                row_cells.append(cell)
            cells.append(row_cells)

        # Add final row boundary (bottom of the last row).
        if rows and rows[-1]:
            row_boundaries.append(max(r.bottom for r in rows[-1]) + self.padding_pct)
        else:
            row_boundaries.append(100.0)

        total = num_rows * num_cols
        coverage = (recognized + problematic) / max(total, 1)

        return GridResult(
            rows=num_rows,
            columns=num_cols,
            cells=cells,
            column_types=column_types,
            column_boundaries=col_boundaries,
            row_boundaries=row_boundaries,
            deskew_angle=deskew_angle,
            stats={
                "recognized": recognized,
                "problematic": problematic,
                "empty": empty,
                "manual": 0,
                "total": total,
                "coverage": round(coverage, 3),
            },
        )

    def convert_tesseract_regions(self, tess_words: List[dict],
                                  image_width: int, image_height: int) -> List[OCRRegion]:
        """Convert Tesseract word data (pixels) to OCRRegions (percentages).

        Args:
            tess_words: Word dicts with keys ``text``, ``left``, ``top``,
                ``width``, ``height`` and optionally ``conf`` (0-100; a
                missing value is treated as 50).
            image_width: Image width in pixels.
            image_height: Image height in pixels.

        Returns:
            List of OCRRegion with percentage-based coordinates and a
            confidence clamped to the range 0.0-1.0. Empty when there are no
            words or an image dimension is not positive.
        """
        if not tess_words or image_width <= 0 or image_height <= 0:
            return []

        regions = []
        for w in tess_words:
            # Tesseract reports conf = -1 for entries without recognised
            # text; clamp so confidence never leaves the 0..1 range.
            confidence = max(0.0, min(1.0, float(w.get("conf", 50)) / 100.0))
            regions.append(OCRRegion(
                text=w["text"],
                confidence=confidence,
                x=w["left"] / image_width * 100,
                y=w["top"] / image_height * 100,
                width=w["width"] / image_width * 100,
                height=w["height"] / image_height * 100,
            ))

        return regions
|
||||
Reference in New Issue
Block a user