Spark is a powerful big data processing system that's capable of handling enormous workloads. However, sometimes there are critical paths that don't scale as well as you'd like. In this blog post, we discuss Spark and Spark Pipelines, and how you might export a critical component from your Spark project to Algorithmia by using the MLeap model interchange format and runtime.

What makes Spark great?

At its core, Apache Spark is a distributed data transformation engine for very large datasets and workloads. It links directly with powerful, battle-tested distributed data systems like Hadoop and Cassandra, which are industry standards in fields such as finance.

Spark allows for the processing of huge amounts of information across multiple machines either in a stream or as a batch, enabling the parallelization of workflows that would have otherwise been very difficult. This is great, but in the world of big data and data mining, there is one thing that vanilla Spark can’t do: machine learning.

What are Spark Pipelines?

This is where Spark Pipelines come into play. Pipelines allow you to have a Spark data transformer that can learn, just like a neural network or a linear regression model. During training, you provide the inputs for your transformation and then "fit" your model (Spark's term for training). Once your model is fitted, it's ready for testing and, finally, real-world workloads.

import org.apache.spark.ml.classification.LogisticRegression
import org.apache.spark.ml.linalg.Vectors

// Prepare training data from a list of (label, features) tuples.
val training = spark.createDataFrame(Seq(
  (1.0, Vectors.dense(0.0, 1.1, 0.1)),
  (0.0, Vectors.dense(2.0, 1.0, -1.0)),
  (0.0, Vectors.dense(2.0, 1.3, 1.0)),
  (1.0, Vectors.dense(0.0, 1.2, -0.5))
)).toDF("label", "features")

// Create a LogisticRegression instance. This instance is an Estimator.
val lr = new LogisticRegression()
// Print out the parameters, documentation, and any default values.
println(s"LogisticRegression parameters:\n ${lr.explainParams()}\n")

// We may set parameters using setter methods.
lr.setMaxIter(10)
  .setRegParam(0.01)

// Learn a LogisticRegression model. This uses the parameters stored in lr.
val model1 = lr.fit(training)
// Since model1 is a Model (i.e., a Transformer produced by an Estimator),
// we can view the parameters it used during fit().
// This prints the parameter (name: value) pairs, where names are unique IDs for this
// LogisticRegression instance.
println(s"Model 1 was fit using parameters: ${model1.parent.extractParamMap}")

Why you might want to export your Spark Pipeline model outside of Spark

Pipelines allow you to train models to be used directly inside of your Spark environment—but what if you decide later that the model you trained might be better used outside of Spark?

Spark is a fantastic tool, but it's heavy, and sometimes that heavyweight capability is overkill for a secondary project. Sometimes your workflow is running in Spark, but due to the JVM or other secondary effects, your Spark jobs are starting to run slowly or aren't reliably completing within an acceptable time. Or maybe you just want to turn a Spark model into an API.

In these instances, fine-tuning your Spark cluster can certainly help, but there are tools better suited to the task. If only you could pull your pipeline out of Spark and leverage those tools to the fullest…

Introducing MLeap

MLeap is both an ML runtime framework and a model interchange format that's compatible with a variety of frameworks, including TensorFlow, scikit-learn, and Spark.

This is great, because it gives us a mechanism for exporting a Spark Pipeline model into a non-Spark environment. Spark is pretty heavy, and it can be quite difficult to run in a microservice-based environment.

Using MLeap to export Spark Pipelines

As mentioned, MLeap is a runtime environment, but it's also an ML interchange format. This means it has its own internal representation of your Spark Pipeline, called a bundle. Let's look at how to create one of these bundles from a Spark Pipeline we build ourselves.

import ml.combust.bundle.BundleFile
import ml.combust.mleap.spark.SparkSupport._
import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.bundle.SparkBundleContext
import org.apache.spark.ml.feature.{Binarizer, StringIndexer}
import org.apache.spark.sql._
import org.apache.spark.sql.functions._
import resource._

  val datasetName = "./examples/spark-demo.csv"

  val dataframe: DataFrame = spark.sqlContext.read.format("csv")
    .option("header", true)
    .load(datasetName)
    .withColumn("test_double", col("test_double").cast("double"))

  // Use out-of-the-box Spark transformers like you normally would
  val stringIndexer = new StringIndexer().
    setInputCol("test_string").
    setOutputCol("test_index")
  
  val binarizer = new Binarizer().
    setThreshold(0.5).
    setInputCol("test_double").
    setOutputCol("test_bin")
 
  val pipelineEstimator = new Pipeline()
    .setStages(Array(stringIndexer, binarizer))
  // now that our pipeline is created, we can fit it to the example data.
  val pipeline = pipelineEstimator.fit(dataframe)
  
  // -- pre-built pipeline entrypoint -- //
  // If you already have a Spark pipeline that you've trained on your data, you can skip the previous steps and start here.
  
  // Pipeline serialization
  // As you can see, we're not only serializing the pipeline, we're passing a transformed sample of the data along with it.
  // This ensures that the MLeap bundle contains not just a representation of the model, but also the expected input and output structures, which helps lock down potentially dynamic model graphs.
  val sbc = SparkBundleContext().withDataset(pipeline.transform(dataframe))
  for(bf <- managed(BundleFile("jar:file:/tmp/simple-spark-pipeline.zip"))) {
    pipeline.writeBundle.save(bf)(sbc).get
  }

You can find the example data used during the model training on GitHub.

If run in a Spark REPL or a Scala project connected to a running Spark node, this Scala code will create the Spark Pipeline and serialize it along with its IO schema. Assuming you’re on a Unix-based machine, the serialized file will be located at /tmp/simple-spark-pipeline.zip. This is that MLeap bundle we’ve been talking about, and it’s ready to be ingested by a program leveraging the MLeap runtime environment—Spark no longer required.
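
As a quick sanity check, you can load that bundle with the MLeap runtime alone. Here's a minimal sketch (no SparkSession anywhere in sight):

import ml.combust.bundle.BundleFile
import ml.combust.mleap.runtime.MleapSupport._
import resource._

// Deserialize the bundle with the pure MLeap runtime; Spark is not on the classpath.
val mleapPipeline = (for (bf <- managed(BundleFile("jar:file:/tmp/simple-spark-pipeline.zip"))) yield {
  bf.loadMleapBundle().get.root
}).opt.get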

How to run your exported model on Algorithmia

Now that you have an MLeap bundle containing your Spark Pipeline transformer, weights, and the expected IO for the model, you can export it to Algorithmia.

To start, you’ll need to create a new Scala algorithm.

Create a new algorithm

On the algorithm creation wizard, select the “Other” dropdown, and then select “Scala 2.x & sbt 1.3.x (Environments)”.

Select "Other" dropdown


Then select “Scala 2.X” as the environment in the dropdown. With that, click “Create New Algorithm”.

Select “Scala 2.X” environment

To get started with updating the algorithm code itself, you’ll next need to import the MLeap dependencies and change the version of Scala that you’re using from Scala 2.11.8 to 2.11.12. You can do this by clicking the “DEPENDENCIES” button in the web editor, which you can access by clicking “view source”.

// Enter your dependencies here, using SBT syntax. Maven Central is available.
// http://search.maven.org/

// Examples:

// libraryDependencies += "org.apache.commons" % "commons-math3" % "3.4.1"

// libraryDependencies += "org.apache.commons" % "commons-csv" % "1.1"

libraryDependencies += "ml.combust.mleap" %% "mleap-runtime" % "0.16.0"
libraryDependencies += "ml.combust.mleap" %% "mleap-tensor" % "0.16.0"

scalaVersion := "2.11.12"

Save this, and your dependencies are set.

Next you’ll need the actual algorithm code. For this, we recommend adding the InputExample class and the updated Algorithm class from this code snippet.

The model in this code snippet is also publicly available, so you can use that in your own example for demo purposes. The easiest way to get started is to copy and paste the Algorithm.scala code into your main file, create another file called InputExample.scala, and then copy over that file as well.
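
If you want a feel for the shape of that code before opening the snippet, here is a rough, hypothetical sketch (not the snippet itself) of an Algorithm class that loads the bundle once and scores a single input via a LeapFrame; the field names mirror the example pipeline above:

// Hypothetical sketch only; the linked snippet is the source of truth.
import ml.combust.bundle.BundleFile
import ml.combust.mleap.core.types.{ScalarType, StructField, StructType}
import ml.combust.mleap.runtime.MleapSupport._
import ml.combust.mleap.runtime.frame.{DefaultLeapFrame, Row}
import resource._

// Input shape for the example pipeline above (one string column, one double column).
case class InputExample(test_string: String, test_double: Double)

class Algorithm {
  // Load the bundle once so repeated calls don't pay the deserialization cost.
  // In a real algorithm you'd load it from wherever you host it (for example, the Algorithmia data API).
  private val pipeline = (for (bf <- managed(BundleFile("jar:file:/tmp/simple-spark-pipeline.zip"))) yield {
    bf.loadMleapBundle().get.root
  }).opt.get

  def apply(input: InputExample): String = {
    // Build a one-row LeapFrame matching the schema the bundle expects.
    val schema = StructType(
      StructField("test_string", ScalarType.String),
      StructField("test_double", ScalarType.Double)
    ).get
    val frame = DefaultLeapFrame(schema, Seq(Row(input.test_string, input.test_double)))
    pipeline.transform(frame).get.dataset.mkString("\n")
  }
}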

Want to see the final results? Check out the fully functioning algorithm running on Algorithmia.

Conclusion

Spark is a potent framework, and it provides a lot of value for heavy, enterprise-scale workloads. However, if you're running into issues with your Spark Pipeline model's scalability or reliability, you don't have to be locked in. MLeap is a framework and interchange format that allows you to break away from Spark lock-in and run your models in other places, like Algorithmia.

James Sutton