""" Tests for Session Middleware Tests the hybrid Valkey + PostgreSQL session storage and RBAC middleware. Usage: cd backend && pytest tests/test_session_middleware.py -v """ import pytest from unittest.mock import AsyncMock, MagicMock, patch from datetime import datetime, timezone, timedelta from session.session_store import Session, SessionStore, UserType from session.rbac_middleware import ( determine_user_type, get_permissions_for_roles, EMPLOYEE_PERMISSIONS, CUSTOMER_PERMISSIONS, ADMIN_PERMISSIONS, EMPLOYEE_ROLES, CUSTOMER_ROLES, ) class TestSession: """Test Session dataclass.""" def test_session_creation(self): """Test creating a session.""" session = Session( session_id="test-session-id", user_id="test-user-id", email="test@example.com", user_type=UserType.EMPLOYEE, roles=["teacher", "klassenlehrer"], permissions=["grades:read", "grades:write"], ) assert session.session_id == "test-session-id" assert session.user_id == "test-user-id" assert session.email == "test@example.com" assert session.user_type == UserType.EMPLOYEE assert "teacher" in session.roles assert "grades:read" in session.permissions def test_session_to_dict(self): """Test converting session to dictionary.""" session = Session( session_id="test-session-id", user_id="test-user-id", email="test@example.com", user_type=UserType.CUSTOMER, roles=["parent"], permissions=["children:read"], ) data = session.to_dict() assert data["session_id"] == "test-session-id" assert data["user_type"] == "customer" assert data["roles"] == ["parent"] def test_session_from_dict(self): """Test creating session from dictionary.""" data = { "session_id": "test-session-id", "user_id": "test-user-id", "email": "test@example.com", "user_type": "employee", "roles": ["admin"], "permissions": ["users:manage"], "created_at": "2024-01-01T00:00:00+00:00", } session = Session.from_dict(data) assert session.session_id == "test-session-id" assert session.user_type == UserType.EMPLOYEE assert session.roles == ["admin"] def test_has_permission(self): """Test permission checking.""" session = Session( session_id="test", user_id="test", email="test@test.com", user_type=UserType.EMPLOYEE, permissions=["grades:read", "grades:write", "attendance:read"], ) assert session.has_permission("grades:read") is True assert session.has_permission("users:manage") is False def test_has_any_permission(self): """Test any permission checking.""" session = Session( session_id="test", user_id="test", email="test@test.com", user_type=UserType.EMPLOYEE, permissions=["grades:read"], ) assert session.has_any_permission(["grades:read", "grades:write"]) is True assert session.has_any_permission(["users:manage", "audit:read"]) is False def test_has_all_permissions(self): """Test all permissions checking.""" session = Session( session_id="test", user_id="test", email="test@test.com", user_type=UserType.EMPLOYEE, permissions=["grades:read", "grades:write", "attendance:read"], ) assert session.has_all_permissions(["grades:read", "grades:write"]) is True assert session.has_all_permissions(["grades:read", "users:manage"]) is False def test_has_role(self): """Test role checking.""" session = Session( session_id="test", user_id="test", email="test@test.com", user_type=UserType.EMPLOYEE, roles=["teacher", "klassenlehrer"], ) assert session.has_role("teacher") is True assert session.has_role("admin") is False def test_is_employee(self): """Test employee check.""" employee_session = Session( session_id="test", user_id="test", email="test@test.com", user_type=UserType.EMPLOYEE, ) customer_session = Session( session_id="test", user_id="test", email="test@test.com", user_type=UserType.CUSTOMER, ) assert employee_session.is_employee() is True assert employee_session.is_customer() is False assert customer_session.is_customer() is True assert customer_session.is_employee() is False class TestUserType: """Test user type determination.""" def test_determine_employee_type(self): """Test determining employee user type from roles.""" assert determine_user_type(["teacher"]) == UserType.EMPLOYEE assert determine_user_type(["admin"]) == UserType.EMPLOYEE assert determine_user_type(["klassenlehrer"]) == UserType.EMPLOYEE assert determine_user_type(["schul_admin"]) == UserType.EMPLOYEE def test_determine_customer_type(self): """Test determining customer user type from roles.""" assert determine_user_type(["parent"]) == UserType.CUSTOMER assert determine_user_type(["student"]) == UserType.CUSTOMER assert determine_user_type(["user"]) == UserType.CUSTOMER def test_employee_takes_precedence(self): """Test that employee roles take precedence.""" # User has both employee and customer roles assert determine_user_type(["teacher", "parent"]) == UserType.EMPLOYEE def test_unknown_role_defaults_to_customer(self): """Test that unknown roles default to customer.""" assert determine_user_type(["unknown_role"]) == UserType.CUSTOMER assert determine_user_type([]) == UserType.CUSTOMER class TestPermissions: """Test permission assignment.""" def test_employee_permissions(self): """Test that employees get employee permissions.""" permissions = get_permissions_for_roles(["teacher"], UserType.EMPLOYEE) assert "grades:read" in permissions assert "grades:write" in permissions assert "attendance:read" in permissions # Should not have customer-only permissions assert "children:read" not in permissions def test_customer_permissions(self): """Test that customers get customer permissions.""" permissions = get_permissions_for_roles(["parent"], UserType.CUSTOMER) assert "children:read" in permissions assert "own_grades:read" in permissions assert "consent:manage" in permissions # Should not have employee permissions assert "grades:write" not in permissions def test_admin_permissions(self): """Test that admins get admin permissions.""" permissions = get_permissions_for_roles(["admin"], UserType.EMPLOYEE) assert "users:manage" in permissions assert "audit:read" in permissions assert "rbac:write" in permissions def test_schul_admin_permissions(self): """Test that school admins get admin permissions.""" permissions = get_permissions_for_roles(["schul_admin"], UserType.EMPLOYEE) assert "users:manage" in permissions assert "settings:write" in permissions class TestEmployeeRoles: """Test employee role constants.""" def test_employee_roles_defined(self): """Test that expected employee roles are defined.""" expected_roles = [ "admin", "schul_admin", "teacher", "klassenlehrer", "fachlehrer", "sekretariat", "data_protection_officer" ] for role in expected_roles: assert role in EMPLOYEE_ROLES, f"Missing employee role: {role}" def test_customer_roles_not_in_employee(self): """Test that customer roles are not in employee roles.""" for role in CUSTOMER_ROLES: assert role not in EMPLOYEE_ROLES class TestCustomerRoles: """Test customer role constants.""" def test_customer_roles_defined(self): """Test that expected customer roles are defined.""" expected_roles = ["parent", "student", "user"] for role in expected_roles: assert role in CUSTOMER_ROLES, f"Missing customer role: {role}" class TestPermissionConstants: """Test permission constants are properly defined.""" def test_employee_permissions_not_empty(self): """Test employee permissions list is not empty.""" assert len(EMPLOYEE_PERMISSIONS) > 0 def test_customer_permissions_not_empty(self): """Test customer permissions list is not empty.""" assert len(CUSTOMER_PERMISSIONS) > 0 def test_admin_permissions_not_empty(self): """Test admin permissions list is not empty.""" assert len(ADMIN_PERMISSIONS) > 0 def test_no_duplicate_permissions(self): """Test there are no duplicate permissions within each category.""" assert len(EMPLOYEE_PERMISSIONS) == len(set(EMPLOYEE_PERMISSIONS)) assert len(CUSTOMER_PERMISSIONS) == len(set(CUSTOMER_PERMISSIONS)) assert len(ADMIN_PERMISSIONS) == len(set(ADMIN_PERMISSIONS)) class TestSessionStore: """Test SessionStore class.""" @pytest.fixture def mock_store(self): """Create a mock session store.""" store = SessionStore( valkey_url="redis://localhost:6379", database_url=None, session_ttl_hours=24, ) # Disable Valkey for tests store._valkey_available = False return store def test_session_ttl_default(self, mock_store): """Test default session TTL is 24 hours.""" assert mock_store.session_ttl == timedelta(hours=24) assert mock_store.session_ttl_seconds == 24 * 3600 def test_valkey_key_format(self, mock_store): """Test Valkey key format.""" key = mock_store._get_valkey_key("test-session-id") assert key == "session:test-session-id" def test_hash_token(self, mock_store): """Test token hashing.""" hash1 = mock_store._hash_token("token1") hash2 = mock_store._hash_token("token1") hash3 = mock_store._hash_token("token2") assert hash1 == hash2 # Same token, same hash assert hash1 != hash3 # Different tokens, different hashes assert len(hash1) == 64 # SHA-256 hex length class TestSessionExpiry: """Test session expiry handling.""" def test_session_created_with_timestamps(self): """Test that session is created with proper timestamps.""" session = Session( session_id="test", user_id="test", email="test@test.com", user_type=UserType.EMPLOYEE, ) assert session.created_at is not None assert session.last_activity_at is not None assert session.created_at <= datetime.now(timezone.utc) class TestFastAPIIntegration: """Test FastAPI middleware integration.""" @pytest.fixture def mock_request(self): """Create a mock FastAPI request.""" request = MagicMock() request.headers = {} request.cookies = {} request.state = MagicMock() return request def test_extract_session_from_bearer(self, mock_request): """Test extracting session ID from Bearer token.""" from session.session_middleware import _extract_session_id_from_request mock_request.headers = {"authorization": "Bearer test-session-id"} mock_request.cookies = {} session_id = _extract_session_id_from_request(mock_request) assert session_id == "test-session-id" def test_extract_session_from_header(self, mock_request): """Test extracting session ID from X-Session-ID header.""" from session.session_middleware import _extract_session_id_from_request mock_request.headers = {"x-session-id": "test-session-id"} mock_request.cookies = {} session_id = _extract_session_id_from_request(mock_request) assert session_id == "test-session-id" def test_extract_session_from_cookie(self, mock_request): """Test extracting session ID from cookie.""" from session.session_middleware import _extract_session_id_from_request mock_request.headers = {} mock_request.cookies = {"session_id": "test-session-id"} # Dict already has .get() method that works correctly session_id = _extract_session_id_from_request(mock_request) assert session_id == "test-session-id" def test_bearer_takes_precedence(self, mock_request): """Test that Bearer token takes precedence over cookie.""" from session.session_middleware import _extract_session_id_from_request mock_request.headers = {"authorization": "Bearer bearer-id"} mock_request.cookies = {"session_id": "cookie-id"} session_id = _extract_session_id_from_request(mock_request) assert session_id == "bearer-id" def test_demo_session_in_development(self): """Test that demo session is returned in development mode.""" from session.session_middleware import _get_demo_session demo = _get_demo_session() assert demo.session_id == "demo-session-id" assert demo.email == "demo@breakpilot.app" assert demo.user_type == UserType.EMPLOYEE assert "admin" in demo.roles assert "grades:read" in demo.permissions class TestRBACMiddlewareFunctions: """Test RBAC middleware helper functions.""" def test_check_resource_ownership_owner(self): """Test resource ownership check for owner.""" from session.rbac_middleware import check_resource_ownership session = Session( session_id="test", user_id="user-123", email="test@test.com", user_type=UserType.EMPLOYEE, roles=["teacher"], ) assert check_resource_ownership(session, "user-123") is True assert check_resource_ownership(session, "user-456") is False def test_check_resource_ownership_admin(self): """Test resource ownership check for admin.""" from session.rbac_middleware import check_resource_ownership session = Session( session_id="test", user_id="admin-123", email="admin@test.com", user_type=UserType.EMPLOYEE, roles=["admin"], ) # Admin can access other user's resource with allow_admin=True assert check_resource_ownership(session, "user-456", allow_admin=True) is True # Admin cannot access without allow_admin assert check_resource_ownership(session, "user-456", allow_admin=False) is False def test_check_resource_ownership_super_admin(self): """Test resource ownership check for super admin.""" from session.rbac_middleware import check_resource_ownership session = Session( session_id="test", user_id="super-123", email="super@test.com", user_type=UserType.EMPLOYEE, roles=["super_admin"], ) assert check_resource_ownership(session, "user-456", allow_admin=True) is True class TestSessionSerialization: """Test session serialization and deserialization.""" def test_round_trip_serialization(self): """Test that session survives serialization round-trip.""" original = Session( session_id="test-session-id", user_id="test-user-id", email="test@example.com", user_type=UserType.EMPLOYEE, roles=["teacher", "klassenlehrer"], permissions=["grades:read", "grades:write"], tenant_id="tenant-123", ip_address="192.168.1.1", user_agent="Mozilla/5.0", ) # Serialize and deserialize data = original.to_dict() restored = Session.from_dict(data) assert restored.session_id == original.session_id assert restored.user_id == original.user_id assert restored.email == original.email assert restored.user_type == original.user_type assert restored.roles == original.roles assert restored.permissions == original.permissions assert restored.tenant_id == original.tenant_id assert restored.ip_address == original.ip_address assert restored.user_agent == original.user_agent def test_json_serialization(self): """Test JSON serialization of session.""" import json session = Session( session_id="test-session-id", user_id="test-user-id", email="test@example.com", user_type=UserType.CUSTOMER, roles=["parent"], permissions=["children:read"], ) # Should not raise json_str = json.dumps(session.to_dict()) assert "test-session-id" in json_str assert "customer" in json_str def test_from_dict_with_missing_optional_fields(self): """Test creating session from dict with missing optional fields.""" data = { "session_id": "test", "user_id": "user-123", "email": "test@test.com", "user_type": "employee", } session = Session.from_dict(data) assert session.session_id == "test" assert session.roles == [] assert session.permissions == [] assert session.tenant_id is None class TestUserTypeEnum: """Test UserType enum.""" def test_employee_value(self): """Test employee enum value.""" assert UserType.EMPLOYEE.value == "employee" assert UserType.EMPLOYEE == "employee" def test_customer_value(self): """Test customer enum value.""" assert UserType.CUSTOMER.value == "customer" assert UserType.CUSTOMER == "customer" def test_from_string(self): """Test creating UserType from string.""" assert UserType("employee") == UserType.EMPLOYEE assert UserType("customer") == UserType.CUSTOMER class TestSessionStoreConfiguration: """Test SessionStore configuration.""" def test_default_ttl(self): """Test default TTL configuration.""" store = SessionStore( valkey_url=None, database_url=None, ) assert store.session_ttl == timedelta(hours=24) def test_custom_ttl(self): """Test custom TTL configuration.""" store = SessionStore( valkey_url=None, database_url=None, session_ttl_hours=12, ) assert store.session_ttl == timedelta(hours=12) assert store.session_ttl_seconds == 12 * 3600 def test_valkey_key_generation(self): """Test Valkey key generation.""" store = SessionStore() key1 = store._get_valkey_key("session-123") key2 = store._get_valkey_key("session-456") assert key1 == "session:session-123" assert key2 == "session:session-456" assert key1 != key2 class TestPermissionMatching: """Test permission pattern matching.""" def test_exact_permission_match(self): """Test exact permission matching.""" session = Session( session_id="test", user_id="test", email="test@test.com", user_type=UserType.EMPLOYEE, permissions=["grades:read"], ) assert session.has_permission("grades:read") is True assert session.has_permission("grades:write") is False def test_multiple_permissions(self): """Test session with multiple permissions.""" session = Session( session_id="test", user_id="test", email="test@test.com", user_type=UserType.EMPLOYEE, permissions=[ "grades:read", "grades:write", "attendance:read", "attendance:write", "students:read", ], ) assert session.has_permission("grades:read") is True assert session.has_permission("attendance:write") is True assert session.has_permission("users:manage") is False def test_empty_permissions(self): """Test session with no permissions.""" session = Session( session_id="test", user_id="test", email="test@test.com", user_type=UserType.CUSTOMER, permissions=[], ) assert session.has_permission("any:permission") is False assert session.has_any_permission(["grades:read"]) is False assert session.has_all_permissions([]) is True # Empty list = vacuously true if __name__ == "__main__": pytest.main([__file__, "-v"])