"""
Tests for the Messenger API.

Covers:
- Contact CRUD operations
- Conversation management
- Sending messages
- CSV import/export
- Template management
- Group management
- Statistics

Note: these tests run against the Docker container. Start the backend
container before running them:
    docker compose up -d backend
"""

import contextlib
import uuid
from typing import Iterator, Optional

import pytest
import requests

# Backend URL (Docker)
BASE_URL = "http://localhost:8000/api/messenger"

# Upper bound (seconds) for every API call. Without an explicit timeout
# `requests` waits indefinitely, so a stalled backend would hang the test run.
REQUEST_TIMEOUT = 10


def _api(method: str, path: str, **kwargs) -> "requests.Response":
    """Send ``method`` to ``BASE_URL + path`` with a default timeout.

    Any keyword argument accepted by ``requests.request`` (``json``,
    ``params``, ``files``, ...) is passed through unchanged. A caller may
    override the timeout by passing ``timeout=`` explicitly.
    """
    kwargs.setdefault("timeout", REQUEST_TIMEOUT)
    return requests.request(method, f"{BASE_URL}{path}", **kwargs)


def skip_if_backend_unavailable():
    """Skip the current test if the backend is not running."""
    try:
        requests.get(f"{BASE_URL}/contacts", timeout=2)
    except requests.exceptions.RequestException:
        pytest.skip("Backend not available - start with 'docker compose up -d backend'")


def generate_unique_email():
    """Return a unique e-mail address for use in a test."""
    return f"test_{uuid.uuid4().hex[:8]}@example.com"


@contextlib.contextmanager
def _temp_resource(collection: str, *, require_ok: bool = False, **request_kwargs) -> Iterator[dict]:
    """Create a resource via POST and delete it again when the block exits.

    ``collection`` is the path segment (``"contacts"``, ``"conversations"``,
    ``"groups"``). With ``require_ok=True`` the creation response must have
    status 200. The parsed JSON body is yielded. The DELETE runs in a
    ``finally`` so that a failing assertion in the test body does not leave
    data behind in the backend.
    """
    response = _api("POST", f"/{collection}", **request_kwargs)
    if require_ok:
        assert response.status_code == 200
    resource = response.json()
    # Resolve the id before entering the try block: a failed creation should
    # surface as the KeyError it is, not be masked by a cleanup error.
    resource_id = resource["id"]
    try:
        yield resource
    finally:
        _api("DELETE", f"/{collection}/{resource_id}")


def _temp_contact(payload: dict, *, require_ok: bool = False):
    """Context manager: create a contact from ``payload``, delete it on exit."""
    return _temp_resource("contacts", require_ok=require_ok, json=payload)


def _temp_conversation(params: dict, *, require_ok: bool = False):
    """Context manager: create a conversation (query params), delete it on exit."""
    return _temp_resource("conversations", require_ok=require_ok, params=params)


def _temp_group(payload: dict, *, require_ok: bool = False):
    """Context manager: create a group from ``payload``, delete it on exit."""
    return _temp_resource("groups", require_ok=require_ok, json=payload)


@contextlib.contextmanager
def _temp_chat(contact_payload: dict) -> Iterator[tuple]:
    """Create a contact plus a conversation with it; yield ``(contact, conv)``.

    On exit the conversation is deleted first, then the contact.
    """
    with _temp_contact(contact_payload) as contact:
        with _temp_conversation({"contact_id": contact["id"]}) as conv:
            yield contact, conv


def _delete_imported_contacts(import_result: dict) -> None:
    """Delete every contact listed under ``"contacts"`` in a CSV import result."""
    for contact in import_result.get("contacts") or []:
        _api("DELETE", f"/contacts/{contact['id']}")


@pytest.fixture(autouse=True)
def check_backend():
    """Skip every test in this module when the backend is unreachable."""
    skip_if_backend_unavailable()


class TestMessengerContacts:
    """Tests for the contact endpoints."""

    def test_get_contacts(self):
        """Fetch the contact list."""
        response = _api("GET", "/contacts")
        assert response.status_code == 200
        assert isinstance(response.json(), list)

    def test_create_and_delete_contact(self):
        """Create a contact and delete it again."""
        contact_data = {
            "name": "Test Familie Mueller",
            "email": generate_unique_email(),
            "phone": "+49 123 456789",
            "student_name": "Max Mueller",
            "class_name": "10a",
        }

        # Create
        response = _api("POST", "/contacts", json=contact_data)
        assert response.status_code == 200
        data = response.json()
        assert data["name"] == "Test Familie Mueller"
        assert "id" in data
        assert "created_at" in data
        assert "updated_at" in data

        # Delete
        delete_response = _api("DELETE", f"/contacts/{data['id']}")
        assert delete_response.status_code == 200
        assert delete_response.json()["status"] == "deleted"

    def test_update_contact(self):
        """Update an existing contact."""
        original = {"name": "Original Name", "email": generate_unique_email()}
        with _temp_contact(original) as contact:
            update_data = {"name": "Aktualisierter Name", "email": generate_unique_email()}
            response = _api("PUT", f"/contacts/{contact['id']}", json=update_data)
            assert response.status_code == 200
            assert response.json()["name"] == "Aktualisierter Name"

    def test_get_single_contact(self):
        """Fetch a single contact by id."""
        payload = {"name": "Single Test", "email": generate_unique_email()}
        with _temp_contact(payload) as contact:
            response = _api("GET", f"/contacts/{contact['id']}")
            assert response.status_code == 200
            assert response.json()["name"] == "Single Test"

    def test_get_nonexistent_contact(self):
        """Fetching a contact that does not exist returns 404."""
        response = _api("GET", "/contacts/nonexistent-uuid")
        assert response.status_code == 404

    def test_filter_contacts_by_role(self):
        """Filter contacts by role."""
        payload = {"name": "Teacher Test", "email": generate_unique_email(), "role": "teacher"}
        with _temp_contact(payload):
            response = _api("GET", "/contacts", params={"role": "teacher"})
            assert response.status_code == 200
            contacts = response.json()
            assert all(c.get("role") == "teacher" for c in contacts)

    def test_filter_contacts_by_class(self):
        """Filter contacts by class."""
        payload = {"name": "Class Test", "email": generate_unique_email(), "class_name": "12b"}
        with _temp_contact(payload):
            response = _api("GET", "/contacts", params={"class_name": "12b"})
            assert response.status_code == 200
            contacts = response.json()
            assert all(c.get("class_name") == "12b" for c in contacts)

    def test_search_contacts(self):
        """Search contacts by name."""
        unique_name = f"Suchtest_{uuid.uuid4().hex[:8]}"
        payload = {"name": unique_name, "email": generate_unique_email()}
        with _temp_contact(payload):
            response = _api("GET", "/contacts", params={"search": unique_name})
            assert response.status_code == 200
            contacts = response.json()
            assert len(contacts) >= 1
            assert any(c["name"] == unique_name for c in contacts)

    def test_create_contact_with_tags(self):
        """Create a contact that carries tags."""
        contact_data = {
            "name": "Tags Test",
            "email": generate_unique_email(),
            "tags": ["Elternbeirat", "Foerderverein"],
        }
        with _temp_contact(contact_data, require_ok=True) as contact:
            assert "Elternbeirat" in contact["tags"]

    def test_duplicate_email_rejected(self):
        """A second contact with the same e-mail address is rejected."""
        email = generate_unique_email()
        first = {"name": "First Contact", "email": email}
        with _temp_contact(first, require_ok=True):
            # Second contact with the same e-mail
            second = {"name": "Second Contact", "email": email}
            response = _api("POST", "/contacts", json=second)
            assert response.status_code == 400


class TestMessengerConversations:
    """Tests for the conversation endpoints."""

    def test_get_conversations(self):
        """Fetch the conversation list."""
        response = _api("GET", "/conversations")
        assert response.status_code == 200
        assert isinstance(response.json(), list)

    def test_create_conversation_with_contact(self):
        """Create a conversation with a contact."""
        payload = {"name": "Conv Test Contact", "email": generate_unique_email()}
        with _temp_contact(payload) as contact:
            params = {"contact_id": contact["id"]}
            with _temp_conversation(params, require_ok=True) as conv:
                assert conv["name"] == "Conv Test Contact"
                assert contact["id"] in conv["participant_ids"]

    def test_create_conversation_returns_existing(self):
        """Creating a conversation twice returns the existing one."""
        payload = {"name": "Existing Conv Test", "email": generate_unique_email()}
        with _temp_contact(payload) as contact:
            params = {"contact_id": contact["id"]}
            with _temp_conversation(params) as first:
                # The second request should return the same conversation
                second = _api("POST", "/conversations", params=params).json()
                assert first["id"] == second["id"]

    def test_create_conversation_without_params_fails(self):
        """Creating a conversation without parameters is an error."""
        response = _api("POST", "/conversations")
        assert response.status_code == 400

    def test_delete_conversation(self):
        """Delete a conversation."""
        payload = {"name": "Delete Conv Test", "email": generate_unique_email()}
        with _temp_contact(payload) as contact:
            conv_response = _api("POST", "/conversations", params={"contact_id": contact["id"]})
            conv_id = conv_response.json()["id"]

            delete_response = _api("DELETE", f"/conversations/{conv_id}")
            assert delete_response.status_code == 200


class TestMessengerMessages:
    """Tests for the message endpoints."""

    def test_send_and_get_messages(self):
        """Send a message and read it back."""
        payload = {"name": "Message Test", "email": generate_unique_email()}
        with _temp_chat(payload) as (_, conv):
            messages_path = f"/conversations/{conv['id']}/messages"

            # Send a message
            send_response = _api("POST", messages_path, json={"content": "Test Nachricht"})
            assert send_response.status_code == 200
            msg = send_response.json()
            assert msg["content"] == "Test Nachricht"
            assert msg["sender_id"] == "self"

            # Fetch messages
            get_response = _api("GET", messages_path)
            assert get_response.status_code == 200
            assert len(get_response.json()) >= 1

    def test_message_updates_conversation(self):
        """Sending a message updates the conversation's last message."""
        payload = {"name": "Update Conv Test", "email": generate_unique_email()}
        with _temp_chat(payload) as (_, conv):
            msg_data = {"content": "Aktualisiert letzte Nachricht"}
            _api("POST", f"/conversations/{conv['id']}/messages", json=msg_data)

            refreshed = _api("GET", f"/conversations/{conv['id']}").json()
            assert "Aktualisiert" in refreshed.get("last_message", "")

    def test_mark_message_as_read(self):
        """Mark a single message as read."""
        payload = {"name": "Read Test", "email": generate_unique_email()}
        with _temp_chat(payload) as (_, conv):
            msg_response = _api(
                "POST",
                f"/conversations/{conv['id']}/messages",
                json={"content": "Read me"},
            )
            msg_id = msg_response.json()["id"]

            read_response = _api("PUT", f"/messages/{msg_id}/read")
            assert read_response.status_code == 200
            assert read_response.json()["status"] == "read"

    def test_mark_all_messages_read(self):
        """Mark all messages of a conversation as read."""
        payload = {"name": "Read All Test", "email": generate_unique_email()}
        with _temp_chat(payload) as (_, conv):
            messages_path = f"/conversations/{conv['id']}/messages"

            # Send several messages
            _api("POST", messages_path, json={"content": "Msg 1"})
            _api("POST", messages_path, json={"content": "Msg 2"})

            response = _api("PUT", f"/conversations/{conv['id']}/read-all")
            assert response.status_code == 200
            assert response.json()["status"] == "all_read"

    def test_messages_pagination(self):
        """Fetch messages with a limit."""
        payload = {"name": "Pagination Test", "email": generate_unique_email()}
        with _temp_chat(payload) as (_, conv):
            messages_path = f"/conversations/{conv['id']}/messages"

            # Send several messages
            for i in range(5):
                _api("POST", messages_path, json={"content": f"Msg {i}"})

            response = _api("GET", messages_path, params={"limit": 3})
            assert response.status_code == 200
            assert len(response.json()) <= 3


class TestMessengerTemplates:
    """Tests for the template endpoints."""

    def test_get_templates(self):
        """Fetch the template list."""
        response = _api("GET", "/templates")
        assert response.status_code == 200
        templates = response.json()
        assert isinstance(templates, list)
        # Default templates are expected to be present
        assert len(templates) >= 1

    def test_create_and_delete_template(self):
        """Create a template and delete it again."""
        # Create (fields are sent as query parameters)
        response = _api(
            "POST",
            "/templates",
            params={
                "name": "Test Vorlage",
                "content": "Test mit [PLATZHALTER]",
                "category": "test",
            },
        )
        assert response.status_code == 200
        template = response.json()
        assert template["name"] == "Test Vorlage"
        assert "[PLATZHALTER]" in template["content"]

        # Delete
        delete_response = _api("DELETE", f"/templates/{template['id']}")
        assert delete_response.status_code == 200

    def test_default_templates_exist(self):
        """Default templates exist and carry the required fields."""
        templates = _api("GET", "/templates").json()

        # At least one template should exist
        assert len(templates) >= 1

        # Every template has the required fields
        for template in templates:
            assert "id" in template
            assert "name" in template
            assert "content" in template
            assert "category" in template


class TestMessengerGroups:
    """Tests for the group endpoints."""

    def test_get_groups(self):
        """Fetch the group list."""
        response = _api("GET", "/groups")
        assert response.status_code == 200
        assert isinstance(response.json(), list)

    def test_create_and_delete_group(self):
        """Create a group and delete it again."""
        group_data = {
            "name": f"Test Klasse {uuid.uuid4().hex[:4]}",
            "description": "Test Elterngruppe",
            "group_type": "class",
            "member_ids": [],
        }

        # Create
        response = _api("POST", "/groups", json=group_data)
        assert response.status_code == 200
        group = response.json()
        assert "Test Klasse" in group["name"]
        assert group["group_type"] == "class"

        # Delete
        delete_response = _api("DELETE", f"/groups/{group['id']}")
        assert delete_response.status_code == 200

    def test_update_group_members(self):
        """Replace the member list of a group."""
        member1 = {"name": "Member 1", "email": generate_unique_email()}
        member2 = {"name": "Member 2", "email": generate_unique_email()}
        group_data = {"name": "Members Test", "description": "Test", "member_ids": []}

        with _temp_contact(member1) as contact1, \
                _temp_contact(member2) as contact2, \
                _temp_group(group_data) as group:
            # Add members
            member_ids = [contact1["id"], contact2["id"]]
            response = _api("PUT", f"/groups/{group['id']}/members", json=member_ids)
            assert response.status_code == 200
            updated_members = response.json()["member_ids"]
            assert contact1["id"] in updated_members
            assert contact2["id"] in updated_members

    def test_create_group_conversation(self):
        """Create a conversation for a group."""
        group_data = {"name": "Group Conv Test", "description": "Test"}
        with _temp_group(group_data) as group:
            params = {"group_id": group["id"]}
            with _temp_conversation(params, require_ok=True) as conv:
                assert conv["is_group"] is True
                assert conv["group_id"] == group["id"]


class TestMessengerCSV:
    """Tests for CSV import/export."""

    def test_export_csv(self):
        """Export contacts as CSV."""
        response = _api("GET", "/contacts/export/csv")
        assert response.status_code == 200
        assert "text/csv" in response.headers.get("content-type", "")

        # Check the CSV content
        content = response.text.lower()
        assert "name" in content
        assert "email" in content

    def test_import_csv_semicolon(self):
        """Import contacts from a semicolon-separated CSV."""
        unique_email = generate_unique_email()
        csv_content = f"Name;Email;Telefon;Schueler;Klasse\nCSV Test Import;{unique_email};0123;CSV Schueler;5b"

        files = {"file": ("test.csv", csv_content, "text/csv")}
        response = _api("POST", "/contacts/import", files=files)

        assert response.status_code == 200
        result = response.json()
        try:
            assert "imported" in result
            assert result["imported"] >= 1
        finally:
            # Remove the imported contacts even if an assertion failed
            _delete_imported_contacts(result)

    def test_import_csv_german_columns(self):
        """Import a CSV that uses German column names."""
        unique_email = generate_unique_email()
        csv_content = f"Name;E-Mail;Telefon;Kind;Klasse;Notizen\nDeutsche Spalten;{unique_email};0123;Kind Name;7c;Test Notizen"

        files = {"file": ("german.csv", csv_content, "text/csv")}
        response = _api("POST", "/contacts/import", files=files)

        assert response.status_code == 200
        result = response.json()
        try:
            assert result["imported"] >= 1
        finally:
            _delete_imported_contacts(result)

    def test_import_csv_skip_duplicates(self):
        """CSV import skips rows whose e-mail already exists."""
        email = generate_unique_email()
        # The pre-existing contact is removed again on exit, so the test
        # leaves no data behind in the backend.
        with _temp_contact({"name": "Original", "email": email}):
            # Import a CSV that reuses the same e-mail
            csv_content = f"Name;Email\nDuplikat;{email}"
            files = {"file": ("dup.csv", csv_content, "text/csv")}
            response = _api("POST", "/contacts/import", files=files)

            assert response.status_code == 200
            result = response.json()
            try:
                assert result["skipped"] >= 1
                assert any("existiert bereits" in err for err in result.get("errors", []))
            finally:
                # Nothing should have been imported; clean up if it was.
                _delete_imported_contacts(result)

    def test_import_csv_missing_name(self):
        """CSV import reports rows with a missing name."""
        csv_content = "Name;Email\n;noname@test.com"
        files = {"file": ("noname.csv", csv_content, "text/csv")}
        response = _api("POST", "/contacts/import", files=files)

        assert response.status_code == 200
        result = response.json()
        try:
            assert result["skipped"] >= 1
            assert any("Name fehlt" in err for err in result.get("errors", []))
        finally:
            # Nothing should have been imported; clean up if it was.
            _delete_imported_contacts(result)

    def test_import_non_csv_rejected(self):
        """Non-CSV files are rejected."""
        files = {"file": ("test.txt", "not a csv", "text/plain")}
        response = _api("POST", "/contacts/import", files=files)
        assert response.status_code == 400


class TestMessengerStats:
    """Tests for the statistics endpoint."""

    def test_get_stats(self):
        """Fetch statistics."""
        response = _api("GET", "/stats")
        assert response.status_code == 200
        stats = response.json()
        assert "total_contacts" in stats
        assert "total_groups" in stats
        assert "total_conversations" in stats
        assert "total_messages" in stats
        assert "unread_messages" in stats
        assert "contacts_by_role" in stats

    def test_stats_reflect_data(self):
        """Statistics reflect newly created data."""
        # Initial stats
        initial_contacts = _api("GET", "/stats").json()["total_contacts"]

        # Add a contact
        payload = {"name": "Stats Test", "email": generate_unique_email()}
        with _temp_contact(payload):
            # New stats
            new_stats = _api("GET", "/stats").json()
            assert new_stats["total_contacts"] == initial_contacts + 1


class TestMessengerEdgeCases:
    """Tests for edge cases and error handling."""

    def test_update_nonexistent_contact(self):
        """Updating a contact that does not exist returns 404."""
        response = _api("PUT", "/contacts/nonexistent-uuid", json={"name": "Test"})
        assert response.status_code == 404

    def test_message_to_nonexistent_conversation(self):
        """Sending a message to a conversation that does not exist returns 404."""
        response = _api(
            "POST",
            "/conversations/nonexistent-uuid/messages",
            json={"content": "Test"},
        )
        assert response.status_code == 404

    def test_create_conversation_nonexistent_contact(self):
        """Creating a conversation with an unknown contact returns 404."""
        response = _api("POST", "/conversations", params={"contact_id": "nonexistent-uuid"})
        assert response.status_code == 404

    def test_create_conversation_nonexistent_group(self):
        """Creating a conversation with an unknown group returns 404."""
        response = _api("POST", "/conversations", params={"group_id": "nonexistent-uuid"})
        assert response.status_code == 404

    def test_mark_nonexistent_message_read(self):
        """Marking an unknown message as read returns 404."""
        response = _api("PUT", "/messages/nonexistent-uuid/read")
        assert response.status_code == 404

    def test_contact_without_email(self):
        """Creating a contact without an e-mail address is allowed."""
        with _temp_contact({"name": "Nur Name"}, require_ok=True) as contact:
            assert contact["email"] is None

    def test_empty_message_rejected(self):
        """An empty message is rejected."""
        payload = {"name": "Empty Msg Test", "email": generate_unique_email()}
        with _temp_chat(payload) as (_, conv):
            # An empty message should produce an error
            response = _api(
                "POST",
                f"/conversations/{conv['id']}/messages",
                json={"content": ""},
            )
            assert response.status_code == 422  # Validation error


class TestMessengerMatrixFeatures:
    """Tests for the Matrix ID and preferred channel fields."""

    def test_create_contact_with_matrix_id(self):
        """Create a contact with a Matrix ID."""
        contact_data = {
            "name": "Matrix User",
            "email": generate_unique_email(),
            "matrix_id": "@testuser:matrix.org",
        }
        with _temp_contact(contact_data, require_ok=True) as data:
            assert data["matrix_id"] == "@testuser:matrix.org"

    def test_create_contact_with_preferred_channel(self):
        """Create a contact with a preferred channel."""
        contact_data = {
            "name": "Channel Test",
            "email": generate_unique_email(),
            "preferred_channel": "matrix",
        }
        with _temp_contact(contact_data, require_ok=True) as data:
            assert data["preferred_channel"] == "matrix"

    def test_update_contact_matrix_id(self):
        """Update the Matrix ID of a contact."""
        payload = {"name": "Update Matrix Test", "email": generate_unique_email()}
        with _temp_contact(payload) as contact:
            # Add a Matrix ID
            update_data = {"matrix_id": "@updated:matrix.org"}
            response = _api("PUT", f"/contacts/{contact['id']}", json=update_data)
            assert response.status_code == 200
            assert response.json()["matrix_id"] == "@updated:matrix.org"

    def test_default_preferred_channel_is_email(self):
        """The default preferred channel is 'email'."""
        contact_data = {"name": "Default Channel Test", "email": generate_unique_email()}
        with _temp_contact(contact_data, require_ok=True) as data:
            assert data["preferred_channel"] == "email"

    def test_csv_export_includes_matrix_fields(self):
        """The CSV export contains the Matrix ID and preferred channel."""
        contact_data = {
            "name": "CSV Matrix Test",
            "email": generate_unique_email(),
            "matrix_id": "@csvtest:matrix.org",
            "preferred_channel": "matrix",
        }
        with _temp_contact(contact_data):
            # Export CSV
            response = _api("GET", "/contacts/export/csv")
            assert response.status_code == 200
            csv_content = response.text.lower()
            assert "matrix" in csv_content


class TestMessengerEmailFeatures:
    """Tests for e-mail delivery of messages."""

    def test_send_message_with_email(self):
        """Send a message with e-mail delivery requested."""
        contact_data = {
            "name": "Email Test Contact",
            "email": generate_unique_email(),
        }
        with _temp_chat(contact_data) as (_, conv):
            # Send a message with send_email=true
            msg_data = {
                "content": "Diese Nachricht wird per Email gesendet.",
                "send_email": True,
            }
            response = _api("POST", f"/conversations/{conv['id']}/messages", json=msg_data)

            assert response.status_code == 200
            data = response.json()
            assert data["content"] == "Diese Nachricht wird per Email gesendet."
            # The e-mail status is part of the response
            assert "email_sent" in data

    def test_send_message_without_email(self):
        """Send a message without requesting e-mail delivery."""
        payload = {"name": "No Email Test", "email": generate_unique_email()}
        with _temp_chat(payload) as (_, conv):
            # send_email is omitted, so no e-mail should be sent
            msg_data = {"content": "Normale Nachricht ohne Email."}
            response = _api("POST", f"/conversations/{conv['id']}/messages", json=msg_data)

            assert response.status_code == 200
            data = response.json()
            # No e-mail should have been sent
            assert data.get("email_sent", False) is False

    def test_send_message_email_to_contact_without_email_address(self):
        """Request e-mail delivery for a contact that has no e-mail address."""
        with _temp_chat({"name": "No Email Address"}) as (_, conv):
            # Send a message with send_email=true
            msg_data = {
                "content": "Versuch Email zu senden ohne Adresse.",
                "send_email": True,
            }
            response = _api("POST", f"/conversations/{conv['id']}/messages", json=msg_data)

            assert response.status_code == 200
            # The message is stored, but the e-mail delivery fails
            data = response.json()
            assert data["email_sent"] is False