Files
breakpilot-compliance/scripts/migrate-qdrant.py
T
Benjamin Admin 5f8009e844
CI / detect-changes (pull_request) Successful in 8s
CI / branch-name (pull_request) Successful in 1s
CI / guardrail-integrity (pull_request) Successful in 5s
CI / secret-scan (pull_request) Successful in 6s
CI / dep-audit (pull_request) Failing after 54s
CI / sbom-scan (pull_request) Failing after 1m3s
CI / build-sha-integrity (pull_request) Successful in 5s
CI / validate-canonical-controls (pull_request) Successful in 4s
CI / loc-budget (pull_request) Successful in 17s
CI / go-lint (pull_request) Failing after 13s
CI / python-lint (pull_request) Failing after 13s
CI / nodejs-lint (pull_request) Failing after 1m8s
CI / nodejs-build (pull_request) Successful in 3m0s
CI / test-go (pull_request) Successful in 1m0s
CI / iace-gt-coverage (pull_request) Successful in 22s
CI / test-python-backend (pull_request) Successful in 30s
CI / test-python-document-crawler (pull_request) Successful in 13s
CI / test-python-dsms-gateway (pull_request) Successful in 16s
fix(security): remove hardcoded Qdrant key + allowlist doc false-positives
secret-scan (gitleaks) had never run on a PR (broken checkout). A real Qdrant dev API key was hardcoded in 4 pre-existing files; removed in favour of env / gitea-secret references (scripts read QDRANT_API_KEY from os.environ; rag-ingest workflow references a gitea Actions secret). The remaining ~52 findings are doc curl examples + .env.example placeholders + a rule_key identifier, allowlisted in .gitleaks.toml (default ruleset kept). gitleaks now reports 0 findings.

ACTION REQUIRED: rotate the Qdrant dev API key — the leaked value is in git history.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-06-21 14:37:54 +02:00

237 lines
6.7 KiB
Python
Executable File

#!/usr/bin/env python3
"""
Qdrant Migration: Local → qdrant-dev.breakpilot.ai
Migrates compliance collections only (skips NIBIS/Lehrer collections).
Uses persistent HTTP sessions and rate limiting for hosted Qdrant.
"""
import json
import os
import sys
import time
import requests
from urllib.parse import urljoin
SOURCE_URL = "http://macmini:6333"
TARGET_URL = "https://qdrant-dev.breakpilot.ai"
# The API key is taken from the environment; an empty string means "not set".
TARGET_API_KEY = os.environ.get("QDRANT_API_KEY", "")
BATCH_SIZE = 20
RATE_LIMIT_DELAY = 0.3  # seconds between batches
SKIP_COLLECTIONS = {
    "bp_nibis", "bp_nibis_eh", "bp_compliance_schulrecht", "bp_eh", "bp_vocab"
}


class _TimeoutSession(requests.Session):
    """``requests.Session`` that applies a default timeout to every request.

    A plain ``Session`` does not read a ``timeout`` attribute, so assigning
    ``session.timeout = 30`` has no effect and a request without an explicit
    ``timeout=`` argument can block indefinitely. This subclass injects the
    default whenever the caller did not pass one.
    """

    def __init__(self, timeout):
        super().__init__()
        # Kept as a public attribute so existing ``session.timeout`` reads work.
        self.timeout = timeout

    def request(self, method, url, **kwargs):
        # An explicit per-call timeout still wins over the session default.
        kwargs.setdefault("timeout", self.timeout)
        return super().request(method, url, **kwargs)


# Persistent sessions for connection reuse
source = _TimeoutSession(30)
source.headers.update({"Content-Type": "application/json"})

target = _TimeoutSession(120)
target.headers.update({
    "Content-Type": "application/json",
    "api-key": TARGET_API_KEY,
})
def log(msg):
    """Print *msg* to stdout behind a green ``[INFO]`` tag."""
    prefix = "\033[0;32m[INFO]\033[0m"
    line = f"{prefix} {msg}"
    print(line)
def warn(msg):
    """Print *msg* to stdout behind a yellow ``[WARN]`` tag."""
    prefix = "\033[1;33m[WARN]\033[0m"
    line = f"{prefix} {msg}"
    print(line)
def fail(msg):
    """Print *msg* behind a red ``[FAIL]`` tag, then exit with status 1.

    Never returns: the process is terminated via ``SystemExit``.
    """
    prefix = "\033[0;31m[FAIL]\033[0m"
    line = f"{prefix} {msg}"
    print(line)
    sys.exit(1)
def get_collections():
    """Return the names of all collections on the source instance.

    Returns:
        A list of collection name strings, in the order the source reports them.

    Raises:
        requests.HTTPError: if the source answers with an error status.
    """
    # requests applies no timeout unless one is passed with the call; without
    # it a stalled source would block the script forever.
    resp = source.get(f"{SOURCE_URL}/collections", timeout=30)
    resp.raise_for_status()
    return [c["name"] for c in resp.json()["result"]["collections"]]
def get_collection_info(url, session, col, timeout=60):
    """Fetch the collection description for *col* from the instance at *url*.

    Args:
        url: base URL of the Qdrant instance (no trailing slash).
        session: session object used to issue the GET request.
        col: collection name.
        timeout: seconds to wait for the server before giving up. Passed
            explicitly because requests never times out by default.

    Returns:
        The decoded JSON response body.

    Raises:
        requests.HTTPError: if the server answers with an error status.
    """
    resp = session.get(f"{url}/collections/{col}", timeout=timeout)
    resp.raise_for_status()
    return resp.json()
def create_collection(col, config):
    """Create *col* on the target using the source's vector settings.

    Only the ``vectors`` definition and, when present, ``shard_number`` are
    copied from the source configuration.

    Args:
        col: collection name.
        config: full collection-info response from the source; read at
            ``config["result"]["config"]["params"]``.

    Returns:
        True if the collection was created or already exists on the target,
        False if creation failed.
    """
    params = config["result"]["config"]["params"]
    body = {"vectors": params["vectors"]}
    if "shard_number" in params:
        body["shard_number"] = params["shard_number"]

    collection_url = f"{TARGET_URL}/collections/{col}"
    # Explicit timeouts: requests would otherwise wait indefinitely.
    resp = target.put(collection_url, json=body, timeout=120)
    if resp.status_code == 200:
        return True

    # Check if already exists
    check = target.get(collection_url, timeout=120)
    if check.status_code == 200 and check.json().get("status") == "ok":
        warn(" Collection already exists, will upsert points")
        return True

    warn(f" Failed to create collection: {resp.text}")
    return False
def scroll_points(col, offset=None, retries=3):
    """Fetch one page of points (payload and vector included) from the source.

    Args:
        col: collection name.
        offset: scroll offset returned by the previous page, or None to start
            from the beginning.
        retries: number of attempts on connection errors and timeouts.

    Returns:
        A ``(points, next_page_offset)`` tuple; ``next_page_offset`` is None
        when the response carries no further offset.

    Raises:
        requests.HTTPError: on an error status; this is not retried.

    If every attempt fails with a connection error or timeout, the script is
    terminated through ``fail``.
    """
    body = {
        "limit": BATCH_SIZE,
        "with_payload": True,
        "with_vector": True,
    }
    if offset is not None:
        body["offset"] = offset

    for attempt in range(1, retries + 1):
        try:
            resp = source.post(
                f"{SOURCE_URL}/collections/{col}/points/scroll",
                json=body,
                timeout=60,
            )
            resp.raise_for_status()
            result = resp.json()["result"]
            return result["points"], result.get("next_page_offset")
        except (requests.exceptions.ConnectionError, requests.exceptions.Timeout) as e:
            warn(f" Source scroll error (attempt {attempt}/{retries}): {e}")
            # Back off only when another attempt follows; sleeping after the
            # final failure would just delay the abort.
            if attempt < retries:
                time.sleep(5 * attempt)
    fail(f" Source unreachable after {retries} retries")
def upsert_points(col, points, retries=3):
    """Upsert *points* into collection *col* on the target.

    Retries on HTTP 502, timeouts and connection errors with a linearly
    growing delay (5s, 10s and 15s per attempt number respectively). Any other
    non-200 status is reported and not retried.

    Args:
        col: collection name.
        points: list of point objects as returned by the source scroll.
        retries: maximum number of attempts.

    Returns:
        True if the target answered 200, False otherwise.
    """
    for attempt in range(1, retries + 1):
        try:
            resp = target.put(
                f"{TARGET_URL}/collections/{col}/points?wait=true",
                json={"points": points},
                timeout=120,
            )
        except requests.exceptions.Timeout:
            warn(f" Timeout (attempt {attempt}/{retries}), waiting...")
            delay = 10 * attempt
        except requests.exceptions.ConnectionError:
            warn(f" Connection error (attempt {attempt}/{retries}), waiting...")
            delay = 15 * attempt
        else:
            if resp.status_code == 200:
                return True
            if resp.status_code != 502:
                warn(f" Upsert failed ({resp.status_code}): {resp.text[:200]}")
                return False
            warn(f" 502 Bad Gateway (attempt {attempt}/{retries}), waiting...")
            delay = 5 * attempt

        # Skip the back-off after the last attempt: nothing follows it, so the
        # sleep would only delay reporting the failure to the caller.
        if attempt < retries:
            time.sleep(delay)
    return False
def migrate_collection(col):
    """Copy every point of collection *col* from the source to the target.

    Pages through the source with ``scroll_points`` and writes each page with
    ``upsert_points``; a page that fails as a whole is retried point by point.
    Afterwards the target's point count is compared with the source's and the
    outcome is logged (verification problems are reported, never raised).

    Returns:
        The number of points successfully upserted; 0 when the source
        collection is empty or the target collection could not be created.
    """
    source_info = get_collection_info(SOURCE_URL, source, col)
    expected = source_info["result"]["points_count"]
    log(f" Source points: {expected}")

    if expected == 0:
        warn(" Empty collection, skipping")
        return 0

    created = create_collection(col, source_info)
    if not created:
        return 0

    # Wait for collection to be ready
    time.sleep(3)

    done = 0
    errors = 0
    cursor = None
    while True:
        batch, cursor = scroll_points(col, cursor)
        if not batch:
            break

        if upsert_points(col, batch):
            done += len(batch)
        else:
            # Retry one-by-one
            warn(f" Batch failed, retrying {len(batch)} points individually...")
            for point in batch:
                time.sleep(RATE_LIMIT_DELAY)
                if upsert_points(col, [point]):
                    done += 1
                else:
                    errors += 1

        # Carriage return keeps the progress counter on a single line.
        print(f"\r Migrated: {done}/{expected} (failed: {errors})", end="", flush=True)

        if cursor is None:
            break
        time.sleep(RATE_LIMIT_DELAY)

    print()

    # Verify
    try:
        target_info = get_collection_info(TARGET_URL, target, col)
        actual = target_info["result"]["points_count"]
        if actual != expected:
            warn(f" Count mismatch! Source: {expected}, Target: {actual}")
        else:
            log(f" Verified: {actual}/{expected} points")
    except Exception as e:
        warn(f" Could not verify: {e}")

    return done
def main():
    """Migrate all non-skipped collections from the source to the target.

    Steps: print the settings, check that both instances answer, list the
    source collections (marking those in ``SKIP_COLLECTIONS``), migrate each
    remaining collection, and print a summary. Terminates through ``fail`` if
    either instance does not pass the preflight check.
    """
    log(f"Source: {SOURCE_URL}")
    log(f"Target: {TARGET_URL}")
    log(f"Batch size: {BATCH_SIZE}, Rate limit: {RATE_LIMIT_DELAY}s")
    print()

    if not TARGET_API_KEY:
        # The key defaults to "" when the variable is missing; say so up front
        # instead of leaving only an opaque preflight failure.
        warn("QDRANT_API_KEY is not set; the target will likely reject requests")

    # Preflight. The underlying error is included because an auth failure
    # (e.g. 401) is not the same as an unreachable host.
    try:
        source.get(f"{SOURCE_URL}/collections", timeout=30).raise_for_status()
    except Exception as e:
        fail(f"Source not reachable at {SOURCE_URL}: {e}")
    try:
        target.get(f"{TARGET_URL}/collections", timeout=120).raise_for_status()
    except Exception as e:
        fail(f"Target not reachable at {TARGET_URL}: {e}")

    collections = get_collections()
    log("Collections on source:")
    for col in collections:
        if col in SKIP_COLLECTIONS:
            print(f" \033[1;33mSKIP\033[0m {col}")
        else:
            print(f" \033[0;32m OK\033[0m {col}")
    print()

    total_migrated = 0
    total_points = 0
    for col in collections:
        if col in SKIP_COLLECTIONS:
            continue
        log(f"=== Migrating: {col} ===")
        points = migrate_collection(col)
        # migrate_collection returns 0 for empty collections and for ones it
        # could not create; those must not be reported as migrated.
        if points:
            total_migrated += 1
        total_points += points
        print()

    log("=" * 40)
    log("Migration complete!")
    log(f"Collections migrated: {total_migrated}")
    log(f"Total points migrated: {total_points}")
    log("=" * 40)


if __name__ == "__main__":
    main()