Fix: Sidebar scrollable + add Eltern-Portal nav link
overflow-hidden → overflow-y-auto so all nav items are reachable. Added /parent (Eltern-Portal) link with people icon. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
471
klausur-service/backend/ocr/detect/box_detect.py
Normal file
471
klausur-service/backend/ocr/detect/box_detect.py
Normal file
@@ -0,0 +1,471 @@
|
||||
"""
|
||||
Embedded box detection and page zone splitting for the CV vocabulary pipeline.
|
||||
|
||||
Detects boxes (grammar tips, exercises, etc.) that span the page width and
|
||||
interrupt the normal column layout. Splits the page into vertical zones so
|
||||
that column detection can run independently per zone.
|
||||
|
||||
Two-stage algorithm (both run, results merged):
|
||||
1. Morphological line detection — finds bordered boxes via horizontal lines.
|
||||
2. Background shading detection — finds shaded/colored boxes via median-blur
|
||||
background analysis. Works for colored (blue, green) and grayscale
|
||||
(gray shading on B/W scans) boxes.
|
||||
|
||||
Lizenz: Apache 2.0 (kommerziell nutzbar)
|
||||
DATENSCHUTZ: Alle Verarbeitung erfolgt lokal.
|
||||
"""
|
||||
|
||||
import logging
|
||||
from typing import List, Optional, Tuple
|
||||
|
||||
import cv2
|
||||
import numpy as np
|
||||
|
||||
from cv_vocab_types import DetectedBox, PageZone
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
__all__ = [
|
||||
"detect_boxes",
|
||||
"split_page_into_zones",
|
||||
]
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Stage 1: Morphological line detection
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def _detect_boxes_by_lines(
    gray: np.ndarray,
    content_x: int,
    content_w: int,
    content_y: int,
    content_h: int,
) -> List[DetectedBox]:
    """Find boxes delimited by pairs of long horizontal border lines.

    Strategy: isolate long horizontal strokes with a wide morphological
    opening, collapse them into y-segments, then pair each unused segment
    with the next one below it that yields a plausible box height.

    Args:
        gray: Grayscale image (full page).
        content_x, content_w: Horizontal content bounds.
        content_y, content_h: Vertical content bounds.

    Returns:
        List of DetectedBox for each detected bordered box.
    """
    img_h = gray.shape[0]

    # Dark ink becomes foreground (white on black).
    _, inv = cv2.threshold(gray, 180, 255, cv2.THRESH_BINARY_INV)

    # Wide horizontal opening keeps only strokes spanning >= half the content.
    k_width = max(50, content_w // 2)
    horiz_kernel = cv2.getStructuringElement(cv2.MORPH_RECT, (k_width, 1))
    only_lines = cv2.morphologyEx(inv, cv2.MORPH_OPEN, horiz_kernel)

    # Per-row count of line pixels inside the content band.
    row_counts = np.sum(only_lines[:, content_x:content_x + content_w] > 0, axis=1)
    needed = content_w * 0.30

    # Collapse consecutive qualifying rows into (y_start, y_end) segments.
    segments: List[Tuple[int, int]] = []
    start: Optional[int] = None
    for row in range(img_h):
        hit = row_counts[row] >= needed
        if hit and start is None:
            start = row
        elif not hit and start is not None:
            segments.append((start, row))
            start = None
    if start is not None:
        segments.append((start, img_h))

    # A box needs at least a top line and a bottom line.
    if len(segments) < 2:
        return []

    # Height limits: at least 30px, at most 70% of the content height.
    min_h, max_h = 30, int(content_h * 0.70)

    result: List[DetectedBox] = []
    consumed = set()
    for i, (top_a, top_b) in enumerate(segments):
        if i in consumed:
            continue
        for j in range(i + 1, len(segments)):
            if j in consumed:
                continue
            bot_a, bot_b = segments[j]
            height = bot_b - top_a
            if not (min_h <= height <= max_h):
                continue

            result.append(DetectedBox(
                x=content_x,
                y=top_a,
                width=content_w,
                height=height,
                confidence=0.8,
                # Border thickness estimated from the thicker of the two lines.
                border_thickness=max(top_b - top_a, bot_b - bot_a),
            ))
            consumed.add(i)
            consumed.add(j)
            break  # move to next top-line candidate
    return result
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Stage 2: Background shading detection (color + grayscale)
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def _detect_boxes_by_shading(
    img_bgr: np.ndarray,
    content_x: int,
    content_w: int,
    content_y: int,
    content_h: int,
) -> List[DetectedBox]:
    """Find boxes with shaded/colored background (no visible border lines).

    Uses heavy median blur to remove text and reveal the underlying background.
    Then detects rectangular regions where the background differs from white.
    Works for both colored boxes (blue, green) and grayscale shading (gray on
    B/W scans).

    Args:
        img_bgr: BGR color image (full page).
        content_x, content_w: Horizontal content bounds.
        content_y, content_h: Vertical content bounds.

    Returns:
        List of DetectedBox for each detected shaded box.
    """
    h, w = img_bgr.shape[:2]

    # --- Heavy median blur removes text strokes, keeps background ---
    blur_size = 31  # large kernel to wipe out text
    blurred = cv2.medianBlur(img_bgr, blur_size)
    blur_gray = cv2.cvtColor(blurred, cv2.COLOR_BGR2GRAY)
    blur_hsv = cv2.cvtColor(blurred, cv2.COLOR_BGR2HSV)

    # Estimate page background from top-left / top-right corners
    # (assumes the top corners are blank page margin — TODO confirm for
    # documents with headers/logos reaching into the corners).
    corner_size = max(20, min(h // 10, w // 10))
    corners = np.concatenate([
        blur_gray[:corner_size, :corner_size].ravel(),
        blur_gray[:corner_size, -corner_size:].ravel(),
    ])
    page_bg = float(np.median(corners))

    # Two masks: grayscale shading + color saturation
    # Grayscale: regions noticeably darker than the page background
    # (floor of 150 prevents treating dark scans' whole page as shading)
    shade_thresh = max(page_bg - 30, 150)
    gray_mask = (blur_gray < shade_thresh).astype(np.uint8) * 255

    # Color: regions with noticeable saturation (blue/green/etc. boxes)
    sat_mask = (blur_hsv[:, :, 1] > 20).astype(np.uint8) * 255

    combined = cv2.bitwise_or(gray_mask, sat_mask)

    # Morphological cleanup: close gaps, remove small noise
    kernel_close = cv2.getStructuringElement(cv2.MORPH_RECT, (25, 10))
    combined = cv2.morphologyEx(combined, cv2.MORPH_CLOSE, kernel_close)
    kernel_open = cv2.getStructuringElement(cv2.MORPH_RECT, (10, 5))
    combined = cv2.morphologyEx(combined, cv2.MORPH_OPEN, kernel_open)

    contours, _ = cv2.findContours(combined, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)

    # Size thresholds: smaller boxes allowed (e.g. "German leihen" ~30% width)
    min_area = content_w * 30  # at least 30px tall at full width
    min_box_h = 25
    max_box_h = int(content_h * 0.70)
    min_width_ratio = 0.25  # boxes can be ~25% of content width

    boxes: List[DetectedBox] = []
    for cnt in contours:
        area = cv2.contourArea(cnt)
        if area < min_area:
            continue

        bx, by, bw, bh = cv2.boundingRect(cnt)

        # Width filter
        if bw < content_w * min_width_ratio:
            continue

        # Height filter
        if bh < min_box_h or bh > max_box_h:
            continue

        # Rectangularity check: contour area must cover at least 50% of its
        # bounding rectangle, otherwise the region is too irregular to be a box
        rect_area = bw * bh
        if rect_area > 0 and area / rect_area < 0.5:
            continue

        # Verify that the background inside this region is actually shaded
        roi_gray = blur_gray[by:by + bh, bx:bx + bw]
        roi_hsv = blur_hsv[by:by + bh, bx:bx + bw]
        if roi_gray.size == 0:
            continue

        median_val = float(np.median(roi_gray))
        median_sat = float(np.median(roi_hsv[:, :, 1]))

        # Must be noticeably different from page background
        is_shaded = median_val < (page_bg - 15)
        is_colored = median_sat > 15

        if not is_shaded and not is_colored:
            continue

        # Colored boxes are a stronger signal than gray shading
        conf = 0.7 if is_colored else 0.6

        boxes.append(DetectedBox(
            x=bx,
            y=by,
            width=bw,
            height=bh,
            confidence=conf,
            border_thickness=0,
        ))

    return boxes
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Validation
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def _validate_box(
    box: DetectedBox,
    gray: np.ndarray,
    content_w: int,
    content_h: int,
    median_row_gap: int,
) -> bool:
    """Validate that a detected box is genuine (not a table-row separator etc.).

    Args:
        box: Candidate box to validate.
        gray: Grayscale page image used for the ink-density check.
        content_w, content_h: Content area dimensions.
        median_row_gap: Median row gap; 0 disables the separator check.

    Returns:
        True if the candidate passes all plausibility checks.
    """
    # Reject boxes narrower than a quarter of the content width
    # (lowered from 60% to allow smaller boxes).
    if box.width < content_w * 0.25:
        return False

    # Reject implausible heights: tiny slivers and near-full-page regions.
    if not (25 <= box.height <= content_h * 0.70):
        return False

    # Reject candidates that look like table-row separators:
    # real boxes are at least 3x the median row gap.
    if median_row_gap > 0 and box.height < median_row_gap * 3:
        return False

    # Ink-density check: the box must actually contain some text.
    img_h, img_w = gray.shape[:2]
    top = max(0, box.y)
    bottom = min(img_h, box.y + box.height)
    left = max(0, box.x)
    right = min(img_w, box.x + box.width)
    region = gray[top:bottom, left:right]
    if region.size == 0:
        return False

    # Nearly empty regions are not real content boxes.
    dark_fraction = np.sum(region < 128) / region.size
    return dark_fraction >= 0.002
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Public API: detect_boxes
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def _merge_overlapping_boxes(boxes: List[DetectedBox]) -> List[DetectedBox]:
    """Deduplicate boxes whose intersection covers > 50% of the smaller box.

    When two boxes overlap that much, keep the one with strictly higher
    confidence; on equal confidence keep the larger one (boxes are processed
    in descending area order, so ties resolve to the larger box).
    """
    if len(boxes) <= 1:
        return boxes

    # Sort by area descending so larger boxes are processed first
    boxes = sorted(boxes, key=lambda b: b.width * b.height, reverse=True)
    keep = [True] * len(boxes)

    for i in range(len(boxes)):
        if not keep[i]:
            continue
        bi = boxes[i]
        for j in range(i + 1, len(boxes)):
            if not keep[j]:
                continue
            bj = boxes[j]

            # Compute overlap (intersection rectangle)
            x1 = max(bi.x, bj.x)
            y1 = max(bi.y, bj.y)
            x2 = min(bi.x + bi.width, bj.x + bj.width)
            y2 = min(bi.y + bi.height, bj.y + bj.height)

            if x2 <= x1 or y2 <= y1:
                continue  # no overlap

            inter = (x2 - x1) * (y2 - y1)
            area_i = bi.width * bi.height
            area_j = bj.width * bj.height
            smaller_area = min(area_i, area_j)

            # If overlap covers > 50% of the smaller box, merge (drop the weaker)
            if smaller_area > 0 and inter / smaller_area > 0.50:
                # Keep the one with higher confidence; if equal, keep larger
                if bj.confidence > bi.confidence:
                    keep[i] = False
                    break  # bi is gone; stop comparing against it
                else:
                    keep[j] = False

    return [b for b, k in zip(boxes, keep) if k]
|
||||
|
||||
|
||||
def detect_boxes(
    img_bgr: np.ndarray,
    content_x: int,
    content_w: int,
    content_y: int,
    content_h: int,
    median_row_gap: int = 0,
) -> List[DetectedBox]:
    """Detect embedded boxes on a page image.

    Runs BOTH line-based and shading-based detection, merges overlapping
    candidates, validates the survivors, and returns them top-to-bottom.

    Args:
        img_bgr: BGR color image (full page or cropped).
        content_x, content_w: Horizontal content bounds.
        content_y, content_h: Vertical content bounds.
        median_row_gap: Median row gap height (for filtering out table separators).

    Returns:
        List of validated DetectedBox instances, sorted by y position.
    """
    page_gray = cv2.cvtColor(img_bgr, cv2.COLOR_BGR2GRAY)

    # Run both detectors — each catches boxes the other misses.
    from_lines = _detect_boxes_by_lines(
        page_gray, content_x, content_w, content_y, content_h)
    from_shading = _detect_boxes_by_shading(
        img_bgr, content_x, content_w, content_y, content_h)

    logger.debug("BoxDetect: %d line-based, %d shading-based candidates",
                 len(from_lines), len(from_shading))

    # Combine, deduplicate, validate, then order top to bottom.
    deduped = _merge_overlapping_boxes(from_lines + from_shading)
    accepted = sorted(
        (b for b in deduped
         if _validate_box(b, page_gray, content_w, content_h, median_row_gap)),
        key=lambda b: b.y,
    )

    if accepted:
        logger.info("BoxDetect: %d box(es) detected (line=%d, shade=%d, merged=%d)",
                    len(accepted), len(from_lines), len(from_shading), len(deduped))
    else:
        logger.debug("BoxDetect: no boxes detected")

    return accepted
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Zone Splitting
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def split_page_into_zones(
    content_x: int,
    content_y: int,
    content_w: int,
    content_h: int,
    boxes: List[DetectedBox],
    min_zone_height: int = 40,
) -> List[PageZone]:
    """Split a page into vertical zones based on detected boxes.

    Regions above, between, and below boxes become 'content' zones;
    box regions become 'box' zones. Content zones shorter than
    ``min_zone_height`` are dropped.

    Args:
        content_x, content_y, content_w, content_h: Content area bounds.
        boxes: Detected boxes, sorted by y position (as ``detect_boxes``
            returns them).
        min_zone_height: Minimum height for a content zone to be kept.

    Returns:
        List of PageZone, ordered top to bottom.
    """
    if not boxes:
        # Single zone: entire content area
        return [PageZone(
            index=0,
            zone_type='content',
            y=content_y,
            height=content_h,
            x=content_x,
            width=content_w,
        )]

    zones: List[PageZone] = []
    zone_idx = 0
    cursor_y = content_y
    content_bottom = content_y + content_h

    for box in boxes:
        # Content zone above this box (skipped when the gap is too small)
        gap_above = box.y - cursor_y
        if gap_above >= min_zone_height:
            zones.append(PageZone(
                index=zone_idx,
                zone_type='content',
                y=cursor_y,
                height=gap_above,
                x=content_x,
                width=content_w,
            ))
            zone_idx += 1

        # Box zone — uses the box's own bounds, not the full content width
        zones.append(PageZone(
            index=zone_idx,
            zone_type='box',
            y=box.y,
            height=box.height,
            x=box.x,
            width=box.width,
            box=box,
        ))
        zone_idx += 1

        cursor_y = box.y + box.height

    # Content zone below last box
    remaining = content_bottom - cursor_y
    if remaining >= min_zone_height:
        zones.append(PageZone(
            index=zone_idx,
            zone_type='content',
            y=cursor_y,
            height=remaining,
            x=content_x,
            width=content_w,
        ))

    # Lazy %-style args (not an f-string) for consistency with the rest of
    # this module and to avoid building the list repr when INFO is disabled.
    logger.info("ZoneSplit: %d zones from %d box(es): %s",
                len(zones), len(boxes), [z.zone_type for z in zones])

    return zones
|
||||
339
klausur-service/backend/ocr/detect/box_layout.py
Normal file
339
klausur-service/backend/ocr/detect/box_layout.py
Normal file
@@ -0,0 +1,339 @@
|
||||
"""
|
||||
Box layout classifier — detects internal layout type of embedded boxes.
|
||||
|
||||
Classifies each box as: flowing | columnar | bullet_list | header_only
|
||||
and provides layout-appropriate grid building.
|
||||
|
||||
Used by the Box-Grid-Review step to rebuild box zones with correct structure.
|
||||
"""
|
||||
|
||||
import logging
|
||||
import re
|
||||
import statistics
|
||||
from typing import Any, Dict, List, Optional, Tuple
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
# Bullet / list-item patterns at the start of a line.
# Note: every alternative requires a trailing space, so a bare marker token
# (e.g. a lone "•" split into its own OCR word) does NOT match this regex.
_BULLET_RE = re.compile(
    r'^[\-\u2022\u2013\u2014\u25CF\u25CB\u25AA\u25A0•·]\s'  # dash / bullet chars
    r'|^\d{1,2}[.)]\s'  # numbered: "1) " or "1. "
    r'|^[a-z][.)]\s'  # lettered: "a) " or "a. "
)
|
||||
|
||||
|
||||
def classify_box_layout(
    words: List[Dict],
    box_w: int,
    box_h: int,
) -> str:
    """Classify the internal layout of a detected box.

    Args:
        words: OCR word dicts within the box (with top, left, width, height, text)
        box_w: Box width in pixels
        box_h: Box height in pixels

    Returns:
        'header_only' | 'bullet_list' | 'columnar' | 'flowing'
    """
    if not words:
        return "header_only"

    lines = _group_into_lines(words)
    word_total = sum(len(ln) for ln in lines)

    # A single line or just a handful of words is only a header.
    if len(lines) <= 1 or word_total <= 5:
        return "header_only"

    # Bullet list: enough lines whose first token looks like a list marker.
    bare_markers = ("-", "–", "—", "•", "·", "▪", "▸")

    def _starts_with_bullet(ln: List[Dict]) -> bool:
        lead = ln[0].get("text", "") if ln else ""
        # The regex needs a space after the marker; a lone marker word
        # is caught by the explicit tuple check instead.
        return bool(_BULLET_RE.match(lead)) or lead.strip() in bare_markers

    bulleted = sum(1 for ln in lines if _starts_with_bullet(ln))
    if bulleted >= 2 and bulleted >= len(lines) * 0.4:
        return "bullet_list"

    # Columnar: several lines with distinct left-edge clusters.
    if len(lines) >= 3 and _has_column_structure(words, box_w):
        return "columnar"

    # Default: flowing text
    return "flowing"
|
||||
|
||||
|
||||
def _group_into_lines(words: List[Dict]) -> List[List[Dict]]:
|
||||
"""Group words into lines by y-proximity."""
|
||||
if not words:
|
||||
return []
|
||||
|
||||
sorted_words = sorted(words, key=lambda w: (w["top"], w["left"]))
|
||||
heights = [w["height"] for w in sorted_words if w.get("height", 0) > 0]
|
||||
median_h = statistics.median(heights) if heights else 20
|
||||
y_tolerance = max(median_h * 0.5, 5)
|
||||
|
||||
lines: List[List[Dict]] = []
|
||||
current_line: List[Dict] = [sorted_words[0]]
|
||||
current_y = sorted_words[0]["top"]
|
||||
|
||||
for w in sorted_words[1:]:
|
||||
if abs(w["top"] - current_y) <= y_tolerance:
|
||||
current_line.append(w)
|
||||
else:
|
||||
lines.append(sorted(current_line, key=lambda ww: ww["left"]))
|
||||
current_line = [w]
|
||||
current_y = w["top"]
|
||||
|
||||
if current_line:
|
||||
lines.append(sorted(current_line, key=lambda ww: ww["left"]))
|
||||
|
||||
return lines
|
||||
|
||||
|
||||
def _has_column_structure(words: List[Dict], box_w: int) -> bool:
    """Check if words have multiple distinct left-edge clusters (columns).

    Heuristic: collect the left edges of all non-first words per line (the
    first word of each line often aligns regardless of columns), sort them,
    and look for at least one gap wider than 15% of the box width between
    consecutive edges — such a gap suggests a second column start.

    Args:
        words: OCR word dicts (with top, left, width, height, text).
        box_w: Box width in pixels.

    Returns:
        True if a column-like gap between left-edge clusters is present.
    """
    if box_w <= 0:
        return False

    lines = _group_into_lines(words)
    if len(lines) < 3:
        return False

    # Left edges of non-first words in each line
    # (first word of each line often aligns regardless of columns)
    left_edges = [w["left"] for line in lines for w in line[1:]]
    if len(left_edges) < 4:
        return False

    # A column gap is typically > 15% of box width.
    # (Removed a dead statistics.median(gaps) computation whose result was
    # never used, and an unreachable empty-gaps guard.)
    left_edges.sort()
    column_gap_threshold = box_w * 0.15
    return any(
        right - left > column_gap_threshold
        for left, right in zip(left_edges, left_edges[1:])
    )
|
||||
|
||||
|
||||
def build_box_zone_grid(
    zone_words: List[Dict],
    box_x: int,
    box_y: int,
    box_w: int,
    box_h: int,
    zone_index: int,
    img_w: int,
    img_h: int,
    layout_type: Optional[str] = None,
) -> Dict[str, Any]:
    """Build a grid for a box zone with layout-aware processing.

    If layout_type is None, auto-detects it via ``classify_box_layout``.
    For 'flowing' and 'bullet_list', forces single-column layout.
    For 'columnar', uses the standard multi-column detection.
    For 'header_only', creates a single cell.

    Args:
        zone_words: OCR word dicts inside the box zone.
        box_x, box_y, box_w, box_h: Box bounds in pixels.
        zone_index: Zone number, used in cell_id prefixes ("Z{n}_...").
        img_w, img_h: Full image dimensions (for percentage coordinates;
            a value of 0 yields 0 percentages instead of dividing by zero).
        layout_type: Optional pre-determined layout; auto-detected if falsy.

    Returns:
        Dict in the same format as _build_zone_grid (columns, rows, cells,
        header_rows) plus 'box_layout_type' and 'box_grid_reviewed' keys.
    """
    # Local import avoids a circular dependency with grid_editor_helpers.
    # NOTE(review): _cluster_rows appears unused in this function — confirm
    # before removing the import.
    from grid_editor_helpers import _build_zone_grid, _cluster_rows

    if not zone_words:
        # Empty zone: return an empty grid skeleton.
        return {
            "columns": [],
            "rows": [],
            "cells": [],
            "header_rows": [],
            "box_layout_type": layout_type or "header_only",
            "box_grid_reviewed": False,
        }

    # Auto-detect layout if not specified
    if not layout_type:
        layout_type = classify_box_layout(zone_words, box_w, box_h)

    logger.info(
        "Box zone %d: layout_type=%s, %d words, %dx%d",
        zone_index, layout_type, len(zone_words), box_w, box_h,
    )

    if layout_type == "header_only":
        # Single cell with all text concatenated in reading order
        all_text = " ".join(
            w.get("text", "") for w in sorted(zone_words, key=lambda ww: (ww["top"], ww["left"]))
        ).strip()
        return {
            "columns": [{"col_index": 0, "index": 0, "label": "column_text", "col_type": "column_1",
                         "x_min_px": box_x, "x_max_px": box_x + box_w,
                         "x_min_pct": round(box_x / img_w * 100, 2) if img_w else 0,
                         "x_max_pct": round((box_x + box_w) / img_w * 100, 2) if img_w else 0,
                         "bold": False}],
            "rows": [{"index": 0, "row_index": 0,
                      "y_min": box_y, "y_max": box_y + box_h, "y_center": box_y + box_h / 2,
                      "y_min_px": box_y, "y_max_px": box_y + box_h,
                      "y_min_pct": round(box_y / img_h * 100, 2) if img_h else 0,
                      "y_max_pct": round((box_y + box_h) / img_h * 100, 2) if img_h else 0,
                      "is_header": True}],
            "cells": [{
                "cell_id": f"Z{zone_index}_R0C0",
                "row_index": 0,
                "col_index": 0,
                "col_type": "column_1",
                "text": all_text,
                "word_boxes": zone_words,
            }],
            "header_rows": [0],
            "box_layout_type": layout_type,
            "box_grid_reviewed": False,
        }

    if layout_type in ("flowing", "bullet_list"):
        # Force single column — each line becomes one row with one cell.
        # Detect bullet structure from indentation and merge continuation
        # lines into the bullet they belong to.
        lines = _group_into_lines(zone_words)
        column = {
            "col_index": 0, "index": 0, "label": "column_text", "col_type": "column_1",
            "x_min_px": box_x, "x_max_px": box_x + box_w,
            "x_min_pct": round(box_x / img_w * 100, 2) if img_w else 0,
            "x_max_pct": round((box_x + box_w) / img_w * 100, 2) if img_w else 0,
            "bold": False,
        }

        # --- Detect indentation levels (left offset relative to box edge) ---
        line_indents = []
        for line_words in lines:
            if not line_words:
                line_indents.append(0)
                continue
            min_left = min(w["left"] for w in line_words)
            line_indents.append(min_left - box_x)

        # Find the minimum indent (= bullet/main level)
        valid_indents = [ind for ind in line_indents if ind >= 0]
        min_indent = min(valid_indents) if valid_indents else 0

        # Indentation threshold: lines indented > 15px more than minimum
        # are continuation lines belonging to the previous bullet
        INDENT_THRESHOLD = 15

        # --- Group lines into logical items (bullet + continuations) ---
        # Each item is a list of line indices
        items: List[List[int]] = []
        for li, indent in enumerate(line_indents):
            # A continuation needs a preceding item to attach to.
            is_continuation = (indent > min_indent + INDENT_THRESHOLD) and len(items) > 0
            if is_continuation:
                items[-1].append(li)
            else:
                items.append([li])

        logger.info(
            "Box zone %d flowing: %d lines → %d items (indents=%s, min=%d, threshold=%d)",
            zone_index, len(lines), len(items),
            [int(i) for i in line_indents], int(min_indent), INDENT_THRESHOLD,
        )

        # --- Build rows and cells from grouped items ---
        rows = []
        cells = []
        header_rows = []

        for row_idx, item_line_indices in enumerate(items):
            # Collect all words from all lines in this item
            item_words = []
            item_texts = []
            for li in item_line_indices:
                if li < len(lines):
                    item_words.extend(lines[li])
                    line_text = " ".join(w.get("text", "") for w in lines[li]).strip()
                    if line_text:
                        item_texts.append(line_text)

            if not item_words:
                continue

            # Row bounds from the union of all word boxes in the item
            y_min = min(w["top"] for w in item_words)
            y_max = max(w["top"] + w["height"] for w in item_words)
            y_center = (y_min + y_max) / 2

            row = {
                "index": row_idx,
                "row_index": row_idx,
                "y_min": y_min,
                "y_max": y_max,
                "y_center": y_center,
                "y_min_px": y_min,
                "y_max_px": y_max,
                "y_min_pct": round(y_min / img_h * 100, 2) if img_h else 0,
                "y_max_pct": round(y_max / img_h * 100, 2) if img_h else 0,
                "is_header": False,
            }
            rows.append(row)

            # Join multi-line text with newline for display
            merged_text = "\n".join(item_texts)

            # Add bullet marker if this is a bullet item without one
            first_text = item_texts[0] if item_texts else ""
            is_bullet = len(item_line_indices) > 1 or _BULLET_RE.match(first_text)
            if is_bullet and not _BULLET_RE.match(first_text) and row_idx > 0:
                # Continuation item without bullet — add one
                merged_text = "• " + merged_text

            cell = {
                "cell_id": f"Z{zone_index}_R{row_idx}C0",
                "row_index": row_idx,
                "col_index": 0,
                "col_type": "column_1",
                "text": merged_text,
                "word_boxes": item_words,
            }
            cells.append(cell)

        # Detect header: first item if it has no continuation lines and is short
        if len(items) >= 2:
            first_item_texts = []
            for li in items[0]:
                if li < len(lines):
                    first_item_texts.append(" ".join(w.get("text", "") for w in lines[li]).strip())
            first_text = " ".join(first_item_texts)
            # Short, all-caps, or colon-terminated first items look like headers
            if (len(first_text) < 40
                    or first_text.isupper()
                    or first_text.rstrip().endswith(':')):
                header_rows = [0]

        return {
            "columns": [column],
            "rows": rows,
            "cells": cells,
            "header_rows": header_rows,
            "box_layout_type": layout_type,
            "box_grid_reviewed": False,
        }

    # Columnar: use standard grid builder with independent column detection
    result = _build_zone_grid(
        zone_words, box_x, box_y, box_w, box_h,
        zone_index, img_w, img_h,
        global_columns=None,  # detect columns independently
    )

    # Colspan detection is now handled generically by _detect_colspan_cells
    # in grid_editor_helpers.py (called inside _build_zone_grid).

    result["box_layout_type"] = layout_type
    result["box_grid_reviewed"] = False
    return result
|
||||
312
klausur-service/backend/ocr/detect/color_detect.py
Normal file
312
klausur-service/backend/ocr/detect/color_detect.py
Normal file
@@ -0,0 +1,312 @@
|
||||
"""
|
||||
Color detection for OCR word boxes.
|
||||
|
||||
Detects the text color of existing OCR words and recovers colored text
|
||||
regions (e.g. red markers, blue headings) that standard OCR may have missed.
|
||||
|
||||
Standard OCR (Tesseract, PaddleOCR) binarises images before processing,
|
||||
destroying all color information. This module adds it back by sampling
|
||||
HSV pixel values at word-box positions and finding colored regions that
|
||||
no word-box covers.
|
||||
|
||||
Lizenz: Apache 2.0 (kommerziell nutzbar)
|
||||
DATENSCHUTZ: Alle Verarbeitung erfolgt lokal.
|
||||
"""
|
||||
|
||||
import logging
|
||||
from typing import Any, Dict, List, Optional, Tuple
|
||||
|
||||
import cv2
|
||||
import numpy as np
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# HSV color ranges (OpenCV: H 0-180, S 0-255, V 0-255)
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
# HSV (lower, upper) bound pairs per color name, in OpenCV's H 0-180 scale.
# Red wraps around hue 0, so it needs two ranges. Presumably consumed with
# cv2.inRange-style bounds checks — confirm against the callers below.
_COLOR_RANGES: Dict[str, List[Tuple[np.ndarray, np.ndarray]]] = {
    "red": [
        (np.array([0, 70, 50]), np.array([10, 255, 255])),
        (np.array([170, 70, 50]), np.array([180, 255, 255])),
    ],
    "orange": [
        (np.array([10, 70, 50]), np.array([25, 255, 255])),
    ],
    "yellow": [
        (np.array([25, 70, 50]), np.array([35, 255, 255])),
    ],
    "green": [
        (np.array([35, 70, 50]), np.array([85, 255, 255])),
    ],
    "blue": [
        (np.array([100, 70, 50]), np.array([130, 255, 255])),
    ],
    "purple": [
        (np.array([130, 70, 50]), np.array([170, 255, 255])),
    ],
}

# Display hex values per color name (Tailwind-like palette shades).
_COLOR_HEX: Dict[str, str] = {
    "black": "#000000",
    "gray": "#6b7280",
    "red": "#dc2626",
    "orange": "#ea580c",
    "yellow": "#ca8a04",
    "green": "#16a34a",
    "blue": "#2563eb",
    "purple": "#9333ea",
}
|
||||
|
||||
|
||||
def _hue_to_color_name(hue: float) -> str:
|
||||
"""Map OpenCV hue (0-180) to a color name."""
|
||||
if hue < 10 or hue > 170:
|
||||
return "red"
|
||||
if hue < 25:
|
||||
return "orange"
|
||||
if hue < 35:
|
||||
return "yellow"
|
||||
if hue < 85:
|
||||
return "green"
|
||||
if hue < 130:
|
||||
return "blue"
|
||||
return "purple"
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# 1. Color annotation for existing word boxes
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def detect_word_colors(
    img_bgr: np.ndarray,
    word_boxes: List[Dict],
    sat_threshold: int = 55,
    min_sat_ratio: float = 0.25,
) -> None:
    """Annotate each word_box in-place with its detected text color.

    Adds ``color`` (hex string) and ``color_name`` (e.g. 'red', 'black')
    keys to each dict.

    Algorithm per word:
    1. Crop the word region from the image.
    2. Otsu-threshold for text/background separation.
    3. Sample background color from border pixels of the crop.
    4. Remove text pixels that match the background (avoids colored
       backgrounds like blue boxes leaking into the result).
    5. Use **median** hue (robust to outliers) and require a minimum
       ratio of saturated pixels before classifying as colored.

    Args:
        img_bgr: Full-page BGR image (OpenCV layout).
        word_boxes: OCR word dicts with left/top/width/height keys;
            mutated in place, nothing is returned.
        sat_threshold: HSV saturation above which a pixel counts as colored.
        min_sat_ratio: Minimum fraction of saturated text pixels required
            before the word is classified as colored at all.
    """
    if img_bgr is None or not word_boxes:
        return

    img_hsv = cv2.cvtColor(img_bgr, cv2.COLOR_BGR2HSV)
    img_h, img_w = img_bgr.shape[:2]

    colored_count = 0

    for wb in word_boxes:
        # Clamp the word rectangle to the image bounds.
        x1 = max(0, int(wb["left"]))
        y1 = max(0, int(wb["top"]))
        x2 = min(img_w, int(wb["left"] + wb["width"]))
        y2 = min(img_h, int(wb["top"] + wb["height"]))

        if x2 <= x1 or y2 <= y1:
            # Degenerate box (fully outside the image) → default to black.
            wb["color"] = _COLOR_HEX["black"]
            wb["color_name"] = "black"
            continue

        crop_hsv = img_hsv[y1:y2, x1:x2]
        crop_bgr = img_bgr[y1:y2, x1:x2]
        crop_gray = cv2.cvtColor(crop_bgr, cv2.COLOR_BGR2GRAY)
        ch, cw = crop_hsv.shape[:2]

        # --- Text mask: Otsu (adaptive) + high-saturation pixels ---
        # Otsu finds dark strokes; the saturation mask additionally keeps
        # colored strokes that may not be dark (e.g. yellow on white).
        _, dark_mask = cv2.threshold(
            crop_gray, 0, 255, cv2.THRESH_BINARY_INV + cv2.THRESH_OTSU,
        )
        sat_mask = (crop_hsv[:, :, 1] > sat_threshold).astype(np.uint8) * 255
        text_mask = cv2.bitwise_or(dark_mask, sat_mask)

        text_pixels = crop_hsv[text_mask > 0]

        if len(text_pixels) < 3:
            # Too few candidate pixels for a reliable median → black.
            wb["color"] = _COLOR_HEX["black"]
            wb["color_name"] = "black"
            continue

        # --- Background subtraction via border pixels ---
        # Sample background from the 2px border ring of the crop
        if ch > 6 and cw > 6:
            border = 2
            bg_top = crop_hsv[:border, :].reshape(-1, 3)
            bg_bot = crop_hsv[-border:, :].reshape(-1, 3)
            bg_lft = crop_hsv[border:-border, :border].reshape(-1, 3)
            bg_rgt = crop_hsv[border:-border, -border:].reshape(-1, 3)
            bg_pixels = np.vstack([bg_top, bg_bot, bg_lft, bg_rgt])

            bg_med_h = float(np.median(bg_pixels[:, 0]))
            bg_med_s = float(np.median(bg_pixels[:, 1]))

            # If background is tinted (S > 15), remove text pixels
            # with similar hue to avoid false colored detections
            if bg_med_s > 15:
                # Circular hue distance (hue wraps at 180 in OpenCV).
                hue_diff = np.minimum(
                    np.abs(text_pixels[:, 0].astype(float) - bg_med_h),
                    180.0 - np.abs(text_pixels[:, 0].astype(float) - bg_med_h),
                )
                keep = hue_diff > 20
                if np.any(keep):
                    text_pixels = text_pixels[keep]

        if len(text_pixels) < 3:
            # Background subtraction removed almost everything → black.
            wb["color"] = _COLOR_HEX["black"]
            wb["color_name"] = "black"
            continue

        # --- Classification using MEDIAN (robust to outliers) ---
        median_sat = float(np.median(text_pixels[:, 1]))
        sat_count = int(np.sum(text_pixels[:, 1] > sat_threshold))
        sat_ratio = sat_count / len(text_pixels)

        if median_sat < sat_threshold or sat_ratio < min_sat_ratio:
            wb["color"] = _COLOR_HEX["black"]
            wb["color_name"] = "black"
        else:
            # Use median hue of saturated pixels only for cleaner signal
            sat_pixels = text_pixels[text_pixels[:, 1] > sat_threshold]
            median_hue = float(np.median(sat_pixels[:, 0]))
            name = _hue_to_color_name(median_hue)

            # Red requires higher saturation — scanner artifacts on black
            # text often produce a slight warm tint (hue ~0) with low
            # saturation that would otherwise be misclassified as red.
            if name == "red" and median_sat < 90:
                wb["color"] = _COLOR_HEX["black"]
                wb["color_name"] = "black"
                continue

            wb["color"] = _COLOR_HEX.get(name, _COLOR_HEX["black"])
            wb["color_name"] = name
            colored_count += 1

    if colored_count:
        logger.info("color annotation: %d / %d words are colored",
                    colored_count, len(word_boxes))
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# 2. Recover colored text that OCR missed
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def recover_colored_text(
    img_bgr: np.ndarray,
    existing_words: List[Dict],
    min_area: int = 40,
    max_regions: int = 60,
) -> List[Dict]:
    """Find colored text regions not covered by any existing word box.

    Returns a list of recovered word dicts with ``color``, ``color_name``,
    and ``recovered=True`` fields. The ``text`` is set via a lightweight
    shape heuristic (e.g. ``!`` for tall narrow shapes) or ``?``.

    Args:
        img_bgr: Full-page BGR image.
        existing_words: OCR word dicts with left/top/width/height keys;
            used only to mask out already-recognized areas.
        min_area: Minimum contour area (px²) to consider.
        max_regions: Per-color cap on recovered regions.
    """
    if img_bgr is None:
        return []

    img_hsv = cv2.cvtColor(img_bgr, cv2.COLOR_BGR2HSV)
    ih, iw = img_bgr.shape[:2]
    # Upper area bound: 0.5% of the page — larger blobs are graphics,
    # not single characters.
    max_area = int(ih * iw * 0.005)

    # --- Build occupancy mask from existing words (adaptive padding) ---
    # Pad word boxes generously to prevent colored-pixel artifacts in
    # narrow inter-word gaps from being recovered as false characters.
    heights = [wb["height"] for wb in existing_words if wb.get("height", 0) > 0]
    median_h = int(np.median(heights)) if heights else 20
    pad = max(8, int(median_h * 0.35))

    occupied = np.zeros((ih, iw), dtype=np.uint8)
    for wb in existing_words:
        x1 = max(0, int(wb["left"]) - pad)
        y1 = max(0, int(wb["top"]) - pad)
        x2 = min(iw, int(wb["left"] + wb["width"]) + pad)
        y2 = min(ih, int(wb["top"] + wb["height"]) + pad)
        occupied[y1:y2, x1:x2] = 255

    recovered: List[Dict] = []

    for color_name, ranges in _COLOR_RANGES.items():
        # Create mask for this color
        mask = np.zeros((ih, iw), dtype=np.uint8)
        for lower, upper in ranges:
            mask = cv2.bitwise_or(mask, cv2.inRange(img_hsv, lower, upper))

        # Remove pixels already covered by existing OCR words
        mask = cv2.bitwise_and(mask, cv2.bitwise_not(occupied))

        # Morphological cleanup:
        # - Close with tall kernel to merge ! stroke + dot
        # - Open to remove noise specks
        kernel_close = cv2.getStructuringElement(cv2.MORPH_RECT, (3, 8))
        mask = cv2.morphologyEx(mask, cv2.MORPH_CLOSE, kernel_close)
        kernel_open = cv2.getStructuringElement(cv2.MORPH_RECT, (2, 2))
        mask = cv2.morphologyEx(mask, cv2.MORPH_OPEN, kernel_open)

        contours, _ = cv2.findContours(
            mask, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE,
        )

        candidates = []
        for cnt in contours:
            area = cv2.contourArea(cnt)
            if area < min_area or area > max_area:
                continue
            bx, by, bw, bh = cv2.boundingRect(cnt)
            if bh < 6:
                # Thin horizontal slivers are scan noise, not glyphs.
                continue
            # Reject regions too wide to be single characters
            if bw > median_h * 4:
                continue
            candidates.append((area, bx, by, bw, bh))

        # Keep largest first, limited count
        candidates.sort(key=lambda c: c[0], reverse=True)

        for area, bx, by, bw, bh in candidates[:max_regions]:
            text = _identify_shape(bw, bh)
            recovered.append({
                "text": text,
                "left": bx,
                "top": by,
                "width": bw,
                "height": bh,
                # Fixed low-ish confidence: these are heuristic finds,
                # not OCR results.
                "conf": 45,
                "color": _COLOR_HEX.get(color_name, "#000000"),
                "color_name": color_name,
                "recovered": True,
            })

    if recovered:
        logger.info(
            "color recovery: %d colored regions found (%s)",
            len(recovered),
            ", ".join(
                f"{c}: {sum(1 for r in recovered if r['color_name'] == c)}"
                for c in sorted({r["color_name"] for r in recovered})
            ),
        )

    return recovered
|
||||
|
||||
|
||||
def _identify_shape(w: int, h: int) -> str:
|
||||
"""Simple shape heuristic for common single-character text markers."""
|
||||
aspect = w / h if h > 0 else 1.0
|
||||
if aspect < 0.55 and h > 10:
|
||||
# Tall, narrow — likely exclamation mark
|
||||
return "!"
|
||||
if 0.6 < aspect < 1.5 and max(w, h) < 25:
|
||||
# Small, roughly square — bullet or dot
|
||||
return "•"
|
||||
return "?"
|
||||
413
klausur-service/backend/ocr/detect/doclayout_detect.py
Normal file
413
klausur-service/backend/ocr/detect/doclayout_detect.py
Normal file
@@ -0,0 +1,413 @@
|
||||
"""
|
||||
PP-DocLayout ONNX Document Layout Detection.
|
||||
|
||||
Uses PP-DocLayout ONNX model to detect document structure regions:
|
||||
table, figure, title, text, list, header, footer, equation, reference, abstract
|
||||
|
||||
Fallback: If ONNX model not available, returns empty list (caller should
|
||||
fall back to OpenCV-based detection in cv_graphic_detect.py).
|
||||
|
||||
DATENSCHUTZ: Alle Verarbeitung erfolgt lokal.
|
||||
"""
|
||||
|
||||
import logging
|
||||
import os
|
||||
from dataclasses import dataclass
|
||||
from pathlib import Path
|
||||
from typing import Dict, List, Optional
|
||||
|
||||
import numpy as np
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
__all__ = [
|
||||
"detect_layout_regions",
|
||||
"is_doclayout_available",
|
||||
"get_doclayout_status",
|
||||
"LayoutRegion",
|
||||
"DOCLAYOUT_CLASSES",
|
||||
]
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Class labels (PP-DocLayout default order)
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
# Index → label mapping; list order must match the model's class indices,
# so never reorder without retraining/re-exporting the ONNX model.
DOCLAYOUT_CLASSES = [
    "table", "figure", "title", "text", "list",
    "header", "footer", "equation", "reference", "abstract",
]
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Data types
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
@dataclass
class LayoutRegion:
    """A detected document layout region (coordinates in original-image pixels)."""
    x: int                # left edge
    y: int                # top edge
    width: int
    height: int
    label: str            # table, figure, title, text, list, etc.
    confidence: float     # model detection score
    label_index: int      # raw class index
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# ONNX model loading
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
# Candidate model locations, probed in order by _find_model_path().
_MODEL_SEARCH_PATHS = [
    # 1. Explicit environment variable
    os.environ.get("DOCLAYOUT_ONNX_PATH", ""),
    # 2. Docker default cache path
    "/root/.cache/huggingface/onnx/pp-doclayout/model.onnx",
    # 3. Local dev relative to working directory
    "models/onnx/pp-doclayout/model.onnx",
]

# Lazy-load state, populated exactly once by _load_onnx_session().
_onnx_session: Optional[object] = None   # ort.InferenceSession when loaded
_model_path: Optional[str] = None        # resolved path of the loaded model
_load_attempted: bool = False            # True after the first load attempt
_load_error: Optional[str] = None        # human-readable reason if loading failed
|
||||
|
||||
|
||||
def _find_model_path() -> Optional[str]:
    """Search for the ONNX model file in known locations."""
    # Skip empty entries (e.g. unset env var) before touching the filesystem.
    candidates = (Path(entry) for entry in _MODEL_SEARCH_PATHS if entry)
    for candidate in candidates:
        if candidate.is_file():
            return str(candidate.resolve())
    return None
|
||||
|
||||
|
||||
def _load_onnx_session():
    """Lazy-load the ONNX runtime session (once).

    Returns the cached ``InferenceSession`` or ``None``. The outcome of the
    first attempt is memoized in module globals, so failures are NOT retried
    on later calls; ``_load_error`` records why loading failed.
    """
    global _onnx_session, _model_path, _load_attempted, _load_error

    if _load_attempted:
        # Already tried (successfully or not) — return the cached result.
        return _onnx_session

    _load_attempted = True

    path = _find_model_path()
    if path is None:
        _load_error = "ONNX model not found in any search path"
        logger.info("PP-DocLayout: %s", _load_error)
        return None

    try:
        # Imported lazily: onnxruntime is an optional dependency.
        import onnxruntime as ort  # type: ignore[import-untyped]

        sess_options = ort.SessionOptions()
        sess_options.graph_optimization_level = ort.GraphOptimizationLevel.ORT_ENABLE_ALL
        # Prefer CPU – keeps the GPU free for OCR / LLM.
        providers = ["CPUExecutionProvider"]
        _onnx_session = ort.InferenceSession(path, sess_options, providers=providers)
        _model_path = path
        logger.info("PP-DocLayout: model loaded from %s", path)
    except ImportError:
        # Missing package is expected in some deployments → info, not warning.
        _load_error = "onnxruntime not installed"
        logger.info("PP-DocLayout: %s", _load_error)
    except Exception as exc:
        # Corrupt model file, incompatible opset, etc.
        _load_error = str(exc)
        logger.warning("PP-DocLayout: failed to load model from %s: %s", path, exc)

    return _onnx_session
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Public helpers
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
def is_doclayout_available() -> bool:
    """Return True if the ONNX model can be loaded successfully."""
    session = _load_onnx_session()
    return session is not None
|
||||
|
||||
|
||||
def get_doclayout_status() -> Dict:
    """Return diagnostic information about the DocLayout backend."""
    # Trigger the (cached) load attempt so the status reflects reality.
    _load_onnx_session()
    status = {
        "available": _onnx_session is not None,
        "model_path": _model_path,
        "load_error": _load_error,
        "classes": DOCLAYOUT_CLASSES,
        "class_count": len(DOCLAYOUT_CLASSES),
    }
    return status
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Pre-processing
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
_INPUT_SIZE = 800  # PP-DocLayout expects 800x800


def preprocess_image(img_bgr: np.ndarray) -> tuple:
    """Resize + normalize image for PP-DocLayout ONNX input.

    Letterboxes the image: uniform scale to fit 800x800, then centers it
    on a gray canvas.

    Returns:
        (input_tensor, scale, pad_x, pad_y)
        where the single uniform ``scale`` factor and the ``pad_x``/``pad_y``
        letterbox offsets allow mapping boxes back to original coords.
    """
    orig_h, orig_w = img_bgr.shape[:2]

    # Compute scale to fit within _INPUT_SIZE keeping aspect ratio
    scale = min(_INPUT_SIZE / orig_w, _INPUT_SIZE / orig_h)
    new_w = int(orig_w * scale)
    new_h = int(orig_h * scale)

    import cv2  # local import — cv2 is always available in this service
    resized = cv2.resize(img_bgr, (new_w, new_h), interpolation=cv2.INTER_LINEAR)

    # Pad to _INPUT_SIZE x _INPUT_SIZE with gray (114)
    pad_x = (_INPUT_SIZE - new_w) // 2
    pad_y = (_INPUT_SIZE - new_h) // 2
    padded = np.full((_INPUT_SIZE, _INPUT_SIZE, 3), 114, dtype=np.uint8)
    padded[pad_y:pad_y + new_h, pad_x:pad_x + new_w] = resized

    # Normalize to [0, 1] float32
    blob = padded.astype(np.float32) / 255.0

    # HWC → CHW
    blob = blob.transpose(2, 0, 1)

    # Add batch dimension → (1, 3, 800, 800)
    blob = np.expand_dims(blob, axis=0)

    return blob, scale, pad_x, pad_y
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Non-Maximum Suppression (NMS)
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
def _compute_iou(box_a: np.ndarray, box_b: np.ndarray) -> float:
|
||||
"""Compute IoU between two boxes [x1, y1, x2, y2]."""
|
||||
ix1 = max(box_a[0], box_b[0])
|
||||
iy1 = max(box_a[1], box_b[1])
|
||||
ix2 = min(box_a[2], box_b[2])
|
||||
iy2 = min(box_a[3], box_b[3])
|
||||
|
||||
inter = max(0.0, ix2 - ix1) * max(0.0, iy2 - iy1)
|
||||
if inter == 0:
|
||||
return 0.0
|
||||
|
||||
area_a = (box_a[2] - box_a[0]) * (box_a[3] - box_a[1])
|
||||
area_b = (box_b[2] - box_b[0]) * (box_b[3] - box_b[1])
|
||||
union = area_a + area_b - inter
|
||||
return inter / union if union > 0 else 0.0
|
||||
|
||||
|
||||
def nms(boxes: np.ndarray, scores: np.ndarray, iou_threshold: float = 0.5) -> List[int]:
    """Apply greedy Non-Maximum Suppression.

    Args:
        boxes: (N, 4) array of [x1, y1, x2, y2].
        scores: (N,) confidence scores.
        iou_threshold: Overlap threshold for suppression.

    Returns:
        List of kept indices.
    """
    if len(boxes) == 0:
        return []

    def _iou(a, b) -> float:
        # Intersection-over-union of two [x1, y1, x2, y2] boxes.
        ox1 = max(a[0], b[0])
        oy1 = max(a[1], b[1])
        ox2 = min(a[2], b[2])
        oy2 = min(a[3], b[3])
        inter = max(0.0, ox2 - ox1) * max(0.0, oy2 - oy1)
        if inter == 0:
            return 0.0
        union = ((a[2] - a[0]) * (a[3] - a[1])
                 + (b[2] - b[0]) * (b[3] - b[1]) - inter)
        return inter / union if union > 0 else 0.0

    # Process detections from highest to lowest score.
    pending = np.argsort(scores)[::-1].tolist()
    kept: List[int] = []

    while pending:
        best = pending.pop(0)
        kept.append(best)
        # Keep only candidates that don't overlap the winner too much.
        pending = [j for j in pending
                   if _iou(boxes[best], boxes[j]) < iou_threshold]

    return kept
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Post-processing
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
def _postprocess(
    outputs: list,
    scale: float,
    pad_x: int,
    pad_y: int,
    orig_w: int,
    orig_h: int,
    confidence_threshold: float,
    max_regions: int,
) -> List[LayoutRegion]:
    """Parse ONNX output tensors into LayoutRegion list.

    PP-DocLayout ONNX typically outputs one tensor of shape
    (1, N, 6) or three tensors (boxes, scores, class_ids).
    We handle both common formats.

    Args:
        outputs: Raw tensors returned by ``session.run``.
        scale: Uniform resize factor applied during preprocessing.
        pad_x: Horizontal letterbox offset in model-input pixels.
        pad_y: Vertical letterbox offset in model-input pixels.
        orig_w: Original image width for clamping.
        orig_h: Original image height for clamping.
        confidence_threshold: Minimum score to keep a detection.
        max_regions: Cap on the number of returned regions.
    """
    regions: List[LayoutRegion] = []

    # --- Determine output format ---
    if len(outputs) == 1:
        # Single tensor: (1, N, 4+1+1) = (batch, detections, [x1,y1,x2,y2,score,class])
        raw = np.squeeze(outputs[0])  # (N, 6) or (N, 5+num_classes)
        if raw.ndim == 1:
            # A single detection squeezes to 1-D; restore the row axis.
            raw = raw.reshape(1, -1)
        if raw.shape[0] == 0:
            return []

        if raw.shape[1] == 6:
            # Format: x1, y1, x2, y2, score, class_id
            all_boxes = raw[:, :4]
            all_scores = raw[:, 4]
            all_classes = raw[:, 5].astype(int)
        elif raw.shape[1] > 6:
            # Format: x1, y1, x2, y2, obj_conf, cls0_conf, cls1_conf, ...
            all_boxes = raw[:, :4]
            cls_scores = raw[:, 5:]
            all_classes = np.argmax(cls_scores, axis=1)
            # Final score = objectness * best class probability.
            all_scores = raw[:, 4] * np.max(cls_scores, axis=1)
        else:
            logger.warning("PP-DocLayout: unexpected output shape %s", raw.shape)
            return []

    elif len(outputs) == 3:
        # Three tensors: boxes (N,4), scores (N,), class_ids (N,)
        all_boxes = np.squeeze(outputs[0])
        all_scores = np.squeeze(outputs[1])
        all_classes = np.squeeze(outputs[2]).astype(int)
        if all_boxes.ndim == 1:
            # Single detection squeezed to 1-D: re-add the row axis.
            all_boxes = all_boxes.reshape(1, 4)
            all_scores = np.array([all_scores])
            all_classes = np.array([all_classes])
    else:
        logger.warning("PP-DocLayout: unexpected %d output tensors", len(outputs))
        return []

    # --- Confidence filter ---
    mask = all_scores >= confidence_threshold
    boxes = all_boxes[mask]
    scores = all_scores[mask]
    classes = all_classes[mask]

    if len(boxes) == 0:
        return []

    # --- NMS ---
    keep_idxs = nms(boxes, scores, iou_threshold=0.5)
    boxes = boxes[keep_idxs]
    scores = scores[keep_idxs]
    classes = classes[keep_idxs]

    # --- Scale boxes back to original image coordinates ---
    for i in range(len(boxes)):
        x1, y1, x2, y2 = boxes[i]

        # Remove padding offset
        x1 = (x1 - pad_x) / scale
        y1 = (y1 - pad_y) / scale
        x2 = (x2 - pad_x) / scale
        y2 = (y2 - pad_y) / scale

        # Clamp to original dimensions
        x1 = max(0, min(x1, orig_w))
        y1 = max(0, min(y1, orig_h))
        x2 = max(0, min(x2, orig_w))
        y2 = max(0, min(y2, orig_h))

        w = int(round(x2 - x1))
        h = int(round(y2 - y1))
        if w < 5 or h < 5:
            # Discard degenerate slivers produced by clamping.
            continue

        cls_idx = int(classes[i])
        # Unknown indices get a synthetic label rather than raising.
        label = DOCLAYOUT_CLASSES[cls_idx] if 0 <= cls_idx < len(DOCLAYOUT_CLASSES) else f"class_{cls_idx}"

        regions.append(LayoutRegion(
            x=int(round(x1)),
            y=int(round(y1)),
            width=w,
            height=h,
            label=label,
            confidence=round(float(scores[i]), 4),
            label_index=cls_idx,
        ))

    # Sort by confidence descending, limit
    regions.sort(key=lambda r: r.confidence, reverse=True)
    return regions[:max_regions]
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Main detection function
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
def detect_layout_regions(
    img_bgr: np.ndarray,
    confidence_threshold: float = 0.5,
    max_regions: int = 50,
) -> List[LayoutRegion]:
    """Detect document layout regions using PP-DocLayout ONNX model.

    Args:
        img_bgr: BGR color image (OpenCV format).
        confidence_threshold: Minimum confidence to keep a detection.
        max_regions: Maximum number of regions to return.

    Returns:
        List of LayoutRegion sorted by confidence descending.
        Returns empty list if model is not available.
    """
    session = _load_onnx_session()
    if session is None:
        # Model unavailable — caller falls back to the OpenCV detector.
        return []

    if img_bgr is None or img_bgr.size == 0:
        return []

    orig_h, orig_w = img_bgr.shape[:2]

    # Pre-process
    input_tensor, scale, pad_x, pad_y = preprocess_image(img_bgr)

    # Run inference
    try:
        input_name = session.get_inputs()[0].name
        outputs = session.run(None, {input_name: input_tensor})
    except Exception as exc:
        # Inference failures degrade gracefully to "no regions".
        logger.warning("PP-DocLayout inference failed: %s", exc)
        return []

    # Post-process
    regions = _postprocess(
        outputs,
        scale=scale,
        pad_x=pad_x,
        pad_y=pad_y,
        orig_w=orig_w,
        orig_h=orig_h,
        confidence_threshold=confidence_threshold,
        max_regions=max_regions,
    )

    if regions:
        # Summarize per-label counts for the log line.
        label_counts: Dict[str, int] = {}
        for r in regions:
            label_counts[r.label] = label_counts.get(r.label, 0) + 1
        logger.info(
            "PP-DocLayout: %d regions (%s)",
            len(regions),
            ", ".join(f"{k}: {v}" for k, v in sorted(label_counts.items())),
        )
    else:
        logger.debug("PP-DocLayout: no regions above threshold %.2f", confidence_threshold)

    return regions
|
||||
422
klausur-service/backend/ocr/detect/graphic_detect.py
Normal file
422
klausur-service/backend/ocr/detect/graphic_detect.py
Normal file
@@ -0,0 +1,422 @@
|
||||
"""
|
||||
Graphical element detection for OCR pages.
|
||||
|
||||
Region-based approach:
|
||||
1. Build a color mask (saturation channel — black text is invisible).
|
||||
2. Dilate heavily to merge nearby colored pixels into regions.
|
||||
3. For each region, check overlap with OCR word boxes:
|
||||
- High word overlap → colored text (skip)
|
||||
- Low word overlap → colored graphic / image (keep)
|
||||
4. Separately detect large black-ink illustrations via ink mask.
|
||||
|
||||
Boxes and text colors are handled by cv_box_detect / cv_color_detect.
|
||||
|
||||
Lizenz: Apache 2.0 (kommerziell nutzbar)
|
||||
DATENSCHUTZ: Alle Verarbeitung erfolgt lokal.
|
||||
"""
|
||||
|
||||
import logging
|
||||
from dataclasses import dataclass, field
|
||||
from typing import Any, Dict, List, Optional
|
||||
|
||||
import cv2
|
||||
import numpy as np
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
__all__ = ["detect_graphic_elements", "GraphicElement"]
|
||||
|
||||
|
||||
@dataclass
class GraphicElement:
    """A detected non-text graphical element (coordinates in image pixels)."""
    x: int                # left edge
    y: int                # top edge
    width: int
    height: int
    area: int             # region area in pixels
    shape: str            # image, illustration
    color_name: str       # dominant color or 'black'
    color_hex: str        # hex value matching color_name
    confidence: float     # detection confidence
    # Raw OpenCV contour when produced by the CV fallback path;
    # excluded from repr because it can be large.
    contour: Any = field(default=None, repr=False)
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Color helpers
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
# Hex value reported for each color name — presumably kept in sync with the
# color-annotation module's palette; verify before changing either copy.
_COLOR_HEX = {
    "black": "#000000",
    "gray": "#6b7280",
    "red": "#dc2626",
    "orange": "#ea580c",
    "yellow": "#ca8a04",
    "green": "#16a34a",
    "blue": "#2563eb",
    "purple": "#9333ea",
}
|
||||
|
||||
|
||||
def _dominant_color(hsv_roi: np.ndarray, sat_threshold: int = 40) -> tuple:
    """Return (color_name, color_hex) for an HSV region."""
    black = ("black", _COLOR_HEX["black"])
    if hsv_roi.size == 0:
        return black

    flat = hsv_roi.reshape(-1, 3)
    is_saturated = flat[:, 1] > sat_threshold
    ratio = np.sum(is_saturated) / len(flat) if len(flat) > 0 else 0

    # Fewer than 15% saturated pixels → treat the region as black ink.
    if ratio < 0.15:
        return black

    colored = flat[is_saturated]
    if len(colored) < 3:
        return black

    hue = float(np.median(colored[:, 0]))

    # Red straddles the hue wrap-around; the rest are contiguous bands
    # given as exclusive upper bounds.
    if hue < 10 or hue > 170:
        name = "red"
    else:
        name = "purple"
        for upper_bound, band_name in ((25, "orange"), (35, "yellow"),
                                       (85, "green"), (130, "blue")):
            if hue < upper_bound:
                name = band_name
                break

    return name, _COLOR_HEX.get(name, _COLOR_HEX["black"])
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Main detection
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def detect_graphic_elements(
    img_bgr: np.ndarray,
    word_boxes: List[Dict],
    detected_boxes: Optional[List[Dict]] = None,
    max_elements: int = 50,
) -> List[GraphicElement]:
    """Find non-text graphical regions on the page.

    Region-based: dilate color mask to form regions, then check word
    overlap to distinguish colored text from colored graphics.

    Backend selection: if the ``GRAPHIC_DETECT_BACKEND`` env var is
    "doclayout" or "auto" (default) and the PP-DocLayout ONNX model is
    available, it is tried first; any failure falls back to the OpenCV
    two-pass heuristic below.

    Args:
        img_bgr: BGR color image.
        word_boxes: List of OCR word dicts with left/top/width/height.
        detected_boxes: Optional list of detected box dicts (x/y/w/h).
        max_elements: Maximum number of elements to return.

    Returns:
        List of GraphicElement, sorted by area descending.
    """
    if img_bgr is None:
        return []

    # ------------------------------------------------------------------
    # Try PP-DocLayout ONNX first if available
    # ------------------------------------------------------------------
    import os
    backend = os.environ.get("GRAPHIC_DETECT_BACKEND", "auto")
    if backend in ("doclayout", "auto"):
        try:
            from cv_doclayout_detect import detect_layout_regions, is_doclayout_available
            if is_doclayout_available():
                regions = detect_layout_regions(img_bgr)
                if regions:
                    # Map model labels to (shape, color_name, color_hex);
                    # both "figure" and "table" are surfaced as shape "image",
                    # distinguished only by their overlay color.
                    _LABEL_TO_COLOR = {
                        "figure": ("image", "green", _COLOR_HEX.get("green", "#16a34a")),
                        "table": ("image", "blue", _COLOR_HEX.get("blue", "#2563eb")),
                    }
                    converted: List[GraphicElement] = []
                    for r in regions:
                        # Unknown labels pass through with a gray overlay.
                        shape, color_name, color_hex = _LABEL_TO_COLOR.get(
                            r.label,
                            (r.label, "gray", _COLOR_HEX.get("gray", "#6b7280")),
                        )
                        converted.append(GraphicElement(
                            x=r.x,
                            y=r.y,
                            width=r.width,
                            height=r.height,
                            area=r.width * r.height,
                            shape=shape,
                            color_name=color_name,
                            color_hex=color_hex,
                            confidence=r.confidence,
                            contour=None,
                        ))
                    converted.sort(key=lambda g: g.area, reverse=True)
                    result = converted[:max_elements]
                    if result:
                        shape_counts: Dict[str, int] = {}
                        for g in result:
                            shape_counts[g.shape] = shape_counts.get(g.shape, 0) + 1
                        logger.info(
                            "GraphicDetect (PP-DocLayout): %d elements (%s)",
                            len(result),
                            ", ".join(f"{s}: {c}" for s, c in sorted(shape_counts.items())),
                        )
                    # Early return: the ONNX path replaces the OpenCV passes
                    # entirely when it produced any regions.
                    return result
        except Exception as e:
            logger.warning("PP-DocLayout failed, falling back to OpenCV: %s", e)
    # ------------------------------------------------------------------
    # OpenCV fallback (original logic)
    # ------------------------------------------------------------------

    h, w = img_bgr.shape[:2]

    logger.debug("GraphicDetect: image %dx%d, %d word_boxes, %d detected_boxes",
                 w, h, len(word_boxes), len(detected_boxes or []))

    hsv = cv2.cvtColor(img_bgr, cv2.COLOR_BGR2HSV)
    candidates: List[GraphicElement] = []

    # --- Build word mask (for overlap checking) ---
    # Binary mask where OCR word bounding boxes are 255, clipped to image.
    word_mask = np.zeros((h, w), dtype=np.uint8)
    for wb in word_boxes:
        x1 = max(0, int(wb.get("left", 0)))
        y1 = max(0, int(wb.get("top", 0)))
        x2 = min(w, int(wb.get("left", 0) + wb.get("width", 0)))
        y2 = min(h, int(wb.get("top", 0) + wb.get("height", 0)))
        word_mask[y1:y2, x1:x2] = 255

    # =====================================================================
    # PASS 1 — COLORED IMAGE REGIONS
    # =====================================================================
    # Color mask: saturated pixels (black text has sat ≈ 0 → invisible)
    sat_mask = (hsv[:, :, 1] > 40).astype(np.uint8) * 255
    # Exclude near-white highlights (value >= 240) from the color mask.
    val_mask = (hsv[:, :, 2] < 240).astype(np.uint8) * 255
    color_pixels = cv2.bitwise_and(sat_mask, val_mask)

    # Remove tiny speckle
    kernel_open = cv2.getStructuringElement(cv2.MORPH_RECT, (2, 2))
    color_pixels = cv2.morphologyEx(color_pixels, cv2.MORPH_OPEN, kernel_open)

    # Count raw colored pixels before dilation (for density check later)
    color_pixel_raw = color_pixels.copy()

    # Heavy dilation to merge nearby colored elements into regions.
    # A 25x25 kernel merges elements within ~12px of each other.
    kernel_dilate = cv2.getStructuringElement(cv2.MORPH_ELLIPSE, (25, 25))
    region_mask = cv2.dilate(color_pixels, kernel_dilate, iterations=1)

    contours_regions, _ = cv2.findContours(
        region_mask, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE,
    )
    logger.debug("GraphicDetect PASS1: %d color regions after dilation", len(contours_regions))

    for cnt in contours_regions:
        bx, by, bw, bh = cv2.boundingRect(cnt)

        # Skip tiny regions
        if bw < 15 or bh < 15:
            continue

        # Skip page-spanning regions
        if bw > w * 0.6 or bh > h * 0.6:
            logger.debug("GraphicDetect PASS1 skip page-spanning (%d,%d) %dx%d", bx, by, bw, bh)
            continue

        bbox_area = bw * bh

        # Check: how much of this region's bounding box overlaps with words?
        roi_words = word_mask[by:by + bh, bx:bx + bw]
        word_pixel_count = int(np.sum(roi_words > 0))
        word_overlap = word_pixel_count / bbox_area if bbox_area > 0 else 0

        # Check: how many OCR word centroids fall inside this region?
        # Colored text that OCR detected will have multiple centroids inside.
        # Actual images may have 0-1 spurious OCR artifacts.
        word_centroid_count = sum(
            1 for wb in word_boxes
            if (bx <= int(wb.get("left", 0) + wb.get("width", 0) / 2) <= bx + bw
                and by <= int(wb.get("top", 0) + wb.get("height", 0) / 2) <= by + bh)
        )

        # Check: how many actual colored pixels are in this region?
        roi_color = color_pixel_raw[by:by + bh, bx:bx + bw]
        color_pixel_count = int(np.sum(roi_color > 0))

        # Color pixel density (before any skip checks so we can log it)
        density = color_pixel_count / bbox_area if bbox_area > 0 else 0

        # --- Skip heuristics for colored TEXT (not images) ---

        # (a) High word-box pixel overlap → clearly text
        if word_overlap > 0.40:
            logger.info(
                "GraphicDetect PASS1 skip text-overlap (%d,%d) %dx%d "
                "overlap=%.0f%% centroids=%d",
                bx, by, bw, bh, word_overlap * 100, word_centroid_count,
            )
            continue

        # (b) Multiple OCR words detected inside → colored text
        # (images rarely produce 2+ confident word detections)
        if word_centroid_count >= 2:
            logger.info(
                "GraphicDetect PASS1 skip multi-word (%d,%d) %dx%d "
                "centroids=%d overlap=%.0f%% density=%.0f%%",
                bx, by, bw, bh, word_centroid_count,
                word_overlap * 100, density * 100,
            )
            continue

        # (c) Even 1 word + some pixel overlap → likely text
        if word_centroid_count >= 1 and word_overlap > 0.10:
            logger.info(
                "GraphicDetect PASS1 skip word+overlap (%d,%d) %dx%d "
                "centroids=%d overlap=%.0f%%",
                bx, by, bw, bh, word_centroid_count, word_overlap * 100,
            )
            continue

        # Need a minimum number of colored pixels (not just dilated area)
        if color_pixel_count < 200:
            continue

        # (d) Very low density → thin strokes, almost certainly text.
        # Large regions (photos/illustrations) can have low color density
        # because most pixels are grayscale ink. Use a lower threshold
        # for regions bigger than 100×80 px.
        _min_density = 0.05 if (bw > 100 and bh > 80) else 0.20
        if density < _min_density:
            logger.info(
                "GraphicDetect PASS1 skip low-density (%d,%d) %dx%d "
                "density=%.0f%% (min=%.0f%%, likely colored text)",
                bx, by, bw, bh, density * 100, _min_density * 100,
            )
            continue

        # (e) Moderate density + small height → colored text line
        if density < 0.35 and bh < h * 0.05:
            logger.info(
                "GraphicDetect PASS1 skip text-height (%d,%d) %dx%d "
                "density=%.0f%% height=%.1f%%",
                bx, by, bw, bh, density * 100, 100.0 * bh / h,
            )
            continue

        # Determine dominant color from the actual colored pixels
        roi_hsv = hsv[by:by + bh, bx:bx + bw]
        color_px_mask = roi_color > 0
        if np.sum(color_px_mask) > 0:
            masked_hsv = roi_hsv[color_px_mask]
            color_name, color_hex = _dominant_color(masked_hsv)
        else:
            color_name, color_hex = "black", _COLOR_HEX["black"]

        # Confidence based on color density and low word overlap
        conf = min(0.95, 0.5 + density * 0.5)

        logger.debug("GraphicDetect PASS1 accept (%d,%d) %dx%d px=%d density=%.0f%% overlap=%.0f%% %s",
                     bx, by, bw, bh, color_pixel_count, density * 100, word_overlap * 100, color_name)
        # NOTE: area is the raw colored-pixel count, not bbox area — this
        # biases the later dedup sort toward denser regions.
        candidates.append(GraphicElement(
            x=bx, y=by, width=bw, height=bh,
            area=color_pixel_count,
            shape="image",
            color_name=color_name, color_hex=color_hex,
            confidence=round(conf, 2), contour=cnt,
        ))

    # =====================================================================
    # PASS 2 — LARGE BLACK-INK ILLUSTRATIONS
    # =====================================================================
    # Otsu binarization (inverted) selects dark ink pixels.
    gray = cv2.cvtColor(img_bgr, cv2.COLOR_BGR2GRAY)
    _, dark_mask = cv2.threshold(gray, 0, 255, cv2.THRESH_BINARY_INV + cv2.THRESH_OTSU)

    # Exclude words and colored regions already found
    exclusion = np.zeros((h, w), dtype=np.uint8)
    word_pad = 5
    for wb in word_boxes:
        x1 = max(0, int(wb.get("left", 0)) - word_pad)
        y1 = max(0, int(wb.get("top", 0)) - word_pad)
        x2 = min(w, int(wb.get("left", 0) + wb.get("width", 0)) + word_pad)
        y2 = min(h, int(wb.get("top", 0) + wb.get("height", 0)) + word_pad)
        exclusion[y1:y2, x1:x2] = 255

    if detected_boxes:
        for box in detected_boxes:
            bbx = int(box.get("x", 0))
            bby = int(box.get("y", 0))
            bbw = int(box.get("w", box.get("width", 0)))
            bbh = int(box.get("h", box.get("height", 0)))
            # Inset keeps the box BORDER lines out of the exclusion so they
            # don't merge ink contours across the box edge.
            inset = 8
            x1 = max(0, bbx + inset)
            y1 = max(0, bby + inset)
            x2 = min(w, bbx + bbw - inset)
            y2 = min(h, bby + bbh - inset)
            if x2 > x1 and y2 > y1:
                exclusion[y1:y2, x1:x2] = 255

    ink_only = cv2.bitwise_and(dark_mask, cv2.bitwise_not(exclusion))
    ink_only = cv2.bitwise_and(ink_only, cv2.bitwise_not(color_pixels))

    contours_ink, _ = cv2.findContours(
        ink_only, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE,
    )
    logger.debug("GraphicDetect PASS2 ink: %d contours", len(contours_ink))

    for cnt in contours_ink:
        area = cv2.contourArea(cnt)
        bx, by, bw, bh = cv2.boundingRect(cnt)

        # Keep only substantial, roughly 2-D blobs; reject page-spanning ones.
        if area < 5000 or min(bw, bh) < 40:
            continue
        if bw > w * 0.8 or bh > h * 0.8:
            continue

        logger.debug("GraphicDetect PASS2 accept (%d,%d) %dx%d area=%d",
                     bx, by, bw, bh, int(area))
        candidates.append(GraphicElement(
            x=bx, y=by, width=bw, height=bh,
            area=int(area), shape="illustration",
            color_name="black", color_hex="#000000",
            confidence=0.5, contour=cnt,
        ))

    # =====================================================================
    # Deduplicate and return
    # =====================================================================
    candidates.sort(key=lambda g: g.area, reverse=True)

    # Greedy dedup: drop a candidate when its bbox intersection with an
    # already-kept element exceeds 50% of the SMALLER bbox (containment-style
    # overlap metric, stricter than IoU for nested regions).
    final: List[GraphicElement] = []
    for c in candidates:
        overlap = False
        for f in final:
            ix1 = max(c.x, f.x)
            iy1 = max(c.y, f.y)
            ix2 = min(c.x + c.width, f.x + f.width)
            iy2 = min(c.y + c.height, f.y + f.height)
            if ix2 > ix1 and iy2 > iy1:
                inter = (ix2 - ix1) * (iy2 - iy1)
                smaller = min(c.width * c.height, f.width * f.height)
                if smaller > 0 and inter / smaller > 0.5:
                    overlap = True
                    break
        if not overlap:
            final.append(c)

    result = final[:max_elements]

    if result:
        shape_counts: Dict[str, int] = {}
        for g in result:
            shape_counts[g.shape] = shape_counts.get(g.shape, 0) + 1
        logger.info(
            "GraphicDetect: %d elements found (%s)",
            len(result),
            ", ".join(f"{s}: {c}" for s, c in sorted(shape_counts.items())),
        )
    else:
        logger.info("GraphicDetect: no graphic elements found")

    return result
|
||||
231
klausur-service/backend/ocr/detect/syllable/core.py
Normal file
231
klausur-service/backend/ocr/detect/syllable/core.py
Normal file
@@ -0,0 +1,231 @@
|
||||
"""
|
||||
Syllable Core — hyphenator init, word validation, pipe autocorrect.
|
||||
|
||||
Extracted from cv_syllable_detect.py for modularity.
|
||||
|
||||
Lizenz: Apache 2.0 (kommerziell nutzbar)
|
||||
DATENSCHUTZ: Alle Verarbeitung erfolgt lokal.
|
||||
"""
|
||||
|
||||
import logging
|
||||
import re
|
||||
from typing import Any, Dict, List, Optional, Tuple
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
# IPA/phonetic characters -- skip cells containing these.
# The class covers square brackets plus IPA stress marks (U+02C8/U+02CC),
# the length mark (U+02D0) and common IPA vowel/consonant symbols; any hit
# marks a cell as a pronunciation transcription rather than normal text.
_IPA_RE = re.compile(r'[\[\]\u02c8\u02cc\u02d0\u0283\u0292\u03b8\u00f0\u014b\u0251\u0252\u00e6\u0254\u0259\u025b\u025c\u026a\u028a\u028c]')

# Common German words that should NOT be merged with adjacent tokens.
# Used as a guard in the gap-merging heuristics: short function words look
# like OCR fragments but are real standalone words.
_STOP_WORDS = frozenset([
    # Articles
    'der', 'die', 'das', 'dem', 'den', 'des',
    'ein', 'eine', 'einem', 'einen', 'einer',
    # Pronouns
    'du', 'er', 'es', 'sie', 'wir', 'ihr', 'ich', 'man', 'sich',
    'dich', 'dir', 'mich', 'mir', 'uns', 'euch', 'ihm', 'ihn',
    # Prepositions
    'mit', 'von', 'zu', 'f\u00fcr', 'auf', 'in', 'an', 'um', 'am', 'im',
    'aus', 'bei', 'nach', 'vor', 'bis', 'durch', '\u00fcber', 'unter',
    'zwischen', 'ohne', 'gegen',
    # Conjunctions
    'und', 'oder', 'als', 'wie', 'wenn', 'dass', 'weil', 'aber',
    # Adverbs
    'auch', 'noch', 'nur', 'schon', 'sehr', 'nicht',
    # Verbs
    'ist', 'hat', 'wird', 'kann', 'soll', 'muss', 'darf',
    'sein', 'haben',
    # Other
    'kein', 'keine', 'keinem', 'keinen', 'keiner',
])

# Cached hyphenators (lazily created by _get_hyphenators; None until then)
_hyph_de = None
_hyph_en = None

# Cached spellchecker (for autocorrect_pipe_artifacts; lazily created by
# _get_spellchecker, stays None when the package is unavailable)
_spell_de = None
|
||||
|
||||
|
||||
def _get_hyphenators():
    """Return the (German, English) pyphen hyphenators, building them lazily.

    The pair is cached in module globals after the first successful call.
    When pyphen is not installed, returns ``(None, None)`` without caching,
    so a later install is picked up on the next call.
    """
    global _hyph_de, _hyph_en
    if _hyph_de is None:
        try:
            import pyphen
        except ImportError:
            return None, None
        _hyph_de = pyphen.Pyphen(lang='de_DE')
        _hyph_en = pyphen.Pyphen(lang='en_US')
    return _hyph_de, _hyph_en
|
||||
|
||||
|
||||
def _get_spellchecker():
    """Return the cached German SpellChecker, building it on first use.

    Returns None when the ``spellchecker`` package is unavailable; the
    miss is not cached, so the import is retried on subsequent calls.
    """
    global _spell_de
    if _spell_de is None:
        try:
            from spellchecker import SpellChecker
        except ImportError:
            return None
        _spell_de = SpellChecker(language='de')
    return _spell_de
|
||||
|
||||
|
||||
def _is_known_word(word: str, hyph_de, hyph_en) -> bool:
|
||||
"""Check whether pyphen recognises a word (DE or EN)."""
|
||||
if len(word) < 2:
|
||||
return False
|
||||
return ('|' in hyph_de.inserted(word, hyphen='|')
|
||||
or '|' in hyph_en.inserted(word, hyphen='|'))
|
||||
|
||||
|
||||
def _is_real_word(word: str) -> bool:
    """Return True when the German spellchecker contains *word* (lowercased).

    Always False when the spellchecker package is unavailable.
    """
    checker = _get_spellchecker()
    return checker is not None and word.lower() in checker
|
||||
|
||||
|
||||
def _hyphenate_word(word: str, hyph_de, hyph_en) -> Optional[str]:
|
||||
"""Try to hyphenate a word using DE then EN dictionary.
|
||||
|
||||
Returns word with | separators, or None if not recognized.
|
||||
"""
|
||||
hyph = hyph_de.inserted(word, hyphen='|')
|
||||
if '|' in hyph:
|
||||
return hyph
|
||||
hyph = hyph_en.inserted(word, hyphen='|')
|
||||
if '|' in hyph:
|
||||
return hyph
|
||||
return None
|
||||
|
||||
|
||||
def _autocorrect_piped_word(word_with_pipes: str) -> Optional[str]:
    """Try to correct a word that has OCR pipe artifacts.

    Printed syllable divider lines on dictionary pages confuse OCR:
    the vertical stroke is often read as an extra character (commonly
    ``l``, ``I``, ``1``, ``i``) adjacent to where the pipe appears.

    Uses ``spellchecker`` (frequency-based word list) for validation.

    Strategy:
    1. Strip ``|`` -- if spellchecker knows the result, done.
    2. Try deleting each pipe-like character (l, I, 1, i, t).
    3. Fall back to spellchecker's own ``correction()`` method.
    4. Preserve the original casing of the first letter.

    Returns the corrected word, the stripped word when it is valid or too
    short to validate, or None when no correction could be found.
    """
    stripped = word_with_pipes.replace('|', '')
    # Words under 3 chars (incl. empty) are returned stripped, unvalidated --
    # the spellchecker has too little signal at that length.
    if not stripped or len(stripped) < 3:
        return stripped  # too short to validate

    # Step 1: if the stripped word is already a real word, done
    if _is_real_word(stripped):
        return stripped

    # Step 2: try deleting pipe-like characters (most likely artifacts)
    # Each deletion candidate is validated against the spellchecker; the
    # first (leftmost) valid deletion wins.
    _PIPE_LIKE = frozenset('lI1it')
    for idx in range(len(stripped)):
        if stripped[idx] not in _PIPE_LIKE:
            continue
        candidate = stripped[:idx] + stripped[idx + 1:]
        if len(candidate) >= 3 and _is_real_word(candidate):
            return candidate

    # Step 3: use spellchecker's built-in correction
    spell = _get_spellchecker()
    if spell is not None:
        suggestion = spell.correction(stripped.lower())
        if suggestion and suggestion != stripped.lower():
            # Preserve original first-letter case
            if stripped[0].isupper():
                suggestion = suggestion[0].upper() + suggestion[1:]
            return suggestion

    return None  # could not fix
|
||||
|
||||
|
||||
def autocorrect_pipe_artifacts(
    zones_data: List[Dict], session_id: str,
) -> int:
    """Strip OCR pipe artifacts and correct garbled words in-place.

    Printed syllable divider lines on dictionary scans are read by OCR
    as ``|`` characters embedded in words (e.g. ``Zel|le``, ``Ze|plpe|lin``).
    This function:

    1. Strips ``|`` from every word in content cells.
    2. Validates with spellchecker (real dictionary lookup).
    3. If not recognised, tries deleting pipe-like characters or uses
       spellchecker's correction (e.g. ``Zeplpelin`` -> ``Zeppelin``).
    4. Updates both word-box texts and cell text.

    Args:
        zones_data: Zone dicts (mutated in place); only cells whose
            ``col_type`` starts with ``column_`` are processed.
        session_id: Used only for log messages.

    Returns the number of cells modified.
    """
    spell = _get_spellchecker()
    if spell is None:
        logger.warning("spellchecker not available -- pipe autocorrect limited")
        # Fall back: still strip pipes even without spellchecker
        pass

    modified = 0
    for z in zones_data:
        for cell in z.get("cells", []):
            ct = cell.get("col_type", "")
            if not ct.startswith("column_"):
                continue

            cell_changed = False

            # --- Fix word boxes ---
            for wb in cell.get("word_boxes", []):
                wb_text = wb.get("text", "")
                if "|" not in wb_text:
                    continue

                # Separate trailing punctuation: split into non-letter lead,
                # word core, and non-letter trail (letters incl. umlauts/ß).
                m = re.match(
                    r'^([^a-zA-Z\u00e4\u00f6\u00fc\u00c4\u00d6\u00dc\u00df\u1e9e]*)'
                    r'(.*?)'
                    r'([^a-zA-Z\u00e4\u00f6\u00fc\u00c4\u00d6\u00dc\u00df\u1e9e]*)$',
                    wb_text,
                )
                if not m:
                    continue
                lead, core, trail = m.group(1), m.group(2), m.group(3)
                if "|" not in core:
                    continue

                corrected = _autocorrect_piped_word(core)
                if corrected is not None and corrected != core:
                    wb["text"] = lead + corrected + trail
                    cell_changed = True

            # --- Rebuild cell text from word boxes ---
            # Keeps cell["text"] consistent with the corrected word boxes.
            if cell_changed:
                wbs = cell.get("word_boxes", [])
                if wbs:
                    cell["text"] = " ".join(
                        (wb.get("text") or "") for wb in wbs
                    )
                modified += 1

            # --- Fallback: strip residual | from cell text ---
            # Catches pipes in cell text that had no matching word box;
            # counted at most once per cell via the cell_changed guard.
            text = cell.get("text", "")
            if "|" in text:
                clean = text.replace("|", "")
                if clean != text:
                    cell["text"] = clean
                    if not cell_changed:
                        modified += 1

    if modified:
        logger.info(
            "build-grid session %s: autocorrected pipe artifacts in %d cells",
            session_id, modified,
        )
    return modified
|
||||
32
klausur-service/backend/ocr/detect/syllable/detect.py
Normal file
32
klausur-service/backend/ocr/detect/syllable/detect.py
Normal file
@@ -0,0 +1,32 @@
|
||||
"""
|
||||
Syllable divider insertion for dictionary pages — barrel re-export.
|
||||
|
||||
All implementation split into:
|
||||
cv_syllable_core — hyphenator init, word validation, pipe autocorrect
|
||||
cv_syllable_merge — word gap merging, syllabification, divider insertion
|
||||
|
||||
Lizenz: Apache 2.0 (kommerziell nutzbar)
|
||||
DATENSCHUTZ: Alle Verarbeitung erfolgt lokal.
|
||||
"""
|
||||
|
||||
# Core: init, validation, autocorrect
|
||||
from cv_syllable_core import ( # noqa: F401
|
||||
_IPA_RE,
|
||||
_STOP_WORDS,
|
||||
_get_hyphenators,
|
||||
_get_spellchecker,
|
||||
_is_known_word,
|
||||
_is_real_word,
|
||||
_hyphenate_word,
|
||||
_autocorrect_piped_word,
|
||||
autocorrect_pipe_artifacts,
|
||||
)
|
||||
|
||||
# Merge: gap merging, syllabify, insert
|
||||
from cv_syllable_merge import ( # noqa: F401
|
||||
_try_merge_pipe_gaps,
|
||||
merge_word_gaps_in_zones,
|
||||
_try_merge_word_gaps,
|
||||
_syllabify_text,
|
||||
insert_syllable_dividers,
|
||||
)
|
||||
300
klausur-service/backend/ocr/detect/syllable/merge.py
Normal file
300
klausur-service/backend/ocr/detect/syllable/merge.py
Normal file
@@ -0,0 +1,300 @@
|
||||
"""
|
||||
Syllable Merge — word gap merging, syllabification, divider insertion.
|
||||
|
||||
Extracted from cv_syllable_detect.py for modularity.
|
||||
|
||||
Lizenz: Apache 2.0 (kommerziell nutzbar)
|
||||
DATENSCHUTZ: Alle Verarbeitung erfolgt lokal.
|
||||
"""
|
||||
|
||||
import logging
|
||||
import re
|
||||
from typing import Any, Dict, List, Optional
|
||||
|
||||
import numpy as np
|
||||
|
||||
from cv_syllable_core import (
|
||||
_get_hyphenators,
|
||||
_hyphenate_word,
|
||||
_IPA_RE,
|
||||
_STOP_WORDS,
|
||||
)
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
def _try_merge_pipe_gaps(text: str, hyph_de) -> str:
|
||||
"""Merge fragments separated by single spaces where OCR split at a pipe.
|
||||
|
||||
Example: "Kaf fee" -> "Kaffee" (pyphen recognizes the merged word).
|
||||
Multi-step: "Ka bel jau" -> "Kabel jau" -> "Kabeljau".
|
||||
|
||||
Guards against false merges:
|
||||
- The FIRST token must be pure alpha (word start -- no attached punctuation)
|
||||
- The second token may have trailing punctuation (comma, period) which
|
||||
stays attached to the merged word: "Ka" + "fer," -> "Kafer,"
|
||||
- Common German function words (der, die, das, ...) are never merged
|
||||
- At least one fragment must be very short (<=3 alpha chars)
|
||||
"""
|
||||
parts = text.split(' ')
|
||||
if len(parts) < 2:
|
||||
return text
|
||||
|
||||
result = [parts[0]]
|
||||
i = 1
|
||||
while i < len(parts):
|
||||
prev = result[-1]
|
||||
curr = parts[i]
|
||||
|
||||
# Extract alpha-only core for lookup
|
||||
prev_alpha = re.sub(r'[^a-zA-Z\u00e4\u00f6\u00fc\u00c4\u00d6\u00dc\u00df\u1e9e]', '', prev)
|
||||
curr_alpha = re.sub(r'[^a-zA-Z\u00e4\u00f6\u00fc\u00c4\u00d6\u00dc\u00df\u1e9e]', '', curr)
|
||||
|
||||
# Guard 1: first token must be pure alpha (word-start fragment)
|
||||
# second token may have trailing punctuation
|
||||
# Guard 2: neither alpha core can be a common German function word
|
||||
# Guard 3: the shorter fragment must be <= 3 chars (pipe-gap signal)
|
||||
# Guard 4: combined length must be >= 4
|
||||
should_try = (
|
||||
prev == prev_alpha # first token: pure alpha (word start)
|
||||
and prev_alpha and curr_alpha
|
||||
and prev_alpha.lower() not in _STOP_WORDS
|
||||
and curr_alpha.lower() not in _STOP_WORDS
|
||||
and min(len(prev_alpha), len(curr_alpha)) <= 3
|
||||
and len(prev_alpha) + len(curr_alpha) >= 4
|
||||
)
|
||||
|
||||
if should_try:
|
||||
merged_alpha = prev_alpha + curr_alpha
|
||||
hyph = hyph_de.inserted(merged_alpha, hyphen='-')
|
||||
if '-' in hyph:
|
||||
# pyphen recognizes merged word -- collapse the space
|
||||
result[-1] = prev + curr
|
||||
i += 1
|
||||
continue
|
||||
|
||||
result.append(curr)
|
||||
i += 1
|
||||
|
||||
return ' '.join(result)
|
||||
|
||||
|
||||
def merge_word_gaps_in_zones(zones_data: List[Dict], session_id: str) -> int:
    """Merge OCR word-gap fragments in cell texts using pyphen validation.

    OCR often splits words at syllable boundaries into separate word_boxes,
    producing text like "zerknit tert" instead of "zerknittert". This
    function tries to merge adjacent fragments in every content cell.

    More permissive than ``_try_merge_pipe_gaps`` (threshold 5 instead of 3)
    but still guarded by pyphen dictionary lookup and stop-word exclusion.

    Args:
        zones_data: Zone dicts (cell texts are mutated in place); only
            cells whose ``col_type`` starts with ``column_`` are processed.
        session_id: Used only for log messages.

    Returns the number of cells modified.
    """
    hyph_de, _ = _get_hyphenators()
    # Without pyphen there is no safe way to validate merges -- do nothing.
    if hyph_de is None:
        return 0

    modified = 0
    for z in zones_data:
        for cell in z.get("cells", []):
            ct = cell.get("col_type", "")
            if not ct.startswith("column_"):
                continue
            text = cell.get("text", "")
            # No space means nothing to merge.
            if not text or " " not in text:
                continue

            # Skip IPA cells (pronunciation columns); bracketed spans are
            # removed first so IPA inside [...] alone does not disqualify.
            text_no_brackets = re.sub(r'\[[^\]]*\]', '', text)
            if _IPA_RE.search(text_no_brackets):
                continue

            new_text = _try_merge_word_gaps(text, hyph_de)
            if new_text != text:
                cell["text"] = new_text
                modified += 1

    if modified:
        logger.info(
            "build-grid session %s: merged word gaps in %d cells",
            session_id, modified,
        )
    return modified
|
||||
|
||||
|
||||
def _try_merge_word_gaps(text: str, hyph_de) -> str:
    """Merge OCR word fragments with relaxed threshold (max_short=5).

    Similar to ``_try_merge_pipe_gaps`` but allows slightly longer fragments
    (max_short=5 instead of 3). Still requires pyphen to recognize the
    merged word, and applies the same guards: left token must be pure alpha,
    neither alphabetic core may be a German function word, and the combined
    cores must total at least 4 letters.
    """
    parts = text.split(' ')
    if len(parts) < 2:
        return text

    result = [parts[0]]
    i = 1
    while i < len(parts):
        # prev is the (possibly already-merged) last output token, so a
        # chain of fragments can collapse into one word across iterations.
        prev = result[-1]
        curr = parts[i]

        # Alphabetic cores (letters incl. umlauts/ß) used for lookup.
        prev_alpha = re.sub(r'[^a-zA-Z\u00e4\u00f6\u00fc\u00c4\u00d6\u00dc\u00df\u1e9e]', '', prev)
        curr_alpha = re.sub(r'[^a-zA-Z\u00e4\u00f6\u00fc\u00c4\u00d6\u00dc\u00df\u1e9e]', '', curr)

        should_try = (
            prev == prev_alpha  # left token must be a clean word-start fragment
            and prev_alpha and curr_alpha
            and prev_alpha.lower() not in _STOP_WORDS
            and curr_alpha.lower() not in _STOP_WORDS
            and min(len(prev_alpha), len(curr_alpha)) <= 5
            and len(prev_alpha) + len(curr_alpha) >= 4
        )

        if should_try:
            merged_alpha = prev_alpha + curr_alpha
            hyph = hyph_de.inserted(merged_alpha, hyphen='-')
            if '-' in hyph:
                # pyphen recognizes the merged word -- collapse the space.
                result[-1] = prev + curr
                i += 1
                continue

        result.append(curr)
        i += 1

    return ' '.join(result)
|
||||
|
||||
|
||||
def _syllabify_text(text: str, hyph_de, hyph_en) -> str:
    """Syllabify all significant words in a text string.

    1. Strip existing | dividers
    2. Merge pipe-gap spaces where possible
    3. Apply pyphen to each word >= 3 alphabetic chars
    4. Words pyphen doesn't recognize stay as-is (no bad guesses)

    Cells containing IPA characters outside bracketed spans are returned
    unchanged (pronunciation columns must not be re-syllabified).
    """
    if not text:
        return text

    # Skip cells that contain IPA transcription characters outside brackets.
    text_no_brackets = re.sub(r'\[[^\]]*\]', '', text)
    if _IPA_RE.search(text_no_brackets):
        return text

    # Phase 1: strip existing pipe dividers for clean normalization
    clean = text.replace('|', '')

    # Phase 2: merge pipe-gap spaces (OCR fragments from pipe splitting)
    clean = _try_merge_pipe_gaps(clean, hyph_de)

    # Phase 3: tokenize and syllabify each word
    # Split on whitespace and comma/semicolon sequences, keeping separators
    # (capturing group makes re.split return the delimiters too, so the
    # original spacing/punctuation is reproduced exactly on re-join).
    tokens = re.split(r'(\s+|[,;:]+\s*)', clean)

    result = []
    for tok in tokens:
        # Pass separators (and empty split artifacts) through untouched.
        if not tok or re.match(r'^[\s,;:]+$', tok):
            result.append(tok)
            continue

        # Strip trailing/leading punctuation for pyphen lookup
        m = re.match(r'^([^a-zA-Z\u00e4\u00f6\u00fc\u00c4\u00d6\u00dc\u00df\u1e9e]*)(.*?)([^a-zA-Z\u00e4\u00f6\u00fc\u00c4\u00d6\u00dc\u00df\u1e9e]*)$', tok)
        if not m:
            result.append(tok)
            continue
        lead, word, trail = m.group(1), m.group(2), m.group(3)

        # Too short, or no Latin/umlaut letter at all (e.g. numbers): keep.
        if len(word) < 3 or not re.search(r'[a-zA-Z\u00e4\u00f6\u00fc\u00c4\u00d6\u00dc\u00df]', word):
            result.append(tok)
            continue

        hyph = _hyphenate_word(word, hyph_de, hyph_en)
        if hyph:
            result.append(lead + hyph + trail)
        else:
            result.append(tok)

    return ''.join(result)
|
||||
|
||||
|
||||
def insert_syllable_dividers(
    zones_data: List[Dict],
    img_bgr: np.ndarray,
    session_id: str,
    *,
    force: bool = False,
    col_filter: Optional[set] = None,
) -> int:
    """Insert pipe syllable dividers into dictionary cells.

    For dictionary pages: process all content column cells, strip existing
    pipes, merge pipe-gap spaces, and re-syllabify using pyphen.

    Pre-check: at least 1% of content cells must already contain ``|`` from
    OCR. This guards against pages with zero pipe characters.

    Args:
        zones_data: Zone dicts (cell texts are mutated in place).
        img_bgr: Page image; accepted for interface consistency but not
            referenced by this implementation.
        session_id: Used only for log messages.
        force: If True, skip the pipe-ratio pre-check and syllabify all
            content words regardless of whether the original has pipe dividers.
        col_filter: If set, only process cells whose col_type is in this set.
            None means process all content columns.

    Returns the number of cells modified.
    """
    hyph_de, hyph_en = _get_hyphenators()
    if hyph_de is None:
        logger.warning("pyphen not installed -- skipping syllable insertion")
        return 0

    # Pre-check: count cells that already have | from OCR.
    if not force:
        total_col_cells = 0
        cells_with_pipes = 0
        for z in zones_data:
            for cell in z.get("cells", []):
                if cell.get("col_type", "").startswith("column_"):
                    total_col_cells += 1
                    if "|" in cell.get("text", ""):
                        cells_with_pipes += 1

        if total_col_cells > 0:
            pipe_ratio = cells_with_pipes / total_col_cells
            if pipe_ratio < 0.01:
                logger.info(
                    "build-grid session %s: skipping syllable insertion -- "
                    "only %.1f%% of cells have existing pipes (need >=1%%)",
                    session_id, pipe_ratio * 100,
                )
                return 0

    insertions = 0
    for z in zones_data:
        for cell in z.get("cells", []):
            ct = cell.get("col_type", "")
            if not ct.startswith("column_"):
                continue
            if col_filter is not None and ct not in col_filter:
                continue
            text = cell.get("text", "")
            if not text:
                continue

            # In auto mode (force=False), only normalize cells that already
            # have | from OCR (i.e. printed syllable dividers on the original
            # scan). Don't add new syllable marks to other words.
            if not force and "|" not in text:
                continue

            new_text = _syllabify_text(text, hyph_de, hyph_en)
            if new_text != text:
                cell["text"] = new_text
                insertions += 1

    if insertions:
        logger.info(
            "build-grid session %s: syllable dividers inserted/normalized "
            "in %d cells (pyphen)",
            session_id, insertions,
        )
    return insertions
|
||||
Reference in New Issue
Block a user