fix: use batch-then-stream SSE for cell-first OCR
Some checks failed
CI / go-lint (push) Has been skipped
CI / python-lint (push) Has been skipped
CI / nodejs-lint (push) Has been skipped
CI / test-go-school (push) Successful in 26s
CI / test-go-edu-search (push) Successful in 28s
CI / test-python-klausur (push) Failing after 1m49s
CI / test-python-agent-core (push) Successful in 16s
CI / test-nodejs-website (push) Successful in 17s
Some checks failed
CI / go-lint (push) Has been skipped
CI / python-lint (push) Has been skipped
CI / nodejs-lint (push) Has been skipped
CI / test-go-school (push) Successful in 26s
CI / test-go-edu-search (push) Successful in 28s
CI / test-python-klausur (push) Failing after 1m49s
CI / test-python-agent-core (push) Successful in 16s
CI / test-nodejs-website (push) Successful in 17s
The old per-cell streaming timed out because sequential cell OCR was too slow to send the first event before the proxy timeout. Now uses build_cell_grid_v2 (parallel ThreadPoolExecutor) via run_in_executor, then streams all cells at once after the batch completes.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
@@ -1279,8 +1279,11 @@ async def detect_words(
|
||||
row.word_count = len(row.words)
|
||||
|
||||
if stream:
|
||||
# Cell-First OCR v2: use batch-then-stream approach instead of
|
||||
# per-cell streaming. The parallel ThreadPoolExecutor in
|
||||
# build_cell_grid_v2 is much faster than sequential streaming.
|
||||
return StreamingResponse(
|
||||
_word_stream_generator(
|
||||
_word_batch_stream_generator(
|
||||
session_id, cached, col_regions, row_geoms,
|
||||
dewarped_bgr, engine, pronunciation, request,
|
||||
),
|
||||
@@ -1370,6 +1373,124 @@ async def detect_words(
|
||||
}
|
||||
|
||||
|
||||
async def _word_batch_stream_generator(
    session_id: str,
    cached: Dict[str, Any],
    col_regions: List[PageRegion],
    row_geoms: List[RowGeometry],
    dewarped_bgr: np.ndarray,
    engine: str,
    pronunciation: str,
    request: Request,
):
    """SSE generator that runs batch OCR (parallel) then streams results.

    Unlike the old per-cell streaming, this uses build_cell_grid_v2 with
    ThreadPoolExecutor for parallel OCR, then emits all cells as SSE events.
    The 'preparing' event keeps the connection alive during OCR processing.

    Args:
        session_id: OCR pipeline session identifier used for persistence/logging.
        cached: In-memory session cache; ``cached["word_result"]`` is updated.
        col_regions: Detected column regions (filtered by ``type`` below).
        row_geoms: Row geometries; only ``row_type == 'content'`` rows count.
        dewarped_bgr: Dewarped page image (BGR, HxWx3) used for OCR.
        engine: Requested OCR engine name (may be overridden per cell).
        pronunciation: Pronunciation style forwarded to phonetic fixing.
        request: Incoming request; polled for client disconnect after OCR.

    Yields:
        SSE-framed strings (``data: <json>\\n\\n``): a ``meta`` event, a
        ``preparing`` keepalive, optional ``columns``, one ``cell`` event per
        cell with progress, and a final ``complete`` event.
    """
    import asyncio

    t0 = time.time()
    ocr_img = create_ocr_image(dewarped_bgr)
    img_h, img_w = dewarped_bgr.shape[:2]

    # Region types that carry no OCR-able content and are excluded from the grid.
    _skip_types = {'column_ignore', 'header', 'footer', 'margin_top', 'margin_bottom', 'margin_left', 'margin_right'}
    n_content_rows = len([r for r in row_geoms if r.row_type == 'content'])
    n_cols = len([c for c in col_regions if c.type not in _skip_types])
    col_types = {c.type for c in col_regions if c.type not in _skip_types}
    # Vocabulary layout is assumed when an English or German column is present.
    is_vocab = bool(col_types & {'column_en', 'column_de'})
    total_cells = n_content_rows * n_cols

    # 1. Send meta event immediately so the client can render the grid shell.
    meta_event = {
        "type": "meta",
        "grid_shape": {"rows": n_content_rows, "cols": n_cols, "total_cells": total_cells},
        "layout": "vocab" if is_vocab else "generic",
    }
    yield f"data: {json.dumps(meta_event)}\n\n"

    # 2. Send preparing event (keepalive for proxy)
    yield f"data: {json.dumps({'type': 'preparing', 'message': 'Cell-First OCR laeuft parallel...'})}\n\n"

    # 3. Run batch OCR in thread pool (CPU-bound, don't block event loop).
    # get_running_loop() is the correct call inside a coroutine;
    # get_event_loop() is deprecated here since Python 3.10.
    loop = asyncio.get_running_loop()
    cells, columns_meta = await loop.run_in_executor(
        None,
        lambda: build_cell_grid_v2(
            ocr_img, col_regions, row_geoms, img_w, img_h,
            ocr_engine=engine, img_bgr=dewarped_bgr,
        ),
    )

    # Client may have gone away during the (potentially long) OCR phase;
    # skip streaming and persistence in that case.
    if await request.is_disconnected():
        logger.info(f"SSE batch: client disconnected after OCR for {session_id}")
        return

    # 4. Send columns meta
    if columns_meta:
        yield f"data: {json.dumps({'type': 'columns', 'columns_used': columns_meta})}\n\n"

    # 5. Stream all cells (OCR already done, so this loop is fast).
    for idx, cell in enumerate(cells):
        cell_event = {
            "type": "cell",
            "cell": cell,
            "progress": {"current": idx + 1, "total": len(cells)},
        }
        yield f"data: {json.dumps(cell_event)}\n\n"

    # 6. Build final result and persist
    duration = time.time() - t0
    # Engine actually used may differ from the requested one (fallbacks);
    # take it from the first cell when available.
    used_engine = cells[0].get("ocr_engine", "tesseract") if cells else engine

    word_result = {
        "cells": cells,
        "grid_shape": {"rows": n_content_rows, "cols": n_cols, "total_cells": len(cells)},
        "columns_used": columns_meta,
        "layout": "vocab" if is_vocab else "generic",
        "image_width": img_w,
        "image_height": img_h,
        "duration_seconds": round(duration, 2),
        "ocr_engine": used_engine,
        "summary": {
            "total_cells": len(cells),
            "non_empty_cells": sum(1 for c in cells if c.get("text")),
            # Confidence 0 is treated as "unknown", not "low".
            "low_confidence": sum(1 for c in cells if 0 < c.get("confidence", 0) < 50),
        },
    }

    vocab_entries = None
    if is_vocab:
        # Post-process cells into vocabulary entries and clean common OCR noise.
        entries = _cells_to_vocab_entries(cells, columns_meta)
        entries = _fix_character_confusion(entries)
        entries = _fix_phonetic_brackets(entries, pronunciation=pronunciation)
        # Both keys kept for backward compatibility with existing consumers.
        word_result["vocab_entries"] = entries
        word_result["entries"] = entries
        word_result["entry_count"] = len(entries)
        word_result["summary"]["total_entries"] = len(entries)
        word_result["summary"]["with_english"] = sum(1 for e in entries if e.get("english"))
        word_result["summary"]["with_german"] = sum(1 for e in entries if e.get("german"))
        vocab_entries = entries

    # Persist to the session DB and refresh the in-memory cache.
    await update_session_db(session_id, word_result=word_result, current_step=5)
    cached["word_result"] = word_result

    logger.info(f"OCR Pipeline SSE batch: words session {session_id}: "
                f"layout={word_result['layout']}, {len(cells)} cells ({duration:.2f}s)")

    # 7. Send complete event
    complete_event = {
        "type": "complete",
        "summary": word_result["summary"],
        "duration_seconds": round(duration, 2),
        "ocr_engine": used_engine,
    }
    if vocab_entries is not None:
        complete_event["vocab_entries"] = vocab_entries
    yield f"data: {json.dumps(complete_event)}\n\n"
|
||||
|
||||
|
||||
async def _word_stream_generator(
|
||||
session_id: str,
|
||||
cached: Dict[str, Any],
|
||||
@@ -1406,9 +1527,13 @@ async def _word_stream_generator(
|
||||
}
|
||||
yield f"data: {json.dumps(meta_event)}\n\n"
|
||||
|
||||
# Keepalive: send preparing event so proxy doesn't timeout during OCR init
|
||||
yield f"data: {json.dumps({'type': 'preparing', 'message': 'Cell-First OCR wird initialisiert...'})}\n\n"
|
||||
|
||||
# Stream cells one by one
|
||||
all_cells: List[Dict[str, Any]] = []
|
||||
cell_idx = 0
|
||||
last_keepalive = time.time()
|
||||
|
||||
for cell, cols_meta, total in build_cell_grid_v2_streaming(
|
||||
ocr_img, col_regions, row_geoms, img_w, img_h,
|
||||
|
||||
Reference in New Issue
Block a user