Services: Admin-Lehrer, Backend-Lehrer, Studio v2, Website, Klausur-Service, School-Service, Voice-Service, Geo-Service, BreakPilot Drive, Agent-Core Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
510 lines
16 KiB
Python
510 lines
16 KiB
Python
"""
|
|
Grid Detection Service v4
|
|
|
|
Detects table/grid structure from OCR bounding-box data.
|
|
Converts pixel coordinates to percentage and mm coordinates (A4 format).
|
|
Supports deskew correction, column detection, and multi-line cell handling.
|
|
|
|
Lizenz: Apache 2.0 (kommerziell nutzbar)
|
|
"""
|
|
|
|
import math
|
|
import logging
|
|
from enum import Enum
|
|
from dataclasses import dataclass, field
|
|
from typing import List, Optional, Dict, Any, Tuple
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
# A4 dimensions
|
|
A4_WIDTH_MM = 210.0
|
|
A4_HEIGHT_MM = 297.0
|
|
|
|
# Column margin (1mm)
|
|
COLUMN_MARGIN_MM = 1.0
|
|
COLUMN_MARGIN_PCT = (COLUMN_MARGIN_MM / A4_WIDTH_MM) * 100
|
|
|
|
|
|
class CellStatus(str, Enum):
|
|
EMPTY = "empty"
|
|
RECOGNIZED = "recognized"
|
|
PROBLEMATIC = "problematic"
|
|
MANUAL = "manual"
|
|
|
|
|
|
class ColumnType(str, Enum):
|
|
ENGLISH = "english"
|
|
GERMAN = "german"
|
|
EXAMPLE = "example"
|
|
UNKNOWN = "unknown"
|
|
|
|
|
|
@dataclass
|
|
class OCRRegion:
|
|
"""A word/phrase detected by OCR with bounding box coordinates in percentage (0-100)."""
|
|
text: str
|
|
confidence: float
|
|
x: float # X position as percentage of page width
|
|
y: float # Y position as percentage of page height
|
|
width: float # Width as percentage of page width
|
|
height: float # Height as percentage of page height
|
|
|
|
@property
|
|
def x_mm(self) -> float:
|
|
return round(self.x / 100 * A4_WIDTH_MM, 1)
|
|
|
|
@property
|
|
def y_mm(self) -> float:
|
|
return round(self.y / 100 * A4_HEIGHT_MM, 1)
|
|
|
|
@property
|
|
def width_mm(self) -> float:
|
|
return round(self.width / 100 * A4_WIDTH_MM, 1)
|
|
|
|
@property
|
|
def height_mm(self) -> float:
|
|
return round(self.height / 100 * A4_HEIGHT_MM, 2)
|
|
|
|
@property
|
|
def center_x(self) -> float:
|
|
return self.x + self.width / 2
|
|
|
|
@property
|
|
def center_y(self) -> float:
|
|
return self.y + self.height / 2
|
|
|
|
@property
|
|
def right(self) -> float:
|
|
return self.x + self.width
|
|
|
|
@property
|
|
def bottom(self) -> float:
|
|
return self.y + self.height
|
|
|
|
|
|
@dataclass
|
|
class GridCell:
|
|
"""A cell in the detected grid with coordinates in percentage (0-100)."""
|
|
row: int
|
|
col: int
|
|
x: float
|
|
y: float
|
|
width: float
|
|
height: float
|
|
text: str = ""
|
|
confidence: float = 0.0
|
|
status: CellStatus = CellStatus.EMPTY
|
|
column_type: ColumnType = ColumnType.UNKNOWN
|
|
logical_row: int = 0
|
|
logical_col: int = 0
|
|
is_continuation: bool = False
|
|
|
|
@property
|
|
def x_mm(self) -> float:
|
|
return round(self.x / 100 * A4_WIDTH_MM, 1)
|
|
|
|
@property
|
|
def y_mm(self) -> float:
|
|
return round(self.y / 100 * A4_HEIGHT_MM, 1)
|
|
|
|
@property
|
|
def width_mm(self) -> float:
|
|
return round(self.width / 100 * A4_WIDTH_MM, 1)
|
|
|
|
@property
|
|
def height_mm(self) -> float:
|
|
return round(self.height / 100 * A4_HEIGHT_MM, 2)
|
|
|
|
def to_dict(self) -> dict:
|
|
return {
|
|
"row": self.row,
|
|
"col": self.col,
|
|
"x": round(self.x, 2),
|
|
"y": round(self.y, 2),
|
|
"width": round(self.width, 2),
|
|
"height": round(self.height, 2),
|
|
"x_mm": self.x_mm,
|
|
"y_mm": self.y_mm,
|
|
"width_mm": self.width_mm,
|
|
"height_mm": self.height_mm,
|
|
"text": self.text,
|
|
"confidence": self.confidence,
|
|
"status": self.status.value,
|
|
"column_type": self.column_type.value,
|
|
"logical_row": self.logical_row,
|
|
"logical_col": self.logical_col,
|
|
"is_continuation": self.is_continuation,
|
|
}
|
|
|
|
|
|
@dataclass
|
|
class GridResult:
|
|
"""Result of grid detection."""
|
|
rows: int = 0
|
|
columns: int = 0
|
|
cells: List[List[GridCell]] = field(default_factory=list)
|
|
column_types: List[str] = field(default_factory=list)
|
|
column_boundaries: List[float] = field(default_factory=list)
|
|
row_boundaries: List[float] = field(default_factory=list)
|
|
deskew_angle: float = 0.0
|
|
stats: Dict[str, Any] = field(default_factory=dict)
|
|
|
|
def to_dict(self) -> dict:
|
|
cells_dicts = []
|
|
for row_cells in self.cells:
|
|
cells_dicts.append([c.to_dict() for c in row_cells])
|
|
|
|
return {
|
|
"rows": self.rows,
|
|
"columns": self.columns,
|
|
"cells": cells_dicts,
|
|
"column_types": self.column_types,
|
|
"column_boundaries": [round(b, 2) for b in self.column_boundaries],
|
|
"row_boundaries": [round(b, 2) for b in self.row_boundaries],
|
|
"deskew_angle": round(self.deskew_angle, 2),
|
|
"stats": self.stats,
|
|
"page_dimensions": {
|
|
"width_mm": A4_WIDTH_MM,
|
|
"height_mm": A4_HEIGHT_MM,
|
|
"format": "A4",
|
|
},
|
|
}
|
|
|
|
|
|
class GridDetectionService:
|
|
"""Detect grid/table structure from OCR bounding-box regions."""
|
|
|
|
def __init__(self, y_tolerance_pct: float = 1.5, padding_pct: float = 0.3,
|
|
column_margin_mm: float = COLUMN_MARGIN_MM):
|
|
self.y_tolerance_pct = y_tolerance_pct
|
|
self.padding_pct = padding_pct
|
|
self.column_margin_mm = column_margin_mm
|
|
|
|
def calculate_deskew_angle(self, regions: List[OCRRegion]) -> float:
|
|
"""Calculate page skew angle from OCR region positions.
|
|
|
|
Uses left-edge alignment of regions to detect consistent tilt.
|
|
Returns angle in degrees, clamped to ±5°.
|
|
"""
|
|
if len(regions) < 3:
|
|
return 0.0
|
|
|
|
# Group by similar X position (same column)
|
|
sorted_by_x = sorted(regions, key=lambda r: r.x)
|
|
|
|
# Find regions that are vertically aligned (similar X)
|
|
x_tolerance = 3.0 # percent
|
|
aligned_groups: List[List[OCRRegion]] = []
|
|
current_group = [sorted_by_x[0]]
|
|
|
|
for r in sorted_by_x[1:]:
|
|
if abs(r.x - current_group[0].x) <= x_tolerance:
|
|
current_group.append(r)
|
|
else:
|
|
if len(current_group) >= 3:
|
|
aligned_groups.append(current_group)
|
|
current_group = [r]
|
|
|
|
if len(current_group) >= 3:
|
|
aligned_groups.append(current_group)
|
|
|
|
if not aligned_groups:
|
|
return 0.0
|
|
|
|
# Use the largest aligned group to calculate skew
|
|
best_group = max(aligned_groups, key=len)
|
|
best_group.sort(key=lambda r: r.y)
|
|
|
|
# Linear regression: X as function of Y
|
|
n = len(best_group)
|
|
sum_y = sum(r.y for r in best_group)
|
|
sum_x = sum(r.x for r in best_group)
|
|
sum_xy = sum(r.x * r.y for r in best_group)
|
|
sum_y2 = sum(r.y ** 2 for r in best_group)
|
|
|
|
denom = n * sum_y2 - sum_y ** 2
|
|
if denom == 0:
|
|
return 0.0
|
|
|
|
slope = (n * sum_xy - sum_y * sum_x) / denom
|
|
|
|
# Convert slope to angle (slope is dx/dy in percent space)
|
|
# Adjust for aspect ratio: A4 is 210/297 ≈ 0.707
|
|
aspect = A4_WIDTH_MM / A4_HEIGHT_MM
|
|
angle_rad = math.atan(slope * aspect)
|
|
angle_deg = math.degrees(angle_rad)
|
|
|
|
# Clamp to ±5°
|
|
return max(-5.0, min(5.0, round(angle_deg, 2)))
|
|
|
|
def apply_deskew_to_regions(self, regions: List[OCRRegion], angle: float) -> List[OCRRegion]:
|
|
"""Apply deskew correction to region coordinates.
|
|
|
|
Rotates all coordinates around the page center by -angle.
|
|
"""
|
|
if abs(angle) < 0.01:
|
|
return regions
|
|
|
|
angle_rad = math.radians(-angle)
|
|
cos_a = math.cos(angle_rad)
|
|
sin_a = math.sin(angle_rad)
|
|
|
|
# Page center
|
|
cx, cy = 50.0, 50.0
|
|
|
|
result = []
|
|
for r in regions:
|
|
# Rotate center of region around page center
|
|
rx = r.center_x - cx
|
|
ry = r.center_y - cy
|
|
new_cx = rx * cos_a - ry * sin_a + cx
|
|
new_cy = rx * sin_a + ry * cos_a + cy
|
|
new_x = new_cx - r.width / 2
|
|
new_y = new_cy - r.height / 2
|
|
|
|
result.append(OCRRegion(
|
|
text=r.text,
|
|
confidence=r.confidence,
|
|
x=round(new_x, 2),
|
|
y=round(new_y, 2),
|
|
width=r.width,
|
|
height=r.height,
|
|
))
|
|
|
|
return result
|
|
|
|
def _group_regions_into_rows(self, regions: List[OCRRegion]) -> List[List[OCRRegion]]:
|
|
"""Group regions by Y position into rows."""
|
|
if not regions:
|
|
return []
|
|
|
|
sorted_regions = sorted(regions, key=lambda r: r.y)
|
|
rows: List[List[OCRRegion]] = []
|
|
current_row = [sorted_regions[0]]
|
|
current_y = sorted_regions[0].center_y
|
|
|
|
for r in sorted_regions[1:]:
|
|
if abs(r.center_y - current_y) <= self.y_tolerance_pct:
|
|
current_row.append(r)
|
|
else:
|
|
current_row.sort(key=lambda r: r.x)
|
|
rows.append(current_row)
|
|
current_row = [r]
|
|
current_y = r.center_y
|
|
|
|
if current_row:
|
|
current_row.sort(key=lambda r: r.x)
|
|
rows.append(current_row)
|
|
|
|
return rows
|
|
|
|
def _detect_column_boundaries(self, rows: List[List[OCRRegion]]) -> List[float]:
|
|
"""Detect column boundaries from row data."""
|
|
if not rows:
|
|
return []
|
|
|
|
# Collect all X starting positions
|
|
all_x = []
|
|
for row in rows:
|
|
for r in row:
|
|
all_x.append(r.x)
|
|
|
|
if not all_x:
|
|
return []
|
|
|
|
all_x.sort()
|
|
|
|
# Gap-based clustering
|
|
min_gap = 5.0 # percent
|
|
clusters: List[List[float]] = []
|
|
current = [all_x[0]]
|
|
|
|
for x in all_x[1:]:
|
|
if x - current[-1] > min_gap:
|
|
clusters.append(current)
|
|
current = [x]
|
|
else:
|
|
current.append(x)
|
|
|
|
if current:
|
|
clusters.append(current)
|
|
|
|
# Column boundaries: start of each cluster
|
|
boundaries = [min(c) - self.padding_pct for c in clusters]
|
|
# Add right boundary
|
|
boundaries.append(100.0)
|
|
|
|
return boundaries
|
|
|
|
def _assign_column_types(self, boundaries: List[float]) -> List[str]:
|
|
"""Assign column types based on position."""
|
|
num_cols = max(0, len(boundaries) - 1)
|
|
type_map = [ColumnType.ENGLISH, ColumnType.GERMAN, ColumnType.EXAMPLE]
|
|
result = []
|
|
for i in range(num_cols):
|
|
if i < len(type_map):
|
|
result.append(type_map[i].value)
|
|
else:
|
|
result.append(ColumnType.UNKNOWN.value)
|
|
return result
|
|
|
|
def detect_grid(self, regions: List[OCRRegion]) -> GridResult:
|
|
"""Detect grid structure from OCR regions.
|
|
|
|
Args:
|
|
regions: List of OCR regions with percentage-based coordinates.
|
|
|
|
Returns:
|
|
GridResult with detected rows, columns, and cells.
|
|
"""
|
|
if not regions:
|
|
return GridResult(stats={"recognized": 0, "problematic": 0, "empty": 0, "manual": 0, "total": 0, "coverage": 0.0})
|
|
|
|
# Step 1: Calculate and apply deskew
|
|
deskew_angle = self.calculate_deskew_angle(regions)
|
|
corrected_regions = self.apply_deskew_to_regions(regions, deskew_angle)
|
|
|
|
# Step 2: Group into rows
|
|
rows = self._group_regions_into_rows(corrected_regions)
|
|
|
|
# Step 3: Detect column boundaries
|
|
col_boundaries = self._detect_column_boundaries(rows)
|
|
column_types = self._assign_column_types(col_boundaries)
|
|
num_cols = max(1, len(col_boundaries) - 1)
|
|
|
|
# Step 4: Build cell grid
|
|
num_rows = len(rows)
|
|
row_boundaries = []
|
|
cells = []
|
|
|
|
recognized = 0
|
|
problematic = 0
|
|
empty = 0
|
|
|
|
for row_idx, row_regions in enumerate(rows):
|
|
# Row Y boundary
|
|
if row_regions:
|
|
row_y = min(r.y for r in row_regions) - self.padding_pct
|
|
row_bottom = max(r.bottom for r in row_regions) + self.padding_pct
|
|
else:
|
|
row_y = row_idx / num_rows * 100
|
|
row_bottom = (row_idx + 1) / num_rows * 100
|
|
|
|
row_boundaries.append(row_y)
|
|
row_height = row_bottom - row_y
|
|
|
|
row_cells = []
|
|
for col_idx in range(num_cols):
|
|
col_x = col_boundaries[col_idx]
|
|
col_right = col_boundaries[col_idx + 1] if col_idx + 1 < len(col_boundaries) else 100.0
|
|
col_width = col_right - col_x
|
|
|
|
# Find regions in this cell
|
|
cell_regions = []
|
|
for r in row_regions:
|
|
r_center = r.center_x
|
|
if col_x <= r_center < col_right:
|
|
cell_regions.append(r)
|
|
|
|
if cell_regions:
|
|
text = " ".join(r.text for r in cell_regions)
|
|
avg_conf = sum(r.confidence for r in cell_regions) / len(cell_regions)
|
|
status = CellStatus.RECOGNIZED if avg_conf >= 0.5 else CellStatus.PROBLEMATIC
|
|
# Use actual bounding box from regions
|
|
actual_x = min(r.x for r in cell_regions)
|
|
actual_y = min(r.y for r in cell_regions)
|
|
actual_right = max(r.right for r in cell_regions)
|
|
actual_bottom = max(r.bottom for r in cell_regions)
|
|
|
|
cell = GridCell(
|
|
row=row_idx,
|
|
col=col_idx,
|
|
x=actual_x,
|
|
y=actual_y,
|
|
width=actual_right - actual_x,
|
|
height=actual_bottom - actual_y,
|
|
text=text,
|
|
confidence=round(avg_conf, 3),
|
|
status=status,
|
|
column_type=ColumnType(column_types[col_idx]) if col_idx < len(column_types) else ColumnType.UNKNOWN,
|
|
logical_row=row_idx,
|
|
logical_col=col_idx,
|
|
)
|
|
|
|
if status == CellStatus.RECOGNIZED:
|
|
recognized += 1
|
|
else:
|
|
problematic += 1
|
|
else:
|
|
cell = GridCell(
|
|
row=row_idx,
|
|
col=col_idx,
|
|
x=col_x,
|
|
y=row_y,
|
|
width=col_width,
|
|
height=row_height,
|
|
status=CellStatus.EMPTY,
|
|
column_type=ColumnType(column_types[col_idx]) if col_idx < len(column_types) else ColumnType.UNKNOWN,
|
|
logical_row=row_idx,
|
|
logical_col=col_idx,
|
|
)
|
|
empty += 1
|
|
|
|
row_cells.append(cell)
|
|
cells.append(row_cells)
|
|
|
|
# Add final row boundary
|
|
if rows and rows[-1]:
|
|
row_boundaries.append(max(r.bottom for r in rows[-1]) + self.padding_pct)
|
|
else:
|
|
row_boundaries.append(100.0)
|
|
|
|
total = num_rows * num_cols
|
|
coverage = (recognized + problematic) / max(total, 1)
|
|
|
|
return GridResult(
|
|
rows=num_rows,
|
|
columns=num_cols,
|
|
cells=cells,
|
|
column_types=column_types,
|
|
column_boundaries=col_boundaries,
|
|
row_boundaries=row_boundaries,
|
|
deskew_angle=deskew_angle,
|
|
stats={
|
|
"recognized": recognized,
|
|
"problematic": problematic,
|
|
"empty": empty,
|
|
"manual": 0,
|
|
"total": total,
|
|
"coverage": round(coverage, 3),
|
|
},
|
|
)
|
|
|
|
def convert_tesseract_regions(self, tess_words: List[dict],
|
|
image_width: int, image_height: int) -> List[OCRRegion]:
|
|
"""Convert Tesseract word data (pixels) to OCRRegions (percentages).
|
|
|
|
Args:
|
|
tess_words: Word list from tesseract_vocab_extractor.extract_bounding_boxes.
|
|
image_width: Image width in pixels.
|
|
image_height: Image height in pixels.
|
|
|
|
Returns:
|
|
List of OCRRegion with percentage-based coordinates.
|
|
"""
|
|
if not tess_words or image_width == 0 or image_height == 0:
|
|
return []
|
|
|
|
regions = []
|
|
for w in tess_words:
|
|
regions.append(OCRRegion(
|
|
text=w["text"],
|
|
confidence=w.get("conf", 50) / 100.0,
|
|
x=w["left"] / image_width * 100,
|
|
y=w["top"] / image_height * 100,
|
|
width=w["width"] / image_width * 100,
|
|
height=w["height"] / image_height * 100,
|
|
))
|
|
|
|
return regions
|