
In this article by Antonio Esposito, from the book Reactive Programming for .NET Developers, we will see a practical example of reactive programming in pure C#.

The following topics will be discussed here:

  • IObserver interface
  • IObservable interface
  • Subscription life cycle
  • Sourcing events
  • Filtering events
  • Correlating events
  • Sourcing from CLR streams
  • Sourcing from CLR enumerables


IObserver interface

This core level interface is available within the Base Class Library (BCL) of .NET 4.0 and is available for the older 3.5 as an add-on.

The usage is pretty simple and the goal is to provide a standard way of handling the most basic features of any reactive message consumer.

Reactive messages flow from a producer to a consumer that subscribes to them. The IObserver C# interface lets us build message receivers that comply with the reactive programming layout by implementing three message-oriented handlers: a message received, an error received, and a task completed.

The IObserver interface has the following signature and description:

 // Summary:

    //     Provides a mechanism for receiving push-based notifications.

    //

    // Type parameters:

    //   T:

    //     The object that provides notification information. This type parameter is

    //     contravariant. That is, you can use either the type you specified or any

    //     type that is less derived. For more information about covariance and contravariance,

    //     see Covariance and Contravariance in Generics.

    public interface IObserver<in T>

    {

        // Summary:

        //     Notifies the observer that the provider has finished sending push-based notifications.

        void OnCompleted();

        //

        // Summary:

        //     Notifies the observer that the provider has experienced an error condition.

        //

        // Parameters:

        //   error:

        //     An object that provides additional information about the error.

        void OnError(Exception error);

        //

        // Summary:

        //     Provides the observer with new data.

        //

        // Parameters:

        //   value:

        //     The current notification information.

        void OnNext(T value);

    }

Any new message to flow to the receiver implementing such an interface will reach the OnNext method. Any error will reach the OnError method, while the task completed acknowledgement message will reach the OnCompleted method.

The usage of an interface means that we cannot use generic premade objects from the BCL. We need to implement any receiver from scratch by using such an interface as a service contract.

Let’s see an example, because talking about code is always simpler than talking about theory. The following example shows how a console application can read commands from a user in a reactive way:

class Program

{

    static void Main(string[] args)

    {

        //creates a new console input consumer

        var consumer = new ConsoleTextConsumer();

 

        while (true)

        {

            Console.WriteLine("Write some text and press ENTER to send a message\r\nPress ENTER to exit");

            //read console input

            var input = Console.ReadLine();

 

            //check for empty message to exit

            if (string.IsNullOrEmpty(input))

            {

                //job completed

                consumer.OnCompleted();

 

                Console.WriteLine("Task completed. Any further message will generate an error");

            }

            else

            {

                //route the message to the consumer

                consumer.OnNext(input);

            }

        }

    }

}

public class ConsoleTextConsumer : IObserver<string>

{

    private bool finished = false;

    public void OnCompleted()

    {

        if (finished)

        {

            OnError(new Exception("This consumer already finished its lifecycle"));

            return;

        }

 

        finished = true;

        Console.WriteLine("<- END");

    }

 

    public void OnError(Exception error)

    {

        Console.WriteLine("<- ERROR");

        Console.WriteLine("<- {0}", error.Message);

    }

 

    public void OnNext(string value)

    {

        if (finished)

        {

            OnError(new Exception("This consumer finished its lifecycle"));

            return;

        }

 

        //shows the received message

        Console.WriteLine("-> {0}", value);

        //do something

 

        //ack the caller

        Console.WriteLine("<- OK");

    }

}

The preceding example shows the IObserver interface in use within the ConsoleTextConsumer class. The program asks the user for input text at a DOS-like command console and routes it to the consumer. In this implementation, the consumer only writes the input text back out, because we just want to look at the reactive implementation.

The first important concept here is that a message consumer knows nothing about how messages are produced. The consumer simply reacts to one of the three events (not CLR events). Besides this, the consumer can hold its own logic and state across events. In the preceding example, the consumer simply echoes any received message on the console. However, once a completion message puts the consumer in a finished state (by setting the finished flag), any other message that arrives at the OnNext method is automatically routed to the error handler. Likewise, any further completion message that reaches the consumer in the finished state produces another error.
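The finished-state guard is repeated in each handler of the consumer. As a minimal sketch, it could be factored into a base class so that concrete consumers only implement their own reactions. The names GuardedObserver, RecordingConsumer, and the Handle* methods are hypothetical; they are not part of the BCL or of the original example:

```csharp
using System;
using System.Collections.Generic;

// Hypothetical helper: centralizes the "finished" guard so that
// concrete consumers only implement their own reactions.
public abstract class GuardedObserver<T> : IObserver<T>
{
    private bool finished;

    public void OnNext(T value)
    {
        // a message after completion is routed to the error handler
        if (finished)
            HandleError(new InvalidOperationException("This consumer finished its lifecycle"));
        else
            HandleNext(value);
    }

    public void OnError(Exception error)
    {
        HandleError(error);
    }

    public void OnCompleted()
    {
        // a second completion is an error too
        if (finished)
        {
            HandleError(new InvalidOperationException("This consumer already finished its lifecycle"));
            return;
        }

        finished = true;
        HandleCompleted();
    }

    protected abstract void HandleNext(T value);
    protected abstract void HandleError(Exception error);
    protected abstract void HandleCompleted();
}

// Records what it receives so that the behavior is easy to inspect.
public sealed class RecordingConsumer : GuardedObserver<string>
{
    public readonly List<string> Log = new List<string>();

    protected override void HandleNext(string value) { Log.Add("-> " + value); }
    protected override void HandleError(Exception error) { Log.Add("ERROR: " + error.Message); }
    protected override void HandleCompleted() { Log.Add("END"); }
}
```

Sending one message, completing, and then sending another message leaves three entries in Log: the message, END, and an error for the late message.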

IObservable interface

The IObservable interface, the counterpart of the IObserver interface, has the task of handling message production and observer subscription. It routes valid messages to the OnNext message handler and errors to the OnError message handler. At the end of its life cycle, it notifies all the observers through the OnCompleted message handler.

To create a valid reactive observable, we must write something that does not block on user input or on input data from any other external system. The observable object acts as an infinite message generator, something like an infinite enumerable of messages, although in this case there is no enumeration.

Once a new message becomes available, the observable routes it to all the subscribers.

In the following example, we will try creating a console application to ask the user for an integer number and then route such a number to all the subscribers. Otherwise, if the given input is not a number, an error will be routed to all the subscribers.

The observer is similar to the one already seen in the previous example. Take a look at the following code:

/// <summary>

/// Consumes numeric values that divide by a given number without a remainder

/// </summary>

public class IntegerConsumer : IObserver<int>

{

    readonly int validDivider;

    //the constructor asks for a divider

    public IntegerConsumer(int validDivider)

    {

        this.validDivider = validDivider;

    }

 

    private bool finished = false;

    public void OnCompleted()

    {

        if (finished)

            OnError(new Exception("This consumer already finished its lifecycle"));

        else

        {

            finished = true;

            Console.WriteLine("{0}: END", GetHashCode());

        }

    }

 

    public void OnError(Exception error)

    {

        Console.WriteLine("{0}: {1}", GetHashCode(), error.Message);

    }

 

    public void OnNext(int value)

    {

        if (finished)

            OnError(new Exception("This consumer finished its lifecycle"));

 

        //the simple business logic is made by checking divider result

        else if (value % validDivider == 0)

            Console.WriteLine("{0}: {1} divisible by {2}", GetHashCode(), value, validDivider);

    }

}

This observer consumes integer messages, but it only reacts to numbers that are divisible by a given divider without a remainder. This logic, because of the encapsulation principle, lives within the observer object. The observable, instead, only contains the logic for sending valid or error messages.

This filtering logic is made within the receiver itself. Although that is not something wrong, in more complex applications, specific filtering features are available in the publish-subscribe communication pipeline. In other words, another object will be available between observable (publisher) and observer (subscriber) that will act as a message filter.

Back to our numeric example, here is the observable implementation. It uses an inner Task that does the main job of parsing the input text and sending messages. In addition, a cancellation token handles the user's cancellation request and the eventual disposal of the observable:

//Observable able to parse strings from the Console

//and route numeric messages to all subscribers

public class ConsoleIntegerProducer : IObservable<int>, IDisposable

{

    //the subscriber list

    private readonly List<IObserver<int>> subscriberList = new List<IObserver<int>>();

 

    //the cancellation token source used to stop the

    //inner observable working thread

    private readonly CancellationTokenSource cancellationSource;

    //the cancellation flag

    private readonly CancellationToken cancellationToken;

    //the running task that runs the inner running thread

    private readonly Task workerTask;

    public ConsoleIntegerProducer()

    {

        cancellationSource = new CancellationTokenSource();

        cancellationToken = cancellationSource.Token;

        workerTask = Task.Factory.StartNew(OnInnerWorker, cancellationToken);

    }

           

    //add another observer to the subscriber list

    public IDisposable Subscribe(IObserver<int> observer)

    {

        if (subscriberList.Contains(observer))

            throw new ArgumentException("The observer is already subscribed to this observable");

 

        Console.WriteLine("Subscribing for {0}", observer.GetHashCode());

        subscriberList.Add(observer);

 

        return null;

    }

 

    //this code executes the observable infinite loop

    //and routes messages to all observers on the valid

    //message handler

    private void OnInnerWorker()

    {

        while (!cancellationToken.IsCancellationRequested)

        {

            var input = Console.ReadLine();

            int value;

 

            foreach (var observer in subscriberList)

                if (string.IsNullOrEmpty(input))

                    break;

                else if (input.Equals("EXIT"))

                {

                    cancellationSource.Cancel();

                    break;

                }

                else if (!int.TryParse(input, out value))

                    observer.OnError(new FormatException("Unable to parse given value"));

                else

                    observer.OnNext(value);

        }

        cancellationToken.ThrowIfCancellationRequested();

    }

 

 

    //cancel main task and ack all observers

    //by sending the OnCompleted message

    public void Dispose()

    {

        if (!cancellationSource.IsCancellationRequested)

        {

            cancellationSource.Cancel();

            while (!workerTask.IsCanceled)

                Thread.Sleep(100);

        }

 

        cancellationSource.Dispose();

        workerTask.Dispose();

 

        foreach (var observer in subscriberList)

            observer.OnCompleted();

    }

 

    //wait until the main task completes or went cancelled

    public void Wait()

    {

        while (!(workerTask.IsCompleted || workerTask.IsCanceled))

            Thread.Sleep(100);

    }

}

To complete the example, here is the program's Main method:

static void Main(string[] args)

{

    //this is the message observable responsible for producing messages

    using (var observer = new ConsoleIntegerProducer())

    //these are the message observers that consume messages

    using (var consumer1 = observer.Subscribe(new IntegerConsumer(2)))

    using (var consumer2 = observer.Subscribe(new IntegerConsumer(3)))

    using (var consumer3 = observer.Subscribe(new IntegerConsumer(5)))

        observer.Wait();

 

    Console.WriteLine("END");

    Console.ReadLine();

}

The cancellationToken.ThrowIfCancellationRequested call may raise an exception in Visual Studio when debugging. Simply continue by pressing F5, or run the example without the attached debugger by starting it with Ctrl + F5 instead of F5 alone.
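To see why the exception appears and why the producer's Dispose method can poll IsCanceled, here is a minimal sketch of the same pattern in isolation. The CancellationSketch class and its Run method are hypothetical names used only for illustration. The key assumption it demonstrates is that the task is started with the same token that later throws:

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public static class CancellationSketch
{
    // Starts a worker that polls the token, cancels it, and returns
    // the final status of the task.
    public static TaskStatus Run()
    {
        using (var source = new CancellationTokenSource())
        {
            var token = source.Token;

            var worker = Task.Factory.StartNew(() =>
            {
                while (!token.IsCancellationRequested)
                    Thread.Sleep(10);

                // throws an OperationCanceledException bound to the same token
                // that was given to StartNew
                token.ThrowIfCancellationRequested();
            }, token);

            Thread.Sleep(50);
            source.Cancel();

            try
            {
                worker.Wait();
            }
            catch (AggregateException)
            {
                // Wait surfaces the cancellation wrapped in an AggregateException
            }

            return worker.Status;
        }
    }
}
```

Calling CancellationSketch.Run() returns TaskStatus.Canceled: because the exception carries the token the task was created with, the task ends as canceled rather than faulted. The debugger only stops on the exception while it travels from the worker method to the task infrastructure.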

The application simply creates an observable that is able to parse user data. Then, it registers three observers, giving each one its own valid divider value.

Then, the observable starts reading user data from the console, and valid or error messages flow to all the observers. Each observer applies its internal logic, showing the message only when the value is divisible by its divider.

Here is the result of executing the application:

Observables and observers in action

Subscription life cycle

What happens if we want to stop a single observer from receiving messages from the observable event source? If we change the program Main from the preceding example to the following one, we run into a flaw in the observer life cycle design. Here’s the code:

//this is the message observable responsible for producing messages

using (var observer = new ConsoleIntegerProducer())

//these are the message observers that consume messages

using (var consumer1 = observer.Subscribe(new IntegerConsumer(2)))

using (var consumer2 = observer.Subscribe(new IntegerConsumer(3)))

{

    using (var consumer3 = observer.Subscribe(new IntegerConsumer(5)))

    {

        //internal lifecycle

    }

 

    observer.Wait();

}

 

Console.WriteLine("END");

Console.ReadLine();

Here is the result in the output console:

The third observer unable to catch value messages

By using the using construct, we expect to end the life cycle of the consumer object. However, this does not happen, because in the previous example the Subscribe method of the observable simply returns null.

To create a valid observer, we must design and handle its life cycle. This means that we must handle the external disposal of the Subscribe method’s result by signaling to the right observer that its life cycle has reached the end.

We have to create a Subscription class that handles the disposal in the right reactive way, by sending the message to the OnCompleted event handler.

Here is a simple Subscription class implementation:

/// <summary>

/// Handle observer subscription lifecycle

/// </summary>

public sealed class Subscription<T> : IDisposable

{

    private readonly IObserver<T> observer;

    public Subscription(IObserver<T> observer)

    {

        this.observer = observer;

    }

 

    //the event signalling that the observer has

    //completed its lifecycle

    public event EventHandler<IObserver<T>> OnCompleted;

 

    public void Dispose()

    {

        if (OnCompleted != null)

            OnCompleted(this, observer);

 

        observer.OnCompleted();

    }

}

The usage is within the observable Subscribe method. Here’s an example:

//add another observer to the subscriber list

public IDisposable Subscribe(IObserver<int> observer)

{

    if (subscriberList.Contains(observer))

        throw new ArgumentException("The observer is already subscribed to this observable");

 

    Console.WriteLine("Subscribing for {0}", observer.GetHashCode());

    subscriberList.Add(observer);

 

    //creates a new subscription for the given observer

    var subscription = new Subscription<int>(observer);

    //handle the subscription lifecycle end event

    subscription.OnCompleted += OnObserverLifecycleEnd;

    return subscription;

}

 

void OnObserverLifecycleEnd(object sender, IObserver<int> e)

{

    var subscription = sender as Subscription<int>;

    //remove the observer from the internal list within the observable

    subscriberList.Remove(e);

    //remove the handler from the subscription event

    //once already handled

    subscription.OnCompleted -= OnObserverLifecycleEnd;

}

As shown, the preceding example creates a new Subscription<T> object to handle the observer life cycle through the IDisposable.Dispose method.

Here is the result of such code edits against the full example available in the previous paragraph:

The observer will end their life as we dispose their life cycle tokens

This time, one observer ends its life cycle prematurely when its subscription object is disposed. This is visible in the first END message. Later, only two observers remain when the application ends; when the user asks for EXIT, only these two observers end their life cycle by themselves rather than through the disposal of a Subscription.

In real-world applications, observers often subscribe to observables and later unsubscribe by disposing the Subscription token. This happens because we do not always want a reactive module to handle all the messages. In this case, this means that we have to handle the observer life cycle by ourselves, as we already did in the previous examples, or we need to apply filters to choose which messages flow to which subscriber, as shown in the later section Filtering events. Note that although filters make things easier, we will always have to handle the observer life cycle.
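Because a subscription token may be disposed more than once (for example, explicitly and then again by a using block), it can help to make the disposal idempotent. The following is a minimal sketch of such a token; ActionDisposable and ActionDisposableDemo are hypothetical names, not types from the BCL or from the original example:

```csharp
using System;
using System.Threading;

// Hypothetical token: runs the given unsubscribe action at most once,
// even if Dispose is called repeatedly or from several threads.
public sealed class ActionDisposable : IDisposable
{
    private Action onDispose;

    public ActionDisposable(Action onDispose)
    {
        if (onDispose == null) throw new ArgumentNullException("onDispose");
        this.onDispose = onDispose;
    }

    public void Dispose()
    {
        // atomically take the action and leave null behind,
        // so that only the first caller gets to run it
        var action = Interlocked.Exchange(ref onDispose, null);
        if (action != null)
            action();
    }
}

public static class ActionDisposableDemo
{
    // Disposes the same token twice and returns how many times
    // the unsubscribe action actually ran.
    public static int Run()
    {
        var calls = 0;
        var token = new ActionDisposable(() => calls++);

        token.Dispose();
        token.Dispose(); // the second call is ignored

        return calls;
    }
}
```

A Subscribe method could return such a token with an action that removes the observer from the subscriber list and calls its OnCompleted method, which avoids sending a second completion message to an observer that has already finished.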

Sourcing events

Sourcing events is the ability to obtain, from a particular source, events that are usable in reactive programming.

Reactive programming is all about event message handling. Any event is a specific occurrence of some kind of handleable behavior of users or external systems. We can actually program event reactions in the most pleasant and productive way for reaching our software goals.

In the following example, we will see how to react to CLR events. In this specific case, we will handle filesystem events by using the System.IO.FileSystemWatcher class, which lets us react to file changes without making useless and resource-consuming polling queries against the filesystem status.

Here’s the observer and observable implementation:

public sealed class NewFileSavedMessagePublisher : IObservable<string>, IDisposable

{

    private readonly FileSystemWatcher watcher;

    public NewFileSavedMessagePublisher(string path)

    {

        //creates a new file system event router

        this.watcher = new FileSystemWatcher(path);

        //register for handling File Created event

        this.watcher.Created += OnFileCreated;

        //enable event routing

        this.watcher.EnableRaisingEvents = true;

    }

 

    //signal all observers a new file arrived

    private void OnFileCreated(object sender, FileSystemEventArgs e)

    {

        foreach (var observer in subscriberList)

            observer.OnNext(e.FullPath);

    }

 

    //the subscriber list

    private readonly List<IObserver<string>> subscriberList = new List<IObserver<string>>();

 

    public IDisposable Subscribe(IObserver<string> observer)

    {

        //register the new observer

        subscriberList.Add(observer);

 

        return null;

    }

 

    public void Dispose()

    {

        //disable file system event routing

        this.watcher.EnableRaisingEvents = false;

        //deregister from watcher event handler

        this.watcher.Created -= OnFileCreated;

        //dispose the watcher

        this.watcher.Dispose();

 

        //signal all observers that job is done

        foreach (var observer in subscriberList)

            observer.OnCompleted();

    }

}

 

/// <summary>

/// A tremendously basic implementation

/// </summary>

public sealed class NewFileSavedMessageSubscriber : IObserver<string>

{

    public void OnCompleted()

    {

        Console.WriteLine("-> END");

    }

 

    public void OnError(Exception error)

    {

        Console.WriteLine("-> {0}", error.Message);

    }

 

    public void OnNext(string value)

    {

        Console.WriteLine("-> {0}", value);

    }

}

The observer simply writes text to the console; there is nothing more to say about it.

On the other hand, the observable does most of the work in this implementation.

The observable creates the watcher object and registers the right event handler to catch the wanted reactive events. It handles its own life cycle and that of the internal watcher object. Then, it correctly sends the OnCompleted message to all the observers.

Here’s the program’s initialization:

static void Main(string[] args)

{

    Console.WriteLine("Watching for new files");

    using (var publisher = new NewFileSavedMessagePublisher(@"[WRITE A PATH HERE]"))

    using (var subscriber = publisher.Subscribe(new NewFileSavedMessageSubscriber()))

    {

        Console.WriteLine("Press RETURN to exit");

        //wait for user RETURN

        Console.ReadLine();

    }

}

Any new file that appears in the folder causes its full file name to be routed to the observer. This is the result of copying and pasting the same file three times:

-> [YOUR PATH]out – Copy.png
-> [YOUR PATH]out – Copy (2).png
-> [YOUR PATH]out – Copy (3).png

With a single observable and a single observer, the power of reactive programming is not so evident. Let’s begin writing some intermediate objects that change the message flow within the pipeline of our reactive message pump: filters, message correlators, and dividers.

Filtering events

As said in the previous section, it is time to alter message flow.

The observable has the task of producing messages, while the observer, on the opposite side, consumes them. To create a message filter, we need an object that is a publisher and a subscriber at the same time.

The implementation must take care of both the filtering itself and the routing of messages to the underlying observers, which subscribe to the filter object instead of the main observable.

Here’s an implementation of the filter:

/// <summary>

/// The filtering observable/observer

/// </summary>

public sealed class StringMessageFilter : IObservable<string>, IObserver<string>, IDisposable

{

    private readonly string filter;

    public StringMessageFilter(string filter)

    {

        this.filter = filter;

    }

 

    //the observer collection

    private readonly List<IObserver<string>> observerList = new List<IObserver<string>>();

    public IDisposable Subscribe(IObserver<string> observer)

    {

        this.observerList.Add(observer);

        return null;

    }

 

    //a simple implementation

    //that disables message routing once

    //the OnCompleted has been invoked

    private bool hasCompleted = false;

    public void OnCompleted()

    {

        hasCompleted = true;

        foreach (var observer in observerList)

            observer.OnCompleted();

    }

 

    //routes error messages until completed

    public void OnError(Exception error)

    {

        if (!hasCompleted)

            foreach (var observer in observerList)

                observer.OnError(error);

    }

 

    //routes valid messages until completed

    public void OnNext(string value)

    {

        Console.WriteLine("Filtering {0}", value);

 

        if (!hasCompleted && value.ToLowerInvariant().Contains(filter.ToLowerInvariant()))

            foreach (var observer in observerList)

                observer.OnNext(value);

    }

 

    public void Dispose()

    {

        OnCompleted();

    }

}

This filter can be used together with the example from the previous section that routes the FileSystemWatcher events of created files. This is the new program initialization:

static void Main(string[] args)

{

    Console.WriteLine("Watching for new files");

    using (var publisher = new NewFileSavedMessagePublisher(@"[WRITE A PATH HERE]"))

    using (var filter = new StringMessageFilter(".txt"))

    {

        //subscribe the filter to publisher messages

        publisher.Subscribe(filter);

        //subscribe the console subscriber to the filter

        //instead of directly to the publisher

        filter.Subscribe(new NewFileSavedMessageSubscriber());

 

        Console.WriteLine("Press RETURN to exit");

        Console.ReadLine();

    }

}

As shown, this new implementation creates a filter object that takes a parameter used to decide which filenames may flow to the underlying observers.

The filter subscribes to the main observable object, while the observer subscribes to the filter itself. It is like a chain where each link refers to the next one.

This is the output console of the running application:

The filtering observer in action

Although I made a copy of two files (a .png and a .txt file), we can see that only the text file reached the internal observer object. The image file reached the OnNext method of the filter but, because it is invalid against the filter argument, it never reached the internal observer.
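The same filtering idea is not tied to strings or to substring matching. As a hedged sketch, the filter could accept any predicate and any message type. PredicateFilter and ListObserver are hypothetical names introduced here for illustration; unlike the simple filter above, this version also returns a token that removes the observer when disposed:

```csharp
using System;
using System.Collections.Generic;

// Hypothetical generalization of the string filter: any predicate, any message type.
public sealed class PredicateFilter<T> : IObservable<T>, IObserver<T>
{
    private readonly Func<T, bool> predicate;
    private readonly List<IObserver<T>> observerList = new List<IObserver<T>>();
    private bool hasCompleted;

    public PredicateFilter(Func<T, bool> predicate)
    {
        if (predicate == null) throw new ArgumentNullException("predicate");
        this.predicate = predicate;
    }

    public IDisposable Subscribe(IObserver<T> observer)
    {
        observerList.Add(observer);
        return new Unsubscriber(observerList, observer);
    }

    //routes only the messages accepted by the predicate
    public void OnNext(T value)
    {
        if (hasCompleted || !predicate(value)) return;
        //iterate over a copy so that observers may unsubscribe while notified
        foreach (var observer in observerList.ToArray())
            observer.OnNext(value);
    }

    public void OnError(Exception error)
    {
        if (hasCompleted) return;
        foreach (var observer in observerList.ToArray())
            observer.OnError(error);
    }

    public void OnCompleted()
    {
        if (hasCompleted) return;
        hasCompleted = true;
        foreach (var observer in observerList.ToArray())
            observer.OnCompleted();
    }

    //removes the observer from the filter when disposed
    private sealed class Unsubscriber : IDisposable
    {
        private readonly List<IObserver<T>> list;
        private readonly IObserver<T> observer;

        public Unsubscriber(List<IObserver<T>> list, IObserver<T> observer)
        {
            this.list = list;
            this.observer = observer;
        }

        public void Dispose() { list.Remove(observer); }
    }
}

// Collects received items so that the filter output can be inspected.
public sealed class ListObserver<T> : IObserver<T>
{
    public readonly List<T> Items = new List<T>();
    public bool Completed;

    public void OnNext(T value) { Items.Add(value); }
    public void OnError(Exception error) { }
    public void OnCompleted() { Completed = true; }
}
```

For example, a PredicateFilter<string> built with name => name.EndsWith(".txt", StringComparison.OrdinalIgnoreCase) lets a path ending in .TXT through and drops one ending in .png.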

Correlating events

Sometimes, especially when dealing with integration scenarios, there is the need to correlate multiple events that do not always arrive together. This is the case of a header file that comes together with multiple body files.

In reactive programming, correlating events means combining multiple observable messages into a single message that is the result of two or more original messages. Such messages must share some value (an ID, serial, or metadata) that marks them as belonging to the same correlation set.

Useful features in real-world correlators are the ability to specify a timeout (that may be infinite too) in the correlation waiting logic and the ability to specify a correlation message count (infinite too).
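As a rough sketch of these two features, the following helper groups values by key, releases a group once it reaches an expected count, and restarts a group whose first item is older than a timeout. CorrelationBuffer and its members are hypothetical names; the caller passes the current time explicitly, which is an assumption made here to keep the logic easy to test:

```csharp
using System;
using System.Collections.Generic;

// Hypothetical helper: correlates values by key with a message count and a timeout.
public sealed class CorrelationBuffer
{
    private sealed class Group
    {
        public DateTime FirstSeen;
        public readonly List<string> Values = new List<string>();
    }

    private readonly Dictionary<string, Group> groups = new Dictionary<string, Group>();
    private readonly int expectedCount;
    private readonly TimeSpan timeout;

    // Pass TimeSpan.MaxValue as timeout to wait forever.
    public CorrelationBuffer(int expectedCount, TimeSpan timeout)
    {
        if (expectedCount < 1) throw new ArgumentOutOfRangeException("expectedCount");
        this.expectedCount = expectedCount;
        this.timeout = timeout;
    }

    // Returns the completed group, or null while the correlation is still pending.
    public string[] Add(string key, string value, DateTime now)
    {
        Group group;
        if (groups.TryGetValue(key, out group) && now - group.FirstSeen > timeout)
        {
            //the pending correlation expired: discard it and start again
            groups.Remove(key);
            group = null;
        }

        if (group == null)
        {
            group = new Group { FirstSeen = now };
            groups[key] = group;
        }

        group.Values.Add(value);
        if (group.Values.Count < expectedCount)
            return null;

        //the correlation is complete: release it and forget the key
        groups.Remove(key);
        return group.Values.ToArray();
    }
}
```

Note that this sketch only checks for expiry when a new value arrives for the same key, so an abandoned key stays in memory; a production correlator would also need periodic cleanup.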

Here’s a correlator implementation made for the previous example based on the FileSystemWatcher class:

public sealed class FileNameMessageCorrelator : IObservable<string>, IObserver<string>, IDisposable

{

    private readonly Func<string, string> correlationKeyExtractor;

    public FileNameMessageCorrelator(Func<string, string> correlationKeyExtractor)

    {

        this.correlationKeyExtractor = correlationKeyExtractor;

    }

 

    //the observer collection

    private readonly List<IObserver<string>> observerList = new List<IObserver<string>>();

    public IDisposable Subscribe(IObserver<string> observer)

    {

        this.observerList.Add(observer);

        return null;

    }

 

    private bool hasCompleted = false;

    public void OnCompleted()

    {

        hasCompleted = true;

        foreach (var observer in observerList)

            observer.OnCompleted();

    }

 

    //routes error messages until completed

    public void OnError(Exception error)

    {

        if (!hasCompleted)

            foreach (var observer in observerList)

                observer.OnError(error);

    }

Let's pause for a moment. Up to this point, we simply created the reactive structure of the FileNameMessageCorrelator class by implementing the two main interfaces. Here is the core implementation that correlates messages:

//the container of correlations able to contain

//multiple strings per each key

private readonly NameValueCollection correlations = new NameValueCollection();

 

//routes valid messages until completed

public void OnNext(string value)

{

    //check if subscriber has completed

    if (hasCompleted) return;

 

    Console.WriteLine("Parsing message: {0}", value);

 

    //try extracting the correlation ID

    var correlationID = correlationKeyExtractor(value);

 

    //check if the correlation is available

    if (correlationID == null) return;

 

    //append the new file name to the correlation state

    correlations.Add(correlationID, value);

 

    //in this example we will consider always

    //correlations of two items

    if (correlations.GetValues(correlationID).Count() == 2)

    {

        //once the correlation is complete

        //read the two files and push the

        //two contents altogether to the

        //observers

 

        var fileData = correlations.GetValues(correlationID)

            //route messages to the ReadAllText method

            .Select(File.ReadAllText)

            //materialize the query

            .ToArray();

 

        var newValue = string.Join("|", fileData);

 

        foreach (var observer in observerList)

            observer.OnNext(newValue);

 

        correlations.Remove(correlationID);

    }

}

This correlator class accepts a correlation function as a constructor parameter. This function is later used to evaluate the correlationID when a new filename flows through the OnNext method.

Once the function returns a valid correlationID, the ID is used as the key for a NameValueCollection, a specialized string collection that stores multiple values per key. When there are two values for the same key, the correlation is ready to flow out to the underlying observers: the correlator reads the file data and joins it into a single string message.

Here’s the application’s initialization:

static void Main(string[] args)

{

    using (var publisher = new NewFileSavedMessagePublisher(@"[WRITE A PATH HERE]"))

    //creates a new correlator by specifying the correlation key

    //extraction function made with a Regular expression that

    //extract a file ID similar to FILEID0001

    using (var correlator = new FileNameMessageCorrelator(ExtractCorrelationKey))

    {

        //subscribe the correlator to publisher messages

        publisher.Subscribe(correlator);

 

        //subscribe the console subscriber to the correlator

        //instead of directly to the publisher

        correlator.Subscribe(new NewFileSavedMessageSubscriber());

 

        //wait for user RETURN

        Console.ReadLine();

    }

}

 

private static string ExtractCorrelationKey(string arg)

{

    var match = Regex.Match(arg, @"(FILEID\d{4})");

    if (match.Success)

        return match.Captures[0].Value;

    else

        return null;

}

The initialization is much the same as in the filtering example seen in the previous section. The biggest difference is that the correlator object, instead of a string filter, accepts a function that analyzes the incoming filename and produces the correlationID, if one is available.

I prepared two files with the same ID in their filenames. Here’s the console output of the running example:

Two files correlated by their name

As shown, the correlator did its job by joining the data of the two files into a single message, regardless of the order in which the two files were stored in the filesystem.

These examples regarding the filtering and correlation of messages should give you the idea that we can do anything with received messages. We can put a message on standby until a correlated message comes, we can join multiple messages into one, we can produce the same message multiple times, and so on.

This programming style opens the programmer’s mind to a lot of new application designs and possibilities.

Sourcing from CLR streams

Any class that extends System.IO.Stream represents some kind of cursor-based flow of data. The same happens when we watch a video stream: data that is not persisted locally and flows only over the network, with the ability to go forward and backward, stop, pause, resume, play, and so on. The same behavior is available while streaming any kind of data; thus, the Stream class is the base class that exposes such behavior for any need.

There are specialized classes that extend Stream or work on top of it: helpers that read and write text data (StreamWriter and StreamReader) or binary serialized data (BinaryReader and BinaryWriter) over a stream, and Stream subclasses such as memory-based temporary byte containers (MemoryStream), network-based streams (NetworkStream), and many others.

Regarding reactive programming, we are dealing with the ability to source events from any stream regardless of its kind (network, file, memory, and so on).
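
To illustrate that the kind of stream does not matter, here is a minimal sketch of an observable that pushes every text line found in any Stream to its observers. The StreamLineObservable, RecordingObserver, and StreamDemo names are hypothetical and exist only for this illustration; the demo uses a MemoryStream, but a file or network stream could be passed in the same way.

```csharp
using System;
using System.Collections.Generic;
using System.IO;
using System.Text;

// Hypothetical sketch: routes each text line read from a Stream to the observers.
public sealed class StreamLineObservable : IObservable<string>
{
    private readonly Stream stream;
    private readonly List<IObserver<string>> observerList = new List<IObserver<string>>();

    public StreamLineObservable(Stream stream)
    {
        this.stream = stream;
    }

    public IDisposable Subscribe(IObserver<string> observer)
    {
        observerList.Add(observer);
        //subscription lifecycle omitted, as in the article's listings
        return null;
    }

    // Reads the stream to its end, routing one message per line.
    public void Pump()
    {
        using (var reader = new StreamReader(stream, Encoding.UTF8))
        {
            string line;
            while ((line = reader.ReadLine()) != null)
                foreach (var observer in observerList)
                    observer.OnNext(line);
        }

        foreach (var observer in observerList)
            observer.OnCompleted();
    }
}

// Hypothetical observer that records what it receives.
public sealed class RecordingObserver : IObserver<string>
{
    public readonly List<string> Received = new List<string>();
    public bool Completed { get; private set; }

    public void OnNext(string value) { Received.Add(value); }
    public void OnError(Exception error) { }
    public void OnCompleted() { Completed = true; }
}

public static class StreamDemo
{
    // Sources two CR + LF separated messages from an in-memory stream.
    public static List<string> Run()
    {
        var bytes = Encoding.UTF8.GetBytes("first\r\nsecond\r\n");
        var observer = new RecordingObserver();
        var observable = new StreamLineObservable(new MemoryStream(bytes));
        observable.Subscribe(observer);
        observable.Pump();
        return observer.Received;
    }
}
```

StreamDemo.Run returns the two lines in order. Reading is synchronous here to keep the sketch short; a stream that produces data unpredictably would be pumped from a background Task.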

Real-world applications that use stream-based reactive programming include chats, remote binary listeners (socket programming), and any other unpredictable, event-oriented application. On the other hand, it is pointless to read a huge file in a reactive way, because there is simply nothing reactive in such a case.

It is time to look at an example. Here’s a complete example of a reactive application that listens on a TCP port and routes string messages (separated by CR + LF) to all the available observers. The program Main and the usual ConsoleObserver are omitted for better readability:

public sealed class TcpListenerStringObservable : IObservable<string>, IDisposable

    {

        private readonly TcpListener listener;

        public TcpListenerStringObservable(int port, int backlogSize = 64)

        {

            //creates a new tcp listener on given port

            //with given backlog size

            listener = new TcpListener(IPAddress.Any, port);

            listener.Start(backlogSize);

 

            //start listening asynchronously

            listener.AcceptTcpClientAsync().ContinueWith(OnTcpClientConnected);

        }

 

        private void OnTcpClientConnected(Task<TcpClient> clientTask)

        {

            //proceed only if the accept task ran to completion without errors

            if (clientTask.Status == TaskStatus.RanToCompletion)

                //we handle a single client connection at a time;

                //to handle multiple connections, simply put the following

                //code into a Task

                using (var tcpClient = clientTask.Result)

                using (var stream = tcpClient.GetStream())

                using (var reader = new StreamReader(stream))

                    while (tcpClient.Connected)

                    {

                        //read the message

                        var line = reader.ReadLine();

 

                        //stop listening if nothing available

                        if (string.IsNullOrEmpty(line))

                            break;

                        else

                        {

                            //construct observer message adding client's remote endpoint address and port

                            var msg = string.Format("{0}: {1}", tcpClient.Client.RemoteEndPoint, line);

 

                            //route messages

                            foreach (var observer in observerList)

                                observer.OnNext(msg);

                        }

                    }

 

            //starts another client listener

            listener.AcceptTcpClientAsync().ContinueWith(OnTcpClientConnected);

        }

 

        private readonly List<IObserver<string>> observerList = new List<IObserver<string>>();

        public IDisposable Subscribe(IObserver<string> observer)

        {

            observerList.Add(observer);

 

            //subscription lifecycle missing

            //for readability purpose

            return null;

        }

 

        public void Dispose()

        {

            //stop listener

            listener.Stop();

        }

    }

The preceding example shows how to create a reactive TCP listener that acts as an observable of string messages.

The observable uses an internal TcpListener object, which provides mid-level network services on top of an underlying Socket object. The example asks the listener to start listening and then waits for a client on another thread by means of a Task object. When a remote client connects, the OnTcpClientConnected method handles its communication with the observable. The method first verifies that the Task completed normally. Then, it takes the TcpClient from the Task, gets its network stream, and wraps that stream in a StreamReader to read text lines.

Once all of the client's messages have been read, another Task starts and repeats the procedure. Although this design handles a backlog of pending connections, it serves only a single client at a time. To handle multiple connections at the same time, simply wrap the OnTcpClientConnected logic in a Task of its own. Here’s an example:

private void OnTcpClientConnected(Task<TcpClient> clientTask)

{

    //proceed only if the accept task ran to completion without errors

    if (clientTask.Status == TaskStatus.RanToCompletion)

        Task.Factory.StartNew(() =>

            {

                using (var tcpClient = clientTask.Result)

                using (var stream = tcpClient.GetStream())

                using (var reader = new StreamReader(stream))

                    while (tcpClient.Connected)

                    {

                        //read the message

                        var line = reader.ReadLine();

 

                        //stop listening if nothing available

                        if (string.IsNullOrEmpty(line))

                            break;

                        else

                        {

                            //construct observer message adding client's remote endpoint address and port

                            var msg = string.Format("{0}: {1}", tcpClient.Client.RemoteEndPoint, line);

 

                            //route messages

                            foreach (var observer in observerList)

                                observer.OnNext(msg);

                        }

                    }

            }, TaskCreationOptions.PreferFairness);

 

    //starts another client listener

    listener.AcceptTcpClientAsync().ContinueWith(OnTcpClientConnected);

}

This is the output of the reactive application when it receives two different connections from telnet clients (C:>telnet localhost 8081):

The observable routing events from the telnet client

As you can see, each client connects to the listener from a different remote port. This lets us tell multiple remote connections apart even when they are connected at the same time.
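
The listings in this section return null from Subscribe for brevity, and the multi-client version calls the observers from several tasks at once. The following minimal sketch shows one possible way to cover both points: a thread-safe observer list whose Subscribe method returns a handle that removes the observer when disposed. The ObserverRegistry and CountingObserver names are hypothetical, and this is only one of several reasonable designs.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical sketch: a thread-safe observer list whose Subscribe
// returns a handle that removes the observer when disposed.
public sealed class ObserverRegistry<T>
{
    private readonly object gate = new object();
    private readonly List<IObserver<T>> observerList = new List<IObserver<T>>();

    public IDisposable Subscribe(IObserver<T> observer)
    {
        lock (gate)
            observerList.Add(observer);
        return new Subscription(this, observer);
    }

    // Routes a message to a snapshot of the current observers, so that
    // observers may subscribe or unsubscribe while routing is in progress.
    public void Publish(T message)
    {
        IObserver<T>[] snapshot;
        lock (gate)
            snapshot = observerList.ToArray();

        foreach (var observer in snapshot)
            observer.OnNext(message);
    }

    public int Count
    {
        get { lock (gate) return observerList.Count; }
    }

    // The subscription handle: disposing it ends the subscription.
    private sealed class Subscription : IDisposable
    {
        private readonly ObserverRegistry<T> owner;
        private readonly IObserver<T> observer;

        public Subscription(ObserverRegistry<T> owner, IObserver<T> observer)
        {
            this.owner = owner;
            this.observer = observer;
        }

        public void Dispose()
        {
            lock (owner.gate)
                owner.observerList.Remove(observer);
        }
    }
}

// Hypothetical observer used only to exercise the registry.
public sealed class CountingObserver : IObserver<string>
{
    public int Count { get; private set; }
    public void OnNext(string value) { Count++; }
    public void OnError(Exception error) { }
    public void OnCompleted() { }
}
```

An observable such as the TCP listener could hold one registry, forward its Subscribe calls to it, and call Publish instead of looping over the list directly. An observer stops receiving messages as soon as its handle is disposed.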

Sourcing from CLR enumerables

Sourcing from a finite collection is of little use in reactive programming. By contrast, some specific enumerable collections are perfect for reactive usage: changeable collections that support change notifications by implementing the INotifyCollectionChanged (System.Collections.Specialized) interface, such as the ObservableCollection (System.Collections.ObjectModel) class, and any infinite collection that supports the enumerator pattern through the yield keyword.

Changeable collections

The ObservableCollection<T> class lets us observe, in an event-based way, any change to the collection's content. Note that changes to the properties of the contained items are outside the collection's scope. This means that we are notified only of collection changes, such as those produced by the Add or Remove methods. A change within a single item does not alter the collection itself, so it is not notified at all.
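
The following minimal sketch makes this limitation visible by counting the notifications raised when an item is added and then modified. The Note type and the ItemChangeDemo class are hypothetical names used only for this illustration.

```csharp
using System;
using System.Collections.ObjectModel;

// Hypothetical item type with a mutable property.
public sealed class Note
{
    public string Text { get; set; }
}

public static class ItemChangeDemo
{
    // Returns how many CollectionChanged notifications were raised.
    public static int Run()
    {
        var notifications = 0;
        var collection = new ObservableCollection<Note>();
        collection.CollectionChanged += (sender, e) => notifications++;

        var note = new Note { Text = "ciao" };
        collection.Add(note);   // notified: the collection content changed
        note.Text = "changed";  // not notified: only the item changed

        return notifications;
    }
}
```

ItemChangeDemo.Run returns 1: only the Add call raises the event. To observe changes inside the items as well, the item type would need its own notification mechanism, such as INotifyPropertyChanged.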

Here’s a generic (nonreactive) example:

static void Main(string[] args)

{

    //the observable collection

    var collection = new ObservableCollection<string>();

    //register a handler to catch collection changes

    collection.CollectionChanged += OnCollectionChanged;

 

    collection.Add("ciao");

    collection.Add("hahahah");

 

    collection.Insert(0, "new first line");

    collection.RemoveAt(0);

 

    Console.WriteLine("Press RETURN to EXIT");

    Console.ReadLine();

}

 

private static void OnCollectionChanged(object sender, NotifyCollectionChangedEventArgs e)

{

    var collection = sender as ObservableCollection<string>;

 

    if (e.NewStartingIndex >= 0) //adding new items

        Console.WriteLine("-> {0} {1}", e.Action, collection[e.NewStartingIndex]);

    else //removing items

        Console.WriteLine("-> {0} at {1}", e.Action, e.OldStartingIndex);

}

As the output shows, the collection notifies every add operation and lets us read the new item. The Insert method also signals an Add action; the difference is that Insert lets us choose the index at which the value is placed, and e.NewStartingIndex reports that index. The Remove action, by contrast, reports the index of the removed element (e.OldStartingIndex), but the handler can no longer read the item from the collection at that index, because the event fires after the removal has already occurred. The removed item is still available through the e.OldItems property.
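
As a minimal sketch of how a handler can still learn which item was removed, the following code reads the OldItems property of the event arguments. The RemovedItemDemo class is a hypothetical name used only for this illustration.

```csharp
using System;
using System.Collections.Generic;
using System.Collections.ObjectModel;
using System.Collections.Specialized;

public static class RemovedItemDemo
{
    // Returns the items that the collection reported as removed.
    public static List<string> Run()
    {
        var removed = new List<string>();
        var collection = new ObservableCollection<string>();

        collection.CollectionChanged += (sender, e) =>
        {
            // for a Remove action, OldItems holds the items that were removed
            if (e.Action == NotifyCollectionChangedAction.Remove)
                foreach (string item in e.OldItems)
                    removed.Add(item);
        };

        collection.Add("ciao");
        collection.Insert(0, "new first line");
        collection.RemoveAt(0);

        return removed;
    }
}
```

RemovedItemDemo.Run returns a list containing only "new first line", the item that RemoveAt(0) took out of the collection.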

In a real-world reactive application, the most interesting operation against ObservableCollection is the Add operation. Here’s an example (console observer omitted for better readability):

class Program

{

    static void Main(string[] args)

    {

        //the observable collection

        var collection = new ObservableCollection<string>();

 

        using (var observable = new NotifiableCollectionObservable(collection))

        using (var observer = observable.Subscribe(new ConsoleStringObserver()))

        {

            collection.Add("ciao");

            collection.Add("hahahah");

 

            collection.Insert(0, "new first line");

            collection.RemoveAt(0);

 

            Console.WriteLine("Press RETURN to EXIT");

            Console.ReadLine();

        }

    }

 

public sealed class NotifiableCollectionObservable : IObservable<string>, IDisposable

{

    private readonly ObservableCollection<string> collection;

    public NotifiableCollectionObservable(ObservableCollection<string> collection)

    {

        this.collection = collection;

        this.collection.CollectionChanged += collection_CollectionChanged;

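    }

 

    //routes only the newly added items to the observers

    private void collection_CollectionChanged(object sender, NotifyCollectionChangedEventArgs e)

    {

        if (e.Action == NotifyCollectionChangedAction.Add)

            foreach (string item in e.NewItems)

                foreach (var observer in observerList)

                    observer.OnNext(item);

    }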

 

    private readonly List<IObserver<string>> observerList = new List<IObserver<string>>();

    public IDisposable Subscribe(IObserver<string> observer)

    {

        observerList.Add(observer);

 

        //subscription lifecycle missing

        //for readability purpose

        return null;

    }

 

    public void Dispose()

    {

        this.collection.CollectionChanged -= collection_CollectionChanged;

 

        foreach (var observer in observerList)

            observer.OnCompleted();

    }

}

}

The result is the same as in the previous, nonreactive ObservableCollection example, except that the observable routes messages only when the Action value is Add.

The ObservableCollection signaling its content changes

Infinite collections

Our last example sources events from an infinite collection.

In C#, it is possible to implement the enumerator pattern by returning the objects to enumerate one at a time, thanks to the yield keyword. Here’s an example:

static void Main(string[] args)

{

    foreach (var value in EnumerateValuesFromSomewhere())

        Console.WriteLine(value);

}

 

static IEnumerable<string> EnumerateValuesFromSomewhere()

{

    var random = new Random(DateTime.Now.GetHashCode());

    while (true) //forever

    {

        //returns a random integer number as string

        yield return random.Next().ToString();

        //some throttling time

        Thread.Sleep(100);

    }

}

This implementation is powerful because it never materializes all the values in memory. It simply hands each new object to the enumerator that the foreach statement uses internally. The result is an endless stream of numbers written to the console.
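
The on-demand nature of such an enumerable can be sketched by taking only a few values from an infinite sequence. The LazyDemo class, its Naturals iterator, and the Produced counter are hypothetical names used only for this illustration; Take and ToList are the standard LINQ operators.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public static class LazyDemo
{
    // counts how many values the iterator has actually produced
    public static int Produced;

    // an infinite sequence: 0, 1, 2, ...
    public static IEnumerable<int> Naturals()
    {
        var i = 0;
        while (true) //forever
        {
            Produced++;
            yield return i++;
        }
    }

    public static List<int> FirstThree()
    {
        Produced = 0;
        //Take stops pulling once it has three items, so the endless loop
        //above is driven only as far as the consumer asks for values
        return Naturals().Take(3).ToList();
    }
}
```

FirstThree returns the values 0, 1, and 2 and then completes, even though Naturals never ends by itself. The Produced counter stays small because the iterator body runs only when the consumer requests the next value.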

This behavior is useful for reactive usage because it never creates useless state such as a temporary array, list, or generic collection. It simply makes each new item available through the enumerator.

Here’s an example:

public sealed class EnumerableObservable : IObservable<string>, IDisposable

    {

        private readonly IEnumerable<string> enumerable;

        public EnumerableObservable(IEnumerable<string> enumerable)

        {

            this.enumerable = enumerable;

            this.cancellationSource = new CancellationTokenSource();

            this.cancellationToken = cancellationSource.Token;

            this.workerTask = Task.Factory.StartNew(() =>

                {

                    foreach (var value in this.enumerable)

                    {

                        //if task cancellation triggers, raise the proper exception

                        //to stop task execution

                        cancellationToken.ThrowIfCancellationRequested();

 

                        foreach (var observer in observerList)

                            observer.OnNext(value);

                    }

                }, this.cancellationToken);

        }

 

        //the cancellation token source used to stop

        //the inner observable working thread

        private readonly CancellationTokenSource cancellationSource;

        //the cancellation flag

        private readonly CancellationToken cancellationToken;

        //the running task that runs the inner running thread

        private readonly Task workerTask;

        //the observer list

        private readonly List<IObserver<string>> observerList = new List<IObserver<string>>();

        public IDisposable Subscribe(IObserver<string> observer)

        {

            observerList.Add(observer);

 

            //subscription lifecycle missing

            //for readability purpose

            return null;

        }

 

        public void Dispose()

        {

            //trigger task cancellation

            //and wait for acknowledgement

            if (!cancellationSource.IsCancellationRequested)

            {

                cancellationSource.Cancel();

                //IsCompleted also covers a faulted task, which would never report IsCanceled

                while (!workerTask.IsCompleted)

                    Thread.Sleep(100);

            }

 

            cancellationSource.Dispose();

            workerTask.Dispose();

 

            foreach (var observer in observerList)

                observer.OnCompleted();

        }

    }

This is the code of the program startup with the infinite enumerable generation:

class Program

    {

        static void Main(string[] args)

        {

            //we create a variable containing the enumerable

            //this does not trigger item retrieval

            //so the enumerator does not begin flowing data

            var enumerable = EnumerateValuesFromSomewhere();

 

            using (var observable = new EnumerableObservable(enumerable))

            using (var observer = observable.Subscribe(new ConsoleStringObserver()))

            {

                //wait for 2 seconds, then exit

                Thread.Sleep(2000);

            }

 

            Console.WriteLine("Press RETURN to EXIT");

            Console.ReadLine();

        }

 

        static IEnumerable<string> EnumerateValuesFromSomewhere()

        {

            var random = new Random(DateTime.Now.GetHashCode());

            while (true) //forever

            {

                //returns a random integer number as string

                yield return random.Next().ToString();

                //some throttling time

                Thread.Sleep(100);

            }

        }

    }

Unlike the previous examples, this one uses the Task class. The observable enumerates the collection within an asynchronous Task, which gives the programmer the ability to stop the whole operation by simply exiting the using scope or by manually invoking the Dispose method.

This example shows a tremendously powerful feature: the ability to yield values without having to source them from a concrete (finite) array or collection, simply by implementing the enumerator pattern. Although it is seldom used, the yield keyword makes it possible to build complex applications simply by pushing messages between methods. The more methods we create that send messages to one another, the more complex the business logic the application can handle.

Consider the ability to catch all such messages with observables, and you have a little idea about how powerful reactive programming can be for a developer.
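
As a minimal sketch of methods that push messages to one another through yield, the following pipeline chains a source stage, a filter stage, and a transform stage. The PipelineDemo class and its stage methods are hypothetical names used only for this illustration.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public static class PipelineDemo
{
    // source stage: an endless sequence of numbers starting at 1
    static IEnumerable<int> Numbers()
    {
        var i = 1;
        while (true) //forever
            yield return i++;
    }

    // filter stage: forwards only the even values
    static IEnumerable<int> OnlyEven(IEnumerable<int> source)
    {
        foreach (var value in source)
            if (value % 2 == 0)
                yield return value;
    }

    // transform stage: turns each value into a text message
    static IEnumerable<string> AsMessages(IEnumerable<int> source)
    {
        foreach (var value in source)
            yield return "value " + value;
    }

    // Composes the three stages and takes the first three messages.
    public static List<string> Run()
    {
        return AsMessages(OnlyEven(Numbers())).Take(3).ToList();
    }
}
```

PipelineDemo.Run returns "value 2", "value 4", and "value 6". Each stage handles one item at a time, so the composed enumerable could be handed to an observable such as EnumerableObservable without ever building an intermediate collection.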

Summary

In this article, we had the opportunity to test the main features that any reactive application must implement: sending messages, sending errors, and acknowledging completion. We focused on plain C# programming to give a first overview of how classic reactive designs can be applied to common application needs, such as sourcing from streams, from user input, and from changeable and infinite collections.
