Files
Benjamin Boenisch 5a31f52310 Initial commit: breakpilot-lehrer - Lehrer KI Platform
Services: Admin-Lehrer, Backend-Lehrer, Studio v2, Website,
Klausur-Service, School-Service, Voice-Service, Geo-Service,
BreakPilot Drive, Agent-Core

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-11 23:47:26 +01:00

542 lines
17 KiB
Python

"""
Mail Aggregator Service
Multi-account IMAP aggregation with async support.
"""
import os
import ssl
import email
import asyncio
import logging
import smtplib
from typing import Optional, List, Dict, Any, Tuple
from datetime import datetime, timezone
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart
from email.header import decode_header, make_header
from email.utils import parsedate_to_datetime, parseaddr
from .credentials import get_credentials_service, MailCredentials
from .mail_db import (
get_email_accounts,
get_email_account,
update_account_status,
upsert_email,
get_unified_inbox,
)
from .models import (
AccountStatus,
AccountTestResult,
AggregatedEmail,
EmailComposeRequest,
EmailSendResult,
)
logger = logging.getLogger(__name__)
class IMAPConnectionError(Exception):
"""Raised when IMAP connection fails."""
pass
class SMTPConnectionError(Exception):
"""Raised when SMTP connection fails."""
pass
class MailAggregator:
"""
Aggregates emails from multiple IMAP accounts into a unified inbox.
Features:
- Connect to multiple IMAP accounts
- Fetch and cache emails in PostgreSQL
- Send emails via SMTP
- Handle connection pooling
"""
def __init__(self):
self._credentials_service = get_credentials_service()
self._imap_connections: Dict[str, Any] = {}
self._sync_lock = asyncio.Lock()
async def test_account_connection(
self,
imap_host: str,
imap_port: int,
imap_ssl: bool,
smtp_host: str,
smtp_port: int,
smtp_ssl: bool,
email_address: str,
password: str,
) -> AccountTestResult:
"""
Test IMAP and SMTP connection with provided credentials.
Returns:
AccountTestResult with connection status
"""
result = AccountTestResult(
success=False,
imap_connected=False,
smtp_connected=False,
)
# Test IMAP
try:
import imaplib
if imap_ssl:
imap = imaplib.IMAP4_SSL(imap_host, imap_port)
else:
imap = imaplib.IMAP4(imap_host, imap_port)
imap.login(email_address, password)
result.imap_connected = True
# List folders
status, folders = imap.list()
if status == "OK":
result.folders_found = [
self._parse_folder_name(f) for f in folders if f
]
imap.logout()
except Exception as e:
result.error_message = f"IMAP Error: {str(e)}"
logger.warning(f"IMAP test failed for {email_address}: {e}")
# Test SMTP
try:
if smtp_ssl:
smtp = smtplib.SMTP_SSL(smtp_host, smtp_port)
else:
smtp = smtplib.SMTP(smtp_host, smtp_port)
smtp.starttls()
smtp.login(email_address, password)
result.smtp_connected = True
smtp.quit()
except Exception as e:
smtp_error = f"SMTP Error: {str(e)}"
if result.error_message:
result.error_message += f"; {smtp_error}"
else:
result.error_message = smtp_error
logger.warning(f"SMTP test failed for {email_address}: {e}")
result.success = result.imap_connected and result.smtp_connected
return result
def _parse_folder_name(self, folder_response: bytes) -> str:
"""Parse folder name from IMAP LIST response."""
try:
# Format: '(\\HasNoChildren) "/" "INBOX"'
decoded = folder_response.decode("utf-8") if isinstance(folder_response, bytes) else folder_response
parts = decoded.rsplit('" "', 1)
if len(parts) == 2:
return parts[1].rstrip('"')
return decoded
except Exception:
return str(folder_response)
async def sync_account(
self,
account_id: str,
user_id: str,
max_emails: int = 100,
folders: Optional[List[str]] = None,
) -> Tuple[int, int]:
"""
Sync emails from an IMAP account.
Args:
account_id: The account ID
user_id: The user ID
max_emails: Maximum emails to fetch
folders: Specific folders to sync (default: INBOX)
Returns:
Tuple of (new_emails, total_emails)
"""
import imaplib
account = await get_email_account(account_id, user_id)
if not account:
raise ValueError(f"Account not found: {account_id}")
# Get credentials
vault_path = account.get("vault_path", "")
creds = await self._credentials_service.get_credentials(account_id, vault_path)
if not creds:
await update_account_status(account_id, "error", "Credentials not found")
raise IMAPConnectionError("Credentials not found")
new_count = 0
total_count = 0
try:
# Connect to IMAP
if account["imap_ssl"]:
imap = imaplib.IMAP4_SSL(account["imap_host"], account["imap_port"])
else:
imap = imaplib.IMAP4(account["imap_host"], account["imap_port"])
imap.login(creds.email, creds.password)
# Sync specified folders or just INBOX
sync_folders = folders or ["INBOX"]
for folder in sync_folders:
try:
status, _ = imap.select(folder)
if status != "OK":
continue
# Search for recent emails
status, messages = imap.search(None, "ALL")
if status != "OK":
continue
message_ids = messages[0].split()
total_count += len(message_ids)
# Fetch most recent emails
recent_ids = message_ids[-max_emails:] if len(message_ids) > max_emails else message_ids
for msg_id in recent_ids:
try:
email_data = await self._fetch_and_store_email(
imap, msg_id, account_id, user_id, account["tenant_id"], folder
)
if email_data:
new_count += 1
except Exception as e:
logger.warning(f"Failed to fetch email {msg_id}: {e}")
except Exception as e:
logger.warning(f"Failed to sync folder {folder}: {e}")
imap.logout()
# Update account status
await update_account_status(
account_id,
"active",
email_count=total_count,
unread_count=new_count, # Will be recalculated
)
return new_count, total_count
except Exception as e:
logger.error(f"Account sync failed: {e}")
await update_account_status(account_id, "error", str(e))
raise IMAPConnectionError(str(e))
async def _fetch_and_store_email(
self,
imap,
msg_id: bytes,
account_id: str,
user_id: str,
tenant_id: str,
folder: str,
) -> Optional[str]:
"""Fetch a single email and store it in the database."""
try:
status, msg_data = imap.fetch(msg_id, "(RFC822)")
if status != "OK" or not msg_data or not msg_data[0]:
return None
raw_email = msg_data[0][1]
msg = email.message_from_bytes(raw_email)
# Parse headers
message_id = msg.get("Message-ID", str(msg_id))
subject = self._decode_header(msg.get("Subject", ""))
from_header = msg.get("From", "")
sender_name, sender_email = parseaddr(from_header)
sender_name = self._decode_header(sender_name)
# Parse recipients
to_header = msg.get("To", "")
recipients = [addr[1] for addr in email.utils.getaddresses([to_header])]
cc_header = msg.get("Cc", "")
cc = [addr[1] for addr in email.utils.getaddresses([cc_header])]
# Parse dates
date_str = msg.get("Date")
try:
date_sent = parsedate_to_datetime(date_str) if date_str else datetime.now(timezone.utc)
except Exception:
date_sent = datetime.now(timezone.utc)
date_received = datetime.now(timezone.utc)
# Parse body
body_text, body_html, attachments = self._parse_body(msg)
# Create preview
body_preview = (body_text[:200] + "...") if body_text and len(body_text) > 200 else body_text
# Get headers dict
headers = {k: self._decode_header(v) for k, v in msg.items() if k not in ["Body"]}
# Store in database
email_id = await upsert_email(
account_id=account_id,
user_id=user_id,
tenant_id=tenant_id,
message_id=message_id,
subject=subject,
sender_email=sender_email,
sender_name=sender_name,
recipients=recipients,
cc=cc,
body_preview=body_preview,
body_text=body_text,
body_html=body_html,
has_attachments=len(attachments) > 0,
attachments=attachments,
headers=headers,
folder=folder,
date_sent=date_sent,
date_received=date_received,
)
return email_id
except Exception as e:
logger.error(f"Failed to parse email: {e}")
return None
def _decode_header(self, header_value: str) -> str:
"""Decode email header value."""
if not header_value:
return ""
try:
decoded = decode_header(header_value)
return str(make_header(decoded))
except Exception:
return str(header_value)
def _parse_body(self, msg) -> Tuple[Optional[str], Optional[str], List[Dict]]:
"""
Parse email body and attachments.
Returns:
Tuple of (body_text, body_html, attachments)
"""
body_text = None
body_html = None
attachments = []
if msg.is_multipart():
for part in msg.walk():
content_type = part.get_content_type()
content_disposition = str(part.get("Content-Disposition", ""))
# Skip multipart containers
if content_type.startswith("multipart/"):
continue
# Check for attachments
if "attachment" in content_disposition:
filename = part.get_filename()
if filename:
attachments.append({
"filename": self._decode_header(filename),
"content_type": content_type,
"size": len(part.get_payload(decode=True) or b""),
})
continue
# Get body content
try:
payload = part.get_payload(decode=True)
charset = part.get_content_charset() or "utf-8"
if payload:
text = payload.decode(charset, errors="replace")
if content_type == "text/plain" and not body_text:
body_text = text
elif content_type == "text/html" and not body_html:
body_html = text
except Exception as e:
logger.debug(f"Failed to decode body part: {e}")
else:
# Single part message
content_type = msg.get_content_type()
try:
payload = msg.get_payload(decode=True)
charset = msg.get_content_charset() or "utf-8"
if payload:
text = payload.decode(charset, errors="replace")
if content_type == "text/plain":
body_text = text
elif content_type == "text/html":
body_html = text
except Exception as e:
logger.debug(f"Failed to decode body: {e}")
return body_text, body_html, attachments
async def send_email(
self,
account_id: str,
user_id: str,
request: EmailComposeRequest,
) -> EmailSendResult:
"""
Send an email via SMTP.
Args:
account_id: The account to send from
user_id: The user ID
request: The compose request with recipients and content
Returns:
EmailSendResult with success status
"""
account = await get_email_account(account_id, user_id)
if not account:
return EmailSendResult(success=False, error="Account not found")
# Verify the account_id matches
if request.account_id != account_id:
return EmailSendResult(success=False, error="Account mismatch")
# Get credentials
vault_path = account.get("vault_path", "")
creds = await self._credentials_service.get_credentials(account_id, vault_path)
if not creds:
return EmailSendResult(success=False, error="Credentials not found")
try:
# Create message
if request.is_html:
msg = MIMEMultipart("alternative")
msg.attach(MIMEText(request.body, "html"))
else:
msg = MIMEText(request.body, "plain")
msg["Subject"] = request.subject
msg["From"] = account["email"]
msg["To"] = ", ".join(request.to)
if request.cc:
msg["Cc"] = ", ".join(request.cc)
if request.reply_to_message_id:
msg["In-Reply-To"] = request.reply_to_message_id
msg["References"] = request.reply_to_message_id
# Send via SMTP
if account["smtp_ssl"]:
smtp = smtplib.SMTP_SSL(account["smtp_host"], account["smtp_port"])
else:
smtp = smtplib.SMTP(account["smtp_host"], account["smtp_port"])
smtp.starttls()
smtp.login(creds.email, creds.password)
# All recipients
all_recipients = list(request.to)
if request.cc:
all_recipients.extend(request.cc)
if request.bcc:
all_recipients.extend(request.bcc)
smtp.sendmail(account["email"], all_recipients, msg.as_string())
smtp.quit()
return EmailSendResult(
success=True,
message_id=msg.get("Message-ID"),
)
except Exception as e:
logger.error(f"Failed to send email: {e}")
return EmailSendResult(success=False, error=str(e))
async def sync_all_accounts(self, user_id: str, tenant_id: Optional[str] = None) -> Dict[str, Any]:
"""
Sync all accounts for a user.
Returns:
Dict with sync results per account
"""
async with self._sync_lock:
accounts = await get_email_accounts(user_id, tenant_id)
results = {}
for account in accounts:
account_id = account["id"]
try:
new_count, total_count = await self.sync_account(
account_id, user_id, max_emails=50
)
results[account_id] = {
"status": "success",
"new_emails": new_count,
"total_emails": total_count,
}
except Exception as e:
results[account_id] = {
"status": "error",
"error": str(e),
}
return results
async def get_unified_inbox_emails(
self,
user_id: str,
account_ids: Optional[List[str]] = None,
categories: Optional[List[str]] = None,
is_read: Optional[bool] = None,
is_starred: Optional[bool] = None,
limit: int = 50,
offset: int = 0,
) -> List[Dict]:
"""
Get unified inbox with all filters.
Returns:
List of email dictionaries
"""
return await get_unified_inbox(
user_id=user_id,
account_ids=account_ids,
categories=categories,
is_read=is_read,
is_starred=is_starred,
limit=limit,
offset=offset,
)
# Global instance
_aggregator: Optional[MailAggregator] = None
def get_mail_aggregator() -> MailAggregator:
"""Get or create the global MailAggregator instance."""
global _aggregator
if _aggregator is None:
_aggregator = MailAggregator()
return _aggregator