All checks were successful
CI / go-lint (push) Has been skipped
CI / python-lint (push) Has been skipped
CI / nodejs-lint (push) Has been skipped
CI / test-go-ai-compliance (push) Successful in 37s
CI / test-python-backend-compliance (push) Successful in 32s
CI / test-python-document-crawler (push) Successful in 22s
CI / test-python-dsms-gateway (push) Successful in 18s
- LegalRAGClient: QDRANT_HOST+PORT → QDRANT_URL + QDRANT_API_KEY - docker-compose: env vars updated for hosted Qdrant - AllowedCollections: added bp_compliance_gdpr, bp_dsfa_templates, bp_dsfa_risks - Migration scripts (bash + python) for data transfer Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
236 lines
6.7 KiB
Python
Executable File
236 lines
6.7 KiB
Python
Executable File
#!/usr/bin/env python3
|
|
"""
|
|
Qdrant Migration: Local → qdrant-dev.breakpilot.ai
|
|
Migrates compliance collections only (skips NIBIS/Lehrer collections).
|
|
Uses persistent HTTP sessions and rate limiting for hosted Qdrant.
|
|
"""
|
|
|
|
import json
import os
import sys
import time
from urllib.parse import urljoin

import requests
|
|
|
|
# Endpoints for this one-off migration run.
SOURCE_URL = "http://macmini:6333"
TARGET_URL = "https://qdrant-dev.breakpilot.ai"

# SECURITY: this key was committed to source control and should be rotated.
# Prefer the QDRANT_API_KEY environment variable; the literal fallback is
# kept only so existing invocations keep working unchanged.
TARGET_API_KEY = os.environ.get("QDRANT_API_KEY", "z9cKbT74vl1aKPD1QGIlKWfET47VH93u")

BATCH_SIZE = 20           # points per scroll/upsert batch
RATE_LIMIT_DELAY = 0.3    # seconds between batches (be gentle to hosted Qdrant)

# NIBIS/Lehrer collections are intentionally not migrated (see module docstring).
SKIP_COLLECTIONS = {
    "bp_nibis", "bp_nibis_eh", "bp_compliance_schulrecht", "bp_eh", "bp_vocab"
}
|
|
|
|
class _DefaultTimeoutSession(requests.Session):
    """requests.Session that applies a default timeout to every request.

    BUGFIX: plain ``requests.Session`` has no ``timeout`` attribute, so the
    original ``session.timeout = 30`` assignments were silent no-ops — any
    request made without an explicit ``timeout=`` (e.g. the collection
    listing calls) could hang forever. Per the requests documentation,
    timeouts must be passed per request; this subclass injects a default.
    """

    def __init__(self, default_timeout):
        super().__init__()
        self._default_timeout = default_timeout

    def request(self, method, url, **kwargs):
        # Explicit per-call timeouts (scroll/upsert) still take precedence.
        kwargs.setdefault("timeout", self._default_timeout)
        return super().request(method, url, **kwargs)


# Persistent sessions for connection reuse (HTTP keep-alive to both endpoints).
source = _DefaultTimeoutSession(30)
source.headers.update({"Content-Type": "application/json"})

target = _DefaultTimeoutSession(120)
target.headers.update({
    "Content-Type": "application/json",
    "api-key": TARGET_API_KEY,
})
|
|
|
|
|
|
def log(msg):
    """Print *msg* on stdout prefixed with a green ``[INFO]`` tag."""
    prefix = "\033[0;32m[INFO]\033[0m"
    print("{} {}".format(prefix, msg))
|
|
|
|
def warn(msg):
    """Print *msg* on stdout prefixed with a yellow ``[WARN]`` tag."""
    prefix = "\033[1;33m[WARN]\033[0m"
    print("{} {}".format(prefix, msg))
|
|
|
|
def fail(msg):
    """Print *msg* as a red ``[FAIL]`` line, then abort with exit code 1."""
    prefix = "\033[0;31m[FAIL]\033[0m"
    print("{} {}".format(prefix, msg))
    sys.exit(1)
|
|
|
|
|
|
def get_collections():
    """Return the names of every collection on the source Qdrant instance."""
    response = source.get(f"{SOURCE_URL}/collections")
    response.raise_for_status()
    entries = response.json()["result"]["collections"]
    return [entry["name"] for entry in entries]
|
|
|
|
|
|
def get_collection_info(url, session, col):
    """Fetch and return the parsed JSON description of collection *col*.

    *session* must be a requests-like session; *url* is the Qdrant base URL.
    Raises on HTTP error status.
    """
    endpoint = f"{url}/collections/{col}"
    response = session.get(endpoint)
    response.raise_for_status()
    return response.json()
|
|
|
|
|
|
def create_collection(col, config):
    """Create collection *col* on the target using the source's vector config.

    Returns True when the collection is usable (freshly created, or already
    present from a previous run — in which case points are upserted into it),
    False when creation failed for any other reason.
    """
    params = config["result"]["config"]["params"]
    body = {"vectors": params["vectors"]}
    if "shard_number" in params:
        body["shard_number"] = params["shard_number"]

    resp = target.put(f"{TARGET_URL}/collections/{col}", json=body)
    if resp.status_code == 200:
        return True

    # PUT failed — the collection may simply exist already; probe for it.
    check = target.get(f"{TARGET_URL}/collections/{col}")
    exists = check.status_code == 200 and check.json().get("status") == "ok"
    if exists:
        warn(f"  Collection already exists, will upsert points")
        return True

    warn(f"  Failed to create collection: {resp.text}")
    return False
|
|
|
|
|
|
def scroll_points(col, offset=None, retries=3):
    """Fetch one batch of points (payload + vectors) from the source.

    Returns ``(points, next_page_offset)`` where the offset is None on the
    final page. Connection errors and timeouts are retried with linear
    backoff; if the source stays unreachable, fail() aborts the process.
    """
    request_body = {
        "limit": BATCH_SIZE,
        "with_payload": True,
        "with_vector": True,
    }
    if offset is not None:
        request_body["offset"] = offset

    attempt = 0
    while attempt < retries:
        try:
            resp = source.post(
                f"{SOURCE_URL}/collections/{col}/points/scroll",
                json=request_body,
                timeout=60,
            )
        except (requests.exceptions.ConnectionError, requests.exceptions.Timeout) as e:
            warn(f"  Source scroll error (attempt {attempt+1}/{retries}): {e}")
            time.sleep(5 * (attempt + 1))
            attempt += 1
            continue
        # Non-transport HTTP errors propagate to the caller, as before.
        resp.raise_for_status()
        result = resp.json()["result"]
        return result["points"], result.get("next_page_offset")

    fail(f"  Source unreachable after {retries} retries")
|
|
|
|
|
|
def upsert_points(col, points, retries=3):
    """Upsert *points* into collection *col* on the target, with retries.

    Transient failures — gateway errors (502/503/504), timeouts, and
    connection errors — are retried with linear backoff. Returns True on
    success, False once the batch is considered undeliverable.
    """
    # FIX: the original retried only 502; a hosted endpoint behind a proxy
    # can also emit 503/504 transiently, which previously failed the whole
    # batch immediately and forced the slow one-by-one fallback.
    transient = {502, 503, 504}

    for attempt in range(retries):
        try:
            resp = target.put(
                f"{TARGET_URL}/collections/{col}/points?wait=true",
                json={"points": points},
                timeout=120,
            )
            if resp.status_code == 200:
                return True
            if resp.status_code in transient:
                warn(f"  {resp.status_code} gateway error (attempt {attempt+1}/{retries}), waiting...")
                time.sleep(5 * (attempt + 1))
                continue
            # Non-transient HTTP error: report a truncated body and give up.
            warn(f"  Upsert failed ({resp.status_code}): {resp.text[:200]}")
            return False
        except requests.exceptions.Timeout:
            warn(f"  Timeout (attempt {attempt+1}/{retries}), waiting...")
            time.sleep(10 * (attempt + 1))
        except requests.exceptions.ConnectionError:
            warn(f"  Connection error (attempt {attempt+1}/{retries}), waiting...")
            time.sleep(15 * (attempt + 1))
    return False
|
|
|
|
|
|
def migrate_collection(col):
    """Copy every point of collection *col* from source to target.

    Creates the target collection (or reuses an existing one), streams
    points in batches via scroll, upserts them with a per-point fallback
    on batch failure, then verifies the final point counts match.
    Returns the number of points successfully migrated (0 when the
    collection is empty or could not be created on the target).
    """
    info = get_collection_info(SOURCE_URL, source, col)
    source_count = info["result"]["points_count"]
    log(f"  Source points: {source_count}")

    if source_count == 0:
        warn("  Empty collection, skipping")
        return 0

    if not create_collection(col, info):
        return 0

    # Wait for collection to be ready
    # (NOTE(review): fixed 3 s settle delay — presumably enough for the
    # hosted cluster to finish provisioning shards; confirm if flaky.)
    time.sleep(3)

    offset = None      # scroll cursor; None requests the first page
    migrated = 0       # points confirmed upserted on the target
    failed = 0         # points that failed even in the one-by-one fallback

    while True:
        points, next_offset = scroll_points(col, offset)

        if not points:
            break

        if upsert_points(col, points):
            migrated += len(points)
        else:
            # Retry one-by-one so a single bad point doesn't sink the batch.
            warn(f"  Batch failed, retrying {len(points)} points individually...")
            for pt in points:
                time.sleep(RATE_LIMIT_DELAY)
                if upsert_points(col, [pt]):
                    migrated += 1
                else:
                    failed += 1

        # Single-line progress indicator (carriage return, no newline).
        sys.stdout.write(f"\r  Migrated: {migrated}/{source_count} (failed: {failed})")
        sys.stdout.flush()

        if next_offset is None:
            break
        offset = next_offset
        time.sleep(RATE_LIMIT_DELAY)

    print()

    # Verify: compare target vs source point counts; verification problems
    # are reported but never fail the migration.
    try:
        target_info = get_collection_info(TARGET_URL, target, col)
        target_count = target_info["result"]["points_count"]
        if target_count == source_count:
            log(f"  Verified: {target_count}/{source_count} points")
        else:
            warn(f"  Count mismatch! Source: {source_count}, Target: {target_count}")
    except Exception as e:
        warn(f"  Could not verify: {e}")

    return migrated
|
|
|
|
|
|
def main():
    """Run the migration: preflight checks, per-collection copy, summary."""
    log(f"Source: {SOURCE_URL}")
    log(f"Target: {TARGET_URL}")
    log(f"Batch size: {BATCH_SIZE}, Rate limit: {RATE_LIMIT_DELAY}s")
    print()

    # Preflight: both endpoints must answer before any work starts.
    try:
        source.get(f"{SOURCE_URL}/collections").raise_for_status()
    except Exception:
        fail(f"Source not reachable at {SOURCE_URL}")

    try:
        target.get(f"{TARGET_URL}/collections").raise_for_status()
    except Exception:
        fail(f"Target not reachable at {TARGET_URL}")

    # Show the migration plan before doing anything destructive.
    collections = get_collections()
    log("Collections on source:")
    for col in collections:
        if col in SKIP_COLLECTIONS:
            print(f"  \033[1;33mSKIP\033[0m {col}")
        else:
            print(f"  \033[0;32m  OK\033[0m {col}")
    print()

    total_migrated = 0
    total_points = 0

    for col in collections:
        if col in SKIP_COLLECTIONS:
            continue

        log(f"=== Migrating: {col} ===")
        points = migrate_collection(col)
        # FIX: the original incremented total_migrated unconditionally, so
        # empty collections and failed creations inflated the summary count.
        if points > 0:
            total_migrated += 1
        total_points += points

    print()
    log("=" * 40)
    log(f"Migration complete!")
    log(f"Collections migrated: {total_migrated}")
    log(f"Total points migrated: {total_points}")
    log("=" * 40)


if __name__ == "__main__":
    main()
|