
In this article by Andreas Niedermair, the author of the book Mastering ServiceStack, we will see the communication between asynchronous components. The recent release of .NET has added several new ways to further embrace asynchronous and parallel processing by introducing the Task Parallel Library (TPL) and async and await.


The need for asynchronous processing has been there since the early days of programming. Its main concept is to offload the processing to another thread or process to release the calling thread from waiting and it has become a standard model since the rise of GUIs.

In such interfaces only one thread is responsible for drawing the GUI, which must not be blocked in order to remain available and also to avoid putting the application in a non-responding state.

This paradigm is a core point in distributed systems: at some point, long-running operations are offloaded to a separate component, either to overcome blocking or to avoid resource bottlenecks by using dedicated machines. This also makes the processing more robust against unexpected application pool recycling and other such issues.

A synonym for “fire-and-forget” is “one-way”, which is also reflected by the design of static routes of ServiceStack endpoints, where the default is /{format}/oneway/{service}.

Asynchronism adds a whole new level of complexity to our processing chain, as some callers might depend on a return value. This problem can be overcome by adding a callback or another event to your design.

Messaging, or more generally a producer-consumer chain, is a fundamental design pattern that decouples components. It can be applied within the same process or between processes, on the same machine or across machines.
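To make the in-process variant of this pattern concrete, here is a minimal sketch that uses BlockingCollection<T> from the .NET base class library as the queue between a producer and a consumer. The class InProcessQueueSketch and its Run method are hypothetical names chosen for illustration and are not part of ServiceStack.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading.Tasks;

// Hypothetical helper for illustration only; not part of ServiceStack.
public static class InProcessQueueSketch
{
    public static List<string> Run(IEnumerable<string> names)
    {
        // The bounded collection plays the role of the queue between both sides.
        using (var queue = new BlockingCollection<string>(boundedCapacity: 10))
        {
            var results = new List<string>();

            // Consumer: blocks until a message arrives and
            // finishes once the producer has completed adding.
            var consumer = Task.Run(() =>
            {
                foreach (var name in queue.GetConsumingEnumerable())
                {
                    results.Add("Hello " + name);
                }
            });

            // Producer: enqueues the messages without waiting for their processing.
            foreach (var name in names)
            {
                queue.Add(name);
            }
            queue.CompleteAdding();

            // Wait for the consumer so that the results can be read safely.
            consumer.Wait();
            return results;
        }
    }
}
```

Calling InProcessQueueSketch.Run(new[] { "Demo" }) hands the name over to the consumer task, which builds the greeting. The producer only knows the queue and never the consumer, and this is the decoupling that a Message Queue provides across process and machine boundaries.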

Consider the following architecture:

The client issues a request to the service, which processes the message and returns a response. The server is known and is directly bound to the client, which makes an on-the-fly addition of servers practically impossible. You’d need to reconfigure the clients to reflect the collection of servers on every change and implement a distribution logic for requests.

Therefore, a new component is introduced, which acts as a broker (without any processing of the message, except delivery) between the client and service to decouple the service from the client.

This gives us the opportunity to introduce more services for heavy load scenarios by simply registering a new instance to the broker, as shown in the following figure:

I left out the clustering (scaling) of brokers and also the routing of messages on purpose at this stage of introduction.

In many cross-process scenarios a database is introduced as a broker, which is constantly polled by services (and clients, if there’s a response involved) to check whether there’s a message to be processed. Adding a database as a broker and implementing your own logic can be absolutely fine for basic systems, but for more advanced scenarios it lacks some essential features that Message Queues ship with:

  • Scalability: Decoupling is the biggest step towards a robust design, as it introduces the possibility to add more processing nodes to your data flow.
  • Resilience: Messages are guaranteed to be delivered and processed as automatic retrying is available for non-acknowledged (processed) messages. If the retry count is exceeded, failed messages are stored in a Dead Letter Queue (DLQ) to be inspected later and are requeued after fixing the issue that caused the failure. In case of a partial failure of your infrastructure, clients can still produce messages that get delivered and processed as soon as there is even a single consumer back online.
  • Pushing instead of polling: This is where asynchronism comes into play, as consumers do not constantly poll for messages. Instead, the broker pushes a message to them as soon as there’s a new one in their subscribed queue. This minimizes the spinning and the wait time that a polling timer introduces, for example when the timer ticks only every 10 seconds.
  • Guaranteed order: Most Message Queues offer a guaranteed order of the processing under defined conditions (mostly FIFO).
  • Load balancing: With multiple services registered for messages, there is an inherent load balancing (typically round-robin), so that heavy load scenarios can be handled better. In addition to round-robin routing there are other routing logics, such as smallest-mailbox, tail-chopping, or random routing.
  • Message persistence: Message Queues can be configured to persist their data to disk and even survive restarts of the host on which they are running. To overcome the downtime of the Message Queue you can even set up a cluster to offload the demand to other brokers while restarting a single node.
  • Built-in priority: Message Queues usually have separate queues for different messages and even provide a separate in-queue for prioritized messages.
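The retry and Dead Letter Queue behavior from the resilience point above can be sketched with a small in-memory model. This is a simplification under stated assumptions: RetryingQueueSketch is a hypothetical class, a thrown exception stands in for a missing acknowledgement, and a real broker tracks acknowledgements across the network rather than in a local loop.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical in-memory model for illustration; not a broker API.
public sealed class RetryingQueueSketch
{
    // Each entry pairs the message with the number of failed attempts so far.
    private readonly Queue<KeyValuePair<string, int>> _inQueue =
        new Queue<KeyValuePair<string, int>>();
    private readonly int _retryCount;

    public RetryingQueueSketch(int retryCount)
    {
        _retryCount = retryCount;
    }

    public List<string> Processed { get; } = new List<string>();
    public List<string> DeadLetters { get; } = new List<string>();

    public void Publish(string message)
    {
        _inQueue.Enqueue(new KeyValuePair<string, int>(message, 0));
    }

    public void ProcessAll(Action<string> handler)
    {
        while (_inQueue.Count > 0)
        {
            var entry = _inQueue.Dequeue();
            try
            {
                handler(entry.Key);
                // Reaching this line corresponds to an acknowledged message.
                Processed.Add(entry.Key);
            }
            catch (Exception)
            {
                var failedAttempts = entry.Value + 1;
                if (failedAttempts > _retryCount)
                {
                    // Retries are exhausted: park the message for later inspection.
                    DeadLetters.Add(entry.Key);
                }
                else
                {
                    // Requeue at the tail; note that this changes the message order.
                    _inQueue.Enqueue(
                        new KeyValuePair<string, int>(entry.Key, failedAttempts));
                }
            }
        }
    }
}
```

With retryCount set to 1, a message whose handler always throws is attempted twice (the initial attempt plus one retry) and then ends up in DeadLetters, while all other messages are processed normally.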

There are many more features, such as Time to live, security and batching modes, which we will not cover as they are outside the scope of ServiceStack.

In the following example we will refer to two basic DTOs:

public class Hello : ServiceStack.IReturn<HelloResponse>
{
    public string Name { get; set; }
}

public class HelloResponse
{
    public string Result { get; set; }
}

The Hello class is used to send a Name to a consumer that generates a message, which will be enqueued in the Message Queue as well.

RabbitMQ

RabbitMQ is a mature broker built on top of the Advanced Message Queuing Protocol (AMQP), which makes it possible to solve even more complex scenarios, as shown here:

The messages will survive restarts of the RabbitMQ service, and an additional guarantee of delivery is accomplished by requiring an acknowledgement of the receipt (and processing) of the message. By default, ServiceStack sends this acknowledgement for typical scenarios.

The client of this Message Queue is located in the ServiceStack.RabbitMq NuGet package (it uses the official client in the RabbitMQ.Client package under the hood).

You can add additional protocols to RabbitMQ, such as Message Queue Telemetry Transport (MQTT) and Streaming Text Oriented Messaging Protocol (STOMP), with plugins to ease interop scenarios.

Due to its complexity, we will focus on an abstracted interaction with the broker. There are many books and articles available for a deeper understanding of RabbitMQ. A quick overview of the covered scenarios is available at https://www.rabbitmq.com/getstarted.html.

The method of publishing a message with RabbitMQ does not differ much from RedisMQ:

using ServiceStack;
using ServiceStack.RabbitMq;

using (var rabbitMqServer = new RabbitMqServer())
{
    using (var messageProducer = rabbitMqServer.CreateMessageProducer())
    {
        var hello = new Hello
        {
            Name = "Demo"
        };
        messageProducer.Publish(hello);
    }
}

This will create a Hello object and publish it to the corresponding queue in RabbitMQ. To retrieve this message, we need to register a handler, as shown here:

using System;
using ServiceStack;
using ServiceStack.RabbitMq;
using ServiceStack.Text;

var rabbitMqServer = new RabbitMqServer();
rabbitMqServer.RegisterHandler<Hello>(message =>
{
var hello = message.GetBody();
var name = hello.Name;
var result = "Hello {0}".Fmt(name);

result.Print();

return null;
});
rabbitMqServer.Start();

"Listening for hello messages".Print();

Console.ReadLine();

rabbitMqServer.Dispose();

This registers a handler for Hello objects and prints a message to the console.

In favor of a straightforward example we are omitting all the parameters with default values of the constructor of RabbitMqServer, which will connect us to the local instance at port 5672. To change this, you can either provide a connectionString parameter (and optional credentials) or use a RabbitMqMessageFactory object to customize the connection.

Setup

Setting up RabbitMQ involves a bit of effort. First you need to install Erlang from http://www.erlang.org/download.html, which is the runtime for RabbitMQ due to its functional and concurrent nature. Then you can grab the installer from https://www.rabbitmq.com/download.html, which will set up RabbitMQ and run it as a service with a default configuration.

Processing chain

Due to its complexity, the processing chain with any mature Message Queue is different from what you know from RedisMQ. Exchanges are introduced in front of queues to route the messages to their respective queues according to their routing keys:

The default exchange name is mx.servicestack (defined in ServiceStack.Messaging.QueueNames.Exchange) and is used in any call to Publish on an IMessageProducer or IMessageQueueClient object. With IMessageQueueClient.Publish you can inject a routing key (queueName parameter) to customize the routing of a message. Failed messages are published to the ServiceStack.Messaging.QueueNames.ExchangeDlq (mx.servicestack.dlq) and routed to queues with the name mq:{type}.dlq. Successful messages are published to ServiceStack.Messaging.QueueNames.ExchangeTopic (mx.servicestack.topic) and routed to the queue mq:{type}.outq. Additionally, there’s also a priority queue next to the in-queue with the name mq:{type}.priorityq.

If you interact with RabbitMQ on a lower level, you can directly publish to queues and leave the routing via an exchange out of the picture.

Each queue has features to define whether the queue is durable, deletes itself after the last consumer disconnected, or which exchange is to be used to publish dead messages with which routing key.

More information on the concepts, different exchange types, queues, and acknowledging messages can be found at https://www.rabbitmq.com/tutorials/amqp-concepts.html.
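To illustrate how an exchange decides which queues receive a message, here is a minimal in-memory sketch of two exchange types: a direct exchange, which delivers to queues whose binding key equals the routing key, and a fan-out exchange, which ignores the routing key and delivers to every bound queue. ExchangeSketch and ExchangeKindSketch are hypothetical names for illustration and are neither RabbitMQ nor ServiceStack types.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical model for illustration; not a RabbitMQ or ServiceStack type.
public enum ExchangeKindSketch
{
    Direct,
    Fanout
}

public sealed class ExchangeSketch
{
    private readonly ExchangeKindSketch _kind;

    // Each binding pairs a routing key with the queue it is bound to.
    private readonly List<KeyValuePair<string, Queue<string>>> _bindings =
        new List<KeyValuePair<string, Queue<string>>>();

    public ExchangeSketch(ExchangeKindSketch kind)
    {
        _kind = kind;
    }

    public void Bind(Queue<string> queue, string routingKey)
    {
        _bindings.Add(new KeyValuePair<string, Queue<string>>(routingKey, queue));
    }

    // Returns the number of queues that received the message.
    public int Publish(string routingKey, string message)
    {
        var delivered = 0;
        foreach (var binding in _bindings)
        {
            // Fan-out ignores the routing key, direct requires an exact match.
            if (_kind == ExchangeKindSketch.Fanout || binding.Key == routingKey)
            {
                binding.Value.Enqueue(message);
                delivered++;
            }
        }
        return delivered;
    }
}
```

Binding one queue with the key mq:Hello.inq and a second queue with a different key, a direct exchange delivers a message published with mq:Hello.inq to the first queue only, whereas a fan-out exchange delivers it to both.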

Replying directly back to the producer

Messages published to a queue are dequeued in FIFO mode by whichever consumer is available, hence there is no guarantee that a response is delivered to the issuer of the initial message. To force a response to the originator you can make use of the ReplyTo property of a message:

using System;
using ServiceStack;
using ServiceStack.Messaging;
using ServiceStack.RabbitMq;
using ServiceStack.Text;

var rabbitMqServer = new RabbitMqServer();
var messageQueueClient =
rabbitMqServer.CreateMessageQueueClient();

var queueName = messageQueueClient.GetTempQueueName();
var hello = new Hello
{
Name = "reply to originator"
};
messageQueueClient.Publish(new Message<Hello>(hello)
{
ReplyTo = queueName
});
var message = messageQueueClient.Get<HelloResponse>(queueName);
var helloResponse = message.GetBody();

This code is more or less identical to the RedisMQ approach, but it does something different under the hood. The messageQueueClient.GetTempQueueName method creates a temporary queue, whose name is generated by ServiceStack.Messaging.QueueNames.GetTempQueueName. This temporary queue does not survive a restart of RabbitMQ, and gets deleted as soon as the consumer disconnects.

As each queue is a separate Erlang process, you may encounter the process limits of Erlang and the maximum amount of file descriptors of your OS.

Broadcasting a message

In many scenarios a broadcast to multiple consumers is required, for example if you need to attach multiple loggers to a system. This needs a lower level of implementation. The solution to this requirement is to create a fan-out exchange that forwards the message to all the bound queues instead of one connected queue, where each queue is consumed exclusively by one consumer, as shown:

using RabbitMQ.Client;
using ServiceStack;
using ServiceStack.Messaging;
using ServiceStack.RabbitMq;

// ExchangeType is defined in RabbitMQ.Client
var fanoutExchangeName = string.Concat(QueueNames.Exchange,
                                       ".",
                                       ExchangeType.Fanout);
var rabbitMqServer = new RabbitMqServer();
var messageProducer = (RabbitMqProducer)
    rabbitMqServer.CreateMessageProducer();
var channel = messageProducer.Channel;

channel.ExchangeDeclare(exchange: fanoutExchangeName,
                        type: ExchangeType.Fanout,
                        durable: true,
                        autoDelete: false,
                        arguments: null);

With the cast to RabbitMqProducer we have access to lower level actions. We need to declare an exchange with the name mx.servicestack.fanout, which is durable and does not get deleted.

Now, we need to bind a temporary, exclusive queue to the exchange:

var messageQueueClient = (RabbitMqQueueClient)
rabbitMqServer.CreateMessageQueueClient();
var queueName = messageQueueClient.GetTempQueueName();
channel.QueueBind(queue: queueName,
                 exchange: fanoutExchangeName,
                 routingKey: QueueNames<Hello>.In);

The call to messageQueueClient.GetTempQueueName() creates a temporary queue, which lives only as long as its consumer is connected. This queue is bound to the fan-out exchange with the routing key mq:Hello.inq, as shown here:

To publish the messages we need to use the RabbitMqProducer object (messageProducer):

var hello = new Hello
{
Name = "Broadcast"
};
var message = new Message<Hello>(hello);
messageProducer.Publish(queueName: QueueNames<Hello>.In,
message: message,
exchange: fanoutExchangeName);

Even though the first parameter of Publish is named queueName, it is propagated as the routingKey to the underlying PublishMessage method call.

This will publish the message on the newly generated exchange with mq:Hello.inq as the routing key:

Now, we need to encapsulate the handling of the message as follows:

var messageHandler = new MessageHandler<Hello>(rabbitMqServer,
                                               message =>
{
var hello = message.GetBody();
var name = hello.Name;

name.Print();

return null;
});

The MessageHandler<T> class is used internally in all the messaging solutions and takes care of retries and replies.

Now, we need to connect the message handler to the queue.

using System;
using System.IO;
using System.Threading.Tasks;
using RabbitMQ.Client;
using RabbitMQ.Client.Exceptions;
using ServiceStack.Messaging;
using ServiceStack.RabbitMq;

var consumer = new RabbitMqBasicConsumer(channel);
channel.BasicConsume(queue: queueName,
                     noAck: false,
                     consumer: consumer);

Task.Run(() =>
{
while (true)
{
   BasicGetResult basicGetResult;
   try
   {
     basicGetResult = consumer.Queue.Dequeue();
   }
   catch (EndOfStreamException)
   {
     // this is ok
     return;
   }
   catch (OperationInterruptedException)
   {
     // this is ok
     return;
   }

   var message = basicGetResult.ToMessage<Hello>();
   messageHandler.ProcessMessage(messageQueueClient,
                                message);
}
});

This creates a RabbitMqBasicConsumer object, which is used to consume the temporary queue. To process messages we try to dequeue from the Queue property in a separate task.

This example does not handle the disconnects and reconnects from the server and does not integrate with the services (however, both can be achieved).

Integrate RabbitMQ in your service

The integration of RabbitMQ in a ServiceStack service does not differ much from RedisMQ. All you have to do is adapt the Configure method of your host:

using Funq;
using ServiceStack;
using ServiceStack.Messaging;
using ServiceStack.RabbitMq;

public override void Configure(Container container)
{
    container.Register<IMessageService>(arg => new RabbitMqServer());
    container.Register<IMessageFactory>(arg => new RabbitMqMessageFactory());

    var messageService = container.Resolve<IMessageService>();
    messageService.RegisterHandler<Hello>(this.ServiceController.ExecuteMessage);
    messageService.Start();
}

The registration of an IMessageService is needed for the rerouting of the handlers to your service. The registration of an IMessageFactory is relevant if you want to publish a message in your service with PublishMessage.

Summary

In this article the messaging pattern was introduced along with all the available clients of existing Message Queues.
