""" Tests for Keycloak Authentication Module Tests cover: - Local JWT validation - Keycloak token detection - HybridAuthenticator token routing - FastAPI dependency integration """ import pytest import jwt import os from datetime import datetime, timezone, timedelta from unittest.mock import AsyncMock, MagicMock, patch # Import the auth module import sys sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) from auth.keycloak_auth import ( KeycloakConfig, KeycloakUser, KeycloakAuthenticator, HybridAuthenticator, TokenExpiredError, TokenInvalidError, KeycloakConfigError, get_keycloak_config_from_env, ) # ============================================= # Test Data # ============================================= TEST_JWT_SECRET = "test-secret-key-32-chars-min-here" TEST_USER_ID = "10000000-0000-0000-0000-000000000001" TEST_EMAIL = "lehrer@test.de" def create_local_token( user_id: str = TEST_USER_ID, email: str = TEST_EMAIL, role: str = "teacher", exp_hours: int = 1 ) -> str: """Create a local JWT token for testing.""" payload = { "user_id": user_id, "email": email, "role": role, "iss": "breakpilot", "iat": datetime.now(timezone.utc), "exp": datetime.now(timezone.utc) + timedelta(hours=exp_hours), } return jwt.encode(payload, TEST_JWT_SECRET, algorithm="HS256") def create_expired_token() -> str: """Create an expired JWT token.""" payload = { "user_id": TEST_USER_ID, "email": TEST_EMAIL, "role": "teacher", "iss": "breakpilot", "iat": datetime.now(timezone.utc) - timedelta(hours=2), "exp": datetime.now(timezone.utc) - timedelta(hours=1), } return jwt.encode(payload, TEST_JWT_SECRET, algorithm="HS256") # ============================================= # KeycloakConfig Tests # ============================================= class TestKeycloakConfig: """Tests for KeycloakConfig dataclass.""" def test_config_urls(self): """Test URL generation from config.""" config = KeycloakConfig( server_url="https://keycloak.example.com", realm="test-realm", client_id="test-client" ) assert config.issuer_url == "https://keycloak.example.com/realms/test-realm" assert config.jwks_url == "https://keycloak.example.com/realms/test-realm/protocol/openid-connect/certs" assert config.token_url == "https://keycloak.example.com/realms/test-realm/protocol/openid-connect/token" def test_config_defaults(self): """Test default values.""" config = KeycloakConfig( server_url="https://kc.example.com", realm="breakpilot", client_id="backend" ) assert config.client_secret is None assert config.verify_ssl is True # ============================================= # KeycloakUser Tests # ============================================= class TestKeycloakUser: """Tests for KeycloakUser dataclass.""" def test_has_realm_role(self): """Test realm role checking.""" user = KeycloakUser( user_id="user-123", email="test@example.com", email_verified=True, name="Test User", given_name="Test", family_name="User", realm_roles=["teacher", "admin"], client_roles={}, groups=[], tenant_id=None, raw_claims={} ) assert user.has_realm_role("teacher") is True assert user.has_realm_role("admin") is True assert user.has_realm_role("superadmin") is False def test_has_client_role(self): """Test client role checking.""" user = KeycloakUser( user_id="user-123", email="test@example.com", email_verified=True, name="Test User", given_name=None, family_name=None, realm_roles=[], client_roles={"backend": ["editor", "viewer"]}, groups=[], tenant_id=None, raw_claims={} ) assert user.has_client_role("backend", "editor") is True assert user.has_client_role("backend", "admin") is False assert user.has_client_role("frontend", "viewer") is False def test_is_admin(self): """Test admin detection.""" admin_user = KeycloakUser( user_id="admin-123", email="admin@example.com", email_verified=True, name=None, given_name=None, family_name=None, realm_roles=["admin"], client_roles={}, groups=[], tenant_id=None, raw_claims={} ) regular_user = KeycloakUser( user_id="user-123", email="user@example.com", email_verified=True, name=None, given_name=None, family_name=None, realm_roles=["teacher"], client_roles={}, groups=[], tenant_id=None, raw_claims={} ) assert admin_user.is_admin() is True assert regular_user.is_admin() is False def test_is_teacher(self): """Test teacher detection with German role name.""" teacher_de = KeycloakUser( user_id="t1", email="t@test.de", email_verified=True, name=None, given_name=None, family_name=None, realm_roles=["lehrer"], # German role name client_roles={}, groups=[], tenant_id=None, raw_claims={} ) teacher_en = KeycloakUser( user_id="t2", email="t2@test.de", email_verified=True, name=None, given_name=None, family_name=None, realm_roles=["teacher"], # English role name client_roles={}, groups=[], tenant_id=None, raw_claims={} ) assert teacher_de.is_teacher() is True assert teacher_en.is_teacher() is True # ============================================= # HybridAuthenticator Tests (Local JWT) # ============================================= class TestHybridAuthenticatorLocalJWT: """Tests for HybridAuthenticator with local JWT.""" @pytest.fixture def authenticator(self): """Create authenticator without Keycloak.""" return HybridAuthenticator( keycloak_config=None, local_jwt_secret=TEST_JWT_SECRET, environment="development" ) @pytest.mark.asyncio async def test_valid_local_token(self, authenticator): """Test validation of valid local token.""" token = create_local_token() user = await authenticator.validate_token(token) assert user["user_id"] == TEST_USER_ID assert user["email"] == TEST_EMAIL assert user["role"] == "teacher" assert user["auth_method"] == "local_jwt" @pytest.mark.asyncio async def test_expired_token(self, authenticator): """Test that expired tokens are rejected.""" token = create_expired_token() with pytest.raises(TokenExpiredError): await authenticator.validate_token(token) @pytest.mark.asyncio async def test_invalid_token(self, authenticator): """Test that invalid tokens are rejected.""" with pytest.raises(TokenInvalidError): await authenticator.validate_token("invalid.token.here") @pytest.mark.asyncio async def test_empty_token(self, authenticator): """Test that empty tokens are rejected.""" with pytest.raises(TokenInvalidError): await authenticator.validate_token("") @pytest.mark.asyncio async def test_wrong_secret(self): """Test that tokens signed with wrong secret are rejected.""" auth = HybridAuthenticator( keycloak_config=None, local_jwt_secret="different-secret-key-here-32chars", environment="development" ) token = create_local_token() # Signed with TEST_JWT_SECRET with pytest.raises(TokenInvalidError): await auth.validate_token(token) # ============================================= # HybridAuthenticator Tests (Role Mapping) # ============================================= class TestRoleMapping: """Tests for role mapping in HybridAuthenticator.""" @pytest.fixture def authenticator(self): return HybridAuthenticator( keycloak_config=None, local_jwt_secret=TEST_JWT_SECRET, environment="development" ) @pytest.mark.asyncio async def test_admin_role_mapping(self, authenticator): """Test that admin role is preserved.""" token = create_local_token(role="admin") user = await authenticator.validate_token(token) assert user["role"] == "admin" assert "admin" in user["realm_roles"] @pytest.mark.asyncio async def test_teacher_role_mapping(self, authenticator): """Test that teacher role is preserved.""" token = create_local_token(role="teacher") user = await authenticator.validate_token(token) assert user["role"] == "teacher" @pytest.mark.asyncio async def test_user_role_default(self, authenticator): """Test that unknown roles default to user.""" token = create_local_token(role="custom_role") user = await authenticator.validate_token(token) # Custom role should be preserved assert user["role"] == "custom_role" # ============================================= # Environment Configuration Tests # ============================================= class TestEnvironmentConfiguration: """Tests for environment-based configuration.""" def test_keycloak_config_from_env_missing(self): """Test that missing env vars return None.""" # Clear any existing env vars for key in ["KEYCLOAK_SERVER_URL", "KEYCLOAK_REALM", "KEYCLOAK_CLIENT_ID"]: os.environ.pop(key, None) config = get_keycloak_config_from_env() assert config is None def test_keycloak_config_from_env_complete(self): """Test that complete env vars create config.""" os.environ["KEYCLOAK_SERVER_URL"] = "https://kc.test.com" os.environ["KEYCLOAK_REALM"] = "test" os.environ["KEYCLOAK_CLIENT_ID"] = "test-client" try: config = get_keycloak_config_from_env() assert config is not None assert config.server_url == "https://kc.test.com" assert config.realm == "test" assert config.client_id == "test-client" finally: # Cleanup os.environ.pop("KEYCLOAK_SERVER_URL", None) os.environ.pop("KEYCLOAK_REALM", None) os.environ.pop("KEYCLOAK_CLIENT_ID", None) # ============================================= # Token Detection Tests # ============================================= class TestTokenDetection: """Tests for automatic token type detection.""" @pytest.mark.asyncio async def test_local_token_detection(self): """Test that local tokens are correctly detected.""" auth = HybridAuthenticator( keycloak_config=None, local_jwt_secret=TEST_JWT_SECRET, environment="development" ) token = create_local_token() user = await auth.validate_token(token) assert user["auth_method"] == "local_jwt" @pytest.mark.asyncio async def test_keycloak_token_detection_without_keycloak(self): """Test that Keycloak tokens fail when Keycloak is not configured.""" auth = HybridAuthenticator( keycloak_config=None, local_jwt_secret=TEST_JWT_SECRET, environment="development" ) # Create a fake Keycloak-style token payload = { "sub": "user-123", "email": "test@test.com", "iss": "https://keycloak.example.com/realms/test", "iat": datetime.now(timezone.utc), "exp": datetime.now(timezone.utc) + timedelta(hours=1), } fake_kc_token = jwt.encode(payload, "different-key", algorithm="HS256") # Should fail because local JWT validation will fail with pytest.raises(TokenInvalidError): await auth.validate_token(fake_kc_token) # ============================================= # Integration Tests (with Mock FastAPI Request) # ============================================= class TestFastAPIIntegration: """Tests for FastAPI dependency integration.""" @pytest.mark.asyncio async def test_get_current_user_valid_token(self): """Test get_current_user with valid token.""" from auth.keycloak_auth import get_current_user from unittest.mock import AsyncMock # Create a mock request mock_request = MagicMock() token = create_local_token() mock_request.headers.get.return_value = f"Bearer {token}" # Patch environment with patch.dict(os.environ, { "JWT_SECRET": TEST_JWT_SECRET, "ENVIRONMENT": "development" }): # Reset the global authenticator import auth.keycloak_auth as auth_module auth_module._authenticator = None user = await get_current_user(mock_request) assert user["user_id"] == TEST_USER_ID assert user["email"] == TEST_EMAIL @pytest.mark.asyncio async def test_get_current_user_development_bypass(self): """Test that development mode allows requests without token.""" from auth.keycloak_auth import get_current_user from fastapi import HTTPException mock_request = MagicMock() mock_request.headers.get.return_value = "" # No auth header with patch.dict(os.environ, { "JWT_SECRET": TEST_JWT_SECRET, "ENVIRONMENT": "development" }): import auth.keycloak_auth as auth_module auth_module._authenticator = None # In development, should return demo user user = await get_current_user(mock_request) assert user["auth_method"] == "development_bypass" # ============================================= # Security Tests # ============================================= class TestSecurityEdgeCases: """Tests for security edge cases.""" @pytest.mark.asyncio async def test_no_jwt_secret_in_production(self): """Test that missing JWT_SECRET raises error in production.""" with patch.dict(os.environ, {"ENVIRONMENT": "production"}, clear=True): with pytest.raises(KeycloakConfigError): from auth.keycloak_auth import get_authenticator # This should fail because JWT_SECRET is required in production import auth.keycloak_auth as auth_module auth_module._authenticator = None get_authenticator() @pytest.mark.asyncio async def test_tampered_token(self): """Test that tampered tokens are rejected.""" auth = HybridAuthenticator( keycloak_config=None, local_jwt_secret=TEST_JWT_SECRET, environment="development" ) token = create_local_token() # Tamper with the token parts = token.split(".") parts[1] = parts[1][:-4] + "XXXX" # Modify payload tampered = ".".join(parts) with pytest.raises(TokenInvalidError): await auth.validate_token(tampered) @pytest.mark.asyncio async def test_none_algorithm_attack(self): """Test protection against 'none' algorithm attack.""" auth = HybridAuthenticator( keycloak_config=None, local_jwt_secret=TEST_JWT_SECRET, environment="development" ) # Create a token with 'none' algorithm (attack vector) header = {"alg": "none", "typ": "JWT"} payload = {"user_id": "attacker", "role": "admin"} import base64 import json h = base64.urlsafe_b64encode(json.dumps(header).encode()).rstrip(b"=").decode() p = base64.urlsafe_b64encode(json.dumps(payload).encode()).rstrip(b"=").decode() malicious_token = f"{h}.{p}." with pytest.raises(TokenInvalidError): await auth.validate_token(malicious_token)