Compare commits
6 Commits
28aa74b4b0
...
e8f018f2c6
| Author | SHA1 | Date | |
|---|---|---|---|
| e8f018f2c6 | |||
| b151951448 | |||
| 2e2e81b3e1 | |||
| b873c0e4ae | |||
| 9dc16674e2 | |||
| e6e2688b56 |
@@ -2298,13 +2298,29 @@ class SubmitPass0bRequest(BaseModel):
|
||||
batch_size: int = 5
|
||||
|
||||
|
||||
_last_submit_batch_id: str = ""
|
||||
_last_submit_time: float = 0
|
||||
|
||||
|
||||
@router.post("/generate/submit-pass0b")
|
||||
async def submit_pass0b(req: SubmitPass0bRequest):
|
||||
"""Submit Pass 0b batch to Anthropic Batch API.
|
||||
|
||||
Loads unprocessed obligations, applies pre-LLM filter, submits batch.
|
||||
Returns batch_id for status polling and later result processing.
|
||||
SAFETY: Refuses to submit if a batch was submitted in the last 10 minutes.
|
||||
This prevents duplicate batches from curl retries or timeouts.
|
||||
"""
|
||||
import time
|
||||
global _last_submit_batch_id, _last_submit_time
|
||||
|
||||
# Idempotency guard: refuse if last submit was <10 min ago
|
||||
elapsed = time.time() - _last_submit_time
|
||||
if elapsed < 600 and _last_submit_batch_id:
|
||||
return {
|
||||
"status": "blocked",
|
||||
"reason": f"Batch {_last_submit_batch_id} was submitted {int(elapsed)}s ago. Wait {int(600 - elapsed)}s or use force=true.",
|
||||
"last_batch_id": _last_submit_batch_id,
|
||||
}
|
||||
|
||||
from services.decomposition_pass import DecompositionPass
|
||||
db = SessionLocal()
|
||||
try:
|
||||
@@ -2313,6 +2329,12 @@ async def submit_pass0b(req: SubmitPass0bRequest):
|
||||
limit=req.limit,
|
||||
batch_size=req.batch_size,
|
||||
)
|
||||
# Record successful submit
|
||||
batch_id = result.get("batch_id", "")
|
||||
if batch_id:
|
||||
_last_submit_batch_id = batch_id
|
||||
_last_submit_time = time.time()
|
||||
logger.info("Submit guard: recorded batch %s", batch_id)
|
||||
return result
|
||||
except Exception as e:
|
||||
logger.error("Submit Pass 0b failed: %s", e)
|
||||
|
||||
@@ -131,11 +131,20 @@ class BatchDedupRunner:
|
||||
await ensure_qdrant_collection(collection=self.collection)
|
||||
|
||||
# Phase 1: Intra-group dedup (same merge_group_hint)
|
||||
# Optimization: skip singleton groups (they're automatically masters)
|
||||
self._progress_phase = "phase1"
|
||||
groups = self._load_merge_groups(hint_filter)
|
||||
self._progress_total = self.stats["total_controls"]
|
||||
|
||||
for hint, controls in groups:
|
||||
multi_groups = [(h, c) for h, c in groups if len(c) > 1]
|
||||
singleton_count = len(groups) - len(multi_groups)
|
||||
self.stats["singleton_groups_skipped"] = singleton_count
|
||||
logger.info(
|
||||
"BatchDedup Phase 1: %d multi-control groups to process, %d singletons skipped",
|
||||
len(multi_groups), singleton_count,
|
||||
)
|
||||
|
||||
for hint, controls in multi_groups:
|
||||
try:
|
||||
await self._process_hint_group(hint, controls, dry_run)
|
||||
self.stats["phase1_groups_processed"] += 1
|
||||
@@ -148,8 +157,8 @@ class BatchDedupRunner:
|
||||
pass
|
||||
|
||||
logger.info(
|
||||
"BatchDedup Phase 1 done: %d masters, %d linked, %d review",
|
||||
self.stats["masters"], self.stats["linked"], self.stats["review"],
|
||||
"BatchDedup Phase 1 done: %d masters, %d linked, %d review (skipped %d singletons)",
|
||||
self.stats["masters"], self.stats["linked"], self.stats["review"], singleton_count,
|
||||
)
|
||||
|
||||
# Phase 2: Cross-group dedup via embeddings
|
||||
@@ -321,112 +330,140 @@ class BatchDedupRunner:
|
||||
async def _run_cross_group_pass(self):
|
||||
"""Phase 2: Find cross-group duplicates among surviving masters.
|
||||
|
||||
After Phase 1, ~52k masters remain. Many have similar semantics
|
||||
despite different merge_group_hints (e.g. different German spellings).
|
||||
This pass embeds all masters and finds near-duplicates via Qdrant.
|
||||
Paginated DB queries + individual error handling per control.
|
||||
Never loads all rows into memory at once.
|
||||
"""
|
||||
logger.info("BatchDedup Phase 2: Cross-group pass starting...")
|
||||
|
||||
rows = self.db.execute(text("""
|
||||
SELECT id::text, control_id, title,
|
||||
generation_metadata->>'merge_group_hint' as merge_group_hint
|
||||
FROM canonical_controls
|
||||
# Count total
|
||||
total_row = self.db.execute(text("""
|
||||
SELECT COUNT(*) FROM canonical_controls
|
||||
WHERE decomposition_method = 'pass0b'
|
||||
AND release_state != 'duplicate'
|
||||
AND release_state != 'deprecated'
|
||||
ORDER BY control_id
|
||||
""")).fetchall()
|
||||
""")).fetchone()
|
||||
total = total_row[0] if total_row else 0
|
||||
|
||||
self._progress_total = len(rows)
|
||||
self._progress_total = total
|
||||
self._progress_count = 0
|
||||
logger.info("BatchDedup Cross-group: %d masters to check", len(rows))
|
||||
logger.info("BatchDedup Cross-group: %d masters to check", total)
|
||||
cross_linked = 0
|
||||
cross_review = 0
|
||||
|
||||
# Process in parallel batches for embedding + Qdrant search
|
||||
PARALLEL_BATCH = 10
|
||||
# Paginated processing — 100 rows per DB query
|
||||
DB_PAGE = 100
|
||||
last_control_id = ""
|
||||
|
||||
async def _embed_and_search(r):
|
||||
"""Embed one control and search Qdrant — safe for asyncio.gather."""
|
||||
hint = r[3] or ""
|
||||
parts = hint.split(":", 2)
|
||||
action = parts[0] if len(parts) > 0 else ""
|
||||
obj = parts[1] if len(parts) > 1 else ""
|
||||
canonical = canonicalize_text(action, obj, r[2])
|
||||
embedding = await get_embedding(canonical)
|
||||
if not embedding:
|
||||
return None
|
||||
results = await qdrant_search_cross_regulation(
|
||||
embedding, top_k=5, collection=self.collection,
|
||||
)
|
||||
return (r, results)
|
||||
while True:
|
||||
rows = self.db.execute(text("""
|
||||
SELECT id::text, control_id, title,
|
||||
generation_metadata->>'merge_group_hint' as merge_group_hint
|
||||
FROM canonical_controls
|
||||
WHERE decomposition_method = 'pass0b'
|
||||
AND release_state != 'duplicate'
|
||||
AND release_state != 'deprecated'
|
||||
AND control_id > :last_id
|
||||
ORDER BY control_id
|
||||
LIMIT :page_size
|
||||
"""), {"last_id": last_control_id, "page_size": DB_PAGE}).fetchall()
|
||||
|
||||
for batch_start in range(0, len(rows), PARALLEL_BATCH):
|
||||
batch = rows[batch_start:batch_start + PARALLEL_BATCH]
|
||||
tasks = [_embed_and_search(r) for r in batch]
|
||||
results_batch = await asyncio.gather(*tasks, return_exceptions=True)
|
||||
if not rows:
|
||||
break
|
||||
|
||||
for res in results_batch:
|
||||
if res is None or isinstance(res, Exception):
|
||||
if isinstance(res, Exception):
|
||||
logger.error("BatchDedup embed/search error: %s", res)
|
||||
last_control_id = rows[-1][1]
|
||||
|
||||
# Process each control individually (no asyncio.gather — more stable)
|
||||
for r in rows:
|
||||
try:
|
||||
hint = r[3] or ""
|
||||
parts = hint.split(":", 2)
|
||||
action = parts[0] if len(parts) > 0 else ""
|
||||
obj = parts[1] if len(parts) > 1 else ""
|
||||
canonical = canonicalize_text(action, obj, r[2])
|
||||
|
||||
# Timeout per embedding call
|
||||
try:
|
||||
embedding = await asyncio.wait_for(
|
||||
get_embedding(canonical), timeout=30.0
|
||||
)
|
||||
except asyncio.TimeoutError:
|
||||
self.stats["errors"] += 1
|
||||
continue
|
||||
|
||||
r, results = res
|
||||
ctrl_uuid = r[0]
|
||||
hint = r[3] or ""
|
||||
|
||||
if not results:
|
||||
continue
|
||||
|
||||
for match in results:
|
||||
match_score = match.get("score", 0.0)
|
||||
match_payload = match.get("payload", {})
|
||||
match_uuid = match_payload.get("control_uuid", "")
|
||||
|
||||
if match_uuid == ctrl_uuid:
|
||||
continue
|
||||
|
||||
if match_score > LINK_THRESHOLD:
|
||||
try:
|
||||
self.db.execute(text("""
|
||||
UPDATE canonical_controls
|
||||
SET release_state = 'duplicate', merged_into_uuid = CAST(:master AS uuid)
|
||||
WHERE id = CAST(:dup AS uuid)
|
||||
AND release_state != 'duplicate'
|
||||
"""), {"master": match_uuid, "dup": ctrl_uuid})
|
||||
if not embedding:
|
||||
continue
|
||||
|
||||
self.db.execute(text("""
|
||||
INSERT INTO control_parent_links
|
||||
(control_uuid, parent_control_uuid, link_type, confidence)
|
||||
VALUES (CAST(:cu AS uuid), CAST(:pu AS uuid), 'cross_regulation', :conf)
|
||||
ON CONFLICT (control_uuid, parent_control_uuid) DO NOTHING
|
||||
"""), {"cu": match_uuid, "pu": ctrl_uuid, "conf": match_score})
|
||||
|
||||
transferred = self._transfer_parent_links(match_uuid, ctrl_uuid)
|
||||
self.stats["parent_links_transferred"] += transferred
|
||||
|
||||
self.db.commit()
|
||||
cross_linked += 1
|
||||
except Exception as e:
|
||||
logger.error("BatchDedup cross-group link error %s→%s: %s",
|
||||
ctrl_uuid, match_uuid, e)
|
||||
self.db.rollback()
|
||||
self.stats["errors"] += 1
|
||||
break
|
||||
elif match_score > REVIEW_THRESHOLD:
|
||||
self._write_review(
|
||||
{"control_id": r[1], "title": r[2], "objective": "",
|
||||
"merge_group_hint": hint, "pattern_id": None},
|
||||
match_payload, match_score,
|
||||
try:
|
||||
results = await asyncio.wait_for(
|
||||
qdrant_search_cross_regulation(
|
||||
embedding, top_k=5, collection=self.collection,
|
||||
), timeout=30.0
|
||||
)
|
||||
cross_review += 1
|
||||
break
|
||||
except asyncio.TimeoutError:
|
||||
self.stats["errors"] += 1
|
||||
continue
|
||||
|
||||
processed = min(batch_start + PARALLEL_BATCH, len(rows))
|
||||
self._progress_count = processed
|
||||
if processed % 500 < PARALLEL_BATCH:
|
||||
ctrl_uuid = r[0]
|
||||
|
||||
for match in (results or []):
|
||||
match_score = match.get("score", 0.0)
|
||||
match_payload = match.get("payload", {})
|
||||
match_uuid = match_payload.get("control_uuid", "")
|
||||
|
||||
if match_uuid == ctrl_uuid:
|
||||
continue
|
||||
|
||||
if match_score > LINK_THRESHOLD:
|
||||
try:
|
||||
self.db.execute(text("""
|
||||
UPDATE canonical_controls
|
||||
SET release_state = 'duplicate', merged_into_uuid = CAST(:master AS uuid)
|
||||
WHERE id = CAST(:dup AS uuid)
|
||||
AND release_state != 'duplicate'
|
||||
"""), {"master": match_uuid, "dup": ctrl_uuid})
|
||||
|
||||
self.db.execute(text("""
|
||||
INSERT INTO control_parent_links
|
||||
(control_uuid, parent_control_uuid, link_type, confidence)
|
||||
VALUES (CAST(:cu AS uuid), CAST(:pu AS uuid), 'cross_regulation', :conf)
|
||||
ON CONFLICT (control_uuid, parent_control_uuid) DO NOTHING
|
||||
"""), {"cu": match_uuid, "pu": ctrl_uuid, "conf": match_score})
|
||||
|
||||
transferred = self._transfer_parent_links(match_uuid, ctrl_uuid)
|
||||
self.stats["parent_links_transferred"] += transferred
|
||||
self.db.commit()
|
||||
cross_linked += 1
|
||||
except Exception as e:
|
||||
logger.error("BatchDedup cross-group link error %s→%s: %s",
|
||||
ctrl_uuid, match_uuid, e)
|
||||
try:
|
||||
self.db.rollback()
|
||||
except Exception:
|
||||
pass
|
||||
self.stats["errors"] += 1
|
||||
break
|
||||
elif match_score > REVIEW_THRESHOLD:
|
||||
self._write_review(
|
||||
{"control_id": r[1], "title": r[2], "objective": "",
|
||||
"merge_group_hint": hint, "pattern_id": None},
|
||||
match_payload, match_score,
|
||||
)
|
||||
cross_review += 1
|
||||
break
|
||||
|
||||
except Exception as e:
|
||||
logger.error("BatchDedup cross-group control %s error: %s", r[1], e)
|
||||
self.stats["errors"] += 1
|
||||
try:
|
||||
self.db.rollback()
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
self._progress_count += 1
|
||||
|
||||
# Log progress every page
|
||||
processed = self._progress_count
|
||||
if processed % 500 < DB_PAGE:
|
||||
logger.info("BatchDedup Cross-group: %d/%d checked, %d linked, %d review",
|
||||
processed, len(rows), cross_linked, cross_review)
|
||||
|
||||
|
||||
+2
-6
@@ -413,12 +413,8 @@ services:
|
||||
embedding-service:
|
||||
condition: service_healthy
|
||||
healthcheck:
|
||||
test: ["CMD", "curl", "-f", "http://127.0.0.1:8098/health"]
|
||||
interval: 30s
|
||||
timeout: 10s
|
||||
retries: 3
|
||||
start_period: 10s
|
||||
restart: unless-stopped
|
||||
disable: true
|
||||
restart: "no"
|
||||
networks:
|
||||
- breakpilot-network
|
||||
|
||||
|
||||
@@ -285,6 +285,8 @@ server {
|
||||
ssl_ciphers ECDHE-ECDSA-AES128-GCM-SHA256:ECDHE-RSA-AES128-GCM-SHA256;
|
||||
ssl_prefer_server_ciphers off;
|
||||
|
||||
client_max_body_size 50M;
|
||||
|
||||
# SDK API proxy (same origin)
|
||||
location /sdk/v1/ {
|
||||
set $upstream_sdk bp-compliance-ai-sdk:8090;
|
||||
@@ -533,7 +535,7 @@ server {
|
||||
ssl_ciphers ECDHE-ECDSA-AES128-GCM-SHA256:ECDHE-RSA-AES128-GCM-SHA256;
|
||||
ssl_prefer_server_ciphers off;
|
||||
|
||||
client_max_body_size 10M;
|
||||
client_max_body_size 50M;
|
||||
|
||||
location / {
|
||||
set $upstream_sdk bp-compliance-ai-sdk:8090;
|
||||
|
||||
Reference in New Issue
Block a user