Flexible inhaltsbasierte Spaltenerkennung (2-Phasen)
Ersetzt hardcodierte Positionsregeln durch ein zweistufiges System: Phase A erkennt Spaltengeometrie (Clustering), Phase B klassifiziert Typen per Inhalt (Sprache/Rolle) mit 3-stufiger Fallback-Kette. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
@@ -48,16 +48,46 @@ except ImportError:
|
||||
CV_PIPELINE_AVAILABLE = CV2_AVAILABLE and TESSERACT_AVAILABLE
|
||||
|
||||
|
||||
# --- Language Detection Constants ---

# High-frequency German function words used as a language signal for column
# classification. Umlaut words are stored ASCII-transliterated ('fuer',
# 'ueber') — presumably because OCR output may be transliterated; actual
# umlaut characters are scored separately in _score_language().
GERMAN_FUNCTION_WORDS = {'der', 'die', 'das', 'und', 'ist', 'ein', 'eine', 'nicht',
                         'von', 'zu', 'mit', 'auf', 'fuer', 'den', 'dem', 'sich', 'auch', 'wird',
                         'nach', 'bei', 'aus', 'wie', 'oder', 'wenn', 'noch', 'aber', 'hat', 'nur',
                         'ueber', 'kann', 'als', 'ich', 'er', 'sie', 'es', 'wir', 'ihr', 'haben',
                         'sein', 'werden', 'war', 'sind', 'muss', 'soll', 'dieser', 'diese', 'diesem'}

# High-frequency English function words; counterpart set for scoring the
# English side of a column. Compared against lowercased OCR tokens.
ENGLISH_FUNCTION_WORDS = {'the', 'a', 'an', 'is', 'are', 'was', 'were', 'to', 'of',
                          'and', 'in', 'that', 'it', 'for', 'on', 'with', 'as', 'at', 'by', 'from',
                          'or', 'but', 'not', 'be', 'have', 'has', 'had', 'do', 'does', 'did', 'will',
                          'would', 'can', 'could', 'should', 'may', 'might', 'this', 'they', 'you', 'he',
                          'she', 'we', 'my', 'your', 'his', 'her', 'its', 'our', 'their', 'which'}
|
||||
|
||||
|
||||
# --- Data Classes ---
|
||||
|
||||
@dataclass
class PageRegion:
    """A detected region on the page.

    Coordinates are pixels in the analyzed (dewarped) image.
    """
    # Region type: 'column_en', 'column_de', 'column_example', 'page_ref',
    # 'column_marker', 'column_text', 'header', 'footer'
    type: str
    x: int
    y: int
    width: int
    height: int
    # Confidence of the type assignment, 0.0-1.0 (1.0 also used by the
    # pure-position fallback, which reports no uncertainty).
    classification_confidence: float = 1.0
    # Which classification level produced this region:
    # 'content', 'position_enhanced', or 'position_fallback'.
    classification_method: str = ""
|
||||
|
||||
|
||||
@dataclass
class ColumnGeometry:
    """Geometrically detected column before type classification (Phase A)."""
    index: int  # 0-based, left -> right
    x: int
    y: int
    width: int
    height: int
    word_count: int
    words: List[Dict]  # word dicts from Tesseract (text, conf, left, top, ...)
    width_ratio: float  # width / content_width (0.0-1.0)
|
||||
|
||||
|
||||
@dataclass
|
||||
@@ -840,22 +870,24 @@ def analyze_layout(layout_img: np.ndarray, ocr_img: np.ndarray) -> List[PageRegi
|
||||
|
||||
|
||||
# =============================================================================
|
||||
# Stage 5b: Word-Based Layout Analysis (5-Column Detection)
|
||||
# Stage 5b: Word-Based Layout Analysis (Two-Phase Column Detection)
|
||||
# =============================================================================
|
||||
|
||||
def analyze_layout_by_words(ocr_img: np.ndarray, dewarped_bgr: np.ndarray) -> List[PageRegion]:
|
||||
"""Detect columns by clustering left-aligned word positions from Tesseract.
|
||||
# --- Phase A: Geometry Detection ---
|
||||
|
||||
This approach works better than projection profiles for vocabulary pages
|
||||
with 5 columns (page_ref, EN, DE, markers, examples) because it detects
|
||||
column starts where left-aligned words cluster.
|
||||
def detect_column_geometry(ocr_img: np.ndarray, dewarped_bgr: np.ndarray) -> Optional[Tuple[List[ColumnGeometry], int, int, int, int]]:
|
||||
"""Detect column geometry by clustering left-aligned word positions.
|
||||
|
||||
Phase A of the two-phase column detection. Returns untyped column
|
||||
geometries with their words for subsequent content-based classification.
|
||||
|
||||
Args:
|
||||
ocr_img: Binarized grayscale image for layout analysis.
|
||||
dewarped_bgr: Original BGR image (for Tesseract word detection).
|
||||
|
||||
Returns:
|
||||
List of PageRegion objects. Falls back to analyze_layout() if < 3 clusters.
|
||||
Tuple of (geometries, left_x, right_x, top_y, bottom_y) or None if
|
||||
fewer than 3 clusters are found (signals fallback needed).
|
||||
"""
|
||||
h, w = ocr_img.shape[:2]
|
||||
|
||||
@@ -870,7 +902,7 @@ def analyze_layout_by_words(ocr_img: np.ndarray, dewarped_bgr: np.ndarray) -> Li
|
||||
top_y, bottom_y = 0, h
|
||||
content_w, content_h = w, h
|
||||
|
||||
logger.info(f"LayoutByWords: content bounds x=[{left_x}..{right_x}] ({content_w}px), "
|
||||
logger.info(f"ColumnGeometry: content bounds x=[{left_x}..{right_x}] ({content_w}px), "
|
||||
f"y=[{top_y}..{bottom_y}] ({content_h}px)")
|
||||
|
||||
# --- Get word bounding boxes from Tesseract ---
|
||||
@@ -880,13 +912,12 @@ def analyze_layout_by_words(ocr_img: np.ndarray, dewarped_bgr: np.ndarray) -> Li
|
||||
try:
|
||||
data = pytesseract.image_to_data(pil_img, lang='eng+deu', output_type=pytesseract.Output.DICT)
|
||||
except Exception as e:
|
||||
logger.warning(f"LayoutByWords: Tesseract image_to_data failed: {e}, falling back")
|
||||
layout_img = create_layout_image(dewarped_bgr)
|
||||
return analyze_layout(layout_img, ocr_img)
|
||||
logger.warning(f"ColumnGeometry: Tesseract image_to_data failed: {e}")
|
||||
return None
|
||||
|
||||
# Collect left edges of recognized words (confidence > 30)
|
||||
# Collect words with their full info
|
||||
word_dicts = []
|
||||
left_edges = []
|
||||
word_info = [] # (left, top, width, height, text, conf)
|
||||
n_words = len(data['text'])
|
||||
for i in range(n_words):
|
||||
conf = int(data['conf'][i]) if str(data['conf'][i]).lstrip('-').isdigit() else -1
|
||||
@@ -898,20 +929,22 @@ def analyze_layout_by_words(ocr_img: np.ndarray, dewarped_bgr: np.ndarray) -> Li
|
||||
bw = int(data['width'][i])
|
||||
bh = int(data['height'][i])
|
||||
left_edges.append(lx)
|
||||
word_info.append((lx, ty, bw, bh, text, conf))
|
||||
word_dicts.append({
|
||||
'text': text, 'conf': conf,
|
||||
'left': lx, 'top': ty, 'width': bw, 'height': bh,
|
||||
})
|
||||
|
||||
if len(left_edges) < 5:
|
||||
logger.warning(f"LayoutByWords: only {len(left_edges)} words detected, falling back")
|
||||
layout_img = create_layout_image(dewarped_bgr)
|
||||
return analyze_layout(layout_img, ocr_img)
|
||||
logger.warning(f"ColumnGeometry: only {len(left_edges)} words detected")
|
||||
return None
|
||||
|
||||
logger.info(f"LayoutByWords: {len(left_edges)} words detected in content area")
|
||||
logger.info(f"ColumnGeometry: {len(left_edges)} words detected in content area")
|
||||
|
||||
# --- Cluster left edges ---
|
||||
tolerance = max(10, int(content_w * 0.01)) # ~1% of content width
|
||||
tolerance = max(10, int(content_w * 0.01))
|
||||
sorted_edges = sorted(left_edges)
|
||||
|
||||
clusters = [] # list of (center_x, count, edges)
|
||||
clusters = []
|
||||
current_cluster = [sorted_edges[0]]
|
||||
for edge in sorted_edges[1:]:
|
||||
if edge - current_cluster[-1] <= tolerance:
|
||||
@@ -925,20 +958,18 @@ def analyze_layout_by_words(ocr_img: np.ndarray, dewarped_bgr: np.ndarray) -> Li
|
||||
significant = [(int(np.mean(c)), len(c), min(c), max(c)) for c in clusters if len(c) >= 2]
|
||||
significant.sort(key=lambda s: s[0])
|
||||
|
||||
logger.info(f"LayoutByWords: {len(significant)} significant clusters "
|
||||
logger.info(f"ColumnGeometry: {len(significant)} significant clusters "
|
||||
f"(from {len(clusters)} total): "
|
||||
f"{[(s[0]+left_x, s[1]) for s in significant[:10]]}")
|
||||
|
||||
if len(significant) < 3:
|
||||
logger.info("LayoutByWords: < 3 clusters, falling back to projection-based layout")
|
||||
layout_img = create_layout_image(dewarped_bgr)
|
||||
return analyze_layout(layout_img, ocr_img)
|
||||
logger.info("ColumnGeometry: < 3 clusters, signaling fallback")
|
||||
return None
|
||||
|
||||
# --- Merge clusters that are very close (within 2*tolerance) ---
|
||||
merged = [significant[0]]
|
||||
for s in significant[1:]:
|
||||
if s[0] - merged[-1][0] < 2 * tolerance:
|
||||
# Merge: weighted average position, sum counts
|
||||
prev = merged[-1]
|
||||
total = prev[1] + s[1]
|
||||
avg_x = (prev[0] * prev[1] + s[0] * s[1]) // total
|
||||
@@ -946,114 +977,562 @@ def analyze_layout_by_words(ocr_img: np.ndarray, dewarped_bgr: np.ndarray) -> Li
|
||||
else:
|
||||
merged.append(s)
|
||||
|
||||
logger.info(f"LayoutByWords: {len(merged)} clusters after merging: "
|
||||
logger.info(f"ColumnGeometry: {len(merged)} clusters after merging: "
|
||||
f"{[(m[0]+left_x, m[1]) for m in merged]}")
|
||||
|
||||
if len(merged) < 3:
|
||||
logger.info("LayoutByWords: < 3 merged clusters, falling back")
|
||||
layout_img = create_layout_image(dewarped_bgr)
|
||||
return analyze_layout(layout_img, ocr_img)
|
||||
logger.info("ColumnGeometry: < 3 merged clusters, signaling fallback")
|
||||
return None
|
||||
|
||||
# --- Derive column boundaries ---
|
||||
# 2mm margin before each cluster start (~8px at 100dpi, scale with image)
|
||||
margin_px = max(5, int(content_w * 0.005))
|
||||
|
||||
col_starts = [] # (abs_x, word_count)
|
||||
col_starts = []
|
||||
for center_x, count, min_edge, max_edge in merged:
|
||||
abs_start = max(0, left_x + min_edge - margin_px)
|
||||
col_starts.append((abs_start, count))
|
||||
|
||||
# Calculate column widths
|
||||
col_defs = [] # (abs_x, width, word_count)
|
||||
# Calculate column widths and assign words to columns
|
||||
geometries = []
|
||||
for i, (start_x, count) in enumerate(col_starts):
|
||||
if i + 1 < len(col_starts):
|
||||
col_width = col_starts[i + 1][0] - start_x
|
||||
else:
|
||||
col_width = right_x - start_x
|
||||
col_defs.append((start_x, col_width, count))
|
||||
|
||||
logger.info(f"LayoutByWords: column definitions: "
|
||||
f"{[(d[0], d[1], d[2]) for d in col_defs]}")
|
||||
# Assign words to this column based on left edge
|
||||
col_left_rel = start_x - left_x
|
||||
col_right_rel = col_left_rel + col_width
|
||||
col_words = [w for w in word_dicts
|
||||
if col_left_rel <= w['left'] < col_right_rel]
|
||||
|
||||
# --- Assign types based on rules ---
|
||||
geometries.append(ColumnGeometry(
|
||||
index=i,
|
||||
x=start_x,
|
||||
y=top_y,
|
||||
width=col_width,
|
||||
height=content_h,
|
||||
word_count=len(col_words),
|
||||
words=col_words,
|
||||
width_ratio=col_width / content_w if content_w > 0 else 0.0,
|
||||
))
|
||||
|
||||
logger.info(f"ColumnGeometry: {len(geometries)} columns: "
|
||||
f"{[(g.index, g.x, g.width, g.word_count) for g in geometries]}")
|
||||
|
||||
return (geometries, left_x, right_x, top_y, bottom_y)
|
||||
|
||||
|
||||
# --- Phase B: Content-Based Classification ---
|
||||
|
||||
def _score_language(words: List[Dict]) -> Dict[str, float]:
    """Score the language of a column's words.

    Analyzes function words, umlauts, and capitalization patterns
    to determine whether text is English or German.

    Args:
        words: List of word dicts with 'text' and 'conf' keys.

    Returns:
        Dict with 'eng' and 'deu' scores (0.0-1.0).
    """
    if not words:
        return {'eng': 0.0, 'deu': 0.0}

    # Only consider words with decent OCR confidence and non-empty text.
    good_words = [w['text'].lower() for w in words
                  if w.get('conf', 0) > 40 and len(w['text']) > 0]
    if not good_words:
        return {'eng': 0.0, 'deu': 0.0}

    total = len(good_words)
    en_hits = sum(1 for w in good_words if w in ENGLISH_FUNCTION_WORDS)
    de_hits = sum(1 for w in good_words if w in GERMAN_FUNCTION_WORDS)

    # Check for umlauts (strong German signal).
    # Fix: also exclude empty strings here; without the length filter an
    # empty OCR token would raise IndexError at t[0] below.
    raw_texts = [w['text'] for w in words
                 if w.get('conf', 0) > 40 and len(w['text']) > 0]
    umlaut_count = sum(1 for t in raw_texts
                       for c in t if c in 'äöüÄÖÜß')

    # German capitalization: nouns are capitalized mid-sentence.
    # Count words that start with uppercase (short tokens excluded).
    cap_words = sum(1 for t in raw_texts if t[0].isupper() and len(t) > 2)

    # total > 0 is guaranteed by the good_words guard above.
    en_score = en_hits / total
    de_score = de_hits / total

    # Boost German score for umlauts (capped at 5 occurrences).
    if umlaut_count > 0:
        de_score = min(1.0, de_score + 0.15 * min(umlaut_count, 5))

    # Boost German score for high capitalization ratio (typical for German nouns).
    if total > 5:
        cap_ratio = cap_words / total
        if cap_ratio > 0.3:
            de_score = min(1.0, de_score + 0.1)

    return {'eng': round(en_score, 3), 'deu': round(de_score, 3)}
|
||||
|
||||
|
||||
def _score_role(geom: ColumnGeometry) -> Dict[str, float]:
    """Score the role of a column based on its geometry and content patterns.

    Args:
        geom: ColumnGeometry with words and dimensions.

    Returns:
        Dict with role scores: 'reference', 'marker', 'sentence', 'vocabulary'.
    """
    result = dict.fromkeys(('reference', 'marker', 'sentence', 'vocabulary'), 0.0)

    if not geom.words:
        return result

    # Keep only tokens with decent OCR confidence.
    confident = [w['text'] for w in geom.words if w.get('conf', 0) > 40]
    if not confident:
        return result

    n = len(confident)
    mean_len = sum(map(len, confident)) / n
    punct_hits = sum(1 for t in confident if any(ch in t for ch in '.!?;:,'))
    with_digits = sum(1 for t in confident if any(ch.isdigit() for ch in t))
    frac_digits = with_digits / n

    ratio = geom.width_ratio

    # Reference column: narrow, ideally dominated by numbers/page refs.
    if ratio < 0.12:
        if frac_digits > 0.4:
            result['reference'] = min(1.0, 0.5 + frac_digits * 0.5)
        else:
            result['reference'] = 0.5

    # Marker column: very narrow with only a handful of short entries.
    if ratio < 0.08 and geom.word_count <= 10:
        result['marker'] = 0.9 if mean_len < 4 else 0.7

    # Sentence column: wider text where punctuation shows up.
    if ratio > 0.15 and punct_hits > 2:
        base = 0.3 + min(0.5, punct_hits / n)
        result['sentence'] = min(1.0, base + 0.2) if mean_len > 4 else base

    # Vocabulary column: medium width with medium-length words.
    if 0.10 < ratio < 0.45:
        result['vocabulary'] = 0.7 if 3 < mean_len < 8 else 0.4

    return {key: round(val, 3) for key, val in result.items()}
|
||||
|
||||
|
||||
def classify_column_types(geometries: List[ColumnGeometry],
                          content_w: int,
                          top_y: int,
                          img_w: int,
                          img_h: int,
                          bottom_y: int) -> List[PageRegion]:
    """Classify column types using a 3-level fallback chain.

    Level 1: Content-based (language + role scoring)
    Level 2: Position + language (old rules enhanced with language detection)
    Level 3: Pure position (exact old code, no regression)

    Args:
        geometries: List of ColumnGeometry from Phase A.
        content_w: Total content width.
        top_y: Top Y of content area.
        img_w: Full image width.
        img_h: Full image height.
        bottom_y: Bottom Y of content area.

    Returns:
        List of PageRegion with types, confidence, and method.
    """
    content_h = bottom_y - top_y

    # Special case: single column → plain text page (no EN/DE pair possible,
    # so none of the fallback levels apply).
    if len(geometries) == 1:
        geom = geometries[0]
        return [PageRegion(
            type='column_text', x=geom.x, y=geom.y,
            width=geom.width, height=geom.height,
            classification_confidence=0.9,
            classification_method='content',
        )]

    # --- Score all columns once; the scores are shared by Levels 1 and 2 ---
    lang_scores = [_score_language(g.words) for g in geometries]
    role_scores = [_score_role(g) for g in geometries]

    logger.info(f"ClassifyColumns: language scores: "
                f"{[(g.index, ls) for g, ls in zip(geometries, lang_scores)]}")
    logger.info(f"ClassifyColumns: role scores: "
                f"{[(g.index, rs) for g, rs in zip(geometries, role_scores)]}")

    # --- Level 1: Content-based classification (returns None on weak signal) ---
    regions = _classify_by_content(geometries, lang_scores, role_scores, content_w, content_h)
    if regions is not None:
        logger.info("ClassifyColumns: Level 1 (content-based) succeeded")
        _add_header_footer(regions, top_y, bottom_y, img_w, img_h)
        return regions

    # --- Level 2: Position + language enhanced ---
    regions = _classify_by_position_enhanced(geometries, lang_scores, content_w, content_h)
    if regions is not None:
        logger.info("ClassifyColumns: Level 2 (position+language) succeeded")
        _add_header_footer(regions, top_y, bottom_y, img_w, img_h)
        return regions

    # --- Level 3: Pure position fallback (old code, no regression) ---
    logger.info("ClassifyColumns: Level 3 (position fallback)")
    regions = _classify_by_position_fallback(geometries, content_w, content_h)
    _add_header_footer(regions, top_y, bottom_y, img_w, img_h)
    return regions
|
||||
|
||||
|
||||
def _classify_by_content(geometries: List[ColumnGeometry],
                         lang_scores: List[Dict[str, float]],
                         role_scores: List[Dict[str, float]],
                         content_w: int,
                         content_h: int) -> Optional[List[PageRegion]]:
    """Level 1: Classify columns purely by content analysis.

    Requires clear language signals to distinguish EN/DE columns.
    Returns None if language signals are too weak.

    NOTE(review): content_w is currently unused here (width ratios are
    precomputed on the geometries) — kept for signature symmetry with the
    other levels; confirm before removing.
    """
    regions = []
    assigned = set()

    # Step 1: Assign structural roles first (reference, marker)
    for i, (geom, rs) in enumerate(zip(geometries, role_scores)):
        if rs['reference'] >= 0.5 and geom.width_ratio < 0.12:
            regions.append(PageRegion(
                type='page_ref', x=geom.x, y=geom.y,
                width=geom.width, height=content_h,
                classification_confidence=rs['reference'],
                classification_method='content',
            ))
            assigned.add(i)
        elif rs['marker'] >= 0.7 and geom.width_ratio < 0.08:
            regions.append(PageRegion(
                type='column_marker', x=geom.x, y=geom.y,
                width=geom.width, height=content_h,
                classification_confidence=rs['marker'],
                classification_method='content',
            ))
            assigned.add(i)

    # Step 2: Among remaining columns, find EN and DE by language scores
    remaining = [(i, geometries[i], lang_scores[i], role_scores[i])
                 for i in range(len(geometries)) if i not in assigned]

    if len(remaining) < 2:
        # Not enough columns for an EN/DE pair; a single leftover column is
        # typed as generic text rather than failing the whole level.
        if len(remaining) == 1:
            i, geom, ls, rs = remaining[0]
            regions.append(PageRegion(
                type='column_text', x=geom.x, y=geom.y,
                width=geom.width, height=content_h,
                classification_confidence=0.6,
                classification_method='content',
            ))
        regions.sort(key=lambda r: r.x)
        return regions

    # Check if we have enough language signal (a column must lean one way
    # and clear a small absolute threshold).
    en_candidates = [(i, g, ls) for i, g, ls, rs in remaining if ls['eng'] > ls['deu'] and ls['eng'] > 0.05]
    de_candidates = [(i, g, ls) for i, g, ls, rs in remaining if ls['deu'] > ls['eng'] and ls['deu'] > 0.05]

    if not en_candidates or not de_candidates:
        # Language signals too weak for content-based classification
        logger.info("ClassifyColumns: Level 1 failed - no clear EN/DE language split")
        return None

    # Pick the best EN and DE candidates
    best_en = max(en_candidates, key=lambda x: x[2]['eng'])
    best_de = max(de_candidates, key=lambda x: x[2]['deu'])

    if best_en[0] == best_de[0]:
        # Same column scored highest for both — ambiguous
        logger.info("ClassifyColumns: Level 1 failed - same column highest for EN and DE")
        return None

    en_conf = best_en[2]['eng']
    de_conf = best_de[2]['deu']

    regions.append(PageRegion(
        type='column_en', x=best_en[1].x, y=best_en[1].y,
        width=best_en[1].width, height=content_h,
        classification_confidence=round(en_conf, 2),
        classification_method='content',
    ))
    assigned.add(best_en[0])

    regions.append(PageRegion(
        type='column_de', x=best_de[1].x, y=best_de[1].y,
        width=best_de[1].width, height=content_h,
        classification_confidence=round(de_conf, 2),
        classification_method='content',
    ))
    assigned.add(best_de[0])

    # Step 3: Remaining columns → example; the sentence score only changes
    # the confidence, not the assigned type.
    for i, geom, ls, rs in remaining:
        if i in assigned:
            continue
        if rs['sentence'] > 0.4:
            regions.append(PageRegion(
                type='column_example', x=geom.x, y=geom.y,
                width=geom.width, height=content_h,
                classification_confidence=round(rs['sentence'], 2),
                classification_method='content',
            ))
        else:
            regions.append(PageRegion(
                type='column_example', x=geom.x, y=geom.y,
                width=geom.width, height=content_h,
                classification_confidence=0.5,
                classification_method='content',
            ))

    regions.sort(key=lambda r: r.x)
    return regions
|
||||
|
||||
|
||||
def _classify_by_position_enhanced(geometries: List[ColumnGeometry],
                                   lang_scores: List[Dict[str, float]],
                                   content_w: int,
                                   content_h: int) -> Optional[List[PageRegion]]:
    """Level 2: Position-based rules enhanced with language confirmation.

    Uses the old positional heuristics but confirms EN/DE assignment
    with language scores (swapping if needed).

    NOTE(review): declared Optional but every path returns a list; the
    Optional allows callers to treat this level like Level 1.
    """
    regions = []
    untyped = list(range(len(geometries)))

    # Rule 1: Leftmost narrow column → page_ref
    g0 = geometries[0]
    if g0.width_ratio < 0.12:
        regions.append(PageRegion(
            type='page_ref', x=g0.x, y=g0.y,
            width=g0.width, height=content_h,
            classification_confidence=0.8,
            classification_method='position_enhanced',
        ))
        untyped.remove(0)

    # Rule 2: Narrow columns with few words → marker
    for i in list(untyped):
        geom = geometries[i]
        if geom.width_ratio < 0.08 and geom.word_count <= 10:
            regions.append(PageRegion(
                type='column_marker', x=geom.x, y=geom.y,
                width=geom.width, height=content_h,
                classification_confidence=0.7,
                classification_method='position_enhanced',
            ))
            untyped.remove(i)

    # Rule 3: Rightmost remaining → column_example (if 3+ remaining)
    if len(untyped) >= 3:
        last_idx = untyped[-1]
        geom = geometries[last_idx]
        regions.append(PageRegion(
            type='column_example', x=geom.x, y=geom.y,
            width=geom.width, height=content_h,
            classification_confidence=0.7,
            classification_method='position_enhanced',
        ))
        untyped.remove(last_idx)

    # Rule 4: First two remaining → EN/DE, but check language to possibly swap
    if len(untyped) >= 2:
        idx_a = untyped[0]
        idx_b = untyped[1]
        ls_a = lang_scores[idx_a]
        ls_b = lang_scores[idx_b]

        # Default: first=EN, second=DE (old behavior)
        en_idx, de_idx = idx_a, idx_b
        conf = 0.7

        # Swap only if BOTH columns clearly point the opposite way
        if ls_a['deu'] > ls_a['eng'] and ls_b['eng'] > ls_b['deu']:
            en_idx, de_idx = idx_b, idx_a
            conf = 0.85
            logger.info(f"ClassifyColumns: Level 2 swapped EN/DE based on language scores")

        regions.append(PageRegion(
            type='column_en', x=geometries[en_idx].x, y=geometries[en_idx].y,
            width=geometries[en_idx].width, height=content_h,
            classification_confidence=conf,
            classification_method='position_enhanced',
        ))
        regions.append(PageRegion(
            type='column_de', x=geometries[de_idx].x, y=geometries[de_idx].y,
            width=geometries[de_idx].width, height=content_h,
            classification_confidence=conf,
            classification_method='position_enhanced',
        ))
        untyped = untyped[2:]
    elif len(untyped) == 1:
        # Only one column left — assume it is the EN column.
        idx = untyped[0]
        geom = geometries[idx]
        regions.append(PageRegion(
            type='column_en', x=geom.x, y=geom.y,
            width=geom.width, height=content_h,
            classification_confidence=0.5,
            classification_method='position_enhanced',
        ))
        untyped = []

    # Remaining → example
    for idx in untyped:
        geom = geometries[idx]
        regions.append(PageRegion(
            type='column_example', x=geom.x, y=geom.y,
            width=geom.width, height=content_h,
            classification_confidence=0.5,
            classification_method='position_enhanced',
        ))

    regions.sort(key=lambda r: r.x)
    return regions
|
||||
|
||||
|
||||
def _classify_by_position_fallback(geometries: List[ColumnGeometry],
                                   content_w: int,
                                   content_h: int) -> List[PageRegion]:
    """Level 3: Pure position-based fallback (identical to old code).

    Guarantees no regression from the previous behavior.

    NOTE(review): the marker rule here uses word_count <= 8 while Level 2
    uses <= 10 — intentional to mirror the old code exactly; confirm.
    """
    regions = []
    untyped = list(range(len(geometries)))

    # Rule 1: Leftmost narrow column → page_ref
    g0 = geometries[0]
    if g0.width_ratio < 0.12:
        regions.append(PageRegion(
            type='page_ref', x=g0.x, y=g0.y,
            width=g0.width, height=content_h,
            classification_confidence=1.0,
            classification_method='position_fallback',
        ))
        untyped.remove(0)

    # Rule 2: Narrow + few words → marker
    for i in list(untyped):
        geom = geometries[i]
        if geom.width_ratio < 0.08 and geom.word_count <= 8:
            regions.append(PageRegion(
                type='column_marker', x=geom.x, y=geom.y,
                width=geom.width, height=content_h,
                classification_confidence=1.0,
                classification_method='position_fallback',
            ))
            untyped.remove(i)

    # Rule 3: Rightmost remaining → example (if 3+)
    if len(untyped) >= 3:
        last_idx = untyped[-1]
        geom = geometries[last_idx]
        regions.append(PageRegion(
            type='column_example', x=geom.x, y=geom.y,
            width=geom.width, height=content_h,
            classification_confidence=1.0,
            classification_method='position_fallback',
        ))
        untyped.remove(last_idx)

    # Rule 4: First remaining → EN, second → DE
    if len(untyped) >= 2:
        en_idx = untyped[0]
        de_idx = untyped[1]
        regions.append(PageRegion(
            type='column_en', x=geometries[en_idx].x, y=geometries[en_idx].y,
            width=geometries[en_idx].width, height=content_h,
            classification_confidence=1.0,
            classification_method='position_fallback',
        ))
        regions.append(PageRegion(
            type='column_de', x=geometries[de_idx].x, y=geometries[de_idx].y,
            width=geometries[de_idx].width, height=content_h,
            classification_confidence=1.0,
            classification_method='position_fallback',
        ))
        untyped = untyped[2:]
    elif len(untyped) == 1:
        # Only one left — call it column_en
        idx = untyped[0]
        geom = geometries[idx]
        regions.append(PageRegion(
            type='column_en', x=geom.x, y=geom.y,
            width=geom.width, height=content_h,
            classification_confidence=1.0,
            classification_method='position_fallback',
        ))
        untyped = []

    # Any remaining untyped columns get generic column_example type
    for idx in untyped:
        geom = geometries[idx]
        regions.append(PageRegion(
            type='column_example', x=geom.x, y=geom.y,
            width=geom.width, height=content_h,
            classification_confidence=1.0,
            classification_method='position_fallback',
        ))

    # Sort by x position for consistent output
    regions.sort(key=lambda r: r.x)
    return regions
|
||||
|
||||
# Add header/footer
|
||||
|
||||
def _add_header_footer(regions: List[PageRegion], top_y: int, bottom_y: int,
                       img_w: int, img_h: int) -> None:
    """Append header/footer regions to *regions* in place.

    A full-width header strip is added when the content area starts more
    than 10 px below the page top, and a footer strip when it ends more
    than 10 px above the page bottom.
    """
    has_header_gap = top_y > 10
    if has_header_gap:
        regions.append(
            PageRegion(type='header', x=0, y=0, width=img_w, height=top_y))

    has_footer_gap = bottom_y < img_h - 10
    if has_footer_gap:
        regions.append(
            PageRegion(type='footer', x=0, y=bottom_y,
                       width=img_w, height=img_h - bottom_y))
|
||||
|
||||
|
||||
# --- Main Entry Point ---
|
||||
|
||||
def analyze_layout_by_words(ocr_img: np.ndarray, dewarped_bgr: np.ndarray) -> List[PageRegion]:
    """Detect columns using two-phase approach: geometry then content classification.

    Phase A: detect_column_geometry() — clustering word positions into columns.
    Phase B: classify_column_types() — content-based type assignment with fallback.

    Falls back to projection-based analyze_layout() if geometry detection fails.

    Args:
        ocr_img: Binarized grayscale image for layout analysis.
        dewarped_bgr: Original BGR image (for Tesseract word detection).

    Returns:
        List of PageRegion objects with types, confidence, and method.
    """
    h, w = ocr_img.shape[:2]

    # Phase A: Geometry detection
    result = detect_column_geometry(ocr_img, dewarped_bgr)

    if result is None:
        # Fallback to projection-based layout
        logger.info("LayoutByWords: geometry detection failed, falling back to projection profiles")
        layout_img = create_layout_image(dewarped_bgr)
        return analyze_layout(layout_img, ocr_img)

    geometries, left_x, right_x, top_y, bottom_y = result
    content_w = right_x - left_x

    # Phase B: Content-based classification
    regions = classify_column_types(geometries, content_w, top_y, w, h, bottom_y)

    # Summary logging: column count plus which classification level(s) fired.
    col_count = len([r for r in regions if r.type.startswith('column') or r.type == 'page_ref'])
    methods = set(r.classification_method for r in regions if r.classification_method)
    logger.info(f"LayoutByWords: {col_count} columns detected (methods: {methods}): "
                f"{[(r.type, r.x, r.width, r.classification_confidence) for r in regions if r.type not in ('header','footer')]}")

    return regions
|
||||
|
||||
@@ -1276,6 +1755,11 @@ def match_lines_to_vocab(ocr_results: Dict[str, List[Dict]],
|
||||
Returns:
|
||||
List of VocabRow objects.
|
||||
"""
|
||||
# If no vocabulary columns detected (e.g. plain text page), return empty
|
||||
if 'column_en' not in ocr_results and 'column_de' not in ocr_results:
|
||||
logger.info("match_lines_to_vocab: no column_en/column_de in OCR results, returning empty")
|
||||
return []
|
||||
|
||||
# Group words into lines per column
|
||||
en_lines = _group_words_into_lines(ocr_results.get('column_en', []), y_tolerance_px)
|
||||
de_lines = _group_words_into_lines(ocr_results.get('column_de', []), y_tolerance_px)
|
||||
|
||||
@@ -648,8 +648,16 @@ async def detect_columns(session_id: str):
|
||||
duration = time.time() - t0
|
||||
|
||||
columns = [asdict(r) for r in regions]
|
||||
|
||||
# Determine classification methods used
|
||||
methods = list(set(
|
||||
c.get("classification_method", "") for c in columns
|
||||
if c.get("classification_method")
|
||||
))
|
||||
|
||||
column_result = {
|
||||
"columns": columns,
|
||||
"classification_methods": methods,
|
||||
"duration_seconds": round(duration, 2),
|
||||
}
|
||||
|
||||
@@ -742,6 +750,7 @@ async def _get_columns_overlay(session_id: str) -> Response:
|
||||
"column_en": (255, 180, 0), # Blue
|
||||
"column_de": (0, 200, 0), # Green
|
||||
"column_example": (0, 140, 255), # Orange
|
||||
"column_text": (200, 200, 0), # Cyan/Turquoise
|
||||
"page_ref": (200, 0, 200), # Purple
|
||||
"column_marker": (0, 0, 220), # Red
|
||||
"header": (128, 128, 128), # Gray
|
||||
@@ -760,8 +769,11 @@ async def _get_columns_overlay(session_id: str) -> Response:
|
||||
# Solid border
|
||||
cv2.rectangle(img, (x, y), (x + w, y + h), color, 3)
|
||||
|
||||
# Label
|
||||
# Label with confidence
|
||||
label = col.get("type", "unknown").replace("column_", "").upper()
|
||||
conf = col.get("classification_confidence")
|
||||
if conf is not None and conf < 1.0:
|
||||
label = f"{label} {int(conf * 100)}%"
|
||||
cv2.putText(img, label, (x + 10, y + 30),
|
||||
cv2.FONT_HERSHEY_SIMPLEX, 0.8, color, 2)
|
||||
|
||||
|
||||
Reference in New Issue
Block a user