""" Inpainting Service for Worksheet Cleanup Removes handwriting from scanned worksheets using inpainting techniques. Supports multiple backends: 1. OpenCV (Telea/NS algorithms) - Fast, CPU-based baseline 2. LaMa (Large Mask Inpainting) - Optional, better quality DATENSCHUTZ: All processing happens locally on Mac Mini. """ import numpy as np from PIL import Image import io import logging from typing import Tuple, Optional from dataclasses import dataclass from enum import Enum # OpenCV is optional - only required for actual inpainting try: import cv2 CV2_AVAILABLE = True except ImportError: cv2 = None CV2_AVAILABLE = False logger = logging.getLogger(__name__) class InpaintingMethod(str, Enum): """Available inpainting methods.""" OPENCV_TELEA = "opencv_telea" # Fast, good for small regions OPENCV_NS = "opencv_ns" # Navier-Stokes, slower but smoother LAMA = "lama" # LaMa deep learning (if available) AUTO = "auto" # Automatically select best method @dataclass class InpaintingResult: """Result of inpainting operation.""" image: np.ndarray # Cleaned image (BGR) method_used: str # Which method was actually used processing_time_ms: float # Processing time in milliseconds success: bool error_message: Optional[str] = None # Global LaMa model (lazy loaded) _lama_model = None _lama_available = None def check_lama_available() -> bool: """Check if LaMa inpainting is available.""" global _lama_available if _lama_available is not None: return _lama_available try: # Try to import lama-cleaner library from lama_cleaner.model_manager import ModelManager _lama_available = True logger.info("LaMa inpainting is available") except ImportError: _lama_available = False logger.info("LaMa not available, will use OpenCV fallback") except Exception as e: _lama_available = False logger.warning(f"LaMa check failed: {e}") return _lama_available def inpaint_image( image_bytes: bytes, mask_bytes: bytes, method: InpaintingMethod = InpaintingMethod.AUTO, inpaint_radius: int = 3 ) -> InpaintingResult: """ Inpaint (remove) masked regions from an image. Args: image_bytes: Source image as bytes mask_bytes: Binary mask where white (255) = regions to remove method: Inpainting method to use inpaint_radius: Radius for OpenCV inpainting (default 3) Returns: InpaintingResult with cleaned image Raises: ImportError: If OpenCV is not available """ if not CV2_AVAILABLE: raise ImportError( "OpenCV (cv2) is required for inpainting. " "Install with: pip install opencv-python-headless" ) import time start_time = time.time() try: # Load image img = Image.open(io.BytesIO(image_bytes)) img_array = np.array(img) # Convert to BGR for OpenCV if len(img_array.shape) == 2: img_bgr = cv2.cvtColor(img_array, cv2.COLOR_GRAY2BGR) elif img_array.shape[2] == 4: img_bgr = cv2.cvtColor(img_array, cv2.COLOR_RGBA2BGR) elif img_array.shape[2] == 3: img_bgr = cv2.cvtColor(img_array, cv2.COLOR_RGB2BGR) else: img_bgr = img_array # Load mask mask_img = Image.open(io.BytesIO(mask_bytes)) mask_array = np.array(mask_img) # Ensure mask is single channel if len(mask_array.shape) == 3: mask_array = cv2.cvtColor(mask_array, cv2.COLOR_BGR2GRAY) # Ensure mask is binary _, mask_binary = cv2.threshold(mask_array, 127, 255, cv2.THRESH_BINARY) # Resize mask if dimensions don't match if mask_binary.shape[:2] != img_bgr.shape[:2]: mask_binary = cv2.resize( mask_binary, (img_bgr.shape[1], img_bgr.shape[0]), interpolation=cv2.INTER_NEAREST ) # Select method if method == InpaintingMethod.AUTO: # Use LaMa if available and mask is large mask_ratio = np.sum(mask_binary > 0) / mask_binary.size if check_lama_available() and mask_ratio > 0.05: method = InpaintingMethod.LAMA else: method = InpaintingMethod.OPENCV_TELEA # Perform inpainting if method == InpaintingMethod.LAMA: result_img, actual_method = _inpaint_lama(img_bgr, mask_binary) elif method == InpaintingMethod.OPENCV_NS: result_img = cv2.inpaint( img_bgr, mask_binary, inpaint_radius, cv2.INPAINT_NS ) actual_method = "opencv_ns" else: # OPENCV_TELEA (default) result_img = cv2.inpaint( img_bgr, mask_binary, inpaint_radius, cv2.INPAINT_TELEA ) actual_method = "opencv_telea" processing_time = (time.time() - start_time) * 1000 logger.info(f"Inpainting completed: method={actual_method}, " f"time={processing_time:.1f}ms") return InpaintingResult( image=result_img, method_used=actual_method, processing_time_ms=processing_time, success=True ) except Exception as e: logger.error(f"Inpainting failed: {e}") import traceback logger.error(traceback.format_exc()) return InpaintingResult( image=None, method_used="none", processing_time_ms=0, success=False, error_message=str(e) ) def _inpaint_lama( img_bgr: np.ndarray, mask: np.ndarray ) -> Tuple[np.ndarray, str]: """ Inpaint using LaMa (Large Mask Inpainting). Falls back to OpenCV if LaMa fails. """ global _lama_model try: from lama_cleaner.model_manager import ModelManager from lama_cleaner.schema import Config, HDStrategy, LDMSampler # Initialize model if needed if _lama_model is None: logger.info("Loading LaMa model...") _lama_model = ModelManager( name="lama", device="cpu", # Use CPU for Mac Mini compatibility ) logger.info("LaMa model loaded") # Prepare config config = Config( ldm_steps=25, ldm_sampler=LDMSampler.plms, hd_strategy=HDStrategy.ORIGINAL, hd_strategy_crop_margin=32, hd_strategy_crop_trigger_size=800, hd_strategy_resize_limit=800, ) # Convert BGR to RGB for LaMa img_rgb = cv2.cvtColor(img_bgr, cv2.COLOR_BGR2RGB) # Run inpainting result_rgb = _lama_model(img_rgb, mask, config) # Convert back to BGR result_bgr = cv2.cvtColor(result_rgb, cv2.COLOR_RGB2BGR) return result_bgr, "lama" except Exception as e: logger.warning(f"LaMa inpainting failed, falling back to OpenCV: {e}") # Fallback to OpenCV result = cv2.inpaint(img_bgr, mask, 3, cv2.INPAINT_TELEA) return result, "opencv_telea_fallback" def inpaint_opencv_telea( image_bytes: bytes, mask_bytes: bytes, radius: int = 3 ) -> bytes: """ Simple OpenCV Telea inpainting - fastest option. Args: image_bytes: Source image mask_bytes: Binary mask (white = remove) radius: Inpainting radius Returns: Inpainted image as PNG bytes """ result = inpaint_image( image_bytes, mask_bytes, method=InpaintingMethod.OPENCV_TELEA, inpaint_radius=radius ) if not result.success: raise RuntimeError(f"Inpainting failed: {result.error_message}") return image_to_png(result.image) def inpaint_opencv_ns( image_bytes: bytes, mask_bytes: bytes, radius: int = 3 ) -> bytes: """ OpenCV Navier-Stokes inpainting - smoother but slower. """ result = inpaint_image( image_bytes, mask_bytes, method=InpaintingMethod.OPENCV_NS, inpaint_radius=radius ) if not result.success: raise RuntimeError(f"Inpainting failed: {result.error_message}") return image_to_png(result.image) def remove_handwriting( image_bytes: bytes, mask: Optional[np.ndarray] = None, method: InpaintingMethod = InpaintingMethod.AUTO ) -> Tuple[bytes, dict]: """ High-level function to remove handwriting from an image. If no mask is provided, detects handwriting automatically. Args: image_bytes: Source image mask: Optional pre-computed mask method: Inpainting method Returns: Tuple of (cleaned image bytes, metadata dict) """ from services.handwriting_detection import detect_handwriting, mask_to_png # Detect handwriting if no mask provided if mask is None: detection_result = detect_handwriting(image_bytes) mask = detection_result.mask detection_info = { "confidence": detection_result.confidence, "handwriting_ratio": detection_result.handwriting_ratio, "detection_method": detection_result.detection_method } else: detection_info = {"provided_mask": True} # Check if there's anything to inpaint if np.sum(mask > 0) == 0: logger.info("No handwriting detected, returning original image") return image_bytes, { "inpainting_performed": False, "reason": "no_handwriting_detected", **detection_info } # Convert mask to bytes for inpainting mask_bytes = mask_to_png(mask) # Perform inpainting result = inpaint_image(image_bytes, mask_bytes, method=method) if not result.success: raise RuntimeError(f"Inpainting failed: {result.error_message}") # Convert result to PNG result_bytes = image_to_png(result.image) metadata = { "inpainting_performed": True, "method_used": result.method_used, "processing_time_ms": result.processing_time_ms, **detection_info } return result_bytes, metadata def image_to_png(img_bgr: np.ndarray) -> bytes: """ Convert BGR image array to PNG bytes. """ img_rgb = cv2.cvtColor(img_bgr, cv2.COLOR_BGR2RGB) img_pil = Image.fromarray(img_rgb) buffer = io.BytesIO() img_pil.save(buffer, format='PNG', optimize=True) return buffer.getvalue() def dilate_mask(mask_bytes: bytes, iterations: int = 2) -> bytes: """ Dilate a mask to expand the removal region. Useful to ensure complete handwriting removal including edges. """ mask_img = Image.open(io.BytesIO(mask_bytes)) mask_array = np.array(mask_img) if len(mask_array.shape) == 3: mask_array = cv2.cvtColor(mask_array, cv2.COLOR_BGR2GRAY) kernel = np.ones((3, 3), np.uint8) dilated = cv2.dilate(mask_array, kernel, iterations=iterations) img = Image.fromarray(dilated) buffer = io.BytesIO() img.save(buffer, format='PNG') return buffer.getvalue()