
In this article by Sumit Gupta and Shilpi Saxena, the authors of Real-Time Big Data Analytics, we will discuss the architecture of Spark and its various components in detail. We will also briefly talk about the various extensions/libraries of Spark that are developed on top of the core Spark framework.


Spark is a general-purpose computing engine that initially focused on providing solutions for iterative and interactive computations and workloads, for example, machine learning algorithms that reuse intermediate or working datasets across multiple parallel operations.

The real challenge with iterative computations is the dependency of the intermediate data/steps on the overall job. This intermediate data needs to be cached in memory for faster computation, because flushing it to and reading it back from disk is an overhead that makes the overall process unacceptably slow.

The creators of Apache Spark not only provided scalability, fault tolerance, performance, and distributed data processing, but also in-memory processing of distributed data over the cluster of nodes.

To achieve this, a new layer of abstraction was introduced: distributed datasets that are partitioned over a set of machines (the cluster) and can be cached in memory to reduce latency. This new layer of abstraction is known as resilient distributed datasets (RDD).

RDD, by definition, is an immutable (read-only) collection of objects partitioned across a set of machines that can be rebuilt if a partition is lost.

It is important to note that Spark is capable of performing in-memory operations, but at the same time, it can also work on the data stored on the disk.
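To make the idea concrete, here is a minimal sketch (the object name, file path, and the "THEFT" filter are hypothetical, and local[2] is just a convenient local master) that loads a text file into an RDD, keeps it in memory, and lets Spark spill to disk when memory is insufficient:

    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.storage.StorageLevel

    object RddSketch {
      def main(args: Array[String]) {
        val sc = new SparkContext(new SparkConf().setAppName("RDD Sketch").setMaster("local[2]"))
        // Build an RDD from a text file; the path is a placeholder
        val crimes = sc.textFile("file:///tmp/crime-data.csv")
        // Keep the partitions in memory, spilling to disk if memory is insufficient
        crimes.persist(StorageLevel.MEMORY_AND_DISK)
        // RDDs are immutable; transformations return new RDDs instead of modifying the original
        val thefts = crimes.filter(line => line.contains("THEFT"))
        println("Total records: " + crimes.count() + ", thefts: " + thefts.count())
        sc.stop()
      }
    }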

High-level architecture

Spark provides a well-defined and layered architecture where all its layers and components are loosely coupled and integration with external components/libraries/extensions is performed using well-defined contracts.

Here is the high-level architecture of Spark 1.5.1 and its various components/layers:

[Figure: High-level architecture of Spark]

The preceding diagram shows the high-level architecture of Spark. Let’s discuss the roles and usage of each of the architecture components:

  • Physical machines: This layer represents the physical or virtual machines/nodes on which Spark jobs are executed. These nodes collectively represent the total capacity of the cluster with respect to the CPU, memory, and data storage.
  • Data storage layer: This layer provides the APIs to store and retrieve data from the persistent storage area for Spark jobs/applications. It is used by Spark workers to dump data to persistent storage whenever the cluster memory is not sufficient to hold the data. Spark is extensible and capable of using any kind of filesystem. RDDs, which hold the data, are agnostic to the underlying storage layer and can persist the data in various persistent storage areas, such as local filesystems, HDFS, or other storage systems such as HBase, Cassandra, MongoDB, S3, and Elasticsearch (a short sketch of persisting an RDD to different storage layers follows this list).
  • Resource manager: The architecture of Spark abstracts out the deployment of the Spark framework and its associated applications. Spark applications can leverage cluster managers such as YARN (http://tinyurl.com/pcymnnf) and Mesos (http://mesos.apache.org/) for the allocation and deallocation of various physical resources, such as CPU and memory, for the client jobs. The resource manager layer provides the APIs that are used to request the allocation and deallocation of available resources across the cluster.
  • Spark core libraries: The Spark core library represents the Spark Core engine, which is responsible for the execution of the Spark jobs. It contains APIs for in-memory distributed data processing and a generalized execution model that supports a wide variety of applications and languages.
  • Spark extensions/libraries: This layer represents the additional frameworks/APIs/libraries developed by extending the Spark core APIs to support different use cases. For example, Spark SQL is one such extension, which is developed to perform ad hoc queries and interactive analysis over large datasets.
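As referenced in the data storage layer item, the storage target is selected mainly by the URI passed to the RDD API. Here is a minimal sketch; the paths and the HDFS namenode address are hypothetical:

    import org.apache.spark.{SparkConf, SparkContext}

    object StorageLayerSketch {
      def main(args: Array[String]) {
        val sc = new SparkContext(new SparkConf().setAppName("Storage Sketch").setMaster("local[2]"))
        val numbers = sc.parallelize(1 to 1000)
        // The same RDD API persists to different storage layers; only the URI changes.
        numbers.saveAsTextFile("file:///tmp/numbers-local")            // local filesystem (placeholder path)
        // numbers.saveAsTextFile("hdfs://namenode:8020/data/numbers") // HDFS (hypothetical cluster)
        sc.stop()
      }
    }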

The preceding architecture should be sufficient to understand the various layers of abstraction provided by Spark. All the layers are loosely coupled and, if required, can be replaced or extended as per the requirements.

The Spark extensions layer is widely used by architects and developers to develop custom libraries. Let’s move forward and talk more about the Spark extensions that are available for developing custom applications/jobs.

Spark extensions/libraries

In this section, we will briefly discuss the usage of various Spark extensions/libraries that are available for different use cases.

The following are the extensions/libraries available with Spark 1.5.1:

  • Spark Streaming: Spark Streaming, as an extension, is developed over the core Spark API. It enables scalable, high-throughput, and fault-tolerant stream processing of live data streams. Spark Streaming enables the ingestion of data from various sources, such as Kafka, Flume, Kinesis, or TCP sockets. Once the data is ingested, it can be further processed using complex algorithms that are expressed with high-level functions, such as map, reduce, join, and window. Finally, the processed data can be pushed out to filesystems, databases, and live dashboards. In fact, Spark Streaming also facilitates the usage of Spark’s machine learning and graph processing algorithms on data streams. For more information, refer to http://spark.apache.org/docs/latest/streaming-programming-guide.html.
  • Spark MLlib: Spark MLlib is another extension that provides distributed implementations of various machine learning algorithms. Its goal is to make practical machine learning scalable and easy to use. It provides implementations of common machine learning algorithms used for classification, regression, clustering, and many more. For more information, refer to http://spark.apache.org/docs/latest/mllib-guide.html.
  • Spark GraphX: GraphX provides the API to create a directed multigraph with properties attached to each vertex and edge. It also provides various common operators used for aggregation, as well as distributed implementations of graph algorithms, such as PageRank and triangle counting. For more information, refer to http://spark.apache.org/docs/latest/graphx-programming-guide.html.
  • Spark SQL: Spark SQL provides distributed processing of structured data and facilitates the execution of relational queries that are expressed in a structured query language (http://en.wikipedia.org/wiki/SQL). It provides a high-level abstraction known as DataFrames, which is a distributed collection of data organized into named columns (see the short sketch after this list). For more information, refer to http://spark.apache.org/docs/latest/sql-programming-guide.html.
  • SparkR: R (https://en.wikipedia.org/wiki/R_(programming_language)) is a popular programming language used for statistical computing and performing machine learning tasks. However, the execution of R is single threaded, which makes it difficult to leverage for processing large data (TBs or PBs); R can only process data that fits into the memory of a single machine. In order to overcome these limitations, Spark introduced a new extension: SparkR. SparkR provides an interface to invoke and leverage the Spark distributed execution engine from R, which allows us to run large-scale data analysis from the R shell. For more information, refer to http://spark.apache.org/docs/latest/sparkr.html.
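To give a flavor of the DataFrame abstraction mentioned under Spark SQL, here is a minimal sketch against the Spark 1.5.1 APIs; the JSON file path and the age column are assumptions made purely for illustration:

    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.sql.SQLContext

    object SparkSqlSketch {
      def main(args: Array[String]) {
        val sc = new SparkContext(new SparkConf().setAppName("Spark SQL Sketch").setMaster("local[2]"))
        val sqlContext = new SQLContext(sc)
        // Load a JSON file (placeholder path) into a DataFrame of named columns
        val people = sqlContext.read.json("file:///tmp/people.json")
        people.printSchema()
        // Relational-style operations on the DataFrame; "age" is an assumed column
        people.filter(people("age") > 21).show()
        sc.stop()
      }
    }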

All the previously listed Spark extensions/libraries are part of the standard Spark distribution. Once we install and configure Spark, we can start using the APIs that are exposed by these extensions.

Apart from the earlier extensions, Spark also provides various other external packages that are developed and provided by the open source community. These packages are not distributed with the standard Spark distribution, but they can be searched and downloaded from http://spark-packages.org/. Spark packages provide libraries/packages for integration with various data sources, management tools, higher level domain-specific libraries, machine learning algorithms, code samples, and other Spark content.

Let’s move on to the next section where we will dive deep into the Spark packaging structure and execution model, and we will also talk about various other Spark components.

Spark packaging structure and core APIs

In this section, we will briefly talk about the packaging structure of the Spark code base. We will also discuss core packages and APIs, which will be frequently used by the architects and developers to develop custom applications with Spark.

Spark is written in Scala (http://www.scala-lang.org/), but for interoperability, it also provides equivalent APIs in Java and Python.

For brevity, we will only talk about the Scala and Java APIs, and for Python APIs, users can refer to https://spark.apache.org/docs/1.5.1/api/python/index.html.

A high-level Spark code base is divided into the following two packages:

  • Spark extensions: All APIs for a particular extension are packaged in the extension's own package structure. For example, all APIs for Spark Streaming are packaged in the org.apache.spark.streaming.* package, and the same packaging structure goes for other extensions: Spark MLlib—org.apache.spark.mllib.*, Spark SQL—org.apache.spark.sql.*, Spark GraphX—org.apache.spark.graphx.*.

    For more information, refer to http://tinyurl.com/q2wgar8 for Scala APIs and http://tinyurl.com/nc4qu5l for Java APIs.

  • Spark Core: Spark Core is the heart of Spark and provides two basic components: SparkContext and SparkConf. Both of these components are used by each and every standard or customized Spark job, Spark library, and extension. The terms/concepts Context and Config are not new; they have more or less become a standard architectural pattern. By definition, a Context is the entry point of the application that provides access to the various resources/features exposed by the framework, whereas a Config contains the application configuration, which helps define the environment of the application.
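The following is a minimal sketch of this Context/Config pattern; the object name, application name, master URL, and memory setting are illustrative assumptions rather than recommended values:

    import org.apache.spark.{SparkConf, SparkContext}

    object ContextConfigSketch {
      def main(args: Array[String]) {
        // Config: describes the environment of the application
        val conf = new SparkConf()
          .setAppName("context-config-pattern")
          .setMaster("local[2]")               // assumption: run locally with two threads
          .set("spark.executor.memory", "1g")  // assumption: 1 GB per executor

        // Context: the entry point that provides access to the framework's features
        val sc = new SparkContext(conf)
        println("Running " + sc.appName + " on Spark " + sc.version)
        sc.stop()
      }
    }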

Let’s move on to the nitty-gritty of the Scala APIs exposed by Spark Core:

  • org.apache.spark: This is the base package for all Spark APIs; it contains the functionality to create, distribute, and submit Spark jobs on the cluster.
  • org.apache.spark.SparkContext: This is the first statement in any Spark job/application. It defines the SparkContext and then the custom business logic that is provided in the job/application. SparkContext is the entry point for accessing any of the Spark features that we may want to use or leverage, for example, connecting to the Spark cluster, submitting jobs, and so on. Even the references to all Spark extensions are provided by SparkContext. There can be only one SparkContext per JVM, which needs to be stopped if we want to create a new one. SparkContext is immutable, which means that it cannot be changed or modified once it is started.
  • org.apache.spark.rdd.RDD.scala: This is another important component of Spark that represents the distributed collection of datasets. It exposes various operations that can be executed in parallel over the cluster. The SparkContext exposes various methods to load the data from HDFS or the local filesystem or Scala collections, and finally, create an RDD on which various operations such as map, filter, join, and persist can be invoked. RDD also defines some useful child classes within the org.apache.spark.rdd.* package such as PairRDDFunctions to work with key/value pairs, SequenceFileRDDFunctions to work with Hadoop sequence files, and DoubleRDDFunctions to work with RDDs of doubles. We will read more about RDD in the subsequent sections.
  • org.apache.spark.annotation: This package contains the annotations that are used within the Spark API. This is an internal Spark package, and it is recommended that you do not use the annotations defined in this package while developing your custom Spark jobs. The three main annotations defined within this package are as follows:
    • DeveloperAPI: All those APIs/methods that are marked with DeveloperAPI are for advanced usage, where users are free to extend and modify the default functionality. These methods may be changed or removed in the next minor or major releases of Spark.
    • Experimental: All functions/APIs marked as Experimental are officially not adopted by Spark but are introduced temporarily in a specific release. These methods may be changed or removed in the next minor or major releases.
    • AlphaComponent: The functions/APIs, which are still being tested by the Spark community, are marked as AlphaComponent. These are not recommended for production use and may be changed or removed in the next minor or major releases.
  • org.apache.spark.broadcast: This is one of the most important packages and is frequently used by developers in their custom Spark jobs. It provides the API for sharing read-only variables across Spark jobs (a short sketch follows this list). Once the variables are defined and broadcast, they cannot be changed. Broadcasting the variables and data across the cluster is a complex task, and we need to ensure that an efficient mechanism is used so that it improves the overall performance of the Spark job and does not become an overhead.

    Spark provides two different implementations of broadcast—HttpBroadcast and TorrentBroadcast. HttpBroadcast leverages an HTTP server to fetch/retrieve the data from the Spark driver. In this mechanism, the broadcast data is fetched through an HTTP server running at the driver itself and further stored in the executor block manager for faster access. TorrentBroadcast, which is also the default implementation of broadcast, maintains its own block manager. The first request to access the data makes the call to its own block manager, and if the data is not found, it is fetched in chunks from the executor or driver. It works on the principle of BitTorrent and ensures that the driver is not the bottleneck in fetching the shared variables and data. Spark also provides accumulators, which work like broadcast variables but provide updatable variables shared across the Spark jobs, with some limitations. You can refer to https://spark.apache.org/docs/1.5.1/api/scala/index.html#org.apache.spark.Accumulator.
  • org.apache.spark.io: This provides implementations of various compression libraries that can be used at the block storage level. The whole package is marked as Developer API, so developers can extend it and provide their own custom implementations. By default, it provides three implementations: LZ4, LZF, and Snappy.
  • org.apache.spark.scheduler: This provides various scheduler libraries, which help in job scheduling, tracking, and monitoring. It defines the directed acyclic graph (DAG) scheduler (http://en.wikipedia.org/wiki/Directed_acyclic_graph). The Spark DAG scheduler defines the stage-oriented scheduling: it keeps track of the completion of each RDD and the output of each stage and then computes the DAG, which is further submitted to the underlying org.apache.spark.scheduler.TaskScheduler API that executes the stages on the cluster.
  • org.apache.spark.storage: This provides APIs for structuring, managing, and finally persisting the data stored in RDDs within blocks. It also keeps track of the data and ensures that it is either stored in memory or, if the memory is full, flushed to the underlying persistent storage area.
  • org.apache.spark.util: These are the utility classes used to perform common functions across the Spark APIs. For example, it defines MutablePair, which can be used as an alternative to Scala’s Tuple2 with the difference that MutablePair is updatable while Scala’s Tuple2 is not. It helps in optimizing memory and minimizing object allocations.
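Here is a minimal sketch of the broadcast and accumulator APIs referred to in the org.apache.spark.broadcast item above; the object name, lookup table, and its contents are made up for illustration:

    import org.apache.spark.{SparkConf, SparkContext}

    object BroadcastSketch {
      def main(args: Array[String]) {
        val sc = new SparkContext(new SparkConf().setAppName("Broadcast Sketch").setMaster("local[2]"))
        // Read-only lookup table shipped once to every executor
        val countryNames = sc.broadcast(Map("IN" -> "India", "US" -> "United States"))
        // Accumulator: a shared, updatable counter; only the driver can read its value
        val unknownCodes = sc.accumulator(0, "Unknown country codes")

        val codes = sc.parallelize(Seq("IN", "US", "XX"))
        val resolved = codes.map { code =>
          if (countryNames.value.contains(code)) countryNames.value(code)
          else { unknownCodes += 1; "Unknown" }
        }
        resolved.collect().foreach(println)
        println("Unknown codes seen: " + unknownCodes.value)
        sc.stop()
      }
    }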

Spark execution model – master worker view

Spark essentially enables the distributed in-memory execution of a given piece of code. We discussed the Spark architecture and its various layers in the previous section; let’s now discuss the major components that are used to configure the Spark cluster and, at the same time, to submit and execute our Spark jobs.

The following are the high-level components involved in setting up the Spark cluster or submitting a Spark job:

  • Spark driver: This is the client program that defines SparkContext. SparkContext is the entry point for any job; it defines the environment/configuration and the dependencies of the submitted job. The driver connects to the cluster manager and requests resources for further execution of the jobs.
  • Cluster manager/resource manager/Spark master: The cluster manager manages and allocates the required system resources to the Spark jobs. Furthermore, it coordinates and keeps track of the live/dead nodes in a cluster. It enables the execution of jobs submitted by the driver on the worker nodes (also called Spark workers) and finally tracks and shows the status of various jobs run by the worker nodes.
  • Spark worker/executors: A worker actually executes the business logic submitted by the Spark driver. Spark workers are abstracted and are allocated dynamically by the cluster manager to the Spark driver for the execution of submitted jobs.

The following diagram shows the high-level components and the master worker view of Spark:

[Figure: The master worker view of Spark]

The preceding diagram depicts the various components involved in setting up the Spark cluster, and the same components are also responsible for the execution of the Spark job.

Although all the components are important, let’s briefly discuss the cluster/resource manager, as it defines the deployment model and the allocation of resources to our submitted jobs.

Spark provides the flexibility to choose a resource manager. As of Spark 1.5.1, the following are the resource managers or deployment models that are supported by Spark:

  • Apache Mesos: Apache Mesos (http://mesos.apache.org/) is a cluster manager that provides efficient resource isolation and sharing across distributed applications or frameworks. It can run Hadoop, MPI, Hypertable, Spark, and other frameworks on a dynamically shared pool of nodes. Apache Mesos and Spark are closely related to each other (but they are not the same). The story started way back in 2009, when Mesos was ready and there were talks going on about the ideas/frameworks that could be developed on top of Mesos, and that's exactly how Spark was born.

    Refer to http://spark.apache.org/docs/latest/running-on-mesos.html for more information on running Spark jobs on Apache Mesos.

  • Hadoop YARN: Hadoop 2.0 (http://tinyurl.com/qypb4xm), also known as YARN, was a complete change in the architecture. It was introduced as a generic cluster computing framework that was entrusted with the responsibility of allocating and managing the resources required to execute the varied jobs or applications. It introduced new daemon services, such as the resource manager (RM), node manager (NM), and application master (AM), which are responsible for managing cluster resources, individual nodes, and respective applications. YARN also introduced specific interfaces/guidelines for application developers where they can implement/follow and submit or execute their custom applications on the YARN cluster. The Spark framework implements the interfaces exposed by YARN and provides the flexibility of executing the Spark applications on YARN. Spark applications can be executed in the following two different modes in YARN:
    • YARN client mode: In this mode, the Spark driver executes on the client machine (the machine used for submitting the job), and the YARN application master is just used for requesting resources from YARN. All our logs and sysouts (println) are printed on the same console that is used to submit the job.
    • YARN cluster mode: In this mode, the Spark driver runs inside the YARN application master process, which is further managed by YARN on the cluster, and the client can go away just after submitting the application. Now as our Spark driver is executed on the YARN cluster, our application logs/sysouts (println) are also written in the log files maintained by YARN and not on the machine that is used to submit our Spark job.

      For more information on executing Spark applications on YARN, refer to http://spark.apache.org/docs/latest/running-on-yarn.html.

  • Standalone mode: The core Spark distribution contains the required APIs to create an independent, distributed, and fault-tolerant cluster without any external or third-party libraries or dependencies.
  • Local mode: Local mode should not be confused with standalone mode. In local mode, Spark jobs can be executed on a local machine without any special cluster setup by just passing local[N] as the master URL, where N is the number of parallel threads. The master URLs for all of these deployment modes are illustrated in the short sketch that follows.
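For quick reference, here is a hedged sketch of how the master URL passed to SparkConf selects the deployment mode; the host names are placeholders, while 7077 and 5050 are the conventional default ports of the standalone and Mesos masters:

    import org.apache.spark.SparkConf

    // Local mode: N parallel threads on the local machine
    val localConf = new SparkConf().setAppName("my-app").setMaster("local[4]")

    // Standalone mode: point at the Spark master started with start-master.sh
    val standaloneConf = new SparkConf().setAppName("my-app").setMaster("spark://master-host:7077")

    // Apache Mesos: point at the Mesos master
    val mesosConf = new SparkConf().setAppName("my-app").setMaster("mesos://mesos-host:5050")

    // Hadoop YARN (Spark 1.5.x): "yarn-client" or "yarn-cluster"; the cluster is
    // located through the Hadoop configuration available on the classpath
    val yarnConf = new SparkConf().setAppName("my-app").setMaster("yarn-client")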

Writing and executing our first Spark program

In this section, we will install/configure and write our first Spark program in Java and Scala.

Hardware requirements

Spark supports a variety of hardware and software platforms. It can be deployed on commodity hardware, and it also supports deployments on high-end servers. Spark clusters can be provisioned either in the cloud or on-premises. Though there is no single configuration or standard that can guide us through the requirements of Spark, to create and execute the Spark examples provided in this article, it would be good to have a laptop/desktop/server with the following configuration:

  • RAM: 8 GB.
  • CPU: Dual core or Quad core.
  • DISK: SATA drives with a capacity of 300 GB to 500 GB with 15 k RPM.
  • Operating system: Spark supports a variety of platforms that include various flavors of Linux (Ubuntu, HP-UX, RHEL, and many more) and Windows. For our examples, we will recommend that you use Ubuntu for the deployment and execution of examples.

Spark Core is coded in Scala, but it offers several development APIs in different languages, such as Scala, Java, and Python, so that developers can choose their preferred language for coding. The dependent software may vary based on the programming language, but there is still a common set of software for configuring the Spark cluster, plus language-specific software for developing Spark jobs.

In the next section, we will discuss the software installation steps required to write/execute Spark jobs in Scala and Java on Ubuntu as the operating system.

Installation of the basic software

In this section, we will discuss the various steps required to install the basic software, which will help us in the development and execution of our Spark jobs.

Spark

Perform the following steps to install Spark:

  1. Download the Spark compressed tarball from http://d3kbcqa49mib13.cloudfront.net/spark-1.5.1-bin-hadoop2.4.tgz.
  2. Create a new directory spark-1.5.1 on your local filesystem and extract the Spark tarball into this directory.
  3. Execute the following command on your Linux shell in order to set SPARK_HOME as an environment variable:
    export SPARK_HOME=<Path of Spark install Dir>
  4. Now, browse your SPARK_HOME directory; it should look similar to the following screenshot: [Screenshot: contents of the SPARK_HOME directory]

Java

Perform the following steps to install Java:

  1. Download and install Oracle Java 7 from http://www.oracle.com/technetwork/java/javase/install-linux-self-extracting-138783.html.
  2. Execute the following command on your Linux shell to set JAVA_HOME as an environment variable:
    export JAVA_HOME=<Path of Java install Dir>
    

Scala

Perform the following steps to install Scala:

  1. Download the Scala 2.10.5 compressed tarball from http://downloads.typesafe.com/scala/2.10.5/scala-2.10.5.tgz?_ga=1.7758962.1104547853.1428884173.
  2. Create a new directory, Scala 2.10.5, on your local filesystem and extract the Scala tarball into this directory.
  3. Execute the following commands on your Linux shell to set SCALA_HOME as an environment variable and add the Scala binaries to the system $PATH:
    export SCALA_HOME=<Path of Scala install Dir>
    export PATH=$SCALA_HOME/bin:$PATH
  4. Next, verify that the Scala runtime and the Scala compiler are available and that the version is 2.10.x (for example, by running scala -version and scalac -version).

Spark 1.5.1 supports the 2.10.5 version of Scala, so it is advisable to use the same version to avoid any runtime exceptions due to mismatch of libraries.

Eclipse

Perform the following steps to install Eclipse:

  • Based on your hardware configuration, download Eclipse Luna (4.4) from http://www.eclipse.org/downloads/packages/eclipse-ide-java-eedevelopers/lunasr2.

  • Next, install the Scala IDE plugin in Eclipse itself so that we can write and compile our Scala code inside Eclipse (http://scala-ide.org/download/current.html).
  • We are now done with the installation of all the required software. Let’s move on and configure our Spark cluster.

Configuring the Spark cluster

The first step to configure the Spark cluster is to identify the appropriate resource manager. We discussed the various resource managers in the Spark execution model – master worker view section (YARN, Mesos, and standalone). Standalone is the preferred resource manager for development because it is simple and quick to set up and does not require the installation of any other component or software.

We will also configure the standalone resource manager for all our Spark examples; for more information on YARN and Mesos, refer to the Spark execution model – master worker view section.

Perform the following steps to bring up an independent cluster using Spark binaries:

  1. The first step to set up the Spark cluster is to bring up the master node, which will track and allocate the system resources. Open your Linux shell and execute the following command:
    $SPARK_HOME/sbin/start-master.sh
  2. The preceding command will bring up your master node and also enable the Spark UI, which can be used to monitor the nodes/jobs in the Spark cluster at http://<host>:8080/, where <host> is the hostname of the machine on which the master is running.
  3. Next, let’s bring up our worker node, which will execute our Spark jobs. Execute the following command on the same Linux shell:
    $SPARK_HOME/bin/spark-class org.apache.spark.deploy.worker.Worker <Spark-Master> &

    In the preceding command, replace <Spark-Master> with the Spark URL shown at the top of the Spark UI, just beside Spark master at. The preceding command will start the Spark worker process in the background, and the worker will also be reported in the Spark UI.

    [Screenshot: the Spark UI showing the master URL, workers, and applications]

The Spark UI shown in the preceding screenshot has three different sections, providing the following information:

  • Workers: This reports the health of a worker node (alive or dead) and also provides a drill-down to query the status and detailed logs of the various jobs executed by that specific worker node
  • Running applications: This shows the applications that are currently being executed in the cluster and also provides a drill-down that enables viewing of application logs
  • Completed applications: This provides the same functionality as running applications; the only difference is that it shows jobs that have finished

We are done!!! Our Spark cluster is up and running and ready to execute our Spark jobs with one worker node. Let’s move on and write our first Spark application in Scala and Java and further execute it on our newly created cluster.

Coding Spark job in Scala

In this section, we will code our first Spark job in Scala, and we will also execute the same job on our newly created Spark cluster and will further analyze the results.

This is our first Spark job, so we will keep it simple. We will use the Chicago crimes dataset for August 2015 and will count the number of crimes reported in August 2015.

Perform the following steps to code the Spark job in Scala for aggregating the number of crimes in August 2015:

  1. Open Eclipse and create a Scala project called Spark-Examples.
  2. Expand your newly created project and modify the version of the Scala library container to 2.10. This is done to ensure that the version of Scala libraries used by Spark and the custom jobs developed/deployed are the same.
  3. Next, open the properties of your project Spark-Examples and add the dependencies for all the libraries packaged with the Spark distribution, which can be found at $SPARK_HOME/lib.
  4. Next, create a chapter.six Scala package, and in this package, define a new Scala object by the name of ScalaFirstSparkJob.
  5. Define a main method in the Scala object and also import SparkConf and SparkContext.
  6. Now, add the following code to the main method of ScalaFirstSparkJob:
     package chapter.six

     import org.apache.spark.SparkConf
     import org.apache.spark.SparkContext

     object ScalaFirstSparkJob {

       def main(args: Array[String]) {
         println("Creating Spark Configuration")
         //Create an Object of Spark Configuration
         val conf = new SparkConf()
         //Set the logical and user defined Name of this Application
         conf.setAppName("My First Spark Scala Application")
         println("Creating Spark Context")
         //Create a Spark Context and provide the previously created
         //Object of SparkConf as a reference.
         val ctx = new SparkContext(conf)
         //Define the location of the file containing the Crime Data
         val file = "file:///home/ec2-user/softwares/crime-data/Crimes_-Aug-2015.csv"
         println("Loading the Dataset and will further process it")
         //Loading the text file from the local file system or HDFS
         //and converting it into an RDD.
         //SparkContext.textFile(..) uses Hadoop's TextInputFormat,
         //so the file is broken up by the newline character. Refer to
         //http://hadoop.apache.org/docs/r2.6.0/api/org/apache/hadoop/mapred/TextInputFormat.html
         //The second argument is the number of partitions, which specifies
         //the parallelism. It should be equal to or more than the number
         //of cores in the cluster.
         val logData = ctx.textFile(file, 2)
         //Invoking the filter operation on the RDD and counting the
         //number of lines in the data loaded into the RDD.
         //Simply returning true, as "TextInputFormat" has already divided
         //the data by "\n", so each record in the RDD holds only one line.
         val numLines = logData.filter(line => true).count()
         //Finally, printing the number of lines.
         println("Number of Crimes reported in Aug-2015 = " + numLines)
       }

     }
    

    We are now done with the coding! Our first Spark job in Scala is ready for execution.

  7. Now, from Eclipse itself, export your project as a .jar file, name it spark-examples.jar, and save this .jar file in the root of $SPARK_HOME.
  8. Next, open your Linux console, go to $SPARK_HOME, and execute the following command:
    $SPARK_HOME/bin/spark-submit --class chapter.six.ScalaFirstSparkJob --master spark://ip-10-166-191-242:7077 spark-examples.jar

    In the preceding command, ensure that the value given to the --master parameter is the same as the one shown on your Spark UI.

    spark-submit is a utility script that is used to submit Spark jobs to the cluster.

  9. As soon as you press Enter and execute the preceding command, you will see a lot of activity (log messages) on the console, and finally, you will see the output of your job at the end: [Screenshot: console output showing the number of crimes reported in Aug-2015]

Isn’t that simple! As we move forward and discuss Spark more, you will appreciate the ease of coding and simplicity provided by Spark for creating, deploying, and running jobs in a distributed framework.

Your completed job will also be available for viewing at the Spark UI:

[Screenshot: the completed application shown in the Spark UI]

The preceding image shows the status of our first Scala job on the UI. Now, let’s move forward and develop the same job using the Spark Java APIs.

Coding Spark job in Java

Perform the following steps to code the Spark job in Java for aggregating the number of crimes in August 2015:

  1. Open your Spark-Examples Eclipse project (created in the previous section).
  2. Add a new Java file, chapter.six.JavaFirstSparkJob, and add the following code snippet:
     package chapter.six;

     import org.apache.spark.SparkConf;
     import org.apache.spark.api.java.JavaRDD;
     import org.apache.spark.api.java.JavaSparkContext;
     import org.apache.spark.api.java.function.Function;

     public class JavaFirstSparkJob {

       public static void main(String[] args) {
         System.out.println("Creating Spark Configuration");
         // Create an Object of Spark Configuration
         SparkConf javaConf = new SparkConf();
         // Set the logical and user defined Name of this Application
         javaConf.setAppName("My First Spark Java Application");
         System.out.println("Creating Spark Context");
         // Create a Spark Context and provide the previously created
         // Object of SparkConf as a reference.
         JavaSparkContext javaCtx = new JavaSparkContext(javaConf);
         System.out.println("Loading the Crime Dataset and will further process it");
         String file = "file:///home/ec2-user/softwares/crime-data/Crimes_-Aug-2015.csv";
         JavaRDD<String> logData = javaCtx.textFile(file);
         // Invoking the filter operation on the RDD and counting the
         // number of lines in the data loaded into the RDD.
         // Simply returning true, as "TextInputFormat" has already divided
         // the data by "\n", so each record holds only one line.
         long numLines = logData.filter(new Function<String, Boolean>() {
           public Boolean call(String s) {
             return true;
           }
         }).count();
         // Finally, printing the number of lines
         System.out.println("Number of Crimes reported in Aug-2015 = " + numLines);
         javaCtx.close();
       }
     }
    
  3. Next, compile the preceding JavaFirstSparkJob from Eclipse itself and perform steps 7, 8, and 9 of the previous section in which we executed the Spark Scala job.

We are done! Analyze the output on the console; it should be the same as the output of the Scala job, which we executed in the previous section.

Troubleshooting – tips and tricks

In this section, we will talk about troubleshooting tips and tricks, which are helpful in solving the most common errors encountered while working with Spark.

Port numbers used by Spark

Spark binds to various network ports for communication within the cluster/nodes and also exposes monitoring information about jobs to developers and administrators. There may be instances where the default ports used by Spark are not available or are blocked by the network firewall, which, in turn, will require modifying the default Spark ports for the master/worker or driver. Here is a list of all the ports utilized by Spark and their associated parameters, which need to be configured for any changes: http://spark.apache.org/docs/latest/security.html#configuring-ports-for-network-security.
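As an illustration only, ports can also be overridden programmatically through SparkConf (or in spark-defaults.conf); the property names below are standard Spark configuration keys, but the port values are arbitrary examples:

    import org.apache.spark.SparkConf

    // Override the default ports when the standard ones are blocked by a firewall.
    // The port values shown here are arbitrary examples, not recommendations.
    val conf = new SparkConf()
      .setAppName("custom-ports")
      .set("spark.driver.port", "7078")        // port used by the driver for RPC
      .set("spark.blockManager.port", "7079")  // port used by the block manager
      .set("spark.ui.port", "4080")            // port of the application web UI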

Classpath issues – class not found exception

Classpath is the most common issue and it occurs frequently in distributed applications.

Spark and its associated jobs run in a distributed mode on a cluster. So, if your Spark job is dependent upon external libraries, then we need to ensure that we package them into a single JAR file and place it in a common location or on the default classpath of all worker nodes, or define the path of the JAR file within SparkConf itself:

val sparkConf = new SparkConf().setAppName("myapp").setJars(Seq("<path of JAR file>"))

Other common exceptions

In this section, we will talk about a few of the common errors/issues/exceptions encountered by architects/developers when they set up Spark or execute Spark jobs:

  • Too many open files: Increase the ulimit on your Linux OS by executing ulimit -n 20000.
  • Version of Scala: Spark 1.5.1 supports Scala 2.10, so if you have multiple versions of Scala deployed on your box, then ensure that all versions are the same, that is, Scala 2.10.
  • Out of memory on workers in standalone mode: Configure SPARK_WORKER_MEMORY in $SPARK_HOME/conf/spark-env.sh. By default, it provides a total memory of 1 GB to workers, but at the same time, you should analyze and ensure that you are not loading or caching too much data on the worker nodes.
  • Out of memory in applications executed on worker nodes: Configure spark.executor.memory in your SparkConf, as follows:
    val sparkConf = new SparkConf().setAppName("myapp")
    .set("spark.executor.memory", "1g")

The preceding tips will help you solve basic issues in setting up Spark clusters, but as you move ahead, there could be more complex issues that are beyond the basic setup; for all those issues, post your queries at http://stackoverflow.com/questions/tagged/apache-spark or ask on the Spark user mailing list.

Summary

In this article, we discussed the architecture of Spark and its various components. We also configured our Spark cluster and executed our first Spark job in Scala and Java.
