[box type="note" align="" class="" width=""]The following is an excerpt from the book Learning Spark SQL by Aurobindo Sarkar. Spark SQL APIs provide an optimized interface that helps developers build distributed applications quickly and easily. However, designing web-scale production applications using Spark SQL APIs can be a complex task. This book provides you with an understanding of design and implementation best practices used to design and build real-world, Spark-based applications.[/box]
In this article, we give you an overview of Spark SQL and its components.
Introduction
Spark SQL is one of the most advanced components of Apache Spark. It has been a part of the core distribution since Spark 1.0 and supports Python, Scala, Java, and R programming APIs. As illustrated in the figure below, Spark SQL components provide the foundation for Spark machine learning applications, streaming applications, graph applications, and many other types of application architectures.
Such applications typically use Spark ML pipelines, Structured Streaming, and GraphFrames, which are all based on the Spark SQL interfaces (the DataFrame/Dataset API). These applications, along with constructs such as SQL, DataFrames, and the Dataset API, automatically receive the benefits of the Catalyst optimizer. This optimizer is also responsible for generating executable query plans based on the lower-level RDD interfaces.
SparkSession
SparkSession represents a unified entry point for manipulating data in Spark. It minimizes the number of different contexts a developer has to use while working with Spark. SparkSession replaces multiple context objects, such as SparkContext, SQLContext, and HiveContext, which are now encapsulated within the SparkSession object. In Spark programs, we use the builder design pattern to instantiate a SparkSession object. However, in the REPL environment (that is, in a Spark shell session), the SparkSession is automatically created and made available to you via an instance called spark. At this point, start the Spark shell on your computer to interactively execute the code snippets in this section. As the shell starts up, you will notice a bunch of messages appearing on your screen, as shown in the following figure.
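In a standalone Spark program, as opposed to the shell, you create the SparkSession yourself using the builder pattern. A minimal sketch is shown below; the application name and the local master setting are illustrative values:

import org.apache.spark.sql.SparkSession

// Build (or reuse) a SparkSession; the appName and master values here are illustrative.
val spark = SparkSession
  .builder()
  .appName("LearningSparkSQL")
  .master("local[*]")
  .getOrCreate()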
Understanding Resilient Distributed Datasets (RDDs)
RDDs are Spark's primary distributed Dataset abstraction. An RDD is a collection of data that is immutable, distributed, lazily evaluated, type inferred, and cacheable. Prior to execution, the developer's code (using higher-level constructs such as SQL, DataFrames, and the Dataset API) is converted to a DAG of RDDs (ready for execution). RDDs can be created by parallelizing an existing collection of data or by accessing a Dataset residing in an external storage system, such as the file system or various Hadoop-based data sources. The parallelized collections form a distributed Dataset that enables parallel operations on it. An RDD can be created from an input file with the number of partitions specified, as shown:
scala> val cancerRDD = sc.textFile("file:///Users/aurobindosarkar/Downloads/breast-cancerwisconsin.data", 4)
scala> cancerRDD.partitions.size
res37: Int = 4
RDDs can be implicitly converted to a DataFrame by importing the spark.implicits package and using the toDF() method:
scala> import spark.implicits._
scala> val cancerDF = cancerRDD.toDF()
To create a DataFrame with a specific schema, we define a Row object for the rows contained in the DataFrame. Additionally, we split the comma-separated data, convert it to a list of fields, and then map it to the Row object. Finally, we use the createDataFrame() method to create the DataFrame with the specified schema:
import org.apache.spark.sql.Row

def row(line: List[String]): Row = {
  Row(line(0).toLong, line(1).toInt, line(2).toInt, line(3).toInt,
    line(4).toInt, line(5).toInt, line(6).toInt, line(7).toInt,
    line(8).toInt, line(9).toInt, line(10).toInt)
}
val data = cancerRDD.map(_.split(",").to[List]).map(row)
val cancerDF = spark.createDataFrame(data, recordSchema)
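Note that recordSchema is defined earlier in the book and is not shown in this excerpt. A minimal sketch that matches the types used in the row() function above (one Long column followed by ten Int columns) could look like the following; the column names are illustrative:

import org.apache.spark.sql.types._

// Hypothetical schema; only the types (one Long, ten Ints) are implied by the row() function above.
val recordSchema = StructType(Seq(
  StructField("sample", LongType, false),
  StructField("clumpThickness", IntegerType, false),
  StructField("uniformityCellSize", IntegerType, false),
  StructField("uniformityCellShape", IntegerType, false),
  StructField("marginalAdhesion", IntegerType, false),
  StructField("singleEpithelialCellSize", IntegerType, false),
  StructField("bareNuclei", IntegerType, false),
  StructField("blandChromatin", IntegerType, false),
  StructField("normalNucleoli", IntegerType, false),
  StructField("mitoses", IntegerType, false),
  StructField("classLabel", IntegerType, false)
))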
Further, we can easily convert the preceding DataFrame to a Dataset using the case class defined earlier:
scala> val cancerDS = cancerDF.as[CancerClass]
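The CancerClass case class referenced here is also defined earlier in the book. A hypothetical definition, with field names mirroring the illustrative schema above, would be:

// Hypothetical case class; field names and types must match the DataFrame columns for as[CancerClass] to work.
case class CancerClass(sample: Long, clumpThickness: Int, uniformityCellSize: Int,
  uniformityCellShape: Int, marginalAdhesion: Int, singleEpithelialCellSize: Int,
  bareNuclei: Int, blandChromatin: Int, normalNucleoli: Int, mitoses: Int, classLabel: Int)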
RDD data is logically divided into a set of partitions; additionally, all input, intermediate, and output data is also represented as partitions. The number of RDD partitions defines the level of data fragmentation. These partitions are also the basic units of parallelism. Spark execution jobs are split into multiple stages, and since each stage's tasks operate on one partition at a time, it is very important to tune the number of partitions. Fewer partitions than available executor cores means your cluster could be under-utilized, while an excessive number of partitions could impact performance due to higher disk and network I/O.
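If the initial partition count is not a good fit for your cluster, you can change it explicitly. The sketch below redistributes the RDD created earlier; the target count of 8 is an illustrative value:

// repartition() shuffles the data into the requested number of partitions;
// coalesce() can reduce the partition count without a full shuffle.
val repartitionedRDD = cancerRDD.repartition(8)
repartitionedRDD.partitions.size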
Understanding DataFrames and Datasets
A DataFrame is similar to a table in a relational database, a pandas dataframe, or a dataframe in R. It is a distributed collection of rows organized into columns. It uses the immutable, in-memory, resilient, distributed, and parallel capabilities of RDDs, and applies a schema to the data. DataFrames are also evaluated lazily. Additionally, they provide a domain-specific language (DSL) for distributed data manipulation. Conceptually, a DataFrame is an alias for a collection of generic objects, Dataset[Row], where a row is a generic untyped object. This means that syntax errors for DataFrames are caught during the compile stage; however, analysis errors are detected only during runtime.

DataFrames can be constructed from a wide array of sources, such as structured data files, Hive tables, databases, or RDDs. The source data can be read from local filesystems, HDFS, Amazon S3, and RDBMSs. In addition, other popular data formats, such as CSV, JSON, Avro, and Parquet, are also supported, and you can create and use custom data sources. The DataFrame API supports the Scala, Java, Python, and R programming languages. It is declarative, and combined with procedural Spark code, it provides much tighter integration between relational and procedural processing in your applications. DataFrames can be manipulated using Spark's procedural API, or using relational APIs (with richer optimizations).
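As a brief illustration, the same aggregation can be expressed through the DataFrame DSL or through SQL over a temporary view. The column name below follows the illustrative schema sketched earlier:

// DSL: count the rows in each class using the DataFrame API.
cancerDF.groupBy("classLabel").count().show()

// SQL: register a temporary view and run the equivalent relational query.
cancerDF.createOrReplaceTempView("cancerTable")
spark.sql("SELECT classLabel, COUNT(*) AS cnt FROM cancerTable GROUP BY classLabel").show()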
Understanding the Catalyst optimizer
The Catalyst optimizer is at the core of Spark SQL and is implemented in Scala. It enables several key features, such as schema inference (from JSON data), that are very useful in data analysis work. The following figure shows the high-level transformation process from a developer’s program containing DataFrames/Datasets to the final execution plan:
The internal representation of the program is a query plan. The query plan describes data operations such as aggregate, join, and filter, which match what is defined in your query. These operations generate a new Dataset from the input Dataset. After we have an initial version of the query plan ready, the Catalyst optimizer will apply a series of transformations to convert it to an optimized query plan. Finally, the Spark SQL code generation mechanism translates the optimized query plan into a DAG of RDDs that is ready for execution.

The query plans and the optimized query plans are internally represented as trees. So, at its core, the Catalyst optimizer contains a general library for representing trees and applying rules to manipulate them. On top of this library are several other libraries that are more specific to relational query processing.

Catalyst has two types of query plans: Logical and Physical Plans. The Logical Plan describes the computations on the Datasets without defining how to carry out the specific computations. Typically, the Logical Plan generates a list of attributes or columns as output, under a set of constraints on the generated rows. The Physical Plan describes the computations on Datasets with specific definitions on how to execute them (it is executable).
Let's explore the transformation steps in more detail. The initial query plan is essentially an unresolved Logical Plan; that is, at this stage we don't know the source of the Datasets or the columns (contained in the Dataset), and we also don't know the types of the columns. The first step in this pipeline is the analysis step. During analysis, the catalog information is used to convert the unresolved Logical Plan to a resolved Logical Plan.
In the next step, a set of logical optimization rules is applied to the resolved Logical Plan, resulting in an optimized Logical Plan. The optimizer may then generate multiple Physical Plans and compare their costs to pick the best one. The first version of the cost-based optimizer (CBO), built on top of Spark SQL, was released in Spark 2.2. More details on cost-based optimization are presented in Chapter 11, Tuning Spark SQL Components for Performance.
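You can inspect these plans for any query by calling explain() on a DataFrame or Dataset. For example, using the DataFrame created earlier (with the illustrative column name):

// With the extended flag, explain() prints the parsed and analyzed logical plans,
// the optimized logical plan, and the selected physical plan.
cancerDF.groupBy("classLabel").count().explain(true)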
DataFrames, Datasets, and SQL all share the same optimization pipeline, as illustrated in the following figure:
The primary goal of this article was to give an overview of Spark SQL and to help you become comfortable with the Spark environment through hands-on sessions (using public Datasets).
If you liked our article, please be sure to check out Learning Spark SQL, which covers many more useful techniques for data extraction and data analysis using Spark SQL.