
In this article by Rodney Ringler, the author of C# Multithreaded and Parallel Programming, we will explore two popular design patterns for solving concurrency problems, Pipeline and producer-consumer, both of which are used in developing parallel applications with the TPL. A Pipeline design is one where an application is divided into multiple tasks, or stages, of functionality with queues of work items between them. Each stage reads an item from its input queue, performs its work on that item, and places the result in the queue for the next stage. Because the stages are connected only through these queues, all of them can execute in parallel.

Each stage is a task and can run independently of the other stages or tasks. They continue executing until their queue is empty and marked completed. They also block and wait for more work items if the queue is empty but not completed.

The producer-consumer design pattern is a similar but distinct concept. In this design, one set of functionality produces data that is then consumed by another set of functionality. Each set of functionality is a TPL task, so we have a producer task and a consumer task with a buffer between them. These tasks run independently of each other, and we can have multiple producer tasks and multiple consumer tasks. The producers enqueue their results in the buffer; the consumers dequeue items from the buffer and perform work on them. A producer can block if the buffer is full and wait for room to become available before producing more results. Likewise, a consumer can block if the buffer is empty and wait for more results to become available.
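
To make the idea of stages connected by queues concrete before the full walkthrough, here is a minimal sketch of a three-stage pipeline over integers. The MiniPipeline class, its Run method, and the queue capacity of 2 are assumptions made for illustration; they are not part of the application built later in this article.

```csharp
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading.Tasks;

public static class MiniPipeline
{
    // Runs a three-stage pipeline over 1..count and returns the squares in input order.
    public static List<int> Run(int count)
    {
        // Bounded queues sit between the stages; a capacity of 2 is an arbitrary choice.
        var numbers = new BlockingCollection<int>(2);
        var squares = new BlockingCollection<int>(2);
        var results = new List<int>();

        // Stage 1: produce the work items, then mark the queue as complete.
        Task stage1 = Task.Run(() =>
        {
            try
            {
                for (int i = 1; i <= count; i++) numbers.Add(i);
            }
            finally { numbers.CompleteAdding(); }
        });

        // Stage 2: transform each item as soon as it arrives.
        Task stage2 = Task.Run(() =>
        {
            try
            {
                foreach (int n in numbers.GetConsumingEnumerable()) squares.Add(n * n);
            }
            finally { squares.CompleteAdding(); }
        });

        // Stage 3: collect the results; only this task touches the list.
        Task stage3 = Task.Run(() =>
        {
            foreach (int s in squares.GetConsumingEnumerable()) results.Add(s);
        });

        Task.WaitAll(stage1, stage2, stage3);
        return results;
    }
}
```

Calling MiniPipeline.Run(5) returns 1, 4, 9, 16, 25. Because each queue has a single producer and a single consumer, the items leave the pipeline in the order they entered it.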

In this article, you will learn the following:

  • Designing an application with a Pipeline design
  • Designing an application with a producer-consumer design
  • Learning how to use BlockingCollection
  • Learning how to use BufferBlock
  • Understanding the classes of the System.Threading.Tasks.Dataflow library


Pipeline design pattern

The Pipeline design is very useful in parallel design when you can divide an application into a series of tasks in such a way that each task can run concurrently with the other tasks. It is important that the output of each task is in the same order as the input. If the order does not matter, then a parallel loop can be used instead. When the order matters and we don't want to wait until all items have completed task A before any item starts task B, then a Pipeline implementation is perfect.
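
For contrast, here is a minimal sketch of the "order does not matter" case handled with a parallel loop. The UnorderedWork class and its SumOfSquares method are hypothetical names chosen for this illustration; the point is only that the iterations may run in any order because the result does not depend on it.

```csharp
using System.Threading;
using System.Threading.Tasks;

public static class UnorderedWork
{
    // Sums the squares of 1..n; the iterations may run in any order.
    public static long SumOfSquares(int n)
    {
        long total = 0;
        Parallel.For(1, n + 1, i =>
        {
            // Interlocked keeps the shared total correct across threads.
            Interlocked.Add(ref total, (long)i * i);
        });
        return total;
    }
}
```

UnorderedWork.SumOfSquares(10) returns 385 regardless of how the loop body is scheduled. If each result had to be written out in input order, this approach would no longer be enough, which is where a pipeline comes in.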

Some applications that lend themselves to pipelining are video streaming, compression, and encryption. In each of these examples, we need to perform a set of tasks on the data and preserve the data’s order, but we do not want to wait for each item of data to perform a task before any of the data can perform the next task.

The key class that .NET provides for implementing this design pattern is BlockingCollection in the System.Collections.Concurrent namespace. The BlockingCollection class was introduced with .NET Framework 4. It is a thread-safe collection specifically designed for producer-consumer and Pipeline design patterns. It supports multiple threads concurrently adding items to and removing items from the collection, and its add and remove methods can block when the collection is full or empty. You can specify a maximum collection size to ensure that a producing task that outpaces a consuming task does not make the queue too large. It supports cancellation tokens. Finally, it supports enumeration, so you can use a foreach loop when processing the items of the collection.

A producer of items to the collection can call the CompleteAdding method when the last item of data has been added to the collection. Until this method is called, a consumer that is consuming items from the collection with a foreach loop will block when the collection is empty and wait for an item to be put into the collection instead of ending the loop.
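
The behaviors described above (a bounded size, completion, enumeration, and cancellation) can be seen in a short sketch. The BlockingCollectionBasics class and its two methods are hypothetical helpers written for this illustration, and the capacity of 2 and the 100 ms timeout are arbitrary values.

```csharp
using System;
using System.Collections.Concurrent;
using System.Text;
using System.Threading;

public static class BlockingCollectionBasics
{
    // Walks through bounding, taking, completing, and draining a collection.
    public static string Walkthrough()
    {
        var log = new StringBuilder();
        using (var queue = new BlockingCollection<string>(2)) // capacity of 2
        {
            queue.Add("a");
            queue.Add("b");

            // The collection is full, so TryAdd returns false instead of blocking as Add would.
            log.Append(queue.TryAdd("c") ? "added;" : "full;");

            // Take removes the oldest item; the default backing store is a FIFO queue.
            log.Append(queue.Take()).Append(';');

            // After CompleteAdding, the foreach drains what is left and then ends.
            queue.CompleteAdding();
            foreach (string item in queue.GetConsumingEnumerable())
            {
                log.Append(item).Append(';');
            }
            log.Append(queue.IsCompleted ? "completed" : "open");
        }
        return log.ToString();
    }

    // Returns true if a blocked Take is released by cancellation.
    public static bool TakeIsCancelable()
    {
        using (var queue = new BlockingCollection<int>())
        using (var cts = new CancellationTokenSource(TimeSpan.FromMilliseconds(100)))
        {
            try
            {
                queue.Take(cts.Token); // blocks: the collection is empty and not completed
                return false;
            }
            catch (OperationCanceledException)
            {
                return true;
            }
        }
    }
}
```

Walkthrough returns the string "full;a;b;completed", and TakeIsCancelable returns true once the token's timeout expires.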

Next, we will see a simple example of a Pipeline design implementation using an encryption program. This program will implement three stages in our pipeline. The first stage will read a text file character by character and place each character into a buffer (a BlockingCollection). The next stage will read each character out of the buffer and encrypt it by adding 1 to its ASCII number. It will then place the new character into our second buffer and write it to an encryption file. Our final stage will read each character out of the second buffer, decrypt it to its original character, and write it out to a new file and to the screen. As you will see, stages 2 and 3 will start processing characters before stage 1 has finished reading all the characters from the input file. All of this will be done while maintaining the order of the characters so that the final output file is identical to the input file.

Let’s get started.

How to do it

First, let’s open up Visual Studio and create a new Windows Presentation Foundation (WPF) application named PipeLineApplication and perform the following steps:

      1. Create a new class called Stages.cs. Next, make sure it has the following using statements.

        using System;
        using System.Collections.Concurrent;
        using System.Collections.Generic;
        using System.IO;
        using System.Linq;
        using System.Text;
        using System.Threading.Tasks;
        using System.Threading;
      2. In the MainWindow.xaml.cs file, make sure the following using statements are present:

        using System;
        using System.Collections.Concurrent;
        using System.Collections.Generic;
        using System.IO;
        using System.Linq;
        using System.Text;
        using System.Threading.Tasks;
        using System.Threading;
      3. Next, we will add a method for each of the three stages in our pipeline. First, we will create a method called FirstStage. It will take two parameters: one will be a BlockingCollection object that will be the output buffer of this stage, and the second will be a string pointing to the input data file. This will be a text file containing a couple of paragraphs of text to be encrypted. We will place this text file in the C:\projects folder. The FirstStage method will have the following code:

               public void FirstStage(BlockingCollection<char> output, String PipelineInputFile)
               {
                   String DisplayData = "";
                   try
                   {
                       foreach (char C in GetData(PipelineInputFile))
                       {
                           //Displayed characters read in from the file.
                           DisplayData = DisplayData + C.ToString();

                           //Add each character to the buffer for the next stage.
                           output.Add(C);
                       }
                   }
                   finally
                   {
                       output.CompleteAdding();
                   }
               }
      4. Next, we will add a method for the second stage called StageWorker. This method will not return any values and will take three parameters. One will be a BlockingCollection value that will be its input buffer, the second one will be the output buffer of the stage, and the final one will be a file path to store the encrypted text in a data file. The code for this method will look like this:

               public void StageWorker(BlockingCollection<char> input, BlockingCollection<char> output, String PipelineEncryptFile)
               {
                   String DisplayData = "";

                   try
                   {
                       foreach (char C in input.GetConsumingEnumerable())
                       {
                           //Encrypt each character.
                           char encrypted = Encrypt(C);

                           DisplayData = DisplayData + encrypted.ToString();

                           //Add characters to the buffer for the next stage.
                           output.Add(encrypted);
                       }

                       //Write the encrypted string to the output file.
                       using (StreamWriter outfile =
                                  new StreamWriter(PipelineEncryptFile))
                       {
                           outfile.Write(DisplayData);
                       }
                   }
                   finally
                   {
                       output.CompleteAdding();
                   }
               }
      5. Now, we will add a method for the third and final stage of the Pipeline design. This method will be named FinalStage. It will not return any values and will take two parameters. One will be a BlockingCollection object that is the input buffer and the other will be a string pointing to an output data file. It will have the following code in it:

               public void FinalStage(BlockingCollection<char> input, String PipelineResultsFile)
               {
                   String OutputString = "";
                   String DisplayData = "";
         
                   //Read the encrypted characters from the buffer, decrypt them, and display them.
                   foreach (char C in input.GetConsumingEnumerable())
                   {
                       //Decrypt the data.
                       char decrypted = Decrypt(C);
         
                       //Display the decrypted data.
                       DisplayData = DisplayData + decrypted.ToString();
         
                       //Add to the output string.
                       OutputString += decrypted.ToString();
         
                   }
         
                   //write the decrypted string to the output file.
                   using (StreamWriter outfile =
                               new StreamWriter(PipelineResultsFile))
                   {
                       outfile.Write(OutputString);
                   }
               }
      6. Now that we have methods for the three stages of our pipeline, let’s add a few utility methods. The first of these methods will be one that reads in the input data file and places each character in the data file in a List object. This method will take a string parameter that has a filename and will return a List object of characters. It will have the following code:

               public List<char> GetData(String PipelineInputFile)
               {
                   List<char> Data = new List<char>();
         
                   //Get the Source data.
                   using (StreamReader inputfile = new StreamReader(PipelineInputFile))
                   {
                       while (inputfile.Peek() >= 0)
                       {
                           Data.Add((char)inputfile.Read());
                       }
         
                   }
         
                   return Data;
               }
      7. Now we will need a method to encrypt the characters. This will be a simple encryption method. The encryption method is not really important to this exercise. This exercise is designed to demonstrate the Pipeline design, not implement the world’s toughest encryption. This encryption will simply take each character and add one to its ASCII numerical value. The method will take a character type as an input parameter and return a character. The code for it will be as follows:

               public char Encrypt(char C)
               {
                   //Take the character, convert to an int, add 1, then convert back to a character.
                   int i = (int)C;
                   i = i + 1;
                   C = Convert.ToChar(i);

                   return C;
               }
      8. Now we will add one final method to the Stages class to decrypt a character value. It will simply do the reverse of the encrypt method. It will take the ASCII numerical value and subtract 1. The code for this method will look like this:

               public char Decrypt(char C)
               {
                   //Take the character, convert to an int, subtract 1, then convert back to a character.
                   int i = (int)C;
                   i = i - 1;
                   C = Convert.ToChar(i);

                   return C;
               }
      9. Now that we are done with the Stages class, let’s switch our focus back to the MainWindow.xaml.cs file. First, you will need to add three using statements. They are for the StreamReader, StreamWriter, Threads, and BlockingCollection classes:

        using System.Collections.Concurrent;
        using System.IO;
        using System.Threading;
      10. At the top of the MainWindow class, we need four variables available for the whole class. We need three strings that point to our three data files—the input data, encrypted data, and output data. Then we will need a Stages object. These declarations will look like this:

               private static String PipelineResultsFile = @"c:\projects\OutputData.txt";
               private static String PipelineEncryptFile = @"c:\projects\EncryptData.txt";
               private static String PipelineInputFile = @"c:\projects\InputData.txt";
               private Stages Stage;
      11. Then, in the MainWindow constructor method, right after the InitializeComponent call, add a line to instantiate our Stages object:

        //Create the Stages object that provides the methods for the three pipeline stages.
        Stage = new Stages();
      12. Next, add a button to the MainWindow.xaml file that will initiate the pipeline and encryption. Name this button control butEncrypt, and set its Content property to Encrypt File. Next, add a click event handler for this button in the MainWindow.xaml.cs file. Its event handler method will be butEncrypt_Click and will contain the main code for this application. It will instantiate two BlockingCollection objects for two queues: one queue between stages 1 and 2, and one queue between stages 2 and 3. This method will then create a task for each stage that executes the corresponding method from the Stages class. It will then start these three tasks and wait for them to complete. Finally, it will read the input, encrypted, and results data files and display their contents in text blocks for viewing. The code for it will look like the following code:

               private void butEncrypt_Click(object sender, RoutedEventArgs e)
               {
                   //PipeLine Design Pattern

                   //Create queues for input and output to stages.
                   int size = 20;
                   BlockingCollection<char> Buffer1 = new BlockingCollection<char>(size);
                   BlockingCollection<char> Buffer2 = new BlockingCollection<char>(size);

                   TaskFactory tasks = new TaskFactory(TaskCreationOptions.LongRunning, TaskContinuationOptions.None);

                   Task Stage1 = tasks.StartNew(() => Stage.FirstStage(Buffer1, PipelineInputFile));
                   Task Stage2 = tasks.StartNew(() => Stage.StageWorker(Buffer1, Buffer2, PipelineEncryptFile));
                   Task Stage3 = tasks.StartNew(() => Stage.FinalStage(Buffer2, PipelineResultsFile));

                   Task.WaitAll(Stage1, Stage2, Stage3);

                   //Display the 3 files.
                   using (StreamReader inputfile = new StreamReader(PipelineInputFile))
                   {
                       while (inputfile.Peek() >= 0)
                       {
                           tbStage1.Text = tbStage1.Text + (char)inputfile.Read();
                       }
                   }
                   using (StreamReader inputfile = new StreamReader(PipelineEncryptFile))
                   {
                       while (inputfile.Peek() >= 0)
                       {
                           tbStage2.Text = tbStage2.Text + (char)inputfile.Read();
                       }
                   }
                   using (StreamReader inputfile = new StreamReader(PipelineResultsFile))
                   {
                       while (inputfile.Peek() >= 0)
                       {
                           tbStage3.Text = tbStage3.Text + (char)inputfile.Read();
                       }
                   }
               }
      13. One last thing. Let’s add three textblocks to display the outputs. We will call these tbStage1, tbStage2, and tbStage3. We will also add three label controls with the text Input File, Encrypted File, and Output File. These will be placed by the corresponding textblocks. Now, the MainWindow.xaml file should look like the following screenshot:

        [Screenshot: MainWindow.xaml layout]

      14. Now we will need an input data file to encrypt. We will call this file InputData.txt and put it in the C:\projects folder on our computer. For our example, we have added the following text to it:

        [Screenshot: contents of InputData.txt]

      15. We are all finished and ready to try it out. Compile and run the application and you should have a window that looks like the following screenshot:

        [Screenshot: application window at startup]

      16. Now, click on the Encrypt File button and you should see the following output:

        [Screenshot: application window after clicking Encrypt File]

        As you can see, the input and output files look the same and the encrypted file looks different. Remember that Input File is the text we put in the input data text file; this is the data that stage 1 reads from the file into a character list. Encrypted File is the output from stage 2 after we have encrypted each character. Output File is the output of stage 3 after we have decrypted the characters again. It should match Input File.
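
One detail of the Encrypt and Decrypt helpers from steps 7 and 8 is worth a note: Convert.ToChar(int) throws an OverflowException when the value falls outside the range of a char, which would happen if Encrypt ever received char.MaxValue or Decrypt received the character with code 0. For ordinary text files this is unlikely to matter. As a sketch of one possible alternative, and not the article's implementation, the hypothetical ShiftCipher class below wraps around instead of throwing.

```csharp
using System;

public static class ShiftCipher
{
    // Adds 1 to the character code, wrapping from char.MaxValue back to '\0'.
    public static char Encrypt(char c)
    {
        return unchecked((char)(c + 1));
    }

    // Subtracts 1 from the character code, wrapping from '\0' to char.MaxValue.
    public static char Decrypt(char c)
    {
        return unchecked((char)(c - 1));
    }

    // Encrypts and then decrypts every character of the text.
    public static string RoundTrip(string text)
    {
        var buffer = new char[text.Length];
        for (int i = 0; i < text.Length; i++)
        {
            buffer[i] = Decrypt(Encrypt(text[i]));
        }
        return new string(buffer);
    }
}
```

ShiftCipher.Encrypt('a') returns 'b', and RoundTrip returns its input unchanged for any string, which is the same property the Output File relies on to match the Input File.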

Now, let’s take a look at how this works.

How it works

Let’s look at the butEncrypt click event handler method in the MainWindow.xaml.cs file, as this is where a lot of the action takes place. Let’s examine the following lines of code:

           //Create queues for input and output to stages.
           int size = 20;
           BlockingCollection<char> Buffer1 = new BlockingCollection<char>(size);
           BlockingCollection<char> Buffer2 = new BlockingCollection<char>(size);
           TaskFactory tasks = new TaskFactory(TaskCreationOptions.LongRunning, TaskContinuationOptions.None);
 
           Task Stage1 = tasks.StartNew(() => Stage.FirstStage(Buffer1, PipelineInputFile));
           Task Stage2 = tasks.StartNew(() => Stage.StageWorker(Buffer1, Buffer2, PipelineEncryptFile));
           Task Stage3 = tasks.StartNew(() => Stage.FinalStage(Buffer2, PipelineResultsFile));

First, we create two queues that are implemented using BlockingCollection objects. Each of these is set with a size of 20 items. These two queues take a character datatype.

Then we create a TaskFactory object and use it to start three tasks. Each task uses a lambda expression that executes one of the stage methods from the Stages class: FirstStage, StageWorker, and FinalStage.

So, now we have three separate tasks running besides the main UI thread. Stage1 will read the input data file character by character and place each character in the queue Buffer1. Remember that this queue can only hold 20 items before it will block the FirstStage method waiting on room in the queue. This is how we know that Stage2 starts running before Stage1 completes: if it did not, Stage1 would queue only the first 20 characters and then block indefinitely.

Once Stage1 has read all of the characters from the input file and placed them into Buffer1, it then makes the following call:

           finally
           {
               output.CompleteAdding();
           }

This lets the BlockingCollection instance, Buffer1, know that there are no more items to be put in the queue. So, when Stage2 has emptied the queue after Stage1 has called this method, it will not block but will instead continue until completion. Prior to the CompleteAdding method call, Stage2 will block if Buffer1 is empty, waiting until more items are placed in the queue. This is why the BlockingCollection class was developed for Pipeline and producer-consumer applications. It provides the perfect mechanism for this functionality.

When we created the TaskFactory, we used the following parameter:

TaskCreationOptions.LongRunning

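This hints to the task scheduler that these tasks may run for a long time and could occasionally block waiting on their queues. The scheduler can then decide how best to manage the threads for these tasks; the default scheduler typically gives a long-running task its own dedicated thread instead of tying up a thread pool thread.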
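
The effect of this option can be observed with a small sketch. The LongRunningDemo class is a hypothetical helper, and what it reports reflects how the default scheduler currently behaves, which is an implementation detail rather than a documented guarantee.

```csharp
using System.Threading;
using System.Threading.Tasks;

public static class LongRunningDemo
{
    // Starts a task with the given options and reports whether it ran on a thread pool thread.
    public static bool RunsOnPoolThread(TaskCreationOptions options)
    {
        Task<bool> task = Task.Factory.StartNew(
            () => Thread.CurrentThread.IsThreadPoolThread,
            CancellationToken.None,
            options,
            TaskScheduler.Default);
        return task.Result;
    }
}
```

With the default scheduler, RunsOnPoolThread(TaskCreationOptions.None) is expected to return true, while RunsOnPoolThread(TaskCreationOptions.LongRunning) is expected to return false because the task is given a thread of its own.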

Now, let's look at the code in Stage2, the StageWorker method. We need a way to remove items in an enumerable way so that we can iterate over the queue's items with a foreach loop, because we do not know how many items to expect. Also, since BlockingCollection objects support multiple consumers, we need a way to remove items so that no other consumer can remove the same item. We use this method of the BlockingCollection class:

foreach (char C in input.GetConsumingEnumerable())

This allows multiple consumers to remove items from a BlockingCollection instance, with each item handed to exactly one consumer in the order in which it was added. To further improve the performance of this application (assuming we have enough available processing cores), we could create a fourth task that also runs the StageWorker method, so that two tasks are running stage 2. This might be helpful if stage 1 runs faster than stage 2. If this happens, stage 1 will continually fill the queue and block until space becomes available. But if we run multiple stage 2 tasks, then we will be able to keep up with stage 1. Note that StageWorker as written assumes a single instance: with two instances running, the relative order of the characters placed into Buffer2 is no longer guaranteed, and each instance would call CompleteAdding and write the encrypted file, so the method would need to be adapted first.
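
To see how several consumers share one BlockingCollection, here is a minimal sketch in which each item is removed by exactly one consumer. The CompetingConsumers class, the capacity of 8, and the choice to sum the items (an operation where order does not matter) are assumptions made for this illustration.

```csharp
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

public static class CompetingConsumers
{
    // Adds 1..itemCount to a queue and lets several consumers add the items to a shared total.
    public static long SumWithConsumers(int itemCount, int consumerCount)
    {
        var queue = new BlockingCollection<int>(8);
        long total = 0;

        Task producer = Task.Run(() =>
        {
            try
            {
                for (int i = 1; i <= itemCount; i++) queue.Add(i);
            }
            finally { queue.CompleteAdding(); }
        });

        var consumers = new Task[consumerCount];
        for (int c = 0; c < consumerCount; c++)
        {
            consumers[c] = Task.Run(() =>
            {
                // Each item is handed to only one of the competing enumerations.
                foreach (int item in queue.GetConsumingEnumerable())
                {
                    Interlocked.Add(ref total, item);
                }
            });
        }

        producer.Wait();
        Task.WaitAll(consumers);
        return total;
    }
}
```

CompetingConsumers.SumWithConsumers(100, 3) returns 5050, the same total a single consumer would produce, which shows that no item is lost or processed twice. Which consumer handles which item is not predictable.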

Then, finally we have this line of code:

Task.WaitAll(Stage1, Stage2, Stage3);

This tells our button handler to wait until all of the tasks are complete. Once the CompleteAdding method has been called on each BlockingCollection instance and the buffers have been emptied, all of our stages will complete, the Task.WaitAll call will return, and this method on the UI thread can finish its processing, which in this application is to read the data files and display them in the UI:

           //Display the 3 files.
           using (StreamReader inputfile = new StreamReader(PipelineInputFile))
           {
               while (inputfile.Peek() >= 0)
               {
                   tbStage1.Text = tbStage1.Text + (char)inputfile.Read();
               }
 
           }
           using (StreamReader inputfile = new StreamReader(PipelineEncryptFile))
           {
               while (inputfile.Peek() >= 0)
               {
                   tbStage2.Text = tbStage2.Text + (char)inputfile.Read();
               }
 
           }
           using (StreamReader inputfile = new StreamReader(PipelineResultsFile))
           {
               while (inputfile.Peek() >= 0)
               {
                   tbStage3.Text = tbStage3.Text + (char)inputfile.Read();
               }
 
           }
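
One consequence of calling Task.WaitAll in the click handler is that the UI thread is blocked until all three stages finish. A common alternative, sketched here under the assumption that the handler can be declared async, is to await Task.WhenAll instead. The StageRunner class and its stand-in stage lambdas are hypothetical and only illustrate the waiting technique.

```csharp
using System.Threading.Tasks;

public static class StageRunner
{
    // Starts three stand-in stages and completes when all of them have finished.
    public static async Task<int> RunStagesAsync()
    {
        Task<int> stage1 = Task.Run(() => 1);
        Task<int> stage2 = Task.Run(() => 2);
        Task<int> stage3 = Task.Run(() => 3);

        // await releases the calling thread instead of blocking it.
        int[] results = await Task.WhenAll(stage1, stage2, stage3);
        return results[0] + results[1] + results[2];
    }
}
```

In the WPF application, the equivalent change would be to declare the handler as private async void butEncrypt_Click(...) and replace the Task.WaitAll line with await Task.WhenAll(Stage1, Stage2, Stage3); the code that fills the text blocks would then run on the UI thread after the stages complete.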

Next, experiment with longer running, more complex stages and multiple consumer stages. Also, try stepping through the application with the Visual Studio debugger. Make sure you understand the interaction between the stages and the buffers.

Explaining message blocks

Let’s talk for a minute about message blocks and the TPL. There is a new library that Microsoft has developed as part of the TPL, but it does not ship directly with .NET 4.5. This library is called the TPL Dataflow library. It is located in the System.Threading.Tasks.Dataflow namespace. It comes with various dataflow components that assist in asynchronous concurrent applications where messages need to be passed between multiple tasks or the data needs to be passed when it becomes available, as in the case of a web camera streaming video.

The Dataflow library’s message blocks are very helpful for design patterns such as Pipeline and producer-consumer where you have multiple producers producing data that can be consumed by multiple consumers. The two that we will take a look at are BufferBlock and ActionBlock.

The TPL Dataflow library contains classes to assist in message passing and in parallelizing I/O-heavy applications that have a lot of throughput. It provides explicit control over how data is buffered and passed. Consider an application that asynchronously loads large binary files from storage and manipulates that data. Traditional programming requires that you use callbacks and synchronization objects, such as locks, to coordinate tasks and access to shared data. By using the TPL Dataflow objects, you can create objects that process image files as they are read in from a disk location, and you can set how data is handled when it becomes available. Because the runtime manages the dependencies between data, you can often avoid having to synchronize access to shared data yourself. Also, since the runtime schedules the work based on the asynchronous arrival of data, the TPL Dataflow objects can improve performance by managing the threads the tasks run on.

In this section, we will cover two of these classes, BufferBlock and ActionBlock.

The TPL Dataflow library (System.Threading.Tasks.Dataflow) does not ship with .NET 4.5. To install System.Threading.Tasks.Dataflow, open your project in Visual Studio, select Manage NuGet Packages from under the Project menu and then search online for Microsoft.Tpl.Dataflow.

BufferBlock

The BufferBlock object in the Dataflow library provides a buffer to store data. The syntax is BufferBlock<T>, where T indicates that the datatype is generic and can be of any type. All public static members of this type are guaranteed to be thread-safe. BufferBlock is an asynchronous message structure that stores messages in a first-in-first-out queue. Messages can be "posted" to the queue by multiple producers and "received" from the queue by multiple consumers.

The TPL Dataflow library provides interfaces for three types of objects: source blocks, target blocks, and propagator blocks. BufferBlock is a general-purpose message block that can act as both a source and a target message buffer, which makes it perfect for a producer-consumer application design. To act as both a source and a target, it implements two interfaces defined by the TPL Dataflow library: ISourceBlock<TOutput> and ITargetBlock<TInput>. So, in the application that we will develop in the Producer-consumer design pattern section of this article, you will see that the producer method accesses the BufferBlock through the ITargetBlock interface and the consumer accesses it through the ISourceBlock interface.

This will be the same BufferBlock object that they will act on but by defining their local objects with a different interface there will be different methods available to use. The producer method will have Post and Complete methods, and the consumer method will use the OutputAvailableAsync and Receive methods.
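
As a minimal sketch of these two views onto one BufferBlock, the hypothetical BufferBlockViews class below passes the same block to a producer as an ITargetBlock<int> and to a consumer as an ISourceBlock<int>. It assumes the TPL Dataflow package is referenced by the project and uses integers rather than the characters used in the application later in this article.

```csharp
using System.Collections.Generic;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class BufferBlockViews
{
    // The producer only sees the target side: it can post items and mark the block complete.
    static void Produce(ITargetBlock<int> target, int count)
    {
        for (int i = 1; i <= count; i++) target.Post(i);
        target.Complete();
    }

    // The consumer only sees the source side: it waits for output and receives it.
    static async Task<List<int>> ConsumeAsync(ISourceBlock<int> source)
    {
        var received = new List<int>();
        // OutputAvailableAsync yields false once the block is complete and empty.
        while (await source.OutputAvailableAsync())
        {
            received.Add(source.Receive());
        }
        return received;
    }

    public static List<int> Run(int count)
    {
        var buffer = new BufferBlock<int>();
        Task<List<int>> consumer = ConsumeAsync(buffer); // started before any data exists
        Produce(buffer, count);
        return consumer.Result;
    }
}
```

BufferBlockViews.Run(3) returns 1, 2, 3. The consumer can be started before the producer because it simply waits until output becomes available or the block is completed.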

The BufferBlock object has only two properties: Count, which is the number of data messages currently in the queue, and Completion, which gets a Task that represents the asynchronous operation and completion of the message block.

The methods of this class, the extension methods provided by the interfaces that it implements, and the interfaces themselves are listed in the MSDN reference for BufferBlock<T>:

Referenced from http://msdn.microsoft.com/en-us/library/hh160414(v=vs.110).aspx

So, as you can see, these interfaces make using the BufferBlock object as a general-purpose queue between stages of a pipeline very easy. This technique is also useful between producers and consumers in a producer-consumer design pattern.

ActionBlock

Another very useful object in the Dataflow library is ActionBlock. Its syntax is ActionBlock<TInput>, where TInput is the type of data that the block operates on. ActionBlock is a target block that executes a delegate, such as an Action<TInput>, each time a message of data is received. The following is a very simple example of using an ActionBlock:

           ActionBlock<int> action = new ActionBlock<int>(x => Console.WriteLine(x));
 
           action.Post(10);

In this sample piece of code, the ActionBlock object is created with int as its type parameter and is given a simple lambda expression that calls Console.WriteLine when a message of data is posted to the block. So, when the action.Post(10) command is executed, the integer 10 is posted to the ActionBlock buffer and then the ActionBlock delegate, implemented as a lambda expression in this case, is executed with that value.

In this example, since this is a target block, we would then need to call the Complete method to signal that no more messages will be posted; the block's Completion task finishes once the messages already posted have been processed.
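
Here is a minimal sketch of that full life cycle: post some messages, call Complete, and wait on Completion before reading the result. The ActionBlockDemo class and its SumPosted method are hypothetical names, and the sketch assumes the TPL Dataflow package is referenced.

```csharp
using System.Threading.Tasks.Dataflow;

public static class ActionBlockDemo
{
    // Posts each value to an ActionBlock and returns the sum once the block has finished.
    public static int SumPosted(params int[] values)
    {
        int sum = 0;

        // By default an ActionBlock processes one message at a time, so sum needs no lock.
        var block = new ActionBlock<int>(x => { sum += x; });

        foreach (int value in values) block.Post(value);

        block.Complete();        // no more messages will be posted
        block.Completion.Wait(); // wait until the posted messages have been processed
        return sum;
    }
}
```

ActionBlockDemo.SumPosted(10, 20, 30) returns 60. Without the wait on Completion, the method could return before the delegate had run for every message.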

Another handy method is LinkTo, which BufferBlock provides as an ISourceBlock. This method allows you to link an ISourceBlock to an ITargetBlock. So, you can take a BufferBlock, which is an ISourceBlock, and link it to an ActionBlock, which is an ITargetBlock. In this way, an Action delegate is executed whenever the BufferBlock receives data. Note that a BufferBlock hands each message to only one target: once the linked ActionBlock accepts a message, that message is removed from the buffer and is no longer available to other consumers.
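
The following sketch links a BufferBlock to an ActionBlock with LinkTo. The LinkToDemo class is a hypothetical helper, the PropagateCompletion option is used so that completing the buffer also completes the linked block, and the TPL Dataflow package is assumed to be referenced.

```csharp
using System.Collections.Generic;
using System.Threading.Tasks.Dataflow;

public static class LinkToDemo
{
    // Posts 1..count to a BufferBlock that is linked to an ActionBlock and returns what the action saw.
    public static List<int> Run(int count)
    {
        var seen = new List<int>();
        var buffer = new BufferBlock<int>();
        var recorder = new ActionBlock<int>(x => { seen.Add(x); });

        // Messages posted to the buffer are offered to the linked target as they arrive.
        buffer.LinkTo(recorder, new DataflowLinkOptions { PropagateCompletion = true });

        for (int i = 1; i <= count; i++) buffer.Post(i);

        buffer.Complete();          // flows through to the recorder via PropagateCompletion
        recorder.Completion.Wait(); // all messages have been handled at this point
        return seen;
    }
}
```

LinkToDemo.Run(3) returns 1, 2, 3. Once the recorder has completed, the buffer is empty, because each message was handed over to the linked block.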

ActionBlock has only two properties: InputCount, which is the number of data messages waiting in the queue, and Completion, which gets a Task that represents the asynchronous operation and completion of the message block. Its methods, the extension methods implemented from its interfaces, and the interfaces it implements are listed in the MSDN reference for ActionBlock<TInput>:

Referenced from http://msdn.microsoft.com/en-us/library/hh194684(v=vs.110).aspx

Now that we have examined a little of the Dataflow library that Microsoft has developed, let’s use it in a producer-consumer application.

Producer-consumer design pattern

Now that we have covered the TPL's Dataflow library and the set of objects it provides to assist in asynchronous message passing between concurrent tasks, let's take a look at the producer-consumer design pattern. In a typical producer-consumer design, we have one or more producers putting data into a queue or message data block. Then we have one or more consumers taking data from the queue and processing it. This allows for asynchronous processing of data. Using the Dataflow library objects, we can create a consumer task that monitors a BufferBlock and pulls items of data from it when they arrive. If no items are available, the consumer method will wait until items are available or the BufferBlock has been marked as complete. Because of this, we can start our consumer at any time, even before the producer starts to put items into the queue.

Then we create one or more tasks that produce items and place them into the BufferBlock. Once the producers are finished processing all items of data to the BufferBlock, they can mark the block as Complete. Until then, the BufferBlock object is still available to add items into. This is perfect for long-running tasks and applications when we do not know when the data will arrive.

Because the producer task takes the BufferBlock as an input parameter typed as ITargetBlock and the consumer task takes it as an input parameter typed as ISourceBlock, they can both use the same BufferBlock object but have different methods available to them. One has methods to post items to the block and mark it complete. The other has methods to receive items and wait for more items until the block is marked complete. In this way, the Dataflow library provides the perfect object to act as a queue between our producers and consumers.

Now, let’s take a look at the application we developed previously as a Pipeline design and modify it using the Dataflow library. We will also remove a stage so that it just has two stages, one producer and one consumer.

How to do it

The first thing we need to do is open Visual Studio and create a new console application called ProducerConsumerConsoleApp. We will use a console application this time just for ease. Our main purpose here is to demonstrate how to implement the producer-consumer design pattern using the TPL Dataflow library.

Once you have opened Visual Studio and created the project, we need to perform the following steps:

        1. First, we need to install and add a reference to the TPL Dataflow library. The TPL Dataflow library (System.Threading.Tasks.Dataflow) does not ship with .NET 4.5. Select Manage NuGet Packages from under the Project menu and then search online for Microsoft.Tpl.Dataflow.
        2. Now, we will need to add two using statements to our program. One for StreamReader and StreamWriter and one for the BufferBlock object:
          using System.Threading.Tasks.Dataflow;
          using System.IO;
        3. Now, let’s add two static strings that will point to our input data file and the encrypted data file that we output:

                 private static String PipelineEncryptFile = @"c:\projects\EncryptData.txt";
                 private static String PipelineInputFile = @"c:\projects\InputData.txt";
        4. Next, let’s add a static method that will act as our producer. This method will have the following code:

                 // Our Producer method.
                 static void Producer(ITargetBlock<char> Target)
                 {
                     String DisplayData = "";
           
                     try
                     {
                         foreach (char C in GetData(PipelineInputFile))
                         {
           
                             // Accumulate the characters read in from the file.
                             DisplayData = DisplayData + C.ToString();
           
                             // Add each character to the buffer for the next stage.
                             Target.Post(C);
           
                         }
                     }
           
                     finally
                     {
                         Target.Complete();
                     }
           
                 }
        5. Then we will add a static method to perform our consumer functionality. It will have the following code:

                 // This is our consumer method. It runs asynchronously.
                 static async Task<int> Consumer(ISourceBlock<char> Source)
                 {
                     String DisplayData = "";
           
                     // Read from the source buffer until the source buffer has no
                     // available output data.
                     while (await Source.OutputAvailableAsync())
                     {
                             char C = Source.Receive();
           
                             //Encrypt each character.
                             char encrypted = Encrypt(C);
           
                             DisplayData = DisplayData + encrypted.ToString();
           
                     }
           
                     // Write the encrypted string to the output file.
                     using (StreamWriter outfile =
                                  new StreamWriter(PipelineEncryptFile))
                     {
                         outfile.Write(DisplayData);
                     }
           
                     return DisplayData.Length;
                 }
        6. Then, let’s create a simple static helper method to read our input data file and put it in a List collection character by character. This will give us a character list for our producer to use. The code in this method will look like this:

                 public static List<char> GetData(String PipelineInputFile)
                 {
                     List<char> Data = new List<char>();
           
                     //Get the Source data.
                     using (StreamReader inputfile = new StreamReader(PipelineInputFile))
                     {
                         while (inputfile.Peek() >= 0)
                         {
                             Data.Add((char)inputfile.Read());
                         }
           
                     }
           
                     return Data;
                 }
        7. Next, we will add a static method to encrypt our characters. This method will work like the one we used in our pipelining application. It will add one to the ASCII numerical value of the character:

                 public static char Encrypt(char C)
                 {
                     //Take the character, convert to an int, add 1, then convert back to a character.
                     int i = (int)C;
                     i = i + 1;
                     C = Convert.ToChar(i);
           
                     return C;
                 }
        8. Then, we need to add the code for our Main method. This method will start our consumer and producer tasks. Then, when they have completed processing, it will display the results in the console. The code for this method looks like this:
                 static void Main(string[] args)
                 {
                     // Create the buffer block object to use between the producer and consumer.
                     BufferBlock<char> buffer = new BufferBlock<char>();
           
                     // The consumer method runs asynchronously. Start it now.
                     Task<int> consumer = Consumer(buffer);
           
                     // Post source data to the dataflow block.
                     Producer(buffer);
           
                     // Wait for the consumer to process all data.
                     consumer.Wait();
           
                     // Print the count of characters from the input file.
                     Console.WriteLine("Processed {0} bytes from input file.", consumer.Result);
           
                     //Print out the input file to the console.
                     Console.WriteLine("\r\n\r\n");
                     Console.WriteLine("This is the input data file. \r\n");
                     using (StreamReader inputfile = new StreamReader(PipelineInputFile))
                     {
                         while (inputfile.Peek() >= 0)
                         {
                             Console.Write((char)inputfile.Read());
                         }
           
                     }
           
                     //Print out the encrypted file to the console.
                     Console.WriteLine("\r\n\r\n");
                     Console.WriteLine("This is the encrypted data file. \r\n");
                     using (StreamReader encryptfile = new StreamReader(PipelineEncryptFile))
                     {
                         while (encryptfile.Peek() >= 0)
                         {
                             Console.Write((char)encryptfile.Read());
                         }
           
                     }
           
                    //Wait before closing the application so we can see the results.
                     Console.ReadLine();
                 }
        9. That is all the code that is needed. Now, let’s build and run the application using the following input data file:

          C# Multithreaded and Parallel Programming

        10. Once it runs and completes, your output should look like the following screenshot:

[Screenshot: console output showing the number of characters processed, the input data, and the encrypted data]

Now, try this with your own data files and inputs. Let’s examine what happened and how this works.

How it works

First we will go through the Main method. The first thing Main does is create a BufferBlock object called buffer. This will be used as the queue of items between our producer and consumer. This BufferBlock is defined to accept character datatypes.

Next, we start our consumer task using this command:

Task<int> consumer = Consumer(buffer);

Also, note that when this buffer object is passed to the consumer task, it is treated as an ISourceBlock. No explicit cast is needed because BufferBlock implements that interface. Notice the method header of our consumer:

static async Task<int> Consumer(ISourceBlock<char> Source)

Next, our Main method runs our producer using the following command:

Producer(buffer);

Then we wait until our consumer task finishes, using this command:

consumer.Wait();

So, now our Main method just waits. Its work is done for now. Note that Producer is an ordinary synchronous method, so by the time Main reaches the Wait call, the producer has already posted every character and marked the block complete. The consumer, which has been running asynchronously since it was started, may still be taking items out of the BufferBlock. The consumer will stay in the following loop until all items are removed from the message block and the block has been completed, which is done by someone calling its Complete method:

     while (await Source.OutputAvailableAsync())
           {
                   char C = Source.Receive();
 
                   //Encrypt each character.
                   char encrypted = Encrypt(C);
 
                   DisplayData = DisplayData + encrypted.ToString();
 
           }

So, now our consumer task will loop asynchronously, removing items from the message queue as they appear. It uses the following command in the while loop to do this:

await Source.OutputAvailableAsync()

Likewise, other consumer tasks can run at the same time and do the same thing. If the producer is adding items to the block quicker than the consumer can process them, then adding another consumer will improve performance. Once an item is available, then the consumer calls the following command to get the item from the buffer:

char C = Source.Receive();

Since the buffer contains items of type character, we place the item received into a char variable. Then the consumer processes it by encrypting the character and appending it to our display string.
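To make the effect of this step concrete, here is a small sketch that applies the same shift-by-one transformation to a short string. The class name EncryptSketch and the helper EncryptAll are hypothetical, and the sketch swaps the article's string concatenation for a StringBuilder, which avoids creating a new string on every append when the input is large.

```csharp
using System;
using System.Text;

static class EncryptSketch
{
    // Same transformation as the article's Encrypt method: add one to the
    // character's numeric value and convert back to a character.
    public static char Encrypt(char c)
    {
        return Convert.ToChar(c + 1);
    }

    // Hypothetical helper: encrypts every character of a string.
    public static string EncryptAll(string input)
    {
        var result = new StringBuilder(input.Length);
        foreach (char c in input)
        {
            result.Append(Encrypt(c));
        }
        return result.ToString();
    }

    static void Main()
    {
        Console.WriteLine(EncryptAll("C# TPL")); // prints "D$!UQM"
    }
}
```

One edge case to keep in mind is that Convert.ToChar throws an OverflowException when the value passed in is larger than char.MaxValue, so the highest possible character cannot be shifted this way.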

Now, let’s look at the producer. The producer first gets its data by calling the following command:

GetData(PipelineInputFile)

This method returns a List collection of characters that has an item for each character in the input data file. Now the producer iterates through the collection and uses the following command to place each item into the buffer block:

Target.Post(C);

Also, notice in the method header for our producer that the buffer is received as an ITargetBlock type:

static void Producer(ITargetBlock<char> Target)

Once the producer is done processing characters and adding them to the buffer, it officially closes the BufferBlock object using this command:

Target.Complete();

That is it for the producer and consumer. Once the Main method is done waiting on the consumer to finish, it then uses the following code to write out the number of characters processed, the input data, and the encrypted data:

     // Print the count of characters from the input file.
           Console.WriteLine("Processed {0} bytes from input file.", consumer.Result);
 
           //Print out the input file to the console.
           Console.WriteLine("\r\n\r\n");
           Console.WriteLine("This is the input data file. \r\n");
           using (StreamReader inputfile = new StreamReader(PipelineInputFile))
           {
               while (inputfile.Peek() >= 0)
               {
                   Console.Write((char)inputfile.Read());
               }
 
           }
 
           //Print out the encrypted file to the console.
           Console.WriteLine("\r\n\r\n");
           Console.WriteLine("This is the encrypted data file. \r\n");
           using (StreamReader encryptfile = new StreamReader(PipelineEncryptFile))
           {
               while (encryptfile.Peek() >= 0)
               {
                   Console.Write((char)encryptfile.Read());
               }
 
           }

Now that you are comfortable implementing a basic producer-consumer design using objects from the TPL Dataflow library, try experimenting with this basic idea but use multiple producers and multiple consumers all with the same BufferBlock object as the queue between them all.
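One possible starting point for that experiment is sketched below. It is not the article's code: the class and method names are hypothetical, integers stand in for characters, and two details differ from the single-consumer example. First, no individual producer calls Complete, because other producers may still be posting; the block is completed once, after all producers have finished. Second, the consumers use TryReceive instead of Receive, since another consumer may take an item between the availability check and the read.

```csharp
using System;
using System.Linq;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

static class MultiProducerConsumerSketch
{
    // Each producer posts its own range of numbers on a thread-pool thread.
    static Task ProduceAsync(ITargetBlock<int> target, int start, int count)
    {
        return Task.Run(() =>
        {
            for (int i = start; i < start + count; i++)
            {
                target.Post(i);
            }
        });
    }

    // Each consumer adds up whatever items it manages to take.
    static async Task<long> ConsumeAsync(IReceivableSourceBlock<int> source)
    {
        long sum = 0;
        while (await source.OutputAvailableAsync())
        {
            int item;
            // Returns false if another consumer got there first.
            while (source.TryReceive(out item))
            {
                sum += item;
            }
        }
        return sum;
    }

    public static long Run(int producers, int consumers, int itemsPerProducer)
    {
        var buffer = new BufferBlock<int>();

        Task<long>[] consumerTasks = Enumerable.Range(0, consumers)
            .Select(_ => ConsumeAsync(buffer))
            .ToArray();

        Task[] producerTasks = Enumerable.Range(0, producers)
            .Select(p => ProduceAsync(buffer, p * itemsPerProducer, itemsPerProducer))
            .ToArray();

        // Complete the shared block only after every producer is done.
        Task.WaitAll(producerTasks);
        buffer.Complete();

        Task.WaitAll(consumerTasks);
        return consumerTasks.Sum(t => t.Result);
    }

    static void Main()
    {
        // Two producers post 0..99 and 100..199; three consumers share the work.
        Console.WriteLine("Total: {0}", Run(2, 3, 100)); // prints "Total: 19900"
    }
}
```

How the items are split between the consumers varies from run to run, but the combined total is always the same because every item is received exactly once.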

Also, try converting our original Pipeline application from the beginning of the article into a TPL Dataflow producer-consumer application with two sets of producers and consumers. The first will act as stage 1 and stage 2, and the second will act as stage 2 and stage 3. So, in effect, stage 2 will be both a consumer and a producer.
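A rough sketch of that three-stage arrangement follows. It assumes the stages are read, encrypt, and decrypt, takes its input from a string rather than a file, and uses hypothetical names throughout, so treat it as an outline to adapt rather than a finished conversion.

```csharp
using System;
using System.Text;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

static class DataflowPipelineSketch
{
    // Stage 1: producer only. Posts each character, then completes its output.
    static void ReadStage(string input, ITargetBlock<char> output)
    {
        try
        {
            foreach (char c in input)
            {
                output.Post(c);
            }
        }
        finally
        {
            output.Complete();
        }
    }

    // Stage 2: consumer of the first buffer and producer for the second.
    static async Task EncryptStage(ISourceBlock<char> input, ITargetBlock<char> output)
    {
        try
        {
            while (await input.OutputAvailableAsync())
            {
                output.Post((char)(input.Receive() + 1));
            }
        }
        finally
        {
            // Tell stage 3 that no more encrypted characters will arrive.
            output.Complete();
        }
    }

    // Stage 3: consumer only. Reverses the shift and collects the text.
    static async Task<string> DecryptStage(ISourceBlock<char> input)
    {
        var result = new StringBuilder();
        while (await input.OutputAvailableAsync())
        {
            result.Append((char)(input.Receive() - 1));
        }
        return result.ToString();
    }

    public static string Run(string text)
    {
        var plainBuffer = new BufferBlock<char>();
        var encryptedBuffer = new BufferBlock<char>();

        // Start the downstream stages first; they wait for data to arrive.
        Task<string> stage3 = DecryptStage(encryptedBuffer);
        Task stage2 = EncryptStage(plainBuffer, encryptedBuffer);
        ReadStage(text, plainBuffer);

        stage2.Wait();
        return stage3.Result;
    }

    static void Main()
    {
        Console.WriteLine(Run("Pipeline")); // prints "Pipeline"
    }
}
```

Each buffer has a single consumer here, so calling Receive right after OutputAvailableAsync is safe, and the first-in, first-out behavior of BufferBlock keeps the characters in their original order.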

Summary

We have covered a lot in this article. We have learned the benefits of the Pipeline and producer-consumer design patterns and how to implement them. As we saw, both are very helpful when building parallel and concurrent applications that pass data asynchronously between tasks.

In the Pipeline design, we are able to run multiple tasks or stages concurrently even though the stages rely on data being processed and output by other stages. This is very helpful for performance since all functionality doesn’t have to wait on each stage to finish processing every item of data. In our example, we are able to start decrypting characters of data while a previous stage is still encrypting data and placing it into the queue.

In the Pipeline example, we examined the benefits of the BlockingCollection class in acting as a queue between stages in our pipeline.

Next, we explored the new TPL Dataflow library and some of its message block classes. These classes implement several interfaces defined in the library: ISourceBlock, ITargetBlock, and IPropagatorBlock. Because the blocks implement these interfaces, we can write generic producer and consumer task functionality that can be reused in a variety of applications.

Both of these design patterns and the Dataflow library allow for easy implementations of common functionality in a concurrent manner. You will use these techniques in many applications, and this will become a go-to design pattern when you evaluate a system’s requirements and determine how to implement concurrency to help improve performance. Like all programming, parallel programming is made easier when you have a toolbox of easy-to-use techniques that you are comfortable with.

Most applications that benefit from parallelism will be conducive to some variation of a producer-consumer or Pipeline pattern. Also, the BlockingCollection and Dataflow message block objects are useful mechanisms for coordinating data between parallel tasks, no matter what design pattern is used in the application. It will be very useful to become comfortable with these messaging and queuing classes.
