From 7c5592b50ef5d2e32e0f96b6cbabadcd57f979e2 Mon Sep 17 00:00:00 2001 From: Benjamin Admin Date: Wed, 29 Apr 2026 09:12:23 +0200 Subject: [PATCH] =?UTF-8?q?feat(pipeline):=20add=20checkpoint=20to=20dedup?= =?UTF-8?q?=20Phase=202=20=E2=80=94=20survives=20container=20restart?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Stores last_control_id in canonical_generation_jobs after each page. On restart, resumes from checkpoint instead of starting over. Checkpoint is deleted on completion. Co-Authored-By: Claude Opus 4.6 (1M context) --- .../services/batch_dedup_runner.py | 62 +++++++++++++++++-- 1 file changed, 56 insertions(+), 6 deletions(-) diff --git a/control-pipeline/services/batch_dedup_runner.py b/control-pipeline/services/batch_dedup_runner.py index 8dad13c..27564a0 100644 --- a/control-pipeline/services/batch_dedup_runner.py +++ b/control-pipeline/services/batch_dedup_runner.py @@ -346,13 +346,40 @@ class BatchDedupRunner: self._progress_total = total self._progress_count = 0 - logger.info("BatchDedup Cross-group: %d masters to check", total) cross_linked = 0 cross_review = 0 - # Paginated processing — 100 rows per DB query + # Checkpoint: resume from last processed control_id DB_PAGE = 100 - last_control_id = "" + # Checkpoint: resume from last processed control_id (survives container restart) + checkpoint_row = self.db.execute(text(""" + SELECT config FROM canonical_generation_jobs + WHERE status = 'dedup_phase2_checkpoint' + LIMIT 1 + """)).fetchone() + last_control_id = checkpoint_row[0] if checkpoint_row else "" + + if last_control_id: + skip_row = self.db.execute(text(""" + SELECT COUNT(*) FROM canonical_controls + WHERE decomposition_method = 'pass0b' + AND release_state != 'duplicate' + AND release_state != 'deprecated' + AND control_id <= :last_id + """), {"last_id": last_control_id}).fetchone() + skipped = skip_row[0] if skip_row else 0 + self._progress_count = skipped + logger.info("BatchDedup Cross-group: RESUMING from %s (skipping %d already processed)", + last_control_id, skipped) + else: + self.db.execute(text(""" + INSERT INTO canonical_generation_jobs (id, status, config) + VALUES (gen_random_uuid(), 'dedup_phase2_checkpoint', '') + """)) + self.db.commit() + + logger.info("BatchDedup Cross-group: %d masters to check (starting from %s)", + total, last_control_id or "beginning") while True: rows = self.db.execute(text(""" @@ -461,11 +488,34 @@ class BatchDedupRunner: self._progress_count += 1 - # Log progress every page + # Save checkpoint + log progress every page + try: + self.db.execute(text(""" + UPDATE canonical_generation_jobs + SET config = :cid + WHERE status = 'dedup_phase2_checkpoint' + """), {"cid": last_control_id}) + self.db.commit() + except Exception: + try: + self.db.rollback() + except Exception: + pass + processed = self._progress_count if processed % 500 < DB_PAGE: - logger.info("BatchDedup Cross-group: %d/%d checked, %d linked, %d review", - processed, len(rows), cross_linked, cross_review) + logger.info("BatchDedup Cross-group: %d/%d checked, %d linked, %d review (checkpoint: %s)", + processed, total, cross_linked, cross_review, last_control_id) + + # Clear checkpoint on completion + try: + self.db.execute(text(""" + DELETE FROM canonical_generation_jobs + WHERE status = 'dedup_phase2_checkpoint' + """)) + self.db.commit() + except Exception: + pass self.stats["cross_group_linked"] = cross_linked self.stats["cross_group_review"] = cross_review