In this article by Alexander Kozlov, author of the book Mastering Scala Machine Learning, we will discuss how to get and use Spark. If you haven’t done so yet, download the pre-built Spark package from http://spark.apache.org/downloads.html. The latest release of Spark, at the time of writing, is 1.6.1:

Figure 3-1: The download site at http://spark.apache.org with recommended selections for this article

Alternatively, you can build Spark by downloading the full source distribution from https://github.com/apache/spark:

$ git clone https://github.com/apache/spark.git

Cloning into 'spark'...

remote: Counting objects: 301864, done.

...

$ cd spark

$ sh ./dev/change-scala-version.sh 2.11

...

$ ./make-distribution.sh --name alex-build-2.6-yarn --skip-java-test --tgz -Pyarn -Phive -Phive-thriftserver -Pscala-2.11 -Phadoop-2.6

...

The command will download the necessary dependencies and create the spark-2.0.0-SNAPSHOT-bin-alex-build-2.6-yarn.tgz file in the Spark directory; the version is 2.0.0, as it is the next release version at the time of writing. In general, you do not want to build from trunk unless you are interested in the latest features. If you want a released version, you can check out the corresponding tag. The full list of available versions is available via the git branch -r command. The spark*.tgz file is all you need to run Spark on any machine that has a Java JRE.

The distribution comes with the docs/building-spark.md document, which describes other options for building Spark, including the incremental Scala compiler zinc. Full Scala 2.11 support is in the works for the next Spark 2.0.0 release.

Applications

Let’s consider a few practical examples and libraries in Spark/Scala starting with a very traditional problem of word counting.

Word count

Most modern machine learning algorithms require multiple passes over data. If the data fits in the memory of a single machine, the data is readily available and this does not present a performance bottleneck. However, if the data becomes too large to fit into RAM, one has a choice of either dumping pieces of the data on disk (or database), which is about 100 times slower, but has a much larger capacity, or splitting the dataset between multiple machines across the network and transferring the results. While there are still ongoing debates, for most practical systems, analysis shows that storing the data over a set of network connected nodes has a slight advantage over repeatedly storing and reading it from hard disks on a single node, particularly if we can split the workload effectively between multiple CPUs.

An average disk has a bandwidth of about 100 MB/sec and transfers data with a few ms of latency, depending on the rotation speed and caching. This is about 100 times slower than reading the data from memory, again depending on the data size and caching implementation. A modern data bus can transfer data at over 10 GB/sec. While network speed still lags behind direct memory access, particularly with the overhead of the standard TCP/IP kernel networking layer, specialized hardware can reach tens of GB/sec and, if run in parallel, can potentially be as fast as reading from memory. In practice, network transfer speeds are somewhere between 1 and 10 GB/sec, which is still faster than the disk in most practical systems. Thus, we can potentially fit the data into the combined memory of all the cluster nodes and perform iterative machine learning algorithms across them.
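To make these orders of magnitude concrete, here is a small back-of-the-envelope sketch. The throughput figures are the rough numbers quoted above rather than measurements, and the 1 TB dataset size and all names are assumptions chosen only for illustration.

```scala
// Back-of-the-envelope time for one full pass over a dataset.
// Rates are the approximate figures from the text, not benchmarks.
object ScanTime {
  val MB: Double = 1e6
  val GB: Double = 1e9

  /** Seconds needed to stream `bytes` at a sustained rate of `bytesPerSec`. */
  def seconds(bytes: Double, bytesPerSec: Double): Double = bytes / bytesPerSec

  def main(args: Array[String]): Unit = {
    val dataset = 1000 * GB // hypothetical 1 TB dataset
    val media = Seq(
      "single disk (100 MB/sec)" -> 100 * MB,
      "network (1 GB/sec)"       -> 1 * GB,
      "memory bus (10 GB/sec)"   -> 10 * GB
    )
    for ((name, rate) <- media)
      println(f"$name%-26s ${seconds(dataset, rate)}%10.0f sec per pass")
  }
}
```

An iterative algorithm multiplies these per-pass times by the number of passes, which is why the medium the data is read from matters so much.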

One problem with memory, however, is that it does not persist across node failures and reboots. A popular big data framework, Hadoop, made possible with the help of the original Dean/Ghemawat paper (Jeff Dean and Sanjay Ghemawat, MapReduce: Simplified Data Processing on Large Clusters, OSDI, 2004.), uses exactly the disk layer persistence to guarantee fault tolerance and store intermediate results. A Hadoop MapReduce program would first run a map function on each row of a dataset, emitting one or more key/value pairs. These key/value pairs would then be sorted, grouped, and aggregated by key so that the records with the same key would end up being processed together on the same reducer, which might be running on the same or another node. The reducer applies a reduce function that traverses all the values that were emitted for the same key and aggregates them accordingly. The persistence of intermediate results guarantees that if a reducer fails for one reason or another, the partial computations can be discarded and the reduce computation can be restarted from the checkpoint-saved results. Many simple ETL-like applications traverse the dataset only once with very little information preserved as state from one record to another.

For example, one of the traditional applications of MapReduce is word count. The program needs to count the number of occurrences of each word in a document consisting of lines of text. In Scala, the word count is readily expressed as an application of the foldLeft method on a sorted list of words:

val lines = scala.io.Source.fromFile("...").getLines.toSeq

val counts = lines.flatMap(line => line.split("\\W+")).sorted.

  foldLeft(List[(String,Int)]()){ (r,c) =>

    r match {

      case (key, count) :: tail =>

        if (key == c) (c, count+1) :: tail

        else (c, 1) :: r

      case Nil =>

        List((c, 1))

    }

  }

If I run this program, the output will be a list of (word, count) tuples. The program splits the lines into words, sorts the words, and then matches each word with the latest entry in the list of (word, count) tuples. The same computation in the MapReduce style, written here with Spark, would be expressed as follows:

val linesRdd = sc.textFile("hdfs://...")

val counts = linesRdd.flatMap(line => line.split("\\W+"))

    .map(_.toLowerCase)

    .map(word => (word, 1))

    .reduceByKey(_+_)

counts.collect

First, we need to process each line of the text by splitting the line into words and generating (word, 1) pairs. This task is easily parallelized. Then, to parallelize the global count, we need to split the counting part by assigning a task to do the count for a subset of words. In Hadoop, we compute the hash of the word and divide the work based on the value of the hash.

Once the map task finds all the entries for a given hash, it can send the key/value pairs to the reducer; the sending part is usually called shuffle in MapReduce vernacular. A reducer waits until it receives all the key/value pairs from all the mappers, combines the values (a partial combine can also happen on the mapper, if possible), and computes the overall aggregate, which in this case is just a sum. A single reducer will see all the values for a given word.
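The flow described above (map, partial combine on the mapper, hash-based shuffle, and reduce) can be imitated with ordinary Scala collections. The following is only a sketch of the idea: it uses no Hadoop or Spark APIs, and the names ShuffleSketch, mapAndCombine, partition, and numReducers are hypothetical.

```scala
// A plain-Scala simulation of map / combine / shuffle / reduce for word count.
object ShuffleSketch {
  /** Map side: emit one entry per word and combine locally into partial sums. */
  def mapAndCombine(lines: Seq[String]): Map[String, Int] =
    lines.flatMap(_.toLowerCase.split("\\W+"))
      .filter(_.nonEmpty)
      .groupBy(identity)
      .map { case (word, occurrences) => word -> occurrences.size }

  /** Choose the reducer for a key from its hash; the result is in [0, numReducers). */
  def partition(key: String, numReducers: Int): Int =
    ((key.hashCode % numReducers) + numReducers) % numReducers

  /** Shuffle the partial sums to reducers by hash and add them up per word. */
  def wordCount(splits: Seq[Seq[String]], numReducers: Int): Map[String, Int] = {
    // Each input split plays the role of one map task.
    val partials: Seq[(String, Int)] = splits.flatMap(s => mapAndCombine(s).toSeq)
    // All pairs with the same word land on the same reducer.
    val byReducer: Map[Int, Seq[(String, Int)]] =
      partials.groupBy { case (word, _) => partition(word, numReducers) }
    byReducer.values.flatMap { pairs =>
      pairs.groupBy(_._1).map { case (word, ps) => word -> ps.map(_._2).sum }
    }.toMap
  }
}
```

Because the reducer is chosen from the hash of the word alone, every partial sum for a given word reaches the same reducer regardless of which map task produced it.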

Let’s look at the log output of the word count operation in Spark (Spark is very verbose by default; you can manage the verbosity level by modifying the conf/log4j.properties file, replacing INFO with ERROR or FATAL):

$ wget http://mirrors.sonic.net/apache/spark/spark-1.6.1/spark-1.6.1-bin-hadoop2.6.tgz

$ tar xvf spark-1.6.1-bin-hadoop2.6.tgz

$ cd spark-1.6.1-bin-hadoop2.6

$ mkdir leotolstoy

$ (cd leotolstoy; wget http://www.gutenberg.org/files/1399/1399-0.txt)

$ bin/spark-shell

Welcome to

      ____              __

     / __/__  ___ _____/ /__

    _\ \/ _ \/ _ `/ __/  '_/

   /___/ .__/\_,_/_/ /_/\_\   version 1.6.1

      /_/

 

Using Scala version 2.11.7 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_40)

Type in expressions to have them evaluated.

Type :help for more information.

Spark context available as sc.

SQL context available as sqlContext.

scala> val linesRdd = sc.textFile("leotolstoy", minPartitions=10)

linesRdd: org.apache.spark.rdd.RDD[String] = leotolstoy MapPartitionsRDD[3] at textFile at <console>:27

At this stage, Spark has only manipulated metadata; it has not touched the data itself. Spark estimates the size of the dataset and the number of partitions. By default, this is the number of HDFS blocks, but we can specify the minimum number of partitions explicitly with the minPartitions parameter, as in the call above. Next, we define the counting pipeline:

scala> val countsRdd = linesRdd.flatMap(line => line.split("\\W+")).

     | map(_.toLowerCase).

     | map(word => (word, 1)).

     | reduceByKey(_+_)

countsRdd: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[5] at reduceByKey at <console>:31

We have just defined another RDD derived from the original linesRdd. Calling collect triggers the actual computation:

scala> countsRdd.collect.filter(_._2 > 99)

res3: Array[(String, Int)] = Array((been,1061), (them,841), (found,141), (my,794), (often,105), (table,185), (this,1410), (here,364), (asked,320), (standing,132), ("",13514), (we,592), (myself,140), (is,1454), (carriage,181), (got,277), (won,153), (girl,117), (she,4403), (moment,201), (down,467), (me,1134), (even,355), (come,667), (new,319), (now,872), (upon,207), (sister,115), (veslovsky,110), (letter,125), (women,134), (between,138), (will,461), (almost,124), (thinking,159), (have,1277), (answer,146), (better,231), (men,199), (after,501), (only,654), (suddenly,173), (since,124), (own,359), (best,101), (their,703), (get,304), (end,110), (most,249), (but,3167), (was,5309), (do,846), (keep,107), (having,153), (betsy,111), (had,3857), (before,508), (saw,421), (once,334), (side,163), (ough...

Word count over 2 MB of text data (40,291 lines and 353,087 words) took under a second to read, split, and group by words.

With extended logging, you could see the following:

  • Spark opens a few ports to communicate with the executors and users
  • Spark UI runs on port 4040 on http://localhost:4040
  • You can read the file either from local or distributed storage (HDFS, Cassandra, and S3)
  • Spark will connect to Hive if Spark is built with Hive support
  • Spark uses lazy evaluation and executes the pipeline only when necessary or when output is required
  • Spark uses an internal scheduler to split the job into tasks, optimize the execution, and execute the tasks
  • The results are stored in RDDs, which can either be saved or brought into the RAM of the node executing the shell with the collect method
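The lazy evaluation mentioned in the list can be illustrated with an analogy in plain Scala. Iterators, like RDD transformations, only describe a pipeline; nothing is read until a terminal operation asks for results. This is an analogy, not Spark code, and the names LazySketch, pipeline, and onRead are hypothetical.

```scala
// An analogy for lazy evaluation using plain Scala iterators (not Spark).
object LazySketch {
  /** Builds the pipeline; `onRead` is invoked each time a line is actually consumed. */
  def pipeline(lines: Seq[String], onRead: String => Unit): Iterator[(String, Int)] =
    lines.iterator
      .map { line => onRead(line); line }
      .flatMap(_.split("\\W+"))
      .map(word => (word, 1))

  def main(args: Array[String]): Unit = {
    var touched = 0
    // Defining the pipeline does not consume any input yet.
    val words = pipeline(Seq("to be or", "not to be"), _ => touched += 1)
    println(s"lines read after defining the pipeline: $touched")
    // toList plays the role of an action such as collect.
    val result = words.toList
    println(s"lines read after materializing: $touched, pairs: ${result.size}")
  }
}
```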

The art of parallel performance tuning is to split the workload between different nodes or threads so that the overhead is relatively small and the workload is balanced.

Streaming word count

Spark supports listening on incoming streams, partitioning them, and computing aggregates in close to real time. Currently supported sources are Kafka, Flume, HDFS/S3, Kinesis, Twitter, as well as traditional MQs such as ZeroMQ and MQTT. In Spark, streaming is implemented as micro-batches: internally, Spark divides the input data into micro-batches, usually from subseconds to minutes in duration, and performs RDD aggregation operations on these micro-batches.
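As a rough picture of what micro-batching means, the following plain-Scala sketch assigns timestamped events to fixed-length batches and runs an ordinary word count on each batch. It does not use Spark Streaming; the Event type, the function names, and the batch length passed in are assumptions made for this illustration.

```scala
// A plain-Scala picture of micro-batching (no Spark Streaming involved).
object MicroBatchSketch {
  final case class Event(timestampMs: Long, text: String)

  /** Groups events into consecutive batches of `batchMs` milliseconds, keyed by batch start time. */
  def toBatches(events: Seq[Event], batchMs: Long): Seq[(Long, Seq[Event])] =
    events
      .groupBy(e => e.timestampMs / batchMs * batchMs)
      .toSeq
      .sortBy(_._1)

  /** Runs an ordinary batch word count on each micro-batch. */
  def countsPerBatch(events: Seq[Event], batchMs: Long): Seq[(Long, Map[String, Int])] =
    toBatches(events, batchMs).map { case (start, batch) =>
      val words = batch.flatMap(_.text.toLowerCase.split("\\W+")).filter(_.nonEmpty)
      start -> words.groupBy(identity).map { case (w, ws) => w -> ws.size }
    }
}
```

Each batch is processed with the same kind of operations as a static dataset, which is the main appeal of the micro-batch model.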

For example, let’s extend the Flume example that we covered earlier. We’ll need to modify the Flume configuration file to create a Spark polling sink. Replace the HDFS sink section with the following:

# The sink is Spark

a1.sinks.k1.type=org.apache.spark.streaming.flume.sink.SparkSink

a1.sinks.k1.hostname=localhost

a1.sinks.k1.port=4989

Now, instead of writing to HDFS, Flume will wait for Spark to poll for data:

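// Package name taken from the sbt run-main command used to launch the program
package org.akozlov.chapter03

import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.flume.FlumeUtils

object FlumeWordCount {

  def main(args: Array[String]): Unit = {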

    // Create the context with a 2 second batch size

    val sparkConf = new SparkConf().setMaster("local[2]")
      .setAppName("FlumeWordCount")

    val ssc = new StreamingContext(sparkConf, Seconds(2))

    ssc.checkpoint("/tmp/flume_check")

    val hostPort=args(0).split(":")

    System.out.println("Opening a sink at host: [" + hostPort(0) + 
      "] port: [" + hostPort(1).toInt + "]")

    val lines = FlumeUtils.createPollingStream(ssc, hostPort(0), 
      hostPort(1).toInt, StorageLevel.MEMORY_ONLY)

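    // Decode each Flume event body, split it into words, and count
    // the words over a 6-second window that slides every 2 seconds
    val words = lines

      .map(e => new String(e.event.getBody.array))

      .map(_.toLowerCase)

      .flatMap(_.split("\\W+"))

      .map(word => (word, 1L))

      .reduceByKeyAndWindow(_+_, _-_, Seconds(6), Seconds(2))

    words.print()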

    ssc.start()

    ssc.awaitTermination()

  }

}

To run the program, start the Flume agent in one window:

$ ./bin/flume-ng agent -Dflume.log.level=DEBUG,console -n a1 -f ../chapter03/conf/flume-spark.conf

...

Then run the FlumeWordCount object in another:

$ cd ../chapter03

$ sbt "run-main org.akozlov.chapter03.FlumeWordCount localhost:4989"

...

Now, any text typed to the netcat connection will be split into words and counted every two seconds over a six-second sliding window:

$ echo "Happy families are all alike; every unhappy family is unhappy in its own way" | nc localhost 4987

...

-------------------------------------------

Time: 1464161488000 ms

-------------------------------------------

(are,1)

(is,1)

(its,1)

(family,1)

(families,1)

(alike,1)

(own,1)

(happy,1)

(unhappy,2)

(every,1)

...

 

-------------------------------------------

Time: 1464161490000 ms

-------------------------------------------

(are,1)

(is,1)

(its,1)

(family,1)

(families,1)

(alike,1)

(own,1)

(happy,1)

(unhappy,2)

(every,1)

...
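The same counts appear in consecutive outputs because the sentence stays inside the six-second window for three two-second batches. The pair of functions (_+_, _-_) passed to reduceByKeyAndWindow lets the window be updated incrementally: counts of the batch entering the window are added and counts of the batch leaving it are subtracted. The sketch below models that idea in plain Scala; it is an illustration only, not Spark's implementation, and all names in it are hypothetical.

```scala
// A plain-Scala model of an incrementally maintained sliding window.
object WindowSketch {
  type Counts = Map[String, Long]

  /** Adds (sign = 1) or subtracts (sign = -1) a batch, dropping words whose count reaches zero. */
  private def merge(window: Counts, batch: Counts, sign: Long): Counts =
    batch.foldLeft(window) { case (acc, (word, n)) =>
      val updated = acc.getOrElse(word, 0L) + sign * n
      if (updated == 0L) acc - word else acc.updated(word, updated)
    }

  /** Returns the window contents after each batch, for a window spanning `size` batches. */
  def slide(batches: Seq[Counts], size: Int): Seq[Counts] = {
    val empty: Counts = Map.empty
    batches.indices.scanLeft(empty) { (window, i) =>
      val added = merge(window, batches(i), 1L)            // batch entering the window
      if (i >= size) merge(added, batches(i - size), -1L)  // batch leaving the window
      else added
    }.tail
  }
}
```

With a window of three batches, a sentence that arrives in one batch is reported three times and then disappears, which matches the behavior of a six-second window sliding every two seconds.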

Spark/Scala allows you to switch seamlessly between streaming sources. For example, the same program for the Kafka publish/subscribe topic model looks similar to the following:

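import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

object KafkaWordCount {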

  def main(args: Array[String]) {

    // Create the context with a 2 second batch size

    val sparkConf = new SparkConf().setMaster("local[2]")
      .setAppName("KafkaWordCount")

    val ssc = new StreamingContext(sparkConf, Seconds(2))

    ssc.checkpoint("/tmp/kafka_check")

    System.out.println("Opening a Kafka consumer at zk: [" + args(0) +
      "] for group group-1 and topic example")

    val lines = KafkaUtils.createStream(ssc, args(0), "group-1", 
      Map("example" -> 1), StorageLevel.MEMORY_ONLY)

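    // Each Kafka record is a (key, message) pair; count the words of the
    // message over a 6-second window that slides every 2 seconds
    val words = lines

      .flatMap(_._2.toLowerCase.split("\\W+"))

      .map(word => (word, 1L))

      .reduceByKeyAndWindow(_+_, _-_, Seconds(6), Seconds(2))

    words.print()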

    ssc.start()

    ssc.awaitTermination()

  }

}

To start the Kafka broker, first download the latest binary distribution and start ZooKeeper. ZooKeeper is a distributed-services coordinator and is required by Kafka even in a single-node deployment:

$ wget http://apache.cs.utah.edu/kafka/0.9.0.1/kafka_2.11-0.9.0.1.tgz

...

$ tar xf kafka_2.11-0.9.0.1.tgz

$ cd kafka_2.11-0.9.0.1

$ bin/zookeeper-server-start.sh config/zookeeper.properties

...

In another window, start the Kafka server:

$ bin/kafka-server-start.sh config/server.properties

...

Run the KafkaWordCount object:

$ sbt "run-main org.akozlov.chapter03.KafkaWordCount localhost:2181"

...

Now, publishing the stream of words into the Kafka topic will produce the window counts:

$ echo "Happy families are all alike; every unhappy family is unhappy in its own way" | ./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic example

...

-------------------------------------------

Time: 1464162712000 ms

-------------------------------------------

(are,1)

(is,1)

(its,1)

(family,1)

(families,1)

(alike,1)

(own,1)

(happy,1)

(unhappy,2)

(every,1)

As you can see, the program prints its output every two seconds. Spark Streaming is sometimes called micro-batch processing. Streaming has many other applications (and frameworks), but the topic is too big to cover fully here and needs to be treated separately.

Spark SQL and DataFrame

DataFrame was a relatively recent addition to Spark, introduced in version 1.3, allowing one to use the standard SQL language for data analysis. SQL is really great for simple exploratory analysis and data aggregations.

According to the latest poll results, about 70% of Spark users use DataFrame. Although DataFrame recently became the most popular framework for working with tabular data, it is a relatively heavyweight object. Pipelines that use DataFrames may execute much slower than ones based on Scala’s vector or LabeledPoint, which will be discussed in the next chapter. Evidence from different developers suggests that, depending on the query, response times can grow from submillisecond on simpler objects to tens or hundreds of milliseconds.

Spark implements its own shell for SQL, which can be invoked in addition to the standard Scala REPL shell: ./bin/spark-sql can be used to access existing Hive/Impala or relational DB tables:

$ ./bin/spark-sql

…

spark-sql> select min(duration), max(duration), avg(duration) from kddcup;

…

0  58329  48.34243046395876

Time taken: 11.073 seconds, Fetched 1 row(s)

In the standard Spark REPL, the same query can be performed by running the following command:

$ ./bin/spark-shell

…

scala> val df = sqlContext.sql("select min(duration), max(duration), avg(duration) from kddcup")

16/05/12 13:35:34 INFO parse.ParseDriver: Parsing command: select min(duration), max(duration), avg(duration) from alex.kddcup_parquet

16/05/12 13:35:34 INFO parse.ParseDriver: Parse Completed

df: org.apache.spark.sql.DataFrame = [_c0: bigint, _c1: bigint, _c2: double]

scala> df.collect.foreach(println)

…

16/05/12 13:36:32 INFO scheduler.DAGScheduler: Job 2 finished: collect at <console>:22, took 4.593210 s

[0,58329,48.34243046395876]

Summary

In this article, we discussed Spark and Hadoop and their relationship with Scala. We also discussed functional programming at a very high level. We then considered a classic word count example and its implementation in Scala and Spark.
