"""
RBAC Test API - test runner for role and permission management.

Endpoint: /api/admin/rbac-tests
"""

import asyncio
import os
import time
from typing import Callable, List, Literal, Optional, Tuple

import httpx
from fastapi import APIRouter
from pydantic import BaseModel

router = APIRouter(prefix="/api/admin/rbac-tests", tags=["RBAC Tests"])

# ==============================================
# Models
# ==============================================


# Outcome of one individual check. NOTE: documented with comments instead of
# a class docstring so the generated JSON schema stays unchanged.
class TestResult(BaseModel):
    name: str  # display name of the check
    description: str  # what the check verifies
    expected: str  # human-readable expectation
    actual: str  # human-readable observation
    status: Literal["passed", "failed", "pending", "skipped"]
    duration_ms: float  # elapsed time of the check in milliseconds
    error_message: Optional[str] = None  # extra detail; None when not set


# Results of all checks belonging to one category, plus counters.
class TestCategoryResult(BaseModel):
    category: str  # machine-readable category id
    display_name: str
    description: str
    tests: List[TestResult]
    passed: int  # number of tests with status "passed"
    failed: int  # number of tests with status "failed"
    total: int  # number of tests, including skipped ones


# Aggregated results over every category.
class FullTestResults(BaseModel):
    categories: List[TestCategoryResult]
    total_passed: int
    total_failed: int
    total_tests: int
    duration_ms: float  # elapsed time of the complete run in milliseconds


# ==============================================
# Configuration
# ==============================================

# Base URLs of the probed services; overridable through the environment.
BACKEND_URL = os.getenv("BACKEND_URL", "http://localhost:8000")
CONSENT_SERVICE_URL = os.getenv("CONSENT_SERVICE_URL", "http://consent-service:8081")

# ==============================================
# Shared check helpers
# ==============================================

# Timeout in seconds applied to every probe request.
_REQUEST_TIMEOUT_S = 10.0

# What an evaluator reports for a response: (actual, status, error_message).
_Outcome = Tuple[str, str, Optional[str]]


async def _run_http_check(
    *,
    name: str,
    description: str,
    expected: str,
    method: str,
    url: str,
    evaluate: Callable[[httpx.Response], _Outcome],
    error_actual: Optional[str] = None,
    error_status: str = "failed",
    **request_kwargs,
) -> TestResult:
    """Send one HTTP request and turn the outcome into a ``TestResult``.

    Args:
        name: Display name copied into the result.
        description: Description copied into the result.
        expected: Expectation text copied into the result.
        method: HTTP method of the probe request.
        url: Absolute URL of the probe request.
        evaluate: Maps the response to ``(actual, status, error_message)``.
        error_actual: ``actual`` text used when the request or the evaluator
            raises; defaults to ``"Fehler: <exception>"``.
        error_status: Status used when the request or the evaluator raises.
        **request_kwargs: Passed through to ``AsyncClient.request``
            (for example ``headers=`` or ``json=``).

    Returns:
        The populated ``TestResult``. Exceptions derived from ``Exception``
        are never propagated; they are reported inside the result instead.
    """
    # perf_counter is monotonic, so the measured duration cannot be skewed
    # (or become negative) when the system clock is adjusted mid-request.
    start = time.perf_counter()
    try:
        async with httpx.AsyncClient(timeout=_REQUEST_TIMEOUT_S) as client:
            response = await client.request(method, url, **request_kwargs)
        # Taken before evaluation so body parsing does not count as latency.
        duration_ms = (time.perf_counter() - start) * 1000
        actual, status, error_message = evaluate(response)
    except Exception as exc:
        # Deliberately broad: a probe must always yield a result, whether the
        # failure is a connection error, a timeout or an unparsable body.
        duration_ms = (time.perf_counter() - start) * 1000
        actual = error_actual if error_actual is not None else f"Fehler: {exc}"
        status = error_status
        error_message = str(exc)
    return TestResult(
        name=name,
        description=description,
        expected=expected,
        actual=actual,
        status=status,
        duration_ms=duration_ms,
        error_message=error_message,
    )


def _count_items(data) -> int:
    """Return the list length, or the ``total`` field of a non-list payload."""
    return len(data) if isinstance(data, list) else data.get("total", 0)


def _collection_evaluator(
    count_label: str, missing_message: str
) -> Callable[[httpx.Response], _Outcome]:
    """Build the evaluator shared by the roles, permissions and users checks.

    Args:
        count_label: Text appended to the item count on HTTP 200.
        missing_message: Error message reported on HTTP 404.
    """

    def evaluate(response: httpx.Response) -> _Outcome:
        code = response.status_code
        if code == 200:
            return f"{_count_items(response.json())} {count_label}", "passed", None
        if code == 401:
            # The unauthenticated probe being rejected counts as success.
            return (
                "Authentifizierung erforderlich",
                "passed",
                "API erfordert Auth (korrekt)",
            )
        if code == 404:
            return "Endpoint nicht implementiert", "skipped", missing_message
        return f"HTTP {code}", "failed", "Unerwarteter Status"

    return evaluate


# ==============================================
# Test Implementations - Authentication
# ==============================================


def _evaluate_auth_health(response: httpx.Response) -> _Outcome:
    """Classify the response of the auth health endpoint."""
    code = response.status_code
    if code == 200:
        return "Auth API laeuft", "passed", None
    if code == 404:
        return "Endpoint nicht implementiert", "skipped", "Auth Health Endpoint fehlt"
    return f"HTTP {code}", "failed", "Auth API nicht erreichbar"


async def test_auth_api_health() -> TestResult:
    """Test Auth API Health: GET ``/api/auth/health`` must answer 200."""
    return await _run_http_check(
        name="Auth API Health",
        description="Prueft ob die Authentifizierungs-API erreichbar ist",
        expected="Auth API aktiv",
        method="GET",
        url=f"{BACKEND_URL}/api/auth/health",
        evaluate=_evaluate_auth_health,
    )


def _evaluate_jwt_rejection(response: httpx.Response) -> _Outcome:
    """Classify the response to a request carrying an invalid bearer token."""
    code = response.status_code
    if code == 401:
        return "401 Unauthorized - korrekt abgelehnt", "passed", None
    if code == 404:
        return "Endpoint nicht implementiert", "skipped", "/api/auth/me Endpoint fehlt"
    return f"HTTP {code}", "failed", "JWT Validierung nicht korrekt"


async def test_jwt_validation() -> TestResult:
    """Test JWT Token Validation: an invalid token must be rejected with 401."""
    return await _run_http_check(
        name="JWT Validierung",
        description="Prueft ob ungueltige Tokens abgelehnt werden",
        expected="401 fuer ungueltige Tokens",
        method="GET",
        url=f"{BACKEND_URL}/api/auth/me",
        evaluate=_evaluate_jwt_rejection,
        # Deliberately invalid token - the endpoint should answer 401.
        headers={"Authorization": "Bearer invalid-token"},
    )


def _evaluate_login(response: httpx.Response) -> _Outcome:
    """Classify the response to a login attempt with wrong credentials."""
    code = response.status_code
    # A rejection of the bogus credentials proves that the endpoint exists.
    if code in (400, 401, 422):
        return f"Endpoint aktiv (HTTP {code})", "passed", None
    if code == 404:
        return "Endpoint nicht implementiert", "skipped", "Login Endpoint fehlt"
    if code >= 500:
        # A server error must not be reported as a working login endpoint.
        return f"HTTP {code}", "failed", "Login Endpoint meldet Serverfehler"
    # Any other answer still shows that something is routed at this path.
    return f"HTTP {code}", "passed", None


async def test_login_endpoint() -> TestResult:
    """Test Login Endpoint exists by POSTing deliberately wrong credentials."""
    return await _run_http_check(
        name="Login Endpoint",
        description="Prueft ob der Login-Endpoint verfuegbar ist",
        expected="Login Endpoint aktiv",
        method="POST",
        url=f"{BACKEND_URL}/api/auth/login",
        evaluate=_evaluate_login,
        json={"email": "test@example.com", "password": "wrong"},
    )


# ==============================================
# Test Implementations - Roles
# ==============================================

# Role names the default-roles check looks for.
_DEFAULT_ROLES = ["user", "admin", "data_protection_officer"]


async def test_roles_api() -> TestResult:
    """Test Roles API: GET ``/api/rbac/roles`` answers 200 or demands auth."""
    return await _run_http_check(
        name="Rollen-API",
        description="Prueft ob die Rollen-Verwaltung verfuegbar ist",
        expected="Rollen-API aktiv",
        method="GET",
        url=f"{BACKEND_URL}/api/rbac/roles",
        evaluate=_collection_evaluator("Rollen definiert", "RBAC Roles Endpoint fehlt"),
    )


def _evaluate_default_roles(response: httpx.Response) -> _Outcome:
    """Check that every default role appears in the roles listing."""
    code = response.status_code
    if code == 200:
        data = response.json()
        roles = data if isinstance(data, list) else data.get("roles", [])
        # Entries may be plain names or objects that carry a "name" field.
        role_names = [r.get("name", r) if isinstance(r, dict) else r for r in roles]
        missing = [role for role in _DEFAULT_ROLES if role not in role_names]
        if not missing:
            found = len(_DEFAULT_ROLES)
            return f"Alle {found} Standard-Rollen vorhanden", "passed", None
        return (
            f"Fehlend: {', '.join(missing)}",
            "failed",
            f"{len(missing)} Rollen fehlen",
        )
    if code in (401, 404):
        return "API nicht verfuegbar", "skipped", "Roles API nicht erreichbar"
    return f"HTTP {code}", "failed", "Unerwartete Antwort"


async def test_default_roles() -> TestResult:
    """Test Default Roles exist in the ``/api/rbac/roles`` listing."""
    return await _run_http_check(
        name="Standard-Rollen",
        description="Prueft ob alle Standard-Rollen existieren",
        expected=", ".join(_DEFAULT_ROLES),
        method="GET",
        url=f"{BACKEND_URL}/api/rbac/roles",
        evaluate=_evaluate_default_roles,
    )


# ==============================================
# Test Implementations - Permissions
# ==============================================


async def test_permissions_api() -> TestResult:
    """Test Permissions API: GET ``/api/rbac/permissions``."""
    return await _run_http_check(
        name="Berechtigungs-API",
        description="Prueft ob die Berechtigungs-Verwaltung verfuegbar ist",
        expected="Permissions-API aktiv",
        method="GET",
        url=f"{BACKEND_URL}/api/rbac/permissions",
        evaluate=_collection_evaluator(
            "Berechtigungen definiert", "RBAC Permissions Endpoint fehlt"
        ),
    )


def _evaluate_role_permissions(response: httpx.Response) -> _Outcome:
    """Classify the response of the admin role's permission listing."""
    code = response.status_code
    if code == 200:
        data = response.json()
        permissions = data if isinstance(data, list) else data.get("permissions", [])
        return f"Admin hat {len(permissions)} Berechtigungen", "passed", None
    if code in (401, 404):
        return "API nicht verfuegbar", "skipped", "Role-Permissions Endpoint fehlt"
    return f"HTTP {code}", "failed", "Unerwartete Antwort"


async def test_role_permissions_mapping() -> TestResult:
    """Test Role-Permissions Mapping by fetching the admin role's permissions."""
    return await _run_http_check(
        name="Rollen-Berechtigungs-Zuordnung",
        description="Prueft ob Berechtigungen Rollen zugeordnet werden koennen",
        expected="Mapping funktioniert",
        method="GET",
        url=f"{BACKEND_URL}/api/rbac/roles/admin/permissions",
        evaluate=_evaluate_role_permissions,
    )


# ==============================================
# Test Implementations - Users
# ==============================================


async def test_users_api() -> TestResult:
    """Test Users API: GET ``/api/rbac/users``."""
    return await _run_http_check(
        name="Benutzer-API",
        description="Prueft ob die Benutzer-Verwaltung verfuegbar ist",
        expected="Users-API aktiv",
        method="GET",
        url=f"{BACKEND_URL}/api/rbac/users",
        evaluate=_collection_evaluator(
            "Benutzer registriert", "RBAC Users Endpoint fehlt"
        ),
    )


def _evaluate_consent_health(response: httpx.Response) -> _Outcome:
    """Classify the response of the consent service health endpoint."""
    code = response.status_code
    if code == 200:
        return "Service aktiv", "passed", None
    return f"HTTP {code}", "failed", "Consent Service nicht erreichbar"


async def test_consent_service_auth() -> TestResult:
    """Test Consent Service Authentication via its ``/health`` endpoint.

    Unlike the other checks, a request that raises is reported as
    ``"skipped"`` rather than ``"failed"``.
    """
    return await _run_http_check(
        name="Consent Service Auth",
        description="Prueft ob der Go Consent Service fuer Auth erreichbar ist",
        expected="Consent Service erreichbar",
        method="GET",
        url=f"{CONSENT_SERVICE_URL}/health",
        evaluate=_evaluate_consent_health,
        error_actual="Nicht verfuegbar",
        error_status="skipped",
    )
# ==============================================
# Category Runners
# ==============================================


def _count_status(results, status: str) -> int:
    """Return how many of the given test results carry ``status``."""
    return len([result for result in results if result.status == status])


async def run_auth_tests() -> TestCategoryResult:
    """Execute the authentication checks concurrently and summarize them."""
    results = list(
        await asyncio.gather(
            test_auth_api_health(),
            test_jwt_validation(),
            test_login_endpoint(),
        )
    )
    return TestCategoryResult(
        category="authentication",
        display_name="Authentifizierung",
        description="Tests fuer Login und JWT-Validierung",
        tests=results,
        passed=_count_status(results, "passed"),
        failed=_count_status(results, "failed"),
        total=len(results),
    )


async def run_roles_tests() -> TestCategoryResult:
    """Execute the role checks concurrently and summarize them."""
    results = list(
        await asyncio.gather(
            test_roles_api(),
            test_default_roles(),
        )
    )
    return TestCategoryResult(
        category="roles",
        display_name="Rollen",
        description="Tests fuer Rollen-Verwaltung",
        tests=results,
        passed=_count_status(results, "passed"),
        failed=_count_status(results, "failed"),
        total=len(results),
    )


async def run_permissions_tests() -> TestCategoryResult:
    """Execute the permission checks concurrently and summarize them."""
    results = list(
        await asyncio.gather(
            test_permissions_api(),
            test_role_permissions_mapping(),
        )
    )
    return TestCategoryResult(
        category="permissions",
        display_name="Berechtigungen",
        description="Tests fuer Berechtigungs-Verwaltung",
        tests=results,
        passed=_count_status(results, "passed"),
        failed=_count_status(results, "failed"),
        total=len(results),
    )


async def run_users_tests() -> TestCategoryResult:
    """Execute the user checks concurrently and summarize them."""
    results = list(
        await asyncio.gather(
            test_users_api(),
            test_consent_service_auth(),
        )
    )
    return TestCategoryResult(
        category="users",
        display_name="Benutzer",
        description="Tests fuer Benutzer-Verwaltung",
        tests=results,
        passed=_count_status(results, "passed"),
        failed=_count_status(results, "failed"),
        total=len(results),
    )
# ==============================================
# API Endpoints
# ==============================================

# NOTE: routes are matched in registration order. The static POST "/run-all"
# route therefore has to be declared BEFORE the parameterized POST
# "/{category}" route; otherwise "/run-all" is captured as
# category="run-all" and run_all_tests() can never be reached.


@router.post("/run-all", response_model=FullTestResults)
async def run_all_tests():
    """Run all RBAC tests"""
    # Monotonic clock: the measured duration is immune to wall-clock jumps.
    start = time.perf_counter()
    # All four categories run concurrently; results keep this order.
    categories = await asyncio.gather(
        run_auth_tests(),
        run_roles_tests(),
        run_permissions_tests(),
        run_users_tests(),
    )
    return FullTestResults(
        categories=list(categories),
        total_passed=sum(c.passed for c in categories),
        total_failed=sum(c.failed for c in categories),
        total_tests=sum(c.total for c in categories),
        duration_ms=(time.perf_counter() - start) * 1000,
    )


@router.post("/{category}", response_model=TestCategoryResult)
async def run_category_tests(category: str):
    """Run tests for a specific category"""
    # Maps the public category id to the coroutine function that runs it.
    runners = {
        "authentication": run_auth_tests,
        "roles": run_roles_tests,
        "permissions": run_permissions_tests,
        "users": run_users_tests,
    }
    runner = runners.get(category)
    if runner is None:
        # Unknown ids yield an empty result instead of an HTTP error.
        return TestCategoryResult(
            category=category,
            display_name=f"Unbekannt: {category}",
            description="Kategorie nicht gefunden",
            tests=[],
            passed=0,
            failed=0,
            total=0,
        )
    return await runner()


@router.get("/categories")
async def get_categories():
    """Get available test categories"""
    # Static catalogue; the ids match the keys accepted by POST "/{category}".
    return {
        "categories": [
            {"id": "authentication", "name": "Authentifizierung", "description": "Login & JWT"},
            {"id": "roles", "name": "Rollen", "description": "Rollenverwaltung"},
            {"id": "permissions", "name": "Berechtigungen", "description": "Berechtigungszuordnung"},
            {"id": "users", "name": "Benutzer", "description": "Benutzerverwaltung"},
        ]
    }