From 2a493890b63ce21f49f6907fb8afc13297c60d9b Mon Sep 17 00:00:00 2001 From: Benjamin Admin Date: Mon, 2 Mar 2026 11:46:06 +0100 Subject: [PATCH] feat(ocr-pipeline): add SSE streaming and phonetic filter to LLM review - Stream LLM review results batch-by-batch (8 entries per batch) via SSE - Frontend shows live progress bar, batch log, and corrections appearing - Skip entries with IPA phonetic transcriptions (already dictionary-corrected) - Refactor llm_review_entries into reusable helpers for both streaming and non-streaming paths Co-Authored-By: Claude Opus 4.6 --- .../components/ocr-pipeline/StepLlmReview.tsx | 360 ++++++++++-------- klausur-service/backend/cv_vocab_pipeline.py | 219 +++++++++-- klausur-service/backend/ocr_pipeline_api.py | 55 ++- 3 files changed, 441 insertions(+), 193 deletions(-) diff --git a/admin-lehrer/components/ocr-pipeline/StepLlmReview.tsx b/admin-lehrer/components/ocr-pipeline/StepLlmReview.tsx index 81dfac4..ad07c40 100644 --- a/admin-lehrer/components/ocr-pipeline/StepLlmReview.tsx +++ b/admin-lehrer/components/ocr-pipeline/StepLlmReview.tsx @@ -1,6 +1,6 @@ 'use client' -import { useCallback, useState } from 'react' +import { useCallback, useRef, useState } from 'react' const KLAUSUR_API = '/klausur-api' @@ -11,19 +11,23 @@ interface LlmChange { new: string } -interface LlmReviewResult { - changes: LlmChange[] - model_used: string - duration_ms: number - total_entries: number - corrections_found: number -} - interface StepLlmReviewProps { sessionId: string | null onNext: () => void } +interface ReviewMeta { + total_entries: number + to_review: number + skipped: number + model: string +} + +interface StreamProgress { + current: number + total: number +} + const FIELD_LABELS: Record = { english: 'EN', german: 'DE', @@ -32,34 +36,96 @@ const FIELD_LABELS: Record = { export function StepLlmReview({ sessionId, onNext }: StepLlmReviewProps) { const [status, setStatus] = useState<'idle' | 'running' | 'done' | 'error' | 'applied'>('idle') - const [result, setResult] = useState(null) - const [error, setError] = useState('') + const [meta, setMeta] = useState(null) + const [changes, setChanges] = useState([]) + const [progress, setProgress] = useState(null) + const [batchLog, setBatchLog] = useState([]) + const [totalDuration, setTotalDuration] = useState(0) + const [error, setError] = useState('') const [accepted, setAccepted] = useState>(new Set()) const [applying, setApplying] = useState(false) + const tableEndRef = useRef(null) const runReview = useCallback(async () => { if (!sessionId) return setStatus('running') setError('') - setResult(null) + setChanges([]) + setBatchLog([]) + setProgress(null) + setMeta(null) + setTotalDuration(0) try { - const res = await fetch(`${KLAUSUR_API}/api/v1/ocr-pipeline/sessions/${sessionId}/llm-review`, { - method: 'POST', - headers: { 'Content-Type': 'application/json' }, - body: JSON.stringify({}), - }) + const res = await fetch( + `${KLAUSUR_API}/api/v1/ocr-pipeline/sessions/${sessionId}/llm-review?stream=true`, + { method: 'POST', headers: { 'Content-Type': 'application/json' }, body: JSON.stringify({}) }, + ) if (!res.ok) { const data = await res.json().catch(() => ({})) throw new Error(data.detail || `HTTP ${res.status}`) } - const data: LlmReviewResult = await res.json() - setResult(data) - // Accept all changes by default - setAccepted(new Set(data.changes.map((_, i) => i))) - setStatus('done') + const reader = res.body!.getReader() + const decoder = new TextDecoder() + let buffer = '' + let allChanges: LlmChange[] = [] + 
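+      // The endpoint emits Server-Sent Events: each frame is one "data: <json>"
+      // line terminated by a blank line, so buffer the chunks and split on "\n\n".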
+      let completed = false
+
+      while (true) {
+        const { done, value } = await reader.read()
+        if (done) break
+        buffer += decoder.decode(value, { stream: true })
+
+        while (buffer.includes('\n\n')) {
+          const idx = buffer.indexOf('\n\n')
+          const chunk = buffer.slice(0, idx).trim()
+          buffer = buffer.slice(idx + 2)
+
+          if (!chunk.startsWith('data: ')) continue
+          const dataStr = chunk.slice(6)
+
+          let event: any
+          try { event = JSON.parse(dataStr) } catch { continue }
+
+          if (event.type === 'meta') {
+            setMeta({
+              total_entries: event.total_entries,
+              to_review: event.to_review,
+              skipped: event.skipped,
+              model: event.model,
+            })
+            setBatchLog([`${event.total_entries} Eintraege, ${event.skipped} uebersprungen (Lautschrift), ${event.to_review} zu pruefen`])
+          }
+
+          if (event.type === 'batch') {
+            const batchChanges: LlmChange[] = event.changes || []
+            allChanges = [...allChanges, ...batchChanges]
+            setChanges(allChanges)
+            setProgress(event.progress)
+            const rows = (event.entries_reviewed || []).map((r: number) => `R${r}`).join(', ')
+            setBatchLog(prev => [...prev,
+              `Batch ${event.batch_index + 1}: ${rows} — ${batchChanges.length} Korrektur${batchChanges.length !== 1 ? 'en' : ''} (${event.duration_ms}ms)`
+            ])
+            setTimeout(() => tableEndRef.current?.scrollIntoView({ behavior: 'smooth', block: 'nearest' }), 16)
+          }
+
+          if (event.type === 'complete') {
+            completed = true
+            setTotalDuration(event.duration_ms)
+            setAccepted(new Set(allChanges.map((_, i) => i)))
+            setStatus('done')
+          }
+
+          if (event.type === 'error') {
+            throw new Error(event.detail || 'Unbekannter Fehler')
+          }
+        }
+      }
+
+      // The stream can end without a complete event (e.g. nothing to review).
+      // Reading `status` here would be stale inside this callback, so track
+      // completion in a local flag instead.
+      if (!completed) {
+        setStatus('done')
+      }
     } catch (e: unknown) {
       const msg = e instanceof Error ? e.message : String(e)
       setError(msg)
@@ -68,7 +134,7 @@ export function StepLlmReview({ sessionId, onNext }: StepLlmReviewProps) {
   }, [sessionId])
 
   const toggleChange = (index: number) => {
-    setAccepted((prev) => {
+    setAccepted(prev => {
       const next = new Set(prev)
       if (next.has(index)) next.delete(index)
       else next.add(index)
@@ -77,48 +143,39 @@ export function StepLlmReview({ sessionId, onNext }: StepLlmReviewProps) {
   }
 
   const toggleAll = () => {
-    if (!result) return
-    if (accepted.size === result.changes.length) {
+    if (accepted.size === changes.length) {
       setAccepted(new Set())
     } else {
-      setAccepted(new Set(result.changes.map((_, i) => i)))
+      setAccepted(new Set(changes.map((_, i) => i)))
     }
   }
 
   const applyChanges = useCallback(async () => {
-    if (!sessionId || !result) return
+    if (!sessionId) return
     setApplying(true)
-
     try {
       const res = await fetch(`${KLAUSUR_API}/api/v1/ocr-pipeline/sessions/${sessionId}/llm-review/apply`, {
        method: 'POST',
        headers: { 'Content-Type': 'application/json' },
        body: JSON.stringify({ accepted_indices: Array.from(accepted) }),
      })
-
       if (!res.ok) {
         const data = await res.json().catch(() => ({}))
         throw new Error(data.detail || `HTTP ${res.status}`)
       }
-
       setStatus('applied')
     } catch (e: unknown) {
-      const msg = e instanceof Error ? e.message : String(e)
-      setError(msg)
+      setError(e instanceof Error ? e.message : String(e))
     } finally {
       setApplying(false)
     }
-  }, [sessionId, result, accepted])
+  }, [sessionId, accepted])
 
   if (!sessionId) {
-    return (
-
-      Bitte zuerst eine Session auswaehlen.
-    )
+    return <div>Bitte zuerst eine Session auswaehlen.</div>
} - // --- Idle state --- + // --- Idle --- if (status === 'idle') { return (
@@ -127,59 +184,104 @@ export function StepLlmReview({ sessionId, onNext }: StepLlmReviewProps) { Schritt 6: LLM-Korrektur

-          Ein lokales Sprachmodell prueft die OCR-Ergebnisse auf typische Erkennungsfehler
-          (z.B. "8en" statt "Ben") und schlaegt Korrekturen vor.
+          Ein lokales Sprachmodell prueft die OCR-Ergebnisse auf typische Erkennungsfehler.
+          Eintraege mit Lautschrift werden automatisch uebersprungen.

Modell: qwen3:30b-a3b via Ollama (lokal)

-
) } - // --- Running state --- + // --- Running (with live progress) --- if (status === 'running') { + const pct = progress ? Math.round((progress.current / progress.total) * 100) : 0 + return ( -
-
-

- Korrektur laeuft... -

-

- qwen3:30b-a3b prueft die Vokabeleintraege -

+
+      <div>
+        <div>
+          LLM-Korrektur laeuft...
+        </div>
+        {meta && (
+          <span>{meta.model}</span>
+        )}
+      </div>
+
+      {/* Progress bar */}
+      {progress && (
+        <div>
+          <div>
+            <span>{progress.current} / {progress.total} Eintraege geprueft</span>
+            <span>{pct}%</span>
+          </div>
+          <div>
+            <div style={{ width: `${pct}%` }} />
+          </div>
+        </div>
+      )}
+
+      {/* Live batch log */}
+      <div>
+        {batchLog.map((line, i) => (
+          <div key={i}>{line}</div>
+        ))}
+      </div>
+
+      {/* Live changes appearing */}
+      {changes.length > 0 && (
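+        /* Rows accumulate batch-by-batch; tableEndRef keeps the newest row scrolled into view */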
+        <div>
+          <table>
+            <thead>
+              <tr>
+                <th>Zeile</th>
+                <th>Feld</th>
+                <th>Vorher</th>
+                <th>Nachher</th>
+              </tr>
+            </thead>
+            <tbody>
+              {changes.map((change, idx) => (
+                <tr key={idx}>
+                  <td>R{change.row_index}</td>
+                  <td>{FIELD_LABELS[change.field] || change.field}</td>
+                  <td>{change.old}</td>
+                  <td>{change.new}</td>
+                </tr>
+              ))}
+            </tbody>
+          </table>
+          <div ref={tableEndRef} />
+        </div>
+      )}
) } - // --- Error state --- + // --- Error --- if (status === 'error') { return (
⚠️
-

- Fehler bei LLM-Korrektur -

-

- {error} -

+

Fehler bei LLM-Korrektur

+

{error}

- -
@@ -187,48 +289,37 @@ export function StepLlmReview({ sessionId, onNext }: StepLlmReviewProps) { ) } - // --- Applied state --- + // --- Applied --- if (status === 'applied') { return (
-

- Korrekturen uebernommen -

+

Korrekturen uebernommen

- {accepted.size} von {result?.changes.length ?? 0} Korrekturen wurden angewendet. + {accepted.size} von {changes.length} Korrekturen wurden angewendet.

-
) } - // --- Done state: show diff table --- - const changes = result?.changes ?? [] - + // --- Done: diff table with checkboxes --- if (changes.length === 0) { return (
👍
-

- Keine Korrekturen noetig -

-

- Das LLM hat keine OCR-Fehler gefunden. -

-

- {result?.total_entries} Eintraege geprueft in {result?.duration_ms}ms - ({result?.model_used}) -

-
@@ -240,22 +331,17 @@ export function StepLlmReview({ sessionId, onNext }: StepLlmReviewProps) { {/* Header */}
-

- LLM-Korrekturvorschlaege -

+

LLM-Korrekturvorschlaege

              {changes.length} Korrektur{changes.length !== 1 ? 'en' : ''} gefunden
-              · {result?.duration_ms}ms · {result?.model_used}
+              {meta && <> · {meta.skipped} uebersprungen (Lautschrift)</>}
+              {' '}· {totalDuration}ms · {meta?.model}

-
- -
+
{/* Diff table */} @@ -264,12 +350,8 @@ export function StepLlmReview({ sessionId, onNext }: StepLlmReviewProps) { - + Zeile Feld @@ -279,40 +361,21 @@ export function StepLlmReview({ sessionId, onNext }: StepLlmReviewProps) { {changes.map((change, idx) => ( - + - toggleChange(idx)} - className="rounded border-gray-300 dark:border-gray-600" - /> - - - R{change.row_index} + toggleChange(idx)} + className="rounded border-gray-300 dark:border-gray-600" /> + R{change.row_index} {FIELD_LABELS[change.field] || change.field} - - - {change.old} - - - - - {change.new} - - + {change.old} + {change.new} ))} @@ -321,21 +384,14 @@ export function StepLlmReview({ sessionId, onNext }: StepLlmReviewProps) { {/* Actions */}
-

- {accepted.size} von {changes.length} ausgewaehlt -

+

{accepted.size} von {changes.length} ausgewaehlt

- -
diff --git a/klausur-service/backend/cv_vocab_pipeline.py b/klausur-service/backend/cv_vocab_pipeline.py index 22b7fb4..8417c8c 100644 --- a/klausur-service/backend/cv_vocab_pipeline.py +++ b/klausur-service/backend/cv_vocab_pipeline.py @@ -4318,25 +4318,30 @@ import re as _re _OLLAMA_URL = os.getenv("OLLAMA_URL", os.getenv("OLLAMA_BASE_URL", "http://host.docker.internal:11434")) OLLAMA_REVIEW_MODEL = os.getenv("OLLAMA_REVIEW_MODEL", "qwen3:30b-a3b") +# Regex: entry contains IPA phonetic brackets like "dance [dɑːns]" +_HAS_PHONETIC_RE = _re.compile(r'\[.*?[ˈˌːʃʒθðŋɑɒɔəɜɪʊʌæ].*?\]') -async def llm_review_entries( - entries: List[Dict], - model: str = None, -) -> Dict: - """Send vocab entries to a local LLM for OCR error correction.""" - model = model or OLLAMA_REVIEW_MODEL - # Build a compact table representation for the prompt - table_lines = [] - for e in entries: - table_lines.append({ - "row": e.get("row_index", 0), - "en": e.get("english", ""), - "de": e.get("german", ""), - "ex": e.get("example", ""), - }) +def _entry_needs_review(entry: Dict) -> bool: + """Check if an entry should be sent to the LLM for review. - prompt = f"""Du bist ein Korrekturleser fuer OCR-erkannte Vokabeltabellen (Englisch-Deutsch). + Skip entries that are empty or contain IPA phonetic transcriptions + (those were already corrected by the word dictionary lookup). + """ + en = entry.get("english", "") or "" + de = entry.get("german", "") or "" + # Skip completely empty entries + if not en.strip() and not de.strip(): + return False + # Skip entries with phonetic/IPA brackets — these are dictionary-corrected + if _HAS_PHONETIC_RE.search(en): + return False + return True + + +def _build_llm_prompt(table_lines: List[Dict]) -> str: + """Build the LLM correction prompt for a batch of entries.""" + return f"""Du bist ein Korrekturleser fuer OCR-erkannte Vokabeltabellen (Englisch-Deutsch). Die Tabelle wurde per OCR aus einem Schulbuch-Scan extrahiert. Korrigiere NUR offensichtliche OCR-Fehler. Haeufige OCR-Fehler die du korrigieren sollst: @@ -4359,28 +4364,12 @@ Fuer unveraenderte Eintraege setze "corrected": false. 
Eingabe: {_json.dumps(table_lines, ensure_ascii=False, indent=2)}""" - t0 = time.time() - async with httpx.AsyncClient(timeout=300.0) as client: - resp = await client.post( - f"{_OLLAMA_URL}/api/chat", - json={ - "model": model, - "messages": [{"role": "user", "content": prompt}], - "stream": False, - "options": {"temperature": 0.1, "num_predict": 8192}, - }, - ) - resp.raise_for_status() - content = resp.json().get("message", {}).get("content", "") - duration_ms = int((time.time() - t0) * 1000) - # Parse LLM response — extract JSON array - corrected = _parse_llm_json_array(content) - - # Build diff: compare original vs corrected +def _diff_batch(originals: List[Dict], corrected: List[Dict]) -> Tuple[List[Dict], List[Dict]]: + """Compare original entries with LLM-corrected ones, return (changes, corrected_entries).""" changes = [] - entries_corrected = [] - for i, orig in enumerate(entries): + entries_out = [] + for i, orig in enumerate(originals): if i < len(corrected): c = corrected[i] entry = dict(orig) @@ -4396,19 +4385,171 @@ Eingabe: }) entry[field_name] = new_val entry["llm_corrected"] = True - entries_corrected.append(entry) + entries_out.append(entry) else: - entries_corrected.append(dict(orig)) + entries_out.append(dict(orig)) + return changes, entries_out + + +async def llm_review_entries( + entries: List[Dict], + model: str = None, +) -> Dict: + """Send vocab entries to a local LLM for OCR error correction (single batch).""" + model = model or OLLAMA_REVIEW_MODEL + + # Filter: only entries that need review + reviewable = [(i, e) for i, e in enumerate(entries) if _entry_needs_review(e)] + + if not reviewable: + return { + "entries_original": entries, + "entries_corrected": [dict(e) for e in entries], + "changes": [], + "skipped_count": len(entries), + "model_used": model, + "duration_ms": 0, + } + + review_entries = [e for _, e in reviewable] + table_lines = [ + {"row": e.get("row_index", 0), "en": e.get("english", ""), "de": e.get("german", ""), "ex": e.get("example", "")} + for e in review_entries + ] + + prompt = _build_llm_prompt(table_lines) + + t0 = time.time() + async with httpx.AsyncClient(timeout=300.0) as client: + resp = await client.post( + f"{_OLLAMA_URL}/api/chat", + json={ + "model": model, + "messages": [{"role": "user", "content": prompt}], + "stream": False, + "options": {"temperature": 0.1, "num_predict": 8192}, + }, + ) + resp.raise_for_status() + content = resp.json().get("message", {}).get("content", "") + duration_ms = int((time.time() - t0) * 1000) + + corrected = _parse_llm_json_array(content) + changes, corrected_entries = _diff_batch(review_entries, corrected) + + # Merge corrected entries back into the full list + all_corrected = [dict(e) for e in entries] + for batch_idx, (orig_idx, _) in enumerate(reviewable): + if batch_idx < len(corrected_entries): + all_corrected[orig_idx] = corrected_entries[batch_idx] return { "entries_original": entries, - "entries_corrected": entries_corrected, + "entries_corrected": all_corrected, "changes": changes, + "skipped_count": len(entries) - len(reviewable), "model_used": model, "duration_ms": duration_ms, } +async def llm_review_entries_streaming( + entries: List[Dict], + model: str = None, + batch_size: int = 8, +): + """Async generator: yield SSE events while reviewing entries in batches.""" + model = model or OLLAMA_REVIEW_MODEL + + # Separate reviewable from skipped entries + reviewable = [] + skipped_indices = [] + for i, e in enumerate(entries): + if _entry_needs_review(e): + reviewable.append((i, e)) + else: 
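+            # Empty or already dictionary-corrected (IPA brackets present):
+            # no LLM pass needed for this entry.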
+ skipped_indices.append(i) + + total_to_review = len(reviewable) + + # meta event + yield { + "type": "meta", + "total_entries": len(entries), + "to_review": total_to_review, + "skipped": len(skipped_indices), + "model": model, + "batch_size": batch_size, + } + + all_changes = [] + all_corrected = [dict(e) for e in entries] + total_duration_ms = 0 + reviewed_count = 0 + + # Process in batches + for batch_start in range(0, total_to_review, batch_size): + batch_items = reviewable[batch_start:batch_start + batch_size] + batch_entries = [e for _, e in batch_items] + + table_lines = [ + {"row": e.get("row_index", 0), "en": e.get("english", ""), "de": e.get("german", ""), "ex": e.get("example", "")} + for e in batch_entries + ] + + prompt = _build_llm_prompt(table_lines) + + t0 = time.time() + async with httpx.AsyncClient(timeout=300.0) as client: + resp = await client.post( + f"{_OLLAMA_URL}/api/chat", + json={ + "model": model, + "messages": [{"role": "user", "content": prompt}], + "stream": False, + "options": {"temperature": 0.1, "num_predict": 4096}, + }, + ) + resp.raise_for_status() + content = resp.json().get("message", {}).get("content", "") + batch_ms = int((time.time() - t0) * 1000) + total_duration_ms += batch_ms + + corrected = _parse_llm_json_array(content) + batch_changes, batch_corrected = _diff_batch(batch_entries, corrected) + + # Merge back + for batch_idx, (orig_idx, _) in enumerate(batch_items): + if batch_idx < len(batch_corrected): + all_corrected[orig_idx] = batch_corrected[batch_idx] + + all_changes.extend(batch_changes) + reviewed_count += len(batch_items) + + # Yield batch result + yield { + "type": "batch", + "batch_index": batch_start // batch_size, + "entries_reviewed": [e.get("row_index", 0) for _, e in batch_items], + "changes": batch_changes, + "duration_ms": batch_ms, + "progress": {"current": reviewed_count, "total": total_to_review}, + } + + # Complete event + yield { + "type": "complete", + "changes": all_changes, + "model_used": model, + "duration_ms": total_duration_ms, + "total_entries": len(entries), + "reviewed": total_to_review, + "skipped": len(skipped_indices), + "corrections_found": len(all_changes), + "entries_corrected": all_corrected, + } + + def _parse_llm_json_array(text: str) -> List[Dict]: """Extract JSON array from LLM response (may contain markdown fences).""" # Strip markdown code fences diff --git a/klausur-service/backend/ocr_pipeline_api.py b/klausur-service/backend/ocr_pipeline_api.py index 01469e0..542bd1e 100644 --- a/klausur-service/backend/ocr_pipeline_api.py +++ b/klausur-service/backend/ocr_pipeline_api.py @@ -51,6 +51,7 @@ from cv_vocab_pipeline import ( dewarp_image, dewarp_image_manual, llm_review_entries, + llm_review_entries_streaming, render_image_high_res, render_pdf_high_res, ) @@ -1395,8 +1396,12 @@ async def get_word_ground_truth(session_id: str): @router.post("/sessions/{session_id}/llm-review") -async def run_llm_review(session_id: str, request: Request): - """Run LLM-based correction on vocab entries from Step 5.""" +async def run_llm_review(session_id: str, request: Request, stream: bool = False): + """Run LLM-based correction on vocab entries from Step 5. 
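+
+    Entries whose English field already contains an IPA transcription are
+    skipped; they were corrected earlier by the word dictionary lookup.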
+ + Query params: + stream: false (default) for JSON response, true for SSE streaming + """ session = await get_session_db(session_id) if not session: raise HTTPException(status_code=404, detail=f"Session {session_id} not found") @@ -1417,6 +1422,14 @@ async def run_llm_review(session_id: str, request: Request): pass model = body.get("model") or OLLAMA_REVIEW_MODEL + if stream: + return StreamingResponse( + _llm_review_stream_generator(session_id, entries, word_result, model, request), + media_type="text/event-stream", + headers={"Cache-Control": "no-cache", "Connection": "keep-alive", "X-Accel-Buffering": "no"}, + ) + + # Non-streaming path try: result = await llm_review_entries(entries, model=model) except Exception as e: @@ -1449,6 +1462,44 @@ async def run_llm_review(session_id: str, request: Request): } +async def _llm_review_stream_generator( + session_id: str, + entries: List[Dict], + word_result: Dict, + model: str, + request: Request, +): + """SSE generator that yields batch-by-batch LLM review progress.""" + try: + async for event in llm_review_entries_streaming(entries, model=model): + if await request.is_disconnected(): + logger.info(f"SSE: client disconnected during LLM review for {session_id}") + return + + yield f"data: {json.dumps(event, ensure_ascii=False)}\n\n" + + # On complete: persist to DB + if event.get("type") == "complete": + word_result["llm_review"] = { + "changes": event["changes"], + "model_used": event["model_used"], + "duration_ms": event["duration_ms"], + "entries_corrected": event["entries_corrected"], + } + await update_session_db(session_id, word_result=word_result, current_step=6) + if session_id in _cache: + _cache[session_id]["word_result"] = word_result + + logger.info(f"LLM review SSE session {session_id}: {event['corrections_found']} changes, " + f"{event['duration_ms']}ms, skipped={event['skipped']}, model={event['model_used']}") + + except Exception as e: + import traceback + logger.error(f"LLM review SSE failed for {session_id}: {type(e).__name__}: {e}\n{traceback.format_exc()}") + error_event = {"type": "error", "detail": f"{type(e).__name__}: {e}"} + yield f"data: {json.dumps(error_event)}\n\n" + + @router.post("/sessions/{session_id}/llm-review/apply") async def apply_llm_corrections(session_id: str, request: Request): """Apply selected LLM corrections to vocab entries."""
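
Reviewer note: a quick way to smoke-test the streaming path without the
frontend is a small SSE client. This sketch is not part of the patch; the
base URL, port, and session id are assumptions to adapt to your deployment
(the frontend reaches the service through the /klausur-api proxy instead).

    import asyncio
    import json

    import httpx

    BASE_URL = "http://localhost:8000"  # assumption: direct service port

    async def watch_llm_review(session_id: str) -> None:
        url = f"{BASE_URL}/api/v1/ocr-pipeline/sessions/{session_id}/llm-review"
        async with httpx.AsyncClient(timeout=None) as client:
            async with client.stream("POST", url, params={"stream": "true"}, json={}) as resp:
                resp.raise_for_status()
                buffer = ""
                async for chunk in resp.aiter_text():
                    buffer += chunk
                    # SSE frames are separated by a blank line
                    while "\n\n" in buffer:
                        frame, buffer = buffer.split("\n\n", 1)
                        if not frame.startswith("data: "):
                            continue
                        event = json.loads(frame[len("data: "):])
                        if event["type"] == "meta":
                            print(f"{event['to_review']}/{event['total_entries']} entries to review "
                                  f"({event['skipped']} skipped), model {event['model']}")
                        elif event["type"] == "batch":
                            p = event["progress"]
                            print(f"batch {event['batch_index']}: {len(event['changes'])} corrections "
                                  f"({p['current']}/{p['total']}, {event['duration_ms']}ms)")
                        elif event["type"] == "complete":
                            print(f"done: {event['corrections_found']} corrections in {event['duration_ms']}ms")
                        elif event["type"] == "error":
                            raise RuntimeError(event["detail"])

    if __name__ == "__main__":
        asyncio.run(watch_llm_review("SESSION_ID"))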