This repository has been archived on 2026-02-15. You can view files and clone it. You cannot open issues or pull requests or push a commit.
Files
breakpilot-pwa/backend/secret_store/vault_client.py
BreakPilot Dev 19855efacc
Some checks failed
Tests / Go Tests (push) Has been cancelled
Tests / Python Tests (push) Has been cancelled
Tests / Integration Tests (push) Has been cancelled
Tests / Go Lint (push) Has been cancelled
Tests / Python Lint (push) Has been cancelled
Tests / Security Scan (push) Has been cancelled
Tests / All Checks Passed (push) Has been cancelled
Security Scanning / Secret Scanning (push) Has been cancelled
Security Scanning / Dependency Vulnerability Scan (push) Has been cancelled
Security Scanning / Go Security Scan (push) Has been cancelled
Security Scanning / Python Security Scan (push) Has been cancelled
Security Scanning / Node.js Security Scan (push) Has been cancelled
Security Scanning / Docker Image Security (push) Has been cancelled
Security Scanning / Security Summary (push) Has been cancelled
CI/CD Pipeline / Go Tests (push) Has been cancelled
CI/CD Pipeline / Python Tests (push) Has been cancelled
CI/CD Pipeline / Website Tests (push) Has been cancelled
CI/CD Pipeline / Linting (push) Has been cancelled
CI/CD Pipeline / Security Scan (push) Has been cancelled
CI/CD Pipeline / Docker Build & Push (push) Has been cancelled
CI/CD Pipeline / Integration Tests (push) Has been cancelled
CI/CD Pipeline / Deploy to Staging (push) Has been cancelled
CI/CD Pipeline / Deploy to Production (push) Has been cancelled
CI/CD Pipeline / CI Summary (push) Has been cancelled
ci/woodpecker/manual/build-ci-image Pipeline was successful
ci/woodpecker/manual/main Pipeline failed
feat: BreakPilot PWA - Full codebase (clean push without large binaries)
All services: admin-v2, studio-v2, website, ai-compliance-sdk,
consent-service, klausur-service, voice-service, and infrastructure.
Large PDFs and compiled binaries excluded via .gitignore.
2026-02-11 13:25:58 +01:00

516 lines
16 KiB
Python

"""
HashiCorp Vault Client for BreakPilot
Provides secure secrets management with:
- Vault KV v2 secrets engine support
- Automatic token renewal
- Fallback to environment variables in development
- Docker secrets support for containerized environments
- Caching to reduce Vault API calls
Usage:
from secret_store import get_secret
# Get a secret (tries Vault first, then env, then Docker secrets)
api_key = get_secret("anthropic_api_key")
# Get with default value
debug = get_secret("debug_mode", default="false")
License: Apache-2.0 (HashiCorp Vault compatible)
"""
import os
import logging
import functools
from dataclasses import dataclass, field
from typing import Optional, Dict, Any, List
from pathlib import Path
import json
import time
logger = logging.getLogger(__name__)
# =============================================
# Exceptions
# =============================================
class SecretNotFoundError(Exception):
"""Raised when a secret cannot be found in any source."""
pass
# =============================================
# Environment Variable to Vault Path Mapping
# =============================================
# Maps common environment variable names to Vault paths
# Format: env_var_name -> vault_path (relative to secrets_path)
ENV_TO_VAULT_MAPPING = {
# API Keys
"ANTHROPIC_API_KEY": "api_keys/anthropic",
"VAST_API_KEY": "api_keys/vast",
"TAVILY_API_KEY": "api_keys/tavily",
"STRIPE_SECRET_KEY": "api_keys/stripe",
"STRIPE_WEBHOOK_SECRET": "api_keys/stripe_webhook",
"OPENAI_API_KEY": "api_keys/openai",
# Database
"DATABASE_URL": "database/postgres",
"POSTGRES_PASSWORD": "database/postgres",
"SYNAPSE_DB_PASSWORD": "database/synapse",
# Auth
"JWT_SECRET": "auth/jwt",
"JWT_REFRESH_SECRET": "auth/jwt",
"KEYCLOAK_CLIENT_SECRET": "auth/keycloak",
# Communication
"MATRIX_ACCESS_TOKEN": "communication/matrix",
"JITSI_APP_SECRET": "communication/jitsi",
# Storage
"MINIO_ACCESS_KEY": "storage/minio",
"MINIO_SECRET_KEY": "storage/minio",
# Infrastructure
"CONTROL_API_KEY": "infra/vast",
"VAST_INSTANCE_ID": "infra/vast",
}
class VaultConnectionError(Exception):
"""Raised when Vault is configured but unreachable."""
pass
class VaultAuthenticationError(Exception):
"""Raised when Vault authentication fails."""
pass
# =============================================
# Configuration
# =============================================
@dataclass
class VaultConfig:
"""Configuration for HashiCorp Vault connection."""
# Vault server URL
url: str = ""
# Authentication method: "token", "approle", "kubernetes"
auth_method: str = "token"
# Token for token auth
token: Optional[str] = None
# AppRole credentials
role_id: Optional[str] = None
secret_id: Optional[str] = None
# Kubernetes auth
kubernetes_role: Optional[str] = None
kubernetes_jwt_path: str = "/var/run/secrets/kubernetes.io/serviceaccount/token"
# KV secrets engine configuration
secrets_mount: str = "secret" # KV v2 mount path
secrets_path: str = "breakpilot" # Base path for secrets
# TLS configuration
verify_ssl: bool = True
ca_cert: Optional[str] = None
# Caching
cache_ttl_seconds: int = 300 # 5 minutes
# Namespace (Vault Enterprise)
namespace: Optional[str] = None
# =============================================
# Secrets Manager
# =============================================
class SecretsManager:
"""
Unified secrets management with multiple backends.
Priority order:
1. HashiCorp Vault (if configured and available)
2. Environment variables
3. Docker secrets (/run/secrets/)
4. Default value (if provided)
"""
def __init__(
self,
vault_config: Optional[VaultConfig] = None,
environment: str = "development"
):
self.vault_config = vault_config
self.environment = environment
self._vault_client = None
self._cache: Dict[str, tuple] = {} # key -> (value, expiry_time)
self._vault_available = False
# Initialize Vault client if configured
if vault_config and vault_config.url:
self._init_vault_client()
def _init_vault_client(self):
"""Initialize the Vault client with hvac library."""
try:
import hvac
config = self.vault_config
# Create client
self._vault_client = hvac.Client(
url=config.url,
verify=config.verify_ssl if config.ca_cert is None else config.ca_cert,
namespace=config.namespace,
)
# Authenticate based on method
if config.auth_method == "token":
if config.token:
self._vault_client.token = config.token
elif config.auth_method == "approle":
if config.role_id and config.secret_id:
self._vault_client.auth.approle.login(
role_id=config.role_id,
secret_id=config.secret_id,
)
elif config.auth_method == "kubernetes":
jwt_path = Path(config.kubernetes_jwt_path)
if jwt_path.exists():
jwt = jwt_path.read_text().strip()
self._vault_client.auth.kubernetes.login(
role=config.kubernetes_role,
jwt=jwt,
)
# Check if authenticated
if self._vault_client.is_authenticated():
self._vault_available = True
logger.info("Vault client initialized successfully")
else:
logger.warning("Vault client created but not authenticated")
except ImportError:
logger.warning("hvac library not installed - Vault integration disabled")
except Exception as e:
logger.warning(f"Failed to initialize Vault client: {e}")
def _get_from_vault(self, key: str) -> Optional[str]:
"""Get a secret from Vault KV v2."""
if not self._vault_available or not self._vault_client:
return None
try:
config = self.vault_config
# Use mapping if available, otherwise use key as path
vault_path = ENV_TO_VAULT_MAPPING.get(key, key)
path = f"{config.secrets_path}/{vault_path}"
# Read from KV v2
response = self._vault_client.secrets.kv.v2.read_secret_version(
path=path,
mount_point=config.secrets_mount,
)
if response and "data" in response and "data" in response["data"]:
data = response["data"]["data"]
# For multi-field secrets like JWT, get specific field
field_mapping = {
"JWT_SECRET": "secret",
"JWT_REFRESH_SECRET": "refresh_secret",
"DATABASE_URL": "url",
"POSTGRES_PASSWORD": "password",
"MINIO_ACCESS_KEY": "access_key",
"MINIO_SECRET_KEY": "secret_key",
"VAST_INSTANCE_ID": "instance_id",
"CONTROL_API_KEY": "control_key",
}
# Get specific field if mapped
if key in field_mapping and field_mapping[key] in data:
return data[field_mapping[key]]
# Return the 'value' field or the first field
if "value" in data:
return data["value"]
elif data:
return list(data.values())[0]
except Exception as e:
logger.debug(f"Vault lookup failed for {key}: {e}")
return None
def _get_from_env(self, key: str) -> Optional[str]:
"""Get a secret from environment variables."""
# Try exact key
value = os.environ.get(key)
if value:
return value
# Try uppercase
value = os.environ.get(key.upper())
if value:
return value
return None
def _get_from_docker_secrets(self, key: str) -> Optional[str]:
"""Get a secret from Docker secrets mount."""
secrets_dir = Path("/run/secrets")
if not secrets_dir.exists():
return None
# Try exact filename
secret_file = secrets_dir / key
if secret_file.exists():
return secret_file.read_text().strip()
# Try lowercase
secret_file = secrets_dir / key.lower()
if secret_file.exists():
return secret_file.read_text().strip()
return None
def _check_cache(self, key: str) -> Optional[str]:
"""Check if a secret is cached and not expired."""
if key in self._cache:
value, expiry = self._cache[key]
if time.time() < expiry:
return value
else:
del self._cache[key]
return None
def _cache_secret(self, key: str, value: str):
"""Cache a secret with TTL."""
if self.vault_config:
ttl = self.vault_config.cache_ttl_seconds
else:
ttl = 300
self._cache[key] = (value, time.time() + ttl)
def get(
self,
key: str,
default: Optional[str] = None,
required: bool = False,
cache: bool = True
) -> Optional[str]:
"""
Get a secret from the best available source.
Args:
key: The secret key to look up
default: Default value if not found
required: If True, raise SecretNotFoundError when not found
cache: Whether to cache the result
Returns:
The secret value or default
Raises:
SecretNotFoundError: If required and not found
"""
# Check cache first
if cache:
cached = self._check_cache(key)
if cached is not None:
return cached
# Try Vault (production)
value = self._get_from_vault(key)
# Fall back to environment variables
if value is None:
value = self._get_from_env(key)
# Fall back to Docker secrets
if value is None:
value = self._get_from_docker_secrets(key)
# Use default or raise error
if value is None:
if required:
raise SecretNotFoundError(
f"Secret '{key}' not found in Vault, environment, or Docker secrets"
)
value = default
# Cache the result
if value is not None and cache:
self._cache_secret(key, value)
return value
def get_all(self, prefix: str = "") -> Dict[str, str]:
"""
Get all secrets with a given prefix.
Args:
prefix: Filter secrets by prefix
Returns:
Dictionary of secret key -> value
"""
secrets = {}
# Get from Vault if available
if self._vault_available and self._vault_client:
try:
config = self.vault_config
path = f"{config.secrets_path}/{prefix}" if prefix else config.secrets_path
response = self._vault_client.secrets.kv.v2.list_secrets(
path=path,
mount_point=config.secrets_mount,
)
if response and "data" in response and "keys" in response["data"]:
for key in response["data"]["keys"]:
if not key.endswith("/"): # Skip directories
full_key = f"{prefix}/{key}" if prefix else key
value = self.get(full_key, cache=False)
if value:
secrets[full_key] = value
except Exception as e:
logger.debug(f"Vault list failed: {e}")
# Also get from environment (for development)
for key, value in os.environ.items():
if prefix:
if key.lower().startswith(prefix.lower()):
secrets[key] = value
else:
# Only add secrets that look like secrets
if any(s in key.lower() for s in ["key", "secret", "password", "token"]):
secrets[key] = value
return secrets
def is_vault_available(self) -> bool:
"""Check if Vault is available and authenticated."""
return self._vault_available
def clear_cache(self):
"""Clear the secrets cache."""
self._cache.clear()
# =============================================
# Global Instance & Helper Functions
# =============================================
_secrets_manager: Optional[SecretsManager] = None
def get_secrets_manager() -> SecretsManager:
"""Get or create the global SecretsManager instance."""
global _secrets_manager
if _secrets_manager is None:
# Read Vault configuration from environment
vault_url = os.environ.get("VAULT_ADDR", "")
if vault_url:
config = VaultConfig(
url=vault_url,
auth_method=os.environ.get("VAULT_AUTH_METHOD", "token"),
token=os.environ.get("VAULT_TOKEN"),
role_id=os.environ.get("VAULT_ROLE_ID"),
secret_id=os.environ.get("VAULT_SECRET_ID"),
kubernetes_role=os.environ.get("VAULT_K8S_ROLE"),
secrets_mount=os.environ.get("VAULT_SECRETS_MOUNT", "secret"),
secrets_path=os.environ.get("VAULT_SECRETS_PATH", "breakpilot"),
verify_ssl=os.environ.get("VAULT_SKIP_VERIFY", "false").lower() != "true",
namespace=os.environ.get("VAULT_NAMESPACE"),
)
else:
config = None
environment = os.environ.get("ENVIRONMENT", "development")
_secrets_manager = SecretsManager(vault_config=config, environment=environment)
return _secrets_manager
def get_secret(
key: str,
default: Optional[str] = None,
required: bool = False
) -> Optional[str]:
"""
Convenience function to get a secret.
Usage:
api_key = get_secret("ANTHROPIC_API_KEY")
db_url = get_secret("DATABASE_URL", required=True)
"""
manager = get_secrets_manager()
return manager.get(key, default=default, required=required)
# =============================================
# Secret Key Mappings
# =============================================
# Mapping of standard secret names to their paths in Vault
SECRET_MAPPINGS = {
# API Keys
"ANTHROPIC_API_KEY": "api_keys/anthropic",
"VAST_API_KEY": "api_keys/vast",
"TAVILY_API_KEY": "api_keys/tavily",
"STRIPE_SECRET_KEY": "api_keys/stripe",
"STRIPE_WEBHOOK_SECRET": "api_keys/stripe_webhook",
# Database
"DATABASE_URL": "database/url",
"POSTGRES_PASSWORD": "database/postgres_password",
# JWT
"JWT_SECRET": "auth/jwt_secret",
"JWT_REFRESH_SECRET": "auth/jwt_refresh_secret",
# Keycloak
"KEYCLOAK_CLIENT_SECRET": "auth/keycloak_client_secret",
# Matrix
"MATRIX_ACCESS_TOKEN": "communication/matrix_token",
"SYNAPSE_DB_PASSWORD": "communication/synapse_db_password",
# Jitsi
"JITSI_APP_SECRET": "communication/jitsi_secret",
"JITSI_JICOFO_AUTH_PASSWORD": "communication/jitsi_jicofo",
"JITSI_JVB_AUTH_PASSWORD": "communication/jitsi_jvb",
# MinIO
"MINIO_SECRET_KEY": "storage/minio_secret",
}
def get_mapped_secret(key: str, default: Optional[str] = None) -> Optional[str]:
"""
Get a secret using the standard name mapping.
This maps environment variable names to Vault paths for consistency.
"""
vault_path = SECRET_MAPPINGS.get(key, key)
return get_secret(vault_path, default=default)