Dean Wampler provides a distilled overview of Ray, an open source system for scaling Python systems from single machines to large clusters. If you are interested in additional insights, register for the upcoming Ray Summit.
This post is for people making technology decisions, by which I mean data science team leads, architects, dev team leads, even managers who are involved in strategic decisions about the technology used in their organizations. If your team has started using Ray and you’re wondering what it is, this post is for you. If you’re wondering if Ray should be part of your technical strategy for Python-based applications, especially ML and AI, this post is for you. If you want a more in-depth technical introduction to Ray, see this post on the Ray project blog.
Ray: Scaling Python Applications
Ray is an open-source system for scaling Python applications from single machines to large clusters. Its design is driven by the unique challenges of next-generation ML and AI systems, but its features make Ray an excellent choice for all Python-based applications that need to scale across a cluster, especially if they have distributed state. Ray also provides a minimally-invasive and intuitive API, so you get these benefits without a lot of effort and expertise in distributed systems programming.
Developers indicate in their code which parts should be distributed across a cluster and run asynchronously, then Ray handles the distribution for you. If run locally, the application can use all the cores in the machine (you can also specify a limit). When one machine isn’t enough, it’s straightforward to run Ray on a cluster of machines and have the application leverage the cluster. The only code change required at this point is the options you pass when initializing Ray in the application.
ML libraries that use Ray, such as RLlib for reinforcement learning (RL), Tune for hyper parameter tuning, and Serve for model serving (experimental), are implemented with Ray internally for its scalable, distributed computing and state management benefits, while providing a domain-specific API for the purposes they serve.
Motivations for Ray: Training a Reinforcement Learning (RL) Model
To understand the motivations for Ray, consider the example of training a reinforcement learning (RL) model. RL is the type of machine learning that was used recently to beat the world’s best Go players and achieve expert game play for Atari and similar games.
Scalable RL requires many capabilities that Ray was designed to provide:
- Highly parallelized and efficient execution of tasks (millions or more) – When training models, we repeat the same calculations over and over again to find the best model approach (“hyper parameters”) and, once the best structure is chosen, to find the model parameters that work best. We also require proper sequencing of tasks when they have dependencies on the results of other tasks.
- Automatic Fault Tolerance – With all these tasks, a percentage of them may fail for a variety of reasons so we need a system that supports monitoring of tasks and recovery from failures.
- Diverse computing patterns – Model training involves a lot of computational mathematics. Most RL model training, in particular, also requires efficient execution of a simulator—for example, a game engine we want to beat or a model representing real-world activity like autonomous driving. The computing patterns used (algorithms, memory access patterns, etc.) are more typical of general computing systems, which can be very different from the computing patterns common in data systems where high-throughput transformations and aggregations of records are the norm. Another difference is the dynamic nature of these computations. Think of how a game player (or simulator) adapts to the evolving state of a game, improving strategy, trying new tactics, etc. These diverse requirements are seen in a variety of newer ML-based systems like robotics, autonomous vehicles, computer vision systems, automatic dialog systems, etc.
- Distributed state management– With RL, the current model parameters and the simulator state need to be tracked between training iterations. This state becomes distributed because the tasks are distributed. Proper state management also requires proper sequencing of stateful operations..
Of course, other ML/AI systems require some or all of these capabilities. So do general Python applications operating at scale.
The Gist of Ray
Ray libraries like RLlib, Tune, and Serve, use Ray but mostly hide it from users. However, using the Ray API itself is straightforward. Suppose you have an “expensive” function to run repeatedly over data records. If it’s stateless, meaning it doesn’t maintain any state between invocations, and you want to invoke it in parallel, all you need to do is turn the function into a Ray task by adding the
@ray.remote annotation as follows:
@ray.remote def slow(record): new_record = expensive_process(record) return new_record
Then initialize Ray and call it over your data set as follows:
ray.init() # Arguments can specify the cluster location, etc. futures = [slow.remote(r) for r in records]
Notice how we invoke the function
slow.remote instead. Each call returns immediately with a future. We have a collection of them. If we’re running in a cluster, Ray manages the resources available and places this task on a node with the resources necessary to run the function.
We can now ask Ray to return each result as it finishes using
ray.wait. Here’s one idiomatic way to do this:
while len(futures) > 0: finished, rest = ray.wait(futures) # Do something with “finished”, which has 1 value: value = ray.get(finished) # Get the value from the future print(value) futures = rest
As written, we’ll wait until one of the invocations of slow completes, at which point
ray.wait will return two lists. The first will have a single entry, the id of the future for the completed slow invocation. The rest of the list of futures that we passed in will be in the second list—
rest. We call
ray.get to retrieve the value of the finished future. (Note: that’s a blocking call, but it returns immediately because we already know it’s done.) We finish the loop by resetting our list to be what’s remaining, then repeat until all remote invocations have completed and the results have been processed.
You can also pass arguments to
ray.wait to return more than one at a time and to set a timeout. If you aren’t waiting on a collection of concurrent tasks, you can also wait on a specific future by calling
Without arguments, ray.init assumes local execution and uses all available CPU cores. You can provide arguments to specify a cluster to run on, the number of CPU or GPU cores to use, etc.
Suppose one remote function has passed the future from another remote function invocation. Ray will automatically sequence such dependencies so they are evaluated in the required order. You don’t have to do anything yourself to handle this situation.
Suppose you have a stateful computation to do. When we used
ray.get above, we were actually retrieving the values from a distributed object store. You can explicitly put objects there yourself if you want with
ray.put which returns an id you can pass later to
ray.get to retrieve it again.
Handling Stateful Computation with an Actor Model
Ray supports a more intuitive and flexible way to manage setting and retrieving state with an actor model. It uses regular Python classes that are converted into remote actors with the same
@ray.remote annotation. For simplicity, suppose you need to count the number of times that slow is called. Here is a class to do just that:
@ray.remote class CountedSlows: def __init__(self, initial_count = 0): self.count = initial_count def slow(self, record): self.count += 1 new_record = expensive_process(record) return new_record def get_count(self): return self.count
Except for the annotation, this looks like a normal Python class declaration, although normally you wouldn’t define the
get_count method just to retrieve the count. I’ll come back to this shortly.
Now use it in a similar way. Note how an instance of the class is constructed and how methods on the instance are invoked, using
remote as before:
cs = CountedSlows.remote() # Note how actor construction works futures = [cs.slow.remote(r) for r in records] while len(futures) > 0: finished, rest = ray.wait(futures) value = ray.get(finished) print(value) futures = rest count_future_id = cs.get_count.remote() ray.get(count_future_id)
The last line should print the number that equals the size of the original collection. Note that I called the method
get_count to retrieve the value of the
count attribute. At this time, Ray doesn’t support retrieving instance attributes like
count directly, so adding the method to retrieve it is the one required difference when compared to a regular Python class.
Ray Unifies Tasks and Actors
In both of the above cases, Ray keeps track of where the tasks and actors are located in the cluster, eliminating the need to explicitly know and manage such locations in user code. Mutation of state inside actors is handled in a thread-safe way, without the need for explicit concurrency primitives. Hence, Ray provides intuitive, distributed state management for applications, which means that Ray is an excellent platform for implementing stateful serverless applications in general. Furthermore, when communicating between tasks and actors on the same machine, the state is transparently managed through shared memory, with zero-copy serialization between the actors and tasks, for optimal performance.
Note: Let me emphasize an important benefit Ray is providing here. Without Ray, when you need to scale out an application over a cluster, you have to decide how many instances to create, where to place them in the cluster (or use a system like Kubernetes), how to manage their life cycles, how they will communicate information and coordinate between themselves, etc., etc. Ray does all this for you with minimal effort on your part. You mostly just write normal Python code. It’s a powerful tool for simplifying the design and management of your microservice architecture.
What if you’re already using other concurrency APIs like multiprocessing, asyncio, or joblib? While they work well for scaling on a single machine, they don’t provide scaling to a cluster. Ray recently introduced experimental implementations of these APIs that allow your applications to scale to a cluster. The only change required in your code is the import statement. For example, if you are using
multiprocessing.Pool this is the usual import statement:
from multiprocessing.pool import Pool
To use the Ray implementation, use this statement instead:
from ray.experimental.multiprocessing.pool import Pool
That’s all it takes.
What about Dask, which appears to provide many of the same capabilities as Ray? Dask is a good choice if you want distributed collections, like numpy arrays and Pandas DataFrames. (A research project called Modin that uses Ray will eventually meet this need.) Ray is designed for more general scenarios where distributed state management is required and where heterogeneous task execution must be very efficient at massive scale, like we need for reinforcement learning.
We’ve seen how Ray’s abstractions and features make it a straightforward tool to use, while providing powerful distributed computing and state-management capabilities. Although the design of Ray was motivated by the specific needs of high-performance, highly demanding ML/AI applications, it is broadly applicable, even offering a new way to approach microservice-based architectures.
I hope you found this brief explanation of Ray intriguing. Please give it a try and let me know what you think! Send to: firstname.lastname@example.org
To Learn More
For more information about Ray, take a look at the following:
- Ray Summit in San Francisco, May 27–28, 2020. Hear about case studies, research projects, and deep dives into Ray, plus morning keynotes from leaders in the data science and AI communities!
- Ray website is the starting point for all things Ray.
- Several notebook-based Ray tutorials let you try out Ray.
- The Ray GitHub page is where you’ll find all the Ray source code.
- The Ray documentation explains everything: landing page, installation instructions.
- Direct questions about Ray addressed to the Ray Slack workspace or the ray-dev Google Group.
- The Ray account on Twitter.
- Some Ray projects:
- RLlib: Scalable reinforcement learning with Ray (and this RLlib research paper)
- Tune: Efficient hyper parameter tuning with Ray
- Serve: Flexible, scalable model serving with Ray
- Modin: Research project on speeding up Pandas with Ray
- FLOW: a computational framework using reinforcement learning for traffic control modeling
- Anyscale: the company behind Ray
- For even more technical details: