8 min read

[box type=”note” align=”” class=”” width=””]In this article by Shilpi Saxena and Saurabh Gupta from their book Practical Real-time data Processing and Analytics we shall explore Storm’s architecture with its components and configure it to run in a cluster. [/box]

Initially, real-time processing was implemented by pushing messages into a queue and then reading the messages from it using Python or any other language to process them one by one. The primary challenges with this approach were:

  • In case of failure of the processing of any message, it has to be put back in to queue for reprocessing
  • Keeping queues and the worker (processing unit) up and running all the time

Below are the two main reasons that make Storm a highly reliable real-time engine:

  • Abstraction: Storm is distributed abstraction in the form of Streams. A Stream can be produced and processed in parallel. Spout can produce new Stream and Bolt is a small unit of processing on stream. Topology is the top level abstraction. The advantage of abstraction here is that nobody must be worried about what is going on internally, like serialization/deserialization, sending/receiving message between different processes, and so on. The user must be focused on writing the business logic.
  • A guaranteed message processing algorithm:  Nathan Marz developed an algorithm based on random numbers and XORs that would only require about 20 bytes to track each spout tuple, regardless of how much processing was triggered downstream.

Storm Architecture and Storm components

  • The nimbus node acts as the master node in a Storm cluster. It is responsible for analyzing topology and distributing tasks on different supervisors as per the availability. Also, it monitors failure; in the case that one of the supervisors dies, it then redistributes the tasks among available supervisors. Nimbus node uses Zookeeper to keep track of tasks to maintain the state. In case of Nimbus node failure, it can be restarted which reads the state from Zookeeper and start from the same point where it failed earlier.
  • Supervisors act as slave nodes in the Storm cluster. One or more workers, that is, JVM processes, can run in each supervisor node. A supervisor co-ordinates with workers to complete the tasks assigned by nimbus node. In the case of worker process failure, the supervisor finds available workers to complete the tasks.
  • A worker process is a JVM running in a supervisor node. It has executors. There can be one or more executors in the worker process. Worker co-ordinates with executor to finish up the task.
  • An executor is single thread process spawned by a worker. Each executor is responsible for running one or more tasks.
  • A task is a single unit of work. It performs actual processing on data. It can be either Spout or Bolt.
  • Apart from above processes, there are two important parts of a Storm cluster; they are logging and Storm UI. The logviewer service is used to debug logs for workers at supervisors on Storm UI.

The following are the primary characteristics of Storm that make it special and ideal for real-time processing.

  • Fast
  • Reliable
  • Fault-Tolerant
  • Scalable
  • Programming Language Agnostic

Strom Components

  • Tuple: It is the basic data structure of Storm. It can hold multiple values and data type of each value can be different.
  • Topology: As mentioned earlier, topology is the highest level of abstraction. It contains the flow of processing including spout and bolts. It is kind of graph computation.
  • Stream: The stream is core abstraction of Storm. It is a sequence of unbounded tuples. A stream can be processed by the different type of bolts and which results into a new stream.
  • Spout: Spout is a source of stream. It reads messages from sources like Kafka, RabbitMQ, and so on as tuples and emits them in a stream. There are two types of Spout

Reliable: Spout keeps track of each tuple and replay tuple in case of any failure.

Unreliable: Spout does not care about the tuple once it is emitted as a stream to another bolt or spout.

Setting up and configuring Storm

Before setting up Storm, we need to setup Zookeeper which is required by Storm:

Setting up Zookeeper

Below are instructions on how to install, configure and run Zookeeper in standalone and cluster mode:

Installing

Download Zookeeper from http://www-eu.apache.org/dist/zookeeper/zookeeper-3.4.6/zookeeper-3.4.6.tar.gz. After the download, extract zookeeper-3.4.6.tar.gz as below:

tar -xvf zookeeper-3.4.6.tar.gz

The following files and folders will be extracted:

Configuring

There are two types of deployment with Zookeeper; they are standalone and cluster. There is no big difference in configuration, just new extra parameters for cluster mode.

Standalone

As shown, in the previous figure, go to the conf folder and change the zoo.cfg file as follows:

tickTime=2000 # Length of single tick in milliseconds. It is used to
# regulate heartbeat and timeouts.
initLimit=5 # Amount of time to allow followers to connect and sync
# with leader.
syncLimit=2 # Amount of time to allow followers to sync with
# Zookeeper
dataDir=/tmp/zookeeper/tmp # Directory where Zookeeper keeps
# transaction logs
clientPort=2182 # Listening port for client to connect.
maxClientCnxns=30 # Maximum limit of client to connect to Zookeeper
# node.

Cluster

In addition to above configuration, add the following configuration to the cluster as well:

server.1=zkp-1:2888:3888

server.2=zkp-2:2888:3888

server.3=zkp-3:2888:3888

server.x=[hostname]nnnn:mmmm : Here x is id assigned to each Zookeeper node. In datadir, configured above, create a file “myid” and put corresponding ID of Zookeeper in it. It should be unique across the cluster. The same ID is used as x here. Nnnn is the port used by followers to connect with leader node and mmmm is the port used for leader election.

Running

Use the following command to run Zookeeper from the Zookeeper home dir:

/bin/zkServer.sh start

The console will come out after the below message and the process will run in the background.
Starting zookeeper … STARTED
The following command can be used to check the status of Zookeeper process:

/bin/zkServer.sh status

The following output would be in standalone mode:

Mode: standalone

The following output would be in cluster mode:

Mode: follower # in case of follower node
Mode: leader  	  # in case of leader node

Setting up Apache Storm

Below are instructions on how to install, configure and run Storm with nimbus and supervisors.

Installing

Download Storm from http://www.apache.org/dyn/closer.lua/storm/apache-storm-1.0.3/apache-storm-1.0.3.tar.gz. After the download, extract apache-storm-1.0.3.tar.gz, as follows:

tar -xvf apache-storm-1.0.3.tar.gz

Below are the files and folders that will be extracted:
Configuring
As shown, in the previous figure, go to the conf folder and add/edit properties in storm.yaml:

  • Set the Zookeeper hostname in the Storm configuration:
    storm.zookeeper.servers:
    - "zkp-1"
    - "zkp-2"
    - "zkp-3"
  • Set the Zookeeper port:
    storm.zookeeper.port: 2182
  • Set the Nimbus node hostname so that storm supervisor can communicate with it:
    nimbus.host: "nimbus"
  • Set Storm local data directory to keep small information like conf, jars, and so on:
    storm.local.dir: "/usr/local/storm/tmp"
  • Set the number of workers that will run on current the supervisor node. It is best practice to use the same number of workers as the number of cores in the machine.
    supervisor.slots.ports:
        - 6700
        - 6701
        - 6702
        - 6703
        - 6704
        - 6705
    
    
  • Perform memory allocation to the worker, supervisor, and nimbus:
    worker.childopts: "-Xmx1024m"
    nimbus.childopts: "-XX:+UseConcMarkSweepGC –
    XX:+UseCMSInitiatingOccupancyOnly –
    XX_CMSInitiatingOccupancyFraction=70"
    supervisor.childopts: "-Xmx1024m"
    
  • Topologies related configuration: The first configuration is to configure the maximum amount of time (in seconds) for a tuple’s tree to be acknowledged (fully processed) before it is considered failed. The second configuration is that Debug logs are false, so Storm will generate only info logs.
    topology.message.timeout.secs: 60
    topology.debug: false

Running

There are four services needed to start a complete Storm cluster:

  • Nimbus: First of all, we need to start Nimbus service in Storm. The following is the command to start it:
    /bin/storm nimbus
  • Supervisor: Next, we need to start supervisor nodes to connect with the nimbus node. The following is the command:
    /bin/storm supervisor
  • UI: To start Storm UI, execute the following command:
    /bin/storm ui

    You can access UI on http://nimbus-host:8080. It is shown in following figure.

  • Logviewer: Log viewer service helps to see the worker logs in the Storm UI. Execute the following command to start it:
    /bin/storm logviewer

Summary

We started with the history of Storm, where we discussed how Nathan Marz the got idea for Storm and what type of challenges he faced while releasing Storm as open source software and then in Apache. We discussed the architecture of Storm and its components. Nimbus, supervisor worker, executors, and tasks are part of Storm’s architecture. Its components are tuple, stream, topology, spout, and bolt. We discussed how to set up Storm and configure it to run in the cluster. Zookeeper is required to be set up first, as Storm requires it.

The above was an excerpt from the book Practical Real-time data Processing and Analytics 

LEAVE A REPLY

Please enter your comment!
Please enter your name here