Orchestrating Pipelines with Luigi and Domino

by on June 15, 2016

Building a data pipeline may sound like a daunting task. In this post, we will examine how you can use Luigi – a library specifically designed to assist pipeline construction – to build a data pipeline that works with Domino.

What is a data pipeline?

A data pipeline is a collection of processes (“Tasks”), each of which operates on data output by other Tasks. Information “flows” through the pipeline as it is transformed.

For example, a pipeline for statistical model development might look like this:

clean data -> extract features -> fit model -> predict

A more complex ensemble model might look like this:

clean data -> extract features -> fit models -> combine models -> predict

The most straightforward pipeline is just a series of steps in a script. This is often sufficient – but not always.

For example, if you later add new data sources or tasks and complicate the pipeline’s dependency structure, your script may not scale well.

Also, when tasks fail, it’s better not to repeat previous steps, especially if the task is near the end of the pipeline. A failed stage can leave behind broken or partial output, which can cause a ripple effect if corrupted data is inadvertently consumed by other tasks.

Finally, a scripted pipeline may not account for hardware limitations, or it may not adequately address the need for additional resources for certain tasks (GPU, multiple CPUs, extra memory, etc).

Luigi

Luigi is a Python library that simplifies the creation and execution of data pipelines by addressing these and other related issues. It originated at Spotify and is also used by Foursquare, Stripe, Asana, and Buffer, among many others. It can manage all kinds of Tasks, including operations in other programming languages, making it useful for polyglot projects typical of modern data science.

With Luigi, you define each task in terms of its data dependencies (the output of other tasks). At runtime, a central scheduler automatically sorts out the execution ordering for you. Luigi also supports atomic file operations (useful for ensuring reproducibility) and automatic caching of previously-run pipeline steps (which can be a real time-saver).

If you’re interested in learning Luigi, start with the overview in the official documentation. Then move on to the execution model and common patterns. You may also want to take a look at this slideshow overview from the project’s original author.

Multi-machine pipelines on Domino

Even on a single machine, Luigi is an effective tool. Combining it with the capabilities of the Domino API gives it even more power, by allowing pipelines to scale across arbitrarily many runs. This can be done by adding just a few extra classes.

For example, you can train an ensemble of models in parallel on a mixture of GPUs and high-memory instances, then collect the resulting model files for comparison or other use – all kicked off with a single command.

Remote execution

DominoRunTask is a subclass of luigi.Task that represents remote execution on a different Domino machine. Simply tell the command to run remotely, give it the expected output location, and specify the hardware tier. At runtime, DominoRunTask will start a new Domino run, wait for it to complete, and then copy the output back to the main pipeline run so it can be used by subsequent tasks.

Here’s a simple example, demonstrated by these runs:

    import luigi  
    from domino_luigi import DominoRunTask

    class Hello(luigi.Task):  
        def output(self):
            return luigi.LocalTarget('hello_world_pipeline/hello.txt')

    def run(self):
        with self.output().open('w') as f:
            f.write('hello\n')


    class DominoHelloWorld(luigi.Task):  
        def requires(self):
            return DominoRunTask(
            domino_run_command=['example_pipeline.py', 'Hello'], #  --workers N
            domino_run_tier='Small',
            output_file_path=Hello().output().path,
        )

        def output(self):
            return luigi.LocalTarget('hello_world_pipeline/hello_world.txt')

        def run(self):
            with self.input().open('r') as in_f:
                with self.output().open('w') as out_f:
                    out_f.write(in_f.read() + 'world\n')

This starts a second run that creates the output file hello_world_pipeline/hello.txt. This file is then retrieved by the initial run, and the file’s contents are used to create hello_world_pipeline/hello_world.txt.

Notes:

  • Following the Luigi pattern, the output of the external run must be a single file. This is to ensure the atomicity of each task, which can prevent data corruption and is good for reproducibility. If your external job creates many files, simply create a zip archive and treat that as the output.
  • DominoRunTask takes a parameter called commit_id, which indicates the Domino project state that will be used as input for the run. If this parameter is not provided, it defaults to the input commit_id of the current Domino run. You can see the commit_id in URLs when using the UI’s file browser, e.g. https://app.dominodatalab.com/u/domino/luigi-multi-run/browse?commitId=1dee27202efaabe0baf780e6b67a3c0b739a2f4c.

Multi-machine ensembles

DominoRunTask can be applied to predictive model development. Take this ensemble of models as an example:

Each of these models can be trained on its own machine. The random forest model might run on the X-Large hardware tier, while the neural network uses a GPU tier. Since each training task is its own run, they can be inspected individually.

You can kick off the entire process of training and evaluating the model using a single Domino command:

domino run example_ensemble.py ScoreReport --workers 3

This is handy for retraining on a schedule.

During development, you can work on a sub-component of the pipeline by running a different task:

domino run example_ensemble.py TrainRForest

Once successfully completed, the resulting model file can be used by subsequent runs of the pipeline without having to retrain.

Notes:

  • In this example project, the random forest training and prediction code is found in forest_train.R and forest_predict.R. Note the extra steps taken to zip the directory containing the H2O model; these were added to ensure the atomicity of the task.
  • These models have not been tuned. The intent is to illustrate a proof-of-concept in an extendable way, rather than present the best fit.
  • Data and random forest examples were borrowed from here.
Share