From 68d230c297f093728ee6360cf0ddbda2b05724d5 Mon Sep 17 00:00:00 2001
From: Benjamin Admin
Date: Wed, 4 Mar 2026 14:51:55 +0100
Subject: [PATCH] fix: use batch-then-stream SSE for cell-first OCR

The old per-cell streaming timed out because sequential cell OCR was
too slow to send the first event before proxy timeout. Now uses
build_cell_grid_v2 (parallel ThreadPoolExecutor) via run_in_executor,
then streams all cells at once after batch completes.

Co-Authored-By: Claude Opus 4.6
---
 klausur-service/backend/ocr_pipeline_api.py | 127 +++++++++++++++++++-
 1 file changed, 126 insertions(+), 1 deletion(-)

diff --git a/klausur-service/backend/ocr_pipeline_api.py b/klausur-service/backend/ocr_pipeline_api.py
index b1636e5..bd641b6 100644
--- a/klausur-service/backend/ocr_pipeline_api.py
+++ b/klausur-service/backend/ocr_pipeline_api.py
@@ -1279,8 +1279,11 @@ async def detect_words(
         row.word_count = len(row.words)
 
     if stream:
+        # Cell-First OCR v2: use batch-then-stream approach instead of
+        # per-cell streaming. The parallel ThreadPoolExecutor in
+        # build_cell_grid_v2 is much faster than sequential streaming.
         return StreamingResponse(
-            _word_stream_generator(
+            _word_batch_stream_generator(
                 session_id, cached, col_regions, row_geoms,
                 dewarped_bgr, engine, pronunciation, request,
             ),
         ),
@@ -1370,6 +1373,124 @@ async def detect_words(
     }
 
+
+async def _word_batch_stream_generator(
+    session_id: str,
+    cached: Dict[str, Any],
+    col_regions: List[PageRegion],
+    row_geoms: List[RowGeometry],
+    dewarped_bgr: np.ndarray,
+    engine: str,
+    pronunciation: str,
+    request: Request,
+):
+    """SSE generator that runs batch OCR (parallel) then streams results.
+
+    Unlike the old per-cell streaming, this uses build_cell_grid_v2 with
+    ThreadPoolExecutor for parallel OCR, then emits all cells as SSE events.
+    The 'preparing' event keeps the connection alive during OCR processing.
+    """
+    import asyncio
+
+    t0 = time.time()
+    ocr_img = create_ocr_image(dewarped_bgr)
+    img_h, img_w = dewarped_bgr.shape[:2]
+
+    _skip_types = {'column_ignore', 'header', 'footer', 'margin_top', 'margin_bottom', 'margin_left', 'margin_right'}
+    n_content_rows = len([r for r in row_geoms if r.row_type == 'content'])
+    n_cols = len([c for c in col_regions if c.type not in _skip_types])
+    col_types = {c.type for c in col_regions if c.type not in _skip_types}
+    is_vocab = bool(col_types & {'column_en', 'column_de'})
+    total_cells = n_content_rows * n_cols
+
+    # 1. Send meta event immediately
+    meta_event = {
+        "type": "meta",
+        "grid_shape": {"rows": n_content_rows, "cols": n_cols, "total_cells": total_cells},
+        "layout": "vocab" if is_vocab else "generic",
+    }
+    yield f"data: {json.dumps(meta_event)}\n\n"
+
+    # 2. Send preparing event (keepalive for proxy)
+    yield f"data: {json.dumps({'type': 'preparing', 'message': 'Cell-First OCR laeuft parallel...'})}\n\n"
+
+    # 3. Run batch OCR in thread pool (CPU-bound, don't block event loop)
+    loop = asyncio.get_event_loop()
+    cells, columns_meta = await loop.run_in_executor(
+        None,
+        lambda: build_cell_grid_v2(
+            ocr_img, col_regions, row_geoms, img_w, img_h,
+            ocr_engine=engine, img_bgr=dewarped_bgr,
+        ),
+    )
+
+    if await request.is_disconnected():
+        logger.info(f"SSE batch: client disconnected after OCR for {session_id}")
+        return
+
+    # 4. Send columns meta
+    if columns_meta:
+        yield f"data: {json.dumps({'type': 'columns', 'columns_used': columns_meta})}\n\n"
+
+    # 5. Stream all cells
+    for idx, cell in enumerate(cells):
+        cell_event = {
+            "type": "cell",
+            "cell": cell,
+            "progress": {"current": idx + 1, "total": len(cells)},
+        }
+        yield f"data: {json.dumps(cell_event)}\n\n"
+
+    # 6. Build final result and persist
+    duration = time.time() - t0
+    used_engine = cells[0].get("ocr_engine", "tesseract") if cells else engine
+
+    word_result = {
+        "cells": cells,
+        "grid_shape": {"rows": n_content_rows, "cols": n_cols, "total_cells": len(cells)},
+        "columns_used": columns_meta,
+        "layout": "vocab" if is_vocab else "generic",
+        "image_width": img_w,
+        "image_height": img_h,
+        "duration_seconds": round(duration, 2),
+        "ocr_engine": used_engine,
+        "summary": {
+            "total_cells": len(cells),
+            "non_empty_cells": sum(1 for c in cells if c.get("text")),
+            "low_confidence": sum(1 for c in cells if 0 < c.get("confidence", 0) < 50),
+        },
+    }
+
+    vocab_entries = None
+    if is_vocab:
+        entries = _cells_to_vocab_entries(cells, columns_meta)
+        entries = _fix_character_confusion(entries)
+        entries = _fix_phonetic_brackets(entries, pronunciation=pronunciation)
+        word_result["vocab_entries"] = entries
+        word_result["entries"] = entries
+        word_result["entry_count"] = len(entries)
+        word_result["summary"]["total_entries"] = len(entries)
+        word_result["summary"]["with_english"] = sum(1 for e in entries if e.get("english"))
+        word_result["summary"]["with_german"] = sum(1 for e in entries if e.get("german"))
+        vocab_entries = entries
+
+    await update_session_db(session_id, word_result=word_result, current_step=5)
+    cached["word_result"] = word_result
+
+    logger.info(f"OCR Pipeline SSE batch: words session {session_id}: "
+                f"layout={word_result['layout']}, {len(cells)} cells ({duration:.2f}s)")
+
+    # 7. Send complete event
+    complete_event = {
+        "type": "complete",
+        "summary": word_result["summary"],
+        "duration_seconds": round(duration, 2),
+        "ocr_engine": used_engine,
+    }
+    if vocab_entries is not None:
+        complete_event["vocab_entries"] = vocab_entries
+    yield f"data: {json.dumps(complete_event)}\n\n"
+
+
 async def _word_stream_generator(
     session_id: str,
     cached: Dict[str, Any],
     col_regions: List[PageRegion],
     row_geoms: List[RowGeometry],
     dewarped_bgr: np.ndarray,
     engine: str,
     pronunciation: str,
     request: Request,
 ):
@@ -1406,9 +1527,13 @@ async def _word_stream_generator(
     }
     yield f"data: {json.dumps(meta_event)}\n\n"
 
+    # Keepalive: send preparing event so proxy doesn't timeout during OCR init
+    yield f"data: {json.dumps({'type': 'preparing', 'message': 'Cell-First OCR wird initialisiert...'})}\n\n"
+
     # Stream cells one by one
     all_cells: List[Dict[str, Any]] = []
     cell_idx = 0
+    last_keepalive = time.time()
     for cell, cols_meta, total in build_cell_grid_v2_streaming(
         ocr_img, col_regions, row_geoms, img_w, img_h,