Creating Multi-language Pipelines with Apache Spark or Avoid Having to Rewrite spaCy into Java

December 23, 2018

In this guest post, Holden Karau, Apache Spark Committer, provides insights on how to create multi-language pipelines with Apache Spark and avoid rewriting spaCy into Java. She has already written a complementary blog post for Domino on using spaCy to process text data. Karau is a Developer Advocate at Google as well as a co-author of High Performance Spark and Learning Spark. She also has a repository of her talks, code reviews, and code sessions on Twitch and YouTube.

Introduction

As a Scala/Java data/ML engineer, do you find yourself needing to work with some existing PySpark code but not wanting to rewrite all of the libraries that were used? Or maybe you’re just tired of the NLP options in the JVM and want to give spaCy a shot? Apache Spark’s new Apache Arrow-based UDFs not only offer performance improvements, but can also be combined with experimental features to allow the development of cross-language pipelines. This blog post focuses on using spaCy, and I have another in the works focused on NLTK that I will post on my blog. If you’re looking to use spaCy with PySpark in a single-language pipeline, you’ll be better served by my earlier post “Making PySpark Work with spaCy: Overcoming Serialization Errors.”

If you choose to use these techniques in production, be forewarned that debugging Spark is already hard, and multi-language pipelines add extra layers and complications on top of that. You should check out my debugging Spark video and, if you have a corporate expense account, there is a deep dive on Debugging Apache Spark available on Safari.

Calling and Registering Python UDFs from Scala / Java

Our first step is to set up a way for the JVM to call into Python and have Python register the UDFs with the JVM.

Inside a file called startup.py, we can create an entry point that our JVM code can call over Py4J. This boilerplate to connect Python and Scala is a bit annoying. If you want to see the details (or ruin the magic) you can check out Initialize.scala and startup.py. The key part of this is a registration provider that is defined in both Scala and Python:


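import ast

from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession
from pyspark.sql.udf import UserDefinedFunction

# functions_info (used below) is the name -> class registry that the
# transformation functions add themselves to.

# This class is used to allow the Scala process to call into Python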
# It may not run in the same Python process as your regular Python
# shell if you are running PySpark normally.
class PythonRegistrationProvider(object):
  """
  Provide an entry point for Scala to call to register functions.
  """

  def __init__(self, gateway):
    self.gateway = gateway
    self._sc = None
    self._session = None
    self._count = 0

  def registerFunction(self, ssc, jsession, function_name, params):
    jvm = self.gateway.jvm
    # If we don't have a reference to a running SparkContext
    # Get the SparkContext from the provided SparkSession.
    if not self._sc:
        master = ssc.master()
        jsc = jvm.org.apache.spark.api.java.JavaSparkContext(ssc)
        jsparkConf = ssc.conf()
        sparkConf = SparkConf(_jconf=jsparkConf)
        self._sc = SparkContext(
           master=master,
           conf=sparkConf,
           gateway=self.gateway,
           jsc=jsc)
        self._session = SparkSession.builder.getOrCreate()
    if function_name in functions_info:
        function_info = functions_info[function_name]
        if params:
            evaledParams = ast.literal_eval(params)
        else:
            evaledParams = []
        func = function_info.func(*evaledParams)
        ret_type = function_info.returnType()
        self._count = self._count + 1
        registration_name = function_name + str(self._count)
        udf = UserDefinedFunction(func, ret_type, registration_name)
        # Configure non-standard evaluation types (e.g. Arrow)
        udf.evalType = function_info.evalType()
        judf = udf._judf
        return judf
    else:
        return None

  # Py4J uses this nested class to find the Java interface this object implements.
  class Java:
    package = "com.sparklingpandas.sparklingml.util.python"
    className = "PythonRegisterationProvider"
    implements = [package + "." + className]

On the Scala side it ends up looking like:


/**
 * Abstract trait to implement in Python to allow Scala to call in to perform
 * registration.
 */
trait PythonRegisterationProvider {
  // Takes a SparkContext, SparkSession, String, and String
  // Returns UserDefinedPythonFunction but types + py4j :(
  def registerFunction(
    sc: SparkContext, session: Object,
    functionName: Object, params: Object): Object
}


Constructing the user defined function is done in the base PythonTransformer. The main call is done with:


    // Call the registration provider from startup.py to get a Python UDF back.
    val pythonUdf = Option(registrationProvider.registerFunction(
      session.sparkContext,
      session,
      pythonFunctionName,
      miniSerializeParams()))
    val castUdf = pythonUdf.map(_.asInstanceOf[UserDefinedPythonFunction])


Since we now need to pass a parameter between the JVM and Python (e.g., the language we’re working in), the wrapper needs logic that points to both the desired Python function and the parameters to configure it with:


  final val lang = new Param[String](this, "lang", "language for tokenization")

  /** @group getParam */
  final def getLang: String = $(lang)

  final def setLang(value: String): this.type = set(this.lang, value)

  def this() = this(Identifiable.randomUID("SpacyTokenizePython"))

  // Must match the key registered in Python's functions_info (case-sensitive).
  override val pythonFunctionName = "spacytokenize"
  override protected def outputDataType = ArrayType(StringType)
  override protected def validateInputType(inputType: DataType): Unit = {
    if (inputType != StringType) {
      throw new IllegalArgumentException(
        s"Expected input type StringType instead found ${inputType}")
    }
  }

  override def copy(extra: ParamMap) = {
    defaultCopy(extra)
  }

  def miniSerializeParams() = {
    "[\"" + $(lang) + "\"]"
  }
}
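
To make concrete what crosses the Py4J boundary, here is a minimal Python sketch of the parameter round trip. `mini_serialize_params` and `parse_params` are hypothetical names used only for illustration: the first mirrors the Scala `miniSerializeParams` above, and the second mirrors the `ast.literal_eval` step inside `registerFunction`.

```python
import ast


def mini_serialize_params(lang):
    # Hypothetical Python mirror of the Scala miniSerializeParams():
    # wraps the language code in a one-element list literal.
    return '["' + lang + '"]'


def parse_params(params):
    # Same logic as registerFunction: an empty string means no parameters.
    if params:
        return ast.literal_eval(params)
    return []


serialized = mini_serialize_params("en")
evaled_params = parse_params(serialized)
```

Because the string is built by plain concatenation, a value containing a double quote would produce an invalid literal. That is fine for language codes, but worth keeping in mind for free-form parameters.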

We then need to refactor our Python code in such a way that it’s easy for the Scala code to call with these params. Inside SparklingML we have a base class we can use, ScalarVectorizedTransformationFunction, to handle some of the boilerplate so it can look like:


class SpacyTokenize(ScalarVectorizedTransformationFunction):
    @classmethod
    def setup(cls, sc, session, *args):
        pass

    @classmethod
    def func(cls, *args):
        lang = args[0]

        def inner(inputSeries):
            """Tokenize the inputString using spaCy for
            the provided language."""
            nlp = SpacyMagic.get(lang)  # Optimization for spacy.load

            def tokenizeElem(elem):
                return [token.text for token in nlp(elem)]

            return inputSeries.apply(tokenizeElem)
        return inner

    @classmethod
    def returnType(cls, *args):
        return ArrayType(StringType())


functions_info["spacytokenize"] = SpacyTokenize
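
The lookup-then-configure flow can be tried without Spark or spaCy. The sketch below is an assumption-laden stand-in, not SparklingML code: `WhitespaceTokenize` is a hypothetical class that splits on whitespace, and a plain list plays the role of the pandas Series that a vectorized UDF would receive.

```python
functions_info = {}


class WhitespaceTokenize(object):
    """Hypothetical stand-in for SpacyTokenize; needs no spaCy or Spark."""

    @classmethod
    def func(cls, *args):
        lowercase = args[0]

        def inner(input_values):
            # Plays the role of the vectorized UDF body:
            # one batch of strings in, one batch of token lists out.
            def tokenize_elem(elem):
                text = elem.lower() if lowercase else elem
                return text.split()

            return [tokenize_elem(elem) for elem in input_values]
        return inner


functions_info["whitespacetokenize"] = WhitespaceTokenize

# Roughly what registerFunction does once the parameters are parsed:
function_info = functions_info["whitespacetokenize"]
tokenize = function_info.func(True)
result = tokenize(["Hello Spark", "spaCy from Scala"])
```

The point of the factory shape is that `func` runs once with the parameters sent from the JVM and returns a closure, so the closure is the only thing that has to be shipped to the workers.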


SpacyMagic: Going beyond `spacy.load()` everywhere

One of the big challenges with PySpark is serialization issues, and this challenge almost doubles with multi-language pipelines.


# Spacy isn't serializable but loading it is semi-expensive
@ignore_unicode_prefix
class SpacyMagic(object):
    """
    Simple Spacy Magic to minimize loading time.
    >>> SpacyMagic.get("en")
    <spacy.lang.en.English ...
    """
    _spacys = {}

    @classmethod
    def get(cls, lang):
        if lang not in cls._spacys:
            import spacy
            try:
                try:
                    cls._spacys[lang] = spacy.load(lang)
                except Exception:
                    spacy.cli.download(lang)
                    cls._spacys[lang] = spacy.load(lang)
            except Exception as e:
                raise Exception(
                    "Failed to find or download language {0}: {1}"
                    .format(lang, e))

        return cls._spacys[lang]


Then in our code we access spaCy through our friend SpacyMagic instead.
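
The caching idea can be checked in isolation. This is a minimal sketch with a hypothetical `LazyCache` class and a fake loader standing in for `spacy.load`; it only demonstrates that repeated lookups in one process trigger a single load.

```python
class LazyCache(object):
    """Hypothetical generic version of the SpacyMagic pattern."""
    _loaded = {}
    load_calls = 0

    @classmethod
    def _expensive_load(cls, name):
        # Stand-in for spacy.load(name); counts how often it runs.
        cls.load_calls += 1
        return {"model": name}

    @classmethod
    def get(cls, name):
        # Load on first use, then reuse the cached object.
        if name not in cls._loaded:
            cls._loaded[name] = cls._expensive_load(name)
        return cls._loaded[name]


first = LazyCache.get("en")
second = LazyCache.get("en")
```

Since the cache is a class attribute, the UDF closure only needs to carry the language string; the loaded model itself stays inside whichever Python process loaded it.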

Spark 2.4 has some interesting new tricks coming where we could do the spaCy load pre-fork, but that’s a topic for another blog post. (Once again, keep an eye on my blog / Medium / Twitter where I’ll share it.)

Tying it all together with cross-language WordCount

This wouldn’t seem like a real big data blog post if we didn’t focus unnecessarily on WordCount, but we’re going to do it with the different twist of using both Scala and Python. In Scala we can use the transformer we created to tokenize the words for WordCount:



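import org.apache.spark.sql.functions.{col, explode}

val data = session.read.load(...)
val transformer = new SpacyTokenizePython()
transformer.setLang("en")
transformer.setInputCol("input")
transformer.setOutputCol("tokens")
val tokens = transformer.transform(data)
// Each row holds an array of tokens, so flatten to one word per row before counting.
val words = tokens.select(explode(col("tokens")).as("word"))
val counts = words.groupBy("word").count()
counts.write.format("json").save("...")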
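
One detail worth spelling out is that the tokenizer produces one array of tokens per input row, so a word count has to flatten those arrays before grouping. The plain-Python sketch below uses made-up sample data and `collections.Counter` to show the difference; it is an illustration of the semantics, not Spark code.

```python
from collections import Counter

# Made-up output of a tokenizer: one list of tokens per input row.
token_rows = [["to", "be"], ["or", "not", "to", "be"]]

# Grouping by the whole array counts distinct token lists, not words.
counts_by_row = Counter(tuple(row) for row in token_rows)

# Flattening first (what explode does to an array column) counts words.
counts_by_word = Counter(token for row in token_rows for token in row)
```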


Wrapping Up

While these techniques work well enough for a simple cross-language pipeline like WordCount, we have to consider some additional aspects when building more complex pipelines. For instance, even in our simple example, spaCy has more information besides just the tokens, and the current lack of support for arrays of compound types in pandas UDFs makes it awkward to return complex results.

If you found this fun, hopefully you’ll join us in contributing to the Sparkling ML project, Apache Arrow, or the Apache Spark Python and Arrow integration.

Further Reading

If you’re interested in seeing behind the scenes for this post, you can take a look at this live coding session and the corresponding Sparkling ML repo. And if you’re interested in introducing children to the magical world of distributed systems, sign up to hear more about my next book “Distributed Computing 4 Kids”. If everything catches on fire, these debugging resources (free talk and subscription Safari deep-dive) should be able to help you at least a little bit.
