feat: Sprint 1 — IPA hardening, regression framework, ground-truth review
Some checks failed
CI / go-lint (push) Has been skipped
CI / python-lint (push) Has been skipped
CI / nodejs-lint (push) Has been skipped
CI / test-go-school (push) Successful in 28s
CI / test-go-edu-search (push) Successful in 27s
CI / test-python-klausur (push) Failing after 1m55s
CI / test-python-agent-core (push) Successful in 16s
CI / test-nodejs-website (push) Successful in 19s

Track A (Backend):
- Compound word IPA decomposition (schoolbag→school+bag)
- Trailing garbled IPA fragment removal after brackets (R21 fix)
- Regression runner with DB persistence, history endpoints
- Page crop determinism verified with tests

Track B (Frontend):
- OCR Regression dashboard (/ai/ocr-regression)
- Ground Truth Review workflow (/ai/ocr-ground-truth)
  with split-view, confidence highlighting, inline edit,
  batch mark, progress tracking

Track C (Docs):
- OCR-Pipeline.md v5.0 (Steps 5e-5h)
- Regression testing guide
- mkdocs.yml nav update

Track D (Infra):
- TrOCR baseline benchmark script
- run-regression.sh shell script
- Migration 008: regression_runs table

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
Benjamin Admin
2026-03-23 09:21:27 +01:00
parent f5d5d6c59c
commit a1e079b911
13 changed files with 1796 additions and 15 deletions

View File

@@ -1032,6 +1032,37 @@ def _text_has_garbled_ipa(text: str) -> bool:
return False
def _decompose_compound(word: str, pronunciation: str = 'british') -> Optional[str]:
"""Try to decompose a compound word and concatenate IPA for each part.
E.g. "schoolbag""school"+"bag" → IPA for both concatenated.
Only returns IPA if ALL parts are found in the dictionary.
Tries splits at every position (min 3 chars per part) and picks the
split where the first part is longest.
"""
if not IPA_AVAILABLE:
return None
lower = word.lower().strip()
if len(lower) < 6:
return None # too short for a compound
best_ipa = None
best_first_len = 0
for split_pos in range(3, len(lower) - 2): # min 3 chars each part
first = lower[:split_pos]
second = lower[split_pos:]
ipa_first = _lookup_ipa(first, pronunciation)
ipa_second = _lookup_ipa(second, pronunciation)
if ipa_first and ipa_second:
if split_pos > best_first_len:
best_first_len = split_pos
best_ipa = ipa_first + ipa_second
return best_ipa
def _insert_missing_ipa(text: str, pronunciation: str = 'british') -> str:
"""Insert IPA pronunciation for English words that have no brackets at all.
@@ -1077,6 +1108,10 @@ def _insert_missing_ipa(text: str, pronunciation: str = 'british') -> str:
# Fallback: try without hyphens (e.g. "second-hand" → "secondhand")
if not ipa and '-' in clean:
ipa = _lookup_ipa(clean.replace('-', ''), pronunciation)
# Fallback 0b: compound word decomposition
# E.g. "schoolbag" → "school"+"bag" → concatenated IPA
if not ipa:
ipa = _decompose_compound(clean, pronunciation)
# Fallback 1: IPA-marker split for merged tokens where OCR
# joined headword with its IPA (e.g. "schoolbagsku:lbæg").
# Find the first IPA marker character (:, æ, ɪ, etc.), walk
@@ -1098,6 +1133,9 @@ def _insert_missing_ipa(text: str, pronunciation: str = 'british') -> str:
headword = w[:split]
ocr_ipa = w[split:]
hw_ipa = _lookup_ipa(headword, pronunciation)
if not hw_ipa:
# Try compound decomposition for the headword part
hw_ipa = _decompose_compound(headword, pronunciation)
if hw_ipa:
words[i] = f"{headword} [{hw_ipa}]"
else:
@@ -1197,6 +1235,12 @@ def _strip_post_bracket_garbled(
E.g. ``sea [sˈiː] si:`` → ``sea [sˈiː]``
``seat [sˈiːt] si:t`` → ``seat [sˈiːt]``
``seat [sˈiːt] belt si:t belt`` → ``seat [sˈiːt] belt``
For multi-word headwords like "seat belt", a real English word ("belt")
may be followed by garbled IPA duplicates. We detect this by checking
whether the sequence after a real word contains IPA markers (`:`, `ə`,
etc.) — if so, everything from the first garbled token onward is stripped.
"""
if ']' not in text:
return text
@@ -1207,6 +1251,8 @@ def _strip_post_bracket_garbled(
after = text[last_bracket + 1:].strip()
if not after:
return text
_IPA_MARKER_CHARS = set(':əɪɛɒʊʌæɑɔʃʒθðŋˈˌ')
after_words = after.split()
kept: List[str] = []
for idx, w in enumerate(after_words):
@@ -1215,17 +1261,42 @@ def _strip_post_bracket_garbled(
kept.extend(after_words[idx:])
break
# Contains IPA markers (length mark, IPA chars) — garbled, skip
if ':' in w or any(c in w for c in 'əɪɛɒʊʌæɑɔʃʒθðŋˈˌ'):
if any(c in w for c in _IPA_MARKER_CHARS):
# Everything from here is garbled IPA — stop scanning
# but look ahead: if any remaining words are real English
# words WITHOUT IPA markers, they might be a different headword
# following. Only skip the contiguous garbled run.
continue
clean = re.sub(r'[^a-zA-Z]', '', w)
# Uppercase — likely German, keep rest
if clean and clean[0].isupper():
kept.extend(after_words[idx:])
break
# Known English word — keep rest
# Known English word — keep it, but check if followed by garbled IPA
# (multi-word headword case like "seat [siːt] belt si:t belt")
if clean and len(clean) >= 2 and _lookup_ipa(clean, pronunciation):
kept.extend(after_words[idx:])
break
# Peek ahead: if next word has IPA markers, the rest is garbled
remaining = after_words[idx + 1:]
has_garbled_after = any(
any(c in rw for c in _IPA_MARKER_CHARS)
for rw in remaining
)
if has_garbled_after:
# Keep this real word but stop — rest is garbled duplication
kept.append(w)
# Still scan for delimiters/German in the remaining words
for ridx, rw in enumerate(remaining):
if rw in ('', '', '-', '/', '|', ',', ';'):
kept.extend(remaining[ridx:])
break
rclean = re.sub(r'[^a-zA-Z]', '', rw)
if rclean and rclean[0].isupper():
kept.extend(remaining[ridx:])
break
break
else:
kept.extend(after_words[idx:])
break
# Unknown short word — likely garbled, skip
if kept:
return before + ' ' + ' '.join(kept)

View File

@@ -0,0 +1,18 @@
-- Migration 008: Regression test run history
-- Stores results of regression test runs for trend analysis.
CREATE TABLE IF NOT EXISTS regression_runs (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
run_at TIMESTAMP WITH TIME ZONE DEFAULT NOW(),
status VARCHAR(20) NOT NULL, -- 'pass', 'fail', 'error'
total INT NOT NULL DEFAULT 0,
passed INT NOT NULL DEFAULT 0,
failed INT NOT NULL DEFAULT 0,
errors INT NOT NULL DEFAULT 0,
duration_ms INT,
results JSONB NOT NULL DEFAULT '[]',
triggered_by VARCHAR(50) DEFAULT 'manual' -- 'manual', 'script', 'ci'
);
CREATE INDEX IF NOT EXISTS idx_regression_runs_run_at
ON regression_runs (run_at DESC);

View File

@@ -8,7 +8,11 @@ Lizenz: Apache 2.0
DATENSCHUTZ: Alle Verarbeitung erfolgt lokal.
"""
import json
import logging
import os
import time
import uuid
from datetime import datetime, timezone
from typing import Any, Dict, List, Optional
@@ -16,6 +20,7 @@ from fastapi import APIRouter, HTTPException, Query
from grid_editor_api import _build_grid_core
from ocr_pipeline_session_store import (
get_pool,
get_session_db,
list_ground_truth_sessions_db,
update_session_db,
@@ -26,6 +31,60 @@ logger = logging.getLogger(__name__)
router = APIRouter(prefix="/api/v1/ocr-pipeline", tags=["regression"])
# ---------------------------------------------------------------------------
# DB persistence for regression runs
# ---------------------------------------------------------------------------
async def _init_regression_table():
"""Ensure regression_runs table exists (idempotent)."""
pool = await get_pool()
async with pool.acquire() as conn:
migration_path = os.path.join(
os.path.dirname(__file__),
"migrations/008_regression_runs.sql",
)
if os.path.exists(migration_path):
with open(migration_path, "r") as f:
sql = f.read()
await conn.execute(sql)
async def _persist_regression_run(
status: str,
summary: dict,
results: list,
duration_ms: int,
triggered_by: str = "manual",
) -> str:
"""Save a regression run to the database. Returns the run ID."""
try:
await _init_regression_table()
pool = await get_pool()
run_id = str(uuid.uuid4())
async with pool.acquire() as conn:
await conn.execute(
"""
INSERT INTO regression_runs
(id, status, total, passed, failed, errors, duration_ms, results, triggered_by)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8::jsonb, $9)
""",
run_id,
status,
summary.get("total", 0),
summary.get("passed", 0),
summary.get("failed", 0),
summary.get("errors", 0),
duration_ms,
json.dumps(results),
triggered_by,
)
logger.info("Regression run %s persisted: %s", run_id, status)
return run_id
except Exception as e:
logger.warning("Failed to persist regression run: %s", e)
return ""
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
@@ -299,8 +358,11 @@ async def run_single_regression(session_id: str):
@router.post("/regression/run")
async def run_all_regressions():
async def run_all_regressions(
triggered_by: str = Query("manual", description="Who triggered: manual, script, ci"),
):
"""Re-run build_grid for ALL ground-truth sessions and compare."""
start_time = time.monotonic()
sessions = await list_ground_truth_sessions_db()
if not sessions:
@@ -370,19 +432,105 @@ async def run_all_regressions():
results.append(entry)
overall = "pass" if failed == 0 and errors == 0 else "fail"
duration_ms = int((time.monotonic() - start_time) * 1000)
summary = {
"total": len(results),
"passed": passed,
"failed": failed,
"errors": errors,
}
logger.info(
"Regression suite: %s%d passed, %d failed, %d errors (of %d)",
overall, passed, failed, errors, len(results),
"Regression suite: %s%d passed, %d failed, %d errors (of %d) in %dms",
overall, passed, failed, errors, len(results), duration_ms,
)
# Persist to DB
run_id = await _persist_regression_run(
status=overall,
summary=summary,
results=results,
duration_ms=duration_ms,
triggered_by=triggered_by,
)
return {
"status": overall,
"run_id": run_id,
"duration_ms": duration_ms,
"results": results,
"summary": {
"total": len(results),
"passed": passed,
"failed": failed,
"errors": errors,
},
"summary": summary,
}
@router.get("/regression/history")
async def get_regression_history(
limit: int = Query(20, ge=1, le=100),
):
"""Get recent regression run history from the database."""
try:
await _init_regression_table()
pool = await get_pool()
async with pool.acquire() as conn:
rows = await conn.fetch(
"""
SELECT id, run_at, status, total, passed, failed, errors,
duration_ms, triggered_by
FROM regression_runs
ORDER BY run_at DESC
LIMIT $1
""",
limit,
)
return {
"runs": [
{
"id": str(row["id"]),
"run_at": row["run_at"].isoformat() if row["run_at"] else None,
"status": row["status"],
"total": row["total"],
"passed": row["passed"],
"failed": row["failed"],
"errors": row["errors"],
"duration_ms": row["duration_ms"],
"triggered_by": row["triggered_by"],
}
for row in rows
],
"count": len(rows),
}
except Exception as e:
logger.warning("Failed to fetch regression history: %s", e)
return {"runs": [], "count": 0, "error": str(e)}
@router.get("/regression/history/{run_id}")
async def get_regression_run_detail(run_id: str):
"""Get detailed results of a specific regression run."""
try:
await _init_regression_table()
pool = await get_pool()
async with pool.acquire() as conn:
row = await conn.fetchrow(
"SELECT * FROM regression_runs WHERE id = $1",
run_id,
)
if not row:
raise HTTPException(status_code=404, detail="Run not found")
return {
"id": str(row["id"]),
"run_at": row["run_at"].isoformat() if row["run_at"] else None,
"status": row["status"],
"total": row["total"],
"passed": row["passed"],
"failed": row["failed"],
"errors": row["errors"],
"duration_ms": row["duration_ms"],
"triggered_by": row["triggered_by"],
"results": json.loads(row["results"]) if row["results"] else [],
}
except HTTPException:
raise
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))

View File

@@ -57,6 +57,63 @@ class TestInsertMissingIpa:
result = _insert_missing_ipa("Anstecknadel", "british")
assert result == "Anstecknadel"
def test_compound_word_schoolbag_gets_ipa(self):
"""R07: Compound word 'schoolbag' should get decomposed IPA (school+bag)."""
from cv_ocr_engines import _insert_missing_ipa
result = _insert_missing_ipa("schoolbag", "british")
assert "[" in result and "]" in result
assert result.startswith("schoolbag [")
def test_compound_word_blackbird(self):
"""Compound word 'blackbird' should get decomposed IPA."""
from cv_ocr_engines import _insert_missing_ipa
result = _insert_missing_ipa("blackbird", "british")
assert "[" in result and "]" in result
def test_compound_word_too_short(self):
"""Words shorter than 6 chars should not attempt compound decomposition."""
from cv_ocr_engines import _decompose_compound
assert _decompose_compound("bag", "british") is None
def test_decompose_compound_direct(self):
"""Direct test of _decompose_compound for known compounds."""
from cv_ocr_engines import _decompose_compound
# schoolbag = school + bag — both should be in dictionary
result = _decompose_compound("schoolbag", "british")
assert result is not None
class TestStripPostBracketGarbled:
"""Tests for _strip_post_bracket_garbled — trailing garbled IPA removal."""
def test_simple_trailing_garbled(self):
"""R21-simple: 'sea [sˈiː] si:' → trailing IPA marker removed."""
from cv_ocr_engines import _strip_post_bracket_garbled
result = _strip_post_bracket_garbled("sea [sˈiː] si:")
assert "si:" not in result
assert result.startswith("sea [sˈiː]")
def test_multi_word_trailing_garbled(self):
"""R21: 'seat [sˈiːt] belt si:t belt' → keep 'belt', remove garbled."""
from cv_ocr_engines import _strip_post_bracket_garbled
result = _strip_post_bracket_garbled("seat [sˈiːt] belt si:t belt")
assert "belt" in result # real word kept
assert "si:t" not in result # garbled removed
# Should contain "seat [sˈiːt] belt" but not the garbled duplication
assert result.count("belt") == 1
def test_delimiter_after_bracket_kept(self):
"""Delimiters after IPA bracket are kept."""
from cv_ocr_engines import _strip_post_bracket_garbled
result = _strip_post_bracket_garbled("dance [dˈɑːns] tanzen")
assert " tanzen" in result
def test_german_after_bracket_kept(self):
"""German words (uppercase) after IPA bracket are kept."""
from cv_ocr_engines import _strip_post_bracket_garbled
result = _strip_post_bracket_garbled("badge [bædʒ] Abzeichen")
assert "Abzeichen" in result
class TestFixCellPhonetics:
"""Tests for fix_cell_phonetics function."""

View File

@@ -415,3 +415,53 @@ class TestDetectAndCropPage:
assert 0 <= pct["y"] <= 100
assert 0 < pct["width"] <= 100
assert 0 < pct["height"] <= 100
class TestCropDeterminism:
"""A3: Verify that page crop produces identical results across N runs."""
@pytest.mark.parametrize("image_factory,desc", [
(
lambda: _make_image_with_content(800, 600, (100, 700, 80, 520)),
"standard content",
),
(
lambda: _make_book_scan(1000, 800),
"book scan with spine shadow",
),
])
def test_determinism_10_runs(self, image_factory, desc):
"""Same image must produce identical crops in 10 consecutive runs."""
img = image_factory()
results = []
for _ in range(10):
cropped, result = detect_and_crop_page(img.copy())
results.append({
"crop_applied": result["crop_applied"],
"cropped_size": result["cropped_size"],
"border_fractions": result["border_fractions"],
"shape": cropped.shape,
})
first = results[0]
for i, r in enumerate(results[1:], 1):
assert r["crop_applied"] == first["crop_applied"], (
f"Run {i} crop_applied differs from run 0 ({desc})"
)
assert r["cropped_size"] == first["cropped_size"], (
f"Run {i} cropped_size differs from run 0 ({desc})"
)
assert r["shape"] == first["shape"], (
f"Run {i} output shape differs from run 0 ({desc})"
)
def test_determinism_pixel_identical(self):
"""Crop output pixels must be identical across runs."""
img = _make_image_with_content(800, 600, (100, 700, 80, 520))
ref_crop, _ = detect_and_crop_page(img.copy())
for i in range(5):
crop, _ = detect_and_crop_page(img.copy())
assert np.array_equal(ref_crop, crop), (
f"Run {i} produced different pixel output"
)