Files
Benjamin Boenisch 5a31f52310 Initial commit: breakpilot-lehrer - Lehrer KI Platform
Services: Admin-Lehrer, Backend-Lehrer, Studio v2, Website,
Klausur-Service, School-Service, Voice-Service, Geo-Service,
BreakPilot Drive, Agent-Core

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-11 23:47:26 +01:00

374 lines
12 KiB
Python

"""
Mail Credentials Service
Secure storage and retrieval of email account credentials using HashiCorp Vault.
Falls back to encrypted database storage in development.
"""
import os
import base64
import hashlib
import logging
from typing import Optional, Dict
from dataclasses import dataclass
from cryptography.fernet import Fernet
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives.kdf.pbkdf2 import PBKDF2HMAC
logger = logging.getLogger(__name__)
# Environment
ENVIRONMENT = os.getenv("ENVIRONMENT", "development")
VAULT_ADDR = os.getenv("VAULT_ADDR", "")
MAIL_CREDENTIALS_DIR = os.getenv("MAIL_CREDENTIALS_DIR", os.path.join(os.path.dirname(os.path.dirname(os.path.abspath(__file__))), "data", "mail_credentials"))
@dataclass
class MailCredentials:
"""Mail account credentials."""
email: str
password: str
imap_host: str
imap_port: int
smtp_host: str
smtp_port: int
class MailCredentialsService:
"""
Service for storing and retrieving mail credentials securely.
In production: Uses HashiCorp Vault KV v2
In development: Uses Fernet encryption with a derived key
"""
def __init__(self):
self._vault_client = None
self._vault_available = False
self._encryption_key = None
# Try to initialize Vault
if VAULT_ADDR:
self._init_vault()
else:
# Development fallback: use encryption key
self._init_encryption()
def _init_vault(self):
"""Initialize Vault client for credential storage."""
try:
import hvac
vault_token = os.getenv("VAULT_TOKEN")
if not vault_token:
logger.warning("VAULT_ADDR set but no VAULT_TOKEN - Vault disabled")
self._init_encryption()
return
self._vault_client = hvac.Client(
url=VAULT_ADDR,
token=vault_token,
)
if self._vault_client.is_authenticated():
self._vault_available = True
logger.info("Mail credentials service: Vault initialized")
else:
logger.warning("Vault authentication failed - using encryption fallback")
self._init_encryption()
except ImportError:
logger.warning("hvac not installed - using encryption fallback")
self._init_encryption()
except Exception as e:
logger.warning(f"Vault initialization failed: {e}")
self._init_encryption()
def _init_encryption(self):
"""Initialize encryption for development/fallback mode."""
# Derive key from environment secret - REQUIRED
secret = os.getenv("MAIL_ENCRYPTION_SECRET")
if not secret:
raise RuntimeError("MAIL_ENCRYPTION_SECRET nicht konfiguriert - bitte via Vault oder Umgebungsvariable setzen")
salt = os.getenv("MAIL_ENCRYPTION_SALT", "breakpilot-mail-salt").encode()
kdf = PBKDF2HMAC(
algorithm=hashes.SHA256(),
length=32,
salt=salt,
iterations=480000,
)
key = base64.urlsafe_b64encode(kdf.derive(secret.encode()))
self._encryption_key = Fernet(key)
logger.info("Mail credentials service: Using encrypted storage")
def _get_vault_path(self, account_id: str) -> str:
"""Generate Vault path for a mail account."""
return f"breakpilot/mail/accounts/{account_id}"
async def store_credentials(
self,
account_id: str,
email: str,
password: str,
imap_host: str,
imap_port: int,
smtp_host: str,
smtp_port: int,
) -> str:
"""
Store mail credentials securely.
Returns:
vault_path: Path or reference to stored credentials
"""
if self._vault_available:
return await self._store_in_vault(
account_id, email, password, imap_host, imap_port, smtp_host, smtp_port
)
else:
return await self._store_encrypted(
account_id, email, password, imap_host, imap_port, smtp_host, smtp_port
)
async def _store_in_vault(
self,
account_id: str,
email: str,
password: str,
imap_host: str,
imap_port: int,
smtp_host: str,
smtp_port: int,
) -> str:
"""Store credentials in Vault KV v2."""
path = self._get_vault_path(account_id)
try:
self._vault_client.secrets.kv.v2.create_or_update_secret(
path=path,
secret={
"email": email,
"password": password,
"imap_host": imap_host,
"imap_port": str(imap_port),
"smtp_host": smtp_host,
"smtp_port": str(smtp_port),
},
mount_point="secret",
)
logger.info(f"Stored credentials in Vault for account {account_id}")
return f"vault:{path}"
except Exception as e:
logger.error(f"Failed to store credentials in Vault: {e}")
raise
async def _store_encrypted(
self,
account_id: str,
email: str,
password: str,
imap_host: str,
imap_port: int,
smtp_host: str,
smtp_port: int,
) -> str:
"""Store credentials encrypted (development fallback)."""
import json
credentials = {
"email": email,
"password": password,
"imap_host": imap_host,
"imap_port": imap_port,
"smtp_host": smtp_host,
"smtp_port": smtp_port,
}
# Encrypt the credentials
encrypted = self._encryption_key.encrypt(json.dumps(credentials).encode())
# Store in file (development only)
os.makedirs(MAIL_CREDENTIALS_DIR, exist_ok=True)
path = f"{MAIL_CREDENTIALS_DIR}/{account_id}.enc"
with open(path, "wb") as f:
f.write(encrypted)
logger.info(f"Stored encrypted credentials for account {account_id}")
return f"file:{path}"
async def get_credentials(self, account_id: str, vault_path: str) -> Optional[MailCredentials]:
"""
Retrieve mail credentials.
Args:
account_id: The account ID
vault_path: The storage path (from store_credentials)
Returns:
MailCredentials or None if not found
"""
if vault_path.startswith("vault:"):
return await self._get_from_vault(vault_path[6:])
elif vault_path.startswith("file:"):
return await self._get_from_file(vault_path[5:])
else:
# Legacy path format
return await self._get_from_vault(vault_path)
async def _get_from_vault(self, path: str) -> Optional[MailCredentials]:
"""Retrieve credentials from Vault."""
if not self._vault_available:
logger.warning("Vault not available for credential retrieval")
return None
try:
response = self._vault_client.secrets.kv.v2.read_secret_version(
path=path,
mount_point="secret",
)
if response and "data" in response and "data" in response["data"]:
data = response["data"]["data"]
return MailCredentials(
email=data["email"],
password=data["password"],
imap_host=data["imap_host"],
imap_port=int(data["imap_port"]),
smtp_host=data["smtp_host"],
smtp_port=int(data["smtp_port"]),
)
except Exception as e:
logger.error(f"Failed to retrieve credentials from Vault: {e}")
return None
async def _get_from_file(self, path: str) -> Optional[MailCredentials]:
"""Retrieve credentials from encrypted file (development)."""
import json
try:
with open(path, "rb") as f:
encrypted = f.read()
decrypted = self._encryption_key.decrypt(encrypted)
data = json.loads(decrypted.decode())
return MailCredentials(
email=data["email"],
password=data["password"],
imap_host=data["imap_host"],
imap_port=data["imap_port"],
smtp_host=data["smtp_host"],
smtp_port=data["smtp_port"],
)
except FileNotFoundError:
logger.warning(f"Credentials file not found: {path}")
except Exception as e:
logger.error(f"Failed to decrypt credentials: {e}")
return None
async def delete_credentials(self, account_id: str, vault_path: str) -> bool:
"""
Delete stored credentials.
Args:
account_id: The account ID
vault_path: The storage path
Returns:
True if deleted successfully
"""
if vault_path.startswith("vault:"):
return await self._delete_from_vault(vault_path[6:])
elif vault_path.startswith("file:"):
return await self._delete_from_file(vault_path[5:])
return False
async def _delete_from_vault(self, path: str) -> bool:
"""Delete credentials from Vault."""
if not self._vault_available:
return False
try:
self._vault_client.secrets.kv.v2.delete_metadata_and_all_versions(
path=path,
mount_point="secret",
)
logger.info(f"Deleted credentials from Vault: {path}")
return True
except Exception as e:
logger.error(f"Failed to delete credentials from Vault: {e}")
return False
async def _delete_from_file(self, path: str) -> bool:
"""Delete credentials file."""
try:
os.remove(path)
logger.info(f"Deleted credentials file: {path}")
return True
except FileNotFoundError:
return True # Already deleted
except Exception as e:
logger.error(f"Failed to delete credentials file: {e}")
return False
async def update_password(
self,
account_id: str,
vault_path: str,
new_password: str,
) -> bool:
"""
Update the password for stored credentials.
Args:
account_id: The account ID
vault_path: The storage path
new_password: The new password
Returns:
True if updated successfully
"""
# Get existing credentials
creds = await self.get_credentials(account_id, vault_path)
if not creds:
return False
# Store with new password
try:
await self.store_credentials(
account_id=account_id,
email=creds.email,
password=new_password,
imap_host=creds.imap_host,
imap_port=creds.imap_port,
smtp_host=creds.smtp_host,
smtp_port=creds.smtp_port,
)
return True
except Exception as e:
logger.error(f"Failed to update password: {e}")
return False
# Global instance
_credentials_service: Optional[MailCredentialsService] = None
def get_credentials_service() -> MailCredentialsService:
"""Get or create the global MailCredentialsService instance."""
global _credentials_service
if _credentials_service is None:
_credentials_service = MailCredentialsService()
return _credentials_service