Files
breakpilot-lehrer/backend-lehrer/messenger_api.py
Benjamin Boenisch 5a31f52310 Initial commit: breakpilot-lehrer - Lehrer KI Platform
Services: Admin-Lehrer, Backend-Lehrer, Studio v2, Website,
Klausur-Service, School-Service, Voice-Service, Geo-Service,
BreakPilot Drive, Agent-Core

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-11 23:47:26 +01:00

841 lines
26 KiB
Python

"""
BreakPilot Messenger API
Stellt Endpoints fuer:
- Kontaktverwaltung (CRUD)
- Konversationen
- Nachrichten
- CSV-Import fuer Kontakte
- Gruppenmanagement
DSGVO-konform: Alle Daten werden lokal gespeichert.
"""
import os
import csv
import uuid
import json
from io import StringIO
from datetime import datetime
from typing import List, Optional, Dict, Any
from pathlib import Path
from fastapi import APIRouter, HTTPException, UploadFile, File, Query
from pydantic import BaseModel, Field
router = APIRouter(prefix="/api/messenger", tags=["Messenger"])
# Datenspeicherung (JSON-basiert fuer einfache Persistenz)
DATA_DIR = Path(__file__).parent / "data" / "messenger"
DATA_DIR.mkdir(parents=True, exist_ok=True)
CONTACTS_FILE = DATA_DIR / "contacts.json"
CONVERSATIONS_FILE = DATA_DIR / "conversations.json"
MESSAGES_FILE = DATA_DIR / "messages.json"
GROUPS_FILE = DATA_DIR / "groups.json"
# ==========================================
# PYDANTIC MODELS
# ==========================================
class ContactBase(BaseModel):
"""Basis-Modell fuer Kontakte."""
name: str = Field(..., min_length=1, max_length=200)
email: Optional[str] = None
phone: Optional[str] = None
role: str = Field(default="parent", description="parent, teacher, staff, student")
student_name: Optional[str] = Field(None, description="Name des zugehoerigen Schuelers")
class_name: Optional[str] = Field(None, description="Klasse z.B. 10a")
notes: Optional[str] = None
tags: List[str] = Field(default_factory=list)
matrix_id: Optional[str] = Field(None, description="Matrix-ID z.B. @user:matrix.org")
preferred_channel: str = Field(default="email", description="email, matrix, pwa")
class ContactCreate(ContactBase):
"""Model fuer neuen Kontakt."""
pass
class Contact(ContactBase):
"""Vollstaendiger Kontakt mit ID."""
id: str
created_at: str
updated_at: str
online: bool = False
last_seen: Optional[str] = None
class ContactUpdate(BaseModel):
"""Update-Model fuer Kontakte."""
name: Optional[str] = None
email: Optional[str] = None
phone: Optional[str] = None
role: Optional[str] = None
student_name: Optional[str] = None
class_name: Optional[str] = None
notes: Optional[str] = None
tags: Optional[List[str]] = None
matrix_id: Optional[str] = None
preferred_channel: Optional[str] = None
class GroupBase(BaseModel):
"""Basis-Modell fuer Gruppen."""
name: str = Field(..., min_length=1, max_length=100)
description: Optional[str] = None
group_type: str = Field(default="class", description="class, department, custom")
class GroupCreate(GroupBase):
"""Model fuer neue Gruppe."""
member_ids: List[str] = Field(default_factory=list)
class Group(GroupBase):
"""Vollstaendige Gruppe mit ID."""
id: str
member_ids: List[str] = []
created_at: str
updated_at: str
class MessageBase(BaseModel):
"""Basis-Modell fuer Nachrichten."""
content: str = Field(..., min_length=1)
content_type: str = Field(default="text", description="text, file, image")
file_url: Optional[str] = None
send_email: bool = Field(default=False, description="Nachricht auch per Email senden")
class MessageCreate(MessageBase):
"""Model fuer neue Nachricht."""
conversation_id: str
class Message(MessageBase):
"""Vollstaendige Nachricht mit ID."""
id: str
conversation_id: str
sender_id: str # "self" fuer eigene Nachrichten
timestamp: str
read: bool = False
read_at: Optional[str] = None
email_sent: bool = False
email_sent_at: Optional[str] = None
email_error: Optional[str] = None
class ConversationBase(BaseModel):
"""Basis-Modell fuer Konversationen."""
name: Optional[str] = None
is_group: bool = False
class Conversation(ConversationBase):
"""Vollstaendige Konversation mit ID."""
id: str
participant_ids: List[str] = []
group_id: Optional[str] = None
created_at: str
updated_at: str
last_message: Optional[str] = None
last_message_time: Optional[str] = None
unread_count: int = 0
class CSVImportResult(BaseModel):
"""Ergebnis eines CSV-Imports."""
imported: int
skipped: int
errors: List[str]
contacts: List[Contact]
# ==========================================
# DATA HELPERS
# ==========================================
def load_json(filepath: Path) -> List[Dict]:
"""Laedt JSON-Daten aus Datei."""
if not filepath.exists():
return []
try:
with open(filepath, "r", encoding="utf-8") as f:
return json.load(f)
except Exception:
return []
def save_json(filepath: Path, data: List[Dict]):
"""Speichert Daten in JSON-Datei."""
with open(filepath, "w", encoding="utf-8") as f:
json.dump(data, f, ensure_ascii=False, indent=2)
def get_contacts() -> List[Dict]:
return load_json(CONTACTS_FILE)
def save_contacts(contacts: List[Dict]):
save_json(CONTACTS_FILE, contacts)
def get_conversations() -> List[Dict]:
return load_json(CONVERSATIONS_FILE)
def save_conversations(conversations: List[Dict]):
save_json(CONVERSATIONS_FILE, conversations)
def get_messages() -> List[Dict]:
return load_json(MESSAGES_FILE)
def save_messages(messages: List[Dict]):
save_json(MESSAGES_FILE, messages)
def get_groups() -> List[Dict]:
return load_json(GROUPS_FILE)
def save_groups(groups: List[Dict]):
save_json(GROUPS_FILE, groups)
# ==========================================
# CONTACTS ENDPOINTS
# ==========================================
@router.get("/contacts", response_model=List[Contact])
async def list_contacts(
role: Optional[str] = Query(None, description="Filter by role"),
class_name: Optional[str] = Query(None, description="Filter by class"),
search: Optional[str] = Query(None, description="Search in name/email")
):
"""Listet alle Kontakte auf."""
contacts = get_contacts()
# Filter anwenden
if role:
contacts = [c for c in contacts if c.get("role") == role]
if class_name:
contacts = [c for c in contacts if c.get("class_name") == class_name]
if search:
search_lower = search.lower()
contacts = [c for c in contacts if
search_lower in c.get("name", "").lower() or
search_lower in (c.get("email") or "").lower() or
search_lower in (c.get("student_name") or "").lower()]
return contacts
@router.post("/contacts", response_model=Contact)
async def create_contact(contact: ContactCreate):
"""Erstellt einen neuen Kontakt."""
contacts = get_contacts()
# Pruefen ob Email bereits existiert
if contact.email:
existing = [c for c in contacts if c.get("email") == contact.email]
if existing:
raise HTTPException(status_code=400, detail="Kontakt mit dieser Email existiert bereits")
now = datetime.utcnow().isoformat()
new_contact = {
"id": str(uuid.uuid4()),
"created_at": now,
"updated_at": now,
"online": False,
"last_seen": None,
**contact.dict()
}
contacts.append(new_contact)
save_contacts(contacts)
return new_contact
@router.get("/contacts/{contact_id}", response_model=Contact)
async def get_contact(contact_id: str):
"""Ruft einen einzelnen Kontakt ab."""
contacts = get_contacts()
contact = next((c for c in contacts if c["id"] == contact_id), None)
if not contact:
raise HTTPException(status_code=404, detail="Kontakt nicht gefunden")
return contact
@router.put("/contacts/{contact_id}", response_model=Contact)
async def update_contact(contact_id: str, update: ContactUpdate):
"""Aktualisiert einen Kontakt."""
contacts = get_contacts()
contact_idx = next((i for i, c in enumerate(contacts) if c["id"] == contact_id), None)
if contact_idx is None:
raise HTTPException(status_code=404, detail="Kontakt nicht gefunden")
update_data = update.dict(exclude_unset=True)
contacts[contact_idx].update(update_data)
contacts[contact_idx]["updated_at"] = datetime.utcnow().isoformat()
save_contacts(contacts)
return contacts[contact_idx]
@router.delete("/contacts/{contact_id}")
async def delete_contact(contact_id: str):
"""Loescht einen Kontakt."""
contacts = get_contacts()
contacts = [c for c in contacts if c["id"] != contact_id]
save_contacts(contacts)
return {"status": "deleted", "id": contact_id}
@router.post("/contacts/import", response_model=CSVImportResult)
async def import_contacts_csv(file: UploadFile = File(...)):
"""
Importiert Kontakte aus einer CSV-Datei.
Erwartete Spalten:
- name (required)
- email
- phone
- role (parent/teacher/staff/student)
- student_name
- class_name
- notes
- tags (komma-separiert)
"""
if not file.filename.endswith('.csv'):
raise HTTPException(status_code=400, detail="Nur CSV-Dateien werden unterstuetzt")
content = await file.read()
try:
text = content.decode('utf-8')
except UnicodeDecodeError:
text = content.decode('latin-1')
contacts = get_contacts()
existing_emails = {c.get("email") for c in contacts if c.get("email")}
imported = []
skipped = 0
errors = []
reader = csv.DictReader(StringIO(text), delimiter=';') # Deutsche CSV meist mit Semikolon
if not reader.fieldnames or 'name' not in [f.lower() for f in reader.fieldnames]:
# Versuche mit Komma
reader = csv.DictReader(StringIO(text), delimiter=',')
for row_num, row in enumerate(reader, start=2):
try:
# Normalisiere Spaltennamen
row = {k.lower().strip(): v.strip() if v else "" for k, v in row.items()}
name = row.get('name') or row.get('kontakt') or row.get('elternname')
if not name:
errors.append(f"Zeile {row_num}: Name fehlt")
skipped += 1
continue
email = row.get('email') or row.get('e-mail') or row.get('mail')
if email and email in existing_emails:
errors.append(f"Zeile {row_num}: Email {email} existiert bereits")
skipped += 1
continue
now = datetime.utcnow().isoformat()
tags_str = row.get('tags') or row.get('kategorien') or ""
tags = [t.strip() for t in tags_str.split(',') if t.strip()]
# Matrix-ID und preferred_channel auslesen
matrix_id = row.get('matrix_id') or row.get('matrix') or None
preferred_channel = row.get('preferred_channel') or row.get('kanal') or "email"
if preferred_channel not in ["email", "matrix", "pwa"]:
preferred_channel = "email"
new_contact = {
"id": str(uuid.uuid4()),
"name": name,
"email": email if email else None,
"phone": row.get('phone') or row.get('telefon') or row.get('tel'),
"role": row.get('role') or row.get('rolle') or "parent",
"student_name": row.get('student_name') or row.get('schueler') or row.get('kind'),
"class_name": row.get('class_name') or row.get('klasse'),
"notes": row.get('notes') or row.get('notizen') or row.get('bemerkungen'),
"tags": tags,
"matrix_id": matrix_id if matrix_id else None,
"preferred_channel": preferred_channel,
"created_at": now,
"updated_at": now,
"online": False,
"last_seen": None
}
contacts.append(new_contact)
imported.append(new_contact)
if email:
existing_emails.add(email)
except Exception as e:
errors.append(f"Zeile {row_num}: {str(e)}")
skipped += 1
save_contacts(contacts)
return CSVImportResult(
imported=len(imported),
skipped=skipped,
errors=errors[:20], # Maximal 20 Fehler zurueckgeben
contacts=imported
)
@router.get("/contacts/export/csv")
async def export_contacts_csv():
"""Exportiert alle Kontakte als CSV."""
from fastapi.responses import StreamingResponse
contacts = get_contacts()
output = StringIO()
fieldnames = ['name', 'email', 'phone', 'role', 'student_name', 'class_name', 'notes', 'tags', 'matrix_id', 'preferred_channel']
writer = csv.DictWriter(output, fieldnames=fieldnames, delimiter=';')
writer.writeheader()
for contact in contacts:
writer.writerow({
'name': contact.get('name', ''),
'email': contact.get('email', ''),
'phone': contact.get('phone', ''),
'role': contact.get('role', ''),
'student_name': contact.get('student_name', ''),
'class_name': contact.get('class_name', ''),
'notes': contact.get('notes', ''),
'tags': ','.join(contact.get('tags', [])),
'matrix_id': contact.get('matrix_id', ''),
'preferred_channel': contact.get('preferred_channel', 'email')
})
output.seek(0)
return StreamingResponse(
iter([output.getvalue()]),
media_type="text/csv",
headers={"Content-Disposition": "attachment; filename=kontakte.csv"}
)
# ==========================================
# GROUPS ENDPOINTS
# ==========================================
@router.get("/groups", response_model=List[Group])
async def list_groups():
"""Listet alle Gruppen auf."""
return get_groups()
@router.post("/groups", response_model=Group)
async def create_group(group: GroupCreate):
"""Erstellt eine neue Gruppe."""
groups = get_groups()
now = datetime.utcnow().isoformat()
new_group = {
"id": str(uuid.uuid4()),
"created_at": now,
"updated_at": now,
**group.dict()
}
groups.append(new_group)
save_groups(groups)
return new_group
@router.put("/groups/{group_id}/members")
async def update_group_members(group_id: str, member_ids: List[str]):
"""Aktualisiert die Mitglieder einer Gruppe."""
groups = get_groups()
group_idx = next((i for i, g in enumerate(groups) if g["id"] == group_id), None)
if group_idx is None:
raise HTTPException(status_code=404, detail="Gruppe nicht gefunden")
groups[group_idx]["member_ids"] = member_ids
groups[group_idx]["updated_at"] = datetime.utcnow().isoformat()
save_groups(groups)
return groups[group_idx]
@router.delete("/groups/{group_id}")
async def delete_group(group_id: str):
"""Loescht eine Gruppe."""
groups = get_groups()
groups = [g for g in groups if g["id"] != group_id]
save_groups(groups)
return {"status": "deleted", "id": group_id}
# ==========================================
# CONVERSATIONS ENDPOINTS
# ==========================================
@router.get("/conversations", response_model=List[Conversation])
async def list_conversations():
"""Listet alle Konversationen auf."""
conversations = get_conversations()
messages = get_messages()
# Unread count und letzte Nachricht hinzufuegen
for conv in conversations:
conv_messages = [m for m in messages if m.get("conversation_id") == conv["id"]]
conv["unread_count"] = len([m for m in conv_messages if not m.get("read") and m.get("sender_id") != "self"])
if conv_messages:
last_msg = max(conv_messages, key=lambda m: m.get("timestamp", ""))
conv["last_message"] = last_msg.get("content", "")[:50]
conv["last_message_time"] = last_msg.get("timestamp")
# Nach letzter Nachricht sortieren
conversations.sort(key=lambda c: c.get("last_message_time") or "", reverse=True)
return conversations
@router.post("/conversations", response_model=Conversation)
async def create_conversation(contact_id: Optional[str] = None, group_id: Optional[str] = None):
"""
Erstellt eine neue Konversation.
Entweder mit einem Kontakt (1:1) oder einer Gruppe.
"""
conversations = get_conversations()
if not contact_id and not group_id:
raise HTTPException(status_code=400, detail="Entweder contact_id oder group_id erforderlich")
# Pruefen ob Konversation bereits existiert
if contact_id:
existing = next((c for c in conversations
if not c.get("is_group") and contact_id in c.get("participant_ids", [])), None)
if existing:
return existing
now = datetime.utcnow().isoformat()
if group_id:
groups = get_groups()
group = next((g for g in groups if g["id"] == group_id), None)
if not group:
raise HTTPException(status_code=404, detail="Gruppe nicht gefunden")
new_conv = {
"id": str(uuid.uuid4()),
"name": group.get("name"),
"is_group": True,
"participant_ids": group.get("member_ids", []),
"group_id": group_id,
"created_at": now,
"updated_at": now,
"last_message": None,
"last_message_time": None,
"unread_count": 0
}
else:
contacts = get_contacts()
contact = next((c for c in contacts if c["id"] == contact_id), None)
if not contact:
raise HTTPException(status_code=404, detail="Kontakt nicht gefunden")
new_conv = {
"id": str(uuid.uuid4()),
"name": contact.get("name"),
"is_group": False,
"participant_ids": [contact_id],
"group_id": None,
"created_at": now,
"updated_at": now,
"last_message": None,
"last_message_time": None,
"unread_count": 0
}
conversations.append(new_conv)
save_conversations(conversations)
return new_conv
@router.get("/conversations/{conversation_id}", response_model=Conversation)
async def get_conversation(conversation_id: str):
"""Ruft eine Konversation ab."""
conversations = get_conversations()
conv = next((c for c in conversations if c["id"] == conversation_id), None)
if not conv:
raise HTTPException(status_code=404, detail="Konversation nicht gefunden")
return conv
@router.delete("/conversations/{conversation_id}")
async def delete_conversation(conversation_id: str):
"""Loescht eine Konversation und alle zugehoerigen Nachrichten."""
conversations = get_conversations()
conversations = [c for c in conversations if c["id"] != conversation_id]
save_conversations(conversations)
messages = get_messages()
messages = [m for m in messages if m.get("conversation_id") != conversation_id]
save_messages(messages)
return {"status": "deleted", "id": conversation_id}
# ==========================================
# MESSAGES ENDPOINTS
# ==========================================
@router.get("/conversations/{conversation_id}/messages", response_model=List[Message])
async def list_messages(
conversation_id: str,
limit: int = Query(50, ge=1, le=200),
before: Optional[str] = Query(None, description="Load messages before this timestamp")
):
"""Ruft Nachrichten einer Konversation ab."""
messages = get_messages()
conv_messages = [m for m in messages if m.get("conversation_id") == conversation_id]
if before:
conv_messages = [m for m in conv_messages if m.get("timestamp", "") < before]
# Nach Zeit sortieren (neueste zuletzt)
conv_messages.sort(key=lambda m: m.get("timestamp", ""))
return conv_messages[-limit:]
@router.post("/conversations/{conversation_id}/messages", response_model=Message)
async def send_message(conversation_id: str, message: MessageBase):
"""
Sendet eine Nachricht in einer Konversation.
Wenn send_email=True und der Kontakt eine Email-Adresse hat,
wird die Nachricht auch per Email versendet.
"""
conversations = get_conversations()
conv = next((c for c in conversations if c["id"] == conversation_id), None)
if not conv:
raise HTTPException(status_code=404, detail="Konversation nicht gefunden")
now = datetime.utcnow().isoformat()
new_message = {
"id": str(uuid.uuid4()),
"conversation_id": conversation_id,
"sender_id": "self",
"timestamp": now,
"read": True,
"read_at": now,
"email_sent": False,
"email_sent_at": None,
"email_error": None,
**message.dict()
}
# Email-Versand wenn gewuenscht
if message.send_email and not conv.get("is_group"):
# Kontakt laden
participant_ids = conv.get("participant_ids", [])
if participant_ids:
contacts = get_contacts()
contact = next((c for c in contacts if c["id"] == participant_ids[0]), None)
if contact and contact.get("email"):
try:
from email_service import email_service
result = email_service.send_messenger_notification(
to_email=contact["email"],
to_name=contact.get("name", ""),
sender_name="BreakPilot Lehrer", # TODO: Aktuellen User-Namen verwenden
message_content=message.content
)
if result.success:
new_message["email_sent"] = True
new_message["email_sent_at"] = result.sent_at
else:
new_message["email_error"] = result.error
except Exception as e:
new_message["email_error"] = str(e)
messages = get_messages()
messages.append(new_message)
save_messages(messages)
# Konversation aktualisieren
conv_idx = next(i for i, c in enumerate(conversations) if c["id"] == conversation_id)
conversations[conv_idx]["last_message"] = message.content[:50]
conversations[conv_idx]["last_message_time"] = now
conversations[conv_idx]["updated_at"] = now
save_conversations(conversations)
return new_message
@router.put("/messages/{message_id}/read")
async def mark_message_read(message_id: str):
"""Markiert eine Nachricht als gelesen."""
messages = get_messages()
msg_idx = next((i for i, m in enumerate(messages) if m["id"] == message_id), None)
if msg_idx is None:
raise HTTPException(status_code=404, detail="Nachricht nicht gefunden")
messages[msg_idx]["read"] = True
messages[msg_idx]["read_at"] = datetime.utcnow().isoformat()
save_messages(messages)
return {"status": "read", "id": message_id}
@router.put("/conversations/{conversation_id}/read-all")
async def mark_all_messages_read(conversation_id: str):
"""Markiert alle Nachrichten einer Konversation als gelesen."""
messages = get_messages()
now = datetime.utcnow().isoformat()
for msg in messages:
if msg.get("conversation_id") == conversation_id and not msg.get("read"):
msg["read"] = True
msg["read_at"] = now
save_messages(messages)
return {"status": "all_read", "conversation_id": conversation_id}
# ==========================================
# TEMPLATES ENDPOINTS
# ==========================================
DEFAULT_TEMPLATES = [
{
"id": "1",
"name": "Terminbestaetigung",
"content": "Vielen Dank fuer Ihre Terminanfrage. Ich bestaetige den Termin am [DATUM] um [UHRZEIT]. Bitte geben Sie mir Bescheid, falls sich etwas aendern sollte.",
"category": "termin"
},
{
"id": "2",
"name": "Hausaufgaben-Info",
"content": "Zur Information: Die Hausaufgaben fuer diese Woche umfassen [THEMA]. Abgabetermin ist [DATUM]. Bei Fragen stehe ich gerne zur Verfuegung.",
"category": "hausaufgaben"
},
{
"id": "3",
"name": "Entschuldigung bestaetigen",
"content": "Ich bestaetige den Erhalt der Entschuldigung fuer [NAME] am [DATUM]. Die Fehlzeiten wurden entsprechend vermerkt.",
"category": "entschuldigung"
},
{
"id": "4",
"name": "Gespraechsanfrage",
"content": "Ich wuerde gerne einen Termin fuer ein Gespraech mit Ihnen vereinbaren, um [THEMA] zu besprechen. Waeren Sie am [DATUM] um [UHRZEIT] verfuegbar?",
"category": "gespraech"
},
{
"id": "5",
"name": "Krankmeldung bestaetigen",
"content": "Vielen Dank fuer Ihre Krankmeldung fuer [NAME]. Ich wuensche gute Besserung. Bitte reichen Sie eine schriftliche Entschuldigung nach, sobald Ihr Kind wieder gesund ist.",
"category": "krankmeldung"
}
]
@router.get("/templates")
async def list_templates():
"""Listet alle Nachrichtenvorlagen auf."""
templates_file = DATA_DIR / "templates.json"
if templates_file.exists():
templates = load_json(templates_file)
else:
templates = DEFAULT_TEMPLATES
save_json(templates_file, templates)
return templates
@router.post("/templates")
async def create_template(name: str, content: str, category: str = "custom"):
"""Erstellt eine neue Vorlage."""
templates_file = DATA_DIR / "templates.json"
templates = load_json(templates_file) if templates_file.exists() else DEFAULT_TEMPLATES.copy()
new_template = {
"id": str(uuid.uuid4()),
"name": name,
"content": content,
"category": category
}
templates.append(new_template)
save_json(templates_file, templates)
return new_template
@router.delete("/templates/{template_id}")
async def delete_template(template_id: str):
"""Loescht eine Vorlage."""
templates_file = DATA_DIR / "templates.json"
templates = load_json(templates_file) if templates_file.exists() else DEFAULT_TEMPLATES.copy()
templates = [t for t in templates if t["id"] != template_id]
save_json(templates_file, templates)
return {"status": "deleted", "id": template_id}
# ==========================================
# STATS ENDPOINT
# ==========================================
@router.get("/stats")
async def get_messenger_stats():
"""Gibt Statistiken zum Messenger zurueck."""
contacts = get_contacts()
conversations = get_conversations()
messages = get_messages()
groups = get_groups()
unread_total = sum(1 for m in messages if not m.get("read") and m.get("sender_id") != "self")
return {
"total_contacts": len(contacts),
"total_groups": len(groups),
"total_conversations": len(conversations),
"total_messages": len(messages),
"unread_messages": unread_total,
"contacts_by_role": {
role: len([c for c in contacts if c.get("role") == role])
for role in set(c.get("role", "parent") for c in contacts)
}
}