Files
Benjamin Boenisch 5a31f52310 Initial commit: breakpilot-lehrer - Lehrer KI Platform
Services: Admin-Lehrer, Backend-Lehrer, Studio v2, Website,
Klausur-Service, School-Service, Voice-Service, Geo-Service,
BreakPilot Drive, Agent-Core

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-11 23:47:26 +01:00

422 lines
13 KiB
Python

"""
Inbox Task Service (Arbeitsvorrat)
Manages tasks extracted from emails and manual task creation.
"""
import logging
from typing import Optional, List, Dict, Any
from datetime import datetime, timedelta
from .models import (
TaskStatus,
TaskPriority,
InboxTask,
TaskCreate,
TaskUpdate,
TaskDashboardStats,
SenderType,
DeadlineExtraction,
)
from .mail_db import (
create_task as db_create_task,
get_tasks as db_get_tasks,
get_task as db_get_task,
update_task as db_update_task,
get_task_dashboard_stats as db_get_dashboard_stats,
get_email,
log_mail_audit,
)
logger = logging.getLogger(__name__)
class TaskService:
"""
Service for managing inbox tasks (Arbeitsvorrat).
Features:
- Create tasks from emails (auto or manual)
- Track deadlines and priorities
- Dashboard statistics
- Reminders (to be integrated with notification service)
"""
async def create_task_from_email(
self,
user_id: str,
tenant_id: str,
email_id: str,
deadlines: List[DeadlineExtraction],
sender_type: Optional[SenderType] = None,
auto_created: bool = False,
) -> Optional[str]:
"""
Create a task from an analyzed email.
Args:
user_id: The user ID
tenant_id: The tenant ID
email_id: The source email ID
deadlines: Extracted deadlines
sender_type: Classified sender type
auto_created: Whether this was auto-created by AI
Returns:
Task ID if created successfully
"""
# Get email details
email_data = await get_email(email_id, user_id)
if not email_data:
logger.warning(f"Email not found: {email_id}")
return None
# Determine priority
priority = TaskPriority.MEDIUM
if sender_type:
priority = self._get_priority_from_sender(sender_type)
# Get earliest deadline
deadline = None
if deadlines:
deadline = min(d.deadline_date for d in deadlines)
# Adjust priority based on deadline
priority = self._adjust_priority_for_deadline(priority, deadline)
# Create task title from email subject
subject = email_data.get("subject", "Keine Betreffzeile")
title = f"Bearbeiten: {subject[:100]}"
# Create description from deadlines
description = self._build_task_description(deadlines, email_data)
# Create the task
task_id = await db_create_task(
user_id=user_id,
tenant_id=tenant_id,
title=title,
description=description,
priority=priority.value,
deadline=deadline,
email_id=email_id,
account_id=email_data.get("account_id"),
source_email_subject=subject,
source_sender=email_data.get("sender_email"),
source_sender_type=sender_type.value if sender_type else None,
ai_extracted=auto_created,
confidence_score=deadlines[0].confidence if deadlines else None,
)
if task_id:
# Log audit event
await log_mail_audit(
user_id=user_id,
action="task_created",
entity_type="task",
entity_id=task_id,
details={
"email_id": email_id,
"auto_created": auto_created,
"deadline": deadline.isoformat() if deadline else None,
},
tenant_id=tenant_id,
)
logger.info(f"Created task {task_id} from email {email_id}")
return task_id
async def create_manual_task(
self,
user_id: str,
tenant_id: str,
task_data: TaskCreate,
) -> Optional[str]:
"""
Create a task manually (not from email).
Args:
user_id: The user ID
tenant_id: The tenant ID
task_data: Task creation data
Returns:
Task ID if created successfully
"""
# Get email details if linked
source_subject = None
source_sender = None
account_id = None
if task_data.email_id:
email_data = await get_email(task_data.email_id, user_id)
if email_data:
source_subject = email_data.get("subject")
source_sender = email_data.get("sender_email")
account_id = email_data.get("account_id")
task_id = await db_create_task(
user_id=user_id,
tenant_id=tenant_id,
title=task_data.title,
description=task_data.description,
priority=task_data.priority.value,
deadline=task_data.deadline,
email_id=task_data.email_id,
account_id=account_id,
source_email_subject=source_subject,
source_sender=source_sender,
ai_extracted=False,
)
if task_id:
await log_mail_audit(
user_id=user_id,
action="task_created_manual",
entity_type="task",
entity_id=task_id,
details={"title": task_data.title},
tenant_id=tenant_id,
)
return task_id
async def get_user_tasks(
self,
user_id: str,
status: Optional[TaskStatus] = None,
priority: Optional[TaskPriority] = None,
include_completed: bool = False,
limit: int = 50,
offset: int = 0,
) -> List[Dict]:
"""
Get tasks for a user with filtering.
Args:
user_id: The user ID
status: Filter by status
priority: Filter by priority
include_completed: Include completed tasks
limit: Maximum results
offset: Pagination offset
Returns:
List of task dictionaries
"""
return await db_get_tasks(
user_id=user_id,
status=status.value if status else None,
priority=priority.value if priority else None,
include_completed=include_completed,
limit=limit,
offset=offset,
)
async def get_task(self, task_id: str, user_id: str) -> Optional[Dict]:
"""Get a single task by ID."""
return await db_get_task(task_id, user_id)
async def update_task(
self,
task_id: str,
user_id: str,
updates: TaskUpdate,
) -> bool:
"""
Update a task.
Args:
task_id: The task ID
user_id: The user ID
updates: Fields to update
Returns:
True if successful
"""
update_dict = {}
if updates.title is not None:
update_dict["title"] = updates.title
if updates.description is not None:
update_dict["description"] = updates.description
if updates.priority is not None:
update_dict["priority"] = updates.priority.value
if updates.status is not None:
update_dict["status"] = updates.status.value
if updates.deadline is not None:
update_dict["deadline"] = updates.deadline
success = await db_update_task(task_id, user_id, **update_dict)
if success:
await log_mail_audit(
user_id=user_id,
action="task_updated",
entity_type="task",
entity_id=task_id,
details={"updates": update_dict},
)
return success
async def mark_completed(self, task_id: str, user_id: str) -> bool:
"""Mark a task as completed."""
success = await db_update_task(
task_id, user_id, status=TaskStatus.COMPLETED.value
)
if success:
await log_mail_audit(
user_id=user_id,
action="task_completed",
entity_type="task",
entity_id=task_id,
)
return success
async def mark_in_progress(self, task_id: str, user_id: str) -> bool:
"""Mark a task as in progress."""
return await db_update_task(
task_id, user_id, status=TaskStatus.IN_PROGRESS.value
)
async def get_dashboard_stats(self, user_id: str) -> TaskDashboardStats:
"""
Get dashboard statistics for a user.
Returns:
TaskDashboardStats with all metrics
"""
stats = await db_get_dashboard_stats(user_id)
return TaskDashboardStats(
total_tasks=stats.get("total_tasks", 0),
pending_tasks=stats.get("pending_tasks", 0),
in_progress_tasks=stats.get("in_progress_tasks", 0),
completed_tasks=stats.get("completed_tasks", 0),
overdue_tasks=stats.get("overdue_tasks", 0),
due_today=stats.get("due_today", 0),
due_this_week=stats.get("due_this_week", 0),
by_priority=stats.get("by_priority", {}),
by_sender_type=stats.get("by_sender_type", {}),
)
async def get_overdue_tasks(self, user_id: str) -> List[Dict]:
"""Get all overdue tasks for a user."""
all_tasks = await db_get_tasks(user_id, include_completed=False, limit=500)
now = datetime.now()
overdue = [
task for task in all_tasks
if task.get("deadline") and task["deadline"] < now
]
return overdue
async def get_tasks_due_soon(
self,
user_id: str,
days: int = 3,
) -> List[Dict]:
"""Get tasks due within the specified number of days."""
all_tasks = await db_get_tasks(user_id, include_completed=False, limit=500)
now = datetime.now()
deadline_cutoff = now + timedelta(days=days)
due_soon = [
task for task in all_tasks
if task.get("deadline") and now <= task["deadline"] <= deadline_cutoff
]
return sorted(due_soon, key=lambda t: t["deadline"])
# =========================================================================
# Helper Methods
# =========================================================================
def _get_priority_from_sender(self, sender_type: SenderType) -> TaskPriority:
"""Determine priority based on sender type."""
high_priority_senders = {
SenderType.KULTUSMINISTERIUM,
SenderType.LANDESSCHULBEHOERDE,
SenderType.RLSB,
SenderType.SCHULAMT,
}
medium_priority_senders = {
SenderType.NIBIS,
SenderType.SCHULTRAEGER,
SenderType.ELTERNVERTRETER,
}
if sender_type in high_priority_senders:
return TaskPriority.HIGH
elif sender_type in medium_priority_senders:
return TaskPriority.MEDIUM
else:
return TaskPriority.LOW
def _adjust_priority_for_deadline(
self,
current_priority: TaskPriority,
deadline: datetime,
) -> TaskPriority:
"""Adjust priority based on deadline proximity."""
now = datetime.now()
days_until = (deadline - now).days
if days_until <= 1:
return TaskPriority.URGENT
elif days_until <= 3:
return max(current_priority, TaskPriority.HIGH)
elif days_until <= 7:
return max(current_priority, TaskPriority.MEDIUM)
else:
return current_priority
def _build_task_description(
self,
deadlines: List[DeadlineExtraction],
email_data: Dict,
) -> str:
"""Build a task description from deadlines and email data."""
parts = []
# Add deadlines
if deadlines:
parts.append("**Fristen:**")
for d in deadlines:
date_str = d.deadline_date.strftime("%d.%m.%Y")
firm_str = " (verbindlich)" if d.is_firm else ""
parts.append(f"- {date_str}: {d.description}{firm_str}")
parts.append("")
# Add email info
sender = email_data.get("sender_email", "Unbekannt")
parts.append(f"**Von:** {sender}")
# Add preview
preview = email_data.get("body_preview", "")
if preview:
parts.append(f"\n**Vorschau:**\n{preview[:300]}...")
return "\n".join(parts)
# Global instance
_task_service: Optional[TaskService] = None
def get_task_service() -> TaskService:
"""Get or create the global TaskService instance."""
global _task_service
if _task_service is None:
_task_service = TaskService()
return _task_service