async def _run_cross_group_pass(self):
    """Phase 2: Find cross-group duplicates among surviving masters.

    Walks every ``pass0b`` control that is neither ``duplicate`` nor
    ``deprecated`` in ``control_id`` order using keyset pagination
    (``DB_PAGE`` rows per query), so the full result set is never loaded
    into memory at once.  Each control is embedded and searched on its
    own; a failure on one control is logged, counted in
    ``self.stats["errors"]`` and does not abort the pass.

    Side effects:
        * resets ``self._progress_total`` / ``self._progress_count`` and
          advances ``self._progress_count`` once per examined control,
          including controls that were skipped or failed;
        * writes to the database through ``_cross_group_check_control``.
    """
    logger.info("BatchDedup Phase 2: Cross-group pass starting...")

    # Count up front so progress can be reported against a real total.
    total_row = self.db.execute(text("""
        SELECT COUNT(*) FROM canonical_controls
        WHERE decomposition_method = 'pass0b'
          AND release_state != 'duplicate'
          AND release_state != 'deprecated'
    """)).fetchone()
    total = total_row[0] if total_row else 0

    self._progress_total = total
    self._progress_count = 0

    logger.info("BatchDedup Cross-group: %d masters to check", total)

    cross_linked = 0
    cross_review = 0

    # Keyset pagination: fetch DB_PAGE rows whose control_id sorts after
    # the last one seen. Rows flipped to 'duplicate' while the pass runs
    # cannot shift the window the way OFFSET-based paging would.
    DB_PAGE = 100
    last_control_id = ""

    while True:
        rows = self.db.execute(text("""
            SELECT id::text, control_id, title,
                   generation_metadata->>'merge_group_hint' as merge_group_hint
            FROM canonical_controls
            WHERE decomposition_method = 'pass0b'
              AND release_state != 'duplicate'
              AND release_state != 'deprecated'
              AND control_id > :last_id
            ORDER BY control_id
            LIMIT :page_size
        """), {"last_id": last_control_id, "page_size": DB_PAGE}).fetchall()

        if not rows:
            break

        last_control_id = rows[-1][1]

        # Process each control individually (no asyncio.gather).
        for r in rows:
            try:
                outcome = await self._cross_group_check_control(r)
            except Exception as e:
                logger.error("BatchDedup cross-group control %s error: %s", r[1], e)
                self.stats["errors"] += 1
                try:
                    self.db.rollback()
                except Exception as rollback_err:
                    logger.warning(
                        "BatchDedup cross-group rollback failed: %s", rollback_err
                    )
            else:
                if outcome == "linked":
                    cross_linked += 1
                elif outcome == "review":
                    cross_review += 1

            # Count every examined control, whatever its outcome, so the
            # progress counter can actually reach the total.
            self._progress_count += 1

        # Checked once per page: log when the running count has just
        # passed a multiple of 500 (i.e. roughly every 500 controls).
        processed = self._progress_count
        if processed % 500 < DB_PAGE:
            logger.info("BatchDedup Cross-group: %d/%d checked, %d linked, %d review",
                        processed, total, cross_linked, cross_review)


async def _cross_group_check_control(self, r):
    """Embed one control, search for near-duplicates and act on the first hit.

    Args:
        r: Row ``(id, control_id, title, merge_group_hint)`` as selected
            by ``_run_cross_group_pass``.

    Returns:
        ``"linked"`` if the control was marked as a duplicate of a match
        and the change was committed, ``"review"`` if a review entry was
        written, ``None`` otherwise (no embedding, no qualifying match,
        timeout, or a failed link attempt).

    Raises:
        Exception: anything raised outside the guarded link transaction
            (e.g. by the embedding call or ``_write_review``) propagates
            to the caller, which logs it and rolls back.
    """
    call_timeout = 30.0  # seconds, applied to each remote call separately

    ctrl_uuid, control_id, title = r[0], r[1], r[2]
    hint = r[3] or ""

    # The hint is split on ':'; the first two fields are used as action
    # and object. str.split always yields at least one element.
    parts = hint.split(":", 2)
    action = parts[0]
    obj = parts[1] if len(parts) > 1 else ""
    canonical = canonicalize_text(action, obj, title)

    try:
        embedding = await asyncio.wait_for(
            get_embedding(canonical), timeout=call_timeout
        )
    except asyncio.TimeoutError:
        logger.error("BatchDedup cross-group embedding timeout for control %s",
                     control_id)
        self.stats["errors"] += 1
        return None

    if not embedding:
        return None

    try:
        results = await asyncio.wait_for(
            qdrant_search_cross_regulation(
                embedding, top_k=5, collection=self.collection,
            ),
            timeout=call_timeout,
        )
    except asyncio.TimeoutError:
        logger.error("BatchDedup cross-group search timeout for control %s",
                     control_id)
        self.stats["errors"] += 1
        return None

    for match in (results or []):
        match_score = match.get("score", 0.0)
        match_payload = match.get("payload", {})
        match_uuid = match_payload.get("control_uuid", "")

        # The control's own vector may come back as a hit; skip it.
        if match_uuid == ctrl_uuid:
            continue

        if match_score > LINK_THRESHOLD:
            # Only one link attempt per control, whether it succeeds or not.
            try:
                self.db.execute(text("""
                    UPDATE canonical_controls
                    SET release_state = 'duplicate', merged_into_uuid = CAST(:master AS uuid)
                    WHERE id = CAST(:dup AS uuid)
                      AND release_state != 'duplicate'
                """), {"master": match_uuid, "dup": ctrl_uuid})

                self.db.execute(text("""
                    INSERT INTO control_parent_links
                    (control_uuid, parent_control_uuid, link_type, confidence)
                    VALUES (CAST(:cu AS uuid), CAST(:pu AS uuid), 'cross_regulation', :conf)
                    ON CONFLICT (control_uuid, parent_control_uuid) DO NOTHING
                """), {"cu": match_uuid, "pu": ctrl_uuid, "conf": match_score})

                transferred = self._transfer_parent_links(match_uuid, ctrl_uuid)
                self.stats["parent_links_transferred"] += transferred
                self.db.commit()
            except Exception as e:
                logger.error("BatchDedup cross-group link error %s→%s: %s",
                             ctrl_uuid, match_uuid, e)
                try:
                    self.db.rollback()
                except Exception as rollback_err:
                    logger.warning(
                        "BatchDedup cross-group rollback failed: %s", rollback_err
                    )
                self.stats["errors"] += 1
                return None
            return "linked"

        if match_score > REVIEW_THRESHOLD:
            self._write_review(
                {"control_id": control_id, "title": title, "objective": "",
                 "merge_group_hint": hint, "pattern_id": None},
                match_payload, match_score,
            )
            return "review"

    return None