This repository has been archived on 2026-02-15. You can view files and clone it. You cannot open issues or pull requests or push a commit.
Files
breakpilot-pwa/backend/rbac_test_api.py
Benjamin Admin bfdaf63ba9 fix: Restore all files lost during destructive rebase
A previous `git pull --rebase origin main` dropped 177 local commits,
losing 3400+ files across admin-v2, backend, studio-v2, website,
klausur-service, and many other services. The partial restore attempt
(660295e2) only recovered some files.

This commit restores all missing files from pre-rebase ref 98933f5e
while preserving post-rebase additions (night-scheduler, night-mode UI,
NightModeWidget dashboard integration).

Restored features include:
- AI Module Sidebar (FAB), OCR Labeling, OCR Compare
- GPU Dashboard, RAG Pipeline, Magic Help
- Klausur-Korrektur (8 files), Abitur-Archiv (5+ files)
- Companion, Zeugnisse-Crawler, Screen Flow
- Full backend, studio-v2, website, klausur-service
- All compliance SDKs, agent-core, voice-service
- CI/CD configs, documentation, scripts

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-09 09:51:32 +01:00

723 lines
27 KiB
Python

"""
RBAC Test API - Test Runner fuer Rollen- und Berechtigungsverwaltung
Endpoint: /api/admin/rbac-tests
"""
from fastapi import APIRouter
from pydantic import BaseModel
from typing import List, Optional, Literal
import httpx
import asyncio
import time
import os
router = APIRouter(prefix="/api/admin/rbac-tests", tags=["RBAC Tests"])
# ==============================================
# Models
# ==============================================
class TestResult(BaseModel):
name: str
description: str
expected: str
actual: str
status: Literal["passed", "failed", "pending", "skipped"]
duration_ms: float
error_message: Optional[str] = None
class TestCategoryResult(BaseModel):
category: str
display_name: str
description: str
tests: List[TestResult]
passed: int
failed: int
total: int
class FullTestResults(BaseModel):
categories: List[TestCategoryResult]
total_passed: int
total_failed: int
total_tests: int
duration_ms: float
# ==============================================
# Configuration
# ==============================================
BACKEND_URL = os.getenv("BACKEND_URL", "http://localhost:8000")
CONSENT_SERVICE_URL = os.getenv("CONSENT_SERVICE_URL", "http://consent-service:8081")
# ==============================================
# Test Implementations - Authentication
# ==============================================
async def test_auth_api_health() -> TestResult:
"""Test Auth API Health"""
start = time.time()
try:
async with httpx.AsyncClient(timeout=10.0) as client:
response = await client.get(f"{BACKEND_URL}/api/auth/health")
duration = (time.time() - start) * 1000
if response.status_code == 200:
return TestResult(
name="Auth API Health",
description="Prueft ob die Authentifizierungs-API erreichbar ist",
expected="Auth API aktiv",
actual="Auth API laeuft",
status="passed",
duration_ms=duration
)
elif response.status_code == 404:
return TestResult(
name="Auth API Health",
description="Prueft ob die Authentifizierungs-API erreichbar ist",
expected="Auth API aktiv",
actual="Endpoint nicht implementiert",
status="skipped",
duration_ms=duration,
error_message="Auth Health Endpoint fehlt"
)
else:
return TestResult(
name="Auth API Health",
description="Prueft ob die Authentifizierungs-API erreichbar ist",
expected="Auth API aktiv",
actual=f"HTTP {response.status_code}",
status="failed",
duration_ms=duration,
error_message="Auth API nicht erreichbar"
)
except Exception as e:
return TestResult(
name="Auth API Health",
description="Prueft ob die Authentifizierungs-API erreichbar ist",
expected="Auth API aktiv",
actual=f"Fehler: {str(e)}",
status="failed",
duration_ms=(time.time() - start) * 1000,
error_message=str(e)
)
async def test_jwt_validation() -> TestResult:
"""Test JWT Token Validation"""
start = time.time()
try:
async with httpx.AsyncClient(timeout=10.0) as client:
# Test mit ungueltigem Token - sollte 401 zurueckgeben
response = await client.get(
f"{BACKEND_URL}/api/auth/me",
headers={"Authorization": "Bearer invalid-token"}
)
duration = (time.time() - start) * 1000
if response.status_code == 401:
return TestResult(
name="JWT Validierung",
description="Prueft ob ungueltige Tokens abgelehnt werden",
expected="401 fuer ungueltige Tokens",
actual="401 Unauthorized - korrekt abgelehnt",
status="passed",
duration_ms=duration
)
elif response.status_code == 404:
return TestResult(
name="JWT Validierung",
description="Prueft ob ungueltige Tokens abgelehnt werden",
expected="401 fuer ungueltige Tokens",
actual="Endpoint nicht implementiert",
status="skipped",
duration_ms=duration,
error_message="/api/auth/me Endpoint fehlt"
)
else:
return TestResult(
name="JWT Validierung",
description="Prueft ob ungueltige Tokens abgelehnt werden",
expected="401 fuer ungueltige Tokens",
actual=f"HTTP {response.status_code}",
status="failed",
duration_ms=duration,
error_message="JWT Validierung nicht korrekt"
)
except Exception as e:
return TestResult(
name="JWT Validierung",
description="Prueft ob ungueltige Tokens abgelehnt werden",
expected="401 fuer ungueltige Tokens",
actual=f"Fehler: {str(e)}",
status="failed",
duration_ms=(time.time() - start) * 1000,
error_message=str(e)
)
async def test_login_endpoint() -> TestResult:
"""Test Login Endpoint exists"""
start = time.time()
try:
async with httpx.AsyncClient(timeout=10.0) as client:
# OPTIONS Request um zu pruefen ob Endpoint existiert
response = await client.post(
f"{BACKEND_URL}/api/auth/login",
json={"email": "test@example.com", "password": "wrong"}
)
duration = (time.time() - start) * 1000
# 401 ist erwartet (falsche Credentials), aber zeigt dass Endpoint existiert
if response.status_code in [401, 400, 422]:
return TestResult(
name="Login Endpoint",
description="Prueft ob der Login-Endpoint verfuegbar ist",
expected="Login Endpoint aktiv",
actual=f"Endpoint aktiv (HTTP {response.status_code})",
status="passed",
duration_ms=duration
)
elif response.status_code == 404:
return TestResult(
name="Login Endpoint",
description="Prueft ob der Login-Endpoint verfuegbar ist",
expected="Login Endpoint aktiv",
actual="Endpoint nicht implementiert",
status="skipped",
duration_ms=duration,
error_message="Login Endpoint fehlt"
)
else:
return TestResult(
name="Login Endpoint",
description="Prueft ob der Login-Endpoint verfuegbar ist",
expected="Login Endpoint aktiv",
actual=f"HTTP {response.status_code}",
status="passed",
duration_ms=duration
)
except Exception as e:
return TestResult(
name="Login Endpoint",
description="Prueft ob der Login-Endpoint verfuegbar ist",
expected="Login Endpoint aktiv",
actual=f"Fehler: {str(e)}",
status="failed",
duration_ms=(time.time() - start) * 1000,
error_message=str(e)
)
# ==============================================
# Test Implementations - Roles
# ==============================================
async def test_roles_api() -> TestResult:
"""Test Roles API"""
start = time.time()
try:
async with httpx.AsyncClient(timeout=10.0) as client:
response = await client.get(f"{BACKEND_URL}/api/rbac/roles")
duration = (time.time() - start) * 1000
if response.status_code == 200:
data = response.json()
count = len(data) if isinstance(data, list) else data.get("total", 0)
return TestResult(
name="Rollen-API",
description="Prueft ob die Rollen-Verwaltung verfuegbar ist",
expected="Rollen-API aktiv",
actual=f"{count} Rollen definiert",
status="passed",
duration_ms=duration
)
elif response.status_code == 401:
return TestResult(
name="Rollen-API",
description="Prueft ob die Rollen-Verwaltung verfuegbar ist",
expected="Rollen-API aktiv",
actual="Authentifizierung erforderlich",
status="passed",
duration_ms=duration,
error_message="API erfordert Auth (korrekt)"
)
elif response.status_code == 404:
return TestResult(
name="Rollen-API",
description="Prueft ob die Rollen-Verwaltung verfuegbar ist",
expected="Rollen-API aktiv",
actual="Endpoint nicht implementiert",
status="skipped",
duration_ms=duration,
error_message="RBAC Roles Endpoint fehlt"
)
else:
return TestResult(
name="Rollen-API",
description="Prueft ob die Rollen-Verwaltung verfuegbar ist",
expected="Rollen-API aktiv",
actual=f"HTTP {response.status_code}",
status="failed",
duration_ms=duration,
error_message=f"Unerwarteter Status"
)
except Exception as e:
return TestResult(
name="Rollen-API",
description="Prueft ob die Rollen-Verwaltung verfuegbar ist",
expected="Rollen-API aktiv",
actual=f"Fehler: {str(e)}",
status="failed",
duration_ms=(time.time() - start) * 1000,
error_message=str(e)
)
async def test_default_roles() -> TestResult:
"""Test Default Roles exist"""
start = time.time()
expected_roles = ["user", "admin", "data_protection_officer"]
try:
async with httpx.AsyncClient(timeout=10.0) as client:
response = await client.get(f"{BACKEND_URL}/api/rbac/roles")
duration = (time.time() - start) * 1000
if response.status_code == 200:
data = response.json()
roles = data if isinstance(data, list) else data.get("roles", [])
role_names = [r.get("name", r) if isinstance(r, dict) else r for r in roles]
found_roles = [r for r in expected_roles if r in role_names]
missing_roles = [r for r in expected_roles if r not in role_names]
if len(missing_roles) == 0:
return TestResult(
name="Standard-Rollen",
description="Prueft ob alle Standard-Rollen existieren",
expected="user, admin, data_protection_officer",
actual=f"Alle {len(found_roles)} Standard-Rollen vorhanden",
status="passed",
duration_ms=duration
)
else:
return TestResult(
name="Standard-Rollen",
description="Prueft ob alle Standard-Rollen existieren",
expected="user, admin, data_protection_officer",
actual=f"Fehlend: {', '.join(missing_roles)}",
status="failed",
duration_ms=duration,
error_message=f"{len(missing_roles)} Rollen fehlen"
)
elif response.status_code in [401, 404]:
return TestResult(
name="Standard-Rollen",
description="Prueft ob alle Standard-Rollen existieren",
expected="user, admin, data_protection_officer",
actual="API nicht verfuegbar",
status="skipped",
duration_ms=duration,
error_message="Roles API nicht erreichbar"
)
else:
return TestResult(
name="Standard-Rollen",
description="Prueft ob alle Standard-Rollen existieren",
expected="user, admin, data_protection_officer",
actual=f"HTTP {response.status_code}",
status="failed",
duration_ms=duration,
error_message="Unerwartete Antwort"
)
except Exception as e:
return TestResult(
name="Standard-Rollen",
description="Prueft ob alle Standard-Rollen existieren",
expected="user, admin, data_protection_officer",
actual=f"Fehler: {str(e)}",
status="failed",
duration_ms=(time.time() - start) * 1000,
error_message=str(e)
)
# ==============================================
# Test Implementations - Permissions
# ==============================================
async def test_permissions_api() -> TestResult:
"""Test Permissions API"""
start = time.time()
try:
async with httpx.AsyncClient(timeout=10.0) as client:
response = await client.get(f"{BACKEND_URL}/api/rbac/permissions")
duration = (time.time() - start) * 1000
if response.status_code == 200:
data = response.json()
count = len(data) if isinstance(data, list) else data.get("total", 0)
return TestResult(
name="Berechtigungs-API",
description="Prueft ob die Berechtigungs-Verwaltung verfuegbar ist",
expected="Permissions-API aktiv",
actual=f"{count} Berechtigungen definiert",
status="passed",
duration_ms=duration
)
elif response.status_code == 401:
return TestResult(
name="Berechtigungs-API",
description="Prueft ob die Berechtigungs-Verwaltung verfuegbar ist",
expected="Permissions-API aktiv",
actual="Authentifizierung erforderlich",
status="passed",
duration_ms=duration,
error_message="API erfordert Auth (korrekt)"
)
elif response.status_code == 404:
return TestResult(
name="Berechtigungs-API",
description="Prueft ob die Berechtigungs-Verwaltung verfuegbar ist",
expected="Permissions-API aktiv",
actual="Endpoint nicht implementiert",
status="skipped",
duration_ms=duration,
error_message="RBAC Permissions Endpoint fehlt"
)
else:
return TestResult(
name="Berechtigungs-API",
description="Prueft ob die Berechtigungs-Verwaltung verfuegbar ist",
expected="Permissions-API aktiv",
actual=f"HTTP {response.status_code}",
status="failed",
duration_ms=duration,
error_message="Unerwarteter Status"
)
except Exception as e:
return TestResult(
name="Berechtigungs-API",
description="Prueft ob die Berechtigungs-Verwaltung verfuegbar ist",
expected="Permissions-API aktiv",
actual=f"Fehler: {str(e)}",
status="failed",
duration_ms=(time.time() - start) * 1000,
error_message=str(e)
)
async def test_role_permissions_mapping() -> TestResult:
"""Test Role-Permissions Mapping"""
start = time.time()
try:
async with httpx.AsyncClient(timeout=10.0) as client:
# Versuche admin Rolle abzurufen
response = await client.get(f"{BACKEND_URL}/api/rbac/roles/admin/permissions")
duration = (time.time() - start) * 1000
if response.status_code == 200:
data = response.json()
permissions = data if isinstance(data, list) else data.get("permissions", [])
return TestResult(
name="Rollen-Berechtigungs-Zuordnung",
description="Prueft ob Berechtigungen Rollen zugeordnet werden koennen",
expected="Mapping funktioniert",
actual=f"Admin hat {len(permissions)} Berechtigungen",
status="passed",
duration_ms=duration
)
elif response.status_code in [401, 404]:
return TestResult(
name="Rollen-Berechtigungs-Zuordnung",
description="Prueft ob Berechtigungen Rollen zugeordnet werden koennen",
expected="Mapping funktioniert",
actual="API nicht verfuegbar",
status="skipped",
duration_ms=duration,
error_message="Role-Permissions Endpoint fehlt"
)
else:
return TestResult(
name="Rollen-Berechtigungs-Zuordnung",
description="Prueft ob Berechtigungen Rollen zugeordnet werden koennen",
expected="Mapping funktioniert",
actual=f"HTTP {response.status_code}",
status="failed",
duration_ms=duration,
error_message="Unerwartete Antwort"
)
except Exception as e:
return TestResult(
name="Rollen-Berechtigungs-Zuordnung",
description="Prueft ob Berechtigungen Rollen zugeordnet werden koennen",
expected="Mapping funktioniert",
actual=f"Fehler: {str(e)}",
status="failed",
duration_ms=(time.time() - start) * 1000,
error_message=str(e)
)
# ==============================================
# Test Implementations - Users
# ==============================================
async def test_users_api() -> TestResult:
"""Test Users API"""
start = time.time()
try:
async with httpx.AsyncClient(timeout=10.0) as client:
response = await client.get(f"{BACKEND_URL}/api/rbac/users")
duration = (time.time() - start) * 1000
if response.status_code == 200:
data = response.json()
count = len(data) if isinstance(data, list) else data.get("total", 0)
return TestResult(
name="Benutzer-API",
description="Prueft ob die Benutzer-Verwaltung verfuegbar ist",
expected="Users-API aktiv",
actual=f"{count} Benutzer registriert",
status="passed",
duration_ms=duration
)
elif response.status_code == 401:
return TestResult(
name="Benutzer-API",
description="Prueft ob die Benutzer-Verwaltung verfuegbar ist",
expected="Users-API aktiv",
actual="Authentifizierung erforderlich",
status="passed",
duration_ms=duration,
error_message="API erfordert Auth (korrekt)"
)
elif response.status_code == 404:
return TestResult(
name="Benutzer-API",
description="Prueft ob die Benutzer-Verwaltung verfuegbar ist",
expected="Users-API aktiv",
actual="Endpoint nicht implementiert",
status="skipped",
duration_ms=duration,
error_message="RBAC Users Endpoint fehlt"
)
else:
return TestResult(
name="Benutzer-API",
description="Prueft ob die Benutzer-Verwaltung verfuegbar ist",
expected="Users-API aktiv",
actual=f"HTTP {response.status_code}",
status="failed",
duration_ms=duration,
error_message="Unerwarteter Status"
)
except Exception as e:
return TestResult(
name="Benutzer-API",
description="Prueft ob die Benutzer-Verwaltung verfuegbar ist",
expected="Users-API aktiv",
actual=f"Fehler: {str(e)}",
status="failed",
duration_ms=(time.time() - start) * 1000,
error_message=str(e)
)
async def test_consent_service_auth() -> TestResult:
"""Test Consent Service Authentication"""
start = time.time()
try:
async with httpx.AsyncClient(timeout=10.0) as client:
response = await client.get(f"{CONSENT_SERVICE_URL}/health")
duration = (time.time() - start) * 1000
if response.status_code == 200:
return TestResult(
name="Consent Service Auth",
description="Prueft ob der Go Consent Service fuer Auth erreichbar ist",
expected="Consent Service erreichbar",
actual="Service aktiv",
status="passed",
duration_ms=duration
)
else:
return TestResult(
name="Consent Service Auth",
description="Prueft ob der Go Consent Service fuer Auth erreichbar ist",
expected="Consent Service erreichbar",
actual=f"HTTP {response.status_code}",
status="failed",
duration_ms=duration,
error_message="Consent Service nicht erreichbar"
)
except Exception as e:
return TestResult(
name="Consent Service Auth",
description="Prueft ob der Go Consent Service fuer Auth erreichbar ist",
expected="Consent Service erreichbar",
actual="Nicht verfuegbar",
status="skipped",
duration_ms=(time.time() - start) * 1000,
error_message=str(e)
)
# ==============================================
# Category Runners
# ==============================================
async def run_auth_tests() -> TestCategoryResult:
"""Run Authentication tests"""
tests = await asyncio.gather(
test_auth_api_health(),
test_jwt_validation(),
test_login_endpoint(),
)
passed = sum(1 for t in tests if t.status == "passed")
failed = sum(1 for t in tests if t.status == "failed")
return TestCategoryResult(
category="authentication",
display_name="Authentifizierung",
description="Tests fuer Login und JWT-Validierung",
tests=list(tests),
passed=passed,
failed=failed,
total=len(tests)
)
async def run_roles_tests() -> TestCategoryResult:
"""Run Roles tests"""
tests = await asyncio.gather(
test_roles_api(),
test_default_roles(),
)
passed = sum(1 for t in tests if t.status == "passed")
failed = sum(1 for t in tests if t.status == "failed")
return TestCategoryResult(
category="roles",
display_name="Rollen",
description="Tests fuer Rollen-Verwaltung",
tests=list(tests),
passed=passed,
failed=failed,
total=len(tests)
)
async def run_permissions_tests() -> TestCategoryResult:
"""Run Permissions tests"""
tests = await asyncio.gather(
test_permissions_api(),
test_role_permissions_mapping(),
)
passed = sum(1 for t in tests if t.status == "passed")
failed = sum(1 for t in tests if t.status == "failed")
return TestCategoryResult(
category="permissions",
display_name="Berechtigungen",
description="Tests fuer Berechtigungs-Verwaltung",
tests=list(tests),
passed=passed,
failed=failed,
total=len(tests)
)
async def run_users_tests() -> TestCategoryResult:
"""Run Users tests"""
tests = await asyncio.gather(
test_users_api(),
test_consent_service_auth(),
)
passed = sum(1 for t in tests if t.status == "passed")
failed = sum(1 for t in tests if t.status == "failed")
return TestCategoryResult(
category="users",
display_name="Benutzer",
description="Tests fuer Benutzer-Verwaltung",
tests=list(tests),
passed=passed,
failed=failed,
total=len(tests)
)
# ==============================================
# API Endpoints
# ==============================================
@router.post("/{category}", response_model=TestCategoryResult)
async def run_category_tests(category: str):
"""Run tests for a specific category"""
runners = {
"authentication": run_auth_tests,
"roles": run_roles_tests,
"permissions": run_permissions_tests,
"users": run_users_tests,
}
if category not in runners:
return TestCategoryResult(
category=category,
display_name=f"Unbekannt: {category}",
description="Kategorie nicht gefunden",
tests=[],
passed=0,
failed=0,
total=0
)
return await runners[category]()
@router.post("/run-all", response_model=FullTestResults)
async def run_all_tests():
"""Run all RBAC tests"""
start = time.time()
categories = await asyncio.gather(
run_auth_tests(),
run_roles_tests(),
run_permissions_tests(),
run_users_tests(),
)
total_passed = sum(c.passed for c in categories)
total_failed = sum(c.failed for c in categories)
total_tests = sum(c.total for c in categories)
return FullTestResults(
categories=list(categories),
total_passed=total_passed,
total_failed=total_failed,
total_tests=total_tests,
duration_ms=(time.time() - start) * 1000
)
@router.get("/categories")
async def get_categories():
"""Get available test categories"""
return {
"categories": [
{"id": "authentication", "name": "Authentifizierung", "description": "Login & JWT"},
{"id": "roles", "name": "Rollen", "description": "Rollenverwaltung"},
{"id": "permissions", "name": "Berechtigungen", "description": "Berechtigungszuordnung"},
{"id": "users", "name": "Benutzer", "description": "Benutzerverwaltung"},
]
}