""" File Processor Service - Dokumentenverarbeitung für BreakPilot. Shared Service für: - OCR (Optical Character Recognition) für Handschrift und gedruckten Text - PDF-Parsing und Textextraktion - Bildverarbeitung und -optimierung - DOCX/DOC Textextraktion Verwendet: - PaddleOCR für deutsche Handschrift - PyMuPDF für PDF-Verarbeitung - python-docx für DOCX-Dateien - OpenCV für Bildvorverarbeitung """ import logging import io from pathlib import Path from typing import Optional, List, Dict, Any import cv2 import numpy as np from PIL import Image from .file_processor_models import ( FileType, ProcessingMode, ProcessedRegion, ProcessingResult, ) logger = logging.getLogger(__name__) class FileProcessor: """ Zentrale Dokumentenverarbeitung für BreakPilot. Unterstützt: - Handschrifterkennung (OCR) für Klausuren - Textextraktion aus PDFs - DOCX/DOC Verarbeitung - Bildvorverarbeitung für bessere OCR-Ergebnisse """ def __init__(self, ocr_lang: str = "de", use_gpu: bool = False): self.ocr_lang = ocr_lang self.use_gpu = use_gpu self._ocr_engine = None logger.info(f"FileProcessor initialized (lang={ocr_lang}, gpu={use_gpu})") @property def ocr_engine(self): """Lazy-Loading des OCR-Engines.""" if self._ocr_engine is None: self._ocr_engine = self._init_ocr_engine() return self._ocr_engine def _init_ocr_engine(self): """Initialisiert PaddleOCR oder Fallback.""" try: from paddleocr import PaddleOCR return PaddleOCR( use_angle_cls=True, lang='german', use_gpu=self.use_gpu, show_log=False ) except ImportError: logger.warning("PaddleOCR nicht installiert - verwende Fallback") return None def detect_file_type(self, file_path: str = None, file_bytes: bytes = None) -> FileType: """Erkennt den Dateityp.""" if file_path: ext = Path(file_path).suffix.lower() if ext == ".pdf": return FileType.PDF elif ext in [".jpg", ".jpeg", ".png", ".bmp", ".tiff", ".gif"]: return FileType.IMAGE elif ext == ".docx": return FileType.DOCX elif ext == ".doc": return FileType.DOC elif ext == ".txt": return FileType.TXT if file_bytes: if file_bytes[:4] == b'%PDF': return FileType.PDF elif file_bytes[:8] == b'\x89PNG\r\n\x1a\n': return FileType.IMAGE elif file_bytes[:2] in [b'\xff\xd8', b'BM']: return FileType.IMAGE elif file_bytes[:4] == b'PK\x03\x04': return FileType.DOCX return FileType.UNKNOWN def process( self, file_path: str = None, file_bytes: bytes = None, mode: ProcessingMode = ProcessingMode.MIXED ) -> ProcessingResult: """Verarbeitet ein Dokument.""" if not file_path and not file_bytes: raise ValueError("Entweder file_path oder file_bytes muss angegeben werden") file_type = self.detect_file_type(file_path, file_bytes) logger.info(f"Processing file of type: {file_type}") if file_type == FileType.PDF: return self._process_pdf(file_path, file_bytes, mode) elif file_type == FileType.IMAGE: return self._process_image(file_path, file_bytes, mode) elif file_type == FileType.DOCX: return self._process_docx(file_path, file_bytes) elif file_type == FileType.TXT: return self._process_txt(file_path, file_bytes) else: raise ValueError(f"Nicht unterstützter Dateityp: {file_type}") def _process_pdf(self, file_path=None, file_bytes=None, mode=ProcessingMode.MIXED): """Verarbeitet PDF-Dateien.""" try: import fitz except ImportError: logger.warning("PyMuPDF nicht installiert - versuche Fallback") return self._process_image(file_path, file_bytes, mode) if file_bytes: doc = fitz.open(stream=file_bytes, filetype="pdf") else: doc = fitz.open(file_path) all_text, all_regions = [], [] total_confidence, region_count = 0.0, 0 for page_num, page in enumerate(doc, start=1): page_text = page.get_text() if page_text.strip() and mode != ProcessingMode.OCR_HANDWRITING: all_text.append(page_text) all_regions.append(ProcessedRegion( text=page_text, confidence=1.0, bbox=(0, 0, int(page.rect.width), int(page.rect.height)), page=page_num )) total_confidence += 1.0 region_count += 1 else: pix = page.get_pixmap(matrix=fitz.Matrix(2, 2)) img_bytes = pix.tobytes("png") img = Image.open(io.BytesIO(img_bytes)) ocr_result = self._ocr_image(img) all_text.append(ocr_result["text"]) for region in ocr_result["regions"]: region.page = page_num all_regions.append(region) total_confidence += region.confidence region_count += 1 doc.close() avg_confidence = total_confidence / region_count if region_count > 0 else 0.0 return ProcessingResult( text="\n\n".join(all_text), confidence=avg_confidence, regions=all_regions, page_count=len(doc) if hasattr(doc, '__len__') else 1, file_type=FileType.PDF, processing_mode=mode, metadata={"source": file_path or "bytes"} ) def _process_image(self, file_path=None, file_bytes=None, mode=ProcessingMode.MIXED): """Verarbeitet Bilddateien.""" if file_bytes: img = Image.open(io.BytesIO(file_bytes)) else: img = Image.open(file_path) processed_img = self._preprocess_image(img) ocr_result = self._ocr_image(processed_img) return ProcessingResult( text=ocr_result["text"], confidence=ocr_result["confidence"], regions=ocr_result["regions"], page_count=1, file_type=FileType.IMAGE, processing_mode=mode, metadata={"source": file_path or "bytes", "image_size": img.size} ) def _process_docx(self, file_path=None, file_bytes=None): """Verarbeitet DOCX-Dateien.""" try: from docx import Document except ImportError: raise ImportError("python-docx ist nicht installiert") if file_bytes: doc = Document(io.BytesIO(file_bytes)) else: doc = Document(file_path) paragraphs = [] for para in doc.paragraphs: if para.text.strip(): paragraphs.append(para.text) for table in doc.tables: for row in table.rows: row_text = " | ".join(cell.text for cell in row.cells) if row_text.strip(): paragraphs.append(row_text) text = "\n\n".join(paragraphs) return ProcessingResult( text=text, confidence=1.0, regions=[ProcessedRegion(text=text, confidence=1.0, bbox=(0, 0, 0, 0), page=1)], page_count=1, file_type=FileType.DOCX, processing_mode=ProcessingMode.TEXT_EXTRACT, metadata={"source": file_path or "bytes"} ) def _process_txt(self, file_path=None, file_bytes=None): """Verarbeitet Textdateien.""" if file_bytes: text = file_bytes.decode('utf-8', errors='ignore') else: with open(file_path, 'r', encoding='utf-8', errors='ignore') as f: text = f.read() return ProcessingResult( text=text, confidence=1.0, regions=[ProcessedRegion(text=text, confidence=1.0, bbox=(0, 0, 0, 0), page=1)], page_count=1, file_type=FileType.TXT, processing_mode=ProcessingMode.TEXT_EXTRACT, metadata={"source": file_path or "bytes"} ) def _preprocess_image(self, img: Image.Image) -> Image.Image: """Vorverarbeitung des Bildes für bessere OCR-Ergebnisse.""" cv_img = cv2.cvtColor(np.array(img), cv2.COLOR_RGB2BGR) gray = cv2.cvtColor(cv_img, cv2.COLOR_BGR2GRAY) denoised = cv2.fastNlMeansDenoising(gray, None, 10, 7, 21) clahe = cv2.createCLAHE(clipLimit=2.0, tileGridSize=(8, 8)) enhanced = clahe.apply(denoised) binary = cv2.adaptiveThreshold( enhanced, 255, cv2.ADAPTIVE_THRESH_GAUSSIAN_C, cv2.THRESH_BINARY, 11, 2 ) return Image.fromarray(binary) def _ocr_image(self, img: Image.Image) -> Dict[str, Any]: """Führt OCR auf einem Bild aus.""" if self.ocr_engine is None: return {"text": "[OCR nicht verfügbar - bitte PaddleOCR installieren]", "confidence": 0.0, "regions": []} img_array = np.array(img) if len(img_array.shape) == 2: img_array = cv2.cvtColor(img_array, cv2.COLOR_GRAY2RGB) result = self.ocr_engine.ocr(img_array, cls=True) if not result or not result[0]: return {"text": "", "confidence": 0.0, "regions": []} all_text, all_regions = [], [] total_confidence = 0.0 for line in result[0]: bbox_points = line[0] text, confidence = line[1] x_coords = [p[0] for p in bbox_points] y_coords = [p[1] for p in bbox_points] bbox = (int(min(x_coords)), int(min(y_coords)), int(max(x_coords)), int(max(y_coords))) all_text.append(text) all_regions.append(ProcessedRegion(text=text, confidence=confidence, bbox=bbox)) total_confidence += confidence avg_confidence = total_confidence / len(all_regions) if all_regions else 0.0 return {"text": "\n".join(all_text), "confidence": avg_confidence, "regions": all_regions} def extract_handwriting_regions(self, img: Image.Image, min_area: int = 500) -> List[Dict[str, Any]]: """Erkennt und extrahiert handschriftliche Bereiche aus einem Bild.""" cv_img = cv2.cvtColor(np.array(img), cv2.COLOR_RGB2BGR) gray = cv2.cvtColor(cv_img, cv2.COLOR_BGR2GRAY) edges = cv2.Canny(gray, 50, 150) kernel = cv2.getStructuringElement(cv2.MORPH_RECT, (15, 5)) dilated = cv2.dilate(edges, kernel, iterations=2) contours, _ = cv2.findContours(dilated, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE) regions = [] for contour in contours: area = cv2.contourArea(contour) if area < min_area: continue x, y, w, h = cv2.boundingRect(contour) region_img = img.crop((x, y, x + w, y + h)) ocr_result = self._ocr_image(region_img) regions.append({ "bbox": (x, y, x + w, y + h), "area": area, "text": ocr_result["text"], "confidence": ocr_result["confidence"] }) regions.sort(key=lambda r: r["bbox"][1]) return regions # Singleton-Instanz _file_processor: Optional[FileProcessor] = None def get_file_processor() -> FileProcessor: """Gibt Singleton-Instanz des File Processors zurück.""" global _file_processor if _file_processor is None: _file_processor = FileProcessor() return _file_processor # Convenience functions def process_file(file_path=None, file_bytes=None, mode=ProcessingMode.MIXED) -> ProcessingResult: """Convenience function zum Verarbeiten einer Datei.""" processor = get_file_processor() return processor.process(file_path, file_bytes, mode) def extract_text_from_pdf(file_path=None, file_bytes=None) -> str: """Extrahiert Text aus einer PDF-Datei.""" result = process_file(file_path, file_bytes, ProcessingMode.TEXT_EXTRACT) return result.text def ocr_image(file_path=None, file_bytes=None) -> str: """Führt OCR auf einem Bild aus.""" result = process_file(file_path, file_bytes, ProcessingMode.OCR_PRINTED) return result.text def ocr_handwriting(file_path=None, file_bytes=None) -> str: """Führt Handschrift-OCR auf einem Bild aus.""" result = process_file(file_path, file_bytes, ProcessingMode.OCR_HANDWRITING) return result.text