#!/usr/bin/env bash
set -euo pipefail
# =============================================================================
# Qdrant Migration: Local → qdrant-dev.breakpilot.ai
# Migrates compliance collections only (skips NIBIS/Lehrer collections)
#
# Env overrides: SOURCE_URL, TARGET_URL, TARGET_API_KEY
# =============================================================================

SOURCE_URL="${SOURCE_URL:-http://macmini:6333}"
TARGET_URL="${TARGET_URL:-https://qdrant-dev.breakpilot.ai}"
# NOTE(review): hardcoded default API key is a credential committed to source.
# Rotate it and prefer requiring TARGET_API_KEY from the environment
# (e.g. ${TARGET_API_KEY:?must be set}); default kept for backward compat.
TARGET_API_KEY="${TARGET_API_KEY:-z9cKbT74vl1aKPD1QGIlKWfET47VH93u}"
readonly BATCH_SIZE=15

# Collections to SKIP (Lehrer/NIBIS)
readonly SKIP_COLLECTIONS="bp_nibis bp_nibis_eh bp_compliance_schulrecht bp_eh bp_vocab"

TMPDIR=$(mktemp -d)
# Single quotes: $TMPDIR expands when the trap fires, not when it is set.
trap 'rm -rf -- "$TMPDIR"' EXIT

RED='\033[0;31m'
GREEN='\033[0;32m'
YELLOW='\033[1;33m'
NC='\033[0m'

log()  { echo -e "${GREEN}[INFO]${NC} $*"; }
warn() { echo -e "${YELLOW}[WARN]${NC} $*"; }
fail() { echo -e "${RED}[FAIL]${NC} $*"; exit 1; }

# Return 0 (skip) when $1 is listed in SKIP_COLLECTIONS, 1 otherwise.
# SKIP_COLLECTIONS is deliberately word-split here.
should_skip() {
  local col="$1"
  local skip
  for skip in $SKIP_COLLECTIONS; do
    [[ "$col" == "$skip" ]] && return 0
  done
  return 1
}

# Authenticated curl against the target cluster.
target_curl() {
  curl -s --max-time 60 -H "api-key: ${TARGET_API_KEY}" \
    -H "Content-Type: application/json" "$@"
}

# Anonymous curl against the local source instance.
source_curl() {
  curl -s -H "Content-Type: application/json" "$@"
}

# --- Preflight checks ---
log "Source: $SOURCE_URL"
log "Target: $TARGET_URL"
echo ""

if ! source_curl "${SOURCE_URL}/collections" >/dev/null 2>&1; then
  fail "Source Qdrant not reachable at ${SOURCE_URL}"
fi
if ! target_curl "${TARGET_URL}/collections" >/dev/null 2>&1; then
  fail "Target Qdrant not reachable at ${TARGET_URL}"
fi

# --- Get all collections from source ---
COLLECTIONS=$(source_curl "${SOURCE_URL}/collections" | python3 -c "
import sys, json
data = json.load(sys.stdin)
for c in data['result']['collections']:
    print(c['name'])
")

log "Found collections on source:"
echo "$COLLECTIONS" | while read -r col; do
  if should_skip "$col"; then
    echo -e "  ${YELLOW}SKIP${NC} $col"
  else
    echo -e "  ${GREEN}  OK${NC} $col"
  fi
done
echo ""

# --- Migrate each collection ---
TOTAL_MIGRATED=0
TOTAL_POINTS=0

# Here-string (not a pipe) so the counters survive the loop.
while read -r col; do
  [[ -z "$col" ]] && continue
  if should_skip "$col"; then
    continue
  fi
  log "=== Migrating: $col ==="

  # 1. Get collection info from source
  source_curl "${SOURCE_URL}/collections/${col}" > "$TMPDIR/col_info.json"
  STATUS=$(python3 -c "import json; print(json.load(open('$TMPDIR/col_info.json'))['status'])" 2>/dev/null || echo "error")
  if [[ "$STATUS" != "ok" ]]; then
    warn "Collection $col has status '$STATUS', skipping"
    continue
  fi
  SOURCE_COUNT=$(python3 -c "import json; print(json.load(open('$TMPDIR/col_info.json'))['result']['points_count'])")
  log "  Source points: $SOURCE_COUNT"
  if [[ "$SOURCE_COUNT" == "0" ]]; then
    warn "  Empty collection, skipping"
    continue
  fi

  # 2. Create collection on target — copy vector params and shard count only.
  python3 -c "
import json
info = json.load(open('$TMPDIR/col_info.json'))['result']['config']['params']
vectors = info['vectors']
body = {'vectors': vectors}
if 'shard_number' in info:
    body['shard_number'] = info['shard_number']
json.dump(body, open('$TMPDIR/create_body.json', 'w'))
"
  log "  Creating collection on target..."
  CREATE_RESP=$(target_curl -X PUT "${TARGET_URL}/collections/${col}" -d @"$TMPDIR/create_body.json")
  CREATE_STATUS=$(echo "$CREATE_RESP" | python3 -c "import sys,json; d=json.load(sys.stdin); print(d.get('status','error'))" 2>/dev/null || echo "error")
  if [[ "$CREATE_STATUS" != "ok" ]]; then
    # Creation failing because the collection already exists is fine; we
    # just upsert into it. Anything else skips the collection.
    EXISTS=$(target_curl "${TARGET_URL}/collections/${col}" | python3 -c "import sys,json; print(json.load(sys.stdin).get('status','error'))" 2>/dev/null || echo "no")
    if [[ "$EXISTS" == "ok" ]]; then
      warn "  Collection already exists on target, will upsert points"
    else
      warn "  Failed to create collection: $CREATE_RESP"
      continue
    fi
  fi
  # Wait for collection to be ready
  sleep 2

  # 3. Scroll all points in batches and upsert to target
  OFFSET=""
  MIGRATED=0
  while true; do
    # Build scroll request. OFFSET holds a JSON literal (number or quoted
    # string) so both integer and UUID point IDs survive interpolation.
    if [[ -z "$OFFSET" ]]; then
      printf '{"limit": %s, "with_payload": true, "with_vector": true}\n' \
        "$BATCH_SIZE" > "$TMPDIR/scroll_req.json"
    else
      printf '{"limit": %s, "with_payload": true, "with_vector": true, "offset": %s}\n' \
        "$BATCH_SIZE" "$OFFSET" > "$TMPDIR/scroll_req.json"
    fi
    source_curl -X POST "${SOURCE_URL}/collections/${col}/points/scroll" \
      -d @"$TMPDIR/scroll_req.json" > "$TMPDIR/scroll_resp.json"

    # Extract points and next offset
    python3 -c "
import json
data = json.load(open('$TMPDIR/scroll_resp.json'))
points = data['result']['points']
next_offset = data['result'].get('next_page_offset')
json.dump({'points': points}, open('$TMPDIR/upsert_body.json', 'w'))
with open('$TMPDIR/next_offset.txt', 'w') as f:
    # json.dumps keeps string offsets quoted; 'DONE' marks the last page.
    f.write(json.dumps(next_offset) if next_offset is not None else 'DONE')
with open('$TMPDIR/batch_count.txt', 'w') as f:
    f.write(str(len(points)))
"
    NEXT_OFFSET=$(cat "$TMPDIR/next_offset.txt")
    BATCH_COUNT=$(cat "$TMPDIR/batch_count.txt")
    if [[ "$BATCH_COUNT" == "0" ]]; then
      break
    fi

    # Upsert batch to target; on failure, retry the points one at a time so
    # a single bad point cannot sink the whole batch.
    UPSERT_RESP=$(target_curl -X PUT "${TARGET_URL}/collections/${col}/points?wait=true" -d @"$TMPDIR/upsert_body.json")
    UPSERT_STATUS=$(echo "$UPSERT_RESP" | python3 -c "
import sys, json
s = sys.stdin.read().strip()
if not s:
    print('error')
else:
    d = json.loads(s)
    print(d.get('status','error'))
" 2>/dev/null || echo "error")
    if [[ "$UPSERT_STATUS" != "ok" ]]; then
      warn "  Batch upsert failed, retrying one-by-one..."
      # Split into individual points and retry
      SINGLE_OK=0
      SINGLE_FAIL=0
      python3 -c "
import json
data = json.load(open('$TMPDIR/upsert_body.json'))
for i, pt in enumerate(data['points']):
    json.dump({'points': [pt]}, open(f'$TMPDIR/single_{i}.json', 'w'))
print(len(data['points']))
" > "$TMPDIR/single_count.txt"
      SINGLE_COUNT=$(cat "$TMPDIR/single_count.txt")
      # NOTE(review): this loop was truncated/garbled in the original
      # ("for ((si=0; si/dev/null ...") — reconstructed from the
      # single_{i}.json files written above and the SS status check below.
      for ((si = 0; si < SINGLE_COUNT; si++)); do
        SS=$(target_curl -X PUT "${TARGET_URL}/collections/${col}/points?wait=true" \
          -d @"$TMPDIR/single_${si}.json" \
          | python3 -c "import sys,json; print(json.load(sys.stdin).get('status','error'))" 2>/dev/null || echo "error")
        if [[ "$SS" == "ok" ]]; then
          SINGLE_OK=$((SINGLE_OK + 1))
        else
          SINGLE_FAIL=$((SINGLE_FAIL + 1))
        fi
      done
      if [[ "$SINGLE_FAIL" -gt 0 ]]; then
        warn "  Single-point retry: $SINGLE_OK ok, $SINGLE_FAIL failed"
      fi
    fi

    MIGRATED=$((MIGRATED + BATCH_COUNT))
    echo -ne "  Migrated: ${MIGRATED}/${SOURCE_COUNT}\r"
    if [[ "$NEXT_OFFSET" == "DONE" ]]; then
      break
    fi
    OFFSET="$NEXT_OFFSET"
  done
  echo ""

  # 4. Verify point count
  TARGET_COUNT=$(target_curl "${TARGET_URL}/collections/${col}" | python3 -c "import sys,json; print(json.load(sys.stdin)['result']['points_count'])")
  if [[ "$TARGET_COUNT" == "$SOURCE_COUNT" ]]; then
    log "  Verified: ${TARGET_COUNT}/${SOURCE_COUNT} points"
  else
    warn "  Count mismatch! Source: ${SOURCE_COUNT}, Target: ${TARGET_COUNT}"
  fi

  TOTAL_MIGRATED=$((TOTAL_MIGRATED + 1))
  TOTAL_POINTS=$((TOTAL_POINTS + MIGRATED))
done <<< "$COLLECTIONS"

echo ""
log "=============================="
log "Migration complete!"
log "Collections migrated: $TOTAL_MIGRATED"
log "Total points migrated: $TOTAL_POINTS"
log "=============================="