11 min read
This article is an excerpt taken from a book Mastering Apache Spark 2.x – Second Edition written by Romeo Kienzler. The book will introduce you to Project Tungsten and Catalyst, two of the major advancements of Apache Spark 2.x.

In today’s tutorial we will show how to take advantage of Apache SparkML to win a Kaggle competition.

We’ll use an archived competition offered by BOSCH, a German multinational engineering and electronics company, on production line performance data. The data for this competition represents measurement of parts as they move through Bosch’s production line. Each part has a unique Id. The goal is to predict which part will fail quality control (represented by a ‘Response’ = 1). For more details on the competition data you may visit the website: https://www.kaggle.com/c/bosch-production-line-p erformance/data.

Data preparation

The challenge data comes in three ZIP packages but we only use two of them. One contains categorical data, one contains continuous data, and the last one contains timestamps of  measurements, which we will ignore for now.

If you extract the data, you’ll get three large CSV files. So the first thing that we want to do is re-encode them into parquet in order to be more space-efficient:


def convert(filePrefix : String) = { val basePath = "yourBasePath"

var df = spark

.read

.option("header",true)

.option("inferSchema", "true")

.csv("basePath+filePrefix+".csv") df = df.repartition(1)

df.write.parquet(basePath+filePrefix+".parquet")

}

convert("train_numeric") convert("train_date") convert("train_categorical")

First, we define a function convert that just reads the .csv file and rewrites it as a .parquet
 file. As you can see, this saves a lot of space:

Data

Now we read the files in again as DataFrames from the parquet files :

var df_numeric = spark.read.parquet(basePath+"train_numeric.parquet")

var df_categorical = spark.read.parquet(basePath+"train_categorical.parquet")

Here is the output of the same:

Scala output

This is very high-dimensional data; therefore, we will take only a subset of the columns for this illustration:

df_categorical.createOrReplaceTempView("dfcat")

var dfcat = spark.sql("select Id, L0_S22_F545 from dfcat")

In the following picture, you can see the unique categorical values of that column:

Scala

Now let’s do the same with the numerical dataset:

df_numeric.createOrReplaceTempView("dfnum")

var dfnum = spark.sql("select Id,L0_S0_F0,L0_S0_F2,L0_S0_F4,Response from dfnum")

Here is the output of the same:

Scala df

Finally, we rejoin these two relations:

var df = dfcat.join(dfnum,"Id") df.createOrReplaceTempView("df")

Then we have to do some NA treatment:

var df_notnull = spark.sql(""" select

Response as label, case

when L0_S22_F545 is null then 'NA' else L0_S22_F545 end as L0_S22_F545,

case

when L0_S0_F0 is null then 0.0 else L0_S0_F0 end as L0_S0_F0,

case

when L0_S0_F2 is null then 0.0 else L0_S0_F2 end as L0_S0_F2,

case

when L0_S0_F4 is null then 0.0 else L0_S0_F4 end as L0_S0_F4

from df """)

Feature engineering

Now it is time to run the first transformer (which is actually an estimator). It is StringIndexer and needs to keep track of an internal mapping table between strings and indexes. Therefore, it is not a transformer but an estimator:

import org.apache.spark.ml.feature.{OneHotEncoder, StringIndexer}

var indexer = new StringIndexer()

.setHandleInvalid("skip")

.setInputCol("L0_S22_F545")

.setOutputCol("L0_S22_F545Index")

var indexed = indexer.fit(df_notnull).transform(df_notnull) indexed.printSchema

As we can see clearly in the following image, an additional column called L0_S22_F545Index has been created:

root

Finally, let’s examine some content of the newly created column and compare it with the source column. We can clearly see how the category string gets transformed into a float index:

Scala

Now we want to apply OneHotEncoder, which is a transformer, in order to generate better features for our machine learning model:

var encoder = new OneHotEncoder()

.setInputCol("L0_S22_F545Index")

.setOutputCol("L0_S22_F545Vec")

var encoded = encoder.transform(indexed)

As you can see in the following figure, the newly created column
L0_S22_F545Vec contains org.apache. spark.ml.linalg.SparseVector objects, which is a compressed representation of a sparse vector:

Scala encoded

Note: Sparse vector representations: The OneHotEncoder, as many other algorithms, returns a sparse vector of the org.apache.spark.ml.linalg.SparseVector type as, according to the definition, only one element of the vector can be one, the rest has to remain zero. This gives a lot of opportunity for compression as only the position of the elements that are non-zero has to be known. Apache Spark uses a sparse vector representation in the following format: (l,[p],[v]), where l stands for length of the vector, p for position (this can also be an array of positions), and v for the actual values (this can be an array of values). So if we get (13,[10],[1.0]), as in our earlier example, the actual sparse vector looks like this: (0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,1.0,0.0,0.0,0.0).

So now that we are done with our feature engineering, we want to create one overall sparse vector containing all the necessary columns for our machine learner. This is done using VectorAssembler:

import org.apache.spark.ml.feature.VectorAssembler import org.apache.spark.ml.linalg.Vectors

var vectorAssembler = new VectorAssembler()

.setInputCols(Array("L0_S22_F545Vec", "L0_S0_F0", "L0_S0_F2","L0_S0_F4"))

setOutputCol("features")

var assembled = vectorAssembler.transform(encoded)

We basically just define a list of column names and a target column, and the rest is done for us:

scala assembled

As the view of the features column got a bit squashed, let’s inspect one instance of the feature field in more detail:

Scala assembled

We can clearly see that we are dealing with a sparse vector of length 16 where positions 0, 13, 14, and 15 are non-zero and contain the following values: 1.0, 0.03, -0.034, and -0.197. Done! Let’s create a Pipeline out of these components.

Testing the feature engineering pipeline

Let’s create a Pipeline out of our transformers and estimators:

import org.apache.spark.ml.Pipeline import org.apache.spark.ml.PipelineModel

//Create an array out of individual pipeline stages var transformers = Array(indexer,encoder,assembled)

var pipeline = new Pipeline().setStages(transformers).fit(df_notnull) var transformed = pipeline.transform(df_notnull)

Note that the setStages method of Pipeline just expects an array of transformers and estimators, which we had created earlier. As parts of the Pipeline contain estimators, we have to run fit on our DataFrame first. The obtained Pipeline object takes a DataFrame in the transform method and returns the results of the transformations:

Scala transformed

As expected, we obtain the very same DataFrame as we had while running the stages individually in a sequence.

Training the machine learning model

Now it’s time to add another component to the Pipeline: the actual machine learning algorithm-RandomForest:

import org.apache.spark.ml.classification.RandomForestClassifier var rf = new RandomForestClassifier()

.setLabelCol("label")

.setFeaturesCol("features")

var model = new Pipeline().setStages(transformers :+ rf).fit(df_notnull) var result = model.transform(df_notnull)

This code is very straightforward. First, we have to instantiate our algorithm and obtain it as a reference in rf. We could have set additional parameters to the model but we’ll do this later in an automated fashion in the CrossValidation step. Then, we just add the stage to our Pipeline, fit it, and finally transform. The fit method, apart from running all upstream stages, also calls fit on the RandomForestClassifier in order to train it. The trained model is now contained within the Pipeline and the transform method actually creates our predictions column:

Scala result

As we can see, we’ve now obtained an additional column called prediction, which contains the output of the RandomForestClassifier model. Of course, we’ve only used a very limited subset of available features/columns and have also not yet tuned the model, so we don’t expect to do very well; however, let’s take a look at how we can evaluate our model easily with Apache SparkML.

Model evaluation

Without evaluation, a model is worth nothing as we don’t know how accurately it performs. Therefore, we will now use the built-in BinaryClassificationEvaluator in order to assess prediction performance and a widely used measure called areaUnderROC (going into detail here is beyond the scope of this book):

import org.apache.spark.ml.evaluation.BinaryClassificationEvaluator val evaluator = new BinaryClassificationEvaluator()

import org.apache.spark.ml.param.ParamMap

var evaluatorParamMap = ParamMap(evaluator.metricName -> "areaUnderROC") var aucTraining = evaluator.evaluate(result, evaluatorParamMap)

As we can see, there is a built-in class called
org.apache.spark.ml.evaluation.BinaryClassificationEvaluator and there are some other classes for other prediction use cases such as RegressionEvaluator or MulticlassClassificationEvaluator. The evaluator takes a parameter map–in this case, we are telling it to use the areaUnderROC metric–and finally, the evaluate method evaluates the result:

Scala aucTraining

As we can see, areaUnderROC is 0.5424418446501833. An ideal classifier would return a score of one. So we are only doing a bit better than random guesses but, as already stated, the number of features that we are looking at is fairly limited.

Note : In the previous example we are using the areaUnderROC metric which is used for evaluation of binary classifiers. There exist an abundance of other metrics used for different disciplines of machine learning such as accuracy, precision, recall and F1 score. The following provides a good overview http://www.cs.cornell.edu/courses/cs578/2003fa/performance_measures.pdf

This areaUnderROC is in fact a very bad value. Let’s see if choosing better parameters for our RandomForest model increases this a bit in the next section.

This areaUnderROC is in fact a very bad value. Let’s see if choosing better parameters for our RandomForest model increases this a bit in the next section.

CrossValidation and hyperparameter tuning

As explained before, a common step in machine learning is cross-validating your model using testing data against training data and also tweaking the knobs of your machine learning algorithms. Let’s use Apache SparkML in order to do this for us, fully automated!

First, we have to configure the parameter map and CrossValidator:

import org.apache.spark.ml.tuning.{CrossValidator, ParamGridBuilder} var paramGrid = new ParamGridBuilder()

.addGrid(rf.numTrees, 3 :: 5 :: 10 :: 30 :: 50 :: 70 :: 100 :: 150 ::

Nil)

.addGrid(rf.featureSubsetStrategy, "auto" :: "all" :: "sqrt" :: "log2"

:: "onethird" :: Nil)

.addGrid(rf.impurity, "gini" :: "entropy" :: Nil)

.addGrid(rf.maxBins, 2 :: 5 :: 10 :: 15 :: 20 :: 25 :: 30 :: Nil)

.addGrid(rf.maxDepth, 3 :: 5 :: 10 :: 15 :: 20 :: 25 :: 30 :: Nil)

.build()

var crossValidator = new CrossValidator()

.setEstimator(new Pipeline().setStages(transformers :+ rf))

.setEstimatorParamMaps(paramGrid)

.setNumFolds(5)

.setEvaluator(evaluator)

var crossValidatorModel = crossValidator.fit(df_notnull)

var newPredictions = crossValidatorModel.transform(df_notnull)

The org.apache.spark.ml.tuning.ParamGridBuilder is used in order to define the hyperparameter space where the CrossValidator has to search and finally, the
org.apache.spark.ml.tuning.CrossValidator takes our Pipeline, the hyperparameter space of our RandomForest classifier, and the number of folds for the CrossValidation as parameters. Now, as usual, we just need to call fit and transform on the CrossValidator and it will basically run our Pipeline multiple times and return a model that performs the best. Do you know how many different models are trained? Well, we have five folds on CrossValidation and five-dimensional hyperparameter space cardinalities between two and eight, so let’s do the math: 5 * 8 * 5 * 2 * 7 * 7 = 19600 times!

Using the evaluator to assess the quality of the cross-validated and tuned model

Now that we’ve optimized our Pipeline in a fully automatic fashion, let’s see how our best model can be obtained:

var bestPipelineModel = crossValidatorModel.bestModel.asInstanceOf[PipelineModel]

var stages = bestPipelineModel.stages

import org.apache.spark.ml.classification.RandomForestClassificationModel val rfStage =

stages(stages.length-1).asInstanceOf[RandomForestClassificationModel] rfStage.getNumTrees

rfStage.getFeatureSubsetStrategy rfStage.getImpurity rfStage.getMaxBins rfStage.getMaxDepth

The crossValidatorModel.bestModel code basically returns the best Pipeline. Now we use bestPipelineModel.stages to obtain the individual stages and obtain the tuned RandomForestClassificationModel using stages(stages.length 1).asInstanceOf[RandomForestClassificationModel]. Note that stages.length-1 addresses the last stage in the Pipeline, which is our RandomForestClassifier.

So now, we can basically run evaluator using the best model and see how it performs:

Scala evaluator

You might have noticed that 0.5362224872557545 is less than 0.5424418446501833, as we’ve obtained before. So why is this the case? Actually, this time we used cross-validation, which means that the model is less likely to over fit and therefore the score is a bit lower.

So let’s take a look at the parameters of the best model:

Scala rf stage

Note that we’ve limited the hyperparameter space, so numTreesmaxBins, and maxDepth have been limited to five, and bigger trees will most likely perform better. So feel free to play around with this code and add features, and also use a bigger hyperparameter space, say, bigger trees.

Finally, we’ve applied the concepts that we discussed on a real dataset from a Kaggle competition, which is a good starting point for your own machine learning project with Apache SparkML.

If you found our post useful, do check out this book Mastering Apache Spark 2.x – Second Edition to know more about advanced analytics on your Big Data with latest Apache Spark 2.x.

Mastering Apache Spark 2.x

 

 

LEAVE A REPLY

Please enter your comment!
Please enter your name here