feat: add color detection for OCR word boxes
Some checks failed
CI / go-lint (push) Has been skipped
CI / python-lint (push) Has been skipped
CI / nodejs-lint (push) Has been skipped
CI / test-go-school (push) Successful in 25s
CI / test-go-edu-search (push) Successful in 27s
CI / test-python-klausur (push) Failing after 1m51s
CI / test-nodejs-website (push) Has been cancelled
CI / test-python-agent-core (push) Has been cancelled
Some checks failed
CI / go-lint (push) Has been skipped
CI / python-lint (push) Has been skipped
CI / nodejs-lint (push) Has been skipped
CI / test-go-school (push) Successful in 25s
CI / test-go-edu-search (push) Successful in 27s
CI / test-python-klausur (push) Failing after 1m51s
CI / test-nodejs-website (push) Has been cancelled
CI / test-python-agent-core (push) Has been cancelled
New cv_color_detect.py module:
- detect_word_colors(): annotates existing words with their text color (HSV analysis)
- recover_colored_text(): finds colored text regions missed by standard OCR (e.g. red "!" markers) using HSV masks + contour detection

Integrated into build-grid: words get color/color_name fields, and recovered colored regions are merged into the word list before grid building.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
253
klausur-service/backend/cv_color_detect.py
Normal file
253
klausur-service/backend/cv_color_detect.py
Normal file
@@ -0,0 +1,253 @@
|
||||
"""
|
||||
Color detection for OCR word boxes.
|
||||
|
||||
Detects the text color of existing OCR words and recovers colored text
|
||||
regions (e.g. red markers, blue headings) that standard OCR may have missed.
|
||||
|
||||
Standard OCR (Tesseract, PaddleOCR) binarises images before processing,
|
||||
destroying all color information. This module adds it back by sampling
|
||||
HSV pixel values at word-box positions and finding colored regions that
|
||||
no word-box covers.
|
||||
|
||||
Lizenz: Apache 2.0 (kommerziell nutzbar)
|
||||
DATENSCHUTZ: Alle Verarbeitung erfolgt lokal.
|
||||
"""
|
||||
|
||||
import logging
|
||||
from typing import Any, Dict, List, Optional, Tuple
|
||||
|
||||
import cv2
|
||||
import numpy as np
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# HSV color ranges (OpenCV: H 0-180, S 0-255, V 0-255)
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
# Hue bands per color name, given as (low, high) pairs on OpenCV's 0-180 hue
# scale.  Red sits on the wrap-around of the hue circle and therefore needs
# two bands.  Every band shares the same saturation (70-255) and value
# (50-255) limits.  Hues between 85 and 100 are not covered by any band.
_COLOR_RANGES: Dict[str, List[Tuple[np.ndarray, np.ndarray]]] = {
    color: [
        (np.array([hue_lo, 70, 50]), np.array([hue_hi, 255, 255]))
        for hue_lo, hue_hi in hue_bands
    ]
    for color, hue_bands in (
        ("red", ((0, 10), (170, 180))),
        ("orange", ((10, 25),)),
        ("yellow", ((25, 35),)),
        ("green", ((35, 85),)),
        ("blue", ((100, 130),)),
        ("purple", ((130, 170),)),
    )
}
|
||||
|
||||
# Hex color code stored in a word's ``color`` field for each color name.
_COLOR_HEX: Dict[str, str] = dict(
    black="#000000",
    gray="#6b7280",
    red="#dc2626",
    orange="#ea580c",
    yellow="#ca8a04",
    green="#16a34a",
    blue="#2563eb",
    purple="#9333ea",
)
|
||||
|
||||
|
||||
def _hue_to_color_name(hue: float) -> str:
    """Return the color name for an OpenCV hue value (0-180 scale).

    Red occupies both ends of the scale (below 10 and above 170); the
    remaining names are found by scanning exclusive upper bounds in
    ascending order.  Anything from 130 up to and including 170 is purple.
    """
    # Upper end of the scale wraps back to red.
    if hue > 170:
        return "red"

    upper_bounds = (
        (10, "red"),
        (25, "orange"),
        (35, "yellow"),
        (85, "green"),
        (130, "blue"),
    )
    for bound, name in upper_bounds:
        if hue < bound:
            return name
    return "purple"
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# 1. Color annotation for existing word boxes
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def detect_word_colors(
    img_bgr: np.ndarray,
    word_boxes: List[Dict],
    sat_threshold: int = 50,
) -> None:
    """Annotate each word box in-place with its detected text color.

    Adds ``color`` (hex string) and ``color_name`` (e.g. ``'red'``,
    ``'black'``) keys to every dict in *word_boxes*.

    Args:
        img_bgr: Image passed to ``cv2.cvtColor(..., COLOR_BGR2HSV)``.
            ``None`` makes the call a no-op.
        word_boxes: Dicts providing numeric ``left``, ``top``, ``width`` and
            ``height`` entries, used as pixel coordinates into *img_bgr*.
            An empty list makes the call a no-op.
        sat_threshold: Saturation (0-255) used both to pick colored pixels
            for the text mask and as the minimum mean saturation for a word
            to count as colored.

    Algorithm per word:
      1. Crop the word region from the image (clipped to the image bounds).
      2. Build a text-pixel mask (dark pixels OR high-saturation pixels).
      3. Sample HSV values at mask positions.
      4. If mean saturation >= threshold, classify the hue; else black.

    The hue is the *circular* mean of the saturated text pixels only.  Hue
    is an angle: red ink produces values on both sides of the 0/180
    wrap-around, and a plain arithmetic mean of those lands near 90 (blue).
    Dark, unsaturated pixels are left out of the hue estimate because their
    hue carries no color information.
    """
    if img_bgr is None or not word_boxes:
        return

    img_hsv = cv2.cvtColor(img_bgr, cv2.COLOR_BGR2HSV)
    img_h, img_w = img_bgr.shape[:2]
    black_hex = _COLOR_HEX["black"]

    colored_count = 0

    for wb in word_boxes:
        x1 = max(0, int(wb["left"]))
        y1 = max(0, int(wb["top"]))
        x2 = min(img_w, int(wb["left"] + wb["width"]))
        y2 = min(img_h, int(wb["top"] + wb["height"]))

        # Default annotation; replaced below only when colored ink is found.
        wb["color"] = black_hex
        wb["color_name"] = "black"

        # Box lies outside the image or has no area after clipping.
        if x2 <= x1 or y2 <= y1:
            continue

        crop_hsv = img_hsv[y1:y2, x1:x2]
        crop_gray = cv2.cvtColor(img_bgr[y1:y2, x1:x2], cv2.COLOR_BGR2GRAY)

        # Text pixels: dark in grayscale OR saturated (colored ink)
        _, dark_mask = cv2.threshold(crop_gray, 180, 255, cv2.THRESH_BINARY_INV)
        sat_mask = (crop_hsv[:, :, 1] > sat_threshold).astype(np.uint8) * 255
        text_mask = cv2.bitwise_or(dark_mask, sat_mask)

        text_pixels = crop_hsv[text_mask > 0]

        # Too few pixels to say anything about the ink color.
        if len(text_pixels) < 3:
            continue

        saturation = text_pixels[:, 1]
        if float(np.mean(saturation)) < sat_threshold:
            continue

        # mean >= threshold guarantees at least one pixel passes this filter.
        ink_hues = text_pixels[saturation >= sat_threshold, 0]

        # OpenCV hue 0-180 maps to 0-360 degrees, i.e. hue * pi / 90 radians.
        angles = ink_hues.astype(np.float64) * (np.pi / 90.0)
        mean_angle = np.arctan2(np.sin(angles).mean(), np.cos(angles).mean())
        # arctan2 yields (-pi, pi]; fold back onto the 0-180 hue scale.
        mean_hue = float(np.degrees(mean_angle) / 2.0) % 180.0

        name = _hue_to_color_name(mean_hue)
        wb["color"] = _COLOR_HEX.get(name, black_hex)
        wb["color_name"] = name
        colored_count += 1

    if colored_count:
        logger.info("color annotation: %d / %d words are colored",
                    colored_count, len(word_boxes))
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# 2. Recover colored text that OCR missed
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def recover_colored_text(
    img_bgr: np.ndarray,
    existing_words: List[Dict],
    min_area: int = 40,
    max_regions: int = 60,
) -> List[Dict]:
    """Find colored text regions not covered by any existing word box.

    Args:
        img_bgr: Image passed to ``cv2.cvtColor(..., COLOR_BGR2HSV)``.
            ``None`` yields an empty result.
        existing_words: Word dicts with numeric ``left``, ``top``, ``width``
            and ``height`` entries; their area (plus 4 px padding) is
            excluded from the search.  ``None`` is treated as empty.
        min_area: Smallest contour area (in pixels) that is kept.  The
            largest accepted area is 0.5 % of the image area.
        max_regions: Maximum number of regions kept *per color*, largest
            contour area first.

    Returns:
        A list of recovered word dicts with ``text``, ``left``, ``top``,
        ``width``, ``height``, ``conf`` (fixed at 45), ``color``,
        ``color_name`` and ``recovered=True`` fields.  ``text`` is set via a
        lightweight shape heuristic (``!`` for tall narrow shapes, ``•`` for
        small squarish ones, otherwise ``?``).
    """
    if img_bgr is None:
        return []

    img_hsv = cv2.cvtColor(img_bgr, cv2.COLOR_BGR2HSV)
    ih, iw = img_bgr.shape[:2]
    max_area = int(ih * iw * 0.005)

    # --- Build occupancy mask from existing words (with 4px padding) ---
    occupied = np.zeros((ih, iw), dtype=np.uint8)
    pad = 4
    for wb in existing_words or ():
        x1 = max(0, int(wb["left"]) - pad)
        y1 = max(0, int(wb["top"]) - pad)
        x2 = min(iw, int(wb["left"] + wb["width"]) + pad)
        y2 = min(ih, int(wb["top"] + wb["height"]) + pad)
        # A box entirely left of / above the image gives a negative x2/y2.
        # Used as a slice end that would count from the far edge and mark
        # almost the whole band as occupied, so such boxes are skipped.
        if x2 <= x1 or y2 <= y1:
            continue
        occupied[y1:y2, x1:x2] = 255
    unoccupied = cv2.bitwise_not(occupied)

    # Morphological cleanup kernels (identical for every color):
    #   - Close with tall kernel to merge ! stroke + dot
    #   - Open to remove noise specks
    kernel_close = cv2.getStructuringElement(cv2.MORPH_RECT, (3, 8))
    kernel_open = cv2.getStructuringElement(cv2.MORPH_RECT, (2, 2))

    recovered: List[Dict] = []
    per_color: Dict[str, int] = {}

    for color_name, ranges in _COLOR_RANGES.items():
        # Create mask for this color (union of all its HSV bands)
        mask = np.zeros((ih, iw), dtype=np.uint8)
        for lower, upper in ranges:
            mask = cv2.bitwise_or(mask, cv2.inRange(img_hsv, lower, upper))

        # Remove pixels already covered by existing OCR words
        mask = cv2.bitwise_and(mask, unoccupied)

        mask = cv2.morphologyEx(mask, cv2.MORPH_CLOSE, kernel_close)
        mask = cv2.morphologyEx(mask, cv2.MORPH_OPEN, kernel_open)

        contours, _ = cv2.findContours(
            mask, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE,
        )

        candidates = []
        for cnt in contours:
            area = cv2.contourArea(cnt)
            if area < min_area or area > max_area:
                continue
            bx, by, bw, bh = cv2.boundingRect(cnt)
            # Regions under 6 px tall are discarded.
            if bh < 6:
                continue
            candidates.append((area, bx, by, bw, bh))

        # Keep largest first, limited count
        candidates.sort(key=lambda c: c[0], reverse=True)
        kept = candidates[:max_regions]
        if kept:
            per_color[color_name] = per_color.get(color_name, 0) + len(kept)

        for _area, bx, by, bw, bh in kept:
            recovered.append({
                "text": _identify_shape(bw, bh),
                "left": bx,
                "top": by,
                "width": bw,
                "height": bh,
                "conf": 45,
                "color": _COLOR_HEX.get(color_name, "#000000"),
                "color_name": color_name,
                "recovered": True,
            })

    if recovered:
        logger.info(
            "color recovery: %d colored regions found (%s)",
            len(recovered),
            ", ".join(
                f"{name}: {count}" for name, count in sorted(per_color.items())
            ),
        )

    return recovered
|
||||
|
||||
|
||||
def _identify_shape(w: int, h: int) -> str:
    """Guess a single-character marker from a bounding box's proportions.

    Returns ``"!"`` for tall, narrow boxes, ``"•"`` for small, roughly
    square ones, and ``"?"`` when neither rule matches.
    """
    # A non-positive height is treated as a square box.
    if h > 0:
        ratio = w / h
    else:
        ratio = 1.0

    # Tall, narrow — likely exclamation mark
    tall_and_narrow = ratio < 0.55 and h > 10
    if tall_and_narrow:
        return "!"

    # Small, roughly square — bullet or dot
    small_and_squarish = 0.6 < ratio < 1.5 and max(w, h) < 25
    return "•" if small_and_squarish else "?"
|
||||
@@ -20,6 +20,7 @@ import numpy as np
|
||||
from fastapi import APIRouter, HTTPException, Request
|
||||
|
||||
from cv_box_detect import detect_boxes, split_page_into_zones
|
||||
from cv_color_detect import detect_word_colors, recover_colored_text
|
||||
from cv_words_first import _cluster_rows, _build_cells
|
||||
from ocr_pipeline_session_store import (
|
||||
get_session_db,
|
||||
@@ -438,15 +439,30 @@ async def build_grid(session_id: str):
|
||||
|
||||
zones_data: List[Dict[str, Any]] = []
|
||||
boxes_detected = 0
|
||||
recovered_count = 0
|
||||
img_bgr = None
|
||||
|
||||
content_x, content_y, content_w, content_h = _get_content_bounds(all_words)
|
||||
|
||||
if img_png:
|
||||
# Decode image for box detection
|
||||
# Decode image for color detection + box detection
|
||||
arr = np.frombuffer(img_png, dtype=np.uint8)
|
||||
img_bgr = cv2.imdecode(arr, cv2.IMREAD_COLOR)
|
||||
|
||||
if img_bgr is not None:
|
||||
# --- Color detection: annotate existing words ---
|
||||
detect_word_colors(img_bgr, all_words)
|
||||
|
||||
# --- Recover colored text that OCR missed ---
|
||||
recovered = recover_colored_text(img_bgr, all_words)
|
||||
if recovered:
|
||||
recovered_count = len(recovered)
|
||||
all_words.extend(recovered)
|
||||
logger.info(
|
||||
"build-grid session %s: +%d recovered colored words",
|
||||
session_id, recovered_count,
|
||||
)
|
||||
|
||||
# Detect bordered boxes
|
||||
boxes = detect_boxes(
|
||||
img_bgr,
|
||||
@@ -529,6 +545,14 @@ async def build_grid(session_id: str):
|
||||
total_columns = sum(len(z.get("columns", [])) for z in zones_data)
|
||||
total_rows = sum(len(z.get("rows", [])) for z in zones_data)
|
||||
|
||||
# Collect color statistics from all word_boxes in cells
|
||||
color_stats: Dict[str, int] = {}
|
||||
for z in zones_data:
|
||||
for cell in z.get("cells", []):
|
||||
for wb in cell.get("word_boxes", []):
|
||||
cn = wb.get("color_name", "black")
|
||||
color_stats[cn] = color_stats.get(cn, 0) + 1
|
||||
|
||||
result = {
|
||||
"session_id": session_id,
|
||||
"image_width": img_w,
|
||||
@@ -541,6 +565,8 @@ async def build_grid(session_id: str):
|
||||
"total_rows": total_rows,
|
||||
"total_cells": total_cells,
|
||||
"total_words": len(all_words),
|
||||
"recovered_colored": recovered_count,
|
||||
"color_stats": color_stats,
|
||||
},
|
||||
"formatting": {
|
||||
"bold_columns": [],
|
||||
|
||||
Reference in New Issue
Block a user