
Storing Apache Storm data in Elasticsearch


Note: This article is an excerpt from the book Mastering Apache Storm by Ankit Jain. The book explores various real-time processing functionalities offered by Apache Storm, such as parallelism, data partitioning, and more.

In this article, we are going to cover how to store the data processed by Apache Storm in Elasticsearch. Elasticsearch is an open source, distributed search engine built on Lucene. It provides a multitenant-capable, full-text search capability. Though Apache Storm is meant for real-time data processing, in most cases we need to store the processed data in a data store so that it can be used for further batch analysis and for executing batch analysis queries on the stored data.

We assume that Elasticsearch is running in your environment. If you don't have a running Elasticsearch cluster, please refer to https://www.elastic.co/guide/en/elasticsearch/reference/2.3/_installation.html to install Elasticsearch on any of your boxes. Go through the following steps to integrate Storm with Elasticsearch:

  1. Create a Maven project with com.stormadvance as the groupId and storm_elasticsearch as the artifactId.
  2. Add the following dependencies and repositories to the pom.xml file:
<dependencies>
    <dependency>
        <groupId>org.elasticsearch</groupId>
        <artifactId>elasticsearch</artifactId>
        <version>2.4.4</version>
    </dependency>
    <dependency>
        <groupId>junit</groupId>
        <artifactId>junit</artifactId>
        <version>3.8.1</version>
        <scope>test</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.storm</groupId>
        <artifactId>storm-core</artifactId>
        <version>1.0.2</version>
        <scope>provided</scope>
    </dependency>
</dependencies>

3. Create an ElasticSearchOperation class in the com.stormadvance.storm_elasticsearch package. The ElasticSearchOperation class contains the following method:

insert(Map<String, Object> data, String indexName, String indexMapping, String indexId): This method takes the record data, indexName, indexMapping, and indexId as input, and inserts the input record into Elasticsearch.

The following is the source code of the ElasticSearchOperation class:

public class ElasticSearchOperation {

    private TransportClient client;

    public ElasticSearchOperation(List<String> esNodes) throws Exception {
        try {
            Settings settings = Settings.settingsBuilder()
                    .put("cluster.name", "elasticsearch").build();
            client = TransportClient.builder().settings(settings).build();
            for (String esNode : esNodes) {
                client.addTransportAddress(new InetSocketTransportAddress(
                        InetAddress.getByName(esNode), 9300));
            }
        } catch (Exception e) {
            throw e;
        }
    }

    public void insert(Map<String, Object> data, String indexName,
            String indexMapping, String indexId) {
        client.prepareIndex(indexName, indexMapping, indexId)
                .setSource(data).get();
    }

    public static void main(String[] s) {
        try {
            List<String> esNodes = new ArrayList<String>();
            esNodes.add("127.0.0.1");
            ElasticSearchOperation elasticSearchOperation = new ElasticSearchOperation(esNodes);

            Map<String, Object> data = new HashMap<String, Object>();
            data.put("name", "name");
            data.put("add", "add");

            elasticSearchOperation.insert(data, "indexName", "indexMapping",
                    UUID.randomUUID().toString());
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

4. Create a SampleSpout class in the com.stormadvance.storm_elasticsearch package. This class generates random records and passes them to the next action (bolt) in the topology. The following is the format of the records generated by the SampleSpout class:

["john","watson","abc"]

The following is the source code of the SampleSpout class:

public class SampleSpout extends BaseRichSpout {

    private static final long serialVersionUID = 1L;
    private SpoutOutputCollector spoutOutputCollector;

    private static final Map<Integer, String> FIRSTNAMEMAP = new HashMap<Integer, String>();
    static {
        FIRSTNAMEMAP.put(0, "john");
        FIRSTNAMEMAP.put(1, "nick");
        FIRSTNAMEMAP.put(2, "mick");
        FIRSTNAMEMAP.put(3, "tom");
        FIRSTNAMEMAP.put(4, "jerry");
    }

    private static final Map<Integer, String> LASTNAME = new HashMap<Integer, String>();
    static {
        LASTNAME.put(0, "anderson");
        LASTNAME.put(1, "watson");
        LASTNAME.put(2, "ponting");
        LASTNAME.put(3, "dravid");
        LASTNAME.put(4, "lara");
    }

    private static final Map<Integer, String> COMPANYNAME = new HashMap<Integer, String>();
    static {
        COMPANYNAME.put(0, "abc");
        COMPANYNAME.put(1, "dfg");
        COMPANYNAME.put(2, "pqr");
        COMPANYNAME.put(3, "ecd");
        COMPANYNAME.put(4, "awe");
    }

    public void open(Map conf, TopologyContext context,
            SpoutOutputCollector spoutOutputCollector) {
        // Open the spout
        this.spoutOutputCollector = spoutOutputCollector;
    }

    public void nextTuple() {
        // The Storm cluster repeatedly calls this method to emit the
        // continuous stream of tuples.
        final Random rand = new Random();
        // generate a random number from 0 to 4.
        int randomNumber = rand.nextInt(5);
        spoutOutputCollector.emit(new Values(FIRSTNAMEMAP.get(randomNumber),
                LASTNAME.get(randomNumber), COMPANYNAME.get(randomNumber)));
    }

    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        // emits the fields firstName, lastName, and companyName.
        declarer.declare(new Fields("firstName", "lastName", "companyName"));
    }
}
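Because nextTuple() draws a single random index and uses it against all three lookup maps, the emitted first name, last name, and company name always form an aligned triple (for example, index 1 always yields ["nick","watson","dfg"]). The following is a minimal standalone sketch of that selection logic, assuming illustrative class and array names that are not part of the book's code:

```java
import java.util.Random;

public class AlignedRecordSketch {

    // Same values the spout's static maps hold, in index order.
    static final String[] FIRST = {"john", "nick", "mick", "tom", "jerry"};
    static final String[] LAST = {"anderson", "watson", "ponting", "dravid", "lara"};
    static final String[] COMPANY = {"abc", "dfg", "pqr", "ecd", "awe"};

    // Mirrors nextTuple(): one random index selects all three fields,
    // so the fields of a record always come from the same position.
    static String[] nextRecord(Random rand) {
        int i = rand.nextInt(5);
        return new String[]{FIRST[i], LAST[i], COMPANY[i]};
    }

    public static void main(String[] args) {
        Random rand = new Random();
        for (int n = 0; n < 5; n++) {
            String[] r = nextRecord(rand);
            System.out.println("[\"" + r[0] + "\",\"" + r[1] + "\",\"" + r[2] + "\"]");
        }
    }
}
```

A real spout would keep the Random instance as a field rather than recreating it per call, but the selection logic is the same.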

5. Create an ESBolt class in the com.stormadvance.storm_elasticsearch package. This bolt receives the tuples emitted by the SampleSpout class, converts them into a Map structure, and then calls the insert() method of the ElasticSearchOperation class to insert the record into Elasticsearch. The following is the source code of the ESBolt class:

public class ESBolt implements IBasicBolt {

    private static final long serialVersionUID = 2L;
    private ElasticSearchOperation elasticSearchOperation;
    private List<String> esNodes;

    /**
     * @param esNodes
     */
    public ESBolt(List<String> esNodes) {
        this.esNodes = esNodes;
    }

    public void execute(Tuple input, BasicOutputCollector collector) {
        Map<String, Object> personalMap = new HashMap<String, Object>();
        // tuple fields: "firstName", "lastName", "companyName"
        personalMap.put("firstName", input.getValueByField("firstName"));
        personalMap.put("lastName", input.getValueByField("lastName"));
        personalMap.put("companyName", input.getValueByField("companyName"));
        elasticSearchOperation.insert(personalMap, "person", "personmapping",
                UUID.randomUUID().toString());
    }

    public void declareOutputFields(OutputFieldsDeclarer declarer) {
    }

    public Map<String, Object> getComponentConfiguration() {
        return null;
    }

    public void prepare(Map stormConf, TopologyContext context) {
        try {
            // create the instance of the ElasticSearchOperation class
            elasticSearchOperation = new ElasticSearchOperation(esNodes);
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    public void cleanup() {
    }
}
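The core of execute() is a simple re-keying step: the three tuple fields become the keys of a Map, and those keys in turn become the JSON field names of the Elasticsearch document. Stripped of the Storm and Elasticsearch classes, the transformation amounts to the following sketch (the class and helper names here are illustrative, not part of the book's code):

```java
import java.util.HashMap;
import java.util.Map;

public class TupleToDocumentSketch {

    // Builds the same Map the bolt hands to insert():
    // field name -> field value, which Elasticsearch serializes as JSON.
    static Map<String, Object> toDocument(String firstName, String lastName,
            String companyName) {
        Map<String, Object> personalMap = new HashMap<String, Object>();
        personalMap.put("firstName", firstName);
        personalMap.put("lastName", lastName);
        personalMap.put("companyName", companyName);
        return personalMap;
    }

    public static void main(String[] args) {
        // A tuple like ["john","watson","abc"] becomes a three-field document.
        Map<String, Object> doc = toDocument("john", "watson", "abc");
        System.out.println(doc);
    }
}
```

Because the bolt passes UUID.randomUUID().toString() as the document ID, every processed tuple is stored as a new document rather than overwriting an existing one.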

6. Create an ESTopology class in the com.stormadvance.storm_elasticsearch package. This class creates an instance of the spout and bolt classes and chains them together using a TopologyBuilder class. The following is the implementation of the main class:

public class ESTopology {

    public static void main(String[] args) throws AlreadyAliveException,
            InvalidTopologyException {
        TopologyBuilder builder = new TopologyBuilder();

        // ES node list
        List<String> esNodes = new ArrayList<String>();
        esNodes.add("10.191.209.14");

        // set the spout class
        builder.setSpout("spout", new SampleSpout(), 2);

        // set the ES bolt class
        builder.setBolt("bolt", new ESBolt(esNodes), 2)
                .shuffleGrouping("spout");

        Config conf = new Config();
        conf.setDebug(true);

        // create an instance of the LocalCluster class for
        // executing the topology in local mode.
        LocalCluster cluster = new LocalCluster();

        // ESTopology is the name of the submitted topology.
        cluster.submitTopology("ESTopology", conf, builder.createTopology());

        try {
            Thread.sleep(60000);
        } catch (Exception exception) {
            System.out.println("Thread interrupted exception : " + exception);
        }
        System.out.println("Stopped Called : ");

        // kill the ESTopology
        cluster.killTopology("ESTopology");

        // shut down the Storm test cluster
        cluster.shutdown();
    }
}

To summarize, we covered how to store the data processed by Apache Storm in Elasticsearch by connecting to the Elasticsearch nodes from inside the Storm bolts.

If you enjoyed this post, check out the book Mastering Apache Storm to learn more about the different types of real-time processing techniques used to create distributed applications.

Richa Tripathi
