A pattern in programming is a concrete, standard solution to a given problem. Programming patterns usually emerge as people gather experience, analyze common problems, and provide solutions to them.

Since parallel programming has existed for quite a long time, there are many different patterns for programming parallel applications. There are even special programming languages to make programming of specific parallel algorithms easier. However, this is where things start to become increasingly complicated. In this article, I will provide a starting point from where you will be able to study parallel programming further. We will review very basic, yet very useful, patterns that are quite helpful for many common situations in parallel programming.

The first recipe is about using a shared-state object from multiple threads. I would like to emphasize that you should avoid it as much as possible. Shared state makes parallel algorithms much harder to get right, but on many occasions it is inevitable. We will find out how to delay the actual computation of an object until it is needed, and how to implement different scenarios to achieve thread safety.

The next two recipes show how to create a structured parallel data flow. We will review a concrete case of the producer/consumer pattern, which is called Parallel Pipeline. We are going to implement it using just BlockingCollection first, and then see how helpful another Microsoft library for parallel programming, TPL DataFlow, can be.

The last pattern that we will study is the Map/Reduce pattern. In the modern world, this name could mean very different things. Some people consider map/reduce not as a common approach to any problem but as a concrete implementation for large, distributed cluster computations. We will find out the meaning behind the name of this pattern and review some examples of how it might work in case of small parallel applications.

Implementing Lazy-evaluated shared states

This recipe shows how to program a lazy-evaluated, thread-safe shared-state object.

Getting ready

To start with this recipe, you will need a running Visual Studio 2012. There are no other prerequisites. The source code for this recipe can be found on the Packt site.

How to do it…

To implement lazy-evaluated shared states, perform the following steps:

  1. Start Visual Studio 2012. Create a new C# Console Application project.
  2. In the Program.cs file, add the following using directives:

    using System;
    using System.Threading;
    using System.Threading.Tasks;

    
    
  3. Add the following code snippet below the Main method:

    static async Task ProcessAsynchronously()
    {
        // 1. No synchronization: several threads may construct the value.
        var unsafeState = new UnsafeState();
        Task[] tasks = new Task[4];
        for (int i = 0; i < 4; i++)
        {
            tasks[i] = Task.Run(() => Worker(unsafeState));
        }
        await Task.WhenAll(tasks);
        Console.WriteLine(" --------------------------- ");

        // 2. Hand-written double-checked locking.
        var firstState = new DoubleCheckedLocking();
        for (int i = 0; i < 4; i++)
        {
            tasks[i] = Task.Run(() => Worker(firstState));
        }
        await Task.WhenAll(tasks);
        Console.WriteLine(" --------------------------- ");

        // 3. Double-checked locking provided by LazyInitializer.
        var secondState = new BCLDoubleChecked();
        for (int i = 0; i < 4; i++)
        {
            tasks[i] = Task.Run(() => Worker(secondState));
        }
        await Task.WhenAll(tasks);
        Console.WriteLine(" --------------------------- ");

        // 4. Lazy<T>.
        var thirdState = new Lazy<ValueToAccess>(Compute);
        for (int i = 0; i < 4; i++)
        {
            tasks[i] = Task.Run(() => Worker(thirdState));
        }
        await Task.WhenAll(tasks);
        Console.WriteLine(" --------------------------- ");

        // 5. No locking: Compute may run several times, one result is kept.
        var fourthState = new BCLThreadSafeFactory();
        for (int i = 0; i < 4; i++)
        {
            tasks[i] = Task.Run(() => Worker(fourthState));
        }
        await Task.WhenAll(tasks);
        Console.WriteLine(" --------------------------- ");
    }

    static void Worker(IHasValue state)
    {
        Console.WriteLine("Worker runs on thread id {0}",
            Thread.CurrentThread.ManagedThreadId);
        Console.WriteLine("State value: {0}", state.Value.Text);
    }

    static void Worker(Lazy<ValueToAccess> state)
    {
        Console.WriteLine("Worker runs on thread id {0}",
            Thread.CurrentThread.ManagedThreadId);
        Console.WriteLine("State value: {0}", state.Value.Text);
    }

    static ValueToAccess Compute()
    {
        Console.WriteLine("The value is being constructed on a thread id {0}",
            Thread.CurrentThread.ManagedThreadId);
        Thread.Sleep(TimeSpan.FromSeconds(1));
        return new ValueToAccess(string.Format("Constructed on thread id {0}",
            Thread.CurrentThread.ManagedThreadId));
    }

    class ValueToAccess
    {
        private readonly string _text;

        public ValueToAccess(string text)
        {
            _text = text;
        }

        public string Text
        {
            get { return _text; }
        }
    }

    class UnsafeState : IHasValue
    {
        private ValueToAccess _value;

        public ValueToAccess Value
        {
            get
            {
                if (_value == null)
                {
                    _value = Compute();
                }
                return _value;
            }
        }
    }

    class DoubleCheckedLocking : IHasValue
    {
        private object _syncRoot = new object();
        private volatile ValueToAccess _value;

        public ValueToAccess Value
        {
            get
            {
                if (_value == null)
                {
                    lock (_syncRoot)
                    {
                        if (_value == null) _value = Compute();
                    }
                }
                return _value;
            }
        }
    }

    class BCLDoubleChecked : IHasValue
    {
        private object _syncRoot = new object();
        private ValueToAccess _value;
        private bool _initialized = false;

        public ValueToAccess Value
        {
            get
            {
                return LazyInitializer.EnsureInitialized(
                    ref _value, ref _initialized, ref _syncRoot, Compute);
            }
        }
    }

    class BCLThreadSafeFactory : IHasValue
    {
        private ValueToAccess _value;

        public ValueToAccess Value
        {
            get { return LazyInitializer.EnsureInitialized(ref _value, Compute); }
        }
    }

    interface IHasValue
    {
        ValueToAccess Value { get; }
    }

    
    
  4. Add the following code snippet inside the Main method:

    var t = ProcessAsynchronously();
    t.GetAwaiter().GetResult();

    Console.WriteLine("Press ENTER to exit");
    Console.ReadLine();

    
    
  5. Run the program.

How it works…

The first example shows why it is not safe to use the UnsafeState object with multiple accessing threads. We see that the Compute method was called several times, and different threads use different values, which is obviously not right. To fix this, we can use a lock when reading the value, and if it is not initialized, create it first. It will work, but using a lock with every read operation is not efficient. To avoid using locks every time, there is a traditional approach called the double-checked locking pattern. We check the value for the first time, and if it is not null, we avoid unnecessary locking and just use the shared object. However, if it has not been constructed yet, we take the lock and then check the value for the second time, because it could have been initialized between our first check and the lock operation. Only if it is still not initialized do we compute the value. We can clearly see that this approach works with the second example: there is only one call to the Compute method, and the first-called thread defines the shared object state.
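The intermediate step mentioned above, taking a lock on every read, is described but not shown in the recipe. A minimal sketch of it might look like the following. The LockedState class, its ComputeCalls counter, and the simplified string-valued Compute method are assumptions made for this illustration and are not part of the recipe code.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical illustration: initialization guarded by a lock on every read.
class LockedState
{
    private readonly object _syncRoot = new object();
    private string _value;

    // Counts factory calls so we can check that the value is built only once.
    public int ComputeCalls;

    public string Value
    {
        get
        {
            // Every reader pays for the lock, even long after initialization.
            lock (_syncRoot)
            {
                if (_value == null)
                {
                    _value = Compute();
                }
                return _value;
            }
        }
    }

    // Simplified stand-in for the recipe's Compute method.
    private string Compute()
    {
        ComputeCalls++; // safe here: only ever called while holding the lock
        Thread.Sleep(100);
        return "Constructed on thread id " + Thread.CurrentThread.ManagedThreadId;
    }
}

class Program
{
    static void Main()
    {
        var state = new LockedState();
        var tasks = new Task[4];
        for (int i = 0; i < tasks.Length; i++)
        {
            tasks[i] = Task.Run(() => Console.WriteLine(state.Value));
        }
        Task.WaitAll(tasks);
        Console.WriteLine("Compute was called {0} time(s)", state.ComputeCalls);
    }
}
```

This version is correct but serializes all readers, which is the cost that double-checked locking is meant to remove.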

Please note that even if the lazy-evaluated object implementation is thread-safe, it does not automatically mean that all its properties are thread-safe as well.

If you add, for example, a public int property to the ValueToAccess object, it will not be thread-safe; you still have to use interlocked constructs or locking to ensure thread safety.
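As a sketch of this point, suppose the lazily created object carried a mutable counter. The Counter class and the CountHits helper below are hypothetical stand-ins invented for this example. Lazy<T> guarantees only that a single instance is created, so the increment itself still relies on Interlocked.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical stand-in for ValueToAccess with a mutable member.
class Counter
{
    private int _hits;

    public int Hits
    {
        get { return Volatile.Read(ref _hits); }
    }

    // Interlocked makes the read-modify-write atomic across threads.
    public void Hit()
    {
        Interlocked.Increment(ref _hits);
    }
}

class Program
{
    // Runs `workers` parallel tasks, each calling Hit() `perWorker` times
    // on one lazily created, shared Counter instance.
    public static int CountHits(int workers, int perWorker)
    {
        var shared = new Lazy<Counter>(() => new Counter());
        var tasks = new Task[workers];
        for (int i = 0; i < workers; i++)
        {
            tasks[i] = Task.Run(() =>
            {
                for (int j = 0; j < perWorker; j++)
                {
                    shared.Value.Hit();
                }
            });
        }
        Task.WaitAll(tasks);
        return shared.Value.Hits;
    }

    static void Main()
    {
        Console.WriteLine(CountHits(4, 100000)); // prints 400000
    }
}
```

Replacing the Interlocked.Increment call with a plain `_hits++` would keep the lazy initialization safe but could lose updates.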

This pattern is very common, and that is why there are several classes in the Base Class Library to help us. First, we can use the LazyInitializer.EnsureInitialized method, which implements the double-checked locking pattern inside. However, the most comfortable option is to use the Lazy<T> class, which gives us a thread-safe, lazy-evaluated shared state out of the box. The next two examples show us that they are equivalent to the second one, and the program behaves the same. The only difference is that since LazyInitializer is a static class, we do not have to create a new instance of a class as we do in the case of Lazy<T>, and therefore the performance in the first case will be better in some scenarios.

The last option is to avoid locking altogether, if we do not care how many times the Compute method runs. If it is thread-safe and has no side effects and/or serious performance impacts, we can just run it several times but use only the first constructed value. The last example shows the described behavior, and we can achieve this result by using another LazyInitializer.EnsureInitialized method overload.
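A similar trade-off is also exposed by Lazy<T> itself through the LazyThreadSafetyMode enumeration. The sketch below is an illustration rather than part of the recipe. ExecutionAndPublication takes a lock so that the factory runs once, whereas PublicationOnly lets several threads run the factory and keeps whichever value is published first. The CountFactoryCalls helper is a hypothetical name.

```csharp
using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

class Program
{
    // Reads a Lazy<string> from four threads at once. Returns how many times
    // the factory ran (Item1) and how many distinct values readers saw (Item2).
    public static Tuple<int, int> CountFactoryCalls(LazyThreadSafetyMode mode)
    {
        int calls = 0;
        var lazy = new Lazy<string>(() =>
        {
            int n = Interlocked.Increment(ref calls);
            Thread.Sleep(200); // simulate slow construction
            return "value #" + n;
        }, mode);

        var tasks = new Task<string>[4];
        for (int i = 0; i < tasks.Length; i++)
        {
            tasks[i] = Task.Run(() => lazy.Value);
        }
        Task.WaitAll(tasks);

        int distinctValues = tasks.Select(t => t.Result).Distinct().Count();
        return Tuple.Create(calls, distinctValues);
    }

    static void Main()
    {
        var locked = CountFactoryCalls(LazyThreadSafetyMode.ExecutionAndPublication);
        Console.WriteLine("ExecutionAndPublication: {0} call(s), {1} distinct value(s)",
            locked.Item1, locked.Item2);

        var raced = CountFactoryCalls(LazyThreadSafetyMode.PublicationOnly);
        Console.WriteLine("PublicationOnly: {0} call(s), {1} distinct value(s)",
            raced.Item1, raced.Item2);
    }
}
```

In both modes every reader ends up with the same value; the modes differ only in how many times the factory may run, and the exact count under PublicationOnly depends on thread scheduling.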

Implementing Parallel Pipeline with BlockingCollection

This recipe will describe how to implement a specific scenario of a producer/consumer pattern, which is called Parallel Pipeline, using the standard BlockingCollection data structure.

Getting ready

To begin this recipe, you will need a running Visual Studio 2012. There are no other prerequisites. The source code for this recipe can be found on the Packt site.

How to do it…

To understand how to implement Parallel Pipeline using BlockingCollection, perform the following steps:

  1. Start Visual Studio 2012. Create a new C# Console Application project.
  2. In the Program.cs file, add the following using directives:

    using System;
    using System.Collections.Concurrent;
    using System.Linq;
    using System.Threading;
    using System.Threading.Tasks;

    
    
  3. Add the following code snippet below the Main method:

    private const int CollectionsNumber = 4;
    private const int Count = 10;

    class PipelineWorker<TInput, TOutput>
    {
        Func<TInput, TOutput> _processor = null;
        Action<TInput> _outputProcessor = null;
        BlockingCollection<TInput>[] _input;
        CancellationToken _token;

        // Intermediate stage: transforms items and hands them to the next stage.
        public PipelineWorker(
            BlockingCollection<TInput>[] input,
            Func<TInput, TOutput> processor,
            CancellationToken token,
            string name)
        {
            _input = input;
            Output = new BlockingCollection<TOutput>[_input.Length];
            for (int i = 0; i < Output.Length; i++)
                Output[i] = null == input[i]
                    ? null
                    : new BlockingCollection<TOutput>(Count);

            _processor = processor;
            _token = token;
            Name = name;
        }

        // Final stage: consumes items and produces no output collections.
        public PipelineWorker(
            BlockingCollection<TInput>[] input,
            Action<TInput> renderer,
            CancellationToken token,
            string name)
        {
            _input = input;
            _outputProcessor = renderer;
            _token = token;
            Name = name;
            Output = null;
        }

        public BlockingCollection<TOutput>[] Output { get; private set; }

        public string Name { get; private set; }

        public void Run()
        {
            Console.WriteLine("{0} is running", this.Name);
            while (!_input.All(bc => bc.IsCompleted) &&
                   !_token.IsCancellationRequested)
            {
                TInput receivedItem;
                int i = BlockingCollection<TInput>.TryTakeFromAny(
                    _input, out receivedItem, 50, _token);
                if (i >= 0)
                {
                    if (Output != null)
                    {
                        TOutput outputItem = _processor(receivedItem);
                        BlockingCollection<TOutput>.AddToAny(Output, outputItem);
                        Console.WriteLine("{0} sent {1} to next, on thread id {2}",
                            Name, outputItem, Thread.CurrentThread.ManagedThreadId);
                        Thread.Sleep(TimeSpan.FromMilliseconds(100));
                    }
                    else
                    {
                        _outputProcessor(receivedItem);
                    }
                }
                else
                {
                    Thread.Sleep(TimeSpan.FromMilliseconds(50));
                }
            }
            if (Output != null)
            {
                // Signal the next stage that no more items will arrive.
                foreach (var bc in Output) bc.CompleteAdding();
            }
        }
    }

    
    
  4. Add the following code snippet inside the Main method:

    var cts = new CancellationTokenSource();

    // Cancel the whole pipeline when the C key is pressed.
    Task.Run(() =>
    {
        if (Console.ReadKey().KeyChar == 'c')
            cts.Cancel();
    });

    var sourceArrays = new BlockingCollection<int>[CollectionsNumber];
    for (int i = 0; i < sourceArrays.Length; i++)
    {
        sourceArrays[i] = new BlockingCollection<int>(Count);
    }

    var filter1 = new PipelineWorker<int, decimal>(
        sourceArrays,
        (n) => Convert.ToDecimal(n * 0.97),
        cts.Token,
        "filter1");

    var filter2 = new PipelineWorker<decimal, string>(
        filter1.Output,
        (s) => String.Format("--{0}--", s),
        cts.Token,
        "filter2");

    var filter3 = new PipelineWorker<string, string>(
        filter2.Output,
        (s) => Console.WriteLine("The final result is {0} on thread id {1}",
            s, Thread.CurrentThread.ManagedThreadId),
        cts.Token,
        "filter3");

    try
    {
        Parallel.Invoke(
            () =>
            {
                Parallel.For(0, sourceArrays.Length * Count, (j, state) =>
                {
                    if (cts.Token.IsCancellationRequested)
                    {
                        state.Stop();
                    }
                    int k = BlockingCollection<int>.TryAddToAny(sourceArrays, j);
                    if (k >= 0)
                    {
                        Console.WriteLine("added {0} to source data on thread id {1}",
                            j, Thread.CurrentThread.ManagedThreadId);
                        Thread.Sleep(TimeSpan.FromMilliseconds(100));
                    }
                });
                foreach (var arr in sourceArrays)
                {
                    arr.CompleteAdding();
                }
            },
            () => filter1.Run(),
            () => filter2.Run(),
            () => filter3.Run()
        );
    }
    catch (AggregateException ae)
    {
        foreach (var ex in ae.InnerExceptions)
            Console.WriteLine(ex.Message + ex.StackTrace);
    }

    if (cts.Token.IsCancellationRequested)
    {
        Console.WriteLine("Operation has been canceled! Press ENTER to exit.");
    }
    else
    {
        Console.WriteLine("Press ENTER to exit.");
    }
    Console.ReadLine();

    
    
  5. Run the program.

How it works…

In the preceding example, we implement one of the most common parallel programming scenarios. Imagine that we have some data that has to pass through several computation stages, which take a significant amount of time. Each stage requires the results of the previous one, so for a single item we cannot run the stages in parallel.

If we had only one item to process, there would not be many possibilities to enhance the performance. However, if we run many items through the same set of computation stages, we can use the Parallel Pipeline technique. This means that we do not have to wait until all items pass through the first computation stage to go to the next one. It is enough for just one item to finish a stage; we move it to the next stage, and meanwhile the next item is being processed by the previous stage, and so on. As a result, the stages run almost in parallel, with each stage lagging behind the previous one by the time required for the first item to pass through it.
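Stripped of cancellation and multiple collections, the idea can be sketched with two stages and one BlockingCollection between each pair. This is a simplified illustration, not the recipe's PipelineWorker; the RunPipeline name, the squaring and formatting steps, and the capacity of 2 are all assumptions chosen for the example.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading.Tasks;

class Program
{
    // Pushes the numbers 0..count-1 through two stages that run concurrently.
    public static List<string> RunPipeline(int count)
    {
        // Bounded to 2 items so a fast stage cannot run far ahead of a slow one.
        var stage1Output = new BlockingCollection<int>(2);
        var stage2Output = new BlockingCollection<string>(2);
        var results = new List<string>();

        // Stage 1: squares each number and passes it on immediately.
        var stage1 = Task.Run(() =>
        {
            for (int i = 0; i < count; i++)
            {
                stage1Output.Add(i * i);
            }
            stage1Output.CompleteAdding(); // tell stage 2 nothing more will come
        });

        // Stage 2: formats each square while stage 1 keeps producing.
        var stage2 = Task.Run(() =>
        {
            foreach (int square in stage1Output.GetConsumingEnumerable())
            {
                stage2Output.Add("--" + square + "--");
            }
            stage2Output.CompleteAdding();
        });

        // Final consumer: collects the results on the calling thread.
        foreach (string item in stage2Output.GetConsumingEnumerable())
        {
            results.Add(item);
        }

        Task.WaitAll(stage1, stage2);
        return results;
    }

    static void Main()
    {
        foreach (string item in RunPipeline(5))
        {
            Console.WriteLine(item);
        }
    }
}
```

With a single producer and a single consumer per stage, items leave the pipeline in the order they entered it; the recipe gives that up in exchange for running each stage on several collections in parallel.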

Here, we use four collections for each processing stage, illustrating that we can process every stage in parallel as well. The first thing we do is make it possible to cancel the whole process by pressing the C key. We create a cancellation token and run a separate task to monitor the C key. Then, we define our pipeline. It consists of three main stages. The first stage is where we put the initial numbers on the first four collections that serve as the item source for the rest of the pipeline. This code is inside the Parallel.For loop, which in turn is inside the Parallel.Invoke statement, as we run all the stages in parallel; the initial stage runs in parallel as well.

The next stage is defining our pipeline elements. The logic is defined inside the PipelineWorker class. We initialize the worker with the input collection, provide a transformation function, and then run the worker in parallel with the other workers. This way we define two workers, or filters, because they filter the initial sequence. One of them turns an integer into a decimal value, and the second one turns a decimal into a string. Finally, the last worker just prints every incoming string to the console. Everywhere we print the running thread ID to see how everything works. Besides this, we added artificial delays, so that item processing looks more natural, as if we were really running heavy computations.

As a result, we see the exact expected behavior. First, some items are being created on the initial collections. Then, we see that the first filter starts to process them, and as they are being processed, the second filter starts to work, and finally the item goes to the last worker that prints it to the console.

Implementing Parallel Pipeline with TPL DataFlow

This recipe shows how to implement a Parallel Pipeline pattern with the help of the TPL DataFlow library.

Getting ready

To start with this recipe, you will need a running Visual Studio 2012. There are no other prerequisites. The source code for this recipe can be found on the Packt site.

How to do it…

To understand how to implement Parallel Pipeline with TPL DataFlow, perform the following steps:

  1. Start Visual Studio 2012. Create a new C# Console Application project.
  2. Add references to the Microsoft TPL DataFlow NuGet package.
    1. Right-click on the References folder in the project and select the Manage NuGet Packages… menu option.
    2. Now add a reference to the Microsoft TPL DataFlow NuGet package. You can use the search option in the Manage NuGet Packages dialog to find it.

  3. In the Program.cs file, add the following using directives:

    using System;
    using System.Threading;
    using System.Threading.Tasks;
    using System.Threading.Tasks.Dataflow;

    
    
  4. Add the following code snippet below the Main method:

    async static Task ProcessAsynchronously()
    {
        var cts = new CancellationTokenSource();

        // Cancel the whole flow when the C key is pressed.
        Task.Run(() =>
        {
            if (Console.ReadKey().KeyChar == 'c')
                cts.Cancel();
        });

        var inputBlock = new BufferBlock<int>(
            new DataflowBlockOptions
            {
                BoundedCapacity = 5,
                CancellationToken = cts.Token
            });

        var filter1Block = new TransformBlock<int, decimal>(
            n =>
            {
                decimal result = Convert.ToDecimal(n * 0.97);
                Console.WriteLine("Filter 1 sent {0} to the next stage on thread id {1}",
                    result, Thread.CurrentThread.ManagedThreadId);
                Thread.Sleep(TimeSpan.FromMilliseconds(100));
                return result;
            },
            new ExecutionDataflowBlockOptions
            {
                MaxDegreeOfParallelism = 4,
                CancellationToken = cts.Token
            });

        var filter2Block = new TransformBlock<decimal, string>(
            n =>
            {
                string result = string.Format("--{0}--", n);
                Console.WriteLine("Filter 2 sent {0} to the next stage on thread id {1}",
                    result, Thread.CurrentThread.ManagedThreadId);
                Thread.Sleep(TimeSpan.FromMilliseconds(100));
                return result;
            },
            new ExecutionDataflowBlockOptions
            {
                MaxDegreeOfParallelism = 4,
                CancellationToken = cts.Token
            });

        var outputBlock = new ActionBlock<string>(
            s =>
            {
                Console.WriteLine("The final result is {0} on thread id {1}",
                    s, Thread.CurrentThread.ManagedThreadId);
            },
            new ExecutionDataflowBlockOptions
            {
                MaxDegreeOfParallelism = 4,
                CancellationToken = cts.Token
            });

        inputBlock.LinkTo(filter1Block,
            new DataflowLinkOptions { PropagateCompletion = true });
        filter1Block.LinkTo(filter2Block,
            new DataflowLinkOptions { PropagateCompletion = true });
        filter2Block.LinkTo(outputBlock,
            new DataflowLinkOptions { PropagateCompletion = true });

        try
        {
            Parallel.For(0, 20,
                new ParallelOptions
                {
                    MaxDegreeOfParallelism = 4,
                    CancellationToken = cts.Token
                },
                i =>
                {
                    Console.WriteLine("added {0} to source data on thread id {1}",
                        i, Thread.CurrentThread.ManagedThreadId);
                    inputBlock.SendAsync(i).GetAwaiter().GetResult();
                });
            inputBlock.Complete();
            await outputBlock.Completion;
            Console.WriteLine("Press ENTER to exit.");
        }
        catch (OperationCanceledException)
        {
            Console.WriteLine("Operation has been canceled! Press ENTER to exit.");
        }
        Console.ReadLine();
    }

    
    
  5. Add the following code snippet inside the Main method:

    var t = ProcessAsynchronously();
    t.GetAwaiter().GetResult();

    
    
  6. Run the program.

How it works…

In the previous recipe, we implemented a Parallel Pipeline pattern to process items through sequential stages. It is quite a common problem, and one of the proposed ways to program such algorithms is to use the TPL DataFlow library from Microsoft. It is distributed via NuGet, and is easy to install and use in your application.

The TPL DataFlow library contains different types of blocks that can be connected with each other in different ways to form complicated processes that are partially parallel and partially sequential, as needed. To see some of the available infrastructure, let’s implement the previous scenario with the help of the TPL DataFlow library.

First, we define the different blocks that will be processing our data. Please note that these blocks have different options that can be specified during their construction; they can be very important. For example, we pass the cancellation token into every block we define, and when we signal the cancellation, all of them will stop working.

We start our process with BufferBlock. This block holds items in order to pass them to the next blocks in the flow. We restrict it to a capacity of five items by specifying the BoundedCapacity option value. This means that when there are five items in this block, it will stop accepting new items until one of the existing items passes to the next block.

The next block type is TransformBlock. This block is intended for a data transformation step. Here we define two transformation blocks: one of them creates decimals from integers, and the second one creates a string from a decimal value. There is a MaxDegreeOfParallelism option for this block, specifying the maximum number of simultaneous worker threads.
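The effect of BoundedCapacity can be observed in isolation. The sketch below is an illustration that assumes the TPL DataFlow NuGet package is referenced; the CountAccepted helper is a hypothetical name. It posts more items than the buffer can hold while nothing consumes them and counts how many are accepted.

```csharp
using System;
using System.Threading.Tasks.Dataflow;

class Program
{
    // Posts `attempts` items to a buffer bounded at `capacity` that nobody
    // consumes from, and returns how many of them the block accepted.
    public static int CountAccepted(int capacity, int attempts)
    {
        var buffer = new BufferBlock<int>(
            new DataflowBlockOptions { BoundedCapacity = capacity });

        int accepted = 0;
        for (int i = 0; i < attempts; i++)
        {
            // Post returns false right away when the block declines the item.
            if (buffer.Post(i))
            {
                accepted++;
            }
        }
        return accepted;
    }

    static void Main()
    {
        Console.WriteLine(CountAccepted(5, 8)); // prints 5
    }
}
```

Post gives up as soon as the block is full, whereas SendAsync returns a task that completes once the block has accepted or declined the item. That is why the recipe waits on SendAsync when feeding the bounded input block.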

The last block is the ActionBlock type. This block will run a specified action on every incoming item. We use this block to print our items to the console.

Now, we link these blocks together with the help of the LinkTo methods. Here we have an easy sequential data flow, but it is possible to create schemes that are more complicated. Here we also provide DataflowLinkOptions with the PropagateCompletion property set to true. This means that when a block completes, its completion (or fault) is automatically propagated to the next block. Then we start adding items to the buffer block in parallel, calling the block’s Complete method when we finish adding new items. Then we wait for the last block to complete. In case of a cancellation, we handle OperationCanceledException and cancel the whole process.
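A two-block flow is enough to see what PropagateCompletion does. The following sketch assumes the TPL DataFlow NuGet package is referenced; the DoubleAll helper and the doubling step are invented for this illustration.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading.Tasks.Dataflow;

class Program
{
    // Sends the input through a TransformBlock into an ActionBlock and
    // returns what the ActionBlock received.
    public static int[] DoubleAll(int[] input)
    {
        var results = new ConcurrentQueue<int>();
        var doubler = new TransformBlock<int, int>(n => n * 2);
        var collector = new ActionBlock<int>(n => results.Enqueue(n));

        // Without PropagateCompletion, completing `doubler` would not complete
        // `collector`, and waiting on collector.Completion would never return.
        doubler.LinkTo(collector,
            new DataflowLinkOptions { PropagateCompletion = true });

        foreach (int n in input)
        {
            doubler.Post(n);
        }
        doubler.Complete();            // no more input for the first block
        collector.Completion.Wait();   // completes after the last item is handled

        return results.ToArray();
    }

    static void Main()
    {
        Console.WriteLine(string.Join(", ", DoubleAll(new[] { 1, 2, 3 })));
    }
}
```

Both blocks here use the default options, so items are processed one at a time and arrive in input order; with MaxDegreeOfParallelism raised on the ActionBlock, as in the recipe, the order in which items are handled is no longer fixed.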

Implementing Map/Reduce with PLINQ

This recipe describes how to implement the Map/Reduce pattern using PLINQ.

Getting ready

To begin with this recipe, you will need a running Visual Studio 2012. There are no other prerequisites. The source code for this recipe can be found on the Packt site.

How to do it…

To understand how to implement Map/Reduce with PLINQ, perform the following steps:

  1. Start Visual Studio 2012. Create a new C# Console Application project.
  2. In the Program.cs file, add the following using directives:

    using System;
    using System.Collections.Generic;
    using System.IO;
    using System.Linq;

    
    
  3. Add the following code snippet below the Main method:

    private static readonly char[] delimiters =
        Enumerable.Range(0, 256)
            .Select(i => (char)i)
            .Where(c => !char.IsLetterOrDigit(c))
            .ToArray();

    private const string textToParse = @"
    Call me Ishmael. Some years ago - never mind how long precisely - having little
    or no money in my purse, and nothing particular to interest me on
    shore, I thought I would sail about a little and see the watery part
    of the world. It is a way I have of driving off the spleen, and regulating
    the circulation. Whenever I find myself growing grim about the mouth; whenever
    it is a damp, drizzly November in my soul; whenever I find myself involuntarily
    pausing before coffin warehouses, and bringing up the rear of every
    funeral I meet; and especially whenever my hypos get such an upper hand of me,
    that it requires a strong moral principle to prevent me from deliberately
    stepping into the street, and methodically
    knocking people's hats off - then, I account it high
    time to get to sea as soon as I can.
    ― Herman Melville, Moby Dick.
    ";

    
    
  4. Add the following code snippet inside the Main method:

    // Example 1: count how often each character occurs.
    var q = textToParse.Split(delimiters)
        .AsParallel()
        .MapReduce(
            s => s.ToLower().ToCharArray(),
            c => c,
            g => new[] { new { Char = g.Key, Count = g.Count() } })
        .Where(c => char.IsLetterOrDigit(c.Char))
        .OrderByDescending(c => c.Count);

    foreach (var info in q)
    {
        Console.WriteLine("Character {0} occurred in the text {1} {2}",
            info.Char, info.Count, info.Count == 1 ? "time" : "times");
    }
    Console.WriteLine(" -------------------------------------------");

    // Example 2: count the words that contain the search pattern.
    const string searchPattern = "en";
    var q2 = textToParse.Split(delimiters)
        .AsParallel()
        .Where(s => s.Contains(searchPattern))
        .MapReduce(
            s => new[] { s },
            s => s,
            g => new[] { new { Word = g.Key, Count = g.Count() } })
        .OrderByDescending(s => s.Count);

    Console.WriteLine("Words with search pattern '{0}':", searchPattern);
    foreach (var info in q2)
    {
        Console.WriteLine("{0} occurred in the text {1} {2}",
            info.Word, info.Count, info.Count == 1 ? "time" : "times");
    }

    // Example 3: split the text into two files, then count first letters.
    int halfLengthWordIndex = textToParse.IndexOf(' ', textToParse.Length / 2);
    using (var sw = File.CreateText("1.txt"))
    {
        sw.Write(textToParse.Substring(0, halfLengthWordIndex));
    }
    using (var sw = File.CreateText("2.txt"))
    {
        sw.Write(textToParse.Substring(halfLengthWordIndex));
    }

    string[] paths = new[] { @".\" };
    Console.WriteLine(" ------------------------------------------------");

    var q3 = paths
        .SelectMany(p => Directory.EnumerateFiles(p, "*.txt"))
        .AsParallel()
        .MapReduce(
            path => File.ReadLines(path)
                .SelectMany(line => line.Trim(delimiters).Split(delimiters)),
            // Empty words get the key '\t', which the Where clause below removes.
            word => string.IsNullOrWhiteSpace(word) ? '\t' : word.ToLower()[0],
            g => new[] { new { FirstLetter = g.Key, Count = g.Count() } })
        .Where(s => char.IsLetterOrDigit(s.FirstLetter))
        .OrderByDescending(s => s.Count);

    Console.WriteLine("Words from text files");
    foreach (var info in q3)
    {
        Console.WriteLine("Words starting with letter '{0}' occurred in the text {1} {2}",
            info.FirstLetter, info.Count, info.Count == 1 ? "time" : "times");
    }

    
    
  5. Add the following code snippet after the Program class definition:

    static class PLINQExtensions
    {
        public static ParallelQuery<TResult> MapReduce<TSource, TMapped, TKey, TResult>(
            this ParallelQuery<TSource> source,
            Func<TSource, IEnumerable<TMapped>> map,
            Func<TMapped, TKey> keySelector,
            Func<IGrouping<TKey, TMapped>, IEnumerable<TResult>> reduce)
        {
            return source.SelectMany(map)
                .GroupBy(keySelector)
                .SelectMany(reduce);
        }
    }

    
    
  6. Run the program.

How it works…

Map/Reduce is another important parallel programming pattern. It is suitable both for small programs and for large multi-server computations. The idea of this pattern is that you have two special functions to apply to your data. The first of them is the Map function. It takes a set of initial data in a key/value list form and produces another key/value sequence, transforming the data into a format that is convenient for further processing. Then we use another function called Reduce. The Reduce function takes the result of the Map function and transforms it into the smallest possible set of data that we actually need. To understand how this algorithm works, let’s look through the recipe.

First, we define a relatively large text in the string variable textToParse. We need this text to run our queries on. Then we define our Map/Reduce implementation as a PLINQ extension method in the PLINQExtensions class. We use SelectMany to transform the initial sequence into the sequence we need by applying the Map function. This function produces several new elements from one sequence element. Then we choose how we group the new sequence with the keySelector function, and we use GroupBy with this key to produce an intermediate key/value sequence. The last thing we do is apply Reduce to the resulting grouped sequence to get the result.

In our first example, we split the text into separate words, and then we chop each word into character sequences with the help of the Map function, and group the result by the character value. The Reduce function finally transforms each group into a key/value pair, where we have a character and the number of times it was used in the text; we then order these pairs by usage. Therefore, we are able to count each character appearance in the text in parallel (since we use PLINQ to query the initial data).
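To make the three steps easier to trace, here is the same character count on an input small enough to check by hand. It is a self-contained sketch: the MapReduceSketch class repeats the shape of the recipe's extension method, while the CountChars helper and the two sample words are assumptions made for the example.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

static class MapReduceSketch
{
    // Same shape as the recipe's extension method: map, group by key, reduce.
    public static ParallelQuery<TResult> MapReduce<TSource, TMapped, TKey, TResult>(
        this ParallelQuery<TSource> source,
        Func<TSource, IEnumerable<TMapped>> map,
        Func<TMapped, TKey> keySelector,
        Func<IGrouping<TKey, TMapped>, IEnumerable<TResult>> reduce)
    {
        return source.SelectMany(map).GroupBy(keySelector).SelectMany(reduce);
    }

    // Counts character occurrences across the given words, most frequent first.
    public static List<KeyValuePair<char, int>> CountChars(string[] words)
    {
        return words
            .AsParallel()
            .MapReduce(
                w => w.ToLower().ToCharArray(),  // map: word -> its characters
                c => c,                          // key: the character itself
                g => new[] { new KeyValuePair<char, int>(g.Key, g.Count()) })
            .OrderByDescending(p => p.Value)
            .ToList();
    }
}

class Program
{
    static void Main()
    {
        // "banana" and "bandana" together contain a x6, n x4, b x2, d x1.
        foreach (var pair in MapReduceSketch.CountChars(new[] { "Banana", "bandana" }))
        {
            Console.WriteLine("{0}: {1}", pair.Key, pair.Value);
        }
    }
}
```

Each group produced by GroupBy holds every occurrence of one character, so Reduce only has to count the group's elements.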

The next example is quite similar, but now we use PLINQ to filter the sequence leaving only the words containing our search pattern, and we then get all those words sorted by their usage in the text.

Finally, the last example uses file I/O. We save the sample text on the disk, splitting it into two files. Then we define the Map function as producing a number of strings from each file path; together these are all the words from all the lines in all text files in the initial directory. Then we group those words by the first letter (filtering out the empty strings) and use Reduce to see which letter is most often used as the first letter of a word in the text. What is nice is that we can easily change this program to be distributed by just using other implementations of the map and reduce functions, and we are still able to use PLINQ with them to keep our program easy to read and maintain.

Summary

In this article we covered implementing lazy-evaluated shared states, implementing Parallel Pipeline using BlockingCollection and TPL DataFlow, and finally we covered the implementation of Map/Reduce with PLINQ.
