Note: This article is an excerpt from the book Mastering Apache Spark 2.x - Second Edition written by Romeo Kienzler. The book will introduce you to Project Tungsten and Catalyst, two of the major advancements of Apache Spark 2.x.
In today’s tutorial we will show how to take advantage of Apache SparkML to win a Kaggle competition.
We'll use an archived competition offered by BOSCH, a German multinational engineering and electronics company, on production line performance data. The data for this competition represents measurements of parts as they move through Bosch's production line. Each part has a unique Id. The goal is to predict which parts will fail quality control (represented by a 'Response' = 1). For more details on the competition data, you may visit the website: https://www.kaggle.com/c/bosch-production-line-performance/data.
Data preparation
The challenge data comes in three ZIP packages but we only use two of them. One contains categorical data, one contains continuous data, and the last one contains timestamps of measurements, which we will ignore for now.
If you extract the data, you’ll get three large CSV files. So the first thing that we want to do is re-encode them into parquet in order to be more space-efficient:
def convert(filePrefix: String) = {
  val basePath = "yourBasePath"
  var df = spark
    .read
    .option("header", true)
    .option("inferSchema", "true")
    .csv(basePath + filePrefix + ".csv")
  df = df.repartition(1)
  df.write.parquet(basePath + filePrefix + ".parquet")
}

convert("train_numeric")
convert("train_date")
convert("train_categorical")
First, we define a function convert that just reads the .csv file and rewrites it as a .parquet file. This saves a lot of space.
Now we read the files in again as DataFrames from the Parquet files:
var df_numeric = spark.read.parquet(basePath+"train_numeric.parquet")
var df_categorical = spark.read.parquet(basePath+"train_categorical.parquet")
This is very high-dimensional data; therefore, we will take only a subset of the columns for this illustration:
df_categorical.createOrReplaceTempView("dfcat")
var dfcat = spark.sql("select Id, L0_S22_F545 from dfcat")
The values in this column are categorical, so it is worth examining the set of unique values it contains.
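One way to list them is a simple distinct query (a minimal sketch; the exact values depend on your copy of the data):

// Show the distinct categorical values of the selected column
dfcat.select("L0_S22_F545").distinct.show()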
Now let’s do the same with the numerical dataset:
df_numeric.createOrReplaceTempView("dfnum")
var dfnum = spark.sql("select Id,L0_S0_F0,L0_S0_F2,L0_S0_F4,Response from dfnum")
Finally, we rejoin these two relations:
var df = dfcat.join(dfnum, "Id")
df.createOrReplaceTempView("df")
Then we have to do some NA treatment:
var df_notnull = spark.sql("""
  select
    Response as label,
    case when L0_S22_F545 is null then 'NA' else L0_S22_F545 end as L0_S22_F545,
    case when L0_S0_F0 is null then 0.0 else L0_S0_F0 end as L0_S0_F0,
    case when L0_S0_F2 is null then 0.0 else L0_S0_F2 end as L0_S0_F2,
    case when L0_S0_F4 is null then 0.0 else L0_S0_F4 end as L0_S0_F4
  from df
""")
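The same NA treatment can also be expressed with the DataFrame API instead of SQL. Here is a minimal sketch of that alternative (df_notnull2 is just a hypothetical name for the result):

// Rename the label column and replace nulls with default values
var df_notnull2 = df
  .withColumnRenamed("Response", "label")
  .na.fill("NA", Seq("L0_S22_F545"))
  .na.fill(0.0, Seq("L0_S0_F0", "L0_S0_F2", "L0_S0_F4"))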
Feature engineering
Now it is time to run the first transformer (which is actually an estimator): StringIndexer. It needs to keep track of an internal mapping table between strings and indexes; therefore, it is not a transformer but an estimator:
import org.apache.spark.ml.feature.{OneHotEncoder, StringIndexer}
var indexer = new StringIndexer()
.setHandleInvalid("skip")
.setInputCol("L0_S22_F545")
.setOutputCol("L0_S22_F545Index")
var indexed = indexer.fit(df_notnull).transform(df_notnull)
indexed.printSchema
As we can see clearly in the schema output, an additional column called L0_S22_F545Index has been created.
Finally, let's examine some content of the newly created column and compare it with the source column; we can see how each category string gets transformed into a numeric index.
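A small sketch of such an inspection (the exact index assignment depends on the value frequencies in your data):

// Show each distinct category next to the index it was mapped to
indexed.select("L0_S22_F545", "L0_S22_F545Index").distinct.show()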
Now we want to apply OneHotEncoder, which is a transformer, in order to generate better features for our machine learning model:
var encoder = new OneHotEncoder()
.setInputCol("L0_S22_F545Index")
.setOutputCol("L0_S22_F545Vec")
var encoded = encoder.transform(indexed)
The newly created column L0_S22_F545Vec contains org.apache.spark.ml.linalg.SparseVector objects, which are a compressed representation of sparse vectors:
Note: Sparse vector representations: The OneHotEncoder, like many other algorithms, returns a sparse vector of the org.apache.spark.ml.linalg.SparseVector type since, by definition, only one element of the vector can be one and the rest have to remain zero. This gives a lot of opportunity for compression, as only the positions of the non-zero elements have to be known. Apache Spark uses a sparse vector representation in the following format: (l,[p],[v]), where l stands for the length of the vector, p for the position (this can also be an array of positions), and v for the actual values (this can be an array of values). So if we get (13,[10],[1.0]), as in our earlier example, the actual dense vector looks like this: (0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,1.0,0.0,0.0).
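To make this concrete, here is a minimal sketch that builds the sparse vector from the note by hand and expands it to its dense form:

import org.apache.spark.ml.linalg.Vectors

// Length 13, value 1.0 at (zero-based) position 10
val sv = Vectors.sparse(13, Array(10), Array(1.0))
println(sv)          // (13,[10],[1.0])
println(sv.toDense)  // [0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,1.0,0.0,0.0]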
So now that we are done with our feature engineering, we want to create one overall sparse vector containing all the necessary columns for our machine learner. This is done using VectorAssembler:
import org.apache.spark.ml.feature.VectorAssembler
import org.apache.spark.ml.linalg.Vectors

var vectorAssembler = new VectorAssembler()
  .setInputCols(Array("L0_S22_F545Vec", "L0_S0_F0", "L0_S0_F2", "L0_S0_F4"))
  .setOutputCol("features")

var assembled = vectorAssembler.transform(encoded)
We basically just define a list of input column names and a target column, and the rest is done for us. Let's inspect one instance of the features field in more detail.
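One way to pull out a single feature vector (a sketch; the exact numbers depend on your data):

// Print the assembled feature vector of the first row without truncation
assembled.select("features").show(1, false)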
We can clearly see that we are dealing with a sparse vector of length 16 where positions 0, 13, 14, and 15 are non-zero and contain the following values: 1.0, 0.03, -0.034, and -0.197. Done! Let’s create a Pipeline out of these components.
Testing the feature engineering pipeline
Let’s create a Pipeline out of our transformers and estimators:
import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.PipelineModel

// Create an array out of individual pipeline stages
var transformers = Array(indexer, encoder, vectorAssembler)

var pipeline = new Pipeline().setStages(transformers).fit(df_notnull)
var transformed = pipeline.transform(df_notnull)
Note that the setStages method of Pipeline just expects an array of transformers and estimators, which we had created earlier. As parts of the Pipeline contain estimators, we have to run fit on our DataFrame first. The fit call returns a PipelineModel, which takes a DataFrame in its transform method and returns the results of the transformations.
As expected, we obtain the very same DataFrame as we had while running the stages individually in a sequence.
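As a quick sanity check (a minimal sketch; nullability metadata can differ slightly between the two paths):

// Print the schema of the pipeline output; it should match the schema of
// the DataFrame we built step by step (the assembled DataFrame above)
transformed.printSchema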
Training the machine learning model
Now it's time to add another component to the Pipeline: the actual machine learning algorithm, RandomForest:
import org.apache.spark.ml.classification.RandomForestClassifier

var rf = new RandomForestClassifier()
  .setLabelCol("label")
  .setFeaturesCol("features")

var model = new Pipeline().setStages(transformers :+ rf).fit(df_notnull)
var result = model.transform(df_notnull)
This code is very straightforward. First, we have to instantiate our algorithm and obtain it as a reference in rf. We could have set additional parameters to the model, but we'll do this later in an automated fashion in the CrossValidation step. Then, we just add the stage to our Pipeline, fit it, and finally transform. The fit method, apart from running all upstream stages, also calls fit on the RandomForestClassifier in order to train it. The trained model is now contained within the Pipeline, and the transform method actually creates our predictions column.
As we can see, we've now obtained an additional column called prediction, which contains the output of the RandomForestClassifier model. Of course, we've only used a very limited subset of available features/columns and have also not yet tuned the model, so we don't expect to do very well; however, let's take a look at how we can evaluate our model easily with Apache SparkML.
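To peek at the predictions yourself, something along these lines works (a sketch; your values will differ):

// Compare the true label with the predicted class and class probabilities
result.select("label", "probability", "prediction").show(5)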
Model evaluation
Without evaluation, a model is worth nothing as we don't know how accurately it performs. Therefore, we will now use the built-in BinaryClassificationEvaluator in order to assess prediction performance, using a widely used measure called areaUnderROC (going into detail here is beyond the scope of this book):
import org.apache.spark.ml.evaluation.BinaryClassificationEvaluator

val evaluator = new BinaryClassificationEvaluator()

import org.apache.spark.ml.param.ParamMap

var evaluatorParamMap = ParamMap(evaluator.metricName -> "areaUnderROC")
var aucTraining = evaluator.evaluate(result, evaluatorParamMap)
As we can see, there is a built-in class called org.apache.spark.ml.evaluation.BinaryClassificationEvaluator, and there are some other classes for other prediction use cases, such as RegressionEvaluator or MulticlassClassificationEvaluator. The evaluator takes a parameter map (in this case, we are telling it to use the areaUnderROC metric) and, finally, the evaluate method evaluates the result.
As we can see, areaUnderROC is 0.5424418446501833. An ideal classifier would return a score of one. So we are only doing a bit better than random guessing but, as already stated, the number of features that we are looking at is fairly limited.
Note: In the previous example, we are using the areaUnderROC metric, which is used for the evaluation of binary classifiers. There exists an abundance of other metrics used for different disciplines of machine learning, such as accuracy, precision, recall, and F1 score. The following provides a good overview: http://www.cs.cornell.edu/courses/cs578/2003fa/performance_measures.pdf
This areaUnderROC is, in fact, a very bad value. Let's see whether choosing better parameters for our RandomForest model increases it a bit in the next section.
CrossValidation and hyperparameter tuning
As explained before, a common step in machine learning is cross-validating your model on held-out test data and tweaking the knobs of your machine learning algorithms. Let's use Apache SparkML in order to do this for us, fully automated!
First, we have to configure the parameter map and the CrossValidator:
import org.apache.spark.ml.tuning.{CrossValidator, ParamGridBuilder}

var paramGrid = new ParamGridBuilder()
  .addGrid(rf.numTrees, 3 :: 5 :: 10 :: 30 :: 50 :: 70 :: 100 :: 150 :: Nil)
  .addGrid(rf.featureSubsetStrategy, "auto" :: "all" :: "sqrt" :: "log2" :: "onethird" :: Nil)
  .addGrid(rf.impurity, "gini" :: "entropy" :: Nil)
  .addGrid(rf.maxBins, 2 :: 5 :: 10 :: 15 :: 20 :: 25 :: 30 :: Nil)
  .addGrid(rf.maxDepth, 3 :: 5 :: 10 :: 15 :: 20 :: 25 :: 30 :: Nil)
  .build()
var crossValidator = new CrossValidator()
.setEstimator(new Pipeline().setStages(transformers :+ rf))
.setEstimatorParamMaps(paramGrid)
.setNumFolds(5)
.setEvaluator(evaluator)
var crossValidatorModel = crossValidator.fit(df_notnull)
var newPredictions = crossValidatorModel.transform(df_notnull)
The org.apache.spark.ml.tuning.ParamGridBuilder is used in order to define the hyperparameter space where the CrossValidator has to search, and finally, the org.apache.spark.ml.tuning.CrossValidator takes our Pipeline, the hyperparameter space of our RandomForest classifier, and the number of folds for the CrossValidation as parameters. Now, as usual, we just need to call fit and transform on the CrossValidator and it will basically run our Pipeline multiple times and return a model that performs best. Do you know how many different models are trained? Well, we have five folds on CrossValidation and five-dimensional hyperparameter space cardinalities between two and eight, so let's do the math: 5 * 8 * 5 * 2 * 7 * 7 = 19600 times!
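You can double-check this count directly, since the built grid is just an array of parameter maps (a small sketch):

// One model is trained per parameter combination per fold
println(paramGrid.length)      // 3920 parameter combinations
println(paramGrid.length * 5)  // 19600 training runs with 5 folds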
Using the evaluator to assess the quality of the cross-validated and tuned model
Now that we’ve optimized our Pipeline in a fully automatic fashion, let’s see how our best model can be obtained:
var bestPipelineModel = crossValidatorModel.bestModel.asInstanceOf[PipelineModel]
var stages = bestPipelineModel.stages

import org.apache.spark.ml.classification.RandomForestClassificationModel

val rfStage = stages(stages.length-1).asInstanceOf[RandomForestClassificationModel]
rfStage.getNumTrees
rfStage.getFeatureSubsetStrategy
rfStage.getImpurity
rfStage.getMaxBins
rfStage.getMaxDepth
The crossValidatorModel.bestModel code basically returns the best Pipeline. Now we use bestPipelineModel.stages to obtain the individual stages and obtain the tuned RandomForestClassificationModel using stages(stages.length-1).asInstanceOf[RandomForestClassificationModel]. Note that stages.length-1 addresses the last stage in the Pipeline, which is our RandomForestClassifier.
So now, we can basically run evaluator using the best model and see how it performs:
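A minimal sketch of that evaluation, reusing the evaluator and parameter map from before (aucCrossValidated is just a hypothetical name):

// Score the cross-validated predictions with the same areaUnderROC metric
var aucCrossValidated = evaluator.evaluate(newPredictions, evaluatorParamMap)
println(aucCrossValidated)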
You might have noticed that 0.5362224872557545 is less than the 0.5424418446501833 we obtained before. So why is this the case? Actually, this time we used cross-validation, which means that the model is less likely to overfit, and therefore the score is a bit lower.
So let’s take a look at the parameters of the best model:
Note that we've limited the hyperparameter space, so numTrees, maxBins, and maxDepth have been capped at fairly small values, and bigger trees will most likely perform better. So feel free to play around with this code, add features, and also use a bigger hyperparameter space, say, bigger trees.
Finally, we’ve applied the concepts that we discussed on a real dataset from a Kaggle competition, which is a good starting point for your own machine learning project with Apache SparkML.
If you found this post useful, do check out the book Mastering Apache Spark 2.x - Second Edition to learn more about advanced analytics on Big Data with the latest Apache Spark 2.x.