feat: generische Box-Erkennung fuer zonenbasierte Spaltenerkennung
Some checks failed
CI / go-lint (push) Has been skipped
CI / python-lint (push) Has been skipped
CI / nodejs-lint (push) Has been skipped
CI / test-go-school (push) Successful in 29s
CI / test-go-edu-search (push) Successful in 30s
CI / test-python-klausur (push) Failing after 1m59s
CI / test-python-agent-core (push) Successful in 17s
CI / test-nodejs-website (push) Successful in 19s
Some checks failed
CI / go-lint (push) Has been skipped
CI / python-lint (push) Has been skipped
CI / nodejs-lint (push) Has been skipped
CI / test-go-school (push) Successful in 29s
CI / test-go-edu-search (push) Successful in 30s
CI / test-python-klausur (push) Failing after 1m59s
CI / test-python-agent-core (push) Successful in 17s
CI / test-nodejs-website (push) Successful in 19s
- Neue Datei cv_box_detect.py: 2-Stufen-Algorithmus (Linien + Farbe)
- DetectedBox/PageZone Dataclasses in cv_vocab_types.py
- detect_column_geometry_zoned() in cv_layout.py
- API-Endpoints erweitert: zones/boxes_detected im column_result
- Overlay-Funktionen zeichnen Box-Grenzen als gestrichelte Rechtecke
- Fix: numpy array or-Verknuepfung an 7 Stellen in ocr_pipeline_api.py
- 12 Unit-Tests fuer Box-Erkennung und Zone-Splitting

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
369
klausur-service/backend/cv_box_detect.py
Normal file
369
klausur-service/backend/cv_box_detect.py
Normal file
@@ -0,0 +1,369 @@
|
||||
"""
|
||||
Embedded box detection and page zone splitting for the CV vocabulary pipeline.
|
||||
|
||||
Detects boxes (grammar tips, exercises, etc.) that span the page width and
|
||||
interrupt the normal column layout. Splits the page into vertical zones so
|
||||
that column detection can run independently per zone.
|
||||
|
||||
Two-stage algorithm:
|
||||
1. Morphological line detection — finds bordered boxes via horizontal lines.
|
||||
2. Color/saturation fallback — finds shaded boxes without visible borders.
|
||||
|
||||
Lizenz: Apache 2.0 (kommerziell nutzbar)
|
||||
DATENSCHUTZ: Alle Verarbeitung erfolgt lokal.
|
||||
"""
|
||||
|
||||
import logging
|
||||
from typing import List, Optional, Tuple
|
||||
|
||||
import cv2
|
||||
import numpy as np
|
||||
|
||||
from cv_vocab_types import DetectedBox, PageZone
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
__all__ = [
|
||||
"detect_boxes",
|
||||
"split_page_into_zones",
|
||||
]
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Stage 1: Morphological line detection
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def _detect_boxes_by_lines(
    gray: np.ndarray,
    content_x: int,
    content_w: int,
    content_y: int,
    content_h: int,
) -> List[DetectedBox]:
    """Detect boxes whose top and bottom borders are long horizontal lines.

    Args:
        gray: Grayscale image (full page).
        content_x, content_w: Horizontal content bounds.
        content_y, content_h: Vertical content bounds.

    Returns:
        List of DetectedBox for each detected bordered box.
    """
    # Invert-threshold so ink becomes foreground (white on black).
    _, ink = cv2.threshold(gray, 180, 255, cv2.THRESH_BINARY_INV)

    # Keep only horizontal structures spanning at least half the content width.
    horiz_kernel_w = max(50, content_w // 2)
    horiz_kernel = cv2.getStructuringElement(cv2.MORPH_RECT, (horiz_kernel_w, 1))
    horizontal = cv2.morphologyEx(ink, cv2.MORPH_OPEN, horiz_kernel)

    # Per-row count of surviving line pixels inside the content columns.
    strip = horizontal[:, content_x:content_x + content_w]
    row_counts = np.sum(strip > 0, axis=1)
    needed = content_w * 0.30

    # Runs of consecutive qualifying rows become line segments (start, end).
    # Pad with False on both sides so diff() yields paired rise/fall edges;
    # a run touching the bottom of the page ends at the image height.
    padded = np.concatenate(([False], row_counts >= needed, [False]))
    edges = np.flatnonzero(np.diff(padded.astype(np.int8)))
    segments: List[Tuple[int, int]] = list(
        zip(edges[0::2].tolist(), edges[1::2].tolist())
    )

    if len(segments) < 2:
        return []

    # Minimum box height: 30px. Maximum: 70% of content height.
    min_box_h = 30
    max_box_h = int(content_h * 0.70)

    detected: List[DetectedBox] = []
    paired = set()
    for i, (top_y0, top_y1) in enumerate(segments):
        if i in paired:
            continue
        for j in range(i + 1, len(segments)):
            if j in paired:
                continue
            bot_y0, bot_y1 = segments[j]
            box_height = bot_y1 - top_y0
            if not (min_box_h <= box_height <= max_box_h):
                continue

            # Border thickness estimated from the thicker of the two lines.
            detected.append(DetectedBox(
                x=content_x,
                y=top_y0,
                width=content_w,
                height=box_height,
                confidence=0.8,
                border_thickness=max(top_y1 - top_y0, bot_y1 - bot_y0),
            ))
            paired.add(i)
            paired.add(j)
            break  # this top line is consumed; move to the next candidate

    return detected
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Stage 2: Color / saturation fallback
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def _detect_boxes_by_color(
    img_bgr: np.ndarray,
    content_x: int,
    content_w: int,
    content_y: int,
    content_h: int,
) -> List[DetectedBox]:
    """Detect shaded/colored boxes that lack visible border lines.

    Args:
        img_bgr: BGR color image (full page).
        content_x, content_w: Horizontal content bounds.
        content_y, content_h: Vertical content bounds.

    Returns:
        List of DetectedBox for each detected shaded box.
    """
    hsv = cv2.cvtColor(img_bgr, cv2.COLOR_BGR2HSV)
    luma = cv2.cvtColor(img_bgr, cv2.COLOR_BGR2GRAY)

    # Foreground = anything with noticeable saturation or below near-white.
    shaded = ((hsv[:, :, 1] > 25) | (luma < 220)).astype(np.uint8) * 255

    # Bridge small holes so a shaded region forms one connected blob.
    close_kernel = cv2.getStructuringElement(cv2.MORPH_RECT, (15, 15))
    shaded = cv2.morphologyEx(shaded, cv2.MORPH_CLOSE, close_kernel)

    contours, _ = cv2.findContours(shaded, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)

    area_floor = content_w * content_h * 0.05
    height_lo = 30
    height_hi = int(content_h * 0.70)
    width_floor = content_w * 0.60

    found: List[DetectedBox] = []
    for contour in contours:
        if cv2.contourArea(contour) < area_floor:
            continue

        # Roughly rectangular? 4-8 vertices after polygon simplification.
        perimeter = cv2.arcLength(contour, True)
        corners = cv2.approxPolyDP(contour, 0.04 * perimeter, True)
        if not 4 <= len(corners) <= 8:
            continue

        rx, ry, rw, rh = cv2.boundingRect(contour)

        # Width filter: must span most of the page.
        if rw < width_floor:
            continue

        # Height filter.
        if not height_lo <= rh <= height_hi:
            continue

        found.append(DetectedBox(
            x=rx,
            y=ry,
            width=rw,
            height=rh,
            confidence=0.6,  # color evidence is weaker than border lines
            border_thickness=0,
        ))

    return found
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Validation
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def _validate_box(
    box: DetectedBox,
    gray: np.ndarray,
    content_w: int,
    content_h: int,
    median_row_gap: int,
) -> bool:
    """Validate that a detected box is genuine (not a table-row separator etc.)."""
    # Must span > 60% of content width.
    if box.width < content_w * 0.60:
        return False

    # Height constraints: at least 30px, at most 70% of the content height.
    if not (30 <= box.height <= content_h * 0.70):
        return False

    # Guard against table-row separators: a real box must be at least
    # three times taller than the median gap between text rows.
    if median_row_gap > 0 and box.height < median_row_gap * 3:
        return False

    # Ink-density check: the box must actually contain some text.
    y0, y1 = box.y, box.y + box.height
    x0, x1 = box.x, box.x + box.width
    region = gray[y0:y1, x0:x1]
    if region.size == 0:
        return False
    dark_fraction = np.count_nonzero(region < 128) / region.size
    # Nearly empty regions are not real content boxes.
    return dark_fraction >= 0.002
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Public API: detect_boxes
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def detect_boxes(
    img_bgr: np.ndarray,
    content_x: int,
    content_w: int,
    content_y: int,
    content_h: int,
    median_row_gap: int = 0,
) -> List[DetectedBox]:
    """Detect embedded boxes on a page image.

    Line-based detection runs first; the color fallback is consulted only
    when no bordered box was found. Surviving candidates are validated and
    returned sorted top-to-bottom.

    Args:
        img_bgr: BGR color image (full page or cropped).
        content_x, content_w: Horizontal content bounds.
        content_y, content_h: Vertical content bounds.
        median_row_gap: Median row gap height (for filtering out table separators).

    Returns:
        List of validated DetectedBox instances, sorted by y position.
    """
    gray = cv2.cvtColor(img_bgr, cv2.COLOR_BGR2GRAY)

    # Stage 1: bordered boxes via morphological line detection.
    candidates = _detect_boxes_by_lines(gray, content_x, content_w, content_y, content_h)

    # Stage 2: shaded boxes, only when stage 1 found nothing.
    if not candidates:
        candidates = _detect_boxes_by_color(img_bgr, content_x, content_w, content_y, content_h)

    # Validate and order top-to-bottom.
    kept = sorted(
        (b for b in candidates
         if _validate_box(b, gray, content_w, content_h, median_row_gap)),
        key=lambda b: b.y,
    )

    if kept:
        logger.info(f"BoxDetect: {len(kept)} box(es) detected "
                    f"(from {len(candidates)} candidates)")
    else:
        logger.debug("BoxDetect: no boxes detected")

    return kept
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Zone Splitting
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def split_page_into_zones(
    content_x: int,
    content_y: int,
    content_w: int,
    content_h: int,
    boxes: List[DetectedBox],
    min_zone_height: int = 40,
) -> List[PageZone]:
    """Split a page into vertical zones based on detected boxes.

    Regions above, between, and below boxes become 'content' zones;
    box regions become 'box' zones. Content slivers shorter than
    ``min_zone_height`` are dropped.

    Args:
        content_x, content_y, content_w, content_h: Content area bounds.
        boxes: Detected boxes, sorted by y position.
        min_zone_height: Minimum height for a content zone to be kept.

    Returns:
        List of PageZone, ordered top to bottom.
    """
    def _content_zone(idx: int, top: int, height: int) -> PageZone:
        # All content zones share the full content width.
        return PageZone(
            index=idx,
            zone_type='content',
            y=top,
            height=height,
            x=content_x,
            width=content_w,
        )

    # No boxes: the whole content area is one zone.
    if not boxes:
        return [_content_zone(0, content_y, content_h)]

    zones: List[PageZone] = []
    y_cursor = content_y

    for box in boxes:
        # Content strip between the cursor and the top of this box.
        gap = box.y - y_cursor
        if gap >= min_zone_height:
            zones.append(_content_zone(len(zones), y_cursor, gap))

        # The box itself becomes a 'box' zone with its own bounds.
        zones.append(PageZone(
            index=len(zones),
            zone_type='box',
            y=box.y,
            height=box.height,
            x=box.x,
            width=box.width,
            box=box,
        ))

        y_cursor = box.y + box.height

    # Remaining content below the last box.
    tail = content_y + content_h - y_cursor
    if tail >= min_zone_height:
        zones.append(_content_zone(len(zones), y_cursor, tail))

    logger.info(f"ZoneSplit: {len(zones)} zones from {len(boxes)} box(es): "
                f"{[z.zone_type for z in zones]}")

    return zones
|
||||
@@ -13,10 +13,12 @@ import numpy as np
|
||||
|
||||
from cv_vocab_types import (
|
||||
ColumnGeometry,
|
||||
DetectedBox,
|
||||
DocumentTypeResult,
|
||||
ENGLISH_FUNCTION_WORDS,
|
||||
GERMAN_FUNCTION_WORDS,
|
||||
PageRegion,
|
||||
PageZone,
|
||||
RowGeometry,
|
||||
)
|
||||
|
||||
@@ -3034,3 +3036,133 @@ def analyze_layout_by_words(ocr_img: np.ndarray, dewarped_bgr: np.ndarray) -> Li
|
||||
f"{[(r.type, r.x, r.width, r.classification_confidence) for r in regions if r.type not in ('header','footer','margin_top','margin_bottom')]}")
|
||||
|
||||
return regions
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Zone-aware column geometry detection
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def detect_column_geometry_zoned(
    ocr_img: np.ndarray,
    dewarped_bgr: np.ndarray,
) -> Optional[Tuple[
    List[ColumnGeometry],  # flat column list (all zones)
    int, int, int, int,  # left_x, right_x, top_y, bottom_y
    List[Dict],  # word_dicts
    np.ndarray,  # inv
    List[Dict],  # zones (serializable)
    List[DetectedBox],  # detected boxes
]]:
    """Zone-aware column geometry detection.

    1. Finds content bounds.
    2. Runs box detection.
    3. If boxes found: splits page into zones, runs detect_column_geometry()
       per content zone on the corresponding sub-image.
    4. If no boxes: delegates entirely to detect_column_geometry() (backward compat).

    Returns:
        Extended tuple: (geometries, left_x, right_x, top_y, bottom_y,
                         word_dicts, inv, zones_data, boxes)
        or None if detection fails.
    """
    # Function-scope import; presumably avoids a circular import between
    # cv_layout and cv_box_detect — TODO confirm.
    from cv_box_detect import detect_boxes, split_page_into_zones

    # First run normal detection to get content bounds and word data
    geo_result = detect_column_geometry(ocr_img, dewarped_bgr)
    if geo_result is None:
        return None

    geometries, left_x, right_x, top_y, bottom_y, word_dicts, inv = geo_result
    content_w = right_x - left_x
    content_h = bottom_y - top_y

    # Detect boxes in the image
    boxes = detect_boxes(
        dewarped_bgr, left_x, content_w, top_y, content_h,
    )

    if not boxes:
        # No boxes — single zone, backward compatible
        zone_data = [{
            "index": 0,
            "zone_type": "content",
            "y": top_y,
            "height": content_h,
            "x": left_x,
            "width": content_w,
            "columns": [],  # filled later by caller
        }]
        return (geometries, left_x, right_x, top_y, bottom_y,
                word_dicts, inv, zone_data, boxes)

    # Split into zones
    zones = split_page_into_zones(left_x, top_y, content_w, content_h, boxes)

    # Run column detection per content zone
    all_geometries: List[ColumnGeometry] = []
    zones_data: List[Dict] = []

    for zone in zones:
        # Serializable zone record; 'columns' stays empty for box zones
        # and for content zones where per-zone detection fails.
        zone_dict: Dict = {
            "index": zone.index,
            "zone_type": zone.zone_type,
            "y": zone.y,
            "height": zone.height,
            "x": zone.x,
            "width": zone.width,
            "columns": [],
        }

        if zone.box is not None:
            zone_dict["box"] = {
                "x": zone.box.x,
                "y": zone.box.y,
                "width": zone.box.width,
                "height": zone.box.height,
                "confidence": zone.box.confidence,
                "border_thickness": zone.box.border_thickness,
            }

        if zone.zone_type == 'content' and zone.height >= 40:
            # Extract sub-image for this zone. The slice keeps the full
            # width, so x coordinates stay absolute; only y needs to be
            # offset back after sub-detection.
            zone_y_end = zone.y + zone.height
            sub_ocr = ocr_img[zone.y:zone_y_end, :]
            sub_bgr = dewarped_bgr[zone.y:zone_y_end, :]

            sub_result = detect_column_geometry(sub_ocr, sub_bgr)
            if sub_result is not None:
                sub_geoms, sub_lx, sub_rx, sub_ty, sub_by, _sub_words, _sub_inv = sub_result

                # Offset column y-coordinates back to absolute page coords
                for g in sub_geoms:
                    g.y += zone.y

                zone_cols = []
                for g in sub_geoms:
                    zone_cols.append({
                        "index": g.index,
                        "x": g.x,
                        "y": g.y,
                        "width": g.width,
                        "height": g.height,
                        "word_count": g.word_count,
                        "width_ratio": g.width_ratio,
                        "zone_index": zone.index,
                    })
                zone_dict["columns"] = zone_cols
                all_geometries.extend(sub_geoms)
            else:
                logger.debug(f"ZonedColumns: zone {zone.index} column detection returned None")

        zones_data.append(zone_dict)

    # If per-zone detection produced no columns, fall back to the original
    # whole-page geometries so callers always get usable columns.
    if not all_geometries:
        all_geometries = geometries

    logger.info(f"ZonedColumns: {len(boxes)} box(es), {len(zones)} zone(s), "
                f"{len(all_geometries)} total columns")

    return (all_geometries, left_x, right_x, top_y, bottom_y,
            word_dicts, inv, zones_data, boxes)
|
||||
|
||||
@@ -19,6 +19,7 @@ from cv_preprocessing import * # noqa: F401,F403
|
||||
from cv_layout import * # noqa: F401,F403
|
||||
from cv_ocr_engines import * # noqa: F401,F403
|
||||
from cv_cell_grid import * # noqa: F401,F403
|
||||
from cv_box_detect import * # noqa: F401,F403
|
||||
from cv_review import * # noqa: F401,F403
|
||||
|
||||
# Private names used by consumers — not covered by wildcard re-exports.
|
||||
|
||||
@@ -154,3 +154,27 @@ class DocumentTypeResult:
|
||||
pipeline: str # 'cell_first' | 'full_page'
|
||||
skip_steps: List[str] = field(default_factory=list) # e.g. ['columns', 'rows']
|
||||
features: Dict[str, Any] = field(default_factory=dict) # debug info
|
||||
|
||||
|
||||
@dataclass
class DetectedBox:
    """An embedded box (e.g. grammar tip, exercise) detected on the page.

    Produced by the two-stage detector in cv_box_detect.py: bordered boxes
    come from morphological line detection, shaded boxes from the color
    fallback.
    """
    x: int  # absolute pixel position (left edge)
    y: int  # absolute pixel position (top edge)
    width: int  # box width in pixels
    height: int  # box height in pixels
    confidence: float  # 0.0-1.0; line-detected boxes (0.8) score higher than color-detected (0.6)
    border_thickness: int = 0  # estimated border line thickness; 0 for borderless (shaded) boxes
|
||||
|
||||
|
||||
@dataclass
class PageZone:
    """A horizontal zone of the page — either normal content or a detected box."""
    index: int  # 0-based, top to bottom
    zone_type: str  # 'content' | 'box'
    y: int  # absolute pixel y (top edge of the zone)
    height: int  # zone height in pixels
    x: int  # left edge of the zone
    width: int  # zone width in pixels
    box: Optional[DetectedBox] = None  # set only for zone_type == 'box'
    columns: List[ColumnGeometry] = field(default_factory=list)  # columns detected within this zone
|
||||
|
||||
@@ -57,6 +57,7 @@ from cv_vocab_pipeline import (
|
||||
deskew_image_iterative,
|
||||
deskew_two_pass,
|
||||
detect_column_geometry,
|
||||
detect_column_geometry_zoned,
|
||||
detect_document_type,
|
||||
detect_row_geometry,
|
||||
expand_narrow_columns,
|
||||
@@ -1001,7 +1002,7 @@ async def detect_type(session_id: str):
|
||||
await _load_session_to_cache(session_id)
|
||||
cached = _get_cached(session_id)
|
||||
|
||||
img_bgr = cached.get("cropped_bgr") or cached.get("dewarped_bgr")
|
||||
img_bgr = cached.get("cropped_bgr") if cached.get("cropped_bgr") is not None else cached.get("dewarped_bgr")
|
||||
if img_bgr is None:
|
||||
raise HTTPException(status_code=400, detail="Crop or dewarp must be completed first")
|
||||
|
||||
@@ -1052,7 +1053,7 @@ async def detect_columns(session_id: str):
|
||||
await _load_session_to_cache(session_id)
|
||||
cached = _get_cached(session_id)
|
||||
|
||||
img_bgr = cached.get("cropped_bgr") or cached.get("dewarped_bgr")
|
||||
img_bgr = cached.get("cropped_bgr") if cached.get("cropped_bgr") is not None else cached.get("dewarped_bgr")
|
||||
if img_bgr is None:
|
||||
raise HTTPException(status_code=400, detail="Crop or dewarp must be completed before column detection")
|
||||
|
||||
@@ -1062,21 +1063,26 @@ async def detect_columns(session_id: str):
|
||||
ocr_img = create_ocr_image(img_bgr)
|
||||
h, w = ocr_img.shape[:2]
|
||||
|
||||
# Phase A: Geometry detection (returns word_dicts + inv for reuse)
|
||||
geo_result = detect_column_geometry(ocr_img, img_bgr)
|
||||
# Phase A: Zone-aware geometry detection
|
||||
zoned_result = detect_column_geometry_zoned(ocr_img, img_bgr)
|
||||
|
||||
if geo_result is None:
|
||||
if zoned_result is None:
|
||||
# Fallback to projection-based layout
|
||||
layout_img = create_layout_image(img_bgr)
|
||||
regions = analyze_layout(layout_img, ocr_img)
|
||||
zones_data = None
|
||||
boxes_detected = 0
|
||||
else:
|
||||
geometries, left_x, right_x, top_y, bottom_y, word_dicts, inv = geo_result
|
||||
geometries, left_x, right_x, top_y, bottom_y, word_dicts, inv, zones_data, boxes = zoned_result
|
||||
content_w = right_x - left_x
|
||||
boxes_detected = len(boxes)
|
||||
|
||||
# Cache intermediates for row detection (avoids second Tesseract run)
|
||||
cached["_word_dicts"] = word_dicts
|
||||
cached["_inv"] = inv
|
||||
cached["_content_bounds"] = (left_x, right_x, top_y, bottom_y)
|
||||
cached["_zones_data"] = zones_data
|
||||
cached["_boxes_detected"] = boxes_detected
|
||||
|
||||
# Detect header/footer early so sub-column clustering ignores them
|
||||
header_y, footer_y = _detect_header_footer_gaps(inv, w, h) if inv is not None else (None, None)
|
||||
@@ -1106,8 +1112,13 @@ async def detect_columns(session_id: str):
|
||||
"columns": columns,
|
||||
"classification_methods": methods,
|
||||
"duration_seconds": round(duration, 2),
|
||||
"boxes_detected": boxes_detected,
|
||||
}
|
||||
|
||||
# Add zone data when boxes are present
|
||||
if zones_data and boxes_detected > 0:
|
||||
column_result["zones"] = zones_data
|
||||
|
||||
# Persist to DB — also invalidate downstream results (rows, words)
|
||||
await update_session_db(
|
||||
session_id,
|
||||
@@ -1124,13 +1135,14 @@ async def detect_columns(session_id: str):
|
||||
|
||||
col_count = len([c for c in columns if c["type"].startswith("column")])
|
||||
logger.info(f"OCR Pipeline: columns session {session_id}: "
|
||||
f"{col_count} columns detected ({duration:.2f}s)")
|
||||
f"{col_count} columns detected, {boxes_detected} box(es) ({duration:.2f}s)")
|
||||
|
||||
img_w = img_bgr.shape[1]
|
||||
await _append_pipeline_log(session_id, "columns", {
|
||||
"total_columns": len(columns),
|
||||
"column_widths_pct": [round(c["width"] / img_w * 100, 1) for c in columns],
|
||||
"column_types": [c["type"] for c in columns],
|
||||
"boxes_detected": boxes_detected,
|
||||
}, duration_ms=int(duration * 1000))
|
||||
|
||||
return {
|
||||
@@ -1266,6 +1278,27 @@ async def _get_columns_overlay(session_id: str) -> Response:
|
||||
# Blend overlay at 20% opacity
|
||||
cv2.addWeighted(overlay, 0.2, img, 0.8, 0, img)
|
||||
|
||||
# Draw detected box boundaries as dashed rectangles
|
||||
zones = column_result.get("zones", [])
|
||||
for zone in zones:
|
||||
if zone.get("zone_type") == "box" and zone.get("box"):
|
||||
box = zone["box"]
|
||||
bx, by = box["x"], box["y"]
|
||||
bw, bh = box["width"], box["height"]
|
||||
box_color = (0, 200, 255) # Yellow (BGR)
|
||||
# Draw dashed rectangle by drawing short line segments
|
||||
dash_len = 15
|
||||
for edge_x in range(bx, bx + bw, dash_len * 2):
|
||||
end_x = min(edge_x + dash_len, bx + bw)
|
||||
cv2.line(img, (edge_x, by), (end_x, by), box_color, 2)
|
||||
cv2.line(img, (edge_x, by + bh), (end_x, by + bh), box_color, 2)
|
||||
for edge_y in range(by, by + bh, dash_len * 2):
|
||||
end_y = min(edge_y + dash_len, by + bh)
|
||||
cv2.line(img, (bx, edge_y), (bx, end_y), box_color, 2)
|
||||
cv2.line(img, (bx + bw, edge_y), (bx + bw, end_y), box_color, 2)
|
||||
cv2.putText(img, "BOX", (bx + 10, by + bh - 10),
|
||||
cv2.FONT_HERSHEY_SIMPLEX, 0.7, box_color, 2)
|
||||
|
||||
success, result_png = cv2.imencode(".png", img)
|
||||
if not success:
|
||||
raise HTTPException(status_code=500, detail="Failed to encode overlay image")
|
||||
@@ -1284,7 +1317,7 @@ async def detect_rows(session_id: str):
|
||||
await _load_session_to_cache(session_id)
|
||||
cached = _get_cached(session_id)
|
||||
|
||||
dewarped_bgr = cached.get("cropped_bgr") or cached.get("dewarped_bgr")
|
||||
dewarped_bgr = cached.get("cropped_bgr") if cached.get("cropped_bgr") is not None else cached.get("dewarped_bgr")
|
||||
if dewarped_bgr is None:
|
||||
raise HTTPException(status_code=400, detail="Crop or dewarp must be completed before row detection")
|
||||
|
||||
@@ -1315,7 +1348,7 @@ async def detect_rows(session_id: str):
|
||||
# Build serializable result (exclude words to keep payload small)
|
||||
rows_data = []
|
||||
for r in rows:
|
||||
rows_data.append({
|
||||
rd = {
|
||||
"index": r.index,
|
||||
"x": r.x,
|
||||
"y": r.y,
|
||||
@@ -1324,7 +1357,9 @@ async def detect_rows(session_id: str):
|
||||
"word_count": r.word_count,
|
||||
"row_type": r.row_type,
|
||||
"gap_before": r.gap_before,
|
||||
})
|
||||
"zone_index": 0,
|
||||
}
|
||||
rows_data.append(rd)
|
||||
|
||||
type_counts = {}
|
||||
for r in rows:
|
||||
@@ -1456,7 +1491,7 @@ async def detect_words(
|
||||
await _load_session_to_cache(session_id)
|
||||
cached = _get_cached(session_id)
|
||||
|
||||
dewarped_bgr = cached.get("cropped_bgr") or cached.get("dewarped_bgr")
|
||||
dewarped_bgr = cached.get("cropped_bgr") if cached.get("cropped_bgr") is not None else cached.get("dewarped_bgr")
|
||||
if dewarped_bgr is None:
|
||||
logger.warning("detect_words: no cropped/dewarped image for session %s (cache keys: %s)",
|
||||
session_id, [k for k in cached.keys() if k.endswith('_bgr')])
|
||||
@@ -1560,6 +1595,10 @@ async def detect_words(
|
||||
)
|
||||
duration = time.time() - t0
|
||||
|
||||
# Add zone_index to each cell (default 0 for backward compatibility)
|
||||
for cell in cells:
|
||||
cell.setdefault("zone_index", 0)
|
||||
|
||||
# Layout detection
|
||||
col_types = {c['type'] for c in columns_meta}
|
||||
is_vocab = bool(col_types & {'column_en', 'column_de'})
|
||||
@@ -2749,6 +2788,22 @@ async def _get_rows_overlay(session_id: str) -> Response:
|
||||
# Blend overlay at 15% opacity
|
||||
cv2.addWeighted(overlay, 0.15, img, 0.85, 0, img)
|
||||
|
||||
# Draw zone separator lines if zones exist
|
||||
column_result = session.get("column_result") or {}
|
||||
zones = column_result.get("zones", [])
|
||||
if zones:
|
||||
img_w_px = img.shape[1]
|
||||
zone_color = (0, 200, 255) # Yellow (BGR)
|
||||
dash_len = 20
|
||||
for zone in zones:
|
||||
if zone.get("zone_type") == "box":
|
||||
zy = zone["y"]
|
||||
zh = zone["height"]
|
||||
for line_y in [zy, zy + zh]:
|
||||
for sx in range(0, img_w_px, dash_len * 2):
|
||||
ex = min(sx + dash_len, img_w_px)
|
||||
cv2.line(img, (sx, line_y), (ex, line_y), zone_color, 2)
|
||||
|
||||
success, result_png = cv2.imencode(".png", img)
|
||||
if not success:
|
||||
raise HTTPException(status_code=500, detail="Failed to encode overlay image")
|
||||
@@ -3182,7 +3237,7 @@ async def run_auto(session_id: str, req: RunAutoRequest, request: Request):
|
||||
yield await _auto_sse_event("columns", "start", {})
|
||||
try:
|
||||
t0 = time.time()
|
||||
col_img = cached.get("cropped_bgr") or cached.get("dewarped_bgr")
|
||||
col_img = cached.get("cropped_bgr") if cached.get("cropped_bgr") is not None else cached.get("dewarped_bgr")
|
||||
if col_img is None:
|
||||
raise ValueError("Cropped/dewarped image not available")
|
||||
|
||||
@@ -3243,7 +3298,7 @@ async def run_auto(session_id: str, req: RunAutoRequest, request: Request):
|
||||
yield await _auto_sse_event("rows", "start", {})
|
||||
try:
|
||||
t0 = time.time()
|
||||
row_img = cached.get("cropped_bgr") or cached.get("dewarped_bgr")
|
||||
row_img = cached.get("cropped_bgr") if cached.get("cropped_bgr") is not None else cached.get("dewarped_bgr")
|
||||
session = await get_session_db(session_id)
|
||||
column_result = session.get("column_result") or cached.get("column_result")
|
||||
if not column_result or not column_result.get("columns"):
|
||||
@@ -3321,7 +3376,7 @@ async def run_auto(session_id: str, req: RunAutoRequest, request: Request):
|
||||
yield await _auto_sse_event("words", "start", {"engine": req.ocr_engine})
|
||||
try:
|
||||
t0 = time.time()
|
||||
word_img = cached.get("cropped_bgr") or cached.get("dewarped_bgr")
|
||||
word_img = cached.get("cropped_bgr") if cached.get("cropped_bgr") is not None else cached.get("dewarped_bgr")
|
||||
session = await get_session_db(session_id)
|
||||
|
||||
column_result = session.get("column_result") or cached.get("column_result")
|
||||
|
||||
226
klausur-service/backend/tests/test_cv_box_detect.py
Normal file
226
klausur-service/backend/tests/test_cv_box_detect.py
Normal file
@@ -0,0 +1,226 @@
|
||||
"""
|
||||
Tests for cv_box_detect.py — box detection and page zone splitting.
|
||||
|
||||
Lizenz: Apache 2.0
|
||||
"""
|
||||
|
||||
import numpy as np
|
||||
import pytest
|
||||
|
||||
import cv2
|
||||
|
||||
from cv_box_detect import detect_boxes, split_page_into_zones
|
||||
from cv_vocab_types import DetectedBox, PageZone
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Helpers
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def _white_image(width: int = 1200, height: int = 1800) -> np.ndarray:
    """Build an all-white BGR canvas of the given size."""
    return np.full((height, width, 3), 255, dtype=np.uint8)
|
||||
|
||||
|
||||
def _draw_bordered_box(img: np.ndarray, x: int, y: int, w: int, h: int,
                       thickness: int = 3, fill_text: bool = True) -> np.ndarray:
    """Draw a black-bordered rectangle, optionally with sample text inside."""
    black = (0, 0, 0)
    top_left = (x, y)
    bottom_right = (x + w, y + h)
    cv2.rectangle(img, top_left, bottom_right, black, thickness)
    if fill_text:
        # Inner text so the box passes the ink-density validation check.
        anchor = (x + 20, y + h // 2)
        cv2.putText(img, "Grammar Tip: Use the present perfect.",
                    anchor, cv2.FONT_HERSHEY_SIMPLEX, 0.6, black, 1)
    return img
|
||||
|
||||
|
||||
def _draw_colored_box(img: np.ndarray, x: int, y: int, w: int, h: int,
                      color: tuple = (200, 230, 255)) -> np.ndarray:
    """Fill a borderless shaded rectangle and add sample text inside."""
    cv2.rectangle(img, (x, y), (x + w, y + h), color, -1)  # -1 = filled
    text_anchor = (x + 20, y + h // 2)
    cv2.putText(img, "Exercise: Fill in the blanks.", text_anchor,
                cv2.FONT_HERSHEY_SIMPLEX, 0.6, (0, 0, 0), 1)
    return img
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# detect_boxes tests
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
class TestDetectBoxes:
|
||||
"""Tests for the detect_boxes() function."""
|
||||
|
||||
def test_no_boxes_returns_empty(self):
|
||||
"""A plain white image should produce no detected boxes."""
|
||||
img = _white_image()
|
||||
boxes = detect_boxes(img, content_x=50, content_w=1100,
|
||||
content_y=50, content_h=1700)
|
||||
assert boxes == []
|
||||
|
||||
def test_single_border_box(self):
|
||||
"""A single bordered rectangle should be detected."""
|
||||
img = _white_image()
|
||||
_draw_bordered_box(img, x=60, y=500, w=1080, h=200, thickness=3)
|
||||
|
||||
boxes = detect_boxes(img, content_x=50, content_w=1100,
|
||||
content_y=50, content_h=1700)
|
||||
|
||||
assert len(boxes) >= 1
|
||||
box = boxes[0]
|
||||
assert isinstance(box, DetectedBox)
|
||||
assert box.confidence > 0
|
||||
# Box should roughly be in the right area
|
||||
assert 400 <= box.y <= 600
|
||||
assert box.height >= 100
|
||||
|
||||
def test_colored_box_fallback(self):
|
||||
"""A colored box without border lines should be detected by color fallback."""
|
||||
img = _white_image()
|
||||
_draw_colored_box(img, x=60, y=600, w=1080, h=180, color=(140, 200, 240))
|
||||
|
||||
boxes = detect_boxes(img, content_x=50, content_w=1100,
|
||||
content_y=50, content_h=1700)
|
||||
|
||||
assert len(boxes) >= 1
|
||||
box = boxes[0]
|
||||
assert isinstance(box, DetectedBox)
|
||||
# Color-detected boxes have lower confidence
|
||||
assert box.confidence > 0
|
||||
|
||||
def test_box_too_small_filtered(self):
|
||||
"""A box shorter than 30px should be filtered out."""
|
||||
img = _white_image()
|
||||
# Draw a thin horizontal band (20px high) — should not count as a box
|
||||
_draw_bordered_box(img, x=60, y=500, w=1080, h=20, thickness=1)
|
||||
|
||||
boxes = detect_boxes(img, content_x=50, content_w=1100,
|
||||
content_y=50, content_h=1700)
|
||||
|
||||
assert len(boxes) == 0
|
||||
|
||||
def test_box_too_narrow_filtered(self):
|
||||
"""A box narrower than 60% of content width should be filtered out."""
|
||||
img = _white_image()
|
||||
# Draw a narrow box (only 400px wide on a 1100px content area = 36%)
|
||||
_draw_bordered_box(img, x=60, y=500, w=400, h=200, thickness=3)
|
||||
|
||||
boxes = detect_boxes(img, content_x=50, content_w=1100,
|
||||
content_y=50, content_h=1700)
|
||||
|
||||
assert len(boxes) == 0
|
||||
|
||||
def test_boxes_sorted_by_y(self):
|
||||
"""Multiple boxes should be returned sorted top to bottom."""
|
||||
img = _white_image()
|
||||
_draw_bordered_box(img, x=60, y=1000, w=1080, h=150, thickness=3)
|
||||
_draw_bordered_box(img, x=60, y=400, w=1080, h=150, thickness=3)
|
||||
|
||||
boxes = detect_boxes(img, content_x=50, content_w=1100,
|
||||
content_y=50, content_h=1700)
|
||||
|
||||
if len(boxes) >= 2:
|
||||
assert boxes[0].y <= boxes[1].y
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
# split_page_into_zones tests
# ---------------------------------------------------------------------------

class TestSplitPageIntoZones:
|
||||
"""Tests for the split_page_into_zones() function."""
|
||||
|
||||
def test_split_zones_no_boxes(self):
|
||||
"""Without boxes, should return a single content zone."""
|
||||
zones = split_page_into_zones(
|
||||
content_x=50, content_y=100, content_w=1100, content_h=1600,
|
||||
boxes=[],
|
||||
)
|
||||
|
||||
assert len(zones) == 1
|
||||
assert zones[0].zone_type == 'content'
|
||||
assert zones[0].y == 100
|
||||
assert zones[0].height == 1600
|
||||
|
||||
def test_split_zones_one_box(self):
|
||||
"""One box should create up to 3 zones: above, box, below."""
|
||||
box = DetectedBox(x=50, y=500, width=1100, height=200,
|
||||
confidence=0.8, border_thickness=3)
|
||||
zones = split_page_into_zones(
|
||||
content_x=50, content_y=100, content_w=1100, content_h=1600,
|
||||
boxes=[box],
|
||||
)
|
||||
|
||||
# Should have 3 zones: content above, box, content below
|
||||
assert len(zones) == 3
|
||||
assert zones[0].zone_type == 'content'
|
||||
assert zones[0].y == 100
|
||||
assert zones[0].height == 400 # 500 - 100
|
||||
|
||||
assert zones[1].zone_type == 'box'
|
||||
assert zones[1].y == 500
|
||||
assert zones[1].height == 200
|
||||
assert zones[1].box is not None
|
||||
|
||||
assert zones[2].zone_type == 'content'
|
||||
assert zones[2].y == 700 # 500 + 200
|
||||
assert zones[2].height == 1000 # (100+1600) - 700
|
||||
|
||||
def test_split_zones_two_boxes(self):
|
||||
"""Two boxes should create up to 5 zones."""
|
||||
box1 = DetectedBox(x=50, y=400, width=1100, height=150,
|
||||
confidence=0.8, border_thickness=3)
|
||||
box2 = DetectedBox(x=50, y=900, width=1100, height=150,
|
||||
confidence=0.8, border_thickness=3)
|
||||
zones = split_page_into_zones(
|
||||
content_x=50, content_y=100, content_w=1100, content_h=1600,
|
||||
boxes=[box1, box2],
|
||||
)
|
||||
|
||||
assert len(zones) == 5
|
||||
types = [z.zone_type for z in zones]
|
||||
assert types == ['content', 'box', 'content', 'box', 'content']
|
||||
|
||||
def test_split_zones_min_height(self):
|
||||
"""Content zones smaller than min_zone_height should be dropped."""
|
||||
# Box very close to the top — gap above is only 10px
|
||||
box = DetectedBox(x=50, y=110, width=1100, height=200,
|
||||
confidence=0.8, border_thickness=3)
|
||||
zones = split_page_into_zones(
|
||||
content_x=50, content_y=100, content_w=1100, content_h=1600,
|
||||
boxes=[box],
|
||||
min_zone_height=40,
|
||||
)
|
||||
|
||||
# Gap above box is only 10px < 40px min → should be skipped
|
||||
assert zones[0].zone_type == 'box'
|
||||
# Remaining should be content below the box
|
||||
assert any(z.zone_type == 'content' for z in zones)
|
||||
|
||||
def test_zone_indices_sequential(self):
|
||||
"""Zone indices should be sequential starting from 0."""
|
||||
box = DetectedBox(x=50, y=500, width=1100, height=200,
|
||||
confidence=0.8, border_thickness=3)
|
||||
zones = split_page_into_zones(
|
||||
content_x=50, content_y=100, content_w=1100, content_h=1600,
|
||||
boxes=[box],
|
||||
)
|
||||
|
||||
indices = [z.index for z in zones]
|
||||
assert indices == list(range(len(zones)))
|
||||
|
||||
def test_backward_compat_no_boxes(self):
|
||||
"""Without boxes, result should be identical: single zone covering full area."""
|
||||
zones = split_page_into_zones(
|
||||
content_x=50, content_y=100, content_w=1100, content_h=1600,
|
||||
boxes=[],
|
||||
)
|
||||
|
||||
assert len(zones) == 1
|
||||
z = zones[0]
|
||||
assert z.zone_type == 'content'
|
||||
assert z.x == 50
|
||||
assert z.y == 100
|
||||
assert z.width == 1100
|
||||
assert z.height == 1600
|
||||
assert z.box is None
|
||||
Reference in New Issue
Block a user