
ML Package


In this article, Denny Lee, the author of the book Learning PySpark, provides a brief look at the theory and implementation of the PySpark ML package.

So, let’s get to it!

In this article, we will reuse a portion of the dataset. The data can be downloaded from http://www.tomdrabas.com/data/LearningPySpark/births_transformed.csv.gz.


Overview of the package

At the top level, the package exposes three main abstract classes: a Transformer, an Estimator, and a Pipeline. We will briefly explain each of them with short examples.

Transformer

The Transformer class, like the name suggests, transforms your data by (normally) appending a new column to your DataFrame.

At the high level, when deriving from the Transformer abstract class, each and every new Transformer needs to implement a .transform(…) method. The method, as a first and normally the only obligatory parameter, requires passing a DataFrame to be transformed. This, of course, varies method-by-method in the ML package: other popular parameters are inputCol and outputCol; these, however, frequently default to some predefined values, such as ‘features’ for the inputCol parameter.
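
To make this contract concrete, here is a minimal plain-Python sketch of a Transformer-like class. It is not the PySpark API: a list of dictionaries stands in for a DataFrame, and the ThresholdTransformer class and the column names are hypothetical, introduced only for illustration.

```python
class ThresholdTransformer:
    """Hypothetical stand-in for a Transformer: appends one new column."""

    def __init__(self, threshold, inputCol, outputCol):
        self.threshold = threshold
        self.inputCol = inputCol
        self.outputCol = outputCol

    def transform(self, rows):
        # Build new rows; the input "DataFrame" is left untouched.
        result = []
        for row in rows:
            flag = 1.0 if row[self.inputCol] > self.threshold else 0.0
            result.append({**row, self.outputCol: flag})
        return result


rows = [{'age': 17}, {'age': 42}]
binarizer = ThresholdTransformer(
    threshold=18, inputCol='age', outputCol='is_adult')
transformed = binarizer.transform(rows)
print(transformed)
```

The only obligatory argument of .transform(…) is the data itself; everything else (the threshold and the column names) is configured when the object is created.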

There are many Transformers offered in spark.ml.feature; we briefly describe them here:

  • Binarizer: Given a threshold, the method takes a continuous variable and transforms it into a binary one.
  • Bucketizer: Similar to the Binarizer, this method takes a list of thresholds (the splits parameter) and transforms a continuous variable into a multinomial one.
  • ChiSqSelector: For categorical target variables (think classification models), this feature allows you to select a predefined number of features (set by the numTopFeatures parameter) that best explain the variance in the target. The selection is done, as the name of the method suggests, using a Chi-Square test. It is one of the two-step methods: first, you need to .fit(…) your data (so the method can calculate the Chi-square tests). Calling the .fit(…) method (you pass your DataFrame as a parameter) returns a ChiSqSelectorModel object that you can then use to transform your DataFrame using the .transform(…) method.

    More information on Chi-square can be found here: http://ccnmtl.columbia.edu/projects/qmss/the_chisquare_test/about_the_chisquare_test.html.

  • CountVectorizer: Useful for a tokenized text (such as [[‘Learning’, ‘PySpark’, ‘with’, ‘us’],[‘us’, ‘us’, ‘us’]]). It is one of the two-step methods: first, you need to .fit(…), that is, learn the patterns from your dataset, before you can .transform(…) with the CountVectorizerModel returned by the .fit(…) method. The output from this transformer, for the tokenized text presented previously, would look similar to this: [(4, [0, 1, 2, 3], [1.0, 1.0, 1.0, 1.0]),(4, [3], [3.0])].
  • DCT: The Discrete Cosine Transform takes a vector of real values and returns a vector of the same length, but with the sum of cosine functions oscillating at different frequencies. Such transformations are useful to extract some underlying frequencies in your data or in data compression.
  • ElementwiseProduct: A method that returns a vector with elements that are products of the vector passed to the method, and a vector passed as the scalingVec parameter. For example, if you had a [10.0, 3.0, 15.0] vector and your scalingVec was [0.99, 3.30, 0.66], then the vector you would get would look as follows: [9.9, 9.9, 9.9].
  • HashingTF: A hashing trick transformer that takes a list of tokenized text and returns a vector (of predefined length) with counts. From PySpark’s documentation:

    Since a simple modulo is used to transform the hash function to a column index, it is advisable to use a power of two as the numFeatures parameter; otherwise the features will not be mapped evenly to the columns.

  • IDF: The method computes an Inverse Document Frequency for a list of documents. Note that the documents need to already be represented as a vector (for example, using either the HashingTF or CountVectorizer).
  • IndexToString: A complement to the StringIndexer method. It uses the encoding from the StringIndexerModel object to reverse the string index to original values.
  • MaxAbsScaler: Rescales the data to be within the [-1, 1] range (thus, does not shift the center of the data).
  • MinMaxScaler: Similar to the MaxAbsScaler with the difference that it scales the data to be in the [0.0, 1.0] range.
  • NGram: The method that takes a list of tokenized text and returns n-grams: pairs, triples, or n-mores of subsequent words. For example, if you had a [‘good’, ‘morning’, ‘Robin’, ‘Williams’] vector you would get the following output: [‘good morning’, ‘morning Robin’, ‘Robin Williams’].
  • Normalizer: A method that scales the data to be of unit norm using the p-norm value (by default, it is L2).
  • OneHotEncoder: A method that encodes a categorical column to a column of binary vectors.
  • PCA: Performs the data reduction using principal component analysis.
  • PolynomialExpansion: Performs a polynomial expansion of a vector. For example, if you had a vector symbolically written as [x, y, z], the method would produce the following expansion: [x, x*x, y, x*y, y*y, z, x*z, y*z, z*z].
  • QuantileDiscretizer: Similar to the Bucketizer method, but instead of passing the splits parameter you pass the numBuckets one. The method then decides, by calculating approximate quantiles over your data, what the splits should be.
  • RegexTokenizer: String tokenizer using regular expressions.
  • RFormula: For those of you who are avid R users – you can pass a formula such as vec ~ alpha * 3 + beta (assuming your DataFrame has the alpha and beta columns) and it will produce the vec column given the expression.
  • SQLTransformer: Similar to the previous, but instead of R-like formulas you can use SQL syntax.

    The FROM statement should be selecting from __THIS__ indicating you are accessing the DataFrame. For example: SELECT alpha * 3 + beta AS vec FROM __THIS__.

  • StandardScaler: Standardizes the column to have 0 mean and standard deviation equal to 1.
  • StopWordsRemover: Removes stop words (such as ‘the’ or ‘a’) from a tokenized text.
  • StringIndexer: Given a list of all the words in a column, it will produce a vector of indices.
  • Tokenizer: Default tokenizer that converts the string to lower case and then splits on space(s).
  • VectorAssembler: A highly useful transformer that collates multiple numeric (vectors included) columns into a single column with a vector representation. For example, if you had three columns in your DataFrame:
    df = spark.createDataFrame(
        [(12, 10, 3), (1, 4, 2)], 
        ['a', 'b', 'c'])
    

    Calling the following (with pyspark.ml.feature imported as ft):

    (ft.VectorAssembler(inputCols=['a', 'b', 'c'],
                        outputCol='features')
        .transform(df)
        .select('features')
        .collect())
    

    would produce the following output:

    [Row(features=DenseVector([12.0, 10.0, 3.0])), 
     Row(features=DenseVector([1.0, 4.0, 2.0]))]
    
  • VectorIndexer: A method for indexing categorical column into a vector of indices. It works in a column-by-column fashion, selecting distinct values from the column, sorting and returning an index of the value from the map instead of the original value.
  • VectorSlicer: Works on a feature vector, either dense or sparse: given a list of indices it extracts the values from the feature vector.
  • Word2Vec: The method takes a sentence (string) as an input and transforms it into a map of {string, vector} format, a representation that is useful in natural language processing.
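
Several of the transformers above are easy to mimic in a few lines of plain Python, which can help build intuition. The sketch below is not how PySpark implements them; the function names are hypothetical, and the bucket boundaries are made up for illustration. It reproduces the ElementwiseProduct and NGram examples from the list and shows how a Bucketizer-style splits list maps a value to a bucket index.

```python
from bisect import bisect_right


def elementwise_product(vector, scaling_vec):
    # Multiply the two vectors position by position.
    return [v * s for v, s in zip(vector, scaling_vec)]


def ngrams(tokens, n=2):
    # Join every run of n consecutive tokens with a space.
    return [' '.join(tokens[i:i + n]) for i in range(len(tokens) - n + 1)]


def bucketize(value, splits):
    # Index of the bucket [splits[i], splits[i + 1]) that holds the value;
    # the last bucket is treated as closed on the right.
    # Assumes splits[0] <= value <= splits[-1].
    return min(bisect_right(splits, value) - 1, len(splits) - 2)


products = elementwise_product([10.0, 3.0, 15.0], [0.99, 3.30, 0.66])
rounded = [round(p, 2) for p in products]
bigrams = ngrams(['good', 'morning', 'Robin', 'Williams'])
buckets = [bucketize(v, [0, 18, 65, 120]) for v in [5, 18, 70, 120]]
print(rounded)
print(bigrams)
print(buckets)
```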

Note that many methods in the ML package have the letter E next to them; this means the method is currently in beta (or Experimental) and might sometimes fail or produce erroneous results. Beware.

Estimators

Estimators can be thought of as statistical models that need to be estimated to make predictions or classify your observations.

If deriving from the abstract Estimator class, the new model has to implement the .fit(…) method that fits the model given the data found in a DataFrame and some default or user-specified parameters.
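
The two-step nature of an Estimator can be sketched in plain Python: .fit(…) learns something from the data and returns a model object, and that model is what transforms new data. The classes below are hypothetical stand-ins (a list of floats plays the role of a DataFrame column), loosely modeled on the idea behind MinMaxScaler rather than on its actual implementation.

```python
class MinMaxModel:
    """Hypothetical fitted model: holds the learned minimum and maximum."""

    def __init__(self, lo, hi):
        self.lo = lo
        self.hi = hi

    def transform(self, values):
        # Assumes the training data were not constant (hi != lo).
        span = self.hi - self.lo
        return [(v - self.lo) / span for v in values]


class MinMaxEstimator:
    """Hypothetical stand-in for an Estimator: fit() learns from the data."""

    def fit(self, values):
        return MinMaxModel(min(values), max(values))


train = [2.0, 4.0, 10.0]
scaler_model = MinMaxEstimator().fit(train)
scaled = scaler_model.transform([2.0, 6.0, 10.0])
print(scaled)
```

Keeping the learned state in a separate model object means the same fitted parameters can be applied to the training data, the testing data, or any data seen later.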

There are a lot of estimators available in PySpark, and we will now briefly describe the models available in Spark 2.0.

Classification

The ML package provides a data scientist with seven classification models to choose from. These range from the simplest ones (such as Logistic Regression) to more sophisticated ones. We will provide short descriptions of each of them in the following section:

  • LogisticRegression: The benchmark model for classification. The logistic regression uses logit function to calculate the probability of an observation belonging to a particular class. At the time of writing, the PySpark ML supports only binary classification problems.
  • DecisionTreeClassifier: A classifier that builds a decision tree to predict a class for an observation. The maxDepth parameter limits the depth to which the tree grows, minInstancesPerNode determines the minimum number of observations in a tree node required to split it further, maxBins specifies the maximum number of bins the continuous variables will be split into, and impurity specifies the metric used to measure and calculate the information gain from the split.
  • GBTClassifier: A Gradient Boosted Trees model for classification. The model belongs to the family of ensemble models: models that combine multiple weak predictive models to form a strong one. At the moment the GBTClassifier model supports binary labels, and continuous and categorical features.
  • RandomForestClassifier: This model produces multiple decision trees (hence the name, forest) and uses the mode output of those decision trees to classify observations. The RandomForestClassifier supports both binary and multinomial labels.
  • NaiveBayes: Based on the Bayes’ theorem, this model uses conditional probability theory to classify observations. The NaiveBayes model in PySpark ML supports both binary and multinomial labels.
  • MultilayerPerceptronClassifier: A classifier that mimics the nature of a human brain. Deeply rooted in the Artificial Neural Networks theory, the model is a black-box, that is, it is not easy to interpret the internal parameters of the model. The model consists, at a minimum, of three, fully connected layers (a parameter that needs to be specified when creating the model object) of artificial neurons: the input layer (that needs to be equal to the number of features in your dataset), a number of hidden layers (at least one), and an output layer with the number of neurons equal to the number of categories in your label. All the neurons in the input and hidden layers have a sigmoid activation function, whereas the activation function of the neurons in the output layer is softmax.
  • OneVsRest: A reduction of a multiclass classification to a binary one. For example, in the case of a multinomial label, the model can train multiple binary logistic regression models. For example, if label == 2 the model will build a logistic regression where it will convert the label == 2 to 1 (or else label values would be set to 0) and then train a binary model. All the models are then scored and the model with the highest probability wins.
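
The two moving parts of the OneVsRest reduction, relabeling and picking the winner, can be sketched in plain Python. The function names are hypothetical, the training of the binary models is left out, and the probabilities are made-up numbers used only to show the selection step.

```python
def relabel(labels, positive_class):
    # The chosen class becomes 1; every other class becomes 0.
    return [1 if label == positive_class else 0 for label in labels]


def predict_one_vs_rest(probabilities_by_class):
    # The class whose binary model reports the highest probability wins.
    return max(probabilities_by_class, key=probabilities_by_class.get)


labels = [0, 2, 1, 2]
binary_labels = relabel(labels, positive_class=2)

# Hypothetical probabilities from three already-trained binary models.
winner = predict_one_vs_rest({0: 0.10, 1: 0.35, 2: 0.80})
print(binary_labels, winner)
```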

Regression

There are seven models available for regression tasks in the PySpark ML package. As with classification, these range from some basic ones (such as obligatory Linear Regression) to more complex ones:

  • AFTSurvivalRegression: Fits an Accelerated Failure Time regression model. It is a parametric model that assumes that a marginal effect of one of the features accelerates or decelerates a life expectancy (or process failure). It is highly applicable for processes with well-defined stages.
  • DecisionTreeRegressor: Similar to the model for classification with an obvious distinction that the label is continuous instead of binary (or multinomial).
  • GBTRegressor: As with the DecisionTreeRegressor, the difference is the data type of the label.
  • GeneralizedLinearRegression: A family of linear models with differing kernel functions (link functions). In contrast to the linear regression that assumes normality of error terms, the GLM allows the label to have different error term distributions: the GeneralizedLinearRegression model from the PySpark ML package supports gaussian, binomial, gamma, and poisson families of error distributions with a host of different link functions.
  • IsotonicRegression: A type of regression that fits a free-form, non-decreasing line to your data. It is useful to fit the datasets with ordered and increasing observations.
  • LinearRegression: The most simple of regression models, assumes linear relationship between features and a continuous label, and normality of error terms.
  • RandomForestRegressor: Similar to either DecisionTreeRegressor or GBTRegressor, the RandomForestRegressor fits a continuous label instead of a discrete one.
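
To illustrate what fitting a non-decreasing line means for IsotonicRegression, here is a plain-Python sketch of the classic pool-adjacent-violators idea. It is only an illustration under simplifying assumptions (equal weights, observations already sorted by the feature) and makes no claim about how PySpark implements the model; the isotonic_fit name is hypothetical.

```python
def isotonic_fit(values):
    # Pool adjacent violators: merge neighbouring blocks while they decrease.
    blocks = []  # each block is [sum of values, number of values]
    for v in values:
        blocks.append([v, 1])
        while len(blocks) > 1 and (
            blocks[-2][0] / blocks[-2][1] > blocks[-1][0] / blocks[-1][1]
        ):
            total, count = blocks.pop()
            blocks[-1][0] += total
            blocks[-1][1] += count
    # Expand each block back to one fitted value per observation.
    fitted = []
    for total, count in blocks:
        fitted.extend([total / count] * count)
    return fitted


fitted = isotonic_fit([1.0, 3.0, 2.0, 4.0])
print(fitted)
```

The out-of-order pair (3.0, 2.0) is replaced by its average, 2.5, so the fitted sequence never decreases.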

Clustering

Clustering is a family of unsupervised models that is used to find underlying patterns in your data. The PySpark ML package currently provides the four most popular models:

  • BisectingKMeans: A combination of k-means clustering method and hierarchical clustering. The algorithm begins with all observations in a single cluster and iteratively splits the data into k clusters.

    Check out this website for more information on pseudo-algorithm: http://minethedata.blogspot.com/2012/08/bisecting-k-means.html.

  • KMeans: The famous k-means algorithm that separates data into k clusters, iteratively searching for centroids that minimize the sum of squared distances between each observation and the centroid of the cluster it belongs to.
  • GaussianMixture: This method uses k Gaussian distributions with unknown parameters to dissect the dataset. Using the Expectation-Maximization algorithm, the parameters for the Gaussians are found by maximizing the log-likelihood function.

    Beware that for datasets with many features this model might perform poorly due to the curse of dimensionality and numerical issues with Gaussian distributions.

  • LDA: This model is used for topic modeling in natural language processing applications.
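
The iterative search for centroids described for KMeans can be sketched in plain Python on one-dimensional data. This is a bare-bones illustration, not PySpark's implementation: the kmeans_1d name is hypothetical, the starting centroids are passed in explicitly to keep the result deterministic, and a fixed number of iterations replaces a convergence check.

```python
def kmeans_1d(points, centroids, iterations=10):
    for _ in range(iterations):
        # Assignment step: attach each point to its nearest centroid.
        clusters = [[] for _ in centroids]
        for p in points:
            nearest = min(
                range(len(centroids)),
                key=lambda i: (p - centroids[i]) ** 2)
            clusters[nearest].append(p)
        # Update step: move each centroid to the mean of its cluster
        # (an empty cluster keeps its previous centroid).
        centroids = [
            sum(c) / len(c) if c else centroids[i]
            for i, c in enumerate(clusters)
        ]
    return centroids


centers = kmeans_1d([1.0, 2.0, 3.0, 10.0, 11.0, 12.0], centroids=[0.0, 5.0])
print(centers)
```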

There is also one recommendation model available in PySpark ML, but I will refrain from describing it here.

Pipeline

A Pipeline in PySpark ML is a concept of an end-to-end transformation-estimation process (with distinct stages) that ingests some raw data (in a DataFrame form), performs necessary data carpentry (transformations), and finally estimates a statistical model (estimator).

A Pipeline can be purely transformative, that is, consisting of Transformers only.

A Pipeline can be thought of as a chain of multiple discrete stages. When a .fit(…) method is executed on a Pipeline object, all the stages are executed in the order they were specified in the stages parameter; the stages parameter is a list of Transformer and Estimator objects. The .fit(…) method of the Pipeline object executes the .transform(…) method for the Transformers and the .fit(…) method for the Estimators.

Normally, the output of a preceding stage becomes the input for the following stage: when deriving from either the Transformer or Estimator abstract classes, one needs to implement the .getOutputCol() method that returns the value of the outputCol parameter specified when creating an object.
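
The way a Pipeline walks through its stages can be sketched in plain Python. All the classes below are hypothetical stand-ins (lists of floats instead of DataFrames), and the logic is a simplification of the behavior described above: Estimators are fitted, Transformers are used as they are, and the output of each stage feeds the next one.

```python
class AddOne:
    """Hypothetical Transformer: has transform() only."""

    def transform(self, values):
        return [v + 1 for v in values]


class MeanCenterer:
    """Hypothetical Estimator: fit() returns a Transformer-like model."""

    def fit(self, values):
        mean = sum(values) / len(values)

        class Model:
            def transform(self, vals):
                return [v - mean for v in vals]

        return Model()


class SimplePipeline:
    def __init__(self, stages):
        self.stages = stages

    def fit(self, data):
        fitted = []
        for stage in self.stages:
            # Estimators are fitted first; Transformers are used as they are.
            model = stage.fit(data) if hasattr(stage, 'fit') else stage
            data = model.transform(data)  # output feeds the next stage
            fitted.append(model)
        return SimplePipeline(fitted)  # plays the role of a PipelineModel

    def transform(self, data):
        for stage in self.stages:
            data = stage.transform(data)
        return data


pipeline_model = SimplePipeline([AddOne(), MeanCenterer()]).fit([1.0, 2.0, 3.0])
result = pipeline_model.transform([4.0])
print(result)
```

During fitting, the mean is learned from the already transformed training data ([2.0, 3.0, 4.0]), so the new value 4.0 becomes 5.0 after the first stage and 2.0 after the second.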

Predicting chances of infant survival with ML

In this section, we will use the portion of the dataset to present the ideas of PySpark ML.

If you have not yet downloaded the data, it can be accessed here: http://www.tomdrabas.com/data/LearningPySpark/births_transformed.csv.gz.

In this section, we will, once again, attempt to predict the chances of the survival of an infant.

Loading the data

First, we load the data with the help of the following code:

import pyspark.sql.types as typ
labels = [
    ('INFANT_ALIVE_AT_REPORT', typ.IntegerType()),
    ('BIRTH_PLACE', typ.StringType()),
    ('MOTHER_AGE_YEARS', typ.IntegerType()),
    ('FATHER_COMBINED_AGE', typ.IntegerType()),
    ('CIG_BEFORE', typ.IntegerType()),
    ('CIG_1_TRI', typ.IntegerType()),
    ('CIG_2_TRI', typ.IntegerType()),
    ('CIG_3_TRI', typ.IntegerType()),
    ('MOTHER_HEIGHT_IN', typ.IntegerType()),
    ('MOTHER_PRE_WEIGHT', typ.IntegerType()),
    ('MOTHER_DELIVERY_WEIGHT', typ.IntegerType()),
    ('MOTHER_WEIGHT_GAIN', typ.IntegerType()),
    ('DIABETES_PRE', typ.IntegerType()),
    ('DIABETES_GEST', typ.IntegerType()),
    ('HYP_TENS_PRE', typ.IntegerType()),
    ('HYP_TENS_GEST', typ.IntegerType()),
    ('PREV_BIRTH_PRETERM', typ.IntegerType())
]
schema = typ.StructType([
    typ.StructField(e[0], e[1], False) for e in labels
])
births = spark.read.csv('births_transformed.csv.gz', 
                        header=True, 
                        schema=schema)

We specify the schema of the DataFrame; our severely limited dataset now only has 17 columns.

Creating transformers

Before we can use the dataset to estimate a model, we need to do some transformations. Since statistical models can only operate on numeric data, we will have to encode the BIRTH_PLACE variable.

Before we do any of this, since we will use a number of different feature transformations, let’s import them all:

import pyspark.ml.feature as ft

To encode the BIRTH_PLACE column, we will use the OneHotEncoder method. However, the method cannot accept StringType columns; it can only deal with numeric types, so first we will cast the column to an IntegerType:

births = births.withColumn(
    'BIRTH_PLACE_INT',
    births['BIRTH_PLACE'].cast(typ.IntegerType()))

Having done this, we can now create our first Transformer:

encoder = ft.OneHotEncoder(
    inputCol='BIRTH_PLACE_INT', 
    outputCol='BIRTH_PLACE_VEC')

Let’s now create a single column with all the features collated together. We will use the VectorAssembler method:

featuresCreator = ft.VectorAssembler(
    inputCols=[
        col[0] 
        for col 
        in labels[2:]] + 
    [encoder.getOutputCol()], 
    outputCol='features'
)

The inputCols parameter passed to the VectorAssembler object is a list of all the columns to be combined together to form the outputCol – the ‘features’. Note that we use the output of the encoder object (by calling the .getOutputCol() method), so we do not have to remember to change this parameter’s value should we change the name of the output column in the encoder object at any point.
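
To see what the list expression passed as inputCols evaluates to, the same slicing can be run in plain Python without Spark. In this sketch the column names are copied from the schema defined earlier (types omitted), and the encoder_output variable is a stand-in for the value returned by encoder.getOutputCol().

```python
# Column names copied from the schema above (types omitted for brevity).
names = [
    'INFANT_ALIVE_AT_REPORT', 'BIRTH_PLACE', 'MOTHER_AGE_YEARS',
    'FATHER_COMBINED_AGE', 'CIG_BEFORE', 'CIG_1_TRI', 'CIG_2_TRI',
    'CIG_3_TRI', 'MOTHER_HEIGHT_IN', 'MOTHER_PRE_WEIGHT',
    'MOTHER_DELIVERY_WEIGHT', 'MOTHER_WEIGHT_GAIN', 'DIABETES_PRE',
    'DIABETES_GEST', 'HYP_TENS_PRE', 'HYP_TENS_GEST', 'PREV_BIRTH_PRETERM',
]
encoder_output = 'BIRTH_PLACE_VEC'  # stand-in for encoder.getOutputCol()

# Skip the label (index 0) and the raw BIRTH_PLACE string (index 1).
input_cols = names[2:] + [encoder_output]
print(len(input_cols), input_cols[0], input_cols[-1])
```

The slice labels[2:] therefore leaves out both the target variable and the original string column, which is replaced by its encoded version.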

It’s now time to create our first estimator.

Creating an estimator

In this example, we will (once again) use the Logistic Regression model. However, we will showcase some more complex models from the .classification set of PySpark ML models, so we load the whole section:

import pyspark.ml.classification as cl

Once loaded, let’s create the model by using the following code:

logistic = cl.LogisticRegression(
    maxIter=10, 
    regParam=0.01, 
    labelCol='INFANT_ALIVE_AT_REPORT')

We would not have to specify the labelCol parameter if our target column had the name ‘label’. Also, if the output of our featuresCreator were not called ‘features’, we would have to specify the featuresCol by (most conveniently) calling the getOutputCol() method on the featuresCreator object.

Creating a pipeline

All that is left now is to create a Pipeline and fit the model. First, let’s load the Pipeline from the ML package:

from pyspark.ml import Pipeline

Creating a Pipeline is really easy. Conceptually, our pipeline consists of three stages executed in order: the encoder, the featuresCreator, and the logistic model.

Converting this structure into a Pipeline is a walk in the park:

pipeline = Pipeline(stages=[
        encoder, 
        featuresCreator, 
        logistic
    ])

That’s it! Our pipeline is now created so we can (finally!) estimate the model.

Fitting the model

Before we fit the model, we need to split our dataset into training and testing datasets. Conveniently, the DataFrame API has the .randomSplit(…) method:

births_train, births_test = births.randomSplit(
    [0.7, 0.3], seed=666)

The first parameter is a list of dataset proportions that should end up in, respectively, births_train and births_test subsets. The seed parameter provides a seed to the randomizer.

You can also split the dataset into more than two subsets, as long as you unpack the output into as many variables. The elements of the list should sum up to 1 (Spark normalizes the weights if they do not).

For example, we could split the births dataset into three subsets like this:

train, test, val = births.randomSplit(
    [0.7, 0.2, 0.1], seed=666)

The preceding code would put a random 70% of the births dataset into the train object, 20% would go to the test, and the val DataFrame would hold the remaining 10%.
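
The idea behind a seeded, weighted random split can be sketched in plain Python. The random_split function below is a hypothetical illustration, not Spark's implementation: each row is assigned independently using one random draw, so the subset sizes only approximate the requested proportions, and the same seed always reproduces the same split.

```python
import random


def random_split(rows, weights, seed):
    rng = random.Random(seed)
    total = sum(weights)
    # Cumulative, normalized upper bounds, e.g. roughly [0.7, 0.9, 1.0].
    bounds, running = [], 0.0
    for w in weights:
        running += w / total
        bounds.append(running)
    subsets = [[] for _ in weights]
    for row in rows:
        draw = rng.random()
        # Place the row in the first subset whose bound exceeds the draw;
        # the last subset also catches any floating-point leftovers.
        for i, bound in enumerate(bounds):
            if draw < bound or i == len(bounds) - 1:
                subsets[i].append(row)
                break
    return subsets


train, test, val = random_split(list(range(1000)), [0.7, 0.2, 0.1], seed=666)
print(len(train), len(test), len(val))
```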

Now it is about time to finally run our pipeline and estimate our model:

model = pipeline.fit(births_train)
test_model = model.transform(births_test)

The .fit(…) method of the pipeline object takes our training dataset as an input. Under the hood, the births_train dataset is passed first to the encoder object. The DataFrame that is created at the encoder stage then gets passed to the featuresCreator that creates the ‘features’ column. Finally, the output from this stage is passed to the logistic object that estimates the final model.

The .fit(…) method returns the PipelineModel object (the model object in the preceding snippet) that can then be used for prediction; we attain this by calling the .transform(…) method and passing the testing dataset created earlier. The following command shows what test_model looks like:

test_model.take(1)

It generates the following output:

As you can see, we get all the columns from the Transformers and Estimators. The logistic regression model outputs several columns: the rawPrediction is the value of the linear combination of features and the β coefficients, probability is the calculated probability for each of the classes, and finally, the prediction, which is our final class assignment.
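
The relationship between these three columns can be sketched in plain Python for the binary case. The sketch assumes, as our reading of the model's output, that both rawPrediction and probability hold one value per class (class 0 first, class 1 second) and that a 0.5 threshold decides the prediction. The logistic_outputs function and the coefficients are hypothetical, chosen only to make the arithmetic easy to follow.

```python
import math


def logistic_outputs(features, coefficients, intercept, threshold=0.5):
    # Linear combination of the features and the coefficients.
    margin = sum(x * b for x, b in zip(features, coefficients)) + intercept
    # The logistic function turns the margin into a probability of class 1.
    p_one = 1.0 / (1.0 + math.exp(-margin))
    return {
        'rawPrediction': [-margin, margin],
        'probability': [1.0 - p_one, p_one],
        'prediction': 1.0 if p_one > threshold else 0.0,
    }


# Hypothetical coefficients: margin = 1.0 * 1.0 + 2.0 * 0.5 - 1.0 = 1.0
out = logistic_outputs([1.0, 2.0], coefficients=[1.0, 0.5], intercept=-1.0)
print(out)
```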

Evaluating the performance of the model

Obviously, we would like to now test how well our model did. PySpark exposes a number of evaluation methods for classification and regression in the .evaluation section of the package:

import pyspark.ml.evaluation as ev

We will use the BinaryClassificationEvaluator to test how well our model performed:

evaluator = ev.BinaryClassificationEvaluator(
    rawPredictionCol='probability', 
    labelCol='INFANT_ALIVE_AT_REPORT')

The rawPredictionCol can either be the rawPrediction column produced by the estimator or the probability.

Let’s see how well our model performed:

print(evaluator.evaluate(test_model, 
    {evaluator.metricName: 'areaUnderROC'}))
print(evaluator.evaluate(test_model, 
   {evaluator.metricName: 'areaUnderPR'}))

The preceding code produces the following result:

An area under the ROC curve of 74% and an area under the PR curve of 71% indicate a well-defined model, but nothing extraordinary; if we had other features, we could drive these numbers up.
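
For intuition about what the area under the ROC curve measures, it can be computed for a tiny example in plain Python as the share of (positive, negative) pairs that the model orders correctly. This is a sketch of that pairwise interpretation only, not of how the evaluator computes the metric; the area_under_roc function and the scores are made up for illustration.

```python
def area_under_roc(labels, scores):
    positives = [s for y, s in zip(labels, scores) if y == 1]
    negatives = [s for y, s in zip(labels, scores) if y == 0]
    wins = 0.0
    for p in positives:
        for n in negatives:
            # A correctly ordered pair counts 1, a tie counts one half.
            if p > n:
                wins += 1.0
            elif p == n:
                wins += 0.5
    return wins / (len(positives) * len(negatives))


auc = area_under_roc([1, 0, 1, 0], [0.9, 0.8, 0.7, 0.2])
print(auc)
```

Three of the four positive-negative pairs are ordered correctly here, so the area is 0.75; a model that scores at random would hover around 0.5.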

Saving the model

PySpark allows you to save the Pipeline definition for later use. It not only saves the pipeline structure, but also all the definitions of all the Transformers and Estimators:

pipelinePath = './infant_oneHotEncoder_Logistic_Pipeline'
pipeline.write().overwrite().save(pipelinePath)

So, you can load it up later and use it straight away to .fit(…) and predict:

loadedPipeline = Pipeline.load(pipelinePath)
(loadedPipeline
    .fit(births_train)
    .transform(births_test)
    .take(1))

The preceding code produces the same result (as expected):

Summary

In this article, we studied the ML package. We explained what a Transformer and an Estimator are, and showed their role in another concept introduced in the ML library: the Pipeline. Subsequently, we also presented how to use some of the methods to fine-tune the hyperparameters of models. Finally, we gave some examples of how to use some of the feature extractors and models from the library.

Published by
Packt
