
In this article by Vishal Shukla, author of the book Elasticsearch for Hadoop, we will take a look at how ES-Hadoop can integrate with Pig and Spark with ease.

Elasticsearch is great at getting insights from indexed data. The Hadoop ecosystem does a great job of making Hadoop easily usable for different users by providing comfortable interfaces; Hive and Pig are two examples. Apart from these, Hadoop integrates well with other computing engines and platforms, such as Spark and Cascading.


Pigging out Elasticsearch

For many use cases, Pig is one of the easiest ways to fiddle around with the data in the Hadoop ecosystem. Pig wins when it comes to ease of use and simple syntax for designing data flow pipelines without getting into complex programming. Assuming that you know Pig, we will cover how to move the data to and from Elasticsearch. If you don’t know Pig yet, never mind. You can still carry on with the steps, and by the end of the article, you will at least know how to use Pig to perform data ingestion and reading with Elasticsearch.

Setting up Apache Pig for Elasticsearch

Let’s start by setting up Apache Pig. At the time of writing this article, the latest Pig version available is 0.15.0. You can use the following steps to set it up:

  1. First, download the Pig distribution using the following command:
    $ sudo wget -O /usr/local/pig.tar.gz http://mirrors.sonic.net/apache/pig/pig-0.15.0/pig-0.15.0.tar.gz
  2. Then, extract Pig to the desired location and rename it to a convenient name:
    $ cd /usr/local
    $ sudo tar -xvf pig.tar.gz
    $ sudo mv pig-0.15.0 pig
  3. Now, export the required environment variables by appending the following two lines in the /home/eshadoop/.bashrc file:
    export PIG_HOME=/usr/local/pig
    export PATH=$PATH:$PIG_HOME/bin
  4. You can either log out and log back in to see the newly set environment variables, or source the environment configuration with the following command:
    $ source ~/.bashrc
  5. Now, start the job history server daemon with the following command:
    $ mr-jobhistory-daemon.sh start historyserver
  6. You should see the Pig console with the following command:
    $ pig
    grunt>

    It’s easy to forget to start the job history daemon after you restart your machine or VM. You can configure this daemon to run on startup; otherwise, you need to remember to start it manually.

Now, we have Pig up and running. In order to use Pig with Elasticsearch, we must ensure that the ES-Hadoop JAR file is available in the Pig classpath.

Let’s take the ES-Hadoop JAR file and import it to HDFS using the following steps:

  1. First, download the ES-Hadoop JAR used to develop the examples in this article, as shown in the following command:
    $ wget http://central.maven.org/maven2/org/elasticsearch/elasticsearch-hadoop/2.1.1/elasticsearch-hadoop-2.1.1.jar 
  2. Then, move the downloaded JAR to a convenient location:

    $ sudo mkdir /opt/lib
    $ sudo mv elasticsearch-hadoop-2.1.1.jar /opt/lib/
  3. Now, import the JAR to HDFS:

    $ hadoop fs -mkdir /lib
    $ hadoop fs -put /opt/lib/elasticsearch-hadoop-2.1.1.jar /lib/elasticsearch-hadoop-2.1.1.jar
    

Throughout this article, we will use a crime dataset that is tailored from the open dataset provided at https://data.cityofchicago.org/. This tailored dataset can be downloaded from http://www.packtpub.com/support, where all the code files required for this article are available.

Once you have downloaded the dataset, import it to HDFS at /ch07/crimes_dataset.csv.
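For example, assuming the downloaded file is saved as crimes_dataset.csv in your current directory (the exact file name of your download may differ), the import can be done with the standard HDFS commands:

$ hadoop fs -mkdir /ch07
$ hadoop fs -put crimes_dataset.csv /ch07/crimes_dataset.csv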

Importing data to Elasticsearch

Let’s import the crime dataset to Elasticsearch using Pig with ES-Hadoop, which provides the EsStorage class as a Pig storage handler.

  1. In order to use the EsStorage class, you need to register the ES-Hadoop JAR with Pig. You can register a JAR located in the local filesystem, HDFS, or other shared filesystems. The REGISTER command registers a JAR file that contains UDFs (user-defined functions) with Pig, as shown in the following code:
    grunt> REGISTER hdfs://localhost:9000/lib/elasticsearch-hadoop-2.1.1.jar;
  2. Then, load the CSV data file as a relation with the following code:

    grunt> SOURCE = load '/ch07/crimes_dataset.csv' using PigStorage(',') as
           (id:chararray, caseNumber:chararray, date:datetime, block:chararray,
           iucr:chararray, primaryType:chararray, description:chararray,
           location:chararray, arrest:boolean, domestic:boolean, lat:double, lon:double);

    This command reads the CSV fields and maps each token in the data to the respective field declared in the preceding command. The resulting relation, SOURCE, is backed by a Bag data structure that contains multiple Tuples.

  3. Now, generate the target Pig relation that has the structure that matches closely to the target Elasticsearch index mapping, as shown in the following code:
    grunt> TARGET = foreach SOURCE generate id, caseNumber, date, block, iucr, primaryType, 
    description, location, arrest, domestic, TOTUPLE(lon, lat) AS geoLocation;

    Here, we need a nested object named geoLocation in the target Elasticsearch document. We can achieve this with a Tuple that represents the lat and lon fields. TOTUPLE() helps us create this tuple, and we then assign the geoLocation alias to it.

  4. Let’s store the TARGET relation to the Elasticsearch index with the following code:

    grunt> STORE TARGET INTO 'esh_pig/crimes' USING org.elasticsearch.hadoop.pig.EsStorage(
           'es.http.timeout = 5m', 'es.index.auto.create = true',
           'es.mapping.names=arrest:isArrest, domestic:isDomestic', 'es.mapping.id=id');

    We can specify the target index and type in which to store the indexed documents. The EsStorage class can accept multiple Elasticsearch configurations. es.mapping.names maps a Pig field name to the corresponding field name in the Elasticsearch document. You can use Pig's id field to assign a custom _id value to the Elasticsearch document using the es.mapping.id option. Similarly, you can set the _ttl and _timestamp metadata fields as well.

Pig uses just one reducer in the default configuration. It is recommended to change this behavior to have a parallelism that matches the number of shards available, as shown in the following command:

grunt> SET default_parallel 5;

Pig also combines the input splits, irrespective of their size. This makes it efficient for small files by reducing the number of mappers. However, it can cause performance issues for large files. You can disable this behavior in the Pig script, as shown in the following command:

grunt> SET pig.splitCombination FALSE;

Executing the preceding commands will create the Elasticsearch index and import the crime data documents. If you observe the created documents in Elasticsearch, you can see that the geoLocation value is an array in the [-87.74274476, 41.87404405] format. This is because, by default, ES-Hadoop ignores the tuple field names and simply converts them into an ordered array. If you wish to make your geoLocation field look like a key/value-based object with lat/lon keys, you can do so by including the following configuration in EsStorage:

es.mapping.pig.tuple.use.field.names=true
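For illustration, here is how the earlier STORE command might look with this option added (the other options are carried over from the previous example):

grunt> STORE TARGET INTO 'esh_pig/crimes' USING org.elasticsearch.hadoop.pig.EsStorage(
       'es.mapping.pig.tuple.use.field.names=true',
       'es.mapping.names=arrest:isArrest, domestic:isDomestic',
       'es.mapping.id=id');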

Writing from the JSON source

If you have inputs as a well-formed JSON file, you can avoid conversion and transformations and directly pass the JSON document to Elasticsearch for indexing purposes.

You may have the JSON data in Pig as chararray, bytearray, or in any other form that translates to well-formed JSON by calling the toString() method, as shown in the following code:

grunt> JSON_DATA = LOAD '/ch07/crimes.json' USING PigStorage() AS (json:chararray);

grunt> STORE JSON_DATA INTO 'esh_pig/crimes_json' USING org.elasticsearch.hadoop.pig.EsStorage('es.input.json=true');

Type conversions

Take a look at the type mapping of the esh_pig index in Elasticsearch. It maps the geoLocation field to double because Elasticsearch inferred the type from the field types we specified in Pig. To map geoLocation to geo_point, you must create the Elasticsearch mapping manually before executing the script.

Although Elasticsearch provides data type detection based on the types of the fields in the incoming documents, it is always good to create the type mapping beforehand in Elasticsearch. This is a one-time activity; afterwards, you can run the MapReduce, Pig, Hive, Cascading, or Spark jobs multiple times. This will avoid any surprises in the type detection.
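As an example of creating the mapping up front, the following sketch defines geoLocation as a geo_point before any data is written; it assumes Elasticsearch is listening on localhost:9200 and uses the esh_pig/crimes index and type from the earlier STORE command:

$ curl -XPUT 'http://localhost:9200/esh_pig' -d '{
  "mappings": {
    "crimes": {
      "properties": {
        "geoLocation": { "type": "geo_point" }
      }
    }
  }
}'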

For your reference, here is a list of some of the Pig and Elasticsearch field types that map to each other. The table omits the obvious and intuitive type mappings:

Pig type        Elasticsearch type
chararray       string
bytearray       binary
tuple           array (default) or object
bag             array
map             object
bigdecimal      Not supported
biginteger      Not supported

Reading data from Elasticsearch

Reading data from Elasticsearch using Pig is as simple as writing a single command with the Elasticsearch query.

Here is a snippet that shows how to print the tuples for crimes related to theft:

grunt> REGISTER hdfs://localhost:9000/lib/elasticsearch-hadoop-2.1.1.jar;
grunt> ES = LOAD 'esh_pig/crimes' USING org.elasticsearch.hadoop.pig.EsStorage('{"query" : { "term" : { "primaryType" : "theft" } } }');
grunt> dump ES;

Executing the preceding commands will print the matching tuples to the Pig console.

Giving Spark to Elasticsearch

Spark is a distributed computing system that provides a huge performance boost compared to Hadoop MapReduce. It works on the abstraction of RDDs (Resilient Distributed Datasets), which can be created from any data residing in Hadoop. Without any surprises, ES-Hadoop provides easy integration with Spark by enabling the creation of RDDs from the data in Elasticsearch.

Spark’s increasing support for integrating with various data sources, such as HDFS, Parquet, Avro, S3, Cassandra, relational databases, and streaming data, makes it special when it comes to data integration. This means that when you use ES-Hadoop with Spark, you can easily integrate all of these sources with Elasticsearch.

Setting up Spark

To set up Apache Spark so that you can execute a job, perform the following steps:

  1. First, download the Apache Spark distribution with the following command:
    $ sudo wget -O /usr/local/spark.tgz http://www.apache.org/dyn/closer.cgi/spark/spark-1.4.1/spark-1.4.1-bin-hadoop2.4.tgz
  2. Then, extract Spark to the desired location and rename it to a convenient name, as shown in the following command:
    $ cd /usr/local
    $ sudo tar -xvf spark.tgz
    $ sudo mv spark-1.4.1-bin-hadoop2.4 spark
    

Importing data to Elasticsearch

To import the crime dataset to Elasticsearch with Spark, let’s see how we can write a Spark job. We will continue using Java to write Spark jobs for consistency. Here are the driver program’s snippets:

SparkConf conf = new SparkConf().setAppName("esh-spark").setMaster("local[4]");
conf.set("es.index.auto.create", "true");
JavaSparkContext context = new JavaSparkContext(conf);

Set up the SparkConf object to configure the Spark job. As always, you can also set most ES-Hadoop options here (such as es.index.auto.create) and other configurations that we have seen throughout the article. Using this configuration, we created the JavaSparkContext object. Next, read the crime data CSV file as a JavaRDD:

JavaRDD<String> textFile =
context.textFile("hdfs://localhost:9000/ch07/crimes_dataset.csv");

Here, the RDD is still of the type String, with each element representing one line of the CSV file. Next, map each line to a Crime object:

JavaRDD<Crime> dataSplits = textFile.map(new Function<String, Crime>() {
  @Override
  public Crime call(String line) throws Exception {
    CSVParser parser = CSVParser.parse(line, CSVFormat.RFC4180);
    Crime c = new Crime();
    CSVRecord record = parser.getRecords().get(0);
    c.setId(record.get(0));
    ..
    ..
    String lat = record.get(10);
    String lon = record.get(11);

    Map<String, Double> geoLocation = new HashMap<>();
    geoLocation.put("lat", StringUtils.isEmpty(lat) ? null : Double.parseDouble(lat));
    geoLocation.put("lon", StringUtils.isEmpty(lon) ? null : Double.parseDouble(lon));
    c.setGeoLocation(geoLocation);
    return c;
  }
});

In the preceding snippet, we called the map() method on the JavaRDD to map each input line to a Crime object. Note that we created a simple JavaBean class called Crime that implements the Serializable interface and maps to the Elasticsearch document structure. Using CSVParser, we parsed each field into the Crime object. We mapped the nested geoLocation object by embedding a Map in the Crime object; this map is populated with the lat and lon fields. The map() method returns another JavaRDD that contains the Crime objects, as shown in the following code:

JavaEsSpark.saveToEs(dataSplits, "esh_spark/crimes");

Save the JavaRDD<Crime> to Elasticsearch with the JavaEsSpark class provided by ES-Hadoop.
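For reference, here is a trimmed sketch of what the Crime JavaBean might look like; only a few of the CSV fields are shown, and the full class in the article's source code carries all of them:

import java.io.Serializable;
import java.util.Map;

public class Crime implements Serializable {
  // A few representative fields; the real class mirrors every CSV column
  private String id;
  private String caseNumber;
  private String primaryType;
  private Map<String, Double> geoLocation;   // nested lat/lon object

  public String getId() { return id; }
  public void setId(String id) { this.id = id; }
  public String getCaseNumber() { return caseNumber; }
  public void setCaseNumber(String caseNumber) { this.caseNumber = caseNumber; }
  public String getPrimaryType() { return primaryType; }
  public void setPrimaryType(String primaryType) { this.primaryType = primaryType; }
  public Map<String, Double> getGeoLocation() { return geoLocation; }
  public void setGeoLocation(Map<String, Double> geoLocation) { this.geoLocation = geoLocation; }
}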

For all the ES-Hadoop integrations, such as Pig, Hive, Cascading, Apache Storm, and Spark, you can use all the standard ES-Hadoop configurations and techniques. This includes dynamic/multi-resource writes with a pattern similar to esh_spark/{primaryType}, as well as using JSON strings to directly import the data to Elasticsearch.

To control the Elasticsearch document metadata of the documents being indexed, you can use the saveToEsWithMeta() method of JavaEsSpark. You can pass an instance of JavaPairRDD that contains Tuple2<Metadata, Object>, where Metadata represents a map of the key/value pairs of document metadata fields, such as id, ttl, timestamp, and version. A minimal sketch of this approach follows.
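The sketch below assumes the Metadata enum and saveToEsWithMeta() method shipped with ES-Hadoop 2.1.x; the getters on Crime, the TTL value, and the esh_spark/crimes_with_meta resource name are illustrative:

import java.util.HashMap;
import java.util.Map;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.function.PairFunction;
import org.elasticsearch.spark.rdd.Metadata;
import org.elasticsearch.spark.rdd.api.java.JavaEsSpark;
import scala.Tuple2;

// Pair each Crime with a metadata map keyed by the Metadata enum
JavaPairRDD<Map<Metadata, Object>, Crime> withMeta = dataSplits.mapToPair(
    new PairFunction<Crime, Map<Metadata, Object>, Crime>() {
      @Override
      public Tuple2<Map<Metadata, Object>, Crime> call(Crime crime) throws Exception {
        Map<Metadata, Object> meta = new HashMap<Metadata, Object>();
        meta.put(Metadata.ID, crime.getId());   // custom document _id
        meta.put(Metadata.TTL, "2d");           // illustrative TTL value
        return new Tuple2<Map<Metadata, Object>, Crime>(meta, crime);
      }
    });

JavaEsSpark.saveToEsWithMeta(withMeta, "esh_spark/crimes_with_meta");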

Using SparkSQL

ES-Hadoop also bridges Elasticsearch with the SparkSQL module. SparkSQL versions 1.3+ provide the DataFrame abstraction, which represents a collection of Row objects. We will not discuss the details of DataFrame here. ES-Hadoop lets you persist your DataFrame instance to Elasticsearch transparently. Let's see how we can do this with the following code:

SQLContext sqlContext = new SQLContext(context);
DataFrame df = sqlContext.createDataFrame(dataSplits, Crime.class);

Create an SQLContext instance using the JavaSparkContext instance. Using the SQLContext instance, you can create a DataFrame by calling the createDataFrame() method and passing the existing JavaRDD<T> and Class<T>, where T is a JavaBean class that implements the Serializable interface. Note that the class instance you pass is required to infer a schema for the DataFrame. If you wish to use a non-JavaBean-based RDD, you can create the schema manually; the article's source code contains implementations of both approaches for your reference, and a sketch of the manual-schema approach follows the save example below. Take a look at the following code:

JavaEsSparkSQL.saveToEs(df, "esh_sparksql/crimes_reflection");

Once you have the DataFrame instance, you can save it to Elasticsearch with the JavaEsSparkSQL class, as shown in the preceding code.
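As mentioned above, when your RDD elements are not JavaBeans you can build the schema yourself. Here is a minimal sketch under that assumption; the column selection and the esh_sparksql/crimes_programmatic resource name are illustrative, and the getters come from the Crime class shown earlier:

import java.util.Arrays;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructType;
import org.elasticsearch.spark.sql.api.java.JavaEsSparkSQL;

// Declare the schema explicitly instead of relying on JavaBean reflection
StructType schema = DataTypes.createStructType(Arrays.asList(
    DataTypes.createStructField("id", DataTypes.StringType, true),
    DataTypes.createStructField("caseNumber", DataTypes.StringType, true),
    DataTypes.createStructField("primaryType", DataTypes.StringType, true)));

// Convert the Crime RDD into generic Rows that match the schema
JavaRDD<Row> rows = dataSplits.map(new Function<Crime, Row>() {
  @Override
  public Row call(Crime c) throws Exception {
    return RowFactory.create(c.getId(), c.getCaseNumber(), c.getPrimaryType());
  }
});

DataFrame rowDf = sqlContext.createDataFrame(rows, schema);
JavaEsSparkSQL.saveToEs(rowDf, "esh_sparksql/crimes_programmatic");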

Reading data from Elasticsearch

Here is the snippet of SparkEsReader that finds crimes related to theft:

JavaRDD<Map<String, Object>> esRDD = JavaEsSpark.esRDD(context, "esh_spark/crimes",
    "{\"query\" : { \"term\" : { \"primaryType\" : \"theft\" } } }").values();
for (Map<String, Object> item : esRDD.collect()) {
  System.out.println(item);
}

We used the same JavaEsSpark class to create RDD with documents that match the Elasticsearch query.

Using SparkSQL

ES-Hadoop provides an org.elasticsearch.spark.sql data source provider to read data from Elasticsearch using SparkSQL, as shown in the following code:

Map<String, String> options = new HashMap<>();
options.put("pushdown","true");
options.put("es.nodes","localhost");
DataFrame df = sqlContext.read()
  .options(options)
  .format("org.elasticsearch.spark.sql")
  .load("esh_sparksql/crimes_reflection");

The preceding code snippet uses the org.elasticsearch.spark.sql data source to load data from Elasticsearch. You can set the pushdown option to true to push the query execution down to Elasticsearch. This greatly increases its efficiency as the query execution is collocated where the data resides, as shown in the following code:

df.registerTempTable("crimes");
DataFrame theftCrimes = sqlContext.sql("SELECT * FROM crimes WHERE primaryType='THEFT'");
for(Row row: theftCrimes.javaRDD().collect()){
  System.out.println(row);
}

We registered a temporary table with the DataFrame and executed the SQL query on SQLContext. Note that we need to collect the final results locally in order to print them in the driver program.

Summary

In this article, we looked at various Hadoop ecosystem technologies. We set up Pig with ES-Hadoop and developed scripts to interact with Elasticsearch. You also learned how to use ES-Hadoop to integrate Elasticsearch with Spark and empower it with the powerful SparkSQL engine.
