Note: This article is an excerpt from a book authored by Ankit Jain titled Mastering Apache Storm. This book explores various real-time processing functionalities offered by Apache Storm such as parallelism, data partitioning, and more.
Today, we are going to cover how to stream tweets from Twitter using the Twitter streaming API. We are also going to explore how to store the fetched tweets in Kafka for later processing through Storm.
Following are the steps to set up a single-node Kafka cluster:

1. Download and extract the Kafka release, then change into the extracted directory:

tar -xvzf kafka_2.10-0.9.0.1.tgz
cd kafka_2.10-0.9.0.1

2. Change the following properties in the $KAFKA_HOME/config/server.properties file:

log.dirs=/var/kafka-logs
zookeeper.connect=zoo1:2181,zoo2:2181,zoo3:2181
Here, zoo1, zoo2, and zoo3 represent the hostnames of the ZooKeeper nodes. The following are the definitions of the important properties in the server.properties file:
broker.id: This is a unique integer ID for each of the brokers in a Kafka cluster.
port: This is the port number for a Kafka broker. Its default value is 9092. If you want to run multiple brokers on a single machine, give a unique port to each broker.
host.name: This is the hostname to which the broker should bind and advertise itself.
log.dirs: The name of this property is a bit unfortunate, as it represents not the log directory for Kafka, but the directory where Kafka stores the actual data sent to it. It can take a single directory or a comma-separated list of directories to store data. Kafka throughput can be increased by attaching multiple physical disks to the broker node and specifying multiple data directories, each lying on a different disk. There is not much use in specifying multiple directories on the same physical disk, as all the I/O will still happen on the same disk.
num.partitions: This represents the default number of partitions for newly created topics. It can be overridden when creating new topics. A greater number of partitions results in greater parallelism, at the cost of a larger number of files.
log.retention.hours: Kafka does not delete messages immediately after consumers consume them. It retains them for the number of hours defined by this property so that, in the event of any issues, the consumers can replay the messages from Kafka. The default value is 168 hours, which is 1 week.
zookeeper.connect: This is the comma-separated list of ZooKeeper nodes in hostname:port form.

3. Start the Kafka broker:

> ./bin/kafka-server-start.sh config/server.properties
[2017-04-23 17:44:36,667] INFO New leader is 0 (kafka.server.ZookeeperLeaderElector$LeaderChangeListener)
[2017-04-23 17:44:36,668] INFO Kafka version : 0.9.0.1 (org.apache.kafka.common.utils.AppInfoParser)
[2017-04-23 17:44:36,668] INFO Kafka commitId : a7a17cdec9eaa6c5 (org.apache.kafka.common.utils.AppInfoParser)
[2017-04-23 17:44:36,670] INFO [Kafka Server 0], started (kafka.server.KafkaServer)
If you get something similar to the preceding lines on your console, then your Kafka broker is up and running and we can proceed to test it.
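For reference, here is a minimal sketch of what the relevant part of server.properties might look like for this single-node setup. The broker ID, port, and hostname values below are illustrative assumptions, not values mandated by the article:

# unique integer ID for this broker (assumed value)
broker.id=0
# default broker port
port=9092
# hostname to bind and advertise (assumed value)
host.name=localhost
# data directory, as configured above
log.dirs=/var/kafka-logs
# default partitions for new topics
num.partitions=1
# retain messages for 1 week (the default)
log.retention.hours=168
# ZooKeeper ensemble
zookeeper.connect=zoo1:2181,zoo2:2181,zoo3:2181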
4. Create a topic in Kafka to verify the setup:

> bin/kafka-topics.sh --zookeeper zoo1:2181 --replication-factor 1 --partitions 1 --topic verification-topic --create
Created topic "verification-topic".
5. Check whether the topic was created:

> bin/kafka-topics.sh --zookeeper zoo1:2181 --list
verification-topic
6. Start a console producer and publish some sample messages to the topic:

> bin/kafka-console-producer.sh --broker-list localhost:9092 --topic verification-topic
Message 1
Test Message 2
Message 3
7. Start a console consumer to read the messages from the beginning of the topic:

> bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic verification-topic --from-beginning
Message 1
Test Message 2
Message 3
Now, if we enter any message on the producer console, it will automatically be consumed by this consumer and displayed on the command line.
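The console tools are enough to verify the setup, but the same check can also be done programmatically. The following is a minimal sketch, not part of the original article, that uses the new Kafka 0.9 consumer API to read the verification messages; the group ID verification-group is an assumption introduced here for illustration:

import java.util.Arrays;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class VerificationConsumer {
  public static void main(String[] args) {
    Properties props = new Properties();
    props.put("bootstrap.servers", "localhost:9092");
    // any group id works for this one-off check (assumed name)
    props.put("group.id", "verification-group");
    // start from the earliest offset, like --from-beginning
    props.put("auto.offset.reset", "earliest");
    props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

    KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
    consumer.subscribe(Arrays.asList("verification-topic"));
    try {
      // poll a few times and print whatever has been published so far
      for (int i = 0; i < 10; i++) {
        ConsumerRecords<String, String> records = consumer.poll(1000);
        for (ConsumerRecord<String, String> record : records) {
          System.out.println(record.value());
        }
      }
    } finally {
      consumer.close();
    }
  }
}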
We are assuming you already have a Twitter account, and that the consumer key and access token are generated for your application. You can refer to https://bdthemes.com/support/knowledge-base/generate-api-key-consumer-token-access-key-twitter-oauth/ to generate a consumer key and access token. Take the following steps:
1. Create a new Maven project with groupId com.stormadvance and artifactId kafka_producer_twitter.
2. Add the following dependencies to the pom.xml file:
<dependencies>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.10</artifactId>
<version>0.9.0.1</version>
<exclusions>
<exclusion>
<groupId>com.sun.jdmk</groupId>
<artifactId>jmxtools</artifactId>
</exclusion>
<exclusion>
<groupId>com.sun.jmx</groupId>
<artifactId>jmxri</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-slf4j-impl</artifactId>
<version>2.0-beta9</version>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-1.2-api</artifactId>
<version>2.0-beta9</version>
</dependency>
<!--
https://mvnrepository.com/artifact/org.twitter4j/twitter4j-stream -->
<dependency>
<groupId>org.twitter4j</groupId>
<artifactId>twitter4j-stream</artifactId>
<version>4.0.6</version>
</dependency>
</dependencies>
3. Now, we need to create a class, TwitterData, that contains the code to consume/stream data from Twitter and publish it to the Kafka cluster. We are assuming you already have a running Kafka cluster and a topic, twitterData, created in it. For information on installing a Kafka cluster and creating a Kafka topic, refer to the previous section of this article.
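If the twitterData topic does not exist yet, it can be created with the same kafka-topics.sh tool used earlier; the partition count of 2 and replication factor of 1 below mirror the values passed to the createTopic() call in the class that follows:

> bin/kafka-topics.sh --zookeeper zoo1:2181 --replication-factor 1 --partitions 2 --topic twitterData --create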
The class contains an instance of the twitter4j.conf.ConfigurationBuilder class; we need to set the access token and consumer keys in the configuration, as mentioned in the source code.
4. The twitter4j.StatusListener class returns the continuous stream of tweets inside the onStatus() method. We are using the Kafka producer code inside the onStatus() method to publish the tweets to Kafka. The following is the source code for the TwitterData class:
import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

import twitter4j.StallWarning;
import twitter4j.Status;
import twitter4j.StatusDeletionNotice;
import twitter4j.StatusListener;
import twitter4j.TwitterStream;
import twitter4j.TwitterStreamFactory;
import twitter4j.conf.ConfigurationBuilder;
import twitter4j.json.DataObjectFactory;

public class TwitterData {

  /** The actual Twitter stream. It's set up to collect raw JSON data */
  private TwitterStream twitterStream;

  // Replace these sample values with your own Twitter credentials.
  static String consumerKeyStr = "r1wFskT3q";
  static String consumerSecretStr = "fBbmp71HKbqalpizIwwwkBpKC";
  static String accessTokenStr = "298FPfE16frABXMcRIn7aUSSnNneMEPrUuZ";
  static String accessTokenSecretStr = "1LMNZZIfrAimpD004QilV1pH3PYTvM";

  public void start() {
    ConfigurationBuilder cb = new ConfigurationBuilder();
    cb.setOAuthConsumerKey(consumerKeyStr);
    cb.setOAuthConsumerSecret(consumerSecretStr);
    cb.setOAuthAccessToken(accessTokenStr);
    cb.setOAuthAccessTokenSecret(accessTokenSecretStr);
    // the JSON store must be enabled for DataObjectFactory.getRawJSON() to work
    cb.setJSONStoreEnabled(true);
    cb.setIncludeEntitiesEnabled(true);

    // instance of TwitterStreamFactory
    twitterStream = new TwitterStreamFactory(cb.build()).getInstance();

    final Producer<String, String> producer =
        new KafkaProducer<String, String>(getProducerConfig());

    // topicDetails: CreateTopic is a helper class (covered earlier in the book)
    // that creates the twitterData topic with 2 partitions and replication factor 1
    new CreateTopic("127.0.0.1:2181").createTopic("twitterData", 2, 1);

    /** Twitter listener **/
    StatusListener listener = new StatusListener() {
      public void onStatus(Status status) {
        ProducerRecord<String, String> data = new ProducerRecord<String, String>(
            "twitterData", DataObjectFactory.getRawJSON(status));
        // send the data to kafka
        producer.send(data);
      }

      public void onException(Exception arg0) {
        System.out.println(arg0);
      }

      public void onDeletionNotice(StatusDeletionNotice arg0) {
      }

      public void onScrubGeo(long arg0, long arg1) {
      }

      public void onStallWarning(StallWarning arg0) {
      }

      public void onTrackLimitationNotice(int arg0) {
      }
    };

    /** Bind the listener **/
    twitterStream.addListener(listener);

    /** GOGOGO **/
    twitterStream.sample();
  }

  private Properties getProducerConfig() {
    Properties props = new Properties();
    // List of kafka brokers. The complete list of brokers is not required, as
    // the producer will auto-discover the rest of the brokers.
    props.put("bootstrap.servers", "localhost:9092");
    props.put("batch.size", 1);
    // Serializer used for sending data to kafka. Since we are sending
    // strings, we are using StringSerializer.
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    // legacy property from the old producer API; the new KafkaProducer ignores it
    props.put("producer.type", "sync");
    return props;
  }

  public static void main(String[] args) throws InterruptedException {
    new TwitterData().start();
  }
}
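One way to run the class from the project directory is via Maven's exec plugin; this assumes the class lives in the package com.stormadvance.kafka_producer_twitter (matching the project's groupId and artifactId), which is not shown in the excerpt:

mvn clean compile exec:java -Dexec.mainClass=com.stormadvance.kafka_producer_twitter.TwitterData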
Use valid Kafka properties and Twitter credentials before executing the TwitterData class. After executing the preceding class, you will have a real-time stream of tweets in Kafka. In the next section, we are going to cover how we can use Storm to calculate the sentiments of the collected tweets.
To summarize, we covered how to install a single-node Apache Kafka cluster and how to collect tweets from Twitter and store them in a Kafka cluster.
If you enjoyed this post, check out the book Mastering Apache Storm to learn more about the different types of real-time processing techniques used to create distributed applications.