Some checks failed
Tests / Go Tests (push) Has been cancelled
Tests / Python Tests (push) Has been cancelled
Tests / Integration Tests (push) Has been cancelled
Tests / Go Lint (push) Has been cancelled
Tests / Python Lint (push) Has been cancelled
Tests / Security Scan (push) Has been cancelled
Tests / All Checks Passed (push) Has been cancelled
Security Scanning / Secret Scanning (push) Has been cancelled
Security Scanning / Dependency Vulnerability Scan (push) Has been cancelled
Security Scanning / Go Security Scan (push) Has been cancelled
Security Scanning / Python Security Scan (push) Has been cancelled
Security Scanning / Node.js Security Scan (push) Has been cancelled
Security Scanning / Docker Image Security (push) Has been cancelled
Security Scanning / Security Summary (push) Has been cancelled
CI/CD Pipeline / Go Tests (push) Has been cancelled
CI/CD Pipeline / Python Tests (push) Has been cancelled
CI/CD Pipeline / Website Tests (push) Has been cancelled
CI/CD Pipeline / Linting (push) Has been cancelled
CI/CD Pipeline / Security Scan (push) Has been cancelled
CI/CD Pipeline / Docker Build & Push (push) Has been cancelled
CI/CD Pipeline / Integration Tests (push) Has been cancelled
CI/CD Pipeline / Deploy to Staging (push) Has been cancelled
CI/CD Pipeline / Deploy to Production (push) Has been cancelled
CI/CD Pipeline / CI Summary (push) Has been cancelled
ci/woodpecker/manual/build-ci-image Pipeline was successful
ci/woodpecker/manual/main Pipeline failed
All services: admin-v2, studio-v2, website, ai-compliance-sdk, consent-service, klausur-service, voice-service, and infrastructure. Large PDFs and compiled binaries excluded via .gitignore.
723 lines
27 KiB
Python
723 lines
27 KiB
Python
"""
|
|
RBAC Test API - test runner for role and permission management

Endpoint: /api/admin/rbac-tests
|
|
"""
|
|
|
|
from fastapi import APIRouter
|
|
from pydantic import BaseModel
|
|
from typing import List, Optional, Literal
|
|
import httpx
|
|
import asyncio
|
|
import time
|
|
import os
|
|
|
|
# Router for the RBAC test-runner endpoints; every route registered on it is
# served under the /api/admin/rbac-tests prefix.
router = APIRouter(
    prefix="/api/admin/rbac-tests",
    tags=["RBAC Tests"],
)
|
|
|
|
# ==============================================
|
|
# Models
|
|
# ==============================================
|
|
|
|
class TestResult(BaseModel):
    """Outcome of a single RBAC check."""

    # Display name of the check.
    name: str
    # Short text describing what the check verifies.
    description: str
    # Expected outcome, as display text.
    expected: str
    # Observed outcome, as display text.
    actual: str
    # Classification of the outcome; restricted to these four values.
    status: Literal["passed", "failed", "pending", "skipped"]
    # Runtime of the check in milliseconds.
    duration_ms: float
    # Optional detail text; None when nothing was supplied.
    error_message: Optional[str] = None
|
|
|
|
|
|
class TestCategoryResult(BaseModel):
    """Aggregated results of all checks that belong to one category."""

    # Machine-readable category id (e.g. "roles").
    category: str
    # Label shown for the category.
    display_name: str
    # Short description of the category.
    description: str
    # Individual check results of this category.
    tests: List[TestResult]
    # Number of checks with status "passed".
    passed: int
    # Number of checks with status "failed".
    failed: int
    # Number of checks in the category.
    total: int
|
|
|
|
|
|
class FullTestResults(BaseModel):
    """Results of a complete run across all categories."""

    # Per-category results.
    categories: List[TestCategoryResult]
    # Sum of the "passed" counters of all categories.
    total_passed: int
    # Sum of the "failed" counters of all categories.
    total_failed: int
    # Sum of the "total" counters of all categories.
    total_tests: int
    # Runtime of the whole run in milliseconds.
    duration_ms: float
|
|
|
|
|
|
# ==============================================
|
|
# Configuration
|
|
# ==============================================
|
|
|
|
# Base URL of the backend under test, overridable through the BACKEND_URL
# environment variable. A trailing slash is stripped because request URLs are
# built as f"{BACKEND_URL}/api/...", which would otherwise contain "//".
BACKEND_URL = os.getenv("BACKEND_URL", "http://localhost:8000").rstrip("/")

# Base URL of the consent service, overridable through CONSENT_SERVICE_URL.
# Normalised the same way as BACKEND_URL.
CONSENT_SERVICE_URL = os.getenv(
    "CONSENT_SERVICE_URL", "http://consent-service:8081"
).rstrip("/")
|
|
|
|
|
|
# ==============================================
|
|
# Test Implementations - Authentication
|
|
# ==============================================
|
|
|
|
async def test_auth_api_health() -> TestResult:
    """Check whether the authentication API answers its health probe.

    Issues ``GET {BACKEND_URL}/api/auth/health`` with a 10 second timeout.

    Returns:
        TestResult: ``passed`` for HTTP 200, ``skipped`` for HTTP 404,
        ``failed`` for any other status code or when an exception is raised.
    """
    started = time.time()
    # Fields that are identical in every outcome of this check.
    shared = {
        "name": "Auth API Health",
        "description": "Prueft ob die Authentifizierungs-API erreichbar ist",
        "expected": "Auth API aktiv",
    }

    try:
        async with httpx.AsyncClient(timeout=10.0) as http:
            reply = await http.get(f"{BACKEND_URL}/api/auth/health")
            elapsed_ms = (time.time() - started) * 1000

        code = reply.status_code
        if code == 200:
            return TestResult(
                actual="Auth API laeuft",
                status="passed",
                duration_ms=elapsed_ms,
                **shared,
            )
        if code == 404:
            # A missing endpoint is reported as skipped, not as a failure.
            return TestResult(
                actual="Endpoint nicht implementiert",
                status="skipped",
                duration_ms=elapsed_ms,
                error_message="Auth Health Endpoint fehlt",
                **shared,
            )
        return TestResult(
            actual=f"HTTP {code}",
            status="failed",
            duration_ms=elapsed_ms,
            error_message="Auth API nicht erreichbar",
            **shared,
        )
    except Exception as exc:
        # Any error (connection, timeout, ...) becomes a failed result.
        reason = str(exc)
        return TestResult(
            actual=f"Fehler: {reason}",
            status="failed",
            duration_ms=(time.time() - started) * 1000,
            error_message=reason,
            **shared,
        )
|
|
|
|
|
|
async def test_jwt_validation() -> TestResult:
    """Check that a request carrying an invalid bearer token is rejected.

    Issues ``GET {BACKEND_URL}/api/auth/me`` with the header
    ``Authorization: Bearer invalid-token`` and a 10 second timeout.

    Returns:
        TestResult: ``passed`` for HTTP 401, ``skipped`` for HTTP 404,
        ``failed`` for any other status code or when an exception is raised.
    """
    started = time.time()
    # Fields that are identical in every outcome of this check.
    shared = {
        "name": "JWT Validierung",
        "description": "Prueft ob ungueltige Tokens abgelehnt werden",
        "expected": "401 fuer ungueltige Tokens",
    }

    try:
        async with httpx.AsyncClient(timeout=10.0) as http:
            # An invalid token must be answered with 401.
            reply = await http.get(
                f"{BACKEND_URL}/api/auth/me",
                headers={"Authorization": "Bearer invalid-token"},
            )
            elapsed_ms = (time.time() - started) * 1000

        code = reply.status_code
        if code == 401:
            return TestResult(
                actual="401 Unauthorized - korrekt abgelehnt",
                status="passed",
                duration_ms=elapsed_ms,
                **shared,
            )
        if code == 404:
            return TestResult(
                actual="Endpoint nicht implementiert",
                status="skipped",
                duration_ms=elapsed_ms,
                error_message="/api/auth/me Endpoint fehlt",
                **shared,
            )
        return TestResult(
            actual=f"HTTP {code}",
            status="failed",
            duration_ms=elapsed_ms,
            error_message="JWT Validierung nicht korrekt",
            **shared,
        )
    except Exception as exc:
        reason = str(exc)
        return TestResult(
            actual=f"Fehler: {reason}",
            status="failed",
            duration_ms=(time.time() - started) * 1000,
            error_message=reason,
            **shared,
        )
|
|
|
|
|
|
async def test_login_endpoint() -> TestResult:
    """Check that the login endpoint exists and responds.

    Issues ``POST {BACKEND_URL}/api/auth/login`` with deliberately wrong
    credentials and a 10 second timeout. Only the availability of the
    endpoint is checked, not a successful login.

    Returns:
        TestResult: ``passed`` for HTTP 400/401/422 and for any other
        non-5xx status, ``skipped`` for HTTP 404, ``failed`` for 5xx
        responses or when an exception is raised.
    """
    started = time.time()
    # Fields that are identical in every outcome of this check.
    shared = {
        "name": "Login Endpoint",
        "description": "Prueft ob der Login-Endpoint verfuegbar ist",
        "expected": "Login Endpoint aktiv",
    }

    try:
        async with httpx.AsyncClient(timeout=10.0) as http:
            # POST with invalid credentials to probe whether the endpoint exists.
            reply = await http.post(
                f"{BACKEND_URL}/api/auth/login",
                json={"email": "test@example.com", "password": "wrong"},
            )
            elapsed_ms = (time.time() - started) * 1000

        code = reply.status_code
        # A rejection of the bad credentials proves the endpoint is live.
        if code in (401, 400, 422):
            return TestResult(
                actual=f"Endpoint aktiv (HTTP {code})",
                status="passed",
                duration_ms=elapsed_ms,
                **shared,
            )
        if code == 404:
            return TestResult(
                actual="Endpoint nicht implementiert",
                status="skipped",
                duration_ms=elapsed_ms,
                error_message="Login Endpoint fehlt",
                **shared,
            )
        if code >= 500:
            # A server error means the endpoint is not usable; previously this
            # fell through to the catch-all branch and was reported as passed.
            return TestResult(
                actual=f"HTTP {code}",
                status="failed",
                duration_ms=elapsed_ms,
                error_message="Login Endpoint liefert Serverfehler",
                **shared,
            )
        # Any other status still shows that the route is served.
        return TestResult(
            actual=f"HTTP {code}",
            status="passed",
            duration_ms=elapsed_ms,
            **shared,
        )
    except Exception as exc:
        reason = str(exc)
        return TestResult(
            actual=f"Fehler: {reason}",
            status="failed",
            duration_ms=(time.time() - started) * 1000,
            error_message=reason,
            **shared,
        )
|
|
|
|
|
|
# ==============================================
|
|
# Test Implementations - Roles
|
|
# ==============================================
|
|
|
|
async def test_roles_api() -> TestResult:
    """Check that the roles API is served.

    Issues ``GET {BACKEND_URL}/api/rbac/roles`` with a 10 second timeout.

    Returns:
        TestResult: ``passed`` for HTTP 200 (with the role count in
        ``actual``) and for HTTP 401, ``skipped`` for HTTP 404, ``failed``
        for any other status code or when an exception is raised.
    """
    started = time.time()
    # Fields that are identical in every outcome of this check.
    shared = {
        "name": "Rollen-API",
        "description": "Prueft ob die Rollen-Verwaltung verfuegbar ist",
        "expected": "Rollen-API aktiv",
    }

    try:
        async with httpx.AsyncClient(timeout=10.0) as http:
            reply = await http.get(f"{BACKEND_URL}/api/rbac/roles")
            elapsed_ms = (time.time() - started) * 1000

        code = reply.status_code
        if code == 200:
            payload = reply.json()
            # A list payload is counted directly; otherwise "total" is read.
            if isinstance(payload, list):
                role_count = len(payload)
            else:
                role_count = payload.get("total", 0)
            return TestResult(
                actual=f"{role_count} Rollen definiert",
                status="passed",
                duration_ms=elapsed_ms,
                **shared,
            )
        if code == 401:
            # An auth challenge counts as success for this check.
            return TestResult(
                actual="Authentifizierung erforderlich",
                status="passed",
                duration_ms=elapsed_ms,
                error_message="API erfordert Auth (korrekt)",
                **shared,
            )
        if code == 404:
            return TestResult(
                actual="Endpoint nicht implementiert",
                status="skipped",
                duration_ms=elapsed_ms,
                error_message="RBAC Roles Endpoint fehlt",
                **shared,
            )
        return TestResult(
            actual=f"HTTP {code}",
            status="failed",
            duration_ms=elapsed_ms,
            error_message="Unerwarteter Status",
            **shared,
        )
    except Exception as exc:
        reason = str(exc)
        return TestResult(
            actual=f"Fehler: {reason}",
            status="failed",
            duration_ms=(time.time() - started) * 1000,
            error_message=reason,
            **shared,
        )
|
|
|
|
|
|
async def test_default_roles() -> TestResult:
    """Check that the default roles are present in the roles listing.

    Issues ``GET {BACKEND_URL}/api/rbac/roles`` with a 10 second timeout and
    looks for ``user``, ``admin`` and ``data_protection_officer``.

    Returns:
        TestResult: for HTTP 200, ``passed`` when no default role is missing
        and ``failed`` otherwise; ``skipped`` for HTTP 401/404; ``failed``
        for any other status code or when an exception is raised.
    """
    started = time.time()
    required = ["user", "admin", "data_protection_officer"]
    # Fields that are identical in every outcome of this check.
    shared = {
        "name": "Standard-Rollen",
        "description": "Prueft ob alle Standard-Rollen existieren",
        "expected": "user, admin, data_protection_officer",
    }

    try:
        async with httpx.AsyncClient(timeout=10.0) as http:
            reply = await http.get(f"{BACKEND_URL}/api/rbac/roles")
            elapsed_ms = (time.time() - started) * 1000

        code = reply.status_code
        if code == 200:
            payload = reply.json()
            if isinstance(payload, list):
                entries = payload
            else:
                entries = payload.get("roles", [])

            # Entries may be dicts (name read from "name") or plain values.
            # Kept as a list: a dict without "name" is stored as-is and is
            # not hashable, so a set cannot be used here.
            available = []
            for entry in entries:
                if isinstance(entry, dict):
                    available.append(entry.get("name", entry))
                else:
                    available.append(entry)

            present = [role for role in required if role in available]
            absent = [role for role in required if role not in available]

            if not absent:
                return TestResult(
                    actual=f"Alle {len(present)} Standard-Rollen vorhanden",
                    status="passed",
                    duration_ms=elapsed_ms,
                    **shared,
                )
            return TestResult(
                actual=f"Fehlend: {', '.join(absent)}",
                status="failed",
                duration_ms=elapsed_ms,
                error_message=f"{len(absent)} Rollen fehlen",
                **shared,
            )
        if code in (401, 404):
            return TestResult(
                actual="API nicht verfuegbar",
                status="skipped",
                duration_ms=elapsed_ms,
                error_message="Roles API nicht erreichbar",
                **shared,
            )
        return TestResult(
            actual=f"HTTP {code}",
            status="failed",
            duration_ms=elapsed_ms,
            error_message="Unerwartete Antwort",
            **shared,
        )
    except Exception as exc:
        reason = str(exc)
        return TestResult(
            actual=f"Fehler: {reason}",
            status="failed",
            duration_ms=(time.time() - started) * 1000,
            error_message=reason,
            **shared,
        )
|
|
|
|
|
|
# ==============================================
|
|
# Test Implementations - Permissions
|
|
# ==============================================
|
|
|
|
async def test_permissions_api() -> TestResult:
    """Check that the permissions API is served.

    Issues ``GET {BACKEND_URL}/api/rbac/permissions`` with a 10 second
    timeout.

    Returns:
        TestResult: ``passed`` for HTTP 200 (with the permission count in
        ``actual``) and for HTTP 401, ``skipped`` for HTTP 404, ``failed``
        for any other status code or when an exception is raised.
    """
    started = time.time()
    # Fields that are identical in every outcome of this check.
    shared = {
        "name": "Berechtigungs-API",
        "description": "Prueft ob die Berechtigungs-Verwaltung verfuegbar ist",
        "expected": "Permissions-API aktiv",
    }

    try:
        async with httpx.AsyncClient(timeout=10.0) as http:
            reply = await http.get(f"{BACKEND_URL}/api/rbac/permissions")
            elapsed_ms = (time.time() - started) * 1000

        code = reply.status_code
        if code == 200:
            payload = reply.json()
            # A list payload is counted directly; otherwise "total" is read.
            if isinstance(payload, list):
                permission_count = len(payload)
            else:
                permission_count = payload.get("total", 0)
            return TestResult(
                actual=f"{permission_count} Berechtigungen definiert",
                status="passed",
                duration_ms=elapsed_ms,
                **shared,
            )
        if code == 401:
            # An auth challenge counts as success for this check.
            return TestResult(
                actual="Authentifizierung erforderlich",
                status="passed",
                duration_ms=elapsed_ms,
                error_message="API erfordert Auth (korrekt)",
                **shared,
            )
        if code == 404:
            return TestResult(
                actual="Endpoint nicht implementiert",
                status="skipped",
                duration_ms=elapsed_ms,
                error_message="RBAC Permissions Endpoint fehlt",
                **shared,
            )
        return TestResult(
            actual=f"HTTP {code}",
            status="failed",
            duration_ms=elapsed_ms,
            error_message="Unerwarteter Status",
            **shared,
        )
    except Exception as exc:
        reason = str(exc)
        return TestResult(
            actual=f"Fehler: {reason}",
            status="failed",
            duration_ms=(time.time() - started) * 1000,
            error_message=reason,
            **shared,
        )
|
|
|
|
|
|
async def test_role_permissions_mapping() -> TestResult:
    """Check that permissions can be listed for the ``admin`` role.

    Issues ``GET {BACKEND_URL}/api/rbac/roles/admin/permissions`` with a
    10 second timeout.

    Returns:
        TestResult: ``passed`` for HTTP 200 (with the permission count in
        ``actual``), ``skipped`` for HTTP 401/404, ``failed`` for any other
        status code or when an exception is raised.
    """
    started = time.time()
    # Fields that are identical in every outcome of this check.
    shared = {
        "name": "Rollen-Berechtigungs-Zuordnung",
        "description": "Prueft ob Berechtigungen Rollen zugeordnet werden koennen",
        "expected": "Mapping funktioniert",
    }

    try:
        async with httpx.AsyncClient(timeout=10.0) as http:
            # Query the permissions of the admin role.
            reply = await http.get(
                f"{BACKEND_URL}/api/rbac/roles/admin/permissions"
            )
            elapsed_ms = (time.time() - started) * 1000

        code = reply.status_code
        if code == 200:
            payload = reply.json()
            if isinstance(payload, list):
                granted = payload
            else:
                granted = payload.get("permissions", [])
            return TestResult(
                actual=f"Admin hat {len(granted)} Berechtigungen",
                status="passed",
                duration_ms=elapsed_ms,
                **shared,
            )
        if code in (401, 404):
            return TestResult(
                actual="API nicht verfuegbar",
                status="skipped",
                duration_ms=elapsed_ms,
                error_message="Role-Permissions Endpoint fehlt",
                **shared,
            )
        return TestResult(
            actual=f"HTTP {code}",
            status="failed",
            duration_ms=elapsed_ms,
            error_message="Unerwartete Antwort",
            **shared,
        )
    except Exception as exc:
        reason = str(exc)
        return TestResult(
            actual=f"Fehler: {reason}",
            status="failed",
            duration_ms=(time.time() - started) * 1000,
            error_message=reason,
            **shared,
        )
|
|
|
|
|
|
# ==============================================
|
|
# Test Implementations - Users
|
|
# ==============================================
|
|
|
|
async def test_users_api() -> TestResult:
    """Check that the users API is served.

    Issues ``GET {BACKEND_URL}/api/rbac/users`` with a 10 second timeout.

    Returns:
        TestResult: ``passed`` for HTTP 200 (with the user count in
        ``actual``) and for HTTP 401, ``skipped`` for HTTP 404, ``failed``
        for any other status code or when an exception is raised.
    """
    started = time.time()
    # Fields that are identical in every outcome of this check.
    shared = {
        "name": "Benutzer-API",
        "description": "Prueft ob die Benutzer-Verwaltung verfuegbar ist",
        "expected": "Users-API aktiv",
    }

    try:
        async with httpx.AsyncClient(timeout=10.0) as http:
            reply = await http.get(f"{BACKEND_URL}/api/rbac/users")
            elapsed_ms = (time.time() - started) * 1000

        code = reply.status_code
        if code == 200:
            payload = reply.json()
            # A list payload is counted directly; otherwise "total" is read.
            if isinstance(payload, list):
                user_count = len(payload)
            else:
                user_count = payload.get("total", 0)
            return TestResult(
                actual=f"{user_count} Benutzer registriert",
                status="passed",
                duration_ms=elapsed_ms,
                **shared,
            )
        if code == 401:
            # An auth challenge counts as success for this check.
            return TestResult(
                actual="Authentifizierung erforderlich",
                status="passed",
                duration_ms=elapsed_ms,
                error_message="API erfordert Auth (korrekt)",
                **shared,
            )
        if code == 404:
            return TestResult(
                actual="Endpoint nicht implementiert",
                status="skipped",
                duration_ms=elapsed_ms,
                error_message="RBAC Users Endpoint fehlt",
                **shared,
            )
        return TestResult(
            actual=f"HTTP {code}",
            status="failed",
            duration_ms=elapsed_ms,
            error_message="Unerwarteter Status",
            **shared,
        )
    except Exception as exc:
        reason = str(exc)
        return TestResult(
            actual=f"Fehler: {reason}",
            status="failed",
            duration_ms=(time.time() - started) * 1000,
            error_message=reason,
            **shared,
        )
|
|
|
|
|
|
async def test_consent_service_auth() -> TestResult:
    """Check whether the consent service answers its health probe.

    Issues ``GET {CONSENT_SERVICE_URL}/health`` with a 10 second timeout.

    Returns:
        TestResult: ``passed`` for HTTP 200, ``failed`` for any other status
        code, and ``skipped`` when the request raises an exception.
    """
    started = time.time()
    # Fields that are identical in every outcome of this check.
    shared = {
        "name": "Consent Service Auth",
        "description": "Prueft ob der Go Consent Service fuer Auth erreichbar ist",
        "expected": "Consent Service erreichbar",
    }

    try:
        async with httpx.AsyncClient(timeout=10.0) as http:
            reply = await http.get(f"{CONSENT_SERVICE_URL}/health")
            elapsed_ms = (time.time() - started) * 1000

        code = reply.status_code
        if code != 200:
            return TestResult(
                actual=f"HTTP {code}",
                status="failed",
                duration_ms=elapsed_ms,
                error_message="Consent Service nicht erreichbar",
                **shared,
            )
        return TestResult(
            actual="Service aktiv",
            status="passed",
            duration_ms=elapsed_ms,
            **shared,
        )
    except Exception as exc:
        # Unlike the other checks, an exception here yields "skipped".
        return TestResult(
            actual="Nicht verfuegbar",
            status="skipped",
            duration_ms=(time.time() - started) * 1000,
            error_message=str(exc),
            **shared,
        )
|
|
|
|
|
|
# ==============================================
|
|
# Category Runners
|
|
# ==============================================
|
|
|
|
async def run_auth_tests() -> TestCategoryResult:
    """Run the three authentication checks concurrently and summarise them.

    Returns:
        TestCategoryResult: category ``authentication`` with the individual
        results plus the number of passed, failed and total checks.
    """
    outcomes = list(
        await asyncio.gather(
            test_auth_api_health(),
            test_jwt_validation(),
            test_login_endpoint(),
        )
    )

    # Skipped checks count towards the total only.
    statuses = [outcome.status for outcome in outcomes]

    return TestCategoryResult(
        category="authentication",
        display_name="Authentifizierung",
        description="Tests fuer Login und JWT-Validierung",
        tests=outcomes,
        passed=statuses.count("passed"),
        failed=statuses.count("failed"),
        total=len(outcomes),
    )
|
|
|
|
|
|
async def run_roles_tests() -> TestCategoryResult:
    """Run the two role checks concurrently and summarise them.

    Returns:
        TestCategoryResult: category ``roles`` with the individual results
        plus the number of passed, failed and total checks.
    """
    outcomes = list(
        await asyncio.gather(
            test_roles_api(),
            test_default_roles(),
        )
    )

    # Skipped checks count towards the total only.
    statuses = [outcome.status for outcome in outcomes]

    return TestCategoryResult(
        category="roles",
        display_name="Rollen",
        description="Tests fuer Rollen-Verwaltung",
        tests=outcomes,
        passed=statuses.count("passed"),
        failed=statuses.count("failed"),
        total=len(outcomes),
    )
|
|
|
|
|
|
async def run_permissions_tests() -> TestCategoryResult:
    """Run the two permission checks concurrently and summarise them.

    Returns:
        TestCategoryResult: category ``permissions`` with the individual
        results plus the number of passed, failed and total checks.
    """
    outcomes = list(
        await asyncio.gather(
            test_permissions_api(),
            test_role_permissions_mapping(),
        )
    )

    # Skipped checks count towards the total only.
    statuses = [outcome.status for outcome in outcomes]

    return TestCategoryResult(
        category="permissions",
        display_name="Berechtigungen",
        description="Tests fuer Berechtigungs-Verwaltung",
        tests=outcomes,
        passed=statuses.count("passed"),
        failed=statuses.count("failed"),
        total=len(outcomes),
    )
|
|
|
|
|
|
async def run_users_tests() -> TestCategoryResult:
    """Run the two user-related checks concurrently and summarise them.

    Returns:
        TestCategoryResult: category ``users`` with the individual results
        plus the number of passed, failed and total checks.
    """
    outcomes = list(
        await asyncio.gather(
            test_users_api(),
            test_consent_service_auth(),
        )
    )

    # Skipped checks count towards the total only.
    statuses = [outcome.status for outcome in outcomes]

    return TestCategoryResult(
        category="users",
        display_name="Benutzer",
        description="Tests fuer Benutzer-Verwaltung",
        tests=outcomes,
        passed=statuses.count("passed"),
        failed=statuses.count("failed"),
        total=len(outcomes),
    )
|
|
|
|
|
|
# ==============================================
|
|
# API Endpoints
|
|
# ==============================================
|
|
|
|
# NOTE: "/run-all" must be registered before "/{category}". Routes are matched
# in registration order, so the parameterised route would otherwise capture
# POST /run-all with category="run-all" and return the "unknown category"
# result instead of running the full suite.
@router.post("/run-all", response_model=FullTestResults)
async def run_all_tests():
    """Run all RBAC tests.

    Executes the four category runners concurrently and adds up their
    counters.

    Returns:
        FullTestResults: per-category results, summed passed/failed/total
        counters and the overall runtime in milliseconds.
    """
    start = time.time()

    categories = await asyncio.gather(
        run_auth_tests(),
        run_roles_tests(),
        run_permissions_tests(),
        run_users_tests(),
    )

    return FullTestResults(
        categories=list(categories),
        total_passed=sum(c.passed for c in categories),
        total_failed=sum(c.failed for c in categories),
        total_tests=sum(c.total for c in categories),
        duration_ms=(time.time() - start) * 1000,
    )


@router.post("/{category}", response_model=TestCategoryResult)
async def run_category_tests(category: str):
    """Run tests for a specific category.

    Args:
        category: Category id; one of ``authentication``, ``roles``,
            ``permissions`` or ``users``.

    Returns:
        TestCategoryResult: the result of the matching category runner. For
        an unknown id, an empty result (no tests, all counters zero) whose
        display name is ``Unbekannt: <category>``.
    """
    runners = {
        "authentication": run_auth_tests,
        "roles": run_roles_tests,
        "permissions": run_permissions_tests,
        "users": run_users_tests,
    }

    runner = runners.get(category)
    if runner is None:
        # Unknown ids yield an empty placeholder result rather than an error.
        return TestCategoryResult(
            category=category,
            display_name=f"Unbekannt: {category}",
            description="Kategorie nicht gefunden",
            tests=[],
            passed=0,
            failed=0,
            total=0,
        )

    return await runner()
|
|
|
|
|
|
@router.get("/categories")
async def get_categories():
    """Get available test categories.

    Returns:
        dict: a single key ``categories`` holding a list of entries, each
        with the keys ``id``, ``name`` and ``description``.
    """
    # (id, name, description) for each category, in display order.
    catalog = (
        ("authentication", "Authentifizierung", "Login & JWT"),
        ("roles", "Rollen", "Rollenverwaltung"),
        ("permissions", "Berechtigungen", "Berechtigungszuordnung"),
        ("users", "Benutzer", "Benutzerverwaltung"),
    )
    return {
        "categories": [
            {"id": category_id, "name": label, "description": summary}
            for category_id, label, summary in catalog
        ]
    }
|