Services: Admin-Lehrer, Backend-Lehrer, Studio v2, Website, Klausur-Service, School-Service, Voice-Service, Geo-Service, BreakPilot Drive, Agent-Core Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
374 lines
12 KiB
Python
374 lines
12 KiB
Python
"""
|
|
Mail Credentials Service
|
|
|
|
Secure storage and retrieval of email account credentials using HashiCorp Vault.
|
|
Falls back to encrypted database storage in development.
|
|
"""
|
|
|
|
import os
|
|
import base64
|
|
import hashlib
|
|
import logging
|
|
from typing import Optional, Dict
|
|
from dataclasses import dataclass
|
|
from cryptography.fernet import Fernet
|
|
from cryptography.hazmat.primitives import hashes
|
|
from cryptography.hazmat.primitives.kdf.pbkdf2 import PBKDF2HMAC
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
# Environment
|
|
ENVIRONMENT = os.getenv("ENVIRONMENT", "development")
|
|
VAULT_ADDR = os.getenv("VAULT_ADDR", "")
|
|
MAIL_CREDENTIALS_DIR = os.getenv("MAIL_CREDENTIALS_DIR", os.path.join(os.path.dirname(os.path.dirname(os.path.abspath(__file__))), "data", "mail_credentials"))
|
|
|
|
|
|
@dataclass
|
|
class MailCredentials:
|
|
"""Mail account credentials."""
|
|
email: str
|
|
password: str
|
|
imap_host: str
|
|
imap_port: int
|
|
smtp_host: str
|
|
smtp_port: int
|
|
|
|
|
|
class MailCredentialsService:
|
|
"""
|
|
Service for storing and retrieving mail credentials securely.
|
|
|
|
In production: Uses HashiCorp Vault KV v2
|
|
In development: Uses Fernet encryption with a derived key
|
|
"""
|
|
|
|
def __init__(self):
|
|
self._vault_client = None
|
|
self._vault_available = False
|
|
self._encryption_key = None
|
|
|
|
# Try to initialize Vault
|
|
if VAULT_ADDR:
|
|
self._init_vault()
|
|
else:
|
|
# Development fallback: use encryption key
|
|
self._init_encryption()
|
|
|
|
def _init_vault(self):
|
|
"""Initialize Vault client for credential storage."""
|
|
try:
|
|
import hvac
|
|
|
|
vault_token = os.getenv("VAULT_TOKEN")
|
|
if not vault_token:
|
|
logger.warning("VAULT_ADDR set but no VAULT_TOKEN - Vault disabled")
|
|
self._init_encryption()
|
|
return
|
|
|
|
self._vault_client = hvac.Client(
|
|
url=VAULT_ADDR,
|
|
token=vault_token,
|
|
)
|
|
|
|
if self._vault_client.is_authenticated():
|
|
self._vault_available = True
|
|
logger.info("Mail credentials service: Vault initialized")
|
|
else:
|
|
logger.warning("Vault authentication failed - using encryption fallback")
|
|
self._init_encryption()
|
|
|
|
except ImportError:
|
|
logger.warning("hvac not installed - using encryption fallback")
|
|
self._init_encryption()
|
|
except Exception as e:
|
|
logger.warning(f"Vault initialization failed: {e}")
|
|
self._init_encryption()
|
|
|
|
def _init_encryption(self):
|
|
"""Initialize encryption for development/fallback mode."""
|
|
# Derive key from environment secret - REQUIRED
|
|
secret = os.getenv("MAIL_ENCRYPTION_SECRET")
|
|
if not secret:
|
|
raise RuntimeError("MAIL_ENCRYPTION_SECRET nicht konfiguriert - bitte via Vault oder Umgebungsvariable setzen")
|
|
salt = os.getenv("MAIL_ENCRYPTION_SALT", "breakpilot-mail-salt").encode()
|
|
|
|
kdf = PBKDF2HMAC(
|
|
algorithm=hashes.SHA256(),
|
|
length=32,
|
|
salt=salt,
|
|
iterations=480000,
|
|
)
|
|
|
|
key = base64.urlsafe_b64encode(kdf.derive(secret.encode()))
|
|
self._encryption_key = Fernet(key)
|
|
logger.info("Mail credentials service: Using encrypted storage")
|
|
|
|
def _get_vault_path(self, account_id: str) -> str:
|
|
"""Generate Vault path for a mail account."""
|
|
return f"breakpilot/mail/accounts/{account_id}"
|
|
|
|
async def store_credentials(
|
|
self,
|
|
account_id: str,
|
|
email: str,
|
|
password: str,
|
|
imap_host: str,
|
|
imap_port: int,
|
|
smtp_host: str,
|
|
smtp_port: int,
|
|
) -> str:
|
|
"""
|
|
Store mail credentials securely.
|
|
|
|
Returns:
|
|
vault_path: Path or reference to stored credentials
|
|
"""
|
|
if self._vault_available:
|
|
return await self._store_in_vault(
|
|
account_id, email, password, imap_host, imap_port, smtp_host, smtp_port
|
|
)
|
|
else:
|
|
return await self._store_encrypted(
|
|
account_id, email, password, imap_host, imap_port, smtp_host, smtp_port
|
|
)
|
|
|
|
async def _store_in_vault(
|
|
self,
|
|
account_id: str,
|
|
email: str,
|
|
password: str,
|
|
imap_host: str,
|
|
imap_port: int,
|
|
smtp_host: str,
|
|
smtp_port: int,
|
|
) -> str:
|
|
"""Store credentials in Vault KV v2."""
|
|
path = self._get_vault_path(account_id)
|
|
|
|
try:
|
|
self._vault_client.secrets.kv.v2.create_or_update_secret(
|
|
path=path,
|
|
secret={
|
|
"email": email,
|
|
"password": password,
|
|
"imap_host": imap_host,
|
|
"imap_port": str(imap_port),
|
|
"smtp_host": smtp_host,
|
|
"smtp_port": str(smtp_port),
|
|
},
|
|
mount_point="secret",
|
|
)
|
|
logger.info(f"Stored credentials in Vault for account {account_id}")
|
|
return f"vault:{path}"
|
|
|
|
except Exception as e:
|
|
logger.error(f"Failed to store credentials in Vault: {e}")
|
|
raise
|
|
|
|
async def _store_encrypted(
|
|
self,
|
|
account_id: str,
|
|
email: str,
|
|
password: str,
|
|
imap_host: str,
|
|
imap_port: int,
|
|
smtp_host: str,
|
|
smtp_port: int,
|
|
) -> str:
|
|
"""Store credentials encrypted (development fallback)."""
|
|
import json
|
|
|
|
credentials = {
|
|
"email": email,
|
|
"password": password,
|
|
"imap_host": imap_host,
|
|
"imap_port": imap_port,
|
|
"smtp_host": smtp_host,
|
|
"smtp_port": smtp_port,
|
|
}
|
|
|
|
# Encrypt the credentials
|
|
encrypted = self._encryption_key.encrypt(json.dumps(credentials).encode())
|
|
|
|
# Store in file (development only)
|
|
os.makedirs(MAIL_CREDENTIALS_DIR, exist_ok=True)
|
|
|
|
path = f"{MAIL_CREDENTIALS_DIR}/{account_id}.enc"
|
|
with open(path, "wb") as f:
|
|
f.write(encrypted)
|
|
|
|
logger.info(f"Stored encrypted credentials for account {account_id}")
|
|
return f"file:{path}"
|
|
|
|
async def get_credentials(self, account_id: str, vault_path: str) -> Optional[MailCredentials]:
|
|
"""
|
|
Retrieve mail credentials.
|
|
|
|
Args:
|
|
account_id: The account ID
|
|
vault_path: The storage path (from store_credentials)
|
|
|
|
Returns:
|
|
MailCredentials or None if not found
|
|
"""
|
|
if vault_path.startswith("vault:"):
|
|
return await self._get_from_vault(vault_path[6:])
|
|
elif vault_path.startswith("file:"):
|
|
return await self._get_from_file(vault_path[5:])
|
|
else:
|
|
# Legacy path format
|
|
return await self._get_from_vault(vault_path)
|
|
|
|
async def _get_from_vault(self, path: str) -> Optional[MailCredentials]:
|
|
"""Retrieve credentials from Vault."""
|
|
if not self._vault_available:
|
|
logger.warning("Vault not available for credential retrieval")
|
|
return None
|
|
|
|
try:
|
|
response = self._vault_client.secrets.kv.v2.read_secret_version(
|
|
path=path,
|
|
mount_point="secret",
|
|
)
|
|
|
|
if response and "data" in response and "data" in response["data"]:
|
|
data = response["data"]["data"]
|
|
return MailCredentials(
|
|
email=data["email"],
|
|
password=data["password"],
|
|
imap_host=data["imap_host"],
|
|
imap_port=int(data["imap_port"]),
|
|
smtp_host=data["smtp_host"],
|
|
smtp_port=int(data["smtp_port"]),
|
|
)
|
|
|
|
except Exception as e:
|
|
logger.error(f"Failed to retrieve credentials from Vault: {e}")
|
|
|
|
return None
|
|
|
|
async def _get_from_file(self, path: str) -> Optional[MailCredentials]:
|
|
"""Retrieve credentials from encrypted file (development)."""
|
|
import json
|
|
|
|
try:
|
|
with open(path, "rb") as f:
|
|
encrypted = f.read()
|
|
|
|
decrypted = self._encryption_key.decrypt(encrypted)
|
|
data = json.loads(decrypted.decode())
|
|
|
|
return MailCredentials(
|
|
email=data["email"],
|
|
password=data["password"],
|
|
imap_host=data["imap_host"],
|
|
imap_port=data["imap_port"],
|
|
smtp_host=data["smtp_host"],
|
|
smtp_port=data["smtp_port"],
|
|
)
|
|
|
|
except FileNotFoundError:
|
|
logger.warning(f"Credentials file not found: {path}")
|
|
except Exception as e:
|
|
logger.error(f"Failed to decrypt credentials: {e}")
|
|
|
|
return None
|
|
|
|
async def delete_credentials(self, account_id: str, vault_path: str) -> bool:
|
|
"""
|
|
Delete stored credentials.
|
|
|
|
Args:
|
|
account_id: The account ID
|
|
vault_path: The storage path
|
|
|
|
Returns:
|
|
True if deleted successfully
|
|
"""
|
|
if vault_path.startswith("vault:"):
|
|
return await self._delete_from_vault(vault_path[6:])
|
|
elif vault_path.startswith("file:"):
|
|
return await self._delete_from_file(vault_path[5:])
|
|
return False
|
|
|
|
async def _delete_from_vault(self, path: str) -> bool:
|
|
"""Delete credentials from Vault."""
|
|
if not self._vault_available:
|
|
return False
|
|
|
|
try:
|
|
self._vault_client.secrets.kv.v2.delete_metadata_and_all_versions(
|
|
path=path,
|
|
mount_point="secret",
|
|
)
|
|
logger.info(f"Deleted credentials from Vault: {path}")
|
|
return True
|
|
|
|
except Exception as e:
|
|
logger.error(f"Failed to delete credentials from Vault: {e}")
|
|
return False
|
|
|
|
async def _delete_from_file(self, path: str) -> bool:
|
|
"""Delete credentials file."""
|
|
try:
|
|
os.remove(path)
|
|
logger.info(f"Deleted credentials file: {path}")
|
|
return True
|
|
except FileNotFoundError:
|
|
return True # Already deleted
|
|
except Exception as e:
|
|
logger.error(f"Failed to delete credentials file: {e}")
|
|
return False
|
|
|
|
async def update_password(
|
|
self,
|
|
account_id: str,
|
|
vault_path: str,
|
|
new_password: str,
|
|
) -> bool:
|
|
"""
|
|
Update the password for stored credentials.
|
|
|
|
Args:
|
|
account_id: The account ID
|
|
vault_path: The storage path
|
|
new_password: The new password
|
|
|
|
Returns:
|
|
True if updated successfully
|
|
"""
|
|
# Get existing credentials
|
|
creds = await self.get_credentials(account_id, vault_path)
|
|
if not creds:
|
|
return False
|
|
|
|
# Store with new password
|
|
try:
|
|
await self.store_credentials(
|
|
account_id=account_id,
|
|
email=creds.email,
|
|
password=new_password,
|
|
imap_host=creds.imap_host,
|
|
imap_port=creds.imap_port,
|
|
smtp_host=creds.smtp_host,
|
|
smtp_port=creds.smtp_port,
|
|
)
|
|
return True
|
|
except Exception as e:
|
|
logger.error(f"Failed to update password: {e}")
|
|
return False
|
|
|
|
|
|
# Global instance
|
|
_credentials_service: Optional[MailCredentialsService] = None
|
|
|
|
|
|
def get_credentials_service() -> MailCredentialsService:
|
|
"""Get or create the global MailCredentialsService instance."""
|
|
global _credentials_service
|
|
|
|
if _credentials_service is None:
|
|
_credentials_service = MailCredentialsService()
|
|
|
|
return _credentials_service
|