This Spark machine learning tutorial is by Krishna Sankar, the author of Fast Data Processing with Spark, Second Edition. One of the major attractions of Spark is its ability to scale computation massively, and that is exactly what machine learning algorithms need. But the caveat is that not all machine learning algorithms can be effectively parallelized; each algorithm has its own challenges for parallelization, whether it is task parallelism or data parallelism. Having said that, Spark is becoming the de facto platform for building machine learning algorithms and applications. For example, Apache Mahout is moving away from Hadoop MapReduce and implementing its algorithms in Spark (see the first reference at the end of this article), and the developers working on Spark MLlib are implementing more and more machine learning algorithms in a scalable and concise manner in the Spark framework. For the latest information, refer to the Spark site at https://spark.apache.org/docs/latest/mllib-guide.html, which is the authoritative source.
This article covers the following machine learning algorithms:
- Basic statistics
- Linear regression
- Classification
- Clustering
- Recommendations
The Spark machine learning algorithm table
The machine learning algorithms implemented in Spark 1.1.0 (in org.apache.spark.mllib for Scala and Java, and in pyspark.mllib for Python) are shown in the following table:
| Algorithm | Feature | Notes |
| --- | --- | --- |
| Basic statistics | Summary statistics | Mean, variance, count, max, min, and numNonZeros |
| | Correlations | Spearman and Pearson correlation |
| | Stratified sampling | sampleByKey and sampleByKeyExact, with and without replacement |
| | Hypothesis testing | Pearson's chi-squared goodness-of-fit test |
| | Random data generation | RandomRDDs (normal, Poisson, and so on) |
| Regression | Linear models | Linear regression (least squares), Lasso, and ridge regression |
| Classification | Binary classification | Logistic regression, SVM, decision trees, and naïve Bayes |
| | Multi-class classification | Decision trees, naïve Bayes, and so on |
| Recommendation | Collaborative filtering | Alternating least squares |
| Clustering | k-means | |
| Dimensionality reduction | SVD, PCA | |
| Feature extraction | TF-IDF, Word2Vec, StandardScaler, Normalizer | |
| Optimization | SGD, L-BFGS | |
Spark MLlib examples
Now, let’s look at how to use the algorithms. Naturally, we need interesting datasets; we will use an appropriate one for each of the algorithms in the following sections.
The code and data files are available in the GitHub repository at https://github.com/xsankar/fdps-vii. We’ll keep it updated with corrections.
Basic statistics
Let’s read the car mileage data into an RDD and then compute some basic statistics. We will use a simple parse function to parse a line of data. This will work if you know the type and the structure of your CSV file. We will use this technique for the examples in this article:
import org.apache.spark.SparkContext
import org.apache.spark.mllib.stat.{MultivariateStatisticalSummary, Statistics}
import org.apache.spark.mllib.linalg.Vector
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.rdd.RDD

object MLlib01 {
  //
  def getCurrentDirectory = new java.io.File(".").getCanonicalPath
  //
  def parseCarData(inpLine: String): Array[Double] = {
    val values = inpLine.split(',')
    val mpg = values(0).toDouble
    val displacement = values(1).toDouble
    val hp = values(2).toInt
    val torque = values(3).toInt
    val CRatio = values(4).toDouble
    val RARatio = values(5).toDouble
    val CarbBarrells = values(6).toInt
    val NoOfSpeed = values(7).toInt
    val length = values(8).toDouble
    val width = values(9).toDouble
    val weight = values(10).toDouble
    val automatic = values(11).toInt
    return Array(mpg, displacement, hp, torque, CRatio, RARatio,
      CarbBarrells, NoOfSpeed, length, width, weight, automatic)
  }
  //
  def main(args: Array[String]) {
    println(getCurrentDirectory)
    val sc = new SparkContext("local", "Chapter 9")
    println(s"Running Spark Version ${sc.version}")
    //
    val dataFile = sc.textFile("/Users/ksankar/fdps-vii/data/car-milage-no-hdr.csv")
    val carRDD = dataFile.map(line => parseCarData(line))
    //
    // Let us find summary statistics
    //
    val vectors: RDD[Vector] = carRDD.map(v => Vectors.dense(v))
    val summary = Statistics.colStats(vectors)
    carRDD.foreach(ln => { ln.foreach(no => print("%6.2f | ".format(no))); println() })
    print("Max :"); summary.max.toArray.foreach(m => print("%5.1f | ".format(m))); println
    print("Min :"); summary.min.toArray.foreach(m => print("%5.1f | ".format(m))); println
    print("Mean :"); summary.mean.toArray.foreach(m => print("%5.1f | ".format(m))); println
  }
}
This program will produce the following output:
Let’s also run some correlations, as shown here:
//
// correlations
//
val hp = vectors.map(x => x(2))
val weight = vectors.map(x => x(10))
var corP = Statistics.corr(hp, weight, "pearson") // default
println("hp to weight : Pearson Correlation = %2.4f".format(corP))
var corS = Statistics.corr(hp, weight, "spearman") // need to specify
println("hp to weight : Spearman Correlation = %2.4f".format(corS))
//
val raRatio = vectors.map(x => x(5))
val width = vectors.map(x => x(9))
corP = Statistics.corr(raRatio, width, "pearson") // default
println("raRatio to width : Pearson Correlation = %2.4f".format(corP))
corS = Statistics.corr(raRatio, width, "spearman") // need to specify
println("raRatio to width : Spearman Correlation = %2.4f".format(corS))
//
This will produce interesting results as shown in the next screenshot:
While this might seem like too much work to calculate the correlation of a tiny dataset, remember that the same code will scale to datasets of 1,000,000 rows or even a billion rows!
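If you want to convince yourself, MLlib’s random data generation (listed in the table earlier) makes it easy to synthesize a large dataset and run the very same summary statistics over it. Here is a minimal sketch, assuming the Spark 1.1 RandomRDDs API; the object name is just for illustration:

import org.apache.spark.SparkContext
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.random.RandomRDDs
import org.apache.spark.mllib.stat.Statistics

object StatsAtScale {
  def main(args: Array[String]) {
    val sc = new SparkContext("local", "Stats At Scale")
    // 1,000,000 draws from a standard normal distribution, in 10 partitions
    val bigRDD = RandomRDDs.normalRDD(sc, 1000000L, 10)
    // colStats expects RDD[Vector], so wrap each value in a 1-element vector
    val vectors = bigRDD.map(x => Vectors.dense(x))
    val summary = Statistics.colStats(vectors)
    println("Count : " + summary.count)
    println("Mean  : " + summary.mean)
    println("Var   : " + summary.variance)
    sc.stop()
  }
}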
Linear regression
Linear regression takes a little more work than basic statistics. We need the LabeledPoint class as well as a few more parameters, such as the learning rate, that is, the step size. We will also split the dataset into training and test sets, as shown here:
//
def carDataToLP(inpArray: Array[Double]): LabeledPoint = {
  return new LabeledPoint(inpArray(0), Vectors.dense(inpArray(1),
    inpArray(2), inpArray(3), inpArray(4), inpArray(5), inpArray(6),
    inpArray(7), inpArray(8), inpArray(9), inpArray(10), inpArray(11)))
}
//
// Linear Regression
// needs: import org.apache.spark.mllib.regression.{LabeledPoint, LinearRegressionWithSGD}
//
val carRDDLP = carRDD.map(x => carDataToLP(x)) // create a labeled point RDD
println(carRDDLP.count())
println(carRDDLP.first().label)
println(carRDDLP.first().features)
//
// Let us split the dataset into training & test sets using a very simple filter
//
val carRDDLPTrain = carRDDLP.filter(x => x.features(9) <= 4000)
val carRDDLPTest = carRDDLP.filter(x => x.features(9) > 4000)
println("Training Set : " + "%3d".format(carRDDLPTrain.count()))
println("Test Set     : " + "%3d".format(carRDDLPTest.count()))
//
// Train a Linear Regression Model
// numIterations = 100, stepSize = 0.000000001
// without such a small step size the algorithm will diverge
//
val mdlLR = LinearRegressionWithSGD.train(carRDDLPTrain, 100, 0.000000001)
println(mdlLR.intercept) // the intercept is turned off in LinearRegressionWithSGD, so it will always be 0 here
println(mdlLR.weights)
//
// Now let us use the model to predict our test set
//
val valuesAndPreds = carRDDLPTest.map(p => (p.label, mdlLR.predict(p.features)))
val mse = valuesAndPreds.map(vp => math.pow((vp._1 - vp._2), 2)).
  reduce(_+_) / valuesAndPreds.count()
println("Mean Squared Error = " + "%6.3f".format(mse))
println("Root Mean Squared Error = " + "%6.3f".format(math.sqrt(mse)))
// Let us print what the model predicted
valuesAndPreds.take(20).foreach(m => println("%5.1f | %5.1f |".format(m._1, m._2)))
The run result will be as expected, as shown in the next screenshot:
The prediction is not that impressive. There are a couple of reasons for this. There might be quadratic effects; some of the variables might be correlated (for example, length, width, and weight), so we might not need all three to predict the mpg value. Finally, we might not need all 11 features anyway. I leave it to you to try different combinations of features. (In the parseCarData function, take only a subset of the variables; for example, take hp, weight, and number of speeds, and see which combination minimizes the mse value. A sketch of one such subset model follows.)
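As a starting point for that exercise, here is a minimal sketch; the value names are illustrative, and it reuses carRDD, LabeledPoint, Vectors, and LinearRegressionWithSGD from the code above. Note that weight moves to index 2 of the new feature vector, so the train/test filter changes accordingly:

// Subset model: predict mpg from hp, NoOfSpeed, and weight only
val carRDDLPSub = carRDD.map(x =>
  LabeledPoint(x(0), Vectors.dense(x(2), x(7), x(10))))
// weight is now feature index 2, so split on that index
val subTrain = carRDDLPSub.filter(x => x.features(2) <= 4000)
val subTest = carRDDLPSub.filter(x => x.features(2) > 4000)
val mdlSub = LinearRegressionWithSGD.train(subTrain, 100, 0.000000001)
val subVP = subTest.map(p => (p.label, mdlSub.predict(p.features)))
val subMse = subVP.map(vp => math.pow(vp._1 - vp._2, 2)).reduce(_+_) / subVP.count()
println("Subset Mean Squared Error = %6.3f".format(subMse))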
Classification
Classification is very similar to linear regression. The algorithms take labeled points, and the training process has various parameters to tweak the algorithm to fit the needs of an application. The returned model can be used to predict the class of a labeled point. Here is a quick example using the Titanic dataset.
For our example, we will keep the same structure as the linear regression example. First, we will parse the full dataset line and then later keep it simple by creating a labeled point with a set of selected features, as shown in the following code:
import org.apache.spark.SparkContext
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.tree.DecisionTree

object Chapter0802 {
  //
  def getCurrentDirectory = new java.io.File(".").getCanonicalPath
  //
  // 0 pclass, 1 survived, 2 l.name, 3 f.name, 4 sex, 5 age, 6 sibsp, 7 parch,
  // 8 ticket, 9 fare, 10 cabin, 11 embarked, 12 boat, 13 body, 14 home.dest
  //
  def str2Double(x: String): Double = {
    try {
      x.toDouble
    } catch {
      case e: Exception => 0.0
    }
  }
  //
  def parsePassengerDataToLP(inpLine: String): LabeledPoint = {
    val values = inpLine.split(',')
    //println(values)
    //println(values.length)
    //
    val pclass = str2Double(values(0))
    val survived = str2Double(values(1))
    // skip last name, first name
    var sex = 0
    if (values(4) == "male") {
      sex = 1
    }
    var age = 0.0 // a better choice would be the average of all ages
    age = str2Double(values(5))
    //
    var sibsp = 0.0
    sibsp = str2Double(values(6))
    //
    var parch = 0.0
    parch = str2Double(values(7))
    //
    var fare = 0.0
    fare = str2Double(values(9))
    return new LabeledPoint(survived, Vectors.dense(pclass, sex, age, sibsp, parch, fare))
  }
Now that we have set up the routines to parse the data, let’s dive into the main program:
  //
  def main(args: Array[String]): Unit = {
    println(getCurrentDirectory)
    val sc = new SparkContext("local", "Chapter 8")
    println(s"Running Spark Version ${sc.version}")
    //
    val dataFile = sc.textFile("/Users/ksankar/bdtc-2014/titanic/titanic3_01.csv")
    val titanicRDDLP = dataFile.map(_.trim).filter(_.length > 1).
      map(line => parsePassengerDataToLP(line))
    //
    println(titanicRDDLP.count())
    //titanicRDDLP.foreach(println)
    //
    println(titanicRDDLP.first().label)
    println(titanicRDDLP.first().features)
    //
    val categoricalFeaturesInfo = Map[Int, Int]()
    val mdlTree = DecisionTree.trainClassifier(titanicRDDLP,
      2,                       // numClasses
      categoricalFeaturesInfo, // all features are continuous
      "gini",                  // impurity
      5,                       // maxDepth
      32)                      // maxBins
    //
    println(mdlTree.depth)
    println(mdlTree)
The tree is interesting to inspect. Check it out here:
    //
    // Let us predict on the dataset and see how well it works.
    // In the real world, we should split the data into train & test and then predict on the test data:
    //
    val predictions = mdlTree.predict(titanicRDDLP.map(x => x.features))
    val labelsAndPreds = titanicRDDLP.map(x => x.label).zip(predictions)
    //
    val mse = labelsAndPreds.map(vp => math.pow((vp._1 - vp._2), 2)).
      reduce(_+_) / labelsAndPreds.count()
    println("Mean Squared Error = " + "%6f".format(mse))
    //
    // labelsAndPreds.foreach(println)
    //
    val correctVals = labelsAndPreds.aggregate(0.0)((x, rec) => x +
      (rec._1 == rec._2).compare(false), _ + _)
    val accuracy = correctVals / labelsAndPreds.count()
    println("Accuracy = " + "%3.2f%%".format(accuracy * 100))
    //
    println("*** Done ***")
  }
}
The result obtained when you run the program is as expected. The printout of the tree is interesting, as shown here:
Running Spark Version 1.1.1
14/11/28 18:41:27 INFO MemoryStore: ensureFreeSpace(163705) called with curMem=0, maxMem=2061647216
[..]
14/11/28 18:41:27 INFO SparkContext: Job finished: count at Chapter0802.scala:56, took 0.260993 s
1309
14/11/28 18:41:27 INFO SparkContext: Starting job: first at Chapter0802.scala:59
[..]
14/11/28 18:41:27 INFO SparkContext: Job finished: first at Chapter0802.scala:59, took 0.016479 s
1.0
14/11/28 18:41:27 INFO SparkContext: Starting job: first at Chapter0802.scala:60
[..]
14/11/28 18:41:27 INFO SparkContext: Job finished: first at Chapter0802.scala:60, took 0.014408 s
[1.0,0.0,0.0,0.0,0.0,211.3375]
14/11/28 18:41:27 INFO SparkContext: Starting job: take at DecisionTreeMetadata.scala:66
[..]
14/11/28 18:41:28 INFO DecisionTree: Internal timing for DecisionTree:
  init: 0.36408
  total: 0.95518
  extractNodeInfo: 7.3E-4
  findSplitsBins: 0.249814
  extractInfoForLowerLevels: 7.74E-4
  findBestSplits: 0.565394
  chooseSplits: 0.201012
  aggregation: 0.362411
5
DecisionTreeModel classifier
  If (feature 1 <= 0.0)
    If (feature 0 <= 2.0)
      If (feature 5 <= 26.0)
        If (feature 2 <= 1.0)
          If (feature 0 <= 1.0)
            Predict: 1.0
          Else (feature 0 > 1.0)
            Predict: 1.0
        Else (feature 2 > 1.0)
          Predict: 1.0
      Else (feature 5 > 26.0)
        If (feature 2 <= 1.0)
          If (feature 5 <= 38.0021)
            Predict: 1.0
          Else (feature 5 > 38.0021)
            Predict: 1.0
        Else (feature 2 > 1.0)
          If (feature 5 <= 79.42500000000001)
            Predict: 1.0
          Else (feature 5 > 79.42500000000001)
            Predict: 1.0
    Else (feature 0 > 2.0)
      If (feature 5 <= 25.4667)
        If (feature 5 <= 7.2292)
          If (feature 5 <= 7.05)
            Predict: 1.0
          Else (feature 5 > 7.05)
            Predict: 1.0
        Else (feature 5 > 7.2292)
          If (feature 5 <= 15.5646)
            Predict: 0.0
          Else (feature 5 > 15.5646)
            Predict: 1.0
      Else (feature 5 > 25.4667)
        If (feature 5 <= 38.0021)
          If (feature 5 <= 30.6958)
            Predict: 0.0
          Else (feature 5 > 30.6958)
            Predict: 0.0
        Else (feature 5 > 38.0021)
          Predict: 0.0
  Else (feature 1 > 0.0)
    If (feature 0 <= 1.0)
      If (feature 5 <= 26.0)
        If (feature 5 <= 7.05)
          If (feature 5 <= 0.0)
            Predict: 0.0
          Else (feature 5 > 0.0)
            Predict: 0.0
        Else (feature 5 > 7.05)
          Predict: 0.0
      Else (feature 5 > 26.0)
        If (feature 5 <= 30.6958)
          If (feature 2 <= 0.0)
            Predict: 0.0
          Else (feature 2 > 0.0)
            Predict: 0.0
        Else (feature 5 > 30.6958)
          If (feature 2 <= 1.0)
            Predict: 0.0
          Else (feature 2 > 1.0)
            Predict: 1.0
    Else (feature 0 > 1.0)
      If (feature 2 <= 0.0)
        If (feature 5 <= 38.0021)
          If (feature 5 <= 14.4583)
            Predict: 0.0
          Else (feature 5 > 14.4583)
            Predict: 0.0
        Else (feature 5 > 38.0021)
          If (feature 0 <= 2.0)
            Predict: 0.0
          Else (feature 0 > 2.0)
            Predict: 1.0
      Else (feature 2 > 0.0)
        If (feature 5 <= 26.0)
          If (feature 2 <= 1.0)
            Predict: 0.0
          Else (feature 2 > 1.0)
            Predict: 0.0
        Else (feature 5 > 26.0)
          If (feature 0 <= 2.0)
            Predict: 0.0
          Else (feature 0 > 2.0)
            Predict: 0.0
14/11/28 18:41:28 INFO SparkContext: Starting job: reduce at Chapter0802.scala:79
[..]
14/11/28 18:41:28 INFO SparkContext: Job finished: count at Chapter0802.scala:79, took 0.077973 s
Mean Squared Error = 0.200153
14/11/28 18:41:28 INFO SparkContext: Starting job: aggregate at Chapter0802.scala:84
[..]
14/11/28 18:41:28 INFO SparkContext: Job finished: count at Chapter0802.scala:85, took 0.042592 s
Accuracy = 79.98%
*** Done ***
In the real world, one would split the data into training and test datasets, train the model on the training dataset, and then predict on the test dataset. Then we can calculate the mse and minimize it over various feature combinations, some of which could also be engineered features. A sketch of such a split follows.
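Here is one way that split could look. This is a sketch only, assuming the titanicRDDLP and DecisionTree setup from the main program above; randomSplit holds out roughly 20 percent of the rows as a test set:

// Split ~80/20; the seed makes the split reproducible
val Array(trainData, testData) = titanicRDDLP.randomSplit(Array(0.8, 0.2), seed = 42L)
val mdlTree2 = DecisionTree.trainClassifier(trainData, 2,
  Map[Int, Int](), "gini", 5, 32)
// Evaluate only on the held-out rows
val testLP = testData.map(x => (x.label, mdlTree2.predict(x.features)))
val testMse = testLP.map(vp => math.pow(vp._1 - vp._2, 2)).reduce(_+_) / testLP.count()
println("Test Mean Squared Error = %6f".format(testMse))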
Clustering
Spark MLlib has implemented the k-means clustering algorithm. The model training and prediction interfaces are similar to other machine learning algorithms. Let’s see how it works by going through an example.
Let’s use a sample dataset that has two dimensions, x and y. The plot of the points would look like the following screenshot:
From the preceding graph, we can see that four clusters are a natural fit. Let’s try with k=2 and k=4 and see how the Spark clustering algorithm handles this dataset and the groupings:
import org.apache.spark.SparkContext
import org.apache.spark.mllib.linalg.{Vector, Vectors}
import org.apache.spark.mllib.clustering.KMeans

object Chapter0803 {
  def parsePoints(inpLine: String): Vector = {
    val values = inpLine.split(',')
    val x = values(0).toInt
    val y = values(1).toInt
    return Vectors.dense(x, y)
  }
  //
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext("local", "Chapter 8")
    println(s"Running Spark Version ${sc.version}")
    //
    val dataFile = sc.textFile("/Users/ksankar/bdtc-2014/cluster-points/cluster-points.csv")
    val points = dataFile.map(_.trim).filter(_.length > 1).
      map(line => parsePoints(line))
    //
    println(points.count())
    //
    var numClusters = 2
    val numIterations = 20
    var mdlKMeans = KMeans.train(points, numClusters, numIterations)
    //
    println(mdlKMeans.clusterCenters)
    //
    var clusterPred = points.map(x => mdlKMeans.predict(x))
    var clusterMap = points.zip(clusterPred)
    //
    clusterMap.foreach(println)
    //
    clusterMap.saveAsTextFile("/Users/ksankar/bdtc-2014/cluster-points/2-cluster.csv")
    //
    // Now let us try 4 centers:
    //
    numClusters = 4
    mdlKMeans = KMeans.train(points, numClusters, numIterations)
    clusterPred = points.map(x => mdlKMeans.predict(x))
    clusterMap = points.zip(clusterPred)
    clusterMap.saveAsTextFile("/Users/ksankar/bdtc-2014/cluster-points/4-cluster.csv")
    clusterMap.foreach(println)
  }
}
The results of the run would be as shown in the next screenshot (your run could give slightly different results):
The k=2 graph shown in the next screenshot looks as expected:
With k=4 the results are as shown in the following screenshot:
The plot shown in the following screenshot confirms that the clusters are obtained as expected. Spark does understand clustering!
Bear in mind that the results could vary a little between runs because the clustering algorithm picks the initial centers randomly and iterates from there. With k=4, the results are stable; but with k=2, there is room for partitioning the points in different ways. Try it out a few times and see the results. One way to compare different values of k is sketched next.
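A rough yardstick for choosing k is the within-set sum of squared errors (WSSSE) that KMeansModel.computeCost returns. The sketch below reuses points and numIterations from the code above; the error keeps dropping as k grows, so look for the "elbow" where the improvement flattens out:

// WSSSE for k = 1..6; lower is better, but watch for the elbow
for (k <- 1 to 6) {
  val mdl = KMeans.train(points, k, numIterations)
  println("k = %d -> WSSSE = %10.2f".format(k, mdl.computeCost(points)))
}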
Recommendation
Recommendation algorithms fall under five general mechanisms, namely, knowledge-based, demographic-based, content-based, collaborative filtering (item-based or user-based), and latent factor-based. Collaborative filtering is usually computationally intensive; Spark implements the Alternating Least Squares (ALS) algorithm authored by Yehuda Koren, available at http://dl.acm.org/citation.cfm?id=1608614. It is user-based collaborative filtering using the method of learning latent factors, which can scale to a large dataset. Let’s quickly use the MovieLens medium dataset to implement a recommendation using Spark.
There are some interesting RDD transformations. Apart from that, the code is not that complex, as shown next:
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._ // for implicit conversions
import org.apache.spark.mllib.recommendation.Rating
import org.apache.spark.mllib.recommendation.ALS

object Chapter0804 {
  def parseRating1(line: String): (Int, Int, Double, Int) = {
    //println(line)
    val x = line.split("::")
    val userId = x(0).toInt
    val movieId = x(1).toInt
    val rating = x(2).toDouble
    val timeStamp = x(3).toInt / 10
    return (userId, movieId, rating, timeStamp)
  }
  //
  def parseRating(x: (Int, Int, Double, Int)): Rating = {
    val userId = x._1
    val movieId = x._2
    val rating = x._3
    val timeStamp = x._4 // ignored
    return new Rating(userId, movieId, rating)
  }
  //
Now that we have the parsers in place, let’s focus on the main program, as shown next:
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext("local", "Chapter 8")
    println(s"Running Spark Version ${sc.version}")
    //
    val moviesFile = sc.textFile("/Users/ksankar/bdtc-2014/movielens/medium/movies.dat")
    val moviesRDD = moviesFile.map(line => line.split("::"))
    println(moviesRDD.count())
    //
    val ratingsFile = sc.textFile("/Users/ksankar/bdtc-2014/movielens/medium/ratings.dat")
    val ratingsRDD = ratingsFile.map(line => parseRating1(line))
    println(ratingsRDD.count())
    //
    ratingsRDD.take(5).foreach(println) // always check the RDD
    //
    val numRatings = ratingsRDD.count()
    val numUsers = ratingsRDD.map(r => r._1).distinct().count()
    val numMovies = ratingsRDD.map(r => r._2).distinct().count()
    println("Got %d ratings from %d users on %d movies.".
      format(numRatings, numUsers, numMovies))
Split the dataset into training, validation, and test sets. We could split randomly, but here we will use the last digit of the timestamp:
    val trainSet = ratingsRDD.filter(x => (x._4 % 10) < 6).map(x => parseRating(x))
    val validationSet = ratingsRDD.filter(x => (x._4 % 10) >= 6 & (x._4 % 10) < 8).
      map(x => parseRating(x))
    val testSet = ratingsRDD.filter(x => (x._4 % 10) >= 8).map(x => parseRating(x))
    println("Training: " + "%d".format(trainSet.count()) +
      ", validation: " + "%d".format(validationSet.count()) +
      ", test: " + "%d".format(testSet.count()) + ".")
    //
    // Now train the model using the training set:
    val rank = 10
    val numIterations = 20
    val mdlALS = ALS.train(trainSet, rank, numIterations)
    //
    // Prepare the validation set for prediction
    //
    val userMovie = validationSet.map {
      case Rating(user, movie, rate) => (user, movie)
    }
    //
    // Predict and convert to a key-value PairRDD
    val predictions = mdlALS.predict(userMovie).map {
      case Rating(user, movie, rate) => ((user, movie), rate)
    }
    //
    println(predictions.count())
    predictions.take(5).foreach(println)
    //
    // Now convert the validation set to a PairRDD:
    //
    val validationPairRDD = validationSet.map(r => ((r.user, r.product), r.rating))
    println(validationPairRDD.count())
    validationPairRDD.take(5).foreach(println)
    println(validationPairRDD.getClass())
    println(predictions.getClass())
    //
    // Now join the validation set with the predictions.
    // Then we can figure out how good our recommendations are.
    // Tip:
    //   Need to import org.apache.spark.SparkContext._
    //   Then MappedRDD will be converted implicitly to PairRDD
    //
    val ratingsAndPreds = validationPairRDD.join(predictions)
    println(ratingsAndPreds.count())
    ratingsAndPreds.take(3).foreach(println)
    //
    val mse = ratingsAndPreds.map(r => {
      math.pow((r._2._1 - r._2._2), 2)
    }).reduce(_+_) / ratingsAndPreds.count()
    val rmse = math.sqrt(mse)
    println("MSE = %2.5f".format(mse) + " RMSE = %2.5f".format(rmse))
    println("** Done **")
  }
}
The run results, as shown in the next screenshot, are obtained as expected:
Check the following screenshot as well:
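Beyond batch prediction, the trained model can also score a single (user, movie) pair, which is handy for spot checks. Here is a minimal sketch that would sit inside the main program above, just before the final println; the IDs below are placeholders, so substitute ones that actually occur in ratings.dat:

// Score one (user, movie) pair; the IDs are placeholders
val someUser = 1
val someMovie = 1193
val predicted = mdlALS.predict(someUser, someMovie)
println("Predicted rating for user %d on movie %d = %2.2f"
  .format(someUser, someMovie, predicted))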
Some more information is available at:
- The Goodbye MapReduce article from Mahout News (https://mahout.apache.org/)
- https://spark.apache.org/docs/latest/mllib-guide.html
- A Collaborative Filtering ALS paper (http://dl.acm.org/citation.cfm?id=1608614)
- A good presentation on decision trees (http://spark-summit.org/wp-content/uploads/2014/07/Scalable-Distributed-Decision-Trees-in-Spark-Made-Das-Sparks-Talwalkar.pdf)
- A recommended hands-on exercise from Spark Summit 2014 (https://databricks-training.s3.amazonaws.com/movie-recommendation-with-mllib.html)
Summary
In this article, we looked at the most common machine learning algorithms in Spark. Naturally, machine learning is a vast subject and requires a lot more study, experimentation, and practical experience on interesting data science problems. Two books that are relevant to machine learning with Spark are Packt’s own Machine Learning with Spark by Nick Pentreath and O’Reilly’s Advanced Analytics with Spark by Sandy Ryza, Uri Laserson, Sean Owen, and Josh Wills. Both are excellent books that you can refer to.