From 2bd63ec40210bd8f960c7e3d95a6da51fb4419e2 Mon Sep 17 00:00:00 2001 From: Benjamin Admin Date: Sun, 15 Mar 2026 00:50:09 +0100 Subject: [PATCH] feat: add color detection for OCR word boxes New cv_color_detect.py module: - detect_word_colors(): annotates existing words with text color (HSV analysis) - recover_colored_text(): finds colored text regions missed by standard OCR (e.g. red ! markers) using HSV masks + contour detection Integrated into build-grid: words get color/color_name fields, recovered colored regions are merged into the word list before grid building. Co-Authored-By: Claude Opus 4.6 --- klausur-service/backend/cv_color_detect.py | 253 +++++++++++++++++++++ klausur-service/backend/grid_editor_api.py | 28 ++- 2 files changed, 280 insertions(+), 1 deletion(-) create mode 100644 klausur-service/backend/cv_color_detect.py diff --git a/klausur-service/backend/cv_color_detect.py b/klausur-service/backend/cv_color_detect.py new file mode 100644 index 0000000..6b0143f --- /dev/null +++ b/klausur-service/backend/cv_color_detect.py @@ -0,0 +1,253 @@ +""" +Color detection for OCR word boxes. + +Detects the text color of existing OCR words and recovers colored text +regions (e.g. red markers, blue headings) that standard OCR may have missed. + +Standard OCR (Tesseract, PaddleOCR) binarises images before processing, +destroying all color information. This module adds it back by sampling +HSV pixel values at word-box positions and finding colored regions that +no word-box covers. + +Lizenz: Apache 2.0 (kommerziell nutzbar) +DATENSCHUTZ: Alle Verarbeitung erfolgt lokal. 
+"""
+
+import logging
+from typing import Any, Dict, List, Optional, Tuple
+
+import cv2
+import numpy as np
+
+logger = logging.getLogger(__name__)
+
+# ---------------------------------------------------------------------------
+# HSV color ranges (OpenCV: H 0-180, S 0-255, V 0-255)
+# ---------------------------------------------------------------------------
+
+# Each color maps to one or more inclusive (lower, upper) HSV bounds.
+# Red needs two ranges because it sits on both sides of the hue seam.
+_COLOR_RANGES: Dict[str, List[Tuple[np.ndarray, np.ndarray]]] = {
+    "red": [
+        (np.array([0, 70, 50]), np.array([10, 255, 255])),
+        (np.array([170, 70, 50]), np.array([180, 255, 255])),
+    ],
+    "orange": [
+        (np.array([10, 70, 50]), np.array([25, 255, 255])),
+    ],
+    "yellow": [
+        (np.array([25, 70, 50]), np.array([35, 255, 255])),
+    ],
+    "green": [
+        (np.array([35, 70, 50]), np.array([85, 255, 255])),
+    ],
+    "blue": [
+        (np.array([100, 70, 50]), np.array([130, 255, 255])),
+    ],
+    "purple": [
+        (np.array([130, 70, 50]), np.array([170, 255, 255])),
+    ],
+}
+
+# Hex value written into the ``color`` field for each color name.
+_COLOR_HEX: Dict[str, str] = {
+    "black": "#000000",
+    "gray": "#6b7280",
+    "red": "#dc2626",
+    "orange": "#ea580c",
+    "yellow": "#ca8a04",
+    "green": "#16a34a",
+    "blue": "#2563eb",
+    "purple": "#9333ea",
+}
+
+
+def _hue_to_color_name(hue: float) -> str:
+    """Map OpenCV hue (0-180) to a color name."""
+    if hue < 10 or hue > 170:
+        return "red"
+    if hue < 25:
+        return "orange"
+    if hue < 35:
+        return "yellow"
+    if hue < 85:
+        return "green"
+    if hue < 130:
+        return "blue"
+    return "purple"
+
+
+def _circular_mean_hue(hues: np.ndarray) -> float:
+    """Return the circular mean of OpenCV hue values, in the range 0-180.
+
+    Hue is an angle, so red ink lies on both sides of the 0/180 seam. A plain
+    arithmetic mean of e.g. 2 and 178 gives 90 (classified as blue); averaging
+    the unit vectors instead keeps the result next to the seam.
+    """
+    # The hue scale used here spans 0-180 for a full circle,
+    # so one hue unit corresponds to pi / 90 radians.
+    angles = hues.astype(np.float64) * (np.pi / 90.0)
+    mean_angle = np.arctan2(np.sin(angles).mean(), np.cos(angles).mean())
+    return float((np.degrees(mean_angle) / 2.0) % 180.0)
+
+
+def _set_word_color(wb: Dict, name: str) -> None:
+    """Write the ``color`` / ``color_name`` pair for *name* into *wb*."""
+    wb["color"] = _COLOR_HEX.get(name, _COLOR_HEX["black"])
+    wb["color_name"] = name
+
+
+# ---------------------------------------------------------------------------
+# 1. Color annotation for existing word boxes
+# ---------------------------------------------------------------------------
+
+def detect_word_colors(
+    img_bgr: np.ndarray,
+    word_boxes: List[Dict],
+    sat_threshold: int = 50,
+) -> None:
+    """Annotate each word_box in-place with its detected text color.
+
+    Adds ``color`` (hex string) and ``color_name`` (e.g. 'red', 'black')
+    keys to each dict.
+
+    Algorithm per word:
+      1. Crop the word region from the image (clamped to the image bounds).
+      2. Build a text-pixel mask (dark pixels OR high-saturation pixels).
+      3. Sample HSV values at mask positions.
+      4. If mean saturation >= threshold, classify the circular mean hue of
+         the saturated pixels; otherwise the word is black.
+
+    Args:
+        img_bgr: Page image in BGR channel order; ``None`` makes the call
+            a no-op.
+        word_boxes: Word dicts providing ``left``, ``top``, ``width`` and
+            ``height``; mutated in place.
+        sat_threshold: Saturation (0-255) above which a pixel counts as
+            colored ink, and the minimum mean saturation for a colored word.
+    """
+    if img_bgr is None or not word_boxes:
+        return
+
+    img_hsv = cv2.cvtColor(img_bgr, cv2.COLOR_BGR2HSV)
+    img_h, img_w = img_bgr.shape[:2]
+
+    colored_count = 0
+
+    for wb in word_boxes:
+        x1 = max(0, int(wb["left"]))
+        y1 = max(0, int(wb["top"]))
+        x2 = min(img_w, int(wb["left"] + wb["width"]))
+        y2 = min(img_h, int(wb["top"] + wb["height"]))
+
+        # Box is empty or lies completely outside the image.
+        if x2 <= x1 or y2 <= y1:
+            _set_word_color(wb, "black")
+            continue
+
+        crop_hsv = img_hsv[y1:y2, x1:x2]
+        crop_gray = cv2.cvtColor(img_bgr[y1:y2, x1:x2], cv2.COLOR_BGR2GRAY)
+
+        # Text pixels: dark in grayscale OR saturated (colored ink)
+        _, dark_mask = cv2.threshold(crop_gray, 180, 255, cv2.THRESH_BINARY_INV)
+        sat_mask = (crop_hsv[:, :, 1] > sat_threshold).astype(np.uint8) * 255
+        text_mask = cv2.bitwise_or(dark_mask, sat_mask)
+
+        text_pixels = crop_hsv[text_mask > 0]
+
+        # Too few samples to say anything about the ink color.
+        if len(text_pixels) < 3:
+            _set_word_color(wb, "black")
+            continue
+
+        mean_sat = float(np.mean(text_pixels[:, 1]))
+
+        if mean_sat < sat_threshold:
+            _set_word_color(wb, "black")
+            continue
+
+        # Only saturated pixels carry a meaningful hue; dark, unsaturated
+        # pixels would drag the average towards an arbitrary color.
+        # mean_sat >= sat_threshold guarantees this selection is non-empty.
+        saturated = text_pixels[text_pixels[:, 1] >= sat_threshold]
+        name = _hue_to_color_name(_circular_mean_hue(saturated[:, 0]))
+        _set_word_color(wb, name)
+        colored_count += 1
+
+    if colored_count:
+        logger.info("color annotation: %d / %d words are colored",
+                    colored_count, len(word_boxes))
+
+
+# ---------------------------------------------------------------------------
+# 2.
Recover colored text that OCR missed
+# ---------------------------------------------------------------------------
+
+def recover_colored_text(
+    img_bgr: np.ndarray,
+    existing_words: List[Dict],
+    min_area: int = 40,
+    max_regions: int = 60,
+) -> List[Dict]:
+    """Find colored text regions not covered by any existing word box.
+
+    Returns a list of recovered word dicts with ``color``, ``color_name``,
+    and ``recovered=True`` fields.  The ``text`` is set via a lightweight
+    shape heuristic (e.g. ``!`` for tall narrow shapes) or ``?``.
+
+    Args:
+        img_bgr: Page image in BGR channel order; ``None`` yields ``[]``.
+        existing_words: Word dicts providing ``left``, ``top``, ``width`` and
+            ``height``; their padded boxes are excluded from the search.
+        min_area: Smallest contour area that is kept.
+        max_regions: Maximum number of regions kept per color (largest
+            contour area first).
+
+    Returns:
+        One dict per recovered region with ``text``, ``left``, ``top``,
+        ``width``, ``height``, ``conf``, ``color``, ``color_name`` and
+        ``recovered`` keys.
+    """
+    if img_bgr is None:
+        return []
+
+    img_hsv = cv2.cvtColor(img_bgr, cv2.COLOR_BGR2HSV)
+    ih, iw = img_bgr.shape[:2]
+    # Regions larger than 0.5 % of the page are discarded.
+    max_area = int(ih * iw * 0.005)
+
+    # --- Build occupancy mask from existing words (with 4px padding) ---
+    occupied = np.zeros((ih, iw), dtype=np.uint8)
+    pad = 4
+    for wb in existing_words or ():
+        x1 = max(0, int(wb["left"]) - pad)
+        y1 = max(0, int(wb["top"]) - pad)
+        x2 = min(iw, int(wb["left"] + wb["width"]) + pad)
+        y2 = min(ih, int(wb["top"] + wb["height"]) + pad)
+        # Skip boxes that do not intersect the image.  Without this guard a
+        # box left of / above the page produces a negative slice end, which
+        # NumPy counts from the array end and so marks most of the page.
+        if x2 <= x1 or y2 <= y1:
+            continue
+        occupied[y1:y2, x1:x2] = 255
+
+    # Loop invariants: the free-area mask and both morphology kernels.
+    free = cv2.bitwise_not(occupied)
+    kernel_close = cv2.getStructuringElement(cv2.MORPH_RECT, (3, 8))
+    kernel_open = cv2.getStructuringElement(cv2.MORPH_RECT, (2, 2))
+
+    recovered: List[Dict] = []
+    per_color: Dict[str, int] = {}
+
+    for color_name, ranges in _COLOR_RANGES.items():
+        # Create mask for this color
+        mask = np.zeros((ih, iw), dtype=np.uint8)
+        for lower, upper in ranges:
+            mask = cv2.bitwise_or(mask, cv2.inRange(img_hsv, lower, upper))
+
+        # Remove pixels already covered by existing OCR words
+        mask = cv2.bitwise_and(mask, free)
+
+        # Morphological cleanup:
+        # - Close with tall kernel to merge ! stroke + dot
+        # - Open to remove noise specks
+        mask = cv2.morphologyEx(mask, cv2.MORPH_CLOSE, kernel_close)
+        mask = cv2.morphologyEx(mask, cv2.MORPH_OPEN, kernel_open)
+
+        # OpenCV 4.x returns (contours, hierarchy), 3.x returns
+        # (image, contours, hierarchy); [-2] selects contours in both.
+        contours = cv2.findContours(
+            mask, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE,
+        )[-2]
+
+        candidates = []
+        for cnt in contours:
+            # NOTE(review): contourArea is the outline's polygon area, which
+            # for thin strokes can be smaller than the pixel count - confirm
+            # min_area was tuned with that in mind.
+            area = cv2.contourArea(cnt)
+            if area < min_area or area > max_area:
+                continue
+            bx, by, bw, bh = cv2.boundingRect(cnt)
+            if bh < 6:
+                continue
+            candidates.append((area, bx, by, bw, bh))
+
+        # Keep largest first, limited count
+        candidates.sort(key=lambda c: c[0], reverse=True)
+        kept = candidates[:max_regions]
+
+        for _area, bx, by, bw, bh in kept:
+            recovered.append({
+                "text": _identify_shape(bw, bh),
+                "left": bx,
+                "top": by,
+                "width": bw,
+                "height": bh,
+                "conf": 45,
+                "color": _COLOR_HEX.get(color_name, "#000000"),
+                "color_name": color_name,
+                "recovered": True,
+            })
+
+        if kept:
+            per_color[color_name] = len(kept)
+
+    if recovered:
+        logger.info(
+            "color recovery: %d colored regions found (%s)",
+            len(recovered),
+            ", ".join(f"{c}: {n}" for c, n in sorted(per_color.items())),
+        )
+
+    return recovered
+
+
+def _identify_shape(w: int, h: int) -> str:
+    """Simple shape heuristic for common single-character text markers.
+
+    Returns ``!`` for tall narrow boxes, ``•`` for small roughly square
+    boxes, and ``?`` for everything else.
+    """
+    # A zero height cannot be divided; treat it as a square box.
+    aspect = w / h if h > 0 else 1.0
+    if aspect < 0.55 and h > 10:
+        # Tall, narrow — likely exclamation mark
+        return "!"
+    if 0.6 < aspect < 1.5 and max(w, h) < 25:
+        # Small, roughly square — bullet or dot
+        return "•"
+    return "?"
diff --git a/klausur-service/backend/grid_editor_api.py b/klausur-service/backend/grid_editor_api.py index 40e1daf..2064d44 100644 --- a/klausur-service/backend/grid_editor_api.py +++ b/klausur-service/backend/grid_editor_api.py @@ -20,6 +20,7 @@ import numpy as np from fastapi import APIRouter, HTTPException, Request from cv_box_detect import detect_boxes, split_page_into_zones +from cv_color_detect import detect_word_colors, recover_colored_text from cv_words_first import _cluster_rows, _build_cells from ocr_pipeline_session_store import ( get_session_db, @@ -438,15 +439,30 @@ async def build_grid(session_id: str): zones_data: List[Dict[str, Any]] = [] boxes_detected = 0 + recovered_count = 0 + img_bgr = None content_x, content_y, content_w, content_h = _get_content_bounds(all_words) if img_png: - # Decode image for box detection + # Decode image for color detection + box detection arr = np.frombuffer(img_png, dtype=np.uint8) img_bgr = cv2.imdecode(arr, cv2.IMREAD_COLOR) if img_bgr is not None: + # --- Color detection: annotate existing words --- + detect_word_colors(img_bgr, all_words) + + # --- Recover colored text that OCR missed --- + recovered = recover_colored_text(img_bgr, all_words) + if recovered: + recovered_count = len(recovered) + all_words.extend(recovered) + logger.info( + "build-grid session %s: +%d recovered colored words", + session_id, recovered_count, + ) + # Detect bordered boxes boxes = detect_boxes( img_bgr, @@ -529,6 +545,14 @@ async def build_grid(session_id: str): total_columns = sum(len(z.get("columns", [])) for z in zones_data) total_rows = sum(len(z.get("rows", [])) for z in zones_data) + # Collect color statistics from all word_boxes in cells + color_stats: Dict[str, int] = {} + for z in zones_data: + for cell in z.get("cells", []): + for wb in cell.get("word_boxes", []): + cn = wb.get("color_name", "black") + color_stats[cn] = color_stats.get(cn, 0) + 1 + result = { "session_id": session_id, "image_width": img_w, @@ -541,6 +565,8 @@ 
async def build_grid(session_id: str): "total_rows": total_rows, "total_cells": total_cells, "total_words": len(all_words), + "recovered_colored": recovered_count, + "color_stats": color_stats, }, "formatting": { "bold_columns": [],