Ray for Data Science: Distributed Python tasks at scale

by on April 6, 2021

Editor’s Note: This article was originally posted on Patterson Consulting’s blog, which can be found at http://www.pattersonconsultingtn.com/blog/blog_index.html, and is republished with permission.

Why Do We Need Ray?

Training machine learning models, especially neural networks, is compute-intensive. However, much of the load can be partitioned into smaller tasks and distributed over a large cluster. Several years ago, artificial intelligence researchers at the University of California, Berkeley, needed an easy way to do this for the algorithms they were researching and developing. They were expert Python programmers, but they didn’t want to spend lots of time and energy wrestling with most of the toolkits available. They didn’t need a lot of sophistication or fine-grained control. They just needed the simplest possible API that made good default choices behind the scenes to scale the work over a cluster, leveraging available resources, restarting failed tasks, and managing the computation results for easy consumption.

Out of this need emerged Ray, an open-source system for scaling Python (and now Java) applications from single machines to large clusters. What appeals to me about Ray is that the API is simple, concise, and intuitive to use, especially for people without distributed computing experience, yet it is flexible enough for a wide class of problems.

You can certainly find more sophisticated toolkits with greater extensibility and flexibility, but they typically require more effort to learn and use. That may be the right choice for many projects, but when you need an easy-to-use framework and don’t need absolute control over how it works (for example, to achieve the maximum possible performance and efficiency), Ray is ideal.

Ray Tasks

Let’s look at a simple example that illustrates how easy it is to use Ray for “low-level” distribution over a cluster. Then I’ll briefly mention several higher-level toolkits for machine learning that leverage Ray. The Ray API documentation and Ray website provide a lot more information than I can cover here.

Suppose we want to implement a simple DNS server. We could start as follows. If you’re playing along at home, copy and paste this code into the Python interpreter:

import time                       # We'll use sleep to simulate long operations.

addresses = {                     # Some absolutely correct addresses.
  "google.com":    "4.3.2.1",
  "microsoft.com": "4.3.2.2",
  "amazon.com":    "4.3.2.3",
}

def lookup(name):                 # A function to return an address for a name.
  time.sleep(0.5)                 # It takes a long time to run!
  return name, addresses[name]    # Also return the name with the address.

start = time.time()               # How long will this take?
for name in addresses:            # Use the keys in addresses...
  n, address = lookup(name)       # ... but go through lookup for the values.
  delta = time.time() - start
  print(f"{name}:\t {address}   ({delta:.3f} seconds)")

# The results:
# google.com:      4.3.2.1   (0.504 seconds)
# microsoft.com:   4.3.2.2   (1.008 seconds)
# amazon.com:      4.3.2.3   (1.511 seconds)

The comments at the end show that each query takes about 0.5 seconds, and because the lookups run one after another, the total time grows by about 0.5 seconds per name. Let’s reduce this with Ray.

First, you’ll need to install Ray using pip install ray. That’s all you need to do for this exercise: we’ll run Ray on a single machine, where it starts worker processes that can use as many of our CPU cores as we want. Running Ray on a cluster requires additional setup steps, described in the documentation.

Restart the Python interpreter after pip installing Ray. Now we can create a Ray task to run these queries in parallel.

import time
import ray                        # Import the Ray library

ray.init()                        # Initialize Ray in this application

addresses = {
  "google.com":    "4.3.2.1",
  "microsoft.com": "4.3.2.2",
  "amazon.com":    "4.3.2.3",
}

@ray.remote                       # Decorator turns functions into Ray Tasks.
def ray_lookup(name):             # Otherwise, it's identical to lookup(). 
  time.sleep(0.5)
  return name, addresses[name]

start = time.time()
for name in addresses:
  reference = ray_lookup.remote(name)   # Start an async task with .remote().
  n, address = ray.get(reference)       # Block to get the result.
  delta = time.time() - start
  print(f"{name}:\t {address}   ({delta:.3f} seconds)")

# google.com:      4.3.2.1   (0.520 seconds)
# microsoft.com:   4.3.2.2   (1.024 seconds)
# amazon.com:      4.3.2.3   (1.530 seconds)

We didn’t improve our results, but we’ll fix that in a moment. First, let’s discuss what’s new.

You import the Ray library and call ray.init() to initialize it in your application. You can also pass arguments to ray.init(), for example address="auto" to connect to an already-running cluster, or num_cpus to limit how many CPU cores a local Ray instance uses.

When you decorate a Python function with @ray.remote, you convert it into a Ray task. When invoked, it runs asynchronously somewhere in your cluster or, in our case, on the CPU cores of our laptop. Because Ray runs tasks in separate worker processes, this already lets us sidestep Python’s global interpreter lock (GIL), which allows only one thread at a time to execute Python code. All the cores are belong to us!

Notice how the loop has changed. To invoke a task, you call .remote(...) on the function instead of calling it directly. A side benefit of this required change is that it documents for the reader that a Ray task is being invoked.

Invoking a task immediately returns a reference (a Ray ObjectRef) that can be used to retrieve the task’s result once it has finished. We retrieve it right away by calling ray.get(reference), which blocks until the task is finished.

Blocking on each reference right after creating it is why we didn’t improve our performance: we waited for each task to finish, one at a time. However, this is easy to fix. Instead of calling ray.get(reference) immediately, we should “fire off” all the tasks, then wait on the results all at once:

start = time.time()
references = [ray_lookup.remote(name) for name in addresses]                 

ns_addresses = ray.get(references)           # Wait on all of them together.
for name, address in ns_addresses:
  delta = time.time() - start
  print(f"{name}:\t {address}   ({delta:.3f} seconds)")

# google.com:      4.3.2.1   (0.513 seconds)
# microsoft.com:   4.3.2.2   (0.513 seconds)
# amazon.com:      4.3.2.3   (0.513 seconds)

Much better! It still takes at least 0.5 seconds, because no single task can finish faster than that. The call to ray.get(references) still blocks until all of the tasks finish. To avoid waiting on the slowest task and instead process results as they become available, use ray.wait(), which separates the finished references from those still running. See the documentation for it and the Ray tutorials for details.

Ray Actors

A big challenge in distributed programming is managing distributed state. Ray addresses this problem with the concept of Actors. If you have ever worked with the Erlang language or the Akka system for the JVM, you have used actors.

Basically, an actor is implemented with a Python class, and its state is held in fields of the class instance. By default, a Ray actor processes its method calls one at a time, so its state stays consistent even when many Ray tasks or other actors are interacting with it simultaneously. Our actors are like “mini servers.”

Let’s use an actor to hold the DNS data. Until now, the single dictionary has been a potential bottleneck, “stuck” in our driver IPython process. With actors, we can spin up as many of them as we want over a cluster and distribute the load across them. Ray even provides facilities for querying the running actors. We won’t look at either feature now; we’ll just use a single actor.

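First, here’s the DNSServer Ray actor. The script that follows imports it from a module named dns_server, so save this code in a file called dns_server.py: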

import ray

@ray.remote
class DNSServer(object):
    def __init__(self, initial_addresses):
        # A dictionary of names to IP addresses.
        self.addresses = initial_addresses

    def lookup(self, name):
        return name, self.addresses[name]

    def get_addresses(self):
        return self.addresses

    def update_address(self, name, ip):
        self.addresses[name] = ip

Except for the familiar @ray.remote decorator, this looks like a regular Python class, although we also added a get_addresses method. With a normal Python object, you could just read fields like addresses directly. A Ray actor handle, however, exposes only the actor’s methods, so Ray actors require getter methods to read fields.

Now let’s use it. For convenience, I’ll show the whole Python script, including some things we already defined above. Let’s start with the setup of the actor:

import ray
import time
from dns_server import DNSServer

#ray.init()                       # Uncomment in a new script or IPython session.

server = DNSServer.remote({       # Construct actor instances with .remote
  "google.com":    "4.3.2.1",
  "microsoft.com": "4.3.2.2",
  "amazon.com":    "4.3.2.3",
})
server.update_address.remote("twitter.com", "4.3.2.4")
server.update_address.remote("instagram.com", "4.3.2.5")

ref = server.get_addresses.remote()
names_addresses = ray.get(ref)
for name, address in names_addresses.items():
  print(f"{name}:\t {address}")

# google.com:      4.3.2.1
# microsoft.com:   4.3.2.2
# amazon.com:      4.3.2.3
# twitter.com:     4.3.2.4
# instagram.com:   4.3.2.5

Note that instances are constructed and methods are invoked with .remote(...), just like we did for tasks. Now we can use the actor:

@ray.remote                 
def ray_lookup(name):             # Now use the server.
  time.sleep(0.5)
  return name, server.lookup.remote(name)


start = time.time()
refs = [ray_lookup.remote(name) for name in names_addresses.keys()]

names_refs2 = ray.get(refs)
for name, ref2 in names_refs2:
  delta = time.time() - start
  name2, address = ray.get(ref2)
  print(f"{name}:\t {address}   ({delta:.3f} seconds)")

# google.com:      4.3.2.1   (0.512 seconds)
# microsoft.com:   4.3.2.2   (0.512 seconds)
# amazon.com:      4.3.2.3   (0.516 seconds)
# twitter.com:     4.3.2.4   (0.517 seconds)
# instagram.com:   4.3.2.5   (0.519 seconds)

We don’t really need to go through ray_lookup to call the server, but we’ll do it anyway to see what happens. There are two levels of references. First, ray_lookup returns a name and a reference to the IP address that the server returns, so names_refs2 is a list of (name, reference) pairs. Then, when we call ray.get(ref2) on each reference, we get back another copy of the name along with the address. It’s worth printing out what each call to ray.get returns, to understand what’s happening.

If you write a lot of Python code and you occasionally find yourself needing to parallelize work to make it faster, whether on your laptop or in a cluster, I hope you can appreciate how concise Ray is for this purpose. You can even manage state with actors. As the DNSServer example showed, you have to carefully manage the “indirection” through references so it doesn’t become too complex, but in most real-world applications this is not that hard to do.

The Ray API is quite general purpose and flexible for all kinds of applications. While it emerged in the ML/AI world, it is not restricted to data science applications at all.

Ray for Machine Learning

However, because Ray emerged in the ML/AI research community, most of the available libraries that use Ray are ML and AI focused. I’ll discuss a few now.

Ray RLlib

Reinforcement learning (RL) became a hot topic in ML when DeepMind used it to achieve expert-level game play, first in Atari games and then to beat the world’s best Go players. Ray RLlib is one of the world’s leading libraries for RL, with performance on par with custom implementations of RL algorithms, yet it is general enough to support a wide class of RL algorithms and environments (games, robotics, etc.) for which you might train an RL system.

Here is a quick example using the RLlib command-line tool, although you could also use the Python API. First, install RLlib with pip install 'ray[rllib]'. Then run the following command, entered as a single line. The $ is the command prompt (*NIX or Windows). There is a lot of output; I’ll show the first and last “status” messages, which I edited to fit the page:

$ rllib train --run PPO --env CartPole-v1 --stop='{"training_iteration": 20}' --checkpoint-freq 10 --checkpoint-at-end
…
== Status ==
Memory usage on this node: 19.4/32.0 GiB
Using FIFO scheduling algorithm.
Resources requested: 3/8 CPUs, 0/0 GPUs, 0.0/11.52 GiB heap, 0.0/3.96 GiB objects
Result logdir: /Users/deanwampler/ray_results/default
Number of trials: 1 (1 RUNNING)
+--------------------+------------+-------+------+--------------+-------+----------+
| Trial name         | status     | loc   | iter |     time (s) |    ts |   reward |
+--------------------+------------+-------+------+--------------+-------+----------+
| PPO_CartPole-v1_…. | RUNNING    | ip:N  |    1 |      7.39127 |  4000 |  22.2011 |
+--------------------+------------+-------+------+--------------+-------+----------+


…
== Status ==
Memory usage on this node: 19.3/32.0 GiB
Using FIFO scheduling algorithm.
Resources requested: 0/8 CPUs, 0/0 GPUs, 0.0/11.52 GiB heap, 0.0/3.96 GiB objects
Result logdir: /Users/deanwampler/ray_results/default
Number of trials: 1 (1 TERMINATED)
+--------------------+------------+-------+------+--------------+-------+----------+
| Trial name         | status     | loc   | iter |     time (s) |    ts |   reward |
|--------------------+------------+-------+------+--------------+-------+----------|
| PPO_CartPole-v1_…. | TERMINATED | ip:N  |   20 |       95.503 | 80000 |   494.77 |
+--------------------+------------+-------+------+--------------+-------+----------+

The command trains a neural network on the CartPole environment, which I’ll describe in a moment. It uses PPO (“proximal policy optimization”), a popular RL algorithm, with a stopping condition that it should run for only 20 iterations. You could also specify stopping conditions related to performance, such as the reward value. The --checkpoint* flags control how often a checkpoint of the “agent” (the thing that operates in the CartPole environment) is saved; the checkpoint includes the simple neural network being trained. We mostly care about the final agent, which --checkpoint-at-end saves, but --checkpoint-freq is useful in case the job fails for some reason, because we can restart from the last checkpoint.

CartPole is the “hello world” of training environments. It is part of OpenAI Gym, a library of environments (“gyms”) for “exercising” RL algorithms. The idea is to train a cart to balance a vertical pole, where the cart can move left or right and the pole is restricted to two dimensions. Here is a video of the algorithm in action.

[Video: RandomAgent on CartPole-v1]

I said the model was checkpointed, but where? By default, checkpoints are written to your $HOME/ray_results directory. We can use the last checkpoint to “roll out” the model and see how well it works. Here I have elided the full directory name starting with PPO_CartPole-v1. As before, enter the command as a single line:

$ rllib rollout ~/ray_results/default/PPO_CartPole-v1.../checkpoint_20/checkpoint-20 --run PPO
…
Episode #0: reward: 500.0
Episode #1: reward: 484.0
...
Episode #19: reward: 458.0
Episode #20: reward: 488.0
Episode #21: reward: 367.0

A window will pop up that animates the rollout so you can see how well it works. You’ll see the pole start to fall left or right, and the cart will move in an attempt to keep it upright. The reward counts the number of time steps during which the pole stays close to vertical and the cart doesn’t hit the left or right boundary, up to a maximum of 500 steps. It works very well.

So, the rllib command-line tool is great for quick training runs and experiments, but there is a full Python API when you need to dive deeper. See the Anyscale Academy RLlib tutorials for in-depth examples.

Ray Tune

I said that we trained a neural network with RLlib, but what did that network look like, and was it optimal? By default, RLlib uses a network with two hidden layers of 256 units each. The number of layers and their sizes are two of the many hyperparameters that we might want to tune to optimize the architecture of the neural network and the effectiveness of our RL agent.

Ray Tune is built for this purpose. Using its own CLI or API, you specify the hyperparameters you want to tune, usually with a range of allowed values, along with any fixed hyperparameters, and Tune runs experiments for you to find the best-performing combination. Tune uses several algorithms to make this potentially expensive process more efficient, such as terminating training runs early for hyperparameter sets that appear to be suboptimal.

Other Ray Libraries

Several other libraries come with Ray, as shown in the documentation, and many third-party libraries now use Ray to implement scalable computation.

Where to Go from Here

I hope you found this short introduction to Ray intriguing. To learn more about Ray, check out these resources:
