There are many differences between the idealized usage of ML algorithms and real-world usage. This post gives advice related to using ML in the real world, in real applications, and in production environments. Specifically, we will talk about how to build a real-time data pipeline.
The article aims to answer the following questions:
- How do you collect, store, and process gigabytes or terabytes of training data?
- How and where do you store and distribute serialized models to clients?
- How do you collect new training examples from millions of users?
What are data pipelines?
When developing a production ML system, it’s not likely that you will have the training data handed to you in a ready-to-process format. Production ML systems are typically part of larger application systems, and the data that you use will probably originate from several different sources. The training set for an ML algorithm may be a subset of your larger database, combined with images hosted on a Content Delivery Network (CDN) and event data from an Elasticsearch server.
The process of ushering data through various stages of a life cycle is called data pipelining. Data pipelining may include data selectors that run SQL or Elasticsearch queries for objects, event subscriptions that allow data to flow in from event- or log-based sources, aggregations, joins, combining data with data from third-party APIs, sanitization, normalization, and storage.
In an ideal implementation, the data pipeline acts as an abstraction layer between the larger application environment and the ML process. The ML algorithm should be able to read the output of the data pipeline without any knowledge of the original source of the data, similar to our examples.
As there are many possible data sources and infinite ways to architect an application, there is no one-size-fits-all data pipeline. However, most data pipelines will contain these components, which we will discuss in the following sections:
- Data querying and event subscription
- Data joining or aggregation
- Transformation and normalization
- Storage and delivery
This article is the first of a two-part post. In this part, we will talk about data querying and event subscription, and about data joining and aggregation.
Imagine an application such as Disqus, which is an embeddable comment form that website owners can use to add comment functionality to blog posts or other pages. The primary functionality of Disqus is to allow users to like or leave comments on posts; however, as an additional feature and revenue stream, Disqus can make content recommendations and display them alongside sponsored content. The content recommendation system is an example of an ML system that is only one feature of a larger application.
A content recommendation system in an application such as Disqus does not necessarily need to interact with the comment data, but might use the user’s likes history to generate recommendations similar to the current page. Such a system would also need to analyze the text content of the liked pages and compare that to the text content of all pages in the network in order to make recommendations. Disqus does not need the post’s content in order to provide comment functionality, but does need to store metadata about the page (like its URL and title) in its database. The post content may therefore not reside in the application’s main database, though the likes and page metadata would likely be stored there.
A data pipeline built around Disqus’s recommendation system needs first to query the main database for pages the user has liked—or pages that were liked by users who liked the current page—and return their metadata. In order to find similar content, however, the system will need to use the text content of each liked post. This data might be stored in a separate system, perhaps a secondary database such as MongoDB or Elasticsearch, or in Amazon S3 or some other data warehouse. The pipeline will need to retrieve the text content based on the metadata returned by the main database, and associate the content with the metadata.
This is an example of multiple data selectors or data sources in the early stages of a data pipeline. One data source is the primary application data, which stores post and likes metadata. The other data source is a secondary server which stores the post’s text content.
The next step in this pipeline might involve finding a number of candidate posts similar to the ones the user has liked, perhaps through a request to Elasticsearch or some other service that can find similar content. Similar content is not necessarily the correct content to serve, however, so these candidate articles will ultimately be ranked by a (hypothetical) artificial neural network (ANN) in order to determine the best content to display. In this example, the input to the data pipeline is the current page and the output from the data pipeline is a list of, say, 200 similar pages that the ANN will then rank.
If all the necessary data resides in the primary database, the entire pipeline can be achieved with an SQL statement and some JOINs. Even in this case, care should be taken to develop a degree of abstraction between the ML algorithm and the data pipeline, as you may decide to update the application’s architecture in the future. In other cases, however, the data will reside in separate locations and a more considered pipeline should be developed.
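As a minimal sketch of the single-database case, the query below joins likes to page metadata using Python's built-in sqlite3 module. The table names, column names, and the `liked_pages` function are assumptions made up for illustration; they are not Disqus's actual schema. Wrapping the query in a function is one way to keep the ML code unaware of where the data comes from.

```python
import sqlite3

# Hypothetical schema and sample rows, for illustration only.
conn = sqlite3.connect(":memory:")
conn.executescript("""
    CREATE TABLE pages (id INTEGER PRIMARY KEY, url TEXT, title TEXT);
    CREATE TABLE likes (user_id INTEGER, page_id INTEGER);
    INSERT INTO pages VALUES (1, 'https://example.com/a', 'Post A'),
                             (2, 'https://example.com/b', 'Post B'),
                             (3, 'https://example.com/c', 'Post C');
    INSERT INTO likes VALUES (10, 1), (10, 3), (11, 2);
""")


def liked_pages(conn, user_id):
    """Return metadata for every page the given user has liked."""
    rows = conn.execute(
        """
        SELECT p.id, p.url, p.title
        FROM likes AS l
        JOIN pages AS p ON p.id = l.page_id
        WHERE l.user_id = ?
        ORDER BY p.id
        """,
        (user_id,),
    ).fetchall()
    # Plain dictionaries hide the SQL details from downstream stages.
    return [{"id": r[0], "url": r[1], "title": r[2]} for r in rows]


titles = [page["title"] for page in liked_pages(conn, 10)]
```

If the storage layer changes later, only the body of `liked_pages` needs to change; callers keep receiving the same list of dictionaries.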
The earliest stage of a pipeline is typically some sort of data query operation. Training examples must be extracted from a larger database, keeping in mind that not every record in a database is necessarily a training example. In the case of a spam filter, for instance, you should only select messages that have been marked as spam or not spam by a user. Messages that were automatically marked as spam by a spam filter should probably not be used for training, as that might cause a positive feedback loop that ultimately causes an unacceptable false positive rate.
Similarly, you may want to prevent users that have been blocked or banned by your system from influencing your model training. A bad actor could intentionally mislead an ML model by taking inappropriate actions on their own data, so you should disqualify these data points as training examples.
Alternatively, if your application is such that recent data points should take precedence over older training points, your data query operation might set a time-based limit on the data to use for training, or select a fixed number of records in reverse chronological order. No matter the situation, make sure you carefully consider your data queries as they are an essential first step in your data pipeline.
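The selection rules described above can be sketched as a single query. This is only an illustration under assumed names: the `messages` and `users` tables, their columns, and the `training_examples` function are hypothetical. Here `user_id` is taken to be the user who labeled the message.

```python
import sqlite3

# Hypothetical schema and sample data, for illustration only.
conn = sqlite3.connect(":memory:")
conn.executescript("""
    CREATE TABLE users (id INTEGER PRIMARY KEY, banned INTEGER);
    CREATE TABLE messages (
        id INTEGER PRIMARY KEY,
        user_id INTEGER,
        body TEXT,
        label TEXT,        -- 'spam', 'ham', or NULL if unlabeled
        labeled_by TEXT,   -- 'user' or 'filter'
        created_at TEXT    -- ISO 8601 date
    );
    INSERT INTO users VALUES (1, 0), (2, 1);
    INSERT INTO messages VALUES
        (1, 1, 'win money now',  'spam', 'user',   '2024-03-01'),
        (2, 1, 'lunch at noon',  'ham',  'user',   '2024-03-02'),
        (3, 1, 'cheap pills',    'spam', 'filter', '2024-03-03'),
        (4, 2, 'totally legit',  'ham',  'user',   '2024-03-04'),
        (5, 1, 'old newsletter', 'ham',  'user',   '2023-01-01'),
        (6, 1, 'hello there',    NULL,   NULL,     '2024-03-05');
""")


def training_examples(conn, since, limit):
    """Select user-labeled messages from non-banned users, newest first."""
    return conn.execute(
        """
        SELECT m.id, m.body, m.label
        FROM messages AS m
        JOIN users AS u ON u.id = m.user_id
        WHERE m.labeled_by = 'user'          -- ignore the filter's own labels
          AND m.label IN ('spam', 'ham')     -- ignore unlabeled messages
          AND u.banned = 0                   -- ignore banned users
          AND m.created_at >= ?              -- time-based limit
        ORDER BY m.created_at DESC
        LIMIT ?                              -- fixed-size limit
        """,
        (since, limit),
    ).fetchall()


examples = training_examples(conn, "2024-01-01", 100)
```

With the sample rows above, only messages 1 and 2 qualify: message 3 was labeled by the filter, message 4 comes from a banned user, message 5 is too old, and message 6 has no label.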
Not all data needs to come from database queries, however. Many applications use a pub/sub or event subscription architecture to capture streaming data. This data could be activity logs aggregated from a number of servers, or live transaction data from a number of sources. In these cases, an event subscriber will be an early part of your data pipeline. Note that event subscription and data querying are not mutually exclusive operations. Events that come in through a pub/sub system can still be filtered based on various criteria; this is still a form of data querying.
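Filtering a stream of events can be sketched with a plain Python generator. The event shape and the `is_purchase` criterion are assumptions chosen for illustration, and the list named `incoming` stands in for whatever a real pub/sub client would deliver.

```python
from typing import Callable, Dict, Iterable, Iterator

Event = Dict[str, object]


def filtered_events(
    events: Iterable[Event], keep: Callable[[Event], bool]
) -> Iterator[Event]:
    """Yield only the events that qualify as training data."""
    for event in events:
        if keep(event):
            yield event


def is_purchase(event: Event) -> bool:
    # Hypothetical criterion: only completed purchases are useful for training.
    return event.get("type") == "purchase" and event.get("status") == "completed"


# Stand-in for events arriving from a pub/sub subscription.
incoming = [
    {"type": "purchase", "status": "completed", "amount": 30},
    {"type": "page_view", "status": "ok"},
    {"type": "purchase", "status": "failed", "amount": 12},
    {"type": "purchase", "status": "completed", "amount": 8},
]

selected = list(filtered_events(incoming, is_purchase))
```

Because the filter is an ordinary function passed in as an argument, the same subscriber can serve several pipelines with different selection criteria.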
One potential issue with an event subscription model arises when it’s combined with a batch-training scheme. If you require 5,000 data points but receive only 100 per second, your pipeline will need to maintain a buffer of data points until the target size is reached. There are various message-queuing systems that can assist with this, such as RabbitMQ or Redis. A pipeline requiring this type of functionality might hold messages in a queue until the target of 5,000 messages is achieved, and only then release the messages for batch processing through the rest of the pipeline.
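The buffering behavior can be sketched in a few lines. The `BatchBuffer` class below is a hypothetical in-memory stand-in; a production pipeline would more likely hold messages in a durable queue such as RabbitMQ or Redis so that nothing is lost if the process restarts.

```python
from typing import Callable, List


class BatchBuffer:
    """Hold items until batch_size is reached, then release them together."""

    def __init__(self, batch_size: int, on_batch: Callable[[List], None]) -> None:
        self.batch_size = batch_size
        self.on_batch = on_batch
        self._items: List = []

    @property
    def pending(self) -> int:
        """Number of items waiting for the next batch."""
        return len(self._items)

    def add(self, item) -> None:
        self._items.append(item)
        if len(self._items) >= self.batch_size:
            # Swap in a fresh list before handing the full batch downstream.
            batch, self._items = self._items, []
            self.on_batch(batch)


batches: List[List[int]] = []
buffer = BatchBuffer(batch_size=5000, on_batch=batches.append)

# Simulate 12,000 incoming data points.
for data_point in range(12000):
    buffer.add(data_point)
```

After 12,000 data points the buffer has released two full batches of 5,000 and is still holding 2,000 for the next one. At 100 data points per second, each batch would take 50 seconds to fill.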
In the case that data is collected from multiple sources, it most likely will need to be joined or aggregated in some manner. Let’s now take a look at a situation where data needs to be joined to data from an external API.
Data joining and aggregation
Let’s return to our example of the Disqus content recommendation system. Imagine that the data pipeline is able to query likes and post metadata directly from the primary database, but that no system in the application stores the post’s text content. Instead, a microservice was developed in the form of an API that accepts a post ID or URL and returns the page’s sanitized text content.
In this case, the data pipeline will need to interact with the microservice API in order to get the text content for each post. This approach is perfectly valid, though if the frequency of post content requests is high, some caching or storage should probably be implemented.
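One simple way to cache repeated content requests is Python's `functools.lru_cache`. In this sketch, `fetch_post_content` is a hypothetical stand-in for the HTTP request to the microservice, and the `stats` dictionary exists only to show how often the underlying call is made.

```python
from functools import lru_cache

stats = {"calls": 0}


def fetch_post_content(post_id: int) -> str:
    """Hypothetical stand-in for a request to the content microservice."""
    stats["calls"] += 1
    return f"sanitized text of post {post_id}"


@lru_cache(maxsize=10_000)
def cached_post_content(post_id: int) -> str:
    """Return post content, contacting the service only on a cache miss."""
    return fetch_post_content(post_id)


first = cached_post_content(7)
second = cached_post_content(7)  # served from the cache
```

An in-process cache like this is lost on restart and is not shared between machines, so a pipeline running on several servers would probably want a shared cache or a persistent store instead.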
The data pipeline will need to employ an approach similar to the buffering of messages in the event subscription model. The pipeline can use a message queue to queue posts that still require content, and make requests to the content microservice for each post in the queue until the queue is depleted. As each post’s content is retrieved it is added to the post metadata and stored in a separate queue for completed requests. Only when the source queue is depleted and the sink queue is full should the pipeline move on to the next step.
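The source and sink queues can be sketched with Python's standard `queue` module. This is a single-process illustration under assumptions: `fetch_post_content` is a hypothetical stand-in for the microservice call, and `enrich_posts` is an invented name. A real pipeline would likely use an external message queue for both queues.

```python
from queue import Empty, Queue
from typing import Dict, List


def fetch_post_content(post_id: int) -> str:
    """Hypothetical stand-in for a request to the content microservice."""
    return f"text for post {post_id}"


def enrich_posts(posts: List[Dict]) -> List[Dict]:
    """Attach text content to each post's metadata using two queues."""
    source: Queue = Queue()  # posts that still require content
    sink: Queue = Queue()    # posts whose content has been retrieved

    for post in posts:
        source.put(post)

    # Drain the source queue, one content request per post.
    while True:
        try:
            post = source.get_nowait()
        except Empty:
            break
        sink.put({**post, "content": fetch_post_content(post["id"])})

    # The source is depleted and the sink holds every post, so the
    # completed records can move on to the next step.
    return [sink.get_nowait() for _ in range(sink.qsize())]


enriched = enrich_posts([{"id": 1, "title": "A"}, {"id": 2, "title": "B"}])
```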
Data joining does not necessarily need to involve a microservice API. If the pipeline collects data from two separate sources that need to be combined, a similar approach can be employed. The pipeline is the only component that needs to understand the relationship between the two data sources and formats, leaving both the data sources and the ML algorithm to operate independently of those details.
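As a sketch of that idea, the function below joins metadata rows (as they might come back from a SQL query) with documents from a separate content store, keyed on URL. The field names, the choice of URL as the join key, and the `join_sources` function are assumptions for illustration.

```python
from typing import Dict, List, Tuple


def join_sources(
    metadata_rows: List[Tuple[int, str, str]],
    content_docs: List[Dict[str, str]],
) -> List[Dict[str, object]]:
    """Combine (id, url, title) rows with {'url', 'text'} documents."""
    content_by_url = {doc["url"]: doc["text"] for doc in content_docs}
    joined = []
    for page_id, url, title in metadata_rows:
        text = content_by_url.get(url)
        if text is None:
            continue  # no content available, so not usable downstream
        joined.append({"id": page_id, "title": title, "text": text})
    return joined


rows = [
    (1, "https://example.com/a", "Post A"),
    (2, "https://example.com/b", "Post B"),
]
docs = [{"url": "https://example.com/a", "text": "Body of post A"}]

records = join_sources(rows, docs)
```

Only this function knows that one source yields tuples and the other yields dictionaries; the ML code receives uniform records either way.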
The queue approach also works well when a data aggregation is required. An example of this situation is a pipeline in which the input is streaming data and the output is token counts or value aggregations. Using a message queue is desirable in these situations as most message queues deliver each message to only one consumer, which prevents the aggregator from counting the same message twice. This is especially valuable when the event stream is very high frequency, such that tokenizing each event as it comes in would lead to backups or server overload.
Because message queues ensure that each message is consumed only once, high-frequency event data can stream directly into a queue where messages are consumed by multiple workers in parallel. Each worker might be responsible for tokenizing the event data and then pushing the token stream to a different message queue. The message queue software ensures that no two workers process the same event, and each worker can operate as an independent unit that is only concerned with tokenization.
As the tokenizers push their results onto a new message queue, another worker can consume those messages and aggregate token counts, delivering its own results to the next step in the pipeline every second or minute or 1,000 events, whatever is appropriate for the application. The output of this style of pipeline might be fed into a continually updating Bayesian model, for example.
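The tokenize-then-aggregate pattern can be sketched with threads and in-process queues from the Python standard library. This is a simplified illustration: the whitespace tokenizer, the `STOP` sentinel, and the function names are assumptions, and the aggregation here runs once after the workers finish rather than continuously on a timer.

```python
import threading
from collections import Counter
from queue import Queue
from typing import Iterable

STOP = object()  # sentinel telling a worker to shut down


def tokenizer_worker(events: Queue, tokens: Queue) -> None:
    """Consume raw events and push each event's tokens to the next queue."""
    while True:
        event = events.get()
        if event is STOP:
            break
        tokens.put(event.lower().split())


def run_pipeline(messages: Iterable[str], num_workers: int = 3) -> Counter:
    events: Queue = Queue()
    tokens: Queue = Queue()
    workers = [
        threading.Thread(target=tokenizer_worker, args=(events, tokens))
        for _ in range(num_workers)
    ]
    for worker in workers:
        worker.start()

    for message in messages:
        events.put(message)  # each message is taken by exactly one worker
    for _ in workers:
        events.put(STOP)     # one sentinel per worker
    for worker in workers:
        worker.join()

    # Aggregation stage: all producers have finished, so drain the queue.
    counts: Counter = Counter()
    while not tokens.empty():
        counts.update(tokens.get())
    return counts


counts = run_pipeline(["Free money now", "free lunch", "Money talks"])
```

Because `Queue.get` hands each item to a single caller, no two workers tokenize the same event, and the final counts do not depend on which worker handled which message.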
One benefit of a data pipeline designed in this manner is performance. If you were to attempt to subscribe to high-frequency event data, tokenize each message, aggregate token counts, and update a model all in one system, you might be forced to use a very powerful (and expensive) single server. The server would simultaneously need a high-performance CPU, lots of RAM, and a high-throughput network connection.
By breaking up the pipeline into stages, however, you can optimize each stage of the pipeline for its specific task and load condition. The message queue that receives the source event stream needs only to receive the event stream but does not need to process it. The tokenizer workers do not necessarily need to be high-performance servers, as they can be run in parallel. The aggregating queue and worker will process a large volume of data but will not need to retain data for longer than a few seconds and therefore may not need much RAM. The final model, which is a compressed version of the source data, can be stored on a more modest machine. Many components of the data pipeline can be built on commodity hardware simply because a data pipeline encourages modular design.
In many cases, you will need to transform your data from format to format throughout the pipeline. That could mean converting from native data structures to JSON, transposing or interpolating values, or hashing values.
We talked about two data pipeline components. In the next post, we will discuss several types of data transformations that may occur in the data pipeline. We will also discuss a few considerations to make when transporting and storing training data or serialized models.