Files
breakpilot-lehrer/klausur-service/backend/cv_layout_column_refine.py
Benjamin Admin 9ba420fa91
Some checks failed
CI / go-lint (push) Has been skipped
CI / python-lint (push) Has been skipped
CI / nodejs-lint (push) Has been skipped
CI / test-go-school (push) Successful in 42s
CI / test-go-edu-search (push) Successful in 34s
CI / test-python-klausur (push) Failing after 2m51s
CI / test-python-agent-core (push) Successful in 21s
CI / test-nodejs-website (push) Successful in 29s
Fix: Remove broken getKlausurApiUrl and clean up empty lines
sed replacement left orphaned hostname references in story page
and empty lines in getApiBase functions.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-24 16:02:04 +02:00

459 lines
18 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
Post-processing refinements for column geometry.
Extracted from cv_layout_columns.py — contains:
- _detect_sub_columns() (sub-column detection via left-edge alignment)
- _split_broad_columns() (broad column splitting via word-coverage gaps)
- expand_narrow_columns() (narrow column expansion into whitespace)
Lizenz: Apache 2.0 (kommerziell nutzbar)
DATENSCHUTZ: Alle Verarbeitung erfolgt lokal.
"""
import logging
import statistics
from typing import Dict, List, Optional, Tuple
import numpy as np
from cv_vocab_types import ColumnGeometry
logger = logging.getLogger(__name__)
def _detect_sub_columns(
    geometries: List[ColumnGeometry],
    content_w: int,
    left_x: int = 0,
    top_y: int = 0,
    header_y: Optional[int] = None,
    footer_y: Optional[int] = None,
    _edge_tolerance: int = 8,
    _min_col_start_ratio: float = 0.10,
) -> List[ColumnGeometry]:
    """Peel a left-hand sub-column off columns whose words align on two edges.

    Per column, the left edges of confident body words are grouped into
    alignment bins (neighbouring edges at most ``_edge_tolerance`` px apart).
    The leftmost bin holding at least ``max(3, _min_col_start_ratio * n)``
    words is taken as the real column start. Words lying further left than
    that bin (minus the tolerance) become a separate sub-column when there
    are at least 2 of them in the body, they make up less than 35 % of the
    column's words, and the horizontal gap to the main text is wide enough
    to rule out inline markers such as bullets or numbering.

    Coordinate frames: word ``left`` / ``top`` values are relative to the
    content ROI (offset by *left_x* / *top_y*), whereas ``ColumnGeometry.x``
    is an absolute image coordinate.

    Args:
        geometries: Columns to inspect.
        content_w: Width of the content area in pixels; if <= 0 the input
            list is returned untouched.
        left_x: Absolute x of the content ROI's left edge.
        top_y: Absolute y of the content ROI's top edge.
        header_y: Optional absolute y; words above it are ignored when
            clustering and when counting sub-column candidates.
        footer_y: Optional absolute y; words below it are ignored likewise.
        _edge_tolerance: Maximum distance (px) between neighbouring left
            edges that still share a bin.
        _min_col_start_ratio: Fraction of confident words a bin needs to
            qualify as the column start.

    Returns:
        A list of ColumnGeometry sorted by ``x`` and re-indexed from 0;
        it may be longer than the input.
    """
    if content_w <= 0:
        return geometries

    # Header/footer limits translated into the frame used by word 'top'.
    body_top = None if header_y is None else header_y - top_y
    body_bottom = None if footer_y is None else footer_y - top_y

    def _in_body(word: Dict) -> bool:
        # True when the word lies between the header and footer limits.
        return ((body_top is None or word['top'] >= body_top)
                and (body_bottom is None or word['top'] <= body_bottom))

    def _close_bin(edges: List[int]) -> Tuple[int, int, int, int]:
        # Summarise one cluster as (center, count, min_edge, max_edge).
        return (sum(edges) // len(edges), len(edges), min(edges), max(edges))

    def _piece(parent: ColumnGeometry, x: int, width: int,
               words: List[Dict]) -> ColumnGeometry:
        # Build one half of a split column; index is assigned at the end.
        return ColumnGeometry(
            index=0,
            x=x,
            y=parent.y,
            width=width,
            height=parent.height,
            word_count=len(words),
            words=words,
            width_ratio=width / content_w if content_w > 0 else 0.0,
            is_sub_column=True,
        )

    result: List[ColumnGeometry] = []
    for geo in geometries:
        # Narrow or sparsely populated columns are passed through as-is.
        if geo.width_ratio < 0.15 or geo.word_count < 5:
            result.append(geo)
            continue

        # Confident words outside header/footer drive the clustering.
        confident = [w for w in geo.words
                     if w.get('conf', 0) >= 30 and _in_body(w)]
        if len(confident) < 3:
            result.append(geo)
            continue

        # --- Group sorted left edges into alignment bins ---
        edges = sorted(w['left'] for w in confident)
        edge_bins: List[Tuple[int, int, int, int]] = []
        cluster = [edges[0]]
        for edge in edges[1:]:
            if edge - cluster[-1] <= _edge_tolerance:
                cluster.append(edge)
                continue
            edge_bins.append(_close_bin(cluster))
            cluster = [edge]
        edge_bins.append(_close_bin(cluster))

        # --- Leftmost bin that is populated enough to be the column start ---
        min_count = max(3, int(len(confident) * _min_col_start_ratio))
        col_start_bin = next((b for b in edge_bins if b[1] >= min_count), None)
        if col_start_bin is None:
            result.append(geo)
            continue

        # Everything clearly left of the column-start bin is a candidate.
        split_threshold = col_start_bin[2] - _edge_tolerance
        sub_words = [w for w in geo.words if w['left'] < split_threshold]
        main_words = [w for w in geo.words if w['left'] >= split_threshold]

        # Only body words count towards the thresholds, so that header or
        # footer text cannot trigger a split by itself.
        sub_body = [w for w in sub_words if _in_body(w)]
        if len(sub_body) < 2 or len(sub_body) / len(geo.words) >= 0.35:
            result.append(geo)
            continue

        # --- Inline-marker guard ---
        # Bullets and numbering sit close to the text they belong to; a
        # real sub-column is separated by a gap of at least 1.2x the median
        # word height (and never less than 20 px).
        rightmost_sub = max(w['left'] + w.get('width', 0) for w in sub_words)
        gap_to_main = col_start_bin[2] - rightmost_sub
        heights = [w.get('height', 20) for w in confident]
        typical_h = statistics.median(heights) if heights else 20
        min_gap = max(typical_h * 1.2, 20)
        if gap_to_main < min_gap:
            logger.debug(
                "SubColumnSplit: column idx=%d skipped — gap=%dpx < min=%dpx "
                "(likely inline markers, not a sub-column)",
                geo.index, gap_to_main, min_gap)
            result.append(geo)
            continue

        # --- Split position ---
        # Halfway between the last sub-column left edge and the main
        # column start, then shifted from ROI-relative to absolute x.
        split_rel = (max(w['left'] for w in sub_words) + col_start_bin[2]) // 2
        split_abs = split_rel + left_x

        sub_width = split_abs - geo.x
        main_width = (geo.x + geo.width) - split_abs
        if sub_width <= 0 or main_width <= 0:
            result.append(geo)
            continue

        result.append(_piece(geo, geo.x, sub_width, sub_words))
        result.append(_piece(geo, split_abs, main_width, main_words))

        logger.info(
            f"SubColumnSplit: column idx={geo.index} split at abs_x={split_abs} "
            f"(rel={split_rel}), sub={len(sub_words)} words, "
            f"main={len(main_words)} words, "
            f"col_start_bin=({col_start_bin[0]}, n={col_start_bin[1]})"
        )

    # Restore left-to-right order and renumber.
    result.sort(key=lambda g: g.x)
    for position, column in enumerate(result):
        column.index = position
    return result
def _split_broad_columns(
    geometries: List[ColumnGeometry],
    content_w: int,
    left_x: int = 0,
    _broad_threshold: float = 0.35,
    _min_gap_px: int = 15,
    _min_words_per_split: int = 5,
) -> List[ColumnGeometry]:
    """Cut overly wide columns in two at their widest internal word gap.

    For every column wider than ``_broad_threshold`` (as a fraction of the
    content width) and holding at least 10 words, a per-pixel word-coverage
    profile is built, lightly smoothed and normalised. Runs where coverage
    drops below 0.5 are gaps; the widest gap that stays more than 10 px away
    from both column edges is the split candidate. Words are assigned to the
    left or right half by their horizontal midpoint.

    Args:
        geometries: Column geometries, e.g. the output of
            ``_detect_sub_columns``.
        content_w: Width of the content area in pixels.
        left_x: Absolute x of the content ROI's left edge; word ``left``
            values are relative to it.
        _broad_threshold: A column is "broad" when its ``width_ratio``
            exceeds this value.
        _min_gap_px: Minimum gap width (px) required to split.
        _min_words_per_split: Minimum number of words each half must keep.

    Returns:
        A list of ColumnGeometry sorted by ``x`` and re-indexed from 0;
        it may be longer than the input.
    """
    result: List[ColumnGeometry] = []

    logger.info(f"SplitBroadCols: input {len(geometries)} cols: "
                f"{[(g.index, g.x, g.width, g.word_count, round(g.width_ratio, 3)) for g in geometries]}")

    edge_margin = 10  # gaps this close to a column edge count as margins

    for geo in geometries:
        if geo.width_ratio <= _broad_threshold or len(geo.words) < 10:
            result.append(geo)
            continue

        # Column origin in ROI-relative coordinates (same frame as wd['left']).
        col_offset = geo.x - left_x

        # --- Per-pixel word coverage across the column width ---
        coverage = np.zeros(geo.width, dtype=np.float32)
        for wd in geo.words:
            rel_left = wd['left'] - col_offset
            rel_right = rel_left + wd.get('width', 0)
            start = max(0, int(rel_left))
            stop = min(geo.width, int(rel_right))
            if stop > start:
                coverage[start:stop] += 1.0

        # 3 px box filter to suppress single-pixel noise.
        if len(coverage) > 3:
            box = np.ones(3, dtype=np.float32) / 3.0
            coverage = np.convolve(coverage, box, mode='same')

        # Scale to [0, 1] so the 0.5 cut-off is relative to the peak.
        peak = coverage.max()
        if peak > 0:
            coverage /= peak

        # --- Collect runs of low coverage as (start, end, length) ---
        is_low = coverage < 0.5
        all_gaps = []
        run_start = None
        for px, low in enumerate(is_low):
            if low:
                if run_start is None:
                    run_start = px
            elif run_start is not None:
                all_gaps.append((run_start, px, px - run_start))
                run_start = None
        if run_start is not None:
            # A run still open at the right edge ends at the array length.
            all_gaps.append((run_start, len(is_low), len(is_low) - run_start))

        # Keep only gaps that do not touch either column edge.
        internal_gaps = [gap for gap in all_gaps
                         if gap[0] > edge_margin and gap[1] < geo.width - edge_margin]
        if internal_gaps:
            best_gap = max(internal_gaps, key=lambda g: g[2])
        else:
            best_gap = None

        logger.info(f"SplitBroadCols: col {geo.index} all_gaps(>=5px): "
                    f"{[g for g in all_gaps if g[2] >= 5]}, "
                    f"internal(>=5px): {[g for g in internal_gaps if g[2] >= 5]}, "
                    f"best={best_gap}")

        if best_gap is None or best_gap[2] < _min_gap_px:
            result.append(geo)
            continue

        gap_center = (best_gap[0] + best_gap[1]) // 2

        # --- Distribute words by their horizontal midpoint ---
        left_words = []
        right_words = []
        for wd in geo.words:
            midpoint = (wd['left'] - col_offset) + wd.get('width', 0) / 2.0
            if midpoint < gap_center:
                left_words.append(wd)
            else:
                right_words.append(wd)

        if len(left_words) < _min_words_per_split or len(right_words) < _min_words_per_split:
            result.append(geo)
            continue

        # --- Assemble the two halves ---
        left_w = gap_center
        right_w = geo.width - gap_center
        left_geo = ColumnGeometry(
            index=0,
            x=geo.x,
            y=geo.y,
            width=left_w,
            height=geo.height,
            word_count=len(left_words),
            words=left_words,
            width_ratio=left_w / content_w if content_w else 0,
            is_sub_column=True,
        )
        right_geo = ColumnGeometry(
            index=0,
            x=geo.x + gap_center,
            y=geo.y,
            width=right_w,
            height=geo.height,
            word_count=len(right_words),
            words=right_words,
            width_ratio=right_w / content_w if content_w else 0,
            is_sub_column=True,
        )

        logger.info(
            f"SplitBroadCols: col {geo.index} SPLIT at gap_center={gap_center} "
            f"(gap {best_gap[2]}px @ [{best_gap[0]}..{best_gap[1]}]), "
            f"left={len(left_words)} words (w={left_w}), "
            f"right={len(right_words)} words (w={right_w})"
        )

        result.extend((left_geo, right_geo))

    # Restore left-to-right order and renumber.
    result.sort(key=lambda g: g.x)
    for position, column in enumerate(result):
        column.index = position
    return result
def expand_narrow_columns(
    geometries: List[ColumnGeometry],
    content_w: int,
    left_x: int,
    word_dicts: List[Dict],
) -> List[ColumnGeometry]:
    """Expand narrow columns into adjacent whitespace gaps.

    A column narrower than 10 % of the content width is widened towards
    each neighbour until it is 4 px away from the neighbour's nearest word
    (or, when the neighbour holds no words, 4 px inside the neighbour's far
    edge). After a column has grown, its words are re-collected from
    *word_dicts*, and any neighbour it now overlaps is trimmed and gets its
    words re-collected as well.

    Must be called AFTER _detect_sub_columns() so that sub-column splits
    (which create the narrowest columns) have already happened.

    Args:
        geometries: Columns in left-to-right order. The objects are
            modified in place.
        content_w: Width of the content area in pixels. If it is <= 0 no
            column can be classified as narrow and the input is returned
            unchanged.
        left_x: Absolute x of the content ROI's left edge; word ``left``
            values are relative to it.
        word_dicts: Word dicts used to re-populate resized columns; a word
            belongs to a column when its ``left`` falls inside the column.

    Returns:
        The same *geometries* list that was passed in.
    """
    _NARROW_THRESHOLD_PCT = 10.0
    _MIN_WORD_MARGIN = 4

    # Without a positive content width every percentage below would divide
    # by zero; the sibling _detect_sub_columns() bails out the same way.
    if len(geometries) < 2 or content_w <= 0:
        return geometries

    def _pct(width: int) -> float:
        # Column width as a percentage of the content width.
        return width / content_w * 100

    def _reassign_words(col: ColumnGeometry) -> None:
        # Re-collect the words whose left edge lies inside *col* and
        # refresh the derived fields.
        rel_left = col.x - left_x
        rel_right = rel_left + col.width
        col.words = [wd for wd in word_dicts
                     if rel_left <= wd['left'] < rel_right]
        col.word_count = len(col.words)
        col.width_ratio = col.width / content_w

    logger.info("ExpandNarrowCols: input %d cols: %s",
                len(geometries),
                [(i, g.x, g.width, round(_pct(g.width), 1))
                 for i, g in enumerate(geometries)])

    for i, g in enumerate(geometries):
        orig_pct = _pct(g.width)
        if orig_pct >= _NARROW_THRESHOLD_PCT:
            continue

        left_nb = geometries[i - 1] if i > 0 else None
        right_nb = geometries[i + 1] if i + 1 < len(geometries) else None
        expanded = False

        # --- try expanding to the LEFT ---
        if left_nb is not None:
            # The gap can be 0 when a sub-column split produced adjacent
            # columns, so look at where the neighbour's words actually
            # end: space to their right is unused and can be claimed.
            nb_word_rights = [wd['left'] + wd.get('width', 0)
                              for wd in left_nb.words]
            if nb_word_rights:
                safe_left_abs = left_x + max(nb_word_rights) + _MIN_WORD_MARGIN
            else:
                # No words in the neighbour: take up to its start.
                safe_left_abs = left_nb.x + _MIN_WORD_MARGIN
            if safe_left_abs < g.x:
                g.width += g.x - safe_left_abs
                g.x = safe_left_abs
                expanded = True

        # --- try expanding to the RIGHT ---
        if right_nb is not None:
            nb_word_lefts = [wd['left'] for wd in right_nb.words]
            if nb_word_lefts:
                safe_right_abs = left_x + min(nb_word_lefts) - _MIN_WORD_MARGIN
            else:
                safe_right_abs = right_nb.x + right_nb.width - _MIN_WORD_MARGIN
            if safe_right_abs > g.x + g.width:
                g.width = safe_right_abs - g.x
                expanded = True

        if not expanded:
            continue

        _reassign_words(g)
        logger.info(
            "ExpandNarrowCols: col %d (%.1f%% -> %.1f%%) x=%d w=%d words=%d",
            i, orig_pct, _pct(g.width), g.x, g.width, g.word_count)

        # --- Shrink overlapping neighbours to match the new boundaries ---
        # Left neighbour: its right edge must not exceed our new left edge.
        if left_nb is not None and left_nb.x + left_nb.width > g.x:
            left_nb.width = max(0, g.x - left_nb.x)
            _reassign_words(left_nb)

        # Right neighbour: its left edge must not precede our new right
        # edge; its own right edge stays where it was.
        if right_nb is not None:
            my_right = g.x + g.width
            if right_nb.x < my_right:
                old_right_edge = right_nb.x + right_nb.width
                right_nb.x = my_right
                right_nb.width = max(0, old_right_edge - my_right)
                _reassign_words(right_nb)

    return geometries