
In this article by Andrew Morgan, Antoine Amend, Matthew Hallett, and David George, the authors of the book Mastering Spark for Data Science, readers will learn how to construct a content register and use it to track all input loaded to the system, and to deliver metrics on ingestion pipelines, so that these flows can be reliably run as an automated, lights-out process.

In this article we will cover the following topics:

  • Introducing the GDELT Dataset
  • Data Pipelines
  • Universal Ingestion Framework
  • Real-time monitoring for new data
  • Receiving Streaming Data via Kafka
  • Registering new content and vaulting for tracking purposes
  • Visualization of content metrics in Kibana – to monitor ingestion processes & data health

 


Data Pipelines

Even with the most basic of analytics, we always require some data. In fact, finding the right data is probably among the hardest problems to solve in data science (but that’s a whole topic for another book!). We have already seen that the way in which we obtain our data can be as simple or complicated as is needed. In practice, we can break this decision into two distinct areas: Ad-hoc and scheduled.

  • Ad-hoc data acquisition is the most common method during prototyping and small scale analytics as it usually doesn’t require any additional software to implement – the user requires some data and simply downloads it from source as and when required. This method is often a matter of clicking on a web link and storing the data somewhere convenient, although the data may still need to be versioned and secure.
  • Scheduled data acquisition is used in more controlled environments for large-scale and production analytics; there is also an excellent case for ingesting a dataset into a data lake for possible future use. With the Internet of Things (IoT) on the increase, huge volumes of data are being produced, and in many cases, if the data is not ingested now, it is lost forever. Much of this data may not have an immediate or apparent use today, but could in the future; so the mindset is to gather all of the data in case it is needed, and delete it later once we are sure it is not.

It’s clear we need a flexible approach to data acquisition that supports a variety of procurement options.

Universal Ingestion Framework

There are many ways to approach data acquisition, ranging from home-grown bash scripts through to high-end commercial tools. The aim of this section is to introduce a highly flexible framework that we can use for small-scale data ingest and then grow as our requirements change – all the way through to a full, corporately managed workflow if needed. That framework will be built using Apache NiFi. NiFi enables us to build large-scale integrated data pipelines that move data around the planet. In addition, it is also incredibly flexible for building simple pipelines – usually quicker, even, than using Bash or any other traditional scripting method.

If an ad-hoc approach is taken to source the same dataset on a number of occasions, then some serious thought should be given as to whether it falls into the scheduled category, or at least whether a more robust storage and versioning setup should be introduced.

We have chosen to use Apache NiFi because it provides the ability to create many pipelines of varied complexity that can be scaled to truly Big Data and IoT levels, and it also provides a great drag-and-drop interface (using what’s known as flow-based programming[1]). With patterns, templates, and modules for workflow production, it automatically takes care of many of the complex features that traditionally plague developers, such as multi-threading, connection management, and scalable processing. For our purposes, it will enable us to quickly build simple pipelines for prototyping and scale these to full production where required.

NiFi is well documented and easy to get running (see https://nifi.apache.org/download.html); it runs in a browser and looks like this:

[1] https://en.wikipedia.org/wiki/Flow-based_programming

We leave the installation of NiFi as an exercise for the reader – which we would encourage you to do – as we will be using it in the following section.

Introducing the GDELT News Stream

Hopefully, we have NiFi up and running now and can start to ingest some data. So let’s start with some global news media data from GDELT. Here’s our brief, taken from the GDELT website http://blog.gdeltproject.org/gdelt-2-0-our-global-world-in-realtime/:

“Within 15 minutes of GDELT monitoring a news report breaking anywhere [in] the world, it has translated it, processed it to identify all events, counts, quotes, people, organizations, locations, themes, emotions, relevant imagery, video, and embedded social media posts, placed it into global context, and made all of this available via a live open metadata firehose enabling open research on the planet itself.

[As] the single largest deployment in the world of sentiment analysis, we hope that by bringing together so many emotional and thematic dimensions crossing so many languages and disciplines, and applying all of it in realtime to breaking news from across the planet, that this will spur an entirely new era in how we think about emotion and the ways in which it can help us better understand how we contextualize, interpret, respond to, and understand global events.”

In order to start consuming this open data, we’ll need to hook into that metadata firehose and ingest the news streams onto our platform.  How do we do this?  Let’s start by finding out what data is available.

Discover GDELT Real-time

GDELT publishes a list of the latest files on its website – this list is updated every 15 minutes. In NiFi, we can set up a dataflow that will poll the GDELT website, source a file from this list, and save it to HDFS so we can use it later.

Inside the NiFi dataflow designer, create an HTTP connector by dragging a processor onto the canvas and selecting GetHTTP.

To configure this processor, you’ll need to enter the URL of the file list as:

http://data.gdeltproject.org/gdeltv2/lastupdate.txt

You will also need to provide a temporary filename for the file list you will download. In the example below, we’ve used NiFi’s expression language to generate a universally unique key so that files are not overwritten (${UUID()}).

It’s worth noting that with this type of processor (GetHTTP), NiFi supports a number of scheduling and timing options for the polling and retrieval. For now, we’re just going to use the default options and let NiFi manage the polling intervals for us.

An example of the latest file list from GDELT is shown below.

Next, we will parse the URL of the GKG news stream so that we can fetch it in a moment. Create a Regular Expression parser by dragging a processor onto the canvas and selecting ExtractText. Now position the new processor underneath the existing one and drag a line from the top processor to the bottom one. Finish by selecting the success relationship in the connection dialog that pops up.

This is shown in the example below.

Next, let’s configure the ExtractText processor to use a regular expression that matches only the relevant text of the file list, for example:

([^ ]*gkg.csv.*)

From this regular expression, NiFi will create a new property (in this case, called url) associated with the flow design, which will take on a new value as each particular instance goes through the flow. It can even be configured to support multiple threads.

Again, this example is shown below.

It’s worth noting here that while this is a fairly specific example, the technique is deliberately general purpose and can be used in many situations.
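
To make the ExtractText step more concrete, here is a minimal Scala sketch, entirely outside NiFi and purely illustrative, that fetches lastupdate.txt and applies the same regular expression, showing the value that ends up in the url property:

import scala.io.Source

object GkgUrlExtract {
  // The same regular expression used in the ExtractText processor
  private val gkgUrl = "([^ ]*gkg.csv.*)".r

  def main(args: Array[String]): Unit = {
    // Fetch the latest file list, just as the GetHTTP processor does
    val lastUpdate = Source.fromURL(
      "http://data.gdeltproject.org/gdeltv2/lastupdate.txt").mkString

    // The first fragment matching the expression is the value NiFi
    // would place into the 'url' property
    gkgUrl.findFirstIn(lastUpdate) match {
      case Some(url) => println(s"GKG stream file: $url")
      case None      => println("No GKG file found in the latest file list")
    }
  }
}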

Our First GDELT Feed

Now that we have the URL of the GKG feed, we fetch it by configuring an InvokeHTTP processor to use the url property we previously created as its remote endpoint, and dragging the line as before.

All that remains is to decompress the zipped content with an UnpackContent processor (using the basic zip format) and save it to HDFS using a PutHDFS processor, like so:

Improving with Publish and Subscribe

So far, this flow looks very “point-to-point”, meaning that if we were to introduce a new consumer of data, such as a Spark Streaming job, the flow must be changed. For example, the flow design might have to change to look like this:

If we add yet another, the flow must change again. In fact, each time we add a new consumer, the flow gets a little more complicated, particularly once all the error handling is added. This is clearly not always desirable, as introducing or removing consumers (or producers) of data may be something we want to do frequently. It’s also a good idea to keep your flows as simple and reusable as possible.

Therefore, for a more flexible pattern, instead of writing directly to HDFS, we can publish to Apache Kafka. This gives us the ability to add and remove consumers at any time without changing the data ingestion pipeline. We can also still write to HDFS from Kafka if needed, possibly even by designing a separate NiFi flow, or connect directly to Kafka using Spark-streaming.

To do this, we create a Kafka writer by dragging a processor onto the canvas and selecting PutKafka.

We now have a simple flow that continuously polls for an available file list, routinely retrieving the latest copy of a new stream over the web as it becomes available, decompressing the content, and streaming it record-by-record into Kafka, a durable, fault-tolerant, distributed message queue, for processing by Spark Streaming or storage in HDFS. And what’s more, all without writing a single line of bash!
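
To show what such a downstream consumer might look like, here is a minimal Spark Streaming sketch that reads GKG records from Kafka. The topic name (gdelt-gkg) and broker address are assumptions for this sketch, and it relies on the spark-streaming-kafka (0.8) connector being on the classpath:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils
import kafka.serializer.StringDecoder

object GkgKafkaConsumer {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("gkg-consumer")
    val ssc = new StreamingContext(conf, Seconds(60))

    // Broker address and topic name are assumptions for this sketch
    val kafkaParams = Map("metadata.broker.list" -> "localhost:9092")
    val topics = Set("gdelt-gkg")

    // Each record is one line of a GKG file pushed by the PutKafka processor
    val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
      ssc, kafkaParams, topics)

    // A trivial consumer: count the records received in each batch
    stream.map(_._2).count().print()

    ssc.start()
    ssc.awaitTermination()
  }
}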

Content Registry

We have seen in this article that data ingestion is an area that is often overlooked, and that its importance should not be underestimated. At this point we have a pipeline that enables us to ingest data from a source, schedule that ingest, and direct the data to our repository of choice. But the story does not end there. Now that we have the data, we need to fulfil our data management responsibilities. Enter the content registry.

We’re going to build an index of metadata related to the data we have ingested. The data itself will still be directed to storage (HDFS, in our example) but, in addition, we will store metadata about the data, so that we can track what we’ve received and understand basic information about it, such as when we received it, where it came from, how big it is, and what type it is.

Choices and More Choices

The choice of which technology we use to store this metadata is, as we have seen, one based upon knowledge and experience. For metadata indexing, we will require at least the following attributes:

  • Easily searchable
  • Scalable
  • Parallel write ability
  • Redundancy

There are many ways to meet these requirements; for example, we could write the metadata to Parquet, store it in HDFS, and search it using Spark SQL. However, here we will use Elasticsearch as it meets the requirements a little better, most notably because it facilitates low-latency queries of our metadata over a REST API – very useful for creating dashboards. In fact, Elasticsearch has the advantage of integrating directly with Kibana, meaning it can quickly produce rich visualizations of our content registry. For this reason, we will proceed with Elasticsearch in mind.

Going with the Flow

Using our current NiFi pipeline flow, let’s fork the output from “Fetch GKG files from URL” to add an additional set of steps to allow us to capture and store this metadata in Elasticsearch. These are:

  1. Replace the flow content with our metadata model
  2. Capture the metadata
  3. Store directly in Elasticsearch

Here’s what this looks like in NiFi:

Metadata Model

So, the first step here is to define our metadata model. And there are many areas we could consider, but let’s select a set that helps tackle a few key points from earlier discussions. This will provide a good basis upon which further data can be added in the future, if required. So, let’s keep it simple and use the following three attributes:

  • File size
  • Date ingested
  • File name

These will provide basic registration of received files.

Next, inside the NiFi flow, we’ll need to replace the actual data content with this new metadata model. An easy way to do this is to create a JSON template file from our model. We’ll save it to local disk and use it inside a FetchFile processor to replace the flow’s content with this skeleton object. This template will look something like:

{
  "FileSize": SIZE,
  "FileName": "FILENAME",
  "IngestedDate": "DATE"
}

Note the use of placeholder names (SIZE, FILENAME, DATE) in place of the attribute values. These will be substituted, one by one, by a sequence of ReplaceText processors that swap each placeholder for an appropriate flow attribute using regular expressions and the NiFi Expression Language; for example, DATE becomes ${now()}, while SIZE and FILENAME can be drawn from the standard ${fileSize} and ${filename} flow file attributes.
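
The net result of these substitutions is one small JSON document per ingested file. Purely as an illustration, here is a hedged Scala sketch of the equivalent record built outside NiFi (the file path shown is hypothetical):

import java.io.File
import java.time.OffsetDateTime

object MetadataRecord {
  // Build the same metadata payload the ReplaceText chain produces in NiFi
  def forFile(file: File): String =
    s"""{
       |  "FileSize": ${file.length},
       |  "FileName": "${file.getName}",
       |  "IngestedDate": "${OffsetDateTime.now()}"
       |}""".stripMargin

  def main(args: Array[String]): Unit = {
    // Illustrative path only
    println(forFile(new File("/tmp/20150218233000.gkg.csv.zip")))
  }
}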

The last step is to output the new metadata payload to Elasticsearch. Once again, NiFi comes ready with a processor for this; the PutElasticsearch processor.

An example metadata entry in Elasticsearch:

{
  "_index": "gkg",
  "_type": "files",
  "_id": "AVZHCvGIV6x-JwdgvCzW",
  "_score": 1,
  "_source": {
    "FileSize": 11279827,
    "FileName": "20150218233000.gkg.csv.zip",
    "IngestedDate": "2016-08-01T17:43:00+01:00"
  }
}

Now that we have added the ability to collect and interrogate metadata, we have access to more statistics that can be used for analysis. This includes:

  • Time-based analysis, for example file sizes over time
  • Loss of data, for example are there data “holes” in the timeline?

If a particular analytic is required, the NiFi metadata component can be adjusted to provide the relevant data points. Indeed, an analytic could even be built to look at historical data and backfill the index where the metadata does not yet exist.
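
For instance, here is a hedged Spark sketch of a “data holes” check, assuming the elasticsearch-spark connector is on the classpath and Elasticsearch is reachable on localhost:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
import org.apache.spark.sql.expressions.Window

object RegistryGapCheck {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("registry-gap-check")
      .config("es.nodes", "localhost:9200") // assumption: a local Elasticsearch node
      .getOrCreate()

    // Load the content registry index (gkg/files) as a DataFrame
    val registry = spark.read
      .format("org.elasticsearch.spark.sql")
      .load("gkg/files")

    // Order the files by ingest time and measure the gap to the previous file
    val byTime = Window.orderBy("ts")
    val gaps = registry
      .withColumn("ts", col("IngestedDate").cast("timestamp"))
      .withColumn("gapMinutes",
        (unix_timestamp(col("ts")) - unix_timestamp(lag("ts", 1).over(byTime))) / 60)

    // Anything much larger than the 15-minute publication interval is a potential hole
    gaps.filter(col("gapMinutes") > 30)
        .select("FileName", "IngestedDate", "gapMinutes")
        .show()
  }
}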

Kibana Dashboard

We have mentioned Kibana a number of times in this article. Now that we have an index of metadata in Elasticsearch, we can use the tool to visualize some analytics. The purpose of this brief section is to demonstrate that we can immediately start to model and visualize our data. In this simple example, we have completed the following steps:

  1. Added the Elasticsearch index for our GDELT metadata to the “Settings” tab
  2. Selected “file size” under the “Discover” tab
  3. Selected Visualize for “file size”
  4. Changed the Aggregation field to “Range”
  5. Entered values for the ranges

The resultant graph displays the file size distribution:

From here we are free to create new visualizations or even a fully featured dashboard that can be used to monitor the status of our file ingest. By increasing the variety of metadata written to Elasticsearch from NiFi, we can make more fields available in Kibana and even start our data science journey right here with some ingest based actionable insights.

Now that we have a fully-functioning data pipeline delivering us real-time feeds of data, how do we ensure data quality of the payload we are receiving?  Let’s take a look at the options.

Quality Assurance

With an initial data ingestion capability implemented, and data streaming onto your platform, you will need to decide how much quality assurance is required at the front door. It’s perfectly viable to start with no initial quality controls and build them up over time (retrospectively scanning historical data as time and resources allow). However, it may be prudent to install a basic level of verification to begin with – for example, basic checks such as file integrity, parity checking, completeness, checksums, type checking, field counting, overdue files, security field pre-population, and denormalization.
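
To give a flavour of what one of these basic checks might look like, here is a hedged Spark sketch that counts GKG records with an unexpected number of tab-delimited fields; the HDFS path and the expected column count are assumptions for this sketch:

import org.apache.spark.sql.SparkSession

object FieldCountCheck {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("gkg-field-count").getOrCreate()

    // The HDFS path and expected column count are assumptions; GKG records are tab-delimited
    val expectedFields = 27
    val records = spark.sparkContext.textFile("hdfs:///data/gdelt/gkg/*.csv")

    // Count records whose field count does not match the expected layout
    val malformed = records
      .map(_.split("\t", -1).length)
      .filter(_ != expectedFields)
      .count()

    println(s"Records with an unexpected field count: $malformed")
  }
}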

You should take care that your up-front checks do not take too long. Depending on the intensity of your examinations and the size of your data, it’s not uncommon to encounter a situation where there is not enough time to perform all processing before the next dataset arrives. You will always need to monitor your cluster resources and calculate the most efficient use of time.

Here are some examples of the type of rough capacity planning calculation you can perform:

Example 1: Basic Quality Checking, No Contending Users

  • Data is ingested every 15 minutes and takes 1 minute to pull from the source
  • Quality checking (integrity, field count, field pre-population) takes 4 minutes
  • There are no other users on the compute cluster

There are 10 minutes of resources available for other tasks.

As there are no other users on the cluster, this is satisfactory – no action needs to be taken.

Example 2: Advanced Quality Checking, No Contending Users

  • Data is ingested every 15 minutes and takes 1 minute to pull from the source
  • Quality checking (integrity, field count, field pre-population, denormalization, sub dataset building) takes 13 minutes
  • There are no other users on the compute cluster

There is only 1 minute of resource available for other tasks.

We probably need to consider, either:

  • Configuring a resource scheduling policy
  • Reducing the amount of data ingested
  • Reducing the amount of processing we undertake
  • Adding additional compute resources to the cluster

Example 3: Basic Quality Checking, 50% Utility Due to Contending Users

  • Data is ingested every 15 minutes and takes 1 minute to pull from the source
  • Quality checking (integrity, field count, field pre-population) takes 4 minutes (100% utility)
  • There are other users on the compute cluster

There are 6 minutes of resources available for other tasks (15 – 1 – (4 * (100 / 50))). Since there are other users there is a danger that, at least some of the time, we will not be able to complete our processing and a backlog of jobs will occur.
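
For reference, here is a small Scala sketch encoding the arithmetic used in these examples; the figures are the ones from the three scenarios above:

object CapacityPlan {
  /** Minutes left in each ingest window after pulling and quality checking,
    * where `utility` is the share of the cluster available to us (1.0 = 100%). */
  def spareMinutes(windowMin: Double, pullMin: Double,
                   qcMin: Double, utility: Double): Double =
    windowMin - pullMin - (qcMin / utility)

  def main(args: Array[String]): Unit = {
    println(spareMinutes(15, 1, 4, 1.0))   // Example 1: 10.0 minutes spare
    println(spareMinutes(15, 1, 13, 1.0))  // Example 2: 1.0 minute spare
    println(spareMinutes(15, 1, 4, 0.5))   // Example 3: 6.0 minutes spare
  }
}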

When you run into timing issues, you have a number of options available to you in order to circumvent any backlog:

  • Negotiating sole use of the resources at certain times
  • Configuring a resource scheduling policy, including (see the configuration sketch after this list):
    • YARN Fair Scheduler: allows you to define queues with differing priorities and to target your Spark jobs at them by setting the spark.yarn.queue property on start-up, so that your job always takes precedence
    • Dynamic Resource Allocation: allows concurrently running jobs to automatically scale to match their utilization
    • Spark Scheduler Pools: allow you to define queues when sharing a SparkContext using a multithreading model, and to target your Spark jobs by setting the spark.scheduler.pool property per execution thread, so that a given thread takes precedence
  • Running processing jobs overnight when the cluster is quiet
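
The configuration sketch referred to above might look as follows; the queue and pool names are assumptions, and the fair scheduler pools themselves must be defined in your scheduler configuration:

import org.apache.spark.{SparkConf, SparkContext}

object SchedulingExample {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("gkg-quality-checks")
      // Target a high-priority YARN Fair Scheduler queue (queue name is an assumption)
      .set("spark.yarn.queue", "ingest")
      // Let executors scale up and down with the workload
      .set("spark.dynamicAllocation.enabled", "true")
      .set("spark.shuffle.service.enabled", "true")

    val sc = new SparkContext(conf)

    // When sharing one SparkContext across threads, pin this thread's jobs
    // to a named scheduler pool defined in the fair scheduler XML
    sc.setLocalProperty("spark.scheduler.pool", "quality")

    // ... submit jobs from this thread as usual ...
    sc.stop()
  }
}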

In any case, you will eventually get a good idea of how the various parts of your jobs perform and will then be in a position to calculate what changes could be made to improve efficiency. There’s always the option of throwing more resources at the problem, especially when using a cloud provider, but we would certainly encourage the intelligent use of existing resources – this is far more scalable, cheaper, and builds data expertise.

Summary

In this article we walked through the full setup of an Apache NiFi GDELT ingest pipeline, complete with metadata forks and a brief introduction to visualizing the resultant data. This section is particularly important as GDELT is used extensively throughout the book and the NiFi method is a highly effective way to source data in a scalable and modular way.
