diff --git a/backend-compliance/compliance/services/batch_dedup_runner.py b/backend-compliance/compliance/services/batch_dedup_runner.py index c2c4447..89ccfe8 100644 --- a/backend-compliance/compliance/services/batch_dedup_runner.py +++ b/backend-compliance/compliance/services/batch_dedup_runner.py @@ -141,6 +141,10 @@ class BatchDedupRunner: except Exception as e: logger.error("BatchDedup Phase 1 error on hint %s: %s", hint, e) self.stats["errors"] += 1 + try: + self.db.rollback() + except Exception: + pass logger.info( "BatchDedup Phase 1 done: %d masters, %d linked, %d review", @@ -372,26 +376,32 @@ class BatchDedupRunner: # Simple check: different control UUID is enough if match_score > LINK_THRESHOLD: # Mark the worse one as duplicate - self.db.execute(text(""" - UPDATE canonical_controls - SET release_state = 'duplicate', merged_into_uuid = CAST(:master AS uuid) - WHERE id = CAST(:dup AS uuid) - AND release_state != 'duplicate' - """), {"master": match_uuid, "dup": uuid}) + try: + self.db.execute(text(""" + UPDATE canonical_controls + SET release_state = 'duplicate', merged_into_uuid = CAST(:master AS uuid) + WHERE id = CAST(:dup AS uuid) + AND release_state != 'duplicate' + """), {"master": match_uuid, "dup": uuid}) - self.db.execute(text(""" - INSERT INTO control_parent_links - (control_uuid, parent_control_uuid, link_type, confidence) - VALUES (CAST(:cu AS uuid), CAST(:pu AS uuid), 'cross_regulation', :conf) - ON CONFLICT (control_uuid, parent_control_uuid) DO NOTHING - """), {"cu": match_uuid, "pu": uuid, "conf": match_score}) + self.db.execute(text(""" + INSERT INTO control_parent_links + (control_uuid, parent_control_uuid, link_type, confidence) + VALUES (CAST(:cu AS uuid), CAST(:pu AS uuid), 'cross_regulation', :conf) + ON CONFLICT (control_uuid, parent_control_uuid) DO NOTHING + """), {"cu": match_uuid, "pu": uuid, "conf": match_score}) - # Transfer parent links - transferred = self._transfer_parent_links(match_uuid, uuid) - self.stats["parent_links_transferred"] += transferred + # Transfer parent links + transferred = self._transfer_parent_links(match_uuid, uuid) + self.stats["parent_links_transferred"] += transferred - self.db.commit() - cross_linked += 1 + self.db.commit() + cross_linked += 1 + except Exception as e: + logger.error("BatchDedup cross-group link error %s→%s: %s", + uuid, match_uuid, e) + self.db.rollback() + self.stats["errors"] += 1 break # Only one cross-link per control elif match_score > REVIEW_THRESHOLD: self._write_review( @@ -474,43 +484,55 @@ class BatchDedupRunner: async def _mark_duplicate(self, master: dict, candidate: dict, confidence: float): """Mark candidate as duplicate of master, transfer parent links.""" - self.db.execute(text(""" - UPDATE canonical_controls - SET release_state = 'duplicate', merged_into_uuid = CAST(:master AS uuid) - WHERE id = CAST(:cand AS uuid) - """), {"master": master["uuid"], "cand": candidate["uuid"]}) + try: + self.db.execute(text(""" + UPDATE canonical_controls + SET release_state = 'duplicate', merged_into_uuid = CAST(:master AS uuid) + WHERE id = CAST(:cand AS uuid) + """), {"master": master["uuid"], "cand": candidate["uuid"]}) - self.db.execute(text(""" - INSERT INTO control_parent_links - (control_uuid, parent_control_uuid, link_type, confidence) - VALUES (CAST(:master AS uuid), CAST(:cand_parent AS uuid), 'dedup_merge', :conf) - ON CONFLICT (control_uuid, parent_control_uuid) DO NOTHING - """), {"master": master["uuid"], "cand_parent": candidate["uuid"], "conf": confidence}) + self.db.execute(text(""" + INSERT INTO control_parent_links + (control_uuid, parent_control_uuid, link_type, confidence) + VALUES (CAST(:master AS uuid), CAST(:cand_parent AS uuid), 'dedup_merge', :conf) + ON CONFLICT (control_uuid, parent_control_uuid) DO NOTHING + """), {"master": master["uuid"], "cand_parent": candidate["uuid"], "conf": confidence}) - transferred = self._transfer_parent_links(master["uuid"], candidate["uuid"]) - self.stats["parent_links_transferred"] += transferred + transferred = self._transfer_parent_links(master["uuid"], candidate["uuid"]) + self.stats["parent_links_transferred"] += transferred - self.db.commit() + self.db.commit() + except Exception as e: + logger.error("BatchDedup _mark_duplicate error %s→%s: %s", + candidate["uuid"], master["uuid"], e) + self.db.rollback() + raise async def _mark_duplicate_to(self, master_uuid: str, candidate: dict, confidence: float): """Mark candidate as duplicate of a Qdrant-matched master.""" - self.db.execute(text(""" - UPDATE canonical_controls - SET release_state = 'duplicate', merged_into_uuid = CAST(:master AS uuid) - WHERE id = CAST(:cand AS uuid) - """), {"master": master_uuid, "cand": candidate["uuid"]}) + try: + self.db.execute(text(""" + UPDATE canonical_controls + SET release_state = 'duplicate', merged_into_uuid = CAST(:master AS uuid) + WHERE id = CAST(:cand AS uuid) + """), {"master": master_uuid, "cand": candidate["uuid"]}) - self.db.execute(text(""" - INSERT INTO control_parent_links - (control_uuid, parent_control_uuid, link_type, confidence) - VALUES (CAST(:master AS uuid), CAST(:cand_parent AS uuid), 'dedup_merge', :conf) - ON CONFLICT (control_uuid, parent_control_uuid) DO NOTHING - """), {"master": master_uuid, "cand_parent": candidate["uuid"], "conf": confidence}) + self.db.execute(text(""" + INSERT INTO control_parent_links + (control_uuid, parent_control_uuid, link_type, confidence) + VALUES (CAST(:master AS uuid), CAST(:cand_parent AS uuid), 'dedup_merge', :conf) + ON CONFLICT (control_uuid, parent_control_uuid) DO NOTHING + """), {"master": master_uuid, "cand_parent": candidate["uuid"], "conf": confidence}) - transferred = self._transfer_parent_links(master_uuid, candidate["uuid"]) - self.stats["parent_links_transferred"] += transferred + transferred = self._transfer_parent_links(master_uuid, candidate["uuid"]) + self.stats["parent_links_transferred"] += transferred - self.db.commit() + self.db.commit() + except Exception as e: + logger.error("BatchDedup _mark_duplicate_to error %s→%s: %s", + candidate["uuid"], master_uuid, e) + self.db.rollback() + raise def _transfer_parent_links(self, master_uuid: str, duplicate_uuid: str) -> int: """Move existing parent links from duplicate to master.""" @@ -549,26 +571,31 @@ class BatchDedupRunner: def _write_review(self, candidate: dict, matched_payload: dict, score: float): """Write a dedup review entry for borderline matches.""" - self.db.execute(text(""" - INSERT INTO control_dedup_reviews - (candidate_control_id, candidate_title, candidate_objective, - matched_control_uuid, matched_control_id, - similarity_score, dedup_stage, dedup_details) - VALUES (:ccid, :ct, :co, CAST(:mcu AS uuid), :mci, - :ss, 'batch_dedup', :dd::jsonb) - """), { - "ccid": candidate["control_id"], - "ct": candidate["title"], - "co": candidate.get("objective", ""), - "mcu": matched_payload.get("control_uuid"), - "mci": matched_payload.get("control_id"), - "ss": score, - "dd": json.dumps({ - "merge_group_hint": candidate.get("merge_group_hint", ""), - "pattern_id": candidate.get("pattern_id"), - }), - }) - self.db.commit() + try: + self.db.execute(text(""" + INSERT INTO control_dedup_reviews + (candidate_control_id, candidate_title, candidate_objective, + matched_control_uuid, matched_control_id, + similarity_score, dedup_stage, dedup_details) + VALUES (:ccid, :ct, :co, CAST(:mcu AS uuid), :mci, + :ss, 'batch_dedup', :dd::jsonb) + """), { + "ccid": candidate["control_id"], + "ct": candidate["title"], + "co": candidate.get("objective", ""), + "mcu": matched_payload.get("control_uuid"), + "mci": matched_payload.get("control_id"), + "ss": score, + "dd": json.dumps({ + "merge_group_hint": candidate.get("merge_group_hint", ""), + "pattern_id": candidate.get("pattern_id"), + }), + }) + self.db.commit() + except Exception as e: + logger.error("BatchDedup _write_review error: %s", e) + self.db.rollback() + raise # ── Progress ─────────────────────────────────────────────────────────