feat(ocr-pipeline): add SSE streaming and phonetic filter to LLM review

- Stream LLM review results batch-by-batch (8 entries per batch) via SSE
- Frontend shows live progress bar, batch log, and corrections appearing
- Skip entries with IPA phonetic transcriptions (already dictionary-corrected)
- Refactor llm_review_entries into reusable helpers for both streaming and non-streaming paths

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
Benjamin Admin
2026-03-02 11:46:06 +01:00
parent e171a736e7
commit 2a493890b6
3 changed files with 441 additions and 193 deletions

View File

@@ -4318,25 +4318,30 @@ import re as _re
# Base URL of the Ollama server. OLLAMA_URL takes precedence over
# OLLAMA_BASE_URL; if neither is set, http://host.docker.internal:11434 is used.
_OLLAMA_URL = os.getenv("OLLAMA_URL", os.getenv("OLLAMA_BASE_URL", "http://host.docker.internal:11434"))
# Model used for LLM review when the caller does not pass one explicitly.
OLLAMA_REVIEW_MODEL = os.getenv("OLLAMA_REVIEW_MODEL", "qwen3:30b-a3b")
# Regex: entry contains IPA phonetic brackets like "dance [dɑːns]".
# The match is confined to a single [...] group ([^\]\n] instead of a lazy
# ".*?"), so a plain bracketed note such as "[sb]" followed later by a stray
# IPA-looking character and another "]" is no longer mistaken for phonetics.
_HAS_PHONETIC_RE = _re.compile(r'\[[^\]\n]*[ˈˌːʃʒθðŋɑɒɔəɜɪʊʌæ][^\]\n]*\]')
async def llm_review_entries(
entries: List[Dict],
model: str = None,
) -> Dict:
"""Send vocab entries to a local LLM for OCR error correction."""
model = model or OLLAMA_REVIEW_MODEL
# Build a compact table representation for the prompt
table_lines = []
for e in entries:
table_lines.append({
"row": e.get("row_index", 0),
"en": e.get("english", ""),
"de": e.get("german", ""),
"ex": e.get("example", ""),
})
def _entry_needs_review(entry: Dict) -> bool:
    """Check if an entry should be sent to the LLM for review.

    Skip entries that are empty or contain IPA phonetic transcriptions
    (those were already corrected by the word dictionary lookup).

    Args:
        entry: Vocab entry dict; only the "english" and "german" fields
            are inspected.

    Returns:
        False if both fields are blank or the English field contains an
        IPA bracket group, True otherwise.
    """
    # `or ""` also normalises an explicit None value to an empty string.
    en = entry.get("english", "") or ""
    de = entry.get("german", "") or ""

    # Skip completely empty entries
    if not en.strip() and not de.strip():
        return False

    # Skip entries with phonetic/IPA brackets — these are dictionary-corrected
    if _HAS_PHONETIC_RE.search(en):
        return False

    return True
def _build_llm_prompt(table_lines: List[Dict]) -> str:
"""Build the LLM correction prompt for a batch of entries."""
return f"""Du bist ein Korrekturleser fuer OCR-erkannte Vokabeltabellen (Englisch-Deutsch).
Die Tabelle wurde per OCR aus einem Schulbuch-Scan extrahiert. Korrigiere NUR offensichtliche OCR-Fehler.
Haeufige OCR-Fehler die du korrigieren sollst:
@@ -4359,28 +4364,12 @@ Fuer unveraenderte Eintraege setze "corrected": false.
Eingabe:
{_json.dumps(table_lines, ensure_ascii=False, indent=2)}"""
t0 = time.time()
async with httpx.AsyncClient(timeout=300.0) as client:
resp = await client.post(
f"{_OLLAMA_URL}/api/chat",
json={
"model": model,
"messages": [{"role": "user", "content": prompt}],
"stream": False,
"options": {"temperature": 0.1, "num_predict": 8192},
},
)
resp.raise_for_status()
content = resp.json().get("message", {}).get("content", "")
duration_ms = int((time.time() - t0) * 1000)
# Parse LLM response — extract JSON array
corrected = _parse_llm_json_array(content)
# Build diff: compare original vs corrected
def _diff_batch(originals: List[Dict], corrected: List[Dict]) -> Tuple[List[Dict], List[Dict]]:
"""Compare original entries with LLM-corrected ones, return (changes, corrected_entries)."""
changes = []
entries_corrected = []
for i, orig in enumerate(entries):
entries_out = []
for i, orig in enumerate(originals):
if i < len(corrected):
c = corrected[i]
entry = dict(orig)
@@ -4396,19 +4385,171 @@ Eingabe:
})
entry[field_name] = new_val
entry["llm_corrected"] = True
entries_corrected.append(entry)
entries_out.append(entry)
else:
entries_corrected.append(dict(orig))
entries_out.append(dict(orig))
return changes, entries_out
async def llm_review_entries(
    entries: List[Dict],
    model: str = None,
) -> Dict:
    """Send vocab entries to a local LLM for OCR error correction (single batch).

    Entries rejected by ``_entry_needs_review`` are not sent to the model and
    are returned unchanged. All remaining entries go out in one request.

    Args:
        entries: Vocab entries; "row_index", "english", "german" and
            "example" are read from each one.
        model: Ollama model name; falls back to ``OLLAMA_REVIEW_MODEL``.

    Returns:
        Dict with "entries_original", "entries_corrected" (same length and
        order as ``entries``), "changes", "skipped_count", "model_used" and
        "duration_ms".

    Raises:
        Whatever the HTTP client raises for a failed request, including the
        error from ``raise_for_status()`` on a non-success response.
    """
    model = model or OLLAMA_REVIEW_MODEL

    # Filter: only entries that need review, remembering their original index
    reviewable = [(i, e) for i, e in enumerate(entries) if _entry_needs_review(e)]

    if not reviewable:
        # Nothing to send: report every entry as skipped, without an LLM call.
        return {
            "entries_original": entries,
            "entries_corrected": [dict(e) for e in entries],
            "changes": [],
            "skipped_count": len(entries),
            "model_used": model,
            "duration_ms": 0,
        }

    review_entries = [e for _, e in reviewable]
    # Compact table representation for the prompt
    table_lines = [
        {"row": e.get("row_index", 0), "en": e.get("english", ""), "de": e.get("german", ""), "ex": e.get("example", "")}
        for e in review_entries
    ]

    prompt = _build_llm_prompt(table_lines)

    t0 = time.time()
    async with httpx.AsyncClient(timeout=300.0) as client:
        resp = await client.post(
            f"{_OLLAMA_URL}/api/chat",
            json={
                "model": model,
                "messages": [{"role": "user", "content": prompt}],
                "stream": False,
                "options": {"temperature": 0.1, "num_predict": 8192},
            },
        )
        resp.raise_for_status()
        content = resp.json().get("message", {}).get("content", "")
    duration_ms = int((time.time() - t0) * 1000)

    corrected = _parse_llm_json_array(content)
    changes, corrected_entries = _diff_batch(review_entries, corrected)

    # Merge corrected entries back into the full list at their original index
    all_corrected = [dict(e) for e in entries]
    for (orig_idx, _), fixed in zip(reviewable, corrected_entries):
        all_corrected[orig_idx] = fixed

    return {
        "entries_original": entries,
        "entries_corrected": all_corrected,
        "changes": changes,
        "skipped_count": len(entries) - len(reviewable),
        "model_used": model,
        "duration_ms": duration_ms,
    }
async def llm_review_entries_streaming(
    entries: List[Dict],
    model: str = None,
    batch_size: int = 8,
):
    """Async generator: yield SSE events while reviewing entries in batches.

    Yields plain dicts in this order:
      * one "meta" event with the totals,
      * one "batch" event per LLM request, carrying that batch's changes
        and the running progress,
      * one "complete" event with all changes and the full corrected list.

    Entries rejected by ``_entry_needs_review`` are never sent to the model
    and appear unchanged in the final "entries_corrected".

    Args:
        entries: Vocab entries; "row_index", "english", "german" and
            "example" are read from each one.
        model: Ollama model name; falls back to ``OLLAMA_REVIEW_MODEL``.
        batch_size: Number of reviewable entries per LLM request (>= 1).

    Raises:
        ValueError: If ``batch_size`` is smaller than 1. Raised before any
            event is produced, so a bad value can neither fail midway through
            the stream nor produce a "complete" event with nothing reviewed.
    """
    if batch_size < 1:
        raise ValueError(f"batch_size must be >= 1, got {batch_size}")

    model = model or OLLAMA_REVIEW_MODEL

    # Separate reviewable from skipped entries, keeping the original index
    reviewable = [(i, e) for i, e in enumerate(entries) if _entry_needs_review(e)]
    total_to_review = len(reviewable)
    skipped_count = len(entries) - total_to_review

    # meta event
    yield {
        "type": "meta",
        "total_entries": len(entries),
        "to_review": total_to_review,
        "skipped": skipped_count,
        "model": model,
        "batch_size": batch_size,
    }

    all_changes = []
    all_corrected = [dict(e) for e in entries]
    total_duration_ms = 0
    reviewed_count = 0

    # Process in batches
    for batch_start in range(0, total_to_review, batch_size):
        batch_items = reviewable[batch_start:batch_start + batch_size]
        batch_entries = [e for _, e in batch_items]

        table_lines = [
            {"row": e.get("row_index", 0), "en": e.get("english", ""), "de": e.get("german", ""), "ex": e.get("example", "")}
            for e in batch_entries
        ]
        prompt = _build_llm_prompt(table_lines)

        t0 = time.time()
        # The client is opened and closed per batch so that no connection is
        # held open while the generator is suspended at a yield.
        async with httpx.AsyncClient(timeout=300.0) as client:
            resp = await client.post(
                f"{_OLLAMA_URL}/api/chat",
                json={
                    "model": model,
                    "messages": [{"role": "user", "content": prompt}],
                    "stream": False,
                    "options": {"temperature": 0.1, "num_predict": 4096},
                },
            )
            resp.raise_for_status()
            content = resp.json().get("message", {}).get("content", "")
        batch_ms = int((time.time() - t0) * 1000)
        total_duration_ms += batch_ms

        corrected = _parse_llm_json_array(content)
        batch_changes, batch_corrected = _diff_batch(batch_entries, corrected)

        # Merge back into the full list at the original positions
        for (orig_idx, _), fixed in zip(batch_items, batch_corrected):
            all_corrected[orig_idx] = fixed

        all_changes.extend(batch_changes)
        reviewed_count += len(batch_items)

        # Yield batch result
        yield {
            "type": "batch",
            "batch_index": batch_start // batch_size,
            "entries_reviewed": [e.get("row_index", 0) for e in batch_entries],
            "changes": batch_changes,
            "duration_ms": batch_ms,
            "progress": {"current": reviewed_count, "total": total_to_review},
        }

    # Complete event
    yield {
        "type": "complete",
        "changes": all_changes,
        "model_used": model,
        "duration_ms": total_duration_ms,
        "total_entries": len(entries),
        "reviewed": total_to_review,
        "skipped": skipped_count,
        "corrections_found": len(all_changes),
        "entries_corrected": all_corrected,
    }
def _parse_llm_json_array(text: str) -> List[Dict]:
"""Extract JSON array from LLM response (may contain markdown fences)."""
# Strip markdown code fences

View File

@@ -51,6 +51,7 @@ from cv_vocab_pipeline import (
dewarp_image,
dewarp_image_manual,
llm_review_entries,
llm_review_entries_streaming,
render_image_high_res,
render_pdf_high_res,
)
@@ -1395,8 +1396,12 @@ async def get_word_ground_truth(session_id: str):
@router.post("/sessions/{session_id}/llm-review")
async def run_llm_review(session_id: str, request: Request):
"""Run LLM-based correction on vocab entries from Step 5."""
async def run_llm_review(session_id: str, request: Request, stream: bool = False):
"""Run LLM-based correction on vocab entries from Step 5.
Query params:
stream: false (default) for JSON response, true for SSE streaming
"""
session = await get_session_db(session_id)
if not session:
raise HTTPException(status_code=404, detail=f"Session {session_id} not found")
@@ -1417,6 +1422,14 @@ async def run_llm_review(session_id: str, request: Request):
pass
model = body.get("model") or OLLAMA_REVIEW_MODEL
if stream:
return StreamingResponse(
_llm_review_stream_generator(session_id, entries, word_result, model, request),
media_type="text/event-stream",
headers={"Cache-Control": "no-cache", "Connection": "keep-alive", "X-Accel-Buffering": "no"},
)
# Non-streaming path
try:
result = await llm_review_entries(entries, model=model)
except Exception as e:
@@ -1449,6 +1462,44 @@ async def run_llm_review(session_id: str, request: Request):
}
async def _llm_review_stream_generator(
    session_id: str,
    entries: List[Dict],
    word_result: Dict,
    model: str,
    request: Request,
):
    """SSE generator that yields batch-by-batch LLM review progress.

    Each event from ``llm_review_entries_streaming`` is serialised as one
    ``data: <json>`` SSE frame. Streaming stops silently once the client has
    disconnected. On the "complete" event the review result is stored under
    ``word_result["llm_review"]`` (``word_result`` is mutated in place),
    written to the session DB with ``current_step=6`` and mirrored into
    ``_cache`` when the session is cached.

    Any exception is logged and reported to the client as a final
    ``{"type": "error"}`` frame instead of propagating.
    """
    try:
        async for event in llm_review_entries_streaming(entries, model=model):
            if await request.is_disconnected():
                logger.info(f"SSE: client disconnected during LLM review for {session_id}")
                return

            if event.get("type") == "complete":
                # Persist BEFORE announcing completion: a client may close the
                # stream or fire a follow-up request as soon as it sees
                # "complete", and code placed after the yield is not
                # guaranteed to run in that case.
                word_result["llm_review"] = {
                    "changes": event["changes"],
                    "model_used": event["model_used"],
                    "duration_ms": event["duration_ms"],
                    "entries_corrected": event["entries_corrected"],
                }
                await update_session_db(session_id, word_result=word_result, current_step=6)
                if session_id in _cache:
                    _cache[session_id]["word_result"] = word_result

                logger.info(f"LLM review SSE session {session_id}: {event['corrections_found']} changes, "
                            f"{event['duration_ms']}ms, skipped={event['skipped']}, model={event['model_used']}")

            yield f"data: {json.dumps(event, ensure_ascii=False)}\n\n"

    except Exception as e:
        # logger.exception appends the active traceback to the log record.
        logger.exception(f"LLM review SSE failed for {session_id}: {type(e).__name__}: {e}")
        error_event = {"type": "error", "detail": f"{type(e).__name__}: {e}"}
        yield f"data: {json.dumps(error_event)}\n\n"
@router.post("/sessions/{session_id}/llm-review/apply")
async def apply_llm_corrections(session_id: str, request: Request):
"""Apply selected LLM corrections to vocab entries."""