
This article is an excerpt from the book Mastering Apache Storm, authored by Ankit Jain. The book explores various real-time processing functionalities offered by Apache Storm, such as parallelism, data partitioning, and more.

Today, we are going to cover how to stream tweets from Twitter using the Twitter streaming API. We will also explore how to store the fetched tweets in Kafka for later processing through Storm.

Setting up a single node Kafka cluster

Following are the steps to set up a single node Kafka cluster:

  1. Download the Kafka 0.9.x binary distribution named kafka_2.10-0.9.0.1.tgz from http://apache.claz.org/kafka/0.9.0.1/kafka_2.10-0.9.0.1.tgz.
  2. Extract the archive to wherever you want to install Kafka with the following commands:
tar -xvzf kafka_2.10-0.9.0.1.tgz
cd kafka_2.10-0.9.0.1
  3. Change the following properties in the $KAFKA_HOME/config/server.properties file:

log.dirs=/var/kafka-logs
zookeeper.connect=zoo1:2181,zoo2:2181,zoo3:2181

Here, zoo1, zoo2, and zoo3 represent the hostnames of the ZooKeeper nodes. The following are the definitions of the important properties in the server.properties file:

  • broker.id: This is a unique integer ID for each of the brokers in a Kafka cluster.
  • port: This is the port number for a Kafka broker. Its default value is 9092. If you want to run multiple brokers on a single machine, give a unique port to each broker.
  • host.name: The hostname to which the broker should bind and advertise itself.
  • log.dirs: The name of this property is a bit unfortunate as it represents not the log directory for Kafka, but the directory where Kafka stores the actual data sent to it. This can take a single directory or a comma-separated list of directories to store data. Kafka throughput can be increased by attaching multiple physical disks to the broker node and specifying multiple data directories, each lying on a different disk. It is not much use specifying multiple directories on the same physical disk, as all the I/O will still be happening on the same disk.
  • num.partitions: This represents the default number of partitions for newly created topics. This property can be overridden when creating new topics. A greater number of partitions results in greater parallelism at the cost of a larger number of files.
  • log.retention.hours: Kafka does not delete messages immediately after consumers consume them. It retains them for the number of hours defined by this property so that in the event of any issues the consumers can replay the messages from Kafka. The default value is 168 hours, which is 1 week.
  • zookeeper.connect: This is the comma-separated list of ZooKeeper nodes in hostname:port form.
  4. Start the Kafka server by running the following command:
> ./bin/kafka-server-start.sh config/server.properties

[2017-04-23 17:44:36,667] INFO New leader is 0 (kafka.server.ZookeeperLeaderElector$LeaderChangeListener)
[2017-04-23 17:44:36,668] INFO Kafka version : 0.9.0.1 (org.apache.kafka.common.utils.AppInfoParser)
[2017-04-23 17:44:36,668] INFO Kafka commitId : a7a17cdec9eaa6c5 (org.apache.kafka.common.utils.AppInfoParser)
[2017-04-23 17:44:36,670] INFO [Kafka Server 0], started (kafka.server.KafkaServer)

If you get something similar to the preceding lines on your console, then your Kafka broker is up and running and we can proceed to test it.

  5. Now we will verify that the Kafka broker is set up correctly by sending and receiving some test messages. First, let’s create a verification topic for testing by executing the following command:
> bin/kafka-topics.sh --zookeeper zoo1:2181 --replication-factor 1 --partitions 1 --topic verification-topic --create
Created topic "verification-topic".
  6. Now let’s verify whether the topic creation was successful by listing all the topics:
> bin/kafka-topics.sh --zookeeper zoo1:2181 --list
verification-topic
  7. The topic is created; let’s produce some sample messages for the Kafka cluster. Kafka comes with a command-line producer that we can use to produce messages:
> bin/kafka-console-producer.sh --broker-list localhost:9092 --topic verification-topic
  8. Write the following messages on your console:
Message 1
Test Message 2
Message 3
  9. Let’s consume these messages by starting a new console consumer in a new console window:
> bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic verification-topic --from-beginning
Message 1
Test Message 2
Message 3

Now, if we enter any message on the producer console, it will automatically be consumed by this consumer and displayed on the command line.
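
If you prefer to verify the broker from code rather than the console scripts, the following is a minimal sketch of a Java producer that sends a test message to the verification-topic topic. It uses the same new producer API (org.apache.kafka.clients.producer) that we will use later for publishing tweets; the class name VerificationProducer is our own and not part of Kafka:

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class VerificationProducer {

    public static void main(String[] args) {
        Properties props = new Properties();
        // Broker to bootstrap from; the producer discovers the rest of the cluster.
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");

        Producer<String, String> producer = new KafkaProducer<String, String>(props);
        // Publish a single test message to the verification topic.
        producer.send(new ProducerRecord<String, String>("verification-topic",
                "Test Message from Java"));
        // Close flushes pending records so the message is delivered before the JVM exits.
        producer.close();
    }
}

If you keep the console consumer from the previous step running, the test message should appear there as soon as this class is executed.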

Collecting Tweets

We are assuming you already have a Twitter account, and that the consumer key and access token are generated for your application. You can refer to https://bdthemes.com/support/knowledge-base/generate-api-key-consumer-token-access-key-twitter-oauth/ to generate a consumer key and access token. Take the following steps:

  1. Create a new Maven project with groupId com.stormadvance and artifactId kafka_producer_twitter.
  2. Add the following dependencies to the pom.xml file. We are adding the Kafka and Twitter streaming Maven dependencies to pom.xml to support the Kafka producer and the streaming of tweets from Twitter:
<dependencies>
  <dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka_2.10</artifactId>
    <version>0.9.0.1</version>
    <exclusions>
      <exclusion>
        <groupId>com.sun.jdmk</groupId>
        <artifactId>jmxtools</artifactId>
      </exclusion>
      <exclusion>
        <groupId>com.sun.jmx</groupId>
        <artifactId>jmxri</artifactId>
      </exclusion>
    </exclusions>
  </dependency>
  <dependency>
    <groupId>org.apache.logging.log4j</groupId>
    <artifactId>log4j-slf4j-impl</artifactId>
    <version>2.0-beta9</version>
  </dependency>
  <dependency>
    <groupId>org.apache.logging.log4j</groupId>
    <artifactId>log4j-1.2-api</artifactId>
    <version>2.0-beta9</version>
  </dependency>
  <!-- https://mvnrepository.com/artifact/org.twitter4j/twitter4j-stream -->
  <dependency>
    <groupId>org.twitter4j</groupId>
    <artifactId>twitter4j-stream</artifactId>
    <version>4.0.6</version>
  </dependency>
</dependencies>

  3. Now, we need to create a class, TwitterData, that contains the code to consume/stream data from Twitter and publish it to the Kafka cluster. We are assuming you already have a running Kafka cluster and the topic, twitterData, created in it; refer to the previous section for information on installing a Kafka cluster and creating a Kafka topic. The class contains an instance of the twitter4j.conf.ConfigurationBuilder class; we need to set the access token and consumer keys in the configuration, as mentioned in the source code.
  4. The twitter4j.StatusListener class returns the continuous stream of tweets inside the onStatus() method. We are using the Kafka producer code inside the onStatus() method to publish the tweets to Kafka. The following is the source code for the TwitterData class:

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

import twitter4j.StallWarning;
import twitter4j.Status;
import twitter4j.StatusDeletionNotice;
import twitter4j.StatusListener;
import twitter4j.TwitterStream;
import twitter4j.TwitterStreamFactory;
import twitter4j.conf.ConfigurationBuilder;
import twitter4j.json.DataObjectFactory;

public class TwitterData {

    /** The actual Twitter stream. It's set up to collect raw JSON data */
    private TwitterStream twitterStream;

    static String consumerKeyStr = "r1wFskT3q";
    static String consumerSecretStr = "fBbmp71HKbqalpizIwwwkBpKC";
    static String accessTokenStr = "298FPfE16frABXMcRIn7aUSSnNneMEPrUuZ";
    static String accessTokenSecretStr = "1LMNZZIfrAimpD004QilV1pH3PYTvM";

    public void start() {
        ConfigurationBuilder cb = new ConfigurationBuilder();
        cb.setOAuthConsumerKey(consumerKeyStr);
        cb.setOAuthConsumerSecret(consumerSecretStr);
        cb.setOAuthAccessToken(accessTokenStr);
        cb.setOAuthAccessTokenSecret(accessTokenSecretStr);
        cb.setJSONStoreEnabled(true);
        cb.setIncludeEntitiesEnabled(true);

        // instance of TwitterStreamFactory
        twitterStream = new TwitterStreamFactory(cb.build()).getInstance();

        final Producer<String, String> producer =
                new KafkaProducer<String, String>(getProducerConfig());

        // topicDetails: create the twitterData topic (2 partitions, replication
        // factor 1) using the project's CreateTopic helper class.
        new CreateTopic("127.0.0.1:2181").createTopic("twitterData", 2, 1);

        /** Twitter listener **/
        StatusListener listener = new StatusListener() {

            public void onStatus(Status status) {
                ProducerRecord<String, String> data = new ProducerRecord<String, String>(
                        "twitterData", DataObjectFactory.getRawJSON(status));
                // send the data to kafka
                producer.send(data);
            }

            public void onException(Exception arg0) {
                System.out.println(arg0);
            }

            public void onDeletionNotice(StatusDeletionNotice arg0) {
            }

            public void onScrubGeo(long arg0, long arg1) {
            }

            public void onStallWarning(StallWarning arg0) {
            }

            public void onTrackLimitationNotice(int arg0) {
            }
        };

        /** Bind the listener **/
        twitterStream.addListener(listener);

        /** GOGOGO **/
        twitterStream.sample();
    }

    private Properties getProducerConfig() {
        Properties props = new Properties();

        // List of kafka brokers. Complete list of brokers is not required as
        // the producer will auto discover the rest of the brokers.
        props.put("bootstrap.servers", "localhost:9092");
        props.put("batch.size", 1);

        // Serializer used for sending data to kafka. Since we are sending
        // strings, we are using StringSerializer.
        props.put("key.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        props.put("producer.type", "sync");

        return props;
    }

    public static void main(String[] args) throws InterruptedException {
        new TwitterData().start();
    }
}
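
The CreateTopic helper used above is not shown in this excerpt. As an assumption of what it might look like, the following is a minimal sketch that creates the topic through Kafka's admin API (kafka.admin.AdminUtils and kafka.utils.ZkUtils, both available via the kafka_2.10 0.9.0.1 dependency); the class and method names match the call in TwitterData, but the body is our own illustration:

import java.util.Properties;

import kafka.admin.AdminUtils;
import kafka.utils.ZkUtils;

public class CreateTopic {

    private final String zkConnect;

    public CreateTopic(String zkConnect) {
        this.zkConnect = zkConnect;
    }

    /** Creates the topic in Kafka if it does not already exist. */
    public void createTopic(String topicName, int numPartitions, int replicationFactor) {
        // Connect to ZooKeeper; 10-second session and connection timeouts, no ZK security.
        ZkUtils zkUtils = ZkUtils.apply(zkConnect, 10000, 10000, false);
        try {
            if (!AdminUtils.topicExists(zkUtils, topicName)) {
                AdminUtils.createTopic(zkUtils, topicName, numPartitions,
                        replicationFactor, new Properties());
            }
        } finally {
            zkUtils.close();
        }
    }
}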

Use valid Kafka properties before executing the TwitterData class.

After executing the TwitterData class, the user will have a real-time stream of Twitter tweets in Kafka. In the next section, we are going to cover how we can use Storm to calculate the sentiments of the collected tweets.
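
Before moving on, you may want to confirm that tweets are actually landing in Kafka. The following is a small sketch, assuming the broker runs on localhost:9092, that reads the raw tweet JSON from the twitterData topic using the new consumer API shipped with Kafka 0.9 (org.apache.kafka.clients.consumer.KafkaConsumer); the class name TweetConsumerCheck is our own:

import java.util.Arrays;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class TweetConsumerCheck {

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "tweet-check");
        props.put("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        // Start from the earliest offset so already-collected tweets are also shown.
        props.put("auto.offset.reset", "earliest");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
        consumer.subscribe(Arrays.asList("twitterData"));

        try {
            while (true) {
                // Poll the broker and print the raw tweet JSON of each record.
                ConsumerRecords<String, String> records = consumer.poll(1000);
                for (ConsumerRecord<String, String> record : records) {
                    System.out.println(record.value());
                }
            }
        } finally {
            consumer.close();
        }
    }
}

You could equally use the kafka-console-consumer.sh script from the previous section with --topic twitterData; the Java sketch simply mirrors how a Storm spout would later pull the same data.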

To summarize, we covered how to install a single-node Apache Kafka cluster and how to collect tweets from Twitter and store them in a Kafka cluster.

If you enjoyed this post, check out the book Mastering Apache Storm to learn more about the different real-time processing techniques used to create distributed applications.


