
In this article, Garry Turkington and Gabriele Modena, the authors of the book Learning Hadoop 2, explain how MapReduce is a powerful paradigm that enables complex data processing that can reveal valuable insights. It does, however, require a different mindset, along with some training and experience in breaking processing analytics into a series of map and reduce steps. Several products are built atop Hadoop to provide higher-level or more familiar views of the data held within HDFS, and Pig is a very popular one. This article will explore the other most common abstraction implemented atop Hadoop: SQL.

In this article, we will cover the following topics:

  • What the use cases for SQL on Hadoop are and why it is so popular
  • HiveQL, the SQL dialect introduced by Apache Hive
  • Using HiveQL to perform SQL-like analysis of the Twitter dataset
  • How HiveQL can approximate common features of relational databases such as joins and views


Why SQL on Hadoop

So far we have seen how to write Hadoop programs using the MapReduce APIs and how Pig Latin provides a scripting abstraction and a wrapper for custom business logic by means of UDFs. Pig is a very powerful tool, but its dataflow-based programming model is not familiar to most developers or business analysts. The traditional tool of choice for such people to explore data is SQL.

Back in 2008 Facebook released Hive, the first widely used implementation of SQL on Hadoop.

Instead of providing a way of more quickly developing map and reduce tasks, Hive offers an implementation of HiveQL, a query language based on SQL. Hive takes HiveQL statements and immediately and automatically translates the queries into one or more MapReduce jobs. It then executes the overall MapReduce program and returns the results to the user.

This interface to Hadoop not only reduces the time required to produce results from data analysis, it also significantly widens the net as to who can use Hadoop. Instead of requiring software development skills, anyone who’s familiar with SQL can use Hive.

The result of this combination is that HiveQL is often used as a tool for business and data analysts to perform ad hoc queries on the data stored on HDFS. With Hive, the data analyst can work on refining queries without the involvement of a software developer. Just as with Pig, Hive also allows HiveQL to be extended by means of User Defined Functions, enabling the base SQL dialect to be customized with business-specific functionality.

Other SQL-on-Hadoop solutions

Though Hive was the first product to introduce and support HiveQL, it is no longer the only one. There are others, but we will mostly discuss Hive and Impala as they have been the most successful.

While introducing the core features and capabilities of SQL on Hadoop, however, we will give examples using Hive. Even though Hive and Impala share many SQL features, they also have numerous differences, and we don't want to constantly caveat each new feature with exactly how it is supported in Hive compared to Impala. We'll generally be looking at aspects of the feature set that are common to both, but if you use both products, it's important to read the latest release notes to understand the differences.

Prerequisites

Before diving into specific technologies, let's generate some data that we'll use in the examples throughout this article. We'll create a modified version of an earlier Pig script to do the main work for us. The script assumes that the Elephant Bird JARs used previously are available in the /jar directory on HDFS. The full source code is at https://github.com/learninghadoop2/book-examples/ch7/extract_for_hive.pig, but the core of extract_for_hive.pig is as follows:

-- load JSON data
tweets = load '$inputDir' using com.twitter.elephantbird.pig.load.JsonLoader('-nestedLoad');

-- Tweets
tweets_tsv = foreach tweets {
    generate
        (chararray)CustomFormatToISO($0#'created_at', 'EEE MMMM d HH:mm:ss Z y') as dt,
        (chararray)$0#'id_str',
        (chararray)$0#'text' as text,
        (chararray)$0#'in_reply_to',
        (boolean)$0#'retweeted' as is_retweeted,
        (chararray)$0#'user'#'id_str' as user_id,
        (chararray)$0#'place'#'id' as place_id;
}
store tweets_tsv into '$outputDir/tweets' using PigStorage('\u0001');

-- Places
needed_fields = foreach tweets {
    generate
        (chararray)CustomFormatToISO($0#'created_at', 'EEE MMMM d HH:mm:ss Z y') as dt,
        (chararray)$0#'id_str' as id_str,
        $0#'place' as place;
}
place_fields = foreach needed_fields {
    generate
        (chararray)place#'id' as place_id,
        (chararray)place#'country_code' as co,
        (chararray)place#'country' as country,
        (chararray)place#'name' as place_name,
        (chararray)place#'full_name' as place_full_name,
        (chararray)place#'place_type' as place_type;
}
filtered_places = filter place_fields by co != '';
unique_places = distinct filtered_places;
store unique_places into '$outputDir/places' using PigStorage('\u0001');

-- Users
users = foreach tweets {
    generate
        (chararray)CustomFormatToISO($0#'created_at', 'EEE MMMM d HH:mm:ss Z y') as dt,
        (chararray)$0#'id_str' as id_str,
        $0#'user' as user;
}
user_fields = foreach users {
    generate
        (chararray)CustomFormatToISO(user#'created_at', 'EEE MMMM d HH:mm:ss Z y') as dt,
        (chararray)user#'id_str' as user_id,
        (chararray)user#'location' as user_location,
        (chararray)user#'name' as user_name,
        (chararray)user#'description' as user_description,
        (int)user#'followers_count' as followers_count,
        (int)user#'friends_count' as friends_count,
        (int)user#'favourites_count' as favourites_count,
        (chararray)user#'screen_name' as screen_name,
        (int)user#'listed_count' as listed_count;
}
unique_users = distinct user_fields;
store unique_users into '$outputDir/users' using PigStorage('\u0001');

Run this script as follows:

$ pig -f extract_for_hive.pig -param inputDir=<json input> -param outputDir=<output path>

The preceding code writes data into three separate TSV files for the tweet, user, and place information. Notice that in the store command, we pass an argument when calling PigStorage. This single argument changes the default field separator from a tab character to the Unicode value \u0001, a character often referred to as Ctrl-A. This character is frequently used as a separator in Hive tables and will be particularly useful to us, as our tweet data could contain tabs in other fields.

Overview of Hive

We will now show how you can import data into Hive and run a query against the table abstraction Hive provides over the data. In this example, and in the remainder of the article, we will assume that queries are typed into the shell that can be invoked by executing the hive command.

Recently a client called Beeline also became available and will likely be the preferred CLI client in the near future.

When importing any new data into Hive, there is generally a three-stage process:

  • Create the specification of the table into which the data is to be imported
  • Import the data into the created table
  • Execute HiveQL queries against the table

Most of the HiveQL statements are direct analogues to similarly named statements in standard SQL. We assume only a passing knowledge of SQL throughout this article, but if you need a refresher, there are numerous good online learning resources.

Hive gives a structured query view of our data, and to enable that, we must first define the specification of the table’s columns and import the data into the table before we can execute any queries. A table specification is generated using a CREATE statement that specifies the table name, the name and types of its columns, and some metadata about how the table is stored:

CREATE table tweets (
created_at string,
tweet_id string,
text string,
in_reply_to string,
retweeted boolean,
user_id string,
place_id string
) ROW FORMAT DELIMITED
FIELDS TERMINATED BY '\u0001'
STORED AS TEXTFILE;

The statement creates a new table tweets defined by a list of names for columns in the dataset and their data type. We specify that fields are delimited by the Unicode U0001 character and that the format used to store data is TEXTFILE.

Data can be imported from a location on HDFS (here, the tweets directory) into Hive using the LOAD DATA statement:

LOAD DATA INPATH 'tweets' OVERWRITE INTO TABLE tweets;

By default, data for Hive tables is stored on HDFS under /user/hive/warehouse. If a LOAD statement is given a path to data on HDFS, it will not simply copy the data into /user/hive/warehouse, but will move it there instead. If you want to analyze data on HDFS that is used by other applications, then either create a copy or use the EXTERNAL mechanism that will be described later.

Once data has been imported into Hive, we can run queries against it. For instance:

SELECT COUNT(*) FROM tweets;

The preceding code will return the total number of tweets present in the dataset. HiveQL, like SQL, is not case sensitive in terms of keywords, columns, or table names. By convention, SQL statements use uppercase for SQL language keywords, and we will generally follow this when using HiveQL within files, as will be shown later. However, when typing interactive commands, we will frequently take the line of least resistance and use lowercase.

If you look closely at the time taken by the various commands in the preceding example, you’ll notice that loading data into a table takes about as long as creating the table specification, but even the simple count of all rows takes significantly longer. The output also shows that table creation and the loading of data do not actually cause MapReduce jobs to be executed, which explains the very short execution times.

The nature of Hive tables

Although Hive copies the data file into its working directory, it does not actually process the input data into rows at that point.

Both the CREATE TABLE and LOAD DATA statements do not truly create concrete table data as such; instead, they produce the metadata that will be used when Hive generates MapReduce jobs to access the data conceptually stored in the table but actually residing on HDFS. Even though the HiveQL statements refer to a specific table structure, it is Hive’s responsibility to generate code that correctly maps this to the actual on-disk format in which the data files are stored.

This might seem to suggest that Hive isn’t a real database; this is true, it isn’t. Whereas a relational database will require a table schema to be defined before data is ingested and then ingest only data that conforms to that specification, Hive is much more flexible. The less concrete nature of Hive tables means that schemas can be defined based on the data as it has already arrived and not on some assumption of how the data should be, which might prove to be wrong. Though changeable data formats are troublesome regardless of technology, the Hive model provides an additional degree of freedom in handling the problem when, not if, it arises.

Hive architecture

Until version 2, Hadoop was primarily a batch system. Internally, Hive compiles HiveQL statements into MapReduce jobs. Hive queries have traditionally been characterized by high latency. This has changed with the Stinger initiative and the improvements introduced in Hive 0.13 that we will discuss later.

Hive runs as a client application that processes HiveQL queries, converts them into MapReduce jobs, and submits these to a Hadoop cluster either to native MapReduce in Hadoop 1 or to the MapReduce Application Master running on YARN in Hadoop 2.

Regardless of the model, Hive uses a component called the metastore, in which it holds all its metadata about the tables defined in the system. Ironically, this is stored in a relational database dedicated to Hive’s usage. In the earliest versions of Hive, all clients communicated directly with the metastore, but this meant that every user of the Hive CLI tool needed to know the metastore username and password.

HiveServer was created to act as a point of entry for remote clients, which could also act as a single access-control point and which controlled all access to the underlying metastore. Because of limitations in HiveServer, the newest way to access Hive is through the multi-client HiveServer2.

HiveServer2 introduces a number of improvements over its predecessor, including user authentication and support for multiple connections from the same client. More information can be found at https://cwiki.apache.org/confluence/display/Hive/Setting+Up+HiveServer2.

Instances of HiveServer and HiveServer2 can be manually executed with the hive --service hiveserver and hive --service hiveserver2 commands, respectively.

In the examples we saw before and in the remainder of this article, we implicitly use HiveServer to submit queries via the Hive command-line tool. HiveServer2 comes with its own client, Beeline. Because Beeline is relatively new, both tools are available on Cloudera and most other major distributions for reasons of compatibility and maturity. The Beeline client is part of the core Apache Hive distribution and so is also fully open source. Beeline can be executed in embedded mode with the following command:

$ beeline -u jdbc:hive2://

Data types

HiveQL supports many of the common data types provided by standard database systems. These include primitive types, such as float, double, int, and string, through to structured collection types that provide the SQL analogues of arrays, structs, and unions (structs with options for some fields). Since Hive is implemented in Java, primitive types will behave like their Java counterparts. Hive data types can be grouped into the following five broad categories:

  • Numeric: tinyint, smallint, int, bigint, float, double, and decimal
  • Date and time: timestamp and date
  • String: string, varchar, and char
  • Collections: array, map, struct, and uniontype
  • Misc: boolean, binary, and NULL
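
As a brief, hypothetical illustration of the collection types (the table and column names below are not part of the Twitter dataset), the following sketch shows how they can be declared and queried:

CREATE TABLE example_types (
id bigint,
scores array<int>,
attributes map<string, string>,
geo struct<lat: double, lon: double>
);

-- Array elements and map values are accessed with [], struct fields with dot notation
SELECT scores[0], attributes['lang'], geo.lat FROM example_types;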

DDL statements

HiveQL provides a number of statements to create, delete, and alter databases, tables, and views. The CREATE DATABASE <name> statement creates a new database with the given name. A database represents a namespace where table and view metadata is contained. If multiple databases are present, the USE <database name> statement specifies which one to use to query tables or create new metadata. If no database is explicitly specified, Hive will run all statements against the default database. SHOW [DATABASES, TABLES, VIEWS] displays the databases currently available within a data warehouse and which table and view metadata is present within the database currently in use:

CREATE DATABASE twitter;
SHOW databases;
USE twitter;
SHOW TABLES;

The CREATE TABLE [IF NOT EXISTS] <name> statement creates a table with the given name. As alluded to earlier, what is really created is the metadata representing the table and its mapping to files on HDFS as well as a directory in which to store the data files. If a table or view with the same name already exists, Hive will raise an exception.

Both table and column names are case insensitive. In older versions of Hive (0.12 and earlier), only alphanumeric and underscore characters were allowed in table and column names. As of Hive 0.13, the system supports unicode characters in column names. Reserved words, such as load and create, need to be escaped by backticks (the ` character) to be treated literally.
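
For example, the following hypothetical table uses reserved words as column names, escaped with backticks:

CREATE TABLE IF NOT EXISTS example_keywords (
`create` string,
`load` string
);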

The EXTERNAL keyword specifies that the table exists in resources out of Hive’s control, which can be a useful mechanism to extract data from another source at the beginning of a Hadoop-based Extract-Transform-Load (ETL) pipeline. The LOCATION clause specifies where the source file (or directory) is to be found. The EXTERNAL keyword and LOCATION clause have been used in the following code:

CREATE EXTERNAL TABLE tweets (
created_at string,
tweet_id string,
text string,
in_reply_to string,
retweeted boolean,
user_id string,
place_id string
) ROW FORMAT DELIMITED
FIELDS TERMINATED BY '\u0001'
STORED AS TEXTFILE
LOCATION '${input}/tweets';

This table will be created in the metastore, but the data will not be copied into the /user/hive/warehouse directory.

Note that Hive has no concept of primary key or unique identifier. Uniqueness and data normalization are aspects to be addressed before loading data into the data warehouse.

The CREATE VIEW <view name> … AS SELECT statement creates a view with the given name. For example, we can create a view to isolate retweets from other messages, as follows:

CREATE VIEW retweets
COMMENT 'Tweets that have been retweeted'
AS SELECT * FROM tweets WHERE retweeted = true;

Unless otherwise specified, column names are derived from the defining SELECT statement. Hive does not currently support materialized views.

The DROP TABLE and DROP VIEW statements remove both metadata and data for a given table or view. When dropping an EXTERNAL table or a view, only metadata will be removed and the actual data files will not be affected.

Hive allows table metadata to be altered via the ALTER TABLE statement, which can be used to change a column type, name, position, and comment or to add and replace columns.

When adding columns, it is important to remember that only the metadata will be changed and not the dataset itself. This means that if we were to add a column in the middle of the table that didn't exist in older files, then while selecting from older data, we might get wrong values in the wrong columns. This is because we would be looking at old files with a new format.

Similarly, ALTER VIEW <view name> AS <select statement> changes the definition of an existing view.
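
The following statements are a sketch of how this might look against our tweets table and retweets view; the renamed column, the new lang column, and the revised view definition are purely illustrative:

-- Rename a column and change its comment; only the metadata is modified
ALTER TABLE tweets CHANGE COLUMN text tweet_text string COMMENT 'Tweet body';

-- Append a new column to the end of the column list
ALTER TABLE tweets ADD COLUMNS (lang string COMMENT 'Tweet language code');

-- Redefine an existing view
ALTER VIEW retweets AS SELECT tweet_id, user_id FROM tweets WHERE retweeted = true;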

File formats and storage

The data files underlying a Hive table are no different from any other file on HDFS. Users can directly read the HDFS files in the Hive tables using other tools. They can also use other tools to write to HDFS files that can be loaded into Hive through CREATE EXTERNAL TABLE or through LOAD DATA INPATH.

Hive uses the Serializer and Deserializer classes, SerDe, as well as FileFormat to read and write table rows. A native SerDe is used if ROW FORMAT is not specified or ROW FORMAT DELIMITED is specified in a CREATE TABLE statement. The DELIMITED clause instructs the system to read delimited files. Delimiter characters can be escaped using the ESCAPED BY clause.
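
For instance, a minimal sketch of a delimited table that overrides both the field delimiter and the escape character (the table itself is hypothetical) would look like this:

CREATE TABLE raw_logs (
log_id string,
message string
) ROW FORMAT DELIMITED
FIELDS TERMINATED BY ','
ESCAPED BY '\\'
STORED AS TEXTFILE;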

Hive currently uses the following FileFormat classes to read and write HDFS files:

  • TextInputFormat and HiveIgnoreKeyTextOutputFormat: These classes read/write data in plain text file format
  • SequenceFileInputFormat and SequenceFileOutputFormat: These classes read/write data in the Hadoop SequenceFile format

Additionally, the following SerDe classes can be used to serialize and deserialize data:

  • MetadataTypedColumnsetSerDe: This class reads/writes delimited records, such as CSV or tab-separated records
  • ThriftSerDe and DynamicSerDe: These classes read/write Thrift objects

JSON

As of version 0.13, Hive ships with the native org.apache.hive.hcatalog.data.JsonSerDe JSON SerDe. For older versions of Hive, Hive-JSON-Serde (found at https://github.com/rcongiu/Hive-JSON-Serde) is arguably one of the most feature-rich JSON serialization/deserialization modules.

We can use either module to load JSON tweets without any need for preprocessing and just define a Hive schema that matches the content of a JSON document. In the following example, we use Hive-JSON-Serde.

As with any third-party module, we first load the SerDe JAR into Hive with the following code:

ADD JAR json-serde-1.3-jar-with-dependencies.jar;

Then, we issue the usual create statement, as follows:

CREATE EXTERNAL TABLE tweets (
   contributors string,
   coordinates struct <
     coordinates: array <float>,
     type: string>,
   created_at string,
   entities struct <
     hashtags: array <struct <
           indices: array <tinyint>,
           text: string>>,
…
)
ROW FORMAT SERDE 'org.openx.data.jsonserde.JsonSerDe'
STORED AS TEXTFILE
LOCATION 'tweets';

With this SerDe, we can map nested documents (such as entities or users) to the struct or map types. We tell Hive that the data stored at LOCATION 'tweets' is text (STORED AS TEXTFILE) and that each row is a JSON object (ROW FORMAT SERDE 'org.openx.data.jsonserde.JsonSerDe'). In Hive 0.13 and later, we can express this property as ROW FORMAT SERDE 'org.apache.hive.hcatalog.data.JsonSerDe'.

Manually specifying the schema for complex documents can be a tedious and error-prone process. The hive-json module (found at https://github.com/hortonworks/hive-json) is a handy utility to analyze large documents and generate an appropriate Hive schema. Depending on the document collection, further refinement might be necessary.

In our example, we used a schema generated with hive-json that maps the tweets JSON to a number of struct data types. This allows us to query the data using a handy dot notation. For instance, we can extract the screen name and description fields of a user object with the following code:

SELECT user.screen_name, user.description FROM tweets_json LIMIT 10;

Avro

AvroSerde (https://cwiki.apache.org/confluence/display/Hive/AvroSerDe) allows us to read and write data in Avro format. Starting from 0.14, Avro-backed tables can be created using the STORED AS AVRO statement, and Hive will take care of creating an appropriate Avro schema for the table. Prior versions of Hive are a bit more verbose.
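
Before looking at the more verbose approach needed by those earlier releases, here is a minimal sketch of the Hive 0.14+ syntax, assuming we simply want Hive to derive the Avro schema from the column definitions (the table name is illustrative):

CREATE TABLE pagerank_avro (
topic int,
source int,
rank float
) STORED AS AVRO;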

This dataset was created using Pig’s AvroStorage class, which generated the following schema:

{
"type":"record",
"name":"record",
"fields": [
   {"name":"topic","type":["null","int"]},
   {"name":"source","type":["null","int"]},
   {"name":"rank","type":["null","float"]}
]
}

The table structure is captured in an Avro record, which contains header information (a name and optional namespace to qualify the name) and an array of the fields. Each field is specified with its name and type as well as an optional documentation string.

For a few of the fields, the type is not a single value, but instead a pair of values, one of which is null. This is an Avro union, and this is the idiomatic way of handling columns that might have a null value. Avro specifies null as a concrete type, and any location where another type might have a null value needs to be specified in this way. This will be handled transparently for us when we use the following schema.

With this definition, we can now create a Hive table that uses this schema for its table specification, as follows:

CREATE EXTERNAL TABLE tweets_pagerank
ROW FORMAT SERDE
'org.apache.hadoop.hive.serde2.avro.AvroSerDe'
WITH SERDEPROPERTIES ('avro.schema.literal'='{
   "type":"record",
   "name":"record",
   "fields": [
       {"name":"topic","type":["null","int"]},
       {"name":"source","type":["null","int"]},
       {"name":"rank","type":["null","float"]}
   ]
}')
STORED AS INPUTFORMAT
'org.apache.hadoop.hive.ql.io.avro.AvroContainerInputFormat'
OUTPUTFORMAT
'org.apache.hadoop.hive.ql.io.avro.AvroContainerOutputFormat'
LOCATION '${data}/ch5-pagerank';

We can then look at the table definition from within Hive:

DESCRIBE tweets_pagerank;
OK
topic                 int                   from deserializer  
source               int                   from deserializer  
rank                 float                 from deserializer

In the DDL, we told Hive that data is stored in Avro format using AvroContainerInputFormat and AvroContainerOutputFormat. Each row needs to be serialized and deserialized using org.apache.hadoop.hive.serde2.avro.AvroSerDe. The table schema is inferred by Hive from the Avro schema embedded in avro.schema.literal.

Alternatively, we can store a schema on HDFS and have Hive read it to determine the table structure. Create the preceding schema in a file called pagerank.avsc (this is the standard file extension for Avro schemas) and place it on HDFS; we prefer to have a common location for schema files, such as /schema/avro. Then, define the table using the avro.schema.url SerDe property, that is, WITH SERDEPROPERTIES ('avro.schema.url'='hdfs://<namenode>/schema/avro/pagerank.avsc').
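
Putting this together, a sketch of the alternative table definition might look like the following; the schema path assumes the /schema/avro convention described previously:

CREATE EXTERNAL TABLE tweets_pagerank
ROW FORMAT SERDE
'org.apache.hadoop.hive.serde2.avro.AvroSerDe'
WITH SERDEPROPERTIES (
'avro.schema.url'='hdfs://<namenode>/schema/avro/pagerank.avsc')
STORED AS INPUTFORMAT
'org.apache.hadoop.hive.ql.io.avro.AvroContainerInputFormat'
OUTPUTFORMAT
'org.apache.hadoop.hive.ql.io.avro.AvroContainerOutputFormat'
LOCATION '${data}/ch5-pagerank';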

If Avro dependencies are not present in the classpath, we need to add the Avro MapReduce JAR to our environment before accessing individual fields. Within Hive, on the Cloudera CDH5 VM:

ADD JAR /opt/cloudera/parcels/CDH/lib/avro/avro-mapred-hadoop2.jar;

We can also use this table like any other. For instance, we can query the data to select the user and topic pairs with a high PageRank:

SELECT source, topic from tweets_pagerank WHERE rank >= 0.9;

Columnar stores

Hive can also take advantage of columnar storage via the ORC (https://cwiki.apache.org/confluence/display/Hive/LanguageManual+ORC) and Parquet (https://cwiki.apache.org/confluence/display/Hive/Parquet) formats.

If a table is defined with very many columns, it is not unusual for any given query to only process a small subset of these columns. But even in a SequenceFile each full row and all its columns will be read from disk, decompressed, and processed. This consumes a lot of system resources for data that we know in advance is not of interest.

Traditional relational databases also store data on a row basis, and a type of database called columnar changed this to be column-focused. In the simplest model, instead of one file for each table, there would be one file for each column in the table. If a query only needed to access five columns in a table with 100 columns in total, then only the files for those five columns will be read. Both ORC and Parquet use this principle as well as other optimizations to enable much faster queries.
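
As a sketch of how this looks in practice (native STORED AS PARQUET support was added in Hive 0.13; the table names are illustrative), columnar copies of an existing table can be created with CREATE TABLE … AS SELECT:

CREATE TABLE tweets_orc STORED AS ORC AS SELECT * FROM tweets;
CREATE TABLE tweets_parquet STORED AS PARQUET AS SELECT * FROM tweets;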

Queries

Tables can be queried using the familiar SELECT … FROM statement. The WHERE clause allows the specification of filtering conditions, GROUP BY aggregates records, ORDER BY specifies sorting criteria, and LIMIT restricts the number of records retrieved. Aggregate functions, such as count and sum, can be applied to aggregated records. For instance, the following code returns the top 10 most prolific users in the dataset:

SELECT user_id, COUNT(*) AS cnt FROM tweets GROUP BY user_id ORDER BY cnt DESC LIMIT 10;

The following are the top 10 most prolific users in the dataset:

NULL 7091
1332188053 4
959468857 3
1367752118 3
362562944 3
58646041 3
2375296688 3
1468188529 3
37114209 3
2385040940 3

We can improve the readability of the Hive output by setting the following:

SET hive.cli.print.header=true;

This will instruct Hive, though not Beeline, to print column names as part of the output.

You can add this command to the .hiverc file, usually found in the root of the executing user's home directory, to have it applied to all Hive CLI sessions.

HiveQL implements a JOIN operator that enables us to combine tables. In the Prerequisites section, we generated separate datasets for the user and place objects; let's now load them into Hive using external tables.

We first create a user table to store user data, as follows:

CREATE EXTERNAL TABLE user (
created_at string,
user_id string,
`location` string,
name string,
description string,
followers_count bigint,
friends_count bigint,
favourites_count bigint,
screen_name string,
listed_count bigint
) ROW FORMAT DELIMITED
FIELDS TERMINATED BY '\u0001'
STORED AS TEXTFILE
LOCATION '${input}/users';

We then create a place table to store location data, as follows:

CREATE EXTERNAL TABLE place (
place_id string,
country_code string,
country string,
`name` string,
full_name string,
place_type string
) ROW FORMAT DELIMITED
FIELDS TERMINATED BY '\u0001'
STORED AS TEXTFILE
LOCATION '${input}/places';

We can use the JOIN operator to display the names of the 10 most prolific users, as follows:

SELECT tweets.user_id, user.name, COUNT(tweets.user_id) AS cnt
FROM tweets
JOIN user ON user.user_id = tweets.user_id
GROUP BY tweets.user_id, user.user_id, user.name
ORDER BY cnt DESC LIMIT 10;

Only equality, outer, and left (semi) joins are supported in Hive.

Notice that there might be multiple entries with a given user ID but different values for the followers_count, friends_count, and favourites_count columns. To avoid duplicate entries, we count only user_id from the tweets table.

We can rewrite the previous query as follows:

SELECT tweets.user_id, u.name, COUNT(*) AS cnt 
FROM tweets 
join (SELECT user_id, name FROM user GROUP BY user_id, name) u
ON u.user_id = tweets.user_id
GROUP BY tweets.user_id, u.name 
ORDER BY cnt DESC LIMIT 10;   

Instead of directly joining the user table, we execute a subquery, as follows:

SELECT user_id, name FROM user GROUP BY user_id, name;

The subquery extracts unique user IDs and names. Note that Hive has limited support for subqueries, historically only permitting a subquery in the FROM clause of a SELECT statement. Hive 0.13 has added limited support for subqueries within the WHERE clause also.

HiveQL is an ever-evolving, rich language, a full exposition of which is beyond the scope of this article. A description of its query and DDL capabilities can be found at https://cwiki.apache.org/confluence/display/Hive/LanguageManual.

Structuring Hive tables for given workloads

Often Hive isn't used in isolation; instead, tables are created with particular workloads in mind, or Hive is invoked in ways that are suitable for inclusion in automated processes. We'll now explore some of these scenarios.

Partitioning a table

When discussing columnar file formats, we explained the benefits of excluding unneeded data as early as possible when processing a query. A similar concept has been used in SQL for some time: table partitioning.

When creating a partitioned table, a column is specified as the partition key. All values with that key are then stored together. In Hive’s case, different subdirectories for each partition key are created under the table directory in the warehouse location on HDFS.

It’s important to understand the cardinality of the partition column. With too few distinct values, the benefits are reduced as the files are still very large. If there are too many values, then queries might need a large number of files to be scanned to access all the required data. Perhaps the most common partition key is one based on date. We could, for example, partition our user table from earlier based on the created_at column, that is, the date the user was first registered. Note that since partitioning a table by definition affects its file structure, we create this table now as a non-external one, as follows:

CREATE TABLE partitioned_user (
created_at string,
user_id string,
`location` string,
name string,
description string,
followers_count bigint,
friends_count bigint,
favourites_count bigint,
screen_name string,
listed_count bigint
) PARTITIONED BY (created_at_date string)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY '\u0001'
STORED AS TEXTFILE;

To load data into a partition, we can explicitly give a value for the partition into which to insert the data, as follows:

INSERT INTO TABLE partitioned_user
PARTITION( created_at_date = '2014-01-01')
SELECT
created_at,
user_id,
location,
name,
description,
followers_count,
friends_count,
favourites_count,
screen_name,
listed_count
FROM user;

This is at best verbose, as we need a statement for each partition key value; if a single LOAD or INSERT statement contains data for multiple partitions, it just won’t work. Hive also has a feature called dynamic partitioning, which can help us here. We set the following three variables:

SET hive.exec.dynamic.partition = true;
SET hive.exec.dynamic.partition.mode = nonstrict;
SET hive.exec.max.dynamic.partitions.pernode=5000;

The first two statements enable all partitions (nonstrict option) to be dynamic. The third one allows 5,000 distinct partitions to be created on each mapper and reducer node.

We can then simply use the name of the column to be used as the partition key, and Hive will insert data into partitions depending on the value of the key for a given row:

INSERT INTO TABLE partitioned_user
PARTITION( created_at_date )
SELECT
created_at,
user_id,
location,
name,
description,
followers_count,
friends_count,
favourites_count,
screen_name,
listed_count,
to_date(created_at) as created_at_date
FROM user;

Even though we use only a single partition column here, we can partition a table by multiple column keys; just have them as a comma-separated list in the PARTITIONED BY clause.

Note that the partition key columns need to be included as the last columns in any statement being used to insert into a partitioned table. In the preceding code we use Hive’s to_date function to convert the created_at timestamp to a YYYY-MM-DD formatted string.
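
Returning to the point about multiple partition columns, a sketch of such a definition (the table and the country_code partition key are hypothetical) might be:

CREATE TABLE partitioned_user_multi (
user_id string,
name string,
screen_name string
) PARTITIONED BY (created_at_date string, country_code string)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY '\u0001'
STORED AS TEXTFILE;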

Partitioned data is stored in HDFS as /path/to/warehouse/<database>/<table>/key=<value>. In our example, the partitioned_user table structure will look like /user/hive/warehouse/default/partitioned_user/created_at_date=2014-04-01.

If data is added directly to the filesystem, for instance by some third-party processing tool or by hadoop fs -put, the metastore won’t automatically detect the new partitions. The user will need to manually run an ALTER TABLE statement such as the following for each newly added partition:

ALTER TABLE <table_name> ADD PARTITION (<partition column> = '<value>') [LOCATION '<path>'];
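
For example, a hypothetical statement for our partitioned_user table, assuming another tool had written a day's worth of files to a path outside the Hive warehouse, might be:

ALTER TABLE partitioned_user
ADD PARTITION (created_at_date = '2014-04-02')
LOCATION '/data/incoming/users/2014-04-02';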

To add metadata for all partitions not currently present in the metastore, we can use the MSCK REPAIR TABLE <table_name>; statement. On EMR, this is equivalent to executing the following statement:

ALTER TABLE <table_name> RECOVER PARTITIONS;

Notice that both statements will also work with EXTERNAL tables.

Overwriting and updating data

Partitioning is also useful when we need to update a portion of a table. Normally a statement of the following form will replace all the data for the destination table:

INSERT OVERWRITE TABLE <table>…

If OVERWRITE is omitted, then each INSERT statement will add additional data to the table. Sometimes, this is desirable, but often, the source data being ingested into a Hive table is intended to fully update a subset of the data and keep the rest untouched.

If we perform an INSERT OVERWRITE statement (or a LOAD OVERWRITE statement) into a partition of a table, then only the specified partition will be affected. Thus, if we were inserting user data and only wanted to affect the partitions with data in the source file, we could achieve this by adding the OVERWRITE keyword to our previous INSERT statement.
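
In other words, a sketch of the earlier dynamic-partition INSERT with the OVERWRITE keyword added; only the partitions that appear in the source data are replaced:

INSERT OVERWRITE TABLE partitioned_user
PARTITION (created_at_date)
SELECT
created_at,
user_id,
location,
name,
description,
followers_count,
friends_count,
favourites_count,
screen_name,
listed_count,
to_date(created_at) as created_at_date
FROM user;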

We can also add caveats to the SELECT statement. Say, for example, we only wanted to update data for a certain month:

INSERT INTO TABLE partitioned_user
PARTITION (created_at_date)
SELECT created_at ,
user_id,
location,
name,
description,
followers_count,
friends_count,
favourites_count,
screen_name,
listed_count,
to_date(created_at) as created_at_date
FROM user
WHERE to_date(created_at) BETWEEN '2014-03-01' and '2014-03-31';

Bucketing and sorting

Partitioning a table is a construct that you take explicit advantage of by using the partition column (or columns) in the WHERE clause of queries against the tables. There is another mechanism called bucketing that can further segment how a table is stored and does so in a way that allows Hive itself to optimize its internal query plans to take advantage of the structure.

Let's create bucketed versions of our tweets and user tables; note the additional CLUSTERED BY and SORTED BY clauses in the following CREATE TABLE statements:

CREATE table bucketed_tweets (
tweet_id string,
text string,
in_reply_to string,
retweeted boolean,
user_id string,
place_id string
) PARTITIONED BY (created_at string)
CLUSTERED BY(user_ID) into 64 BUCKETS
ROW FORMAT DELIMITED
FIELDS TERMINATED BY '\u0001'
STORED AS TEXTFILE;
 
CREATE TABLE bucketed_user (
user_id string,
`location` string,
name string,
description string,
followers_count bigint,
friends_count bigint,
favourites_count bigint,
screen_name string,
listed_count bigint
) PARTITIONED BY (created_at string)
CLUSTERED BY(user_ID) SORTED BY(name) into 64 BUCKETS
ROW FORMAT DELIMITED
FIELDS TERMINATED BY '\u0001'
STORED AS TEXTFILE;

Note that we changed the tweets table to also be partitioned; you can only bucket a table that is partitioned.

Just as we need to specify a partition column when inserting into a partitioned table, we must also take care to ensure that data inserted into a bucketed table is correctly clustered. We do this by setting the following flag before inserting the data into the table:

SET hive.enforce.bucketing=true;

Just as with partitioned tables, you cannot apply the bucketing function when using the LOAD DATA statement; if you wish to load external data into a bucketed table, first insert it into a temporary table, and then use the INSERT…SELECT… syntax to populate the bucketed table.

When data is inserted into a bucketed table, rows are allocated to a bucket based on the result of a hash function applied to the column specified in the CLUSTERED BY clause.
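
A sketch of such a two-step load follows, assuming the user table created earlier acts as the staging table and picking an arbitrary partition value:

SET hive.enforce.bucketing=true;

INSERT OVERWRITE TABLE bucketed_user
PARTITION (created_at = '2014-01-01')
SELECT
user_id,
`location`,
name,
description,
followers_count,
friends_count,
favourites_count,
screen_name,
listed_count
FROM user
WHERE to_date(created_at) = '2014-01-01';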

One of the greatest advantages of bucketing a table comes when we need to join two tables that are similarly bucketed, as in the previous example. So, for example, any query of the following form would be vastly improved:

SET hive.optimize.bucketmapjoin=true;
SELECT …
FROM bucketed_user u JOIN bucketed_tweets t
ON u.user_id = t.user_id;

With the join being performed on the column used to bucket the table, Hive can optimize the amount of processing as it knows that each bucket contains the same set of user_id columns in both tables. While determining which rows against which to match, only those in the bucket need to be compared against, and not the whole table. This does require that the tables are both clustered on the same column and that the bucket numbers are either identical or one is a multiple of the other. In the latter case, with say one table clustered into 32 buckets and another into 64, the nature of the default hash function used to allocate data to a bucket means that the IDs in bucket 3 in the first table will cover those in both buckets 3 and 35 in the second.

Sampling data

Bucketing a table can also help while using Hive’s ability to sample data in a table. Sampling allows a query to gather only a specified subset of the overall rows in the table. This is useful when you have an extremely large table with moderately consistent data patterns. In such a case, applying a query to a small fraction of the data will be much faster and will still give a broadly representative result. Note, of course, that this only applies to queries where you are looking to determine table characteristics, such as pattern ranges in the data; if you are trying to count anything, then the result needs to be scaled to the full table size.

For a non-bucketed table, you can sample in a mechanism similar to what we saw earlier by specifying that the query should only be applied to a certain subset of the table:

SELECT max(friends_count)
FROM user TABLESAMPLE(BUCKET 2 OUT OF 64 ON name);

In this query, Hive will effectively hash the rows in the table into 64 buckets based on the name column. It will then only use the second bucket for the query. Multiple buckets can be specified, and if RAND() is given as the ON clause, then the entire row is used by the bucketing function.
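
For example, a sketch of sampling on entire rows rather than on a specific column:

SELECT COUNT(*)
FROM user TABLESAMPLE(BUCKET 1 OUT OF 100 ON rand());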

Though successful, such queries are highly inefficient, as the full table needs to be scanned to generate the required subset of data. If we sample on a bucketed table and ensure that the number of buckets sampled is a factor or a multiple of the number of buckets in the table, then Hive will only read the buckets in question. For example:

SELECT MAX(friends_count)
FROM bucketed_user TABLESAMPLE(BUCKET 2 OUT OF 32 on user_id);

In the preceding query against the bucketed_user table, which is created with 64 buckets on the user_id column, the sampling, since it is using the same column, will only read the required buckets. In this case, these will be buckets 2 and 34 from each partition.

A final form of sampling is block sampling. In this case, we can specify the required amount of the table to be sampled, and Hive will use an approximation of this by only reading enough source data blocks on HDFS to meet the required size. Currently, the data size can be specified as either a percentage of the table, as an absolute data size, or as a number of rows (in each block). The syntax for TABLESAMPLE is as follows, which will sample 0.5 percent of the table, 1 GB of data or 100 rows per split, respectively:

TABLESAMPLE(0.5 PERCENT)
TABLESAMPLE(1G)
TABLESAMPLE(100 ROWS)

If these latter forms of sampling are of interest, then consult the documentation, as there are some specific limitations on the input format and file formats that are supported.

Writing scripts

We can place Hive commands in a file and run them with the -f option in the hive CLI utility:

$ cat show_tables.hql
show tables;
$ hive -f show_tables.hql 

We can parameterize HiveQL statements by means of the hiveconf mechanism. This allows us to specify an environment variable name at the point it is used rather than at the point of invocation. For example:

$ cat show_tables2.hql
show tables like '${hiveconf:TABLENAME}';
$ hive -hiveconf TABLENAME=user -f show_tables2.hql

The variable can also be set within the Hive script or an interactive session:

SET TABLENAME=user;

The preceding hiveconf argument will add any new variables in the same namespace as the Hive configuration options. As of Hive 0.8, there is a similar option called hivevar that adds any user variables into a distinct namespace. Using hivevar, the preceding command would be as follows:

$ cat show_tables3.hql
show tables like '${hivevar:TABLENAME}';
$ hive -hivevar TABLENAME=user -f show_tables3.hql

Or we can write the command interactively:

SET hivevar:TABLENAME=user;

Summary

In this article, we learned that in its early days, Hadoop was sometimes erroneously seen as the latest supposed relational database killer. Over time, it has become more apparent that the more sensible approach is to view it as a complement to RDBMS technologies and that, in fact, the RDBMS community has developed tools such as SQL that are also valuable in the Hadoop world.

HiveQL is an implementation of SQL on Hadoop and was the primary focus of this article. In regard to HiveQL and its implementations, we covered the following topics:

  • How HiveQL provides a logical model atop data stored in HDFS in contrast to relational databases where the table structure is enforced in advance
  • How HiveQL offers the ability to extend its core set of operators with user-defined code and how this contrasts to the Pig UDF mechanism
  • The recent history of Hive developments, such as the Stinger initiative, that have seen Hive transition to an updated implementation that uses Tez
