feat(pipeline): add checkpoint to dedup Phase 2 — survives container restart

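Stores last_control_id in the config column of a canonical_generation_jobs
row with status 'dedup_phase2_checkpoint', updated after each page.
On restart, resumes from the checkpointed control_id instead of starting over,
and seeds the progress counter with the number of controls already processed.
The checkpoint row is deleted on completion.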

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
Benjamin Admin
2026-04-29 09:12:23 +02:00
parent e8f018f2c6
commit 7c5592b50e
@@ -346,13 +346,40 @@ class BatchDedupRunner:
self._progress_total = total
self._progress_count = 0
logger.info("BatchDedup Cross-group: %d masters to check", total)
cross_linked = 0
cross_review = 0
# Paginated processing — 100 rows per DB query
# Checkpoint: resume from last processed control_id
DB_PAGE = 100
last_control_id = ""
# Checkpoint: resume from last processed control_id (survives container restart)
checkpoint_row = self.db.execute(text("""
SELECT config FROM canonical_generation_jobs
WHERE status = 'dedup_phase2_checkpoint'
LIMIT 1
""")).fetchone()
last_control_id = checkpoint_row[0] if checkpoint_row else ""
if last_control_id:
skip_row = self.db.execute(text("""
SELECT COUNT(*) FROM canonical_controls
WHERE decomposition_method = 'pass0b'
AND release_state != 'duplicate'
AND release_state != 'deprecated'
AND control_id <= :last_id
"""), {"last_id": last_control_id}).fetchone()
skipped = skip_row[0] if skip_row else 0
self._progress_count = skipped
logger.info("BatchDedup Cross-group: RESUMING from %s (skipping %d already processed)",
last_control_id, skipped)
else:
self.db.execute(text("""
INSERT INTO canonical_generation_jobs (id, status, config)
VALUES (gen_random_uuid(), 'dedup_phase2_checkpoint', '')
"""))
self.db.commit()
logger.info("BatchDedup Cross-group: %d masters to check (starting from %s)",
total, last_control_id or "beginning")
while True:
rows = self.db.execute(text("""
@@ -461,11 +488,34 @@ class BatchDedupRunner:
self._progress_count += 1
# Log progress every page
# Save checkpoint + log progress every page
try:
self.db.execute(text("""
UPDATE canonical_generation_jobs
SET config = :cid
WHERE status = 'dedup_phase2_checkpoint'
"""), {"cid": last_control_id})
self.db.commit()
except Exception:
try:
self.db.rollback()
except Exception:
pass
processed = self._progress_count
if processed % 500 < DB_PAGE:
logger.info("BatchDedup Cross-group: %d/%d checked, %d linked, %d review",
processed, len(rows), cross_linked, cross_review)
logger.info("BatchDedup Cross-group: %d/%d checked, %d linked, %d review (checkpoint: %s)",
processed, total, cross_linked, cross_review, last_control_id)
# Clear checkpoint on completion
try:
self.db.execute(text("""
DELETE FROM canonical_generation_jobs
WHERE status = 'dedup_phase2_checkpoint'
"""))
self.db.commit()
except Exception:
pass
self.stats["cross_group_linked"] = cross_linked
self.stats["cross_group_review"] = cross_review