diff --git a/control-pipeline/services/batch_dedup_runner.py b/control-pipeline/services/batch_dedup_runner.py index 8dad13c..27564a0 100644 --- a/control-pipeline/services/batch_dedup_runner.py +++ b/control-pipeline/services/batch_dedup_runner.py @@ -346,13 +346,40 @@ class BatchDedupRunner: self._progress_total = total self._progress_count = 0 - logger.info("BatchDedup Cross-group: %d masters to check", total) cross_linked = 0 cross_review = 0 - # Paginated processing — 100 rows per DB query + # Checkpoint: resume from last processed control_id DB_PAGE = 100 - last_control_id = "" + # Checkpoint: resume from last processed control_id (survives container restart) + checkpoint_row = self.db.execute(text(""" + SELECT config FROM canonical_generation_jobs + WHERE status = 'dedup_phase2_checkpoint' + LIMIT 1 + """)).fetchone() + last_control_id = checkpoint_row[0] if checkpoint_row else "" + + if last_control_id: + skip_row = self.db.execute(text(""" + SELECT COUNT(*) FROM canonical_controls + WHERE decomposition_method = 'pass0b' + AND release_state != 'duplicate' + AND release_state != 'deprecated' + AND control_id <= :last_id + """), {"last_id": last_control_id}).fetchone() + skipped = skip_row[0] if skip_row else 0 + self._progress_count = skipped + logger.info("BatchDedup Cross-group: RESUMING from %s (skipping %d already processed)", + last_control_id, skipped) + else: + self.db.execute(text(""" + INSERT INTO canonical_generation_jobs (id, status, config) + VALUES (gen_random_uuid(), 'dedup_phase2_checkpoint', '') + """)) + self.db.commit() + + logger.info("BatchDedup Cross-group: %d masters to check (starting from %s)", + total, last_control_id or "beginning") while True: rows = self.db.execute(text(""" @@ -461,11 +488,34 @@ class BatchDedupRunner: self._progress_count += 1 - # Log progress every page + # Save checkpoint + log progress every page + try: + self.db.execute(text(""" + UPDATE canonical_generation_jobs + SET config = :cid + WHERE status = 'dedup_phase2_checkpoint' + """), {"cid": last_control_id}) + self.db.commit() + except Exception: + try: + self.db.rollback() + except Exception: + pass + processed = self._progress_count if processed % 500 < DB_PAGE: - logger.info("BatchDedup Cross-group: %d/%d checked, %d linked, %d review", - processed, len(rows), cross_linked, cross_review) + logger.info("BatchDedup Cross-group: %d/%d checked, %d linked, %d review (checkpoint: %s)", + processed, total, cross_linked, cross_review, last_control_id) + + # Clear checkpoint on completion + try: + self.db.execute(text(""" + DELETE FROM canonical_generation_jobs + WHERE status = 'dedup_phase2_checkpoint' + """)) + self.db.commit() + except Exception: + pass self.stats["cross_group_linked"] = cross_linked self.stats["cross_group_review"] = cross_review