In this article, Denny Lee, the author of the book Learning PySpark, provides a brief implementation of, and some theory behind, the ML package.
So, let’s get to it!
In this article, we will reuse a portion of the dataset. The data can be downloaded from http://www.tomdrabas.com/data/LearningPySpark/births_transformed.csv.gz.
At the top level, the package exposes three main abstract classes: a Transformer, an Estimator, and a Pipeline. We will briefly explain each, with short examples.
The Transformer class, like the name suggests, transforms your data by (normally) appending a new column to your DataFrame.
At a high level, when deriving from the Transformer abstract class, each new Transformer needs to implement a .transform(…) method. The method’s first, and normally only, obligatory parameter is the DataFrame to be transformed. This, of course, varies method-by-method in the ML package: other popular parameters are inputCol and outputCol; these, however, frequently default to some predefined values, such as ‘features’ for the inputCol parameter.
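To make this concrete, here is a minimal sketch of a Transformer in action, using the Binarizer from spark.ml.feature; the toy score column and the 0.5 threshold are our own choices, and we assume a running SparkSession named spark:
import pyspark.ml.feature as ft

# toy DataFrame with a single continuous column
df = spark.createDataFrame([(0.1,), (0.8,)], ['score'])

# the Binarizer appends a 0.0/1.0 column based on a threshold
binarizer = ft.Binarizer(threshold=0.5,
                         inputCol='score',
                         outputCol='above')
binarizer.transform(df).collect()
# expected: [Row(score=0.1, above=0.0), Row(score=0.8, above=1.0)]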
There are many Transformers offered in the spark.ml.feature module; we briefly describe some of them here:
More information on Chi-square can be found here: http://ccnmtl.columbia.edu/projects/qmss/the_chisquare_test/about_the_chisquare_test.html.
Since a simple modulo is used to map the hash value to a column index in the HashingTF transformer, it is advisable to use a power of two as the numFeatures parameter; otherwise the features will not be mapped evenly to the columns.
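For instance, here is a minimal sketch using the HashingTF transformer with numFeatures set to a power of two (the toy tokenized column is ours):
import pyspark.ml.feature as ft

# a toy, already-tokenized column of words
df = spark.createDataFrame([(['spark', 'ml', 'spark'],)], ['words'])

# numFeatures=16 is a power of two, per the advice above
hashingTF = ft.HashingTF(numFeatures=16,
                         inputCol='words',
                         outputCol='tf')
hashingTF.transform(df).select('tf').collect()
# returns a SparseVector of length 16 holding the term counts,
# e.g. [Row(tf=SparseVector(16, {...: 1.0, ...: 2.0}))]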
In the SQLTransformer, the FROM statement should select from __THIS__, indicating you are accessing the underlying DataFrame. For example: SELECT alpha * 3 + beta AS vec FROM __THIS__.
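A minimal sketch of the SQLTransformer using exactly that statement (the toy alpha and beta values are ours):
import pyspark.ml.feature as ft

df = spark.createDataFrame([(2.0, 1.0)], ['alpha', 'beta'])

# __THIS__ is replaced by the DataFrame being transformed
sql_trans = ft.SQLTransformer(
    statement='SELECT alpha * 3 + beta AS vec FROM __THIS__')
sql_trans.transform(df).collect()
# expected: [Row(vec=7.0)]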
Another frequently used Transformer is the VectorAssembler, which collates multiple columns into a single vector column. Consider the following DataFrame:
import pyspark.ml.feature as ft

df = spark.createDataFrame(
    [(12, 10, 3), (1, 4, 2)],
    ['a', 'b', 'c'])
The output of calling:
ft.VectorAssembler(inputCols=['a', 'b', 'c'],
                   outputCol='features') \
    .transform(df) \
    .select('features') \
    .collect()
looks as follows:
[Row(features=DenseVector([12.0, 10.0, 3.0])),
Row(features=DenseVector([1.0, 4.0, 2.0]))]
Note that there are many methods in the ML package that have an E letter next to them; this means the method is currently in beta (or Experimental) and it sometimes might fail or produce erroneous results. Beware.
Estimators can be thought of as statistical models that need to be estimated to make predictions or classify your observations.
If deriving from the abstract Estimator class, the new model has to implement the .fit(…) method that fits the model given the data found in a DataFrame and some default or user-specified parameters.
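A minimal sketch of this contract on a toy dataset of our own; note that the object returned by .fit(…) is itself a Transformer:
import pyspark.ml.classification as cl
from pyspark.ml.linalg import Vectors

# toy data using the default 'features' and 'label' column names
df = spark.createDataFrame(
    [(Vectors.dense([0.0]), 0.0), (Vectors.dense([1.0]), 1.0)],
    ['features', 'label'])

logistic = cl.LogisticRegression(maxIter=5)  # an Estimator
model = logistic.fit(df)                     # .fit(...) estimates the model
model.transform(df).select('prediction').collect()  # ...which transforms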
There are a lot of estimators available in PySpark, and we will now briefly describe the models available in Spark 2.0.
The ML package provides a data scientist with seven classification models to choose from, ranging from the simplest ones (such as Logistic Regression) to more sophisticated ones.
There are seven models available for regression tasks in the PySpark ML package. As with classification, these range from some basic ones (such as the obligatory Linear Regression) to more complex ones.
Clustering is a family of unsupervised models used to find underlying patterns in your data. The PySpark ML package currently provides the four most popular models.
Check out this website for more information on the bisecting k-means pseudo-algorithm: http://minethedata.blogspot.com/2012/08/bisecting-k-means.html.
Beware that for datasets with many features this model might perform poorly due to the curse of dimensionality and numerical issues with Gaussian distributions.
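For illustration, here is a minimal sketch of fitting one of the clustering Estimators, KMeans; the toy points and the choice of k=2 are our own:
import pyspark.ml.clustering as clus
from pyspark.ml.linalg import Vectors

# two well-separated toy points
df = spark.createDataFrame(
    [(Vectors.dense([0.0, 0.0]),), (Vectors.dense([9.0, 9.0]),)],
    ['features'])

kmeans = clus.KMeans(k=2, seed=666)
model = kmeans.fit(df)
model.transform(df).select('prediction').collect()
# each point ends up in its own cluster (labels 0 and 1, in some order)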
There is also one recommendation model available in PySpark ML, but I will refrain from describing it here.
A Pipeline in PySpark ML is a concept of an end-to-end transformation-estimation process (with distinct stages) that ingests some raw data (in a DataFrame form), performs necessary data carpentry (transformations), and finally estimates a statistical model (estimator).
A Pipeline can be purely transformative, that is, consisting of Transformers only.
A Pipeline can be thought of as a chain of multiple discrete stages. When a .fit(…) method is executed on a Pipeline object, all the stages are executed in the order they were specified in the stages parameter; the stages parameter is a list of Transformer and Estimator objects. The .fit(…) method of the Pipeline object executes the .transform(…) method for the Transformers and the .fit(…) method for the Estimators.
Normally, the output of a preceding stage becomes the input for the following stage: when deriving from either the Transformer or Estimator abstract classes, one needs to implement the .getOutputCol() method that returns the value of the outputCol parameter specified when creating an object.
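As a minimal sketch of this chaining (the toy text column and stage names are our own), two Transformers can be linked so that renaming an output column in one place propagates to the next stage:
from pyspark.ml import Pipeline
import pyspark.ml.feature as ft

tokenizer = ft.Tokenizer(inputCol='text', outputCol='words')

# the second stage reads its input column name from the first
hashingTF = ft.HashingTF(inputCol=tokenizer.getOutputCol(),
                         outputCol='tf')

df = spark.createDataFrame([('hello spark',)], ['text'])

# a purely transformative Pipeline: .fit(...) has nothing to estimate
pipeline = Pipeline(stages=[tokenizer, hashingTF])
pipeline.fit(df).transform(df).first()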
In this section, we will use a portion of the dataset to present the ideas of PySpark ML.
If you have not yet downloaded the data, it can be accessed here: http://www.tomdrabas.com/data/LearningPySpark/births_transformed.csv.gz.
In this section, we will, once again, attempt to predict the chances of infant survival.
First, we load the data with the help of the following code:
import pyspark.sql.types as typ
labels = [
('INFANT_ALIVE_AT_REPORT', typ.IntegerType()),
('BIRTH_PLACE', typ.StringType()),
('MOTHER_AGE_YEARS', typ.IntegerType()),
('FATHER_COMBINED_AGE', typ.IntegerType()),
('CIG_BEFORE', typ.IntegerType()),
('CIG_1_TRI', typ.IntegerType()),
('CIG_2_TRI', typ.IntegerType()),
('CIG_3_TRI', typ.IntegerType()),
('MOTHER_HEIGHT_IN', typ.IntegerType()),
('MOTHER_PRE_WEIGHT', typ.IntegerType()),
('MOTHER_DELIVERY_WEIGHT', typ.IntegerType()),
('MOTHER_WEIGHT_GAIN', typ.IntegerType()),
('DIABETES_PRE', typ.IntegerType()),
('DIABETES_GEST', typ.IntegerType()),
('HYP_TENS_PRE', typ.IntegerType()),
('HYP_TENS_GEST', typ.IntegerType()),
('PREV_BIRTH_PRETERM', typ.IntegerType())
]
schema = typ.StructType([
typ.StructField(e[0], e[1], False) for e in labels
])
births = spark.read.csv('births_transformed.csv.gz',
header=True,
schema=schema)
We specify the schema of the DataFrame; our severely limited dataset now only has 17 columns.
Before we can use the dataset to estimate a model, we need to do some transformations. Since statistical models can only operate on numeric data, we will have to encode the BIRTH_PLACE variable.
Before we do any of this, though, since we will use a number of different feature transformations, let’s import them all:
import pyspark.ml.feature as ft
To encode the BIRTH_PLACE column, we will use the OneHotEncoder method. However, the method cannot accept StringType columns; it can only deal with numeric types, so first we will cast the column to an IntegerType:
births = births \
    .withColumn('BIRTH_PLACE_INT',
                births['BIRTH_PLACE']
                    .cast(typ.IntegerType()))
Having done this, we can now create our first Transformer:
encoder = ft.OneHotEncoder(
inputCol='BIRTH_PLACE_INT',
outputCol='BIRTH_PLACE_VEC')
Let’s now create a single column with all the features collated together. We will use the VectorAssembler method:
featuresCreator = ft.VectorAssembler(
inputCols=[
col[0]
for col
in labels[2:]] +
[encoder.getOutputCol()],
outputCol='features'
)
The inputCols parameter passed to the VectorAssembler object is a list of all the columns to be combined together to form the outputCol – the ‘features’. Note that we use the output of the encoder object (by calling the .getOutputCol() method), so we do not have to remember to change this parameter’s value should we change the name of the output column in the encoder object at any point.
It’s now time to create our first estimator.
In this example, we will (once again) use the Logistic Regression model. However, we will showcase some more complex models from the .classification set of PySpark ML models, so we load the whole module:
import pyspark.ml.classification as cl
Once loaded, let’s create the model by using the following code:
logistic = cl.LogisticRegression(
maxIter=10,
regParam=0.01,
labelCol='INFANT_ALIVE_AT_REPORT')
We would not have to specify the labelCol parameter if our target column had the name ‘label’. Also, if the output of our featuresCreator were not called ‘features’, we would have to specify featuresCol by (most conveniently) calling the .getOutputCol() method on the featuresCreator object.
All that is left now is to create a Pipeline and fit the model. First, let’s load the Pipeline from the ML package:
from pyspark.ml import Pipeline
Creating a Pipeline is really easy. Conceptually, our pipeline consists of three stages executed in order: the encoder, then the featuresCreator, and finally the logistic estimator.
Converting this structure into a Pipeline is a walk in the park:
pipeline = Pipeline(stages=[
encoder,
featuresCreator,
logistic
])
That’s it! Our pipeline is now created so we can (finally!) estimate the model.
Before we fit the model, we need to split our dataset into training and testing datasets. Conveniently, the DataFrame API has the .randomSplit(…) method:
births_train, births_test = births \
    .randomSplit([0.7, 0.3], seed=666)
The first parameter is a list of dataset proportions that should end up in, respectively, births_train and births_test subsets. The seed parameter provides a seed to the randomizer.
You can also split the dataset into more than two subsets as long as the elements of the list sum up to 1, and you unpack the output into as many subsets.
For example, we could split the births dataset into three subsets like this:
train, test, val = births \
    .randomSplit([0.7, 0.2, 0.1], seed=666)
The preceding code would put a random 70% of the births dataset into the train object, 20% would go to the test, and the val DataFrame would hold the remaining 10%.
Now it is about time to finally run our pipeline and estimate our model:
model = pipeline.fit(births_train)
test_model = model.transform(births_test)
The .fit(…) method of the pipeline object takes our training dataset as an input. Under the hood, the births_train dataset is passed first to the encoder object. The DataFrame that is created at the encoder stage then gets passed to the featuresCreator that creates the ‘features’ column. Finally, the output from this stage is passed to the logistic object that estimates the final model.
The .fit(…) method returns the PipelineModel object (the model object in the preceding snippet) that can then be used for prediction; we obtain this by calling the .transform(…) method and passing the testing dataset created earlier. Here’s what the test_model looks like:
test_model.take(1)
The output is a single Row containing all the columns from the source dataset plus those appended by the Transformers and Estimators. The logistic regression model outputs several columns: rawPrediction is the value of the linear combination of the features and the β coefficients, probability is the calculated probability for each of the classes, and finally, prediction is our final class assignment.
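If you are interested only in the model’s outputs, you can select just those columns (the names below are the defaults appended by the logistic regression model):
test_model \
    .select('rawPrediction', 'probability', 'prediction') \
    .take(1)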
Obviously, we would like to now test how well our model did. PySpark exposes a number of evaluation methods for classification and regression in the .evaluation section of the package:
import pyspark.ml.evaluation as ev
We will use the BinaryClassificationEvaluator to test how well our model performed:
evaluator = ev.BinaryClassificationEvaluator(
rawPredictionCol='probability',
labelCol='INFANT_ALIVE_AT_REPORT')
The rawPredictionCol can either be the rawPrediction column produced by the estimator or the probability.
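Since, for binary logistic regression, the probability is a monotonic transformation of the raw prediction, either column yields the same areas under the curves; here is a sketch using the rawPrediction column instead:
evaluator_raw = ev.BinaryClassificationEvaluator(
    rawPredictionCol='rawPrediction',
    labelCol='INFANT_ALIVE_AT_REPORT')

print(evaluator_raw.evaluate(test_model,
    {evaluator_raw.metricName: 'areaUnderROC'}))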
Let’s see how well our model performed:
print(evaluator.evaluate(test_model,
{evaluator.metricName: 'areaUnderROC'}))
print(evaluator.evaluate(test_model,
{evaluator.metricName: 'areaUnderPR'}))
The preceding code prints the two metrics. An area under the ROC curve of 74% and an area under the PR curve of 71% indicate a well-defined model, but nothing extraordinary; if we had other features, we could drive this up.
PySpark allows you to save the Pipeline definition for later use. It not only saves the pipeline structure, but also the definitions of all the Transformers and Estimators:
pipelinePath = './infant_oneHotEncoder_Logistic_Pipeline'
pipeline.write().overwrite().save(pipelinePath)
So, you can load it up later and use it straight away to .fit(…) and predict:
loadedPipeline = Pipeline.load(pipelinePath)
loadedPipeline \
    .fit(births_train) \
    .transform(births_test) \
    .take(1)
The preceding code produces the same result, as expected.
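As a side note: the fitted PipelineModel can be persisted in the same fashion (the path below is our own choice), which lets you skip refitting entirely:
from pyspark.ml import PipelineModel

modelPath = './infant_oneHotEncoder_Logistic_PipelineModel'
model.write().overwrite().save(modelPath)

# the loaded model is ready to .transform(...) straight away
loadedModel = PipelineModel.load(modelPath)
loadedModel.transform(births_test).take(1)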
In this article, we studied the ML package. We explained what a Transformer and an Estimator are, and showed their role in another concept introduced in the ML library: the Pipeline. We also showed how to use some of the feature transformers and models from the library, how to evaluate a fitted model’s performance, and how to persist a pipeline for later use.