Files
Benjamin Boenisch 5a31f52310 Initial commit: breakpilot-lehrer - Lehrer KI Platform
Services: Admin-Lehrer, Backend-Lehrer, Studio v2, Website,
Klausur-Service, School-Service, Voice-Service, Geo-Service,
BreakPilot Drive, Agent-Core

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-11 23:47:26 +01:00

564 lines
17 KiB
Python

"""
File Processor Service - Dokumentenverarbeitung für BreakPilot.
Shared Service für:
- OCR (Optical Character Recognition) für Handschrift und gedruckten Text
- PDF-Parsing und Textextraktion
- Bildverarbeitung und -optimierung
- DOCX/DOC Textextraktion
Verwendet:
- PaddleOCR für deutsche Handschrift
- PyMuPDF für PDF-Verarbeitung
- python-docx für DOCX-Dateien
- OpenCV für Bildvorverarbeitung
"""
import logging
import os
import io
import base64
from pathlib import Path
from typing import Optional, List, Dict, Any, Tuple, Union
from dataclasses import dataclass
from enum import Enum
import cv2
import numpy as np
from PIL import Image
logger = logging.getLogger(__name__)
class FileType(str, Enum):
"""Unterstützte Dateitypen."""
PDF = "pdf"
IMAGE = "image"
DOCX = "docx"
DOC = "doc"
TXT = "txt"
UNKNOWN = "unknown"
class ProcessingMode(str, Enum):
"""Verarbeitungsmodi."""
OCR_HANDWRITING = "ocr_handwriting" # Handschrifterkennung
OCR_PRINTED = "ocr_printed" # Gedruckter Text
TEXT_EXTRACT = "text_extract" # Textextraktion (PDF/DOCX)
MIXED = "mixed" # Kombiniert OCR + Textextraktion
@dataclass
class ProcessedRegion:
"""Ein erkannter Textbereich."""
text: str
confidence: float
bbox: Tuple[int, int, int, int] # x1, y1, x2, y2
page: int = 1
@dataclass
class ProcessingResult:
"""Ergebnis der Dokumentenverarbeitung."""
text: str
confidence: float
regions: List[ProcessedRegion]
page_count: int
file_type: FileType
processing_mode: ProcessingMode
metadata: Dict[str, Any]
class FileProcessor:
"""
Zentrale Dokumentenverarbeitung für BreakPilot.
Unterstützt:
- Handschrifterkennung (OCR) für Klausuren
- Textextraktion aus PDFs
- DOCX/DOC Verarbeitung
- Bildvorverarbeitung für bessere OCR-Ergebnisse
"""
def __init__(self, ocr_lang: str = "de", use_gpu: bool = False):
"""
Initialisiert den File Processor.
Args:
ocr_lang: Sprache für OCR (default: "de" für Deutsch)
use_gpu: GPU für OCR nutzen (beschleunigt Verarbeitung)
"""
self.ocr_lang = ocr_lang
self.use_gpu = use_gpu
self._ocr_engine = None
logger.info(f"FileProcessor initialized (lang={ocr_lang}, gpu={use_gpu})")
@property
def ocr_engine(self):
"""Lazy-Loading des OCR-Engines."""
if self._ocr_engine is None:
self._ocr_engine = self._init_ocr_engine()
return self._ocr_engine
def _init_ocr_engine(self):
"""Initialisiert PaddleOCR oder Fallback."""
try:
from paddleocr import PaddleOCR
return PaddleOCR(
use_angle_cls=True,
lang='german', # Deutsch
use_gpu=self.use_gpu,
show_log=False
)
except ImportError:
logger.warning("PaddleOCR nicht installiert - verwende Fallback")
return None
def detect_file_type(self, file_path: str = None, file_bytes: bytes = None) -> FileType:
"""
Erkennt den Dateityp.
Args:
file_path: Pfad zur Datei
file_bytes: Dateiinhalt als Bytes
Returns:
FileType enum
"""
if file_path:
ext = Path(file_path).suffix.lower()
if ext == ".pdf":
return FileType.PDF
elif ext in [".jpg", ".jpeg", ".png", ".bmp", ".tiff", ".gif"]:
return FileType.IMAGE
elif ext == ".docx":
return FileType.DOCX
elif ext == ".doc":
return FileType.DOC
elif ext == ".txt":
return FileType.TXT
if file_bytes:
# Magic number detection
if file_bytes[:4] == b'%PDF':
return FileType.PDF
elif file_bytes[:8] == b'\x89PNG\r\n\x1a\n':
return FileType.IMAGE
elif file_bytes[:2] in [b'\xff\xd8', b'BM']: # JPEG, BMP
return FileType.IMAGE
elif file_bytes[:4] == b'PK\x03\x04': # ZIP (DOCX)
return FileType.DOCX
return FileType.UNKNOWN
def process(
self,
file_path: str = None,
file_bytes: bytes = None,
mode: ProcessingMode = ProcessingMode.MIXED
) -> ProcessingResult:
"""
Verarbeitet ein Dokument.
Args:
file_path: Pfad zur Datei
file_bytes: Dateiinhalt als Bytes
mode: Verarbeitungsmodus
Returns:
ProcessingResult mit extrahiertem Text und Metadaten
"""
if not file_path and not file_bytes:
raise ValueError("Entweder file_path oder file_bytes muss angegeben werden")
file_type = self.detect_file_type(file_path, file_bytes)
logger.info(f"Processing file of type: {file_type}")
if file_type == FileType.PDF:
return self._process_pdf(file_path, file_bytes, mode)
elif file_type == FileType.IMAGE:
return self._process_image(file_path, file_bytes, mode)
elif file_type == FileType.DOCX:
return self._process_docx(file_path, file_bytes)
elif file_type == FileType.TXT:
return self._process_txt(file_path, file_bytes)
else:
raise ValueError(f"Nicht unterstützter Dateityp: {file_type}")
def _process_pdf(
self,
file_path: str = None,
file_bytes: bytes = None,
mode: ProcessingMode = ProcessingMode.MIXED
) -> ProcessingResult:
"""Verarbeitet PDF-Dateien."""
try:
import fitz # PyMuPDF
except ImportError:
logger.warning("PyMuPDF nicht installiert - versuche Fallback")
# Fallback: PDF als Bild behandeln
return self._process_image(file_path, file_bytes, mode)
if file_bytes:
doc = fitz.open(stream=file_bytes, filetype="pdf")
else:
doc = fitz.open(file_path)
all_text = []
all_regions = []
total_confidence = 0.0
region_count = 0
for page_num, page in enumerate(doc, start=1):
# Erst versuchen Text direkt zu extrahieren
page_text = page.get_text()
if page_text.strip() and mode != ProcessingMode.OCR_HANDWRITING:
# PDF enthält Text (nicht nur Bilder)
all_text.append(page_text)
all_regions.append(ProcessedRegion(
text=page_text,
confidence=1.0,
bbox=(0, 0, int(page.rect.width), int(page.rect.height)),
page=page_num
))
total_confidence += 1.0
region_count += 1
else:
# Seite als Bild rendern und OCR anwenden
pix = page.get_pixmap(matrix=fitz.Matrix(2, 2)) # 2x Auflösung
img_bytes = pix.tobytes("png")
img = Image.open(io.BytesIO(img_bytes))
ocr_result = self._ocr_image(img)
all_text.append(ocr_result["text"])
for region in ocr_result["regions"]:
region.page = page_num
all_regions.append(region)
total_confidence += region.confidence
region_count += 1
doc.close()
avg_confidence = total_confidence / region_count if region_count > 0 else 0.0
return ProcessingResult(
text="\n\n".join(all_text),
confidence=avg_confidence,
regions=all_regions,
page_count=len(doc) if hasattr(doc, '__len__') else 1,
file_type=FileType.PDF,
processing_mode=mode,
metadata={"source": file_path or "bytes"}
)
def _process_image(
self,
file_path: str = None,
file_bytes: bytes = None,
mode: ProcessingMode = ProcessingMode.MIXED
) -> ProcessingResult:
"""Verarbeitet Bilddateien."""
if file_bytes:
img = Image.open(io.BytesIO(file_bytes))
else:
img = Image.open(file_path)
# Bildvorverarbeitung
processed_img = self._preprocess_image(img)
# OCR
ocr_result = self._ocr_image(processed_img)
return ProcessingResult(
text=ocr_result["text"],
confidence=ocr_result["confidence"],
regions=ocr_result["regions"],
page_count=1,
file_type=FileType.IMAGE,
processing_mode=mode,
metadata={
"source": file_path or "bytes",
"image_size": img.size
}
)
def _process_docx(
self,
file_path: str = None,
file_bytes: bytes = None
) -> ProcessingResult:
"""Verarbeitet DOCX-Dateien."""
try:
from docx import Document
except ImportError:
raise ImportError("python-docx ist nicht installiert")
if file_bytes:
doc = Document(io.BytesIO(file_bytes))
else:
doc = Document(file_path)
paragraphs = []
for para in doc.paragraphs:
if para.text.strip():
paragraphs.append(para.text)
# Auch Tabellen extrahieren
for table in doc.tables:
for row in table.rows:
row_text = " | ".join(cell.text for cell in row.cells)
if row_text.strip():
paragraphs.append(row_text)
text = "\n\n".join(paragraphs)
return ProcessingResult(
text=text,
confidence=1.0, # Direkte Textextraktion
regions=[ProcessedRegion(
text=text,
confidence=1.0,
bbox=(0, 0, 0, 0),
page=1
)],
page_count=1,
file_type=FileType.DOCX,
processing_mode=ProcessingMode.TEXT_EXTRACT,
metadata={"source": file_path or "bytes"}
)
def _process_txt(
self,
file_path: str = None,
file_bytes: bytes = None
) -> ProcessingResult:
"""Verarbeitet Textdateien."""
if file_bytes:
text = file_bytes.decode('utf-8', errors='ignore')
else:
with open(file_path, 'r', encoding='utf-8', errors='ignore') as f:
text = f.read()
return ProcessingResult(
text=text,
confidence=1.0,
regions=[ProcessedRegion(
text=text,
confidence=1.0,
bbox=(0, 0, 0, 0),
page=1
)],
page_count=1,
file_type=FileType.TXT,
processing_mode=ProcessingMode.TEXT_EXTRACT,
metadata={"source": file_path or "bytes"}
)
def _preprocess_image(self, img: Image.Image) -> Image.Image:
"""
Vorverarbeitung des Bildes für bessere OCR-Ergebnisse.
- Konvertierung zu Graustufen
- Kontrastverstärkung
- Rauschunterdrückung
- Binarisierung
"""
# PIL zu OpenCV
cv_img = cv2.cvtColor(np.array(img), cv2.COLOR_RGB2BGR)
# Zu Graustufen konvertieren
gray = cv2.cvtColor(cv_img, cv2.COLOR_BGR2GRAY)
# Rauschunterdrückung
denoised = cv2.fastNlMeansDenoising(gray, None, 10, 7, 21)
# Kontrastverstärkung (CLAHE)
clahe = cv2.createCLAHE(clipLimit=2.0, tileGridSize=(8, 8))
enhanced = clahe.apply(denoised)
# Adaptive Binarisierung
binary = cv2.adaptiveThreshold(
enhanced,
255,
cv2.ADAPTIVE_THRESH_GAUSSIAN_C,
cv2.THRESH_BINARY,
11,
2
)
# Zurück zu PIL
return Image.fromarray(binary)
def _ocr_image(self, img: Image.Image) -> Dict[str, Any]:
"""
Führt OCR auf einem Bild aus.
Returns:
Dict mit text, confidence und regions
"""
if self.ocr_engine is None:
# Fallback wenn kein OCR-Engine verfügbar
return {
"text": "[OCR nicht verfügbar - bitte PaddleOCR installieren]",
"confidence": 0.0,
"regions": []
}
# PIL zu numpy array
img_array = np.array(img)
# Wenn Graustufen, zu RGB konvertieren (PaddleOCR erwartet RGB)
if len(img_array.shape) == 2:
img_array = cv2.cvtColor(img_array, cv2.COLOR_GRAY2RGB)
# OCR ausführen
result = self.ocr_engine.ocr(img_array, cls=True)
if not result or not result[0]:
return {"text": "", "confidence": 0.0, "regions": []}
all_text = []
all_regions = []
total_confidence = 0.0
for line in result[0]:
bbox_points = line[0] # [[x1,y1], [x2,y2], [x3,y3], [x4,y4]]
text, confidence = line[1]
# Bounding Box zu x1, y1, x2, y2 konvertieren
x_coords = [p[0] for p in bbox_points]
y_coords = [p[1] for p in bbox_points]
bbox = (
int(min(x_coords)),
int(min(y_coords)),
int(max(x_coords)),
int(max(y_coords))
)
all_text.append(text)
all_regions.append(ProcessedRegion(
text=text,
confidence=confidence,
bbox=bbox
))
total_confidence += confidence
avg_confidence = total_confidence / len(all_regions) if all_regions else 0.0
return {
"text": "\n".join(all_text),
"confidence": avg_confidence,
"regions": all_regions
}
def extract_handwriting_regions(
self,
img: Image.Image,
min_area: int = 500
) -> List[Dict[str, Any]]:
"""
Erkennt und extrahiert handschriftliche Bereiche aus einem Bild.
Nützlich für Klausuren mit gedruckten Fragen und handschriftlichen Antworten.
Args:
img: Eingabebild
min_area: Minimale Fläche für erkannte Regionen
Returns:
Liste von Regionen mit Koordinaten und erkanntem Text
"""
# Bildvorverarbeitung
cv_img = cv2.cvtColor(np.array(img), cv2.COLOR_RGB2BGR)
gray = cv2.cvtColor(cv_img, cv2.COLOR_BGR2GRAY)
# Kanten erkennen
edges = cv2.Canny(gray, 50, 150)
# Morphologische Operationen zum Verbinden
kernel = cv2.getStructuringElement(cv2.MORPH_RECT, (15, 5))
dilated = cv2.dilate(edges, kernel, iterations=2)
# Konturen finden
contours, _ = cv2.findContours(dilated, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
regions = []
for contour in contours:
area = cv2.contourArea(contour)
if area < min_area:
continue
x, y, w, h = cv2.boundingRect(contour)
# Region ausschneiden
region_img = img.crop((x, y, x + w, y + h))
# OCR auf Region anwenden
ocr_result = self._ocr_image(region_img)
regions.append({
"bbox": (x, y, x + w, y + h),
"area": area,
"text": ocr_result["text"],
"confidence": ocr_result["confidence"]
})
# Nach Y-Position sortieren (oben nach unten)
regions.sort(key=lambda r: r["bbox"][1])
return regions
# Singleton-Instanz
_file_processor: Optional[FileProcessor] = None
def get_file_processor() -> FileProcessor:
"""Gibt Singleton-Instanz des File Processors zurück."""
global _file_processor
if _file_processor is None:
_file_processor = FileProcessor()
return _file_processor
# Convenience functions
def process_file(
file_path: str = None,
file_bytes: bytes = None,
mode: ProcessingMode = ProcessingMode.MIXED
) -> ProcessingResult:
"""
Convenience function zum Verarbeiten einer Datei.
Args:
file_path: Pfad zur Datei
file_bytes: Dateiinhalt als Bytes
mode: Verarbeitungsmodus
Returns:
ProcessingResult
"""
processor = get_file_processor()
return processor.process(file_path, file_bytes, mode)
def extract_text_from_pdf(file_path: str = None, file_bytes: bytes = None) -> str:
"""Extrahiert Text aus einer PDF-Datei."""
result = process_file(file_path, file_bytes, ProcessingMode.TEXT_EXTRACT)
return result.text
def ocr_image(file_path: str = None, file_bytes: bytes = None) -> str:
"""Führt OCR auf einem Bild aus."""
result = process_file(file_path, file_bytes, ProcessingMode.OCR_PRINTED)
return result.text
def ocr_handwriting(file_path: str = None, file_bytes: bytes = None) -> str:
"""Führt Handschrift-OCR auf einem Bild aus."""
result = process_file(file_path, file_bytes, ProcessingMode.OCR_HANDWRITING)
return result.text