
Note: This article is an excerpt from the book Mastering Apache Spark 2.x – Second Edition written by Romeo Kienzler. In this book, you will understand how memory management and binary processing, cache-aware computation, and code generation are used to speed things up dramatically.

This article provides a working example of the Apache Spark MLlib Naive Bayes algorithm on the Road Safety – Digital Breath Test Data 2013. It will describe the theory behind the algorithm and will provide a step-by-step example in Scala to show how the algorithm may be used.

Theory on Classification

In order to use the Naive Bayes algorithm to classify a dataset, the data must be linearly separable; that is, the classes within the data must be separable by linear class boundaries. The following figure visually explains this with three datasets and two class boundaries shown via the dotted lines:

Naive Bayes classification

Naive Bayes assumes that the features (or dimensions) within a dataset are independent of one another; that is, they have no effect on each other. The following example considers the classification of e-mails as spam. If you have 100 e-mails, then perform the following:

60% of emails are spam

80% of spam emails contain the word buy

20% of spam emails don't contain the word buy

40% of emails are not spam

10% of non-spam emails contain the word buy

90% of non-spam emails don't contain the word buy

Let’s convert this example into conditional probabilities so that a Naive Bayes classifier can pick it up:

P(Spam) = the probability that an email is spam = 0.6

P(Not Spam) = the probability that an email is not spam = 0.4

P(Buy|Spam) = the probability that an email that is spam has the word buy = 0.8

P(Buy|Not Spam) = the probability that an email that is not spam has the word buy = 0.1

What is the probability that an e-mail that contains the word buy is spam? Well, this would be written as P(Spam|Buy). Naive Bayes says that it is given by Bayes' theorem:

P(Spam|Buy) = ( P(Buy|Spam) * P(Spam) ) / ( P(Buy|Spam) * P(Spam) + P(Buy|Not Spam) * P(Not Spam) )

So, using the previous percentage figures, we get the following:

P(Spam|Buy) = ( 0.8 * 0.6 ) / (( 0.8 * 0.6 ) + ( 0.1 * 0.4 ) ) = ( .48 ) / ( .48 + .04 )

= .48 / .52 = .923
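To make the arithmetic concrete, here is a minimal Scala sketch that reproduces the calculation above; the object and value names are purely illustrative:

import scala.math._

object SpamProbability extends App {
  // Probabilities taken from the example above
  val pSpam       = 0.6  // P(Spam)
  val pNotSpam    = 0.4  // P(Not Spam)
  val pBuySpam    = 0.8  // P(Buy|Spam)
  val pBuyNotSpam = 0.1  // P(Buy|Not Spam)

  // Bayes' theorem: P(Spam|Buy) = P(Buy|Spam)P(Spam) / P(Buy)
  val pSpamGivenBuy =
    (pBuySpam * pSpam) / (pBuySpam * pSpam + pBuyNotSpam * pNotSpam)

  println(f"P(Spam|Buy) = $pSpamGivenBuy%.3f") // prints P(Spam|Buy) = 0.923
}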

This means that an e-mail that contains the word buy has roughly a 92 percent probability of being spam. That was a look at the theory; now it’s time to try a real-world example using the Apache Spark MLlib Naive Bayes algorithm.

Naive Bayes in practice

The first step is to choose some data that will be used for classification. We have chosen some data from the UK Government data website at http://data.gov.uk/dataset/road-accidents-safety-data.

The dataset is called Road Safety – Digital Breath Test Data 2013, which downloads a zipped text file called DigitalBreathTestData2013.txt. This file contains around half a million rows. The data looks as follows:

Reason,Month,Year,WeekType,TimeBand,BreathAlcohol,AgeBand,Gender

Suspicion of Alcohol,Jan,2013,Weekday,12am-4am,75,30-39,Male

Moving Traffic Violation,Jan,2013,Weekday,12am-4am,0,20-24,Male

Road Traffic Collision,Jan,2013,Weekend,12pm-4pm,0,20-24,Female

In order to classify the data, we have modified both the column layout and the number of columns. We have simply used Excel, given the data volume. However, if our data size had been in the big data range, we would have had to run some Scala code on top of Apache Spark for ETL (Extract Transform Load); a sketch of such a job follows the data sample below. As the following commands show, the data now resides in HDFS in the directory named /data/spark/nbayes. The file is called DigitalBreathTestData2013-MALE2.csv. The line count from the Linux wc command shows that there are 467,000 rows. Finally, the following data sample shows that we have selected the columns Gender, Reason, WeekType, TimeBand, BreathAlcohol, and AgeBand to classify. We will try to classify on the Gender column using the other columns as features:

[hadoop@hc2nn ~]$ hdfs dfs -cat /data/spark/nbayes/DigitalBreathTestData2013-MALE2.csv | wc -l
467054
[hadoop@hc2nn ~]$ hdfs dfs -cat /data/spark/nbayes/DigitalBreathTestData2013-MALE2.csv | head -5

Male,Suspicion of Alcohol,Weekday,12am-4am,75,30-39

Male,Moving Traffic Violation,Weekday,12am-4am,0,20-24

Male,Suspicion of Alcohol,Weekend,4am-8am,12,40-49

Male,Suspicion of Alcohol,Weekday,12am-4am,0,50-59

Female,Road Traffic Collision,Weekend,12pm-4pm,0,20-24
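As mentioned above, the column reshaping was done in Excel, but it could equally have been done with a few lines of Spark code. The following is only a sketch of such an ETL job, assuming that the raw file keeps the original column order shown earlier; the object name and the output path are hypothetical:

import org.apache.spark.{SparkConf, SparkContext}

object reorder1 extends App {
  // The master URL is supplied by spark-submit in this sketch
  val conf = new SparkConf().setAppName("Reorder Columns")
  val sc   = new SparkContext(conf)

  // Raw layout : Reason,Month,Year,WeekType,TimeBand,BreathAlcohol,AgeBand,Gender
  // Target     : Gender,Reason,WeekType,TimeBand,BreathAlcohol,AgeBand
  val rawData = sc.textFile("hdfs://localhost:8020/data/spark/nbayes/DigitalBreathTestData2013.txt")

  val reordered = rawData
    .filter(!_.startsWith("Reason")) // drop the header row
    .map { line =>
      val c = line.split(',')
      Seq(c(7), c(0), c(3), c(4), c(5), c(6)).mkString(",")
    }

  reordered.saveAsTextFile("hdfs://localhost:8020/data/spark/nbayes/reordered")
}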

The Apache Spark MLlib classification function uses a data structure called LabeledPoint, which is a general purpose data representation defined at http://spark.apache.org/docs/1.0.0/api/scala/index.html#org.apache.spark.mllib.regression.LabeledPoint and https://spark.apache.org/docs/latest/mllib-data-types.html#labeled-point.

This structure only accepts double values, which means that the text values in the previous data need to be converted into numeric values. Luckily, all of the columns in the data can be converted into numeric categories, and we have provided a program in the software package with this book, under the chapter2naive bayes directory, to do just that. It is called convert.scala. It takes the contents of the DigitalBreathTestData2013-MALE2.csv file and converts each record into a double vector.
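As a short illustration of that structure, a single record pairs a double label with a dense vector of double features. The values below anticipate the encoding that convert.scala produces for the first data row shown earlier and can be tried in spark-shell:

import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.regression.LabeledPoint

// Label 0.0 (Male) plus five numeric features derived from the other columns
val record = LabeledPoint(0.0, Vectors.dense(3.0, 0.0, 0.0, 75.0, 3.0))

println(record.label)    // 0.0
println(record.features) // [3.0,0.0,0.0,75.0,3.0]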

The directory structure and files for an sbt Scala-based development environment have already been described earlier. We are developing our Scala code on the Linux server using the Linux account, Hadoop. Next, the Linux pwd and ls commands show our top-level nbayes development directory with the bayes.sbt configuration file, whose contents have already been examined:

[hadoop@hc2nn nbayes]$ pwd
/home/hadoop/spark/nbayes
[hadoop@hc2nn nbayes]$ ls
bayes.sbt target project src

The Scala code to run the Naive Bayes example is in the src/main/scala subdirectory under the nbayes directory:

[hadoop@hc2nn scala]$ pwd

/home/hadoop/spark/nbayes/src/main/scala

[hadoop@hc2nn scala]$ ls

bayes1.scala convert.scala

We will examine the bayes1.scala file later, but first, the text-based data on HDFS must be converted into numeric double values. This is where the convert.scala file is used. The code is as follows:

import org.apache.spark.SparkContext

import org.apache.spark.SparkContext._

import org.apache.spark.SparkConf

These lines import classes for the Spark context, the connection to the Apache Spark cluster, and the Spark configuration. The object that is being created is called convert1. It is an application as it extends the App class:

object convert1 extends App

{

The next line creates a function called enumerateCsvRecord. It has a parameter called colData, which is an array of Strings, and it returns a String:

def enumerateCsvRecord( colData:Array[String]): String =

{

The function then enumerates the text values in each column, so, for instance, Male becomes 0. These numeric values are stored in values such as colVal1:

val colVal1 = colData(0) match
{
  case "Male"    => 0
  case "Female"  => 1
  case "Unknown" => 2
  case _         => 99
}

val colVal2 = colData(1) match
{
  case "Moving Traffic Violation" => 0
  case "Other"                    => 1
  case "Road Traffic Collision"   => 2
  case "Suspicion of Alcohol"     => 3
  case _                          => 99
}

val colVal3 = colData(2) match
{
  case "Weekday" => 0
  case "Weekend" => 0
  case _         => 99
}

val colVal4 = colData(3) match
{
  case "12am-4am" => 0
  case "4am-8am"  => 1
  case "8am-12pm" => 2
  case "12pm-4pm" => 3
  case "4pm-8pm"  => 4
  case "8pm-12pm" => 5
  case _          => 99
}

val colVal5 = colData(4)

val colVal6 = colData(5) match
{
  case "16-19" => 0
  case "20-24" => 1
  case "25-29" => 2
  case "30-39" => 3
  case "40-49" => 4
  case "50-59" => 5
  case "60-69" => 6
  case "70-98" => 7
  case "Other" => 8
  case _       => 99
}
A comma-separated string called lineString is created from the numeric column values and is then returned. The function closes with the final brace character. Note that the data line created next starts with a label value at column one and is followed by a vector, which represents the data.

The vector is space-separated, while the label is separated from the vector by a comma. Using these two separator types allows us to process both the label and the vector in two simple steps:

val lineString = colVal1+","+colVal2+" "+colVal3+" "+colVal4+" "+colVal5+" "+colVal6

return lineString

}
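To see what the function produces, the first raw data row shown earlier can be passed through it (for instance, inside convert1 for illustration); the result matches the first line of the enumerated output that appears later in this section:

// "Male,Suspicion of Alcohol,Weekday,12am-4am,75,30-39" becomes "0,3 0 0 75 3"
val sampleLine = "Male,Suspicion of Alcohol,Weekday,12am-4am,75,30-39"
println(enumerateCsvRecord(sampleLine.split(',')))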

The main script defines the HDFS server name and path. It defines the input file and the output path in terms of these values. It uses the Spark URL and application name to create a new configuration. It then creates a new context or connection to Spark using these details:

val hdfsServer = "hdfs://localhost:8020"
val hdfsPath   = "/data/spark/nbayes/"

val inDataFile  = hdfsServer + hdfsPath + "DigitalBreathTestData2013-MALE2.csv"
val outDataFile = hdfsServer + hdfsPath + "result"

val sparkMaster = "spark://localhost:7077"
val appName     = "Convert 1"

val sparkConf = new SparkConf()
sparkConf.setMaster(sparkMaster)
sparkConf.setAppName(appName)

val sparkCxt = new SparkContext(sparkConf)

The CSV-based raw data file is loaded from HDFS using the Spark context textFile method. Then, a data row count is printed:

val csvData = sparkCxt.textFile(inDataFile)
println("Records in : " + csvData.count())

The CSV raw data is passed line by line to the enumerateCsvRecord function. The returned string-based numeric data is stored in the enumRddData variable:

val enumRddData = csvData.map
{
  csvLine =>
    val colData = csvLine.split(',')
    enumerateCsvRecord(colData)
}

Finally, the number of records in the enumRddData variable is printed, and the enumerated data is saved to HDFS:

println("Records out : "+ enumRddData.count() ) enumRddData.saveAsTextFile(outDataFile)

} // end object

In order to run this script as an application against Spark, it must be compiled and packaged into a JAR library. This is carried out with the sbt package command. The following command is run from the nbayes directory:

[hadoop@hc2nn nbayes]$ sbt package

Loading /usr/share/sbt/bin/sbt-launch-lib.bash

....

[info] Done packaging.

[success] Total time: 37 s, completed Feb 19, 2015 1:23:55 PM

This causes the compiled classes that are created to be packaged into a JAR library, as shown here:

[hadoop@hc2nn nbayes]$ pwd
/home/hadoop/spark/nbayes
[hadoop@hc2nn nbayes]$ ls -l target/scala-2.10
total 24
drwxrwxr-x 2 hadoop hadoop  4096 Feb 19 13:23 classes
-rw-rw-r-- 1 hadoop hadoop 17609 Feb 19 13:23 naive-bayes_2.10-1.0.jar

The convert1 application can now be run against Spark using the application name, Spark URL, and full path to the JAR file that was created. Some extra parameters specify the executor memory and the maximum number of cores that should be used:

spark-submit \
  --class convert1 \
  --master spark://localhost:7077 \
  --executor-memory 700M \
  --total-executor-cores 100 \
  /home/hadoop/spark/nbayes/target/scala-2.10/naive-bayes_2.10-1.0.jar

This creates a directory on HDFS called /data/spark/nbayes/result, which contains part files with the processed data:

[hadoop@hc2nn nbayes]$ hdfs dfs -ls /data/spark/nbayes
Found 2 items
-rw-r--r--   3 hadoop supergroup 24645166 2015-01-29 21:27 /data/spark/nbayes/DigitalBreathTestData2013-MALE2.csv
drwxr-xr-x   - hadoop supergroup        0 2015-02-19 13:36 /data/spark/nbayes/result

[hadoop@hc2nn nbayes]$ hdfs dfs -ls /data/spark/nbayes/result
Found 3 items
-rw-r--r--   3 hadoop supergroup       0 2015-02-19 13:36 /data/spark/nbayes/result/_SUCCESS
-rw-r--r--   3 hadoop supergroup 2828727 2015-02-19 13:36 /data/spark/nbayes/result/part-00000
-rw-r--r--   3 hadoop supergroup 2865499 2015-02-19 13:36 /data/spark/nbayes/result/part-00001

In the following commands, we concatenated the part file data into a file called DigitalBreathTestData2013-MALE2a.csv. We then examined the top five lines of the file using the head command to show that it is numeric. Finally, we loaded it into HDFS with the put command:

[hadoop@hc2nn nbayes]$ hdfs dfs -cat /data/spark/nbayes/result/part* > ./DigitalBreathTestData2013-MALE2a.csv

[hadoop@hc2nn nbayes]$ head -5 DigitalBreathTestData2013-MALE2a.csv
0,3 0 0 75 3
0,0 0 0 0 1
0,3 0 1 12 4
0,3 0 0 0 5
1,2 0 3 0 1

[hadoop@hc2nn nbayes]$ hdfs dfs -put ./DigitalBreathTestData2013-MALE2a.csv /data/spark/nbayes

The following HDFS ls command now shows the numeric data file stored on HDFS in the nbayes directory:

[hadoop@hc2nn nbayes]$ hdfs dfs -ls /data/spark/nbayes
Found 3 items
-rw-r--r--   3 hadoop supergroup 24645166 2015-01-29 21:27 /data/spark/nbayes/DigitalBreathTestData2013-MALE2.csv
-rw-r--r--   3 hadoop supergroup  5694226 2015-02-19 13:39 /data/spark/nbayes/DigitalBreathTestData2013-MALE2a.csv
drwxr-xr-x   - hadoop supergroup        0 2015-02-19 13:36 /data/spark/nbayes/result

Now that the data has been converted into a numeric form, it can be processed with the MLlib Naive Bayes algorithm; this is what the Scala file, bayes1.scala, does. This file imports the same configuration and context classes as before. It also imports MLlib classes for Naive Bayes, vectors, and the LabeledPoint structure. The application class that is created this time is called bayes1:

import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf
import org.apache.spark.mllib.classification.NaiveBayes
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.regression.LabeledPoint

object bayes1 extends App {

The HDFS data file is again defined, and a Spark context is created as before:

val hdfsServer = "hdfs://localhost:8020" val hdfsPath      = "/data/spark/nbayes/"

val dataFile = hdfsServer+hdfsPath+"DigitalBreathTestData2013-MALE2a.csv" val sparkMaster = "spark://loclhost:7077"

val appName = "Naive Bayes 1" val conf = new SparkConf() conf.setMaster(sparkMaster) conf.setAppName(appName)

val sparkCxt = new SparkContext(conf)

The raw CSV data is loaded and split by the separator characters. The first column becomes the label (Male/Female) that the data will be classified on. The final columns separated by spaces become the classification features:

val csvData = sparkCxt.textFile(dataFile)

val ArrayData = csvData.map {
  csvLine =>
    val colData = csvLine.split(',')
    LabeledPoint(colData(0).toDouble,
      Vectors.dense(colData(1)
        .split(' ')
        .map(_.toDouble)
      )
    )
}

The data is then randomly divided into training (70%) and testing (30%) datasets:

val divData = ArrayData.randomSplit(Array(0.7, 0.3), seed = 13L)

val trainDataSet = divData(0)
val testDataSet  = divData(1)

The Naive Bayes MLlib function can now be trained using the previous training set. The trained Naive Bayes model, held in the nbTrained variable, can then be used to predict the Male/Female result labels against the testing data:

val nbTrained = NaiveBayes.train(trainDataSet)

val nbPredict = nbTrained.predict(testDataSet.map(_.features))

Given that all of the data already contains labels, the predicted labels for the test data can be compared with the original ones, and an accuracy percentage can then be computed:

val predictionAndLabel = nbPredict.zip(testDataSet.map(_.label))

val accuracy = 100.0 * predictionAndLabel.filter(x => x._1 == x._2).count() / testDataSet.count()

println("Accuracy : " + accuracy)

}

So, this explains the Scala Naive Bayes code example. It’s now time to run the compiled bayes1 application using spark-submit and determine the classification accuracy. The parameters are the same. It’s just the class name that has changed:

spark-submit \
  --class bayes1 \
  --master spark://localhost:7077 \
  --executor-memory 700M \
  --total-executor-cores 100 \
  /home/hadoop/spark/nbayes/target/scala-2.10/naive-bayes_2.10-1.0.jar

The resulting accuracy given by the Spark cluster is just 43 percent, which seems to imply that this data is not suitable for Naive Bayes:

Accuracy: 43.30
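One quick sanity check, not part of the original example, is to compare that figure against the majority-class share of the test labels; a useful model should at least beat the accuracy obtained by always predicting the most frequent gender. A possible sketch, reusing the testDataSet variable from bayes1.scala:

// Count how often each label (0.0 = Male, 1.0 = Female) occurs in the test set
val labelCounts = testDataSet.map(_.label).countByValue()
println(labelCounts)

// The majority-class share gives a baseline accuracy to beat
val baseline = 100.0 * labelCounts.values.max / testDataSet.count()
println("Majority-class baseline : " + baseline)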

We have seen how, with the help of Apache Spark MLlib, one can perform classification using the Naive Bayes algorithm.

If you found this post useful, do check out the book, Mastering Apache Spark 2.x – Second Edition, to learn about the latest enhancements to Apache Spark 2.x, such as interactive querying of live data and unifying DataFrames and Datasets.
