This repository has been archived on 2026-02-15. You can view files and clone it. You cannot open issues or pull requests or push a commit.
Files
breakpilot-pwa/backend/tests/test_keycloak_auth.py
Benjamin Admin 21a844cb8a fix: Restore all files lost during destructive rebase
A previous `git pull --rebase origin main` dropped 177 local commits,
losing 3400+ files across admin-v2, backend, studio-v2, website,
klausur-service, and many other services. The partial restore attempt
(660295e2) only recovered some files.

This commit restores all missing files from pre-rebase ref 98933f5e
while preserving post-rebase additions (night-scheduler, night-mode UI,
NightModeWidget dashboard integration).

Restored features include:
- AI Module Sidebar (FAB), OCR Labeling, OCR Compare
- GPU Dashboard, RAG Pipeline, Magic Help
- Klausur-Korrektur (8 files), Abitur-Archiv (5+ files)
- Companion, Zeugnisse-Crawler, Screen Flow
- Full backend, studio-v2, website, klausur-service
- All compliance SDKs, agent-core, voice-service
- CI/CD configs, documentation, scripts

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-09 09:51:32 +01:00

517 lines
16 KiB
Python

"""
Tests for Keycloak Authentication Module
Tests cover:
- Local JWT validation
- Keycloak token detection
- HybridAuthenticator token routing
- FastAPI dependency integration
"""
import pytest
import jwt
import os
from datetime import datetime, timezone, timedelta
from unittest.mock import AsyncMock, MagicMock, patch
# Import the auth module
import sys
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from auth.keycloak_auth import (
KeycloakConfig,
KeycloakUser,
KeycloakAuthenticator,
HybridAuthenticator,
TokenExpiredError,
TokenInvalidError,
KeycloakConfigError,
get_keycloak_config_from_env,
)
# =============================================
# Test Data
# =============================================
# HS256 secret shared by the token helpers in this module and by the
# HybridAuthenticator instances created with local_jwt_secret in the tests.
TEST_JWT_SECRET = "test-secret-key-32-chars-min-here"
# Default "user_id" claim placed into generated test tokens.
TEST_USER_ID = "10000000-0000-0000-0000-000000000001"
# Default "email" claim placed into generated test tokens.
TEST_EMAIL = "lehrer@test.de"
def create_local_token(
    user_id: str = TEST_USER_ID,
    email: str = TEST_EMAIL,
    role: str = "teacher",
    exp_hours: int = 1
) -> str:
    """Build an HS256-signed local JWT for use in tests.

    Args:
        user_id: Value of the ``user_id`` claim.
        email: Value of the ``email`` claim.
        role: Value of the ``role`` claim.
        exp_hours: Offset in hours from now for the ``exp`` claim.

    Returns:
        The encoded token, signed with ``TEST_JWT_SECRET``.
    """
    # Identity claims first, then the timestamps, in the same key order
    # as the token payload is expected to carry them.
    claims = dict(user_id=user_id, email=email, role=role, iss="breakpilot")
    claims["iat"] = datetime.now(timezone.utc)
    claims["exp"] = datetime.now(timezone.utc) + timedelta(hours=exp_hours)
    return jwt.encode(claims, TEST_JWT_SECRET, algorithm="HS256")
def create_expired_token() -> str:
    """Build an HS256-signed local JWT whose ``exp`` lies one hour in the past.

    Returns:
        The encoded token, signed with ``TEST_JWT_SECRET``; ``iat`` is two
        hours ago and ``exp`` one hour ago.
    """
    claims = dict(
        user_id=TEST_USER_ID,
        email=TEST_EMAIL,
        role="teacher",
        iss="breakpilot",
    )
    claims["iat"] = datetime.now(timezone.utc) - timedelta(hours=2)
    claims["exp"] = datetime.now(timezone.utc) - timedelta(hours=1)
    return jwt.encode(claims, TEST_JWT_SECRET, algorithm="HS256")
# =============================================
# KeycloakConfig Tests
# =============================================
class TestKeycloakConfig:
    """Checks derived URLs and default field values of KeycloakConfig."""

    def test_config_urls(self):
        """The issuer, JWKS and token URLs are derived from server and realm."""
        cfg = KeycloakConfig(
            server_url="https://keycloak.example.com",
            realm="test-realm",
            client_id="test-client",
        )
        expected_urls = {
            "issuer_url": "https://keycloak.example.com/realms/test-realm",
            "jwks_url": "https://keycloak.example.com/realms/test-realm/protocol/openid-connect/certs",
            "token_url": "https://keycloak.example.com/realms/test-realm/protocol/openid-connect/token",
        }
        for attribute, url in expected_urls.items():
            assert getattr(cfg, attribute) == url

    def test_config_defaults(self):
        """Optional fields fall back to no client secret and SSL verification on."""
        cfg = KeycloakConfig(
            server_url="https://kc.example.com",
            realm="breakpilot",
            client_id="backend",
        )
        assert cfg.client_secret is None
        assert cfg.verify_ssl is True
# =============================================
# KeycloakUser Tests
# =============================================
class TestKeycloakUser:
    """Tests for the role helper methods of the KeycloakUser dataclass."""

    @staticmethod
    def _build_user(**overrides):
        """Return a KeycloakUser with neutral defaults, customised by *overrides*.

        Every test only cares about one or two fields; keeping the remaining
        constructor arguments in a single place keeps the tests readable.
        Fresh list/dict objects are created on each call so users never share
        mutable state.
        """
        fields = {
            "user_id": "user-123",
            "email": "test@example.com",
            "email_verified": True,
            "name": None,
            "given_name": None,
            "family_name": None,
            "realm_roles": [],
            "client_roles": {},
            "groups": [],
            "tenant_id": None,
            "raw_claims": {},
        }
        fields.update(overrides)
        return KeycloakUser(**fields)

    def test_has_realm_role(self):
        """has_realm_role is True only for roles listed in realm_roles."""
        user = self._build_user(
            name="Test User",
            given_name="Test",
            family_name="User",
            realm_roles=["teacher", "admin"],
        )
        assert user.has_realm_role("teacher") is True
        assert user.has_realm_role("admin") is True
        assert user.has_realm_role("superadmin") is False

    def test_has_client_role(self):
        """has_client_role needs both the client and the role to match."""
        user = self._build_user(
            name="Test User",
            client_roles={"backend": ["editor", "viewer"]},
        )
        assert user.has_client_role("backend", "editor") is True
        assert user.has_client_role("backend", "admin") is False
        assert user.has_client_role("frontend", "viewer") is False

    def test_is_admin(self):
        """is_admin is True with the 'admin' realm role, False with 'teacher'."""
        admin_user = self._build_user(
            user_id="admin-123",
            email="admin@example.com",
            realm_roles=["admin"],
        )
        regular_user = self._build_user(
            user_id="user-123",
            email="user@example.com",
            realm_roles=["teacher"],
        )
        assert admin_user.is_admin() is True
        assert regular_user.is_admin() is False

    def test_is_teacher(self):
        """is_teacher accepts both the German and the English role name."""
        teacher_de = self._build_user(
            user_id="t1",
            email="t@test.de",
            realm_roles=["lehrer"],  # German role name
        )
        teacher_en = self._build_user(
            user_id="t2",
            email="t2@test.de",
            realm_roles=["teacher"],  # English role name
        )
        assert teacher_de.is_teacher() is True
        assert teacher_en.is_teacher() is True
# =============================================
# HybridAuthenticator Tests (Local JWT)
# =============================================
class TestHybridAuthenticatorLocalJWT:
    """Local-JWT validation through HybridAuthenticator (no Keycloak config)."""

    @pytest.fixture
    def authenticator(self):
        """Provide a HybridAuthenticator that only knows the local secret."""
        settings = {
            "keycloak_config": None,
            "local_jwt_secret": TEST_JWT_SECRET,
            "environment": "development",
        }
        return HybridAuthenticator(**settings)

    @pytest.mark.asyncio
    async def test_valid_local_token(self, authenticator):
        """A correctly signed token yields its claims plus the auth method."""
        claims = await authenticator.validate_token(create_local_token())
        expected = {
            "user_id": TEST_USER_ID,
            "email": TEST_EMAIL,
            "role": "teacher",
            "auth_method": "local_jwt",
        }
        for key, value in expected.items():
            assert claims[key] == value

    @pytest.mark.asyncio
    async def test_expired_token(self, authenticator):
        """A token past its expiry raises TokenExpiredError."""
        stale_token = create_expired_token()
        with pytest.raises(TokenExpiredError):
            await authenticator.validate_token(stale_token)

    @pytest.mark.asyncio
    async def test_invalid_token(self, authenticator):
        """A string that is not a real JWT raises TokenInvalidError."""
        with pytest.raises(TokenInvalidError):
            await authenticator.validate_token("invalid.token.here")

    @pytest.mark.asyncio
    async def test_empty_token(self, authenticator):
        """The empty string raises TokenInvalidError."""
        with pytest.raises(TokenInvalidError):
            await authenticator.validate_token("")

    @pytest.mark.asyncio
    async def test_wrong_secret(self):
        """A token signed with another secret raises TokenInvalidError."""
        mismatched = HybridAuthenticator(
            keycloak_config=None,
            local_jwt_secret="different-secret-key-here-32chars",
            environment="development",
        )
        # The helper signs with TEST_JWT_SECRET, not the secret configured above.
        foreign_token = create_local_token()
        with pytest.raises(TokenInvalidError):
            await mismatched.validate_token(foreign_token)
# =============================================
# HybridAuthenticator Tests (Role Mapping)
# =============================================
class TestRoleMapping:
    """How the ``role`` claim of a local token shows up after validation."""

    @pytest.fixture
    def authenticator(self):
        """Provide a HybridAuthenticator that only knows the local secret."""
        settings = {
            "keycloak_config": None,
            "local_jwt_secret": TEST_JWT_SECRET,
            "environment": "development",
        }
        return HybridAuthenticator(**settings)

    @pytest.mark.asyncio
    async def test_admin_role_mapping(self, authenticator):
        """The admin role is kept and also listed in realm_roles."""
        claims = await authenticator.validate_token(create_local_token(role="admin"))
        assert claims["role"] == "admin"
        assert "admin" in claims["realm_roles"]

    @pytest.mark.asyncio
    async def test_teacher_role_mapping(self, authenticator):
        """The teacher role is kept as-is."""
        claims = await authenticator.validate_token(create_local_token(role="teacher"))
        assert claims["role"] == "teacher"

    @pytest.mark.asyncio
    async def test_user_role_default(self, authenticator):
        """An unrecognised role is passed through unchanged, not replaced."""
        claims = await authenticator.validate_token(create_local_token(role="custom_role"))
        assert claims["role"] == "custom_role"
# =============================================
# Environment Configuration Tests
# =============================================
class TestEnvironmentConfiguration:
    """Tests for building a KeycloakConfig from environment variables.

    Both tests modify ``os.environ`` only inside ``patch.dict``, which
    snapshots the environment on entry and restores it on exit. Values that
    were set before the test (for example by CI) therefore survive, and
    nothing leaks into tests that run afterwards.
    """

    # Variables the tests remove or set around get_keycloak_config_from_env().
    _KEYCLOAK_ENV_VARS = (
        "KEYCLOAK_SERVER_URL",
        "KEYCLOAK_REALM",
        "KEYCLOAK_CLIENT_ID",
    )

    def test_keycloak_config_from_env_missing(self):
        """Without the Keycloak variables the loader returns None."""
        with patch.dict(os.environ):
            # Removals happen inside the patch so they are undone on exit.
            for key in self._KEYCLOAK_ENV_VARS:
                os.environ.pop(key, None)
            config = get_keycloak_config_from_env()
        assert config is None

    def test_keycloak_config_from_env_complete(self):
        """With all three variables set the loader returns a matching config."""
        env = {
            "KEYCLOAK_SERVER_URL": "https://kc.test.com",
            "KEYCLOAK_REALM": "test",
            "KEYCLOAK_CLIENT_ID": "test-client",
        }
        with patch.dict(os.environ, env):
            config = get_keycloak_config_from_env()
        assert config is not None
        assert config.server_url == "https://kc.test.com"
        assert config.realm == "test"
        assert config.client_id == "test-client"
# =============================================
# Token Detection Tests
# =============================================
class TestTokenDetection:
    """How HybridAuthenticator treats local versus Keycloak-style tokens."""

    @pytest.mark.asyncio
    async def test_local_token_detection(self):
        """A locally signed token is reported with auth_method 'local_jwt'."""
        hybrid = HybridAuthenticator(
            keycloak_config=None,
            local_jwt_secret=TEST_JWT_SECRET,
            environment="development",
        )
        claims = await hybrid.validate_token(create_local_token())
        assert claims["auth_method"] == "local_jwt"

    @pytest.mark.asyncio
    async def test_keycloak_token_detection_without_keycloak(self):
        """A Keycloak-looking token is rejected when no Keycloak is configured."""
        hybrid = HybridAuthenticator(
            keycloak_config=None,
            local_jwt_secret=TEST_JWT_SECRET,
            environment="development",
        )
        # Imitate a Keycloak token: "sub" claim, realm URL as issuer, and a
        # signing key that differs from the local secret.
        kc_claims = dict(
            sub="user-123",
            email="test@test.com",
            iss="https://keycloak.example.com/realms/test",
        )
        kc_claims["iat"] = datetime.now(timezone.utc)
        kc_claims["exp"] = datetime.now(timezone.utc) + timedelta(hours=1)
        forged = jwt.encode(kc_claims, "different-key", algorithm="HS256")
        # Local validation is expected to fail for this token.
        with pytest.raises(TokenInvalidError):
            await hybrid.validate_token(forged)
# =============================================
# Integration Tests (with Mock FastAPI Request)
# =============================================
class TestFastAPIIntegration:
    """Tests for the ``get_current_user`` FastAPI dependency.

    The auth module keeps an authenticator in the module global
    ``_authenticator``. Each test swaps it for ``None`` through
    ``patch.object`` so it is rebuilt under the patched environment, and the
    previous value is put back when the test ends. ``create=True`` keeps the
    swap working even if the attribute has not been set yet.
    """

    @pytest.mark.asyncio
    async def test_get_current_user_valid_token(self):
        """A valid Bearer token yields the user described by its claims."""
        import auth.keycloak_auth as auth_module
        from auth.keycloak_auth import get_current_user

        # Mock request whose headers.get(...) returns the Bearer header.
        mock_request = MagicMock()
        token = create_local_token()
        mock_request.headers.get.return_value = f"Bearer {token}"

        env = {"JWT_SECRET": TEST_JWT_SECRET, "ENVIRONMENT": "development"}
        with patch.dict(os.environ, env):
            with patch.object(auth_module, "_authenticator", None, create=True):
                user = await get_current_user(mock_request)

        assert user["user_id"] == TEST_USER_ID
        assert user["email"] == TEST_EMAIL

    @pytest.mark.asyncio
    async def test_get_current_user_development_bypass(self):
        """In development, a request without a token gets the bypass user."""
        import auth.keycloak_auth as auth_module
        from auth.keycloak_auth import get_current_user

        mock_request = MagicMock()
        mock_request.headers.get.return_value = ""  # No auth header

        env = {"JWT_SECRET": TEST_JWT_SECRET, "ENVIRONMENT": "development"}
        with patch.dict(os.environ, env):
            with patch.object(auth_module, "_authenticator", None, create=True):
                user = await get_current_user(mock_request)

        assert user["auth_method"] == "development_bypass"
# =============================================
# Security Tests
# =============================================
class TestSecurityEdgeCases:
    """Tests for security-relevant edge cases of token and config handling."""

    @staticmethod
    def _b64url_json(obj) -> str:
        """Return *obj* as unpadded base64url-encoded JSON (one JWT segment)."""
        import base64
        import json

        raw = json.dumps(obj).encode()
        return base64.urlsafe_b64encode(raw).rstrip(b"=").decode()

    @pytest.mark.asyncio
    async def test_no_jwt_secret_in_production(self):
        """get_authenticator raises KeycloakConfigError without JWT_SECRET."""
        import auth.keycloak_auth as auth_module
        from auth.keycloak_auth import get_authenticator

        # clear=True empties the environment apart from ENVIRONMENT, so
        # JWT_SECRET is guaranteed to be absent; patch.dict restores it after.
        with patch.dict(os.environ, {"ENVIRONMENT": "production"}, clear=True):
            # Swap the module-level authenticator for None so it is rebuilt;
            # patch.object puts the previous value back afterwards.
            with patch.object(auth_module, "_authenticator", None, create=True):
                # Only the call under test sits inside pytest.raises.
                with pytest.raises(KeycloakConfigError):
                    get_authenticator()

    @pytest.mark.asyncio
    async def test_tampered_token(self):
        """A token whose payload was altered after signing is rejected."""
        auth = HybridAuthenticator(
            keycloak_config=None,
            local_jwt_secret=TEST_JWT_SECRET,
            environment="development",
        )
        token = create_local_token()
        # Overwrite the tail of the payload segment; the signature segment is
        # left untouched.
        parts = token.split(".")
        parts[1] = parts[1][:-4] + "XXXX"
        tampered = ".".join(parts)
        with pytest.raises(TokenInvalidError):
            await auth.validate_token(tampered)

    @pytest.mark.asyncio
    async def test_none_algorithm_attack(self):
        """An unsigned token declaring ``alg: none`` is rejected."""
        auth = HybridAuthenticator(
            keycloak_config=None,
            local_jwt_secret=TEST_JWT_SECRET,
            environment="development",
        )
        # Hand-build header.payload. with an empty signature segment.
        header = {"alg": "none", "typ": "JWT"}
        payload = {"user_id": "attacker", "role": "admin"}
        h = self._b64url_json(header)
        p = self._b64url_json(payload)
        malicious_token = f"{h}.{p}."
        with pytest.raises(TokenInvalidError):
            await auth.validate_token(malicious_token)