"""Capability execution engine (prototype).

Region IR with a runtime owner; capabilities declare claims/consumes/produces;
the execution order is DERIVED from the artifact graph (topologically), not
hard-coded. Implements C6/C7 as pipeline stages with region ownership.

NOTE(review): ``page`` objects are duck-typed throughout. The code only relies
on ``width``, ``height``, ``find_tables()``, ``extract_words()`` and
``crop(bbox)`` -- presumably a pdfplumber-style page; confirm against callers.
"""
from dataclasses import dataclass, field
from typing import ClassVar, List, Optional

try:
    import reading_order as RO
except ModuleNotFoundError as exc:
    # ReadingOrder.run already tolerates a reading_order module without
    # ``emit_words`` (it falls back to empty text).  Treat a missing module the
    # same way instead of failing at import time; errors raised *inside*
    # reading_order (e.g. its own missing dependencies) still propagate.
    if exc.name != "reading_order":
        raise
    RO = None

# Suffix that turns a region type ("table") into the artifact kind a
# capability consumes ("table_region"); see run_pipeline.
_REGION_SUFFIX = "_region"

# A detected table only becomes a region if it spans at least this fraction of
# the page width and at least this height (in page units); smaller hits are
# treated as footer artifacts.
_MIN_TABLE_WIDTH_FRACTION = 0.25
_MIN_TABLE_HEIGHT = 25


@dataclass
class Region:
    """A rectangular page area plus its claim state."""

    id: int
    bbox: tuple  # (x0, top, x1, bottom)
    type: str = "unknown"
    state: str = "unclaimed"  # set to "claimed" by run_pipeline
    owner: Optional[str] = None  # name of the claiming capability, if any


@dataclass
class Artifact:
    """A typed value produced by the pipeline.

    Kinds starting with "_" are internal and are stripped from the result of
    run_pipeline.
    """

    kind: str
    payload: object
    source_region: Optional[int] = None  # id of the originating Region, if any


class Capability:
    """Base class for pipeline stages.

    Subclasses declare the artifact kinds they consume and produce and may
    override ``claims`` and ``run``.  The defaults claim nothing and produce
    nothing.
    """

    name: ClassVar[str] = ""
    # Class-level lists are shared by every instance: treat them as read-only
    # and override them in subclasses rather than mutating them in place.
    consumes: ClassVar[List[str]] = []
    produces: ClassVar[List[str]] = []
    # Residual capabilities get to claim regions only after all others.
    residual: ClassVar[bool] = False

    def claims(self, region):
        """Return True if this capability wants to own ``region``."""
        return False

    def run(self, regions, page, artifacts):
        """Process the owned ``regions`` and return a list of new artifacts."""
        return []


class TableExtraction(Capability):
    """C6: counts the rows of the table found inside each owned region."""

    name = "C6_TableExtraction"
    consumes = ["table_region"]
    produces = ["table_units"]

    def claims(self, region):
        return region.type == "table"

    def run(self, regions, page, artifacts):
        """Return one "table_units" artifact per region.

        Only the first table detected inside each cropped region is used; a
        crop without any detected table yields a row count of 0.
        """
        out = []
        for region in regions:
            crop = page.crop(_clamp(region.bbox, page))
            tables = crop.find_tables()
            rows = tables[0].extract() if tables else []
            out.append(
                Artifact(
                    "table_units",
                    {"region": region.id, "rows": len(rows)},
                    region.id,
                )
            )
        return out


class ReadingOrder(Capability):
    """C7: orders the prose words of the page, skipping words inside tables."""

    name = "C7_ReadingOrder"
    consumes = ["prose_region"]
    produces = ["ordered_prose"]
    residual = True

    def claims(self, region):
        return region.type == "prose"

    def run(self, regions, page, artifacts):
        """Return a single "ordered_prose" artifact for the whole page.

        Words whose centre lies inside any "_table_bbox" artifact are dropped.
        The artifact carries only counts (words, characters), not the text.
        """
        table_bboxes = [a.payload for a in artifacts if a.kind == "_table_bbox"]
        words = [w for w in page.extract_words() if not _in_any(w, table_bboxes)]
        # Falls back to empty text when reading_order is unavailable or does
        # not provide emit_words.
        if hasattr(RO, "emit_words"):
            text = RO.emit_words(words, float(page.width))
        else:
            text = ""
        return [Artifact("ordered_prose", {"words": len(words), "chars": len(text)}, None)]


class FigureExtraction(Capability):
    """C8: claims figure regions; inherits the no-op ``run``."""

    name = "C8_FigureExtraction"
    consumes = ["figure_region"]
    produces = ["figure_units"]

    def claims(self, region):
        return region.type == "figure"


class References(Capability):
    """C4: declared consumer of ordered prose; inherits the no-op ``run``."""

    name = "C4_References"
    consumes = ["ordered_prose"]
    produces = ["citation_units"]


def _in_any(w, bboxes):
    """Return True if the centre of word ``w`` lies inside any bbox.

    ``w`` is a mapping with "x0", "x1", "top" and "bottom"; each bbox is
    ``(x0, top, x1, bottom)``.  Bounds are inclusive.
    """
    cx = (w["x0"] + w["x1"]) / 2
    cy = (w["top"] + w["bottom"]) / 2
    return any(
        x0 <= cx <= x1 and top <= cy <= bottom
        for (x0, top, x1, bottom) in bboxes
    )


def _clamp(b, page):
    """Clip bbox ``b`` = (x0, top, x1, bottom) to the page rectangle."""
    x0, top, x1, bottom = b
    return (
        max(0, x0),
        max(0, top),
        min(float(page.width), x1),
        min(float(page.height), bottom),
    )


def segment(page):
    """Split ``page`` into regions.

    Returns ``(regions, table_bboxes)``: one "table" region per sufficiently
    large detected table, followed by a single "prose" region covering the
    whole page.  ``table_bboxes`` lists the clamped bboxes of the kept tables.
    """
    regions = []
    table_bboxes = []
    width = float(page.width)
    height = float(page.height)
    for table in page.find_tables():
        bbox = _clamp(table.bbox, page)
        wide_enough = (bbox[2] - bbox[0]) >= _MIN_TABLE_WIDTH_FRACTION * width
        tall_enough = (bbox[3] - bbox[1]) >= _MIN_TABLE_HEIGHT
        # Keep substantial tables only, no footer artifacts.
        if wide_enough and tall_enough:
            regions.append(Region(len(regions), bbox, "table"))
            table_bboxes.append(bbox)
    regions.append(Region(len(regions), (0, 0, width, height), "prose"))
    return regions, table_bboxes


def resolve_order(caps, raw_types):
    """Order ``caps`` so that every capability runs after its producers.

    ``raw_types`` are the artifact kinds available before any capability runs.
    Capabilities are scheduled in repeated passes over ``caps``; one becomes
    schedulable once all kinds in its ``consumes`` are available, and its
    ``produces`` become available immediately (also to later capabilities in
    the same pass).

    Raises:
        RuntimeError: if a pass schedules nothing while capabilities remain
            (a missing producer or a dependency cycle).
    """
    available = set(raw_types)
    ordered = []
    remaining = list(caps)
    while remaining:
        progressed = False
        # Iterate over a snapshot because scheduled entries are removed.
        for cap in list(remaining):
            if all(dep in available for dep in cap.consumes):
                ordered.append(cap)
                available.update(cap.produces)
                remaining.remove(cap)
                progressed = True
        if not progressed:
            raise RuntimeError("unsatisfiable: " + str([c.name for c in remaining]))
    return ordered


def run_pipeline(page, caps):
    """Segment ``page``, let capabilities claim regions, then run them.

    Returns ``(regions, artifacts, order)`` where ``artifacts`` excludes
    internal kinds (those starting with "_") and ``order`` is the derived
    execution order of ``caps``.

    Raises:
        RuntimeError: from resolve_order, if a capability consumes a
            non-region artifact kind that no registered capability produces.
    """
    regions, table_bboxes = segment(page)
    present = {r.type + _REGION_SUFFIX for r in regions}
    # Region kinds are page-dependent raw inputs.  A capability whose region
    # type does not occur on this page (e.g. table extraction on a page without
    # tables) is not a graph error: it simply owns nothing and is skipped in
    # the run phase, so its declared region inputs count as satisfiable.
    declared = {
        dep for c in caps for dep in c.consumes if dep.endswith(_REGION_SUFFIX)
    }
    raw_types = present | declared
    order = resolve_order(caps, raw_types)

    # Table geometry, used by C7 to exclude words inside tables.
    artifacts = [Artifact("_table_bbox", b) for b in table_bboxes]

    # CLAIM phase: specific claimers first, residual ones last (stable sort
    # keeps the derived order within each group).
    for cap in sorted(order, key=lambda c: c.residual):
        for region in regions:
            if region.state == "unclaimed" and cap.claims(region):
                region.state = "claimed"
                region.owner = cap.name

    # RUN phase in the derived order.  A capability runs if it owns at least
    # one region, or if it consumes a derived (non-region) artifact kind.
    for cap in order:
        owned = [r for r in regions if r.owner == cap.name]
        consumes_derived = any(dep not in raw_types for dep in cap.consumes)
        if owned or consumes_derived:
            artifacts += cap.run(owned, page, artifacts)

    public = [a for a in artifacts if not a.kind.startswith("_")]
    return regions, public, order