diff --git a/ai-compliance-sdk/internal/api/handlers/rag_handlers.go b/ai-compliance-sdk/internal/api/handlers/rag_handlers.go index 2ce83a5..ca7a780 100644 --- a/ai-compliance-sdk/internal/api/handlers/rag_handlers.go +++ b/ai-compliance-sdk/internal/api/handlers/rag_handlers.go @@ -27,7 +27,10 @@ var AllowedCollections = map[string]bool{ "bp_compliance_recht": true, "bp_compliance_gesetze": true, "bp_compliance_datenschutz": true, + "bp_compliance_gdpr": true, "bp_dsfa_corpus": true, + "bp_dsfa_templates": true, + "bp_dsfa_risks": true, "bp_legal_templates": true, } diff --git a/ai-compliance-sdk/internal/ucca/legal_rag.go b/ai-compliance-sdk/internal/ucca/legal_rag.go index 886b88f..4a1fbad 100644 --- a/ai-compliance-sdk/internal/ucca/legal_rag.go +++ b/ai-compliance-sdk/internal/ucca/legal_rag.go @@ -14,8 +14,8 @@ import ( // LegalRAGClient provides access to the compliance CE vector search via Qdrant + Ollama bge-m3. type LegalRAGClient struct { - qdrantHost string - qdrantPort string + qdrantURL string + qdrantAPIKey string ollamaURL string embeddingModel string collection string @@ -56,15 +56,14 @@ type CERegulationInfo struct { // NewLegalRAGClient creates a new Legal RAG client using Ollama bge-m3 embeddings. 
func NewLegalRAGClient() *LegalRAGClient { - qdrantHost := os.Getenv("QDRANT_HOST") - if qdrantHost == "" { - qdrantHost = "localhost" + qdrantURL := os.Getenv("QDRANT_URL") + if qdrantURL == "" { + qdrantURL = "http://localhost:6333" } + // Strip trailing slash + qdrantURL = strings.TrimRight(qdrantURL, "/") - qdrantPort := os.Getenv("QDRANT_PORT") - if qdrantPort == "" { - qdrantPort = "6333" - } + qdrantAPIKey := os.Getenv("QDRANT_API_KEY") ollamaURL := os.Getenv("OLLAMA_URL") if ollamaURL == "" { @@ -72,8 +71,8 @@ func NewLegalRAGClient() *LegalRAGClient { } return &LegalRAGClient{ - qdrantHost: qdrantHost, - qdrantPort: qdrantPort, + qdrantURL: qdrantURL, + qdrantAPIKey: qdrantAPIKey, ollamaURL: ollamaURL, embeddingModel: "bge-m3", collection: "bp_compliance_ce", @@ -220,12 +219,15 @@ func (c *LegalRAGClient) searchInternal(ctx context.Context, collection string, } // Call Qdrant - url := fmt.Sprintf("http://%s:%s/collections/%s/points/search", c.qdrantHost, c.qdrantPort, collection) + url := fmt.Sprintf("%s/collections/%s/points/search", c.qdrantURL, collection) req, err := http.NewRequestWithContext(ctx, "POST", url, bytes.NewReader(jsonBody)) if err != nil { return nil, fmt.Errorf("failed to create search request: %w", err) } req.Header.Set("Content-Type", "application/json") + if c.qdrantAPIKey != "" { + req.Header.Set("api-key", c.qdrantAPIKey) + } resp, err := c.httpClient.Do(req) if err != nil { diff --git a/ai-compliance-sdk/internal/ucca/legal_rag_test.go b/ai-compliance-sdk/internal/ucca/legal_rag_test.go index d0db6e5..8a2d2c9 100644 --- a/ai-compliance-sdk/internal/ucca/legal_rag_test.go +++ b/ai-compliance-sdk/internal/ucca/legal_rag_test.go @@ -31,12 +31,8 @@ func TestSearchCollection_UsesCorrectCollection(t *testing.T) { defer qdrantMock.Close() // Parse qdrant mock host/port - qdrantAddr := strings.TrimPrefix(qdrantMock.URL, "http://") - parts := strings.Split(qdrantAddr, ":") - client := &LegalRAGClient{ - qdrantHost: parts[0], - qdrantPort: 
parts[1], + qdrantURL: qdrantMock.URL, ollamaURL: ollamaMock.URL, embeddingModel: "bge-m3", collection: "bp_compliance_ce", @@ -72,12 +68,8 @@ func TestSearchCollection_FallbackDefault(t *testing.T) { })) defer qdrantMock.Close() - qdrantAddr := strings.TrimPrefix(qdrantMock.URL, "http://") - parts := strings.Split(qdrantAddr, ":") - client := &LegalRAGClient{ - qdrantHost: parts[0], - qdrantPort: parts[1], + qdrantURL: qdrantMock.URL, ollamaURL: ollamaMock.URL, embeddingModel: "bge-m3", collection: "bp_compliance_ce", @@ -126,12 +118,8 @@ func TestSearch_StillWorks(t *testing.T) { })) defer qdrantMock.Close() - qdrantAddr := strings.TrimPrefix(qdrantMock.URL, "http://") - parts := strings.Split(qdrantAddr, ":") - client := &LegalRAGClient{ - qdrantHost: parts[0], - qdrantPort: parts[1], + qdrantURL: qdrantMock.URL, ollamaURL: ollamaMock.URL, embeddingModel: "bge-m3", collection: "bp_compliance_ce", diff --git a/docker-compose.yml b/docker-compose.yml index f07834f..9759fed 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -179,9 +179,10 @@ services: expose: - "8095" environment: - MINIO_ENDPOINT: bp-core-minio:9000 - MINIO_ACCESS_KEY: ${MINIO_ROOT_USER:-breakpilot} - MINIO_SECRET_KEY: ${MINIO_ROOT_PASSWORD:-breakpilot123} + MINIO_ENDPOINT: nbg1.your-objectstorage.com + MINIO_ACCESS_KEY: ${OBJECT_STORAGE_ACCESS_KEY:?set OBJECT_STORAGE_ACCESS_KEY in .env} + MINIO_SECRET_KEY: ${OBJECT_STORAGE_SECRET_KEY:?set OBJECT_STORAGE_SECRET_KEY in .env} + MINIO_SECURE: "true" PIPER_MODEL_PATH: /app/models/de_DE-thorsten-high.onnx depends_on: core-health-check: diff --git a/scripts/migrate-qdrant.py b/scripts/migrate-qdrant.py new file mode 100755 index 0000000..e7242ef --- /dev/null +++ b/scripts/migrate-qdrant.py @@ -0,0 +1,235 @@ +#!/usr/bin/env python3 +""" +Qdrant Migration: Local → qdrant-dev.breakpilot.ai +Migrates compliance collections only (skips NIBIS/Lehrer collections). +Uses persistent HTTP sessions and rate limiting for hosted Qdrant.
+""" + +import json +import os +import sys +import time +import requests + +SOURCE_URL = "http://macmini:6333" +TARGET_URL = "https://qdrant-dev.breakpilot.ai" +TARGET_API_KEY = os.environ.get("TARGET_API_KEY", "") +BATCH_SIZE = 20 +RATE_LIMIT_DELAY = 0.3 # seconds between batches + +SKIP_COLLECTIONS = { + "bp_nibis", "bp_nibis_eh", "bp_compliance_schulrecht", "bp_eh", "bp_vocab" +} + +# Persistent sessions for connection reuse +source = requests.Session() +source.headers.update({"Content-Type": "application/json"}) +# NOTE: requests.Session has no 'timeout' attribute; timeouts are passed per request + +target = requests.Session() +target.headers.update({ + "Content-Type": "application/json", + "api-key": TARGET_API_KEY, +}) +# NOTE: per-request timeouts are used below (a 'session.timeout = N' assignment is a silent no-op) + + +def log(msg): + print(f"\033[0;32m[INFO]\033[0m {msg}") + +def warn(msg): + print(f"\033[1;33m[WARN]\033[0m {msg}") + +def fail(msg): + print(f"\033[0;31m[FAIL]\033[0m {msg}") + sys.exit(1) + + +def get_collections(): + resp = source.get(f"{SOURCE_URL}/collections") + resp.raise_for_status() + return [c["name"] for c in resp.json()["result"]["collections"]] + + +def get_collection_info(url, session, col): + resp = session.get(f"{url}/collections/{col}") + resp.raise_for_status() + return resp.json() + + +def create_collection(col, config): + vectors = config["result"]["config"]["params"]["vectors"] + body = {"vectors": vectors} + params = config["result"]["config"]["params"] + if "shard_number" in params: + body["shard_number"] = params["shard_number"] + + resp = target.put(f"{TARGET_URL}/collections/{col}", json=body) + if resp.status_code == 200: + return True + + # Check if already exists + check = target.get(f"{TARGET_URL}/collections/{col}") + if check.status_code == 200 and check.json().get("status") == "ok": + warn(f" Collection already exists, will upsert points") + return True + + warn(f" Failed to create collection: {resp.text}") + return False + + +def scroll_points(col, offset=None, retries=3): + body = { + "limit": BATCH_SIZE, + "with_payload":
True, + "with_vector": True, + } + if offset is not None: + body["offset"] = offset + + for attempt in range(retries): + try: + resp = source.post(f"{SOURCE_URL}/collections/{col}/points/scroll", json=body, timeout=60) + resp.raise_for_status() + result = resp.json()["result"] + return result["points"], result.get("next_page_offset") + except (requests.exceptions.ConnectionError, requests.exceptions.Timeout) as e: + warn(f" Source scroll error (attempt {attempt+1}/{retries}): {e}") + time.sleep(5 * (attempt + 1)) + fail(f" Source unreachable after {retries} retries") + + +def upsert_points(col, points, retries=3): + for attempt in range(retries): + try: + resp = target.put( + f"{TARGET_URL}/collections/{col}/points?wait=true", + json={"points": points}, + timeout=120, + ) + if resp.status_code == 200: + return True + if resp.status_code == 502: + warn(f" 502 Bad Gateway (attempt {attempt+1}/{retries}), waiting...") + time.sleep(5 * (attempt + 1)) + continue + warn(f" Upsert failed ({resp.status_code}): {resp.text[:200]}") + return False + except requests.exceptions.Timeout: + warn(f" Timeout (attempt {attempt+1}/{retries}), waiting...") + time.sleep(10 * (attempt + 1)) + except requests.exceptions.ConnectionError: + warn(f" Connection error (attempt {attempt+1}/{retries}), waiting...") + time.sleep(15 * (attempt + 1)) + return False + + +def migrate_collection(col): + info = get_collection_info(SOURCE_URL, source, col) + source_count = info["result"]["points_count"] + log(f" Source points: {source_count}") + + if source_count == 0: + warn(" Empty collection, skipping") + return 0 + + if not create_collection(col, info): + return 0 + + # Wait for collection to be ready + time.sleep(3) + + offset = None + migrated = 0 + failed = 0 + + while True: + points, next_offset = scroll_points(col, offset) + + if not points: + break + + if upsert_points(col, points): + migrated += len(points) + else: + # Retry one-by-one + warn(f" Batch failed, retrying {len(points)} points 
individually...") + for pt in points: + time.sleep(RATE_LIMIT_DELAY) + if upsert_points(col, [pt]): + migrated += 1 + else: + failed += 1 + + sys.stdout.write(f"\r Migrated: {migrated}/{source_count} (failed: {failed})") + sys.stdout.flush() + + if next_offset is None: + break + offset = next_offset + time.sleep(RATE_LIMIT_DELAY) + + print() + + # Verify + try: + target_info = get_collection_info(TARGET_URL, target, col) + target_count = target_info["result"]["points_count"] + if target_count == source_count: + log(f" Verified: {target_count}/{source_count} points") + else: + warn(f" Count mismatch! Source: {source_count}, Target: {target_count}") + except Exception as e: + warn(f" Could not verify: {e}") + + return migrated + + +def main(): + log(f"Source: {SOURCE_URL}") + log(f"Target: {TARGET_URL}") + log(f"Batch size: {BATCH_SIZE}, Rate limit: {RATE_LIMIT_DELAY}s") + print() + + # Preflight + try: + source.get(f"{SOURCE_URL}/collections").raise_for_status() + except Exception: + fail(f"Source not reachable at {SOURCE_URL}") + + try: + target.get(f"{TARGET_URL}/collections").raise_for_status() + except Exception: + fail(f"Target not reachable at {TARGET_URL}") + + collections = get_collections() + log("Collections on source:") + for col in collections: + if col in SKIP_COLLECTIONS: + print(f" \033[1;33mSKIP\033[0m {col}") + else: + print(f" \033[0;32m OK\033[0m {col}") + print() + + total_migrated = 0 + total_points = 0 + + for col in collections: + if col in SKIP_COLLECTIONS: + continue + + log(f"=== Migrating: {col} ===") + points = migrate_collection(col) + total_migrated += 1 + total_points += points + + print() + log("=" * 40) + log(f"Migration complete!") + log(f"Collections migrated: {total_migrated}") + log(f"Total points migrated: {total_points}") + log("=" * 40) + + +if __name__ == "__main__": + main() diff --git a/scripts/migrate-qdrant.sh b/scripts/migrate-qdrant.sh new file mode 100755 index 0000000..dab8eea --- /dev/null +++ 
b/scripts/migrate-qdrant.sh @@ -0,0 +1,236 @@ +#!/usr/bin/env bash +set -euo pipefail + +# ============================================================================= +# Qdrant Migration: Local → qdrant-dev.breakpilot.ai +# Migrates compliance collections only (skips NIBIS/Lehrer collections) +# ============================================================================= + +SOURCE_URL="${SOURCE_URL:-http://macmini:6333}" +TARGET_URL="${TARGET_URL:-https://qdrant-dev.breakpilot.ai}" +TARGET_API_KEY="${TARGET_API_KEY:?TARGET_API_KEY must be set in the environment (do not hardcode secrets)}" +BATCH_SIZE=15 + +# Collections to SKIP (Lehrer/NIBIS) +SKIP_COLLECTIONS="bp_nibis bp_nibis_eh bp_compliance_schulrecht bp_eh bp_vocab" + +TMPDIR=$(mktemp -d) +trap "rm -rf $TMPDIR" EXIT + +RED='\033[0;31m' +GREEN='\033[0;32m' +YELLOW='\033[1;33m' +NC='\033[0m' + +log() { echo -e "${GREEN}[INFO]${NC} $*"; } +warn() { echo -e "${YELLOW}[WARN]${NC} $*"; } +fail() { echo -e "${RED}[FAIL]${NC} $*"; exit 1; } + +should_skip() { + local col="$1" + for skip in $SKIP_COLLECTIONS; do + [[ "$col" == "$skip" ]] && return 0 + done + return 1 +} + +target_curl() { + curl -s --max-time 60 -H "api-key: ${TARGET_API_KEY}" -H "Content-Type: application/json" "$@" +} + +source_curl() { + curl -s -H "Content-Type: application/json" "$@" +} + +# --- Preflight checks --- +log "Source: $SOURCE_URL" +log "Target: $TARGET_URL" +echo "" + +if ! source_curl "${SOURCE_URL}/collections" >/dev/null 2>&1; then + fail "Source Qdrant not reachable at ${SOURCE_URL}" +fi + +if !
target_curl "${TARGET_URL}/collections" >/dev/null 2>&1; then + fail "Target Qdrant not reachable at ${TARGET_URL}" +fi + +# --- Get all collections from source --- +COLLECTIONS=$(source_curl "${SOURCE_URL}/collections" | python3 -c " +import sys, json +data = json.load(sys.stdin) +for c in data['result']['collections']: + print(c['name']) +") + +log "Found collections on source:" +echo "$COLLECTIONS" | while read -r col; do + if should_skip "$col"; then + echo -e " ${YELLOW}SKIP${NC} $col" + else + echo -e " ${GREEN} OK${NC} $col" + fi +done +echo "" + +# --- Migrate each collection --- +TOTAL_MIGRATED=0 +TOTAL_POINTS=0 + +while read -r col; do + [[ -z "$col" ]] && continue + + if should_skip "$col"; then + continue + fi + + log "=== Migrating: $col ===" + + # 1. Get collection info from source + source_curl "${SOURCE_URL}/collections/${col}" > "$TMPDIR/col_info.json" + STATUS=$(python3 -c "import json; print(json.load(open('$TMPDIR/col_info.json'))['status'])" 2>/dev/null || echo "error") + if [[ "$STATUS" != "ok" ]]; then + warn "Collection $col has status '$STATUS', skipping" + continue + fi + + SOURCE_COUNT=$(python3 -c "import json; print(json.load(open('$TMPDIR/col_info.json'))['result']['points_count'])") + log " Source points: $SOURCE_COUNT" + + if [[ "$SOURCE_COUNT" == "0" ]]; then + warn " Empty collection, skipping" + continue + fi + + # 2. Create collection on target + python3 -c " +import json +info = json.load(open('$TMPDIR/col_info.json'))['result']['config']['params'] +vectors = info['vectors'] +body = {'vectors': vectors} +if 'shard_number' in info: + body['shard_number'] = info['shard_number'] +json.dump(body, open('$TMPDIR/create_body.json', 'w')) +" + + log " Creating collection on target..." 
+ CREATE_RESP=$(target_curl -X PUT "${TARGET_URL}/collections/${col}" -d @"$TMPDIR/create_body.json") + CREATE_STATUS=$(echo "$CREATE_RESP" | python3 -c "import sys,json; d=json.load(sys.stdin); print(d.get('status','error'))" 2>/dev/null || echo "error") + + if [[ "$CREATE_STATUS" != "ok" ]]; then + EXISTS=$(target_curl "${TARGET_URL}/collections/${col}" | python3 -c "import sys,json; print(json.load(sys.stdin).get('status','error'))" 2>/dev/null || echo "no") + if [[ "$EXISTS" == "ok" ]]; then + warn " Collection already exists on target, will upsert points" + else + warn " Failed to create collection: $CREATE_RESP" + continue + fi + fi + + # Wait for collection to be ready + sleep 2 + + # 3. Scroll all points in batches and upsert to target + OFFSET="" + MIGRATED=0 + + while true; do + # Build scroll request + if [[ -z "$OFFSET" ]]; then + echo "{\"limit\": ${BATCH_SIZE}, \"with_payload\": true, \"with_vector\": true}" > "$TMPDIR/scroll_req.json" + else + echo "{\"limit\": ${BATCH_SIZE}, \"with_payload\": true, \"with_vector\": true, \"offset\": ${OFFSET}}" > "$TMPDIR/scroll_req.json" + fi + + source_curl -X POST "${SOURCE_URL}/collections/${col}/points/scroll" -d @"$TMPDIR/scroll_req.json" > "$TMPDIR/scroll_resp.json" + + # Extract points and next offset + python3 -c " +import json +data = json.load(open('$TMPDIR/scroll_resp.json')) +points = data['result']['points'] +next_offset = data['result'].get('next_page_offset') +json.dump({'points': points}, open('$TMPDIR/upsert_body.json', 'w')) +with open('$TMPDIR/next_offset.txt', 'w') as f: + f.write(str(next_offset) if next_offset is not None else 'DONE') +with open('$TMPDIR/batch_count.txt', 'w') as f: + f.write(str(len(points))) +" + + NEXT_OFFSET=$(cat "$TMPDIR/next_offset.txt") + BATCH_COUNT=$(cat "$TMPDIR/batch_count.txt") + + if [[ "$BATCH_COUNT" == "0" ]]; then + break + fi + + # Upsert to target using file (with retry: split into single points on failure) + UPSERT_RESP=$(target_curl -X PUT 
"${TARGET_URL}/collections/${col}/points?wait=true" -d @"$TMPDIR/upsert_body.json") + UPSERT_STATUS=$(echo "$UPSERT_RESP" | python3 -c " +import sys, json +s = sys.stdin.read().strip() +if not s: + print('error') +else: + d = json.loads(s) + print(d.get('status','error')) +" 2>/dev/null || echo "error") + + if [[ "$UPSERT_STATUS" != "ok" ]]; then + warn " Batch upsert failed, retrying one-by-one..." + # Split into individual points and retry + SINGLE_OK=0 + SINGLE_FAIL=0 + python3 -c " +import json +data = json.load(open('$TMPDIR/upsert_body.json')) +for i, pt in enumerate(data['points']): + json.dump({'points': [pt]}, open(f'$TMPDIR/single_{i}.json', 'w')) +print(len(data['points'])) +" > "$TMPDIR/single_count.txt" + SINGLE_COUNT=$(cat "$TMPDIR/single_count.txt") + for ((si=0; si<SINGLE_COUNT; si++)); do + SINGLE_RESP=$(target_curl -X PUT "${TARGET_URL}/collections/${col}/points?wait=true" -d @"$TMPDIR/single_${si}.json") + SS=$(echo "$SINGLE_RESP" | python3 -c "import sys,json; print(json.load(sys.stdin).get('status','error'))" 2>/dev/null || echo "error") + if [[ "$SS" == "ok" ]]; then + SINGLE_OK=$((SINGLE_OK + 1)) + else + SINGLE_FAIL=$((SINGLE_FAIL + 1)) + fi + done + if [[ "$SINGLE_FAIL" -gt 0 ]]; then + warn " Single-point retry: $SINGLE_OK ok, $SINGLE_FAIL failed" + fi + fi + + MIGRATED=$((MIGRATED + BATCH_COUNT)) + echo -ne " Migrated: ${MIGRATED}/${SOURCE_COUNT}\r" + + if [[ "$NEXT_OFFSET" == "DONE" ]]; then + break + fi + OFFSET="$NEXT_OFFSET" + done + + echo "" + + # 4. Verify point count + TARGET_COUNT=$(target_curl "${TARGET_URL}/collections/${col}" | python3 -c "import sys,json; print(json.load(sys.stdin)['result']['points_count'])") + + if [[ "$TARGET_COUNT" == "$SOURCE_COUNT" ]]; then + log " Verified: ${TARGET_COUNT}/${SOURCE_COUNT} points" + else + warn " Count mismatch! Source: ${SOURCE_COUNT}, Target: ${TARGET_COUNT}" + fi + + TOTAL_MIGRATED=$((TOTAL_MIGRATED + 1)) + TOTAL_POINTS=$((TOTAL_POINTS + MIGRATED)) + +done <<< "$COLLECTIONS" + +echo "" +log "==============================" +log "Migration complete!" +log "Collections migrated: $TOTAL_MIGRATED" +log "Total points migrated: $TOTAL_POINTS" +log "=============================="