
[box type="note"]This article is an excerpt from a book written by Andrew Morgan and Antoine Amend titled Mastering Spark for Data Science. In this book, you will learn about advanced Spark architectures, how to work with geographic data in Spark, and how to tune Spark algorithms so they scale linearly.[/box]

In this article, we will learn how to extract and analyze a large number of tweets related to the 2016 US presidential election on Twitter.

Following the US elections on Twitter

On November 8, 2016, American citizens went to polling stations in their millions to cast their votes for the next President of the United States. Counting began almost immediately and, although not officially confirmed until sometime later, the forecasted result was well known by the next morning. Let's start our investigation a couple of days before the major event itself, on November 6, 2016, so that we can preserve some context in the run-up. Although we do not know in advance exactly what we will find, we know that Twitter will play an oversized role in the political commentary, given its influence in the build-up, and it makes sense to start collecting data as soon as possible. In fact, data scientists may sometimes experience this as a gut feeling: a strange and often exciting notion that compels us to commence working on something without a clear plan or absolute justification, just a sense that it will pay off. And this approach can be vital since, given the time normally required to formulate and realize such a plan and the transient nature of events, a major news event may have occurred, a new product may have been released, or the stock market may be trending differently by the time we are ready; by then, the original dataset may no longer be available.

Acquiring data in stream

The first action is to start acquiring Twitter data. As we plan to download more than 48 hours' worth of tweets, the code should be robust enough not to fail somewhere in the middle of the process; there is nothing more frustrating than a fatal NullPointerException occurring after many hours of intense processing. We know we will be working on sentiment analysis at some point down the line, but for now we do not wish to over-complicate our code with large dependencies, as this can decrease stability and lead to more unchecked exceptions. Instead, we will start by collecting and storing the data; subsequent processing will be done offline on the collected data rather than applying this logic to the live stream. We create a new Streaming context reading from the Twitter 1% firehose using the utility methods. We also use the excellent GSON library to serialize Status objects (the Java class embedding Twitter4J records) to JSON.

<dependency>
  <groupId>com.google.code.gson</groupId>
  <artifactId>gson</artifactId>
  <version>2.3.1</version>
</dependency>

We read Twitter data every 5 minutes and can optionally supply Twitter filters as command-line arguments. Filters can be keywords such as Trump or Clinton, or hashtags such as #MAGA and #StrongerTogether. However, we must bear in mind that by doing this we may not capture all relevant tweets, as we can never be fully up to date with the latest hashtag trends (such as #DumpTrump, #DrainTheSwamp, #LockHerUp, or #LoveTrumpsHate) and many tweets will be overlooked with an inadequate filter, so we will use an empty filter list to ensure that we catch everything.
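Note that the createTwitterStream utility method used below is not listed in this excerpt. As a minimal sketch, and assuming the Twitter4J OAuth credentials are supplied through the standard twitter4j.oauth.* system properties, it could simply delegate to TwitterUtils from spark-streaming-twitter:

import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.twitter.TwitterUtils
import twitter4j.Status

// Sketch only: TwitterUtils reads the default Twitter4J OAuth configuration;
// an empty filter array means "no filtering", that is, the full 1% sample.
def createTwitterStream(ssc: StreamingContext,
                        filters: Array[String]): DStream[Status] = {
  TwitterUtils.createStream(ssc, None, filters)
}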

val sparkConf = new SparkConf().setAppName("Twitter Extractor")
val sc = new SparkContext(sparkConf)
val ssc = new StreamingContext(sc, Minutes(5))

val filter = args

val twitterStream = createTwitterStream(ssc, filter)
  .mapPartitions { it =>
    val gson = new GsonBuilder().create()
    it.map { s: Status =>
      Try(gson.toJson(s)).toOption
    }
  }

We serialize our Status class using the GSON library and persist our JSON objects in HDFS. Note that the serialization occurs within a Try clause to ensure that unwanted exceptions are not thrown. Instead, we return JSON as an optional String:

twitterStream
  .filter(_.isDefined)
  .map(_.get)
  .saveAsTextFiles("/path/to/twitter")

Finally, we run our Spark Streaming context and keep it alive until a new president has been elected, no matter what happens!

ssc.start()
ssc.awaitTermination()

Acquiring data in batch

Only 1% of tweets are retrieved through the Spark Streaming API, meaning that 99% of records are discarded. Although we were able to download around 10 million tweets this way, we can potentially download more data, but this time only for a selected hashtag and within a small period of time. For example, we can download all tweets related to the #LockHerUp or #BuildTheWall hashtags.

The search API

For that purpose, we consume Twitter historical data through the twitter4j Java API. This library comes as a transitive dependency of spark-streaming-twitter_2.11. To use it outside of a Spark project, the following Maven dependency should be used:

<dependency>
  <groupId>org.twitter4j</groupId>
  <artifactId>twitter4j-core</artifactId>
  <version>4.0.4</version>
</dependency>

We create a Twitter4J client as follows:

ConfigurationBuilder builder = new ConfigurationBuilder();
builder.setOAuthConsumerKey(apiKey);
builder.setOAuthConsumerSecret(apiSecret);
Configuration configuration = builder.build();

AccessToken token = new AccessToken(
  accessToken,
  accessTokenSecret
);

Twitter twitter =
  new TwitterFactory(configuration)
    .getInstance(token);

Then, we consume the /search/tweets service through the Query object:

Query q = new Query(filter);
q.setSince(fromDate);
q.setUntil(toDate);
q.setCount(400);

QueryResult r = twitter.search(q);
List<Status> tweets = r.getTweets();

Finally, we get a list of Status objects that can easily be serialized using the GSON library introduced earlier.
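For example, a minimal sketch of writing these tweets out as one JSON document per line (in Scala, for consistency with the rest of the pipeline; the gson and writer handles used in the paging loop later on could be created in the same way):

import com.google.gson.GsonBuilder
import java.io.PrintWriter
import scala.collection.JavaConverters._

// 'tweets' is the java.util.List[Status] returned by the search above
val gson = new GsonBuilder().create()
val writer = new PrintWriter("/path/to/twitter-maga.json")

tweets.asScala.foreach { s =>
  writer.println(gson.toJson(s))
}
writer.flush()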

Rate limit

Twitter is a fantastic resource for data science, but it is far from a non-profit organization, and as such, they know how to value and price data. Without any special agreement, the search API is limited to a few days of history, a maximum of 180 queries per 15-minute window (450 with application-only authentication), and at most 100 records per query. This limit can be confirmed on both the Twitter DEV website (https://dev.twitter.com/rest/public/rate-limits) and from the API itself using the RateLimitStatus class:

Map<String, RateLimitStatus> rls = twitter.getRateLimitStatus("search");
System.out.println(rls.get("/search/tweets"));

/*
RateLimitStatusJSONImpl{remaining=179, limit=180,
resetTimeInSeconds=1482102697, secondsUntilReset=873}
*/

Unsurprisingly, any queries on popular terms, such as #MAGA on November 9, 2016, hit this threshold. To avoid a rate limit exception, we have to page and throttle our download requests, keeping track of the lowest tweet ID processed so far and monitoring our rate limit status after each search request:

RateLimitStatus strl = rls.get("/search/tweets");
int totalTweets = 0;
long maxID = -1;

// gson (a GSON instance) and writer (a PrintWriter over the output file)
// are assumed to have been created beforehand
for (int i = 0; i < 400; i++) {

  // throttling
  if (strl.getRemaining() == 0)
    Thread.sleep(strl.getSecondsUntilReset() * 1000L);

  Query q = new Query(filter);
  q.setSince(fromDate);
  q.setUntil(toDate);
  q.setCount(100);

  // paging
  if (maxID != -1) q.setMaxId(maxID - 1);

  QueryResult r = twitter.search(q);
  for (Status s : r.getTweets()) {
    totalTweets++;
    if (maxID == -1 || s.getId() < maxID)
      maxID = s.getId();
    writer.println(gson.toJson(s));
  }

  strl = r.getRateLimitStatus();
}

With around half a billion tweets a day, it would be optimistic, if not naive, to try to gather all US-related data. Instead, the simple ingest process detailed earlier should be used to intercept tweets matching specific queries only. Packaged as the main class in an assembly jar, it can be executed as follows:

java -Dtwitter.properties=twitter.properties \
  -jar trump-1.0.jar "#maga" 2016-11-08 2016-11-09 \
  /path/to/twitter-maga.json

Here, the twitter.properties file contains your Twitter API keys:

twitter.token = XXXXXXXXXXXXXX
twitter.token.secret = XXXXXXXXXXXXXX
twitter.api.key = XXXXXXXXXXXXXX
twitter.api.secret = XXXXXXXXXXXXXX
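As a rough sketch (in Scala, and purely illustrative; the command-line wrapper itself is not listed in this excerpt), these properties could be loaded and wired into the Twitter4J client shown earlier like this:

import java.io.FileInputStream
import java.util.Properties
import twitter4j.TwitterFactory
import twitter4j.auth.AccessToken
import twitter4j.conf.ConfigurationBuilder

// Load the file pointed to by -Dtwitter.properties=...
val props = new Properties()
props.load(new FileInputStream(System.getProperty("twitter.properties")))

// Same ConfigurationBuilder / AccessToken wiring as before,
// with the keys read from the properties file
val builder = new ConfigurationBuilder()
builder.setOAuthConsumerKey(props.getProperty("twitter.api.key"))
builder.setOAuthConsumerSecret(props.getProperty("twitter.api.secret"))

val token = new AccessToken(
  props.getProperty("twitter.token"),
  props.getProperty("twitter.token.secret")
)

val twitter = new TwitterFactory(builder.build()).getInstance(token)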

Analysing sentiment

After 4 days of intense processing, we extracted around 10 million tweets, representing approximately 30 GB of JSON data.

Massaging Twitter data

One of the key reasons Twitter became so popular is that any message has to fit into a maximum of 140 characters. The drawback is also that every message has to fit into a maximum of 140 characters! Hence, the result is a massive increase in the use of abbreviations, acronyms, slang words, emoticons, and hashtags. In this case, the main emotion may no longer come from the text itself, but rather from the emoticons used (http://dl.acm.org/citation.cfm?id=1628969), though some studies have shown that emoticons may sometimes lead to inadequate sentiment predictions (https://arxiv.org/pdf/1511.02556.pdf). Emojis are even broader than emoticons, as they include pictures of animals, transportation, business icons, and so on. Also, while emoticons can easily be retrieved through simple regular expressions, emojis are usually encoded in Unicode and are more difficult to extract without a dedicated library.

<dependency>
  <groupId>com.kcthota</groupId>
  <artifactId>emoji4j</artifactId>
  <version>5.0</version>
</dependency>

The Emoji4J library is easy to use (although computationally expensive): given some text with emojis/emoticons, we can either codify it (replace Unicode values with their short code names) or clean it (simply remove any emojis).
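For example (a quick illustration only; the exact short code names depend on the library version):

import emoji4j.EmojiUtils

val raw = "Election night! \uD83D\uDE00"

// codify: replace each emoji with its :short_code: name, e.g. ":grinning:"
EmojiUtils.shortCodify(raw)

// clean: strip the emojis altogether, leaving "Election night! "
EmojiUtils.removeAllEmojis(raw)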

Sentiment Analysis in Twitter

So firstly, let's clean our text of any junk (special characters, emojis, accents, URLs, and so on) to access the plain English content:

import emoji4j.EmojiUtils
import org.apache.commons.lang3.StringUtils

// 'tweet' is the String this method is mixed into via an implicit class (see below)
def clean = {
  var text = tweet.toLowerCase()
  text = text.replaceAll("https?://\\S+", "")
  text = StringUtils.stripAccents(text)
  EmojiUtils.removeAllEmojis(text)
    .trim
    .toLowerCase()
    .replaceAll("rt\\s+", "")
    .replaceAll("@[\\w\\d-_]+", "")
    .replaceAll("[^\\w#\\[\\]:'.!?,]+", " ")
    .replaceAll("\\s+([:'.!?,])\\1", "$1")
    .replaceAll("[\\s\\t]+", " ")
    .replaceAll("[\\r\\n]+", ". ")
    .replaceAll("(\\w)\\1{2,}", "$1$1") // avoid looooool
    .replaceAll("#\\W", "")
    .replaceAll("[#':,;.]$", "")
    .trim
}

Let’s also codify and extract all emojis and emoticons and keep them aside as a list:

val eR = "(:\\w+:)".r

def emojis = {
  var text = tweet.toLowerCase()
  text = text.replaceAll("https?://\\S+", "")
  eR.findAllMatchIn(EmojiUtils.shortCodify(text))
    .map(_.group(1))
    .filter { emoji =>
      EmojiUtils.isEmoji(emoji)
    }
    .map(_.replaceAll("\\W", ""))
    .toArray
}

Writing these methods inside an implicit class means that they can be applied directly to a String through a simple import statement.
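As a minimal sketch (the object and class names here are illustrative, not taken from the book), the packaging could look like this:

import emoji4j.EmojiUtils
import org.apache.commons.lang3.StringUtils

object TwitterProcessors {

  implicit class TweetText(val tweet: String) {

    // Same logic as the clean method above (shortened here for brevity)
    def clean: String = {
      var text = tweet.toLowerCase()
      text = text.replaceAll("https?://\\S+", "")
      text = StringUtils.stripAccents(text)
      EmojiUtils.removeAllEmojis(text).trim
    }

    // Same logic as the emojis method above
    def emojis: Array[String] = {
      val eR = "(:\\w+:)".r
      val text = tweet.toLowerCase().replaceAll("https?://\\S+", "")
      eR.findAllMatchIn(EmojiUtils.shortCodify(text))
        .map(_.group(1))
        .filter(e => EmojiUtils.isEmoji(e))
        .map(_.replaceAll("\\W", ""))
        .toArray
    }
  }
}

// Usage:
// import TwitterProcessors._
// "rt @someone I voted! #ElectionDay".clean
// "rt @someone I voted! #ElectionDay".emojis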

US elections sentiment analysis

Using the Stanford NLP

Our next step is to pass our cleaned text through a Sentiment Annotator. We use the Stanford NLP library for that purpose:

<dependency>
  <groupId>edu.stanford.nlp</groupId>
  <artifactId>stanford-corenlp</artifactId>
  <version>3.5.0</version>
  <classifier>models</classifier>
</dependency>

<dependency>
  <groupId>edu.stanford.nlp</groupId>
  <artifactId>stanford-corenlp</artifactId>
  <version>3.5.0</version>
</dependency>

We create a Stanford annotator that tokenizes the content (tokenize), splits it into sentences (ssplit), tags parts of speech (pos), and lemmatizes each word (lemma) before analyzing the overall sentiment:

def getAnnotator: StanfordCoreNLP = {
  val p = new Properties()
  p.setProperty(
    "annotators",
    "tokenize, ssplit, pos, lemma, parse, sentiment"
  )
  new StanfordCoreNLP(p)
}

// Java collections returned by CoreNLP are treated as Scala collections
import scala.collection.JavaConversions._

def lemmatize(text: String,
              annotator: StanfordCoreNLP = getAnnotator) = {
  val annotation = annotator.process(text.clean)
  val sentences = annotation.get(classOf[SentencesAnnotation])
  sentences.map { sentence =>
    sentence.get(classOf[TokensAnnotation])
      .map { token =>
        token.get(classOf[LemmaAnnotation])
      }
      .mkString(" ")
  }
  .mkString(" ")
}

val text = "If you're bashing Trump and his voters and calling them a variety of hateful names, aren't you doing exactly what you accuse them?"

println(lemmatize(text))

/*
if you be bash trump and he voter and call they a variety of hateful name,
be not you do exactly what you accuse they
*/

Each word is replaced by its most basic form, that is, you're is replaced with you be and aren't you doing is replaced with be not you do.

def sentiment(coreMap: CoreMap) = {
  coreMap.get(classOf[SentimentCoreAnnotations.ClassName]) match {
    case "Very negative" => 0
    case "Negative" => 1
    case "Neutral" => 2
    case "Positive" => 3
    case "Very positive" => 4
    case _ =>
      throw new IllegalArgumentException(
        s"Could not get sentiment for [${coreMap.toString}]"
      )
  }
}

def extractSentiment(text: String,
                     annotator: StanfordCoreNLP = getAnnotator) = {
  val annotation = annotator.process(text)
  val sentences = annotation.get(classOf[SentencesAnnotation])
  val totalScore = sentences map sentiment
  if (sentences.nonEmpty) {
    totalScore.sum / sentences.size().toFloat
  } else {
    2.0f
  }
}

extractSentiment("God bless America. Thank you Donald Trump!")
// 2.5

extractSentiment("This is the most horrible day ever")
// 1.0

A sentiment score spans from Very Negative (0.0) to Very Positive (4.0) and is averaged per sentence. As we rarely get more than one or two sentences per tweet, we expect a very small variance; most of the tweets should be Neutral (around 2.0), with only the extremes scoring below ~1.5 or above ~2.5.

Building the Pipeline

For each of our Twitter records (stored as JSON objects), we do the following things:

  • Parse the JSON object using json4s library
  • Extract the date
  • Extract the text
  • Extract the location and map it to a US state
  • Clean the text
  • Extract emojis
  • Lemmatize text
  • Analyze sentiment

We then wrap all these values into the following Tweet case class:

case class Tweet(
  date: Long,
  body: String,
  sentiment: Float,
  state: Option[String],
  geoHash: Option[String],
  emojis: Array[String]
)


As mentioned in previous chapters, creating a new NLP instance for each record out of our dataset of 10 million records wouldn't scale. Instead, we create only one annotator per Iterator (that is, one per partition):

val analyzeJson = (it: Iterator[String]) => {

  implicit val format = DefaultFormats
  val annotator = getAnnotator
  val sdf = new SimpleDateFormat("MMM d, yyyy hh:mm:ss a")

  it.map { tweet =>

    val json = parse(tweet)

    val dateStr = (json \ "createdAt").extract[String]
    val date = Try(
      sdf.parse(dateStr).getTime
    )
    .getOrElse(0L)

    val text = (json \ "text").extract[String]

    val location = Try(
      (json \ "user" \ "location").extract[String]
    )
    .getOrElse("")
    .toLowerCase()

    // 'states' is a predefined set of US state abbreviations
    val state = Try {
      location.split("\\s+")
        .map(_.toUpperCase())
        .filter { s =>
          states.contains(s)
        }
        .head
    }
    .toOption

    val cleaned = text.clean

    Tweet(
      date,
      cleaned.lemmatize(annotator),
      cleaned.sentiment(annotator),
      state,
      None, // geoHash (not derived in this excerpt)
      text.emojis
    )
  }
}

val tweetJsonRDD = sc.textFile("/path/to/twitter")

val tweetRDD = tweetJsonRDD mapPartitions analyzeJson

tweetRDD.toDF().show(5)

/*
+-------------+---------------+---------+--------+----------+
|         date|           body|sentiment|   state|    emojis|
+-------------+---------------+---------+--------+----------+
|1478557859000|happy halloween|      2.0|    None|   [ghost]|
|1478557860000|slave to the gr|      2.5|    None|        []|
|1478557862000|why be he so pe|      3.0|Some(MD)|        []|
|1478557862000|marcador sentim|      2.0|    None|        []|
|1478557868000|you mindset tow|      2.0|    None|[sparkles]|
+-------------+---------------+---------+--------+----------+
*/
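From here, the resulting DataFrame can be explored directly. For instance, a quick, purely illustrative sketch of the average sentiment per US state (column names follow the Tweet case class above) might look like this:

import org.apache.spark.sql.functions._

// toDF() assumes the SQL implicits are already in scope, as above
val tweetDF = tweetRDD.toDF()

// Average sentiment per state, ignoring tweets with no resolved location
tweetDF
  .filter(col("state").isNotNull)
  .groupBy("state")
  .agg(avg("sentiment").alias("avgSentiment"))
  .orderBy(desc("avgSentiment"))
  .show(10)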

We learned how to gather and analyze sentiment data from Twitter. To find out more about data acquisition and scraping link-based external data, check out Mastering Spark for Data Science.

Mastering Spark for Data Science
