
In this article by Sumit Gupta, the author of the book Learning Real-time Processing with Spark Streaming, we will discuss the integration of Spark Streaming with other advanced Spark libraries, such as Spark SQL.


No single piece of software in today's world can fulfill the varied, versatile, and complex demands of enterprises, and to be honest, neither should it!

Software is built to fulfill the specific needs of an enterprise at a particular point in time, and those needs may change in the future due to many factors that may or may not be under anyone's control, such as government policies, business/market dynamics, and many more.

Considering all these factors, the integration and interoperability of any software system with internal/external systems and software is pivotal in fulfilling enterprise needs. Integration and interoperability are categorized as nonfunctional requirements, which are always implicit and may or may not be explicitly stated by the end users.

Over a period of time, architects have realized the importance of these implicit requirements in modern enterprises, and now all enterprise architectures provide due diligence and provisions for fulfilling them. Enterprise architecture frameworks such as The Open Group Architecture Framework (TOGAF) even define specific sets of procedures and guidelines for defining and establishing the interoperability and integration requirements of modern enterprises.

The Spark community realized the importance of both these factors and provided a versatile and scalable framework with hooks for integration and interoperability with different systems and libraries. For example, data consumed and processed via Spark Streaming can also be loaded into a structured (table: rows/columns) format and queried further using SQL. The data can even be stored as persistent Hive tables in HDFS, which will exist even after our Spark program has restarted.

In this article, we will discuss querying streaming data in real time using Spark SQL.

Querying streaming data in real time

Spark Streaming is developed on the principle of integration and interoperability: it not only provides a framework for consuming data in near real time from varied data sources, but it also provides integration with Spark SQL, where existing DStreams can be converted into a structured data format and queried using standard SQL constructs.
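To make this concrete, here is a minimal sketch of the pattern: the RDDs behind a DStream are converted into DataFrames inside foreachRDD and then queried with SQL. The socket source on localhost:9999, the comma-separated record format, and the Dept case class are all hypothetical and only for illustration; we will walk through the individual Spark SQL steps in detail in the rest of this article:

import org.apache.spark.SparkConf
import org.apache.spark.sql.SQLContext
import org.apache.spark.streaming.{Seconds, StreamingContext}

//Hypothetical case class describing the records arriving on the stream
case class Dept(name: String, noOfEmp: Int)

object StreamingSQLSketch {
  def main(args: Array[String]) {
    //Creating Spark Configuration and Streaming Context with a 10-second batch interval
    val conf = new SparkConf().setAppName("Streaming SQL Sketch")
    val ssc = new StreamingContext(conf, Seconds(10))

    //Hypothetical socket source emitting comma-separated lines such as "DEPT_A,10"
    val lines = ssc.socketTextStream("localhost", 9999)
    val depts = lines.map { line =>
      val parts = line.split(",")
      Dept(parts(0), parts(1).trim.toInt)
    }

    //Convert each micro-batch RDD into a DataFrame and query it using SQL
    depts.foreachRDD { rdd =>
      val sqlCtx = new SQLContext(rdd.sparkContext)
      import sqlCtx.implicits._
      val df = rdd.toDF()
      df.registerTempTable("departments")
      sqlCtx.sql("Select name, noOfEmp from departments").show()
    }

    ssc.start()
    ssc.awaitTermination()
  }
}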

There are many use cases where SQL on streaming data is a much needed feature. For example, in our distributed log analysis use case, we may need to combine precomputed datasets with the streaming data to perform exploratory analysis using interactive SQL queries. This is difficult to implement with streaming operators alone, as they are not designed for introducing new datasets and performing ad hoc queries.

Moreover, SQL's success at expressing complex data transformations derives from the fact that it is based on a set of very powerful data processing primitives for filtering, merging, correlation, and aggregation. These primitives are not available in low-level programming languages such as Java/C++, where implementing them may result in long development cycles and high maintenance costs.

Let's move forward and first understand a few things about Spark SQL, and then we will see the process of converting existing DStreams into structured formats.

Understanding Spark SQL

Spark SQL is one of the modules developed over the Spark framework for processing structured data, which is stored in the form of rows and columns. At a very high level, it is similar to data residing in an RDBMS in the form of rows and columns, over which SQL queries are executed for analysis, but Spark SQL is much more versatile and flexible compared to an RDBMS. Spark SQL provides distributed processing of SQL queries and can be compared to frameworks such as Hive, Impala, or Drill. Here are a few notable features of Spark SQL:

  1. Spark SQL is capable of loading data from a variety of data sources such as text files, JSON, Hive, HDFS, the Parquet format, and of course RDBMS too, so that we can consume, join, and process datasets from different and varied data sources.
  2. It supports static and dynamic schema definition for the data loaded from various sources, which helps in defining schema for known data structures/types, and also for those datasets where the columns and their types are not known until runtime.
  3. It can work as a distributed query engine using the thrift JDBC/ODBC server or command-line interface where end users or applications can interact with Spark SQL directly to run SQL queries.
  4. Spark SQL provides integration with Spark Streaming where DStreams can be transformed into the structured format and further SQL Queries can be executed.
  5. It is capable of caching tables using an in-memory columnar format for faster reads and in-memory data processing.
  6. It supports schema evolution so that new columns can be added to or deleted from the existing schema, and Spark SQL still maintains compatibility between all versions of the schema.

Spark SQL defines a higher-level programming abstraction called DataFrames, which is also an extension to the existing RDD API.

DataFrames are a distributed collection of objects in the form of rows and named columns, similar to tables in an RDBMS, but with much richer functionality, as per the previously listed features. The DataFrame API is inspired by the concept of data frames in R (http://www.r-tutor.com/r-introduction/data-frame) and Python (http://pandas.pydata.org/pandas-docs/stable/dsintro.html#dataframe).

Let’s move ahead and understand how Spark SQL works with the help of an example:

  1. As a first step, let's create some sample JSON data containing basic information about a company's departments, such as Name, Employees, and so on, and save it into the file company.json. The JSON file would look like this:
    [
       {  
          "Name":"DEPT_A",
          "No_Of_Emp":10,
          "No_Of_Supervisors":2
       },
       {  
          "Name":"DEPT_B",
          "No_Of_Emp":12,
          "No_Of_Supervisors":2
       },
       {  
          "Name":"DEPT_C",
          "No_Of_Emp":14,
          "No_Of_Supervisors":3
       },
       {  
          "Name":"DEPT_D",
          "No_Of_Emp":10,
          "No_Of_Supervisors":1
       },
       {  
          "Name":"DEPT_E",
          "No_Of_Emp":20,
          "No_Of_Supervisors":5
       }
    ]
    

    You can use any online JSON editor such as http://codebeautify.org/online-json-editor to see and edit data defined in the preceding JSON code.

  2. Next, let's extend our Spark-Examples project and create a new package by the name chapter.six, and within this new package, create a new Scala object and name it ScalaFirstSparkSQL.scala.
  3. Next, add the following import statements just below the package declaration:
    import org.apache.spark.SparkConf
    import org.apache.spark.SparkContext
    import org.apache.spark.sql._
    import org.apache.spark.sql.functions._
  4. Further, in your main method, add the following set of statements to create a SQLContext from the SparkContext:
    //Creating Spark Configuration
    val conf = new SparkConf()
    //Setting Application/ Job Name
    conf.setAppName("My First Spark SQL")
    // Define Spark Context which we will use to initialize our SQL Context 
    val sparkCtx = new SparkContext(conf)
    //Creating SQL Context
    val sqlCtx = new SQLContext(sparkCtx)
    

    SQLContext, or any of its descendants such as HiveContext (for working with Hive tables) or CassandraSQLContext (for working with Cassandra tables), is the main entry point for accessing all functionalities of Spark SQL. It allows the creation of data frames and also provides functionality to fire SQL queries over data frames.

  5. Next, we will define the following code to load the JSON file (company.json) using the SQLContext, and further, we will also create a data frame:
    //Define the path of your JSON file (company.json) which needs to be processed
    val path = "/home/softwares/spark/data/company.json";
    //Use SQLContext and load the JSON file.
    //This will return a DataFrame which can be further queried using SQL queries.
    val dataFrame = sqlCtx.jsonFile(path)
    

In the preceding piece of code, we used the jsonFile(…) method for loading the JSON data. There are other utility methods defined by SQLContext for reading raw data from the filesystem, creating data frames from an existing RDD, and many more.

Spark SQL supports two different methods for converting existing RDDs into data frames. The first method uses reflection to infer the schema of an RDD from the given data. This approach leads to more concise code and helps in instances where we already know the schema while writing the Spark application. We have used the same approach in our example.
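As a quick illustration, here is a minimal sketch of the reflection-based approach. The Dept case class and the sample records are hypothetical and not part of our example code; the case class should be defined outside the main method so that Spark can inspect it via reflection:

//Hypothetical case class mirroring our JSON structure (define it outside the main method)
case class Dept(Name: String, No_Of_Emp: Int, No_Of_Supervisors: Int)

//Import the implicits of our SQLContext to enable the toDF() conversion on RDDs of case classes
import sqlCtx.implicits._

//Create an RDD of case class objects and let Spark infer the schema via reflection
val deptRDD = sparkCtx.parallelize(Seq(Dept("DEPT_X", 5, 1), Dept("DEPT_Y", 8, 2)))
val deptDF = deptRDD.toDF()
deptDF.printSchema()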

The second method is through a programmatic interface that allows us to construct a schema, apply it to an existing RDD, and finally generate a data frame. This method is more verbose, but it provides flexibility and helps in those instances where the columns and data types are not known until the data is received at runtime.
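Here is a minimal sketch of the programmatic approach, again with hypothetical column names and sample rows used only for illustration:

import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{StructType, StructField, StringType, IntegerType}

//Construct the schema programmatically when columns/types are known only at runtime
val schema = StructType(Seq(
  StructField("Name", StringType, nullable = true),
  StructField("No_Of_Emp", IntegerType, nullable = true),
  StructField("No_Of_Supervisors", IntegerType, nullable = true)))

//Convert the raw records into an RDD of Rows
val rowRDD = sparkCtx.parallelize(Seq(("DEPT_X", 5, 1), ("DEPT_Y", 8, 2)))
  .map { case (name, emp, sup) => Row(name, emp, sup) }

//Apply the schema to the RDD[Row] to obtain a DataFrame
val deptDF = sqlCtx.createDataFrame(rowRDD, schema)
deptDF.printSchema()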

Refer to https://spark.apache.org/docs/1.3.0/api/scala/index.html#org.apache.spark.sql.SQLContext for a complete list of methods exposed by SQLContext.

Once the DataFrame is created, we need to register the DataFrame as a temporary table within the SQL context so that we can execute SQL queries over the registered table. Let's add the following piece of code to register our DataFrame with our SQL context and name it company:

//Register the data as a temporary table within SQL Context
//Temporary table is destroyed as soon as SQL Context is destroyed.
dataFrame.registerTempTable("company");

And we are done! Our JSON data is automatically organized into a table (rows/columns) and is ready to accept SQL queries. Even the data types are inferred from the type of data entered within the JSON file itself.

Now, we will start executing SQL queries on our table, but before that, let's see the schema being created/defined by SQLContext:

//Printing the Schema of the Data loaded in the Data Frame
dataFrame.printSchema();

The execution of the preceding statement will print the schema inferred by Spark SQL on the console.
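With the company.json data shown earlier, the output should look roughly like this (whole numbers in JSON are typically inferred as the long type):

    root
     |-- Name: string (nullable = true)
     |-- No_Of_Emp: long (nullable = true)
     |-- No_Of_Supervisors: long (nullable = true)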

The output shows the schema of the JSON data loaded by Spark SQL. Pretty simple and straightforward, isn't it? Spark SQL has automatically created our schema based on the data defined in our company.json file. It has also defined the data type of each of the columns.

We can also define the schema using reflection (https://spark.apache.org/docs/1.3.0/sql-programming-guide.html#inferring-the-schema-using-reflection) or define it programmatically (https://spark.apache.org/docs/1.3.0/sql-programming-guide.html#programmatically-specifying-the-schema).

Next, let's execute some SQL queries to see the data stored in the DataFrame; the first SQL query will print all the records:

//Executing SQL Queries to Print all records in the DataFrame
   println("Printing All records")
   sqlCtx.sql("Select * from company").collect().foreach(print)

The execution of the preceding statement will produce the following results on the console where the driver is executed:
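With our sample data, the output should look roughly like this (each Row prints in a bracketed form, and print writes them on a single line):

    [DEPT_A,10,2][DEPT_B,12,2][DEPT_C,14,3][DEPT_D,10,1][DEPT_E,20,5]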

Next, let's select only a few columns instead of all the records and print them on the console:

//Executing SQL Queries to Print Name and Number of Employees
//in each Department
   println("\n Printing Number of Employees in All Departments")
   sqlCtx.sql("Select Name, No_Of_Emp from company").collect().foreach(println)

The execution of the preceding statement will produce the following results on the console where the driver is executed:
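With println, each row is printed on its own line, so the output should look roughly like this:

    [DEPT_A,10]
    [DEPT_B,12]
    [DEPT_C,14]
    [DEPT_D,10]
    [DEPT_E,20]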

Now, finally let’s do some aggregation and count the total number of all employees across the departments:

    //Using the aggregate function (agg) to print the
    //total number of employees in the Company
    println("\n Printing Total Number of Employees in Company_X")
    val allRec = sqlCtx.sql("Select * from company").agg(Map("No_Of_Emp" -> "sum"))
    allRec.collect.foreach(println)

In the preceding piece of code, we used the agg(…) function and performed the sum of all employees across the departments, where sum can be replaced by avg, max, min, or count.

The execution of the preceding statement will produce the following results on the console where the driver is executed:
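The sum of the No_Of_Emp column across the five departments is 10 + 12 + 14 + 10 + 20 = 66, so the output should look roughly like this:

    [66]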

The preceding output shows the result of executing the aggregation on our company.json data.
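As mentioned earlier, sum is just one of the supported aggregations; for example, replacing it with avg in the same call would compute the average number of employees per department:

    //Replacing sum with avg to compute the average department size
    val avgRec = sqlCtx.sql("Select * from company").agg(Map("No_Of_Emp" -> "avg"))
    avgRec.collect.foreach(println)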

Refer to the Data Frame API at https://spark.apache.org/docs/1.3.0/api/scala/index.html#org.apache.spark.sql.DataFrame for further information on the available functions for performing aggregation.

As a last step, we will stop our Spark SQL context by invoking the stop() function on the SparkContext (sparkCtx.stop()). This is required so that your application can notify the master or resource manager to release all resources allocated to the Spark job. It also ensures the graceful shutdown of the job and avoids any resource leakage, which may happen otherwise. Also, as of now, there can be only one active SparkContext per JVM, and we need to stop() the active SparkContext before creating a new one.
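In code, this final step is simply the following call at the end of the main method:

    //Stop the Spark Context and release all the allocated resources
    sparkCtx.stop()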

Summary

In this article, we have seen the step-by-step process of using Spark SQL as a standalone program. Though we have considered JSON files as an example, we can also leverage Spark SQL with Cassandra (https://github.com/datastax/spark-cassandra-connector/blob/master/doc/2_loading.md), MongoDB (https://github.com/Stratio/spark-mongodb), or Elasticsearch (http://chapeau.freevariable.com/2015/04/elasticsearch-and-spark-1-dot-3.html).
