
In this article by Arindam Mukherjee, the author of Learning Boost C++ Libraries, we learn how to execute a task using Boost Asio (pronounced ay-see-oh), a portable library for performing efficient network I/O using a consistent programming model.

At its core, Boost Asio provides a task execution framework that you can use to perform operations of any kind. You create your tasks as function objects and post them to a task queue maintained by Boost Asio. You enlist one or more threads to pick up these tasks (function objects) and invoke them. The threads keep picking up tasks, one after the other, until the task queue is empty, at which point the threads do not block but exit.


IO Service, queues, and handlers

At the heart of Asio is the type boost::asio::io_service. A program uses the io_service interface to perform network I/O and manage tasks. Any program that wants to use the Asio library creates at least one instance of io_service and sometimes more than one. In this section, we will explore the task management capabilities of io_service.

Here is the IO Service in action using the obligatory “hello world” example:

Listing 11.1: Asio Hello World
1 #include <boost/asio.hpp>
2 #include <iostream>
3 namespace asio = boost::asio;
4
5 int main() {
6   asio::io_service service;
7
8   service.post(
9     [] {
10       std::cout << "Hello, world!" << '\n';
11     });
12
13   std::cout << "Greetings: \n";
14   service.run();
15 }

We include the convenience header boost/asio.hpp, which includes most of the Asio library that we need for the examples in this article (line 1). All parts of the Asio library are under the namespace boost::asio, so we define a shorter alias for it (line 3). The program itself just prints Hello, world! on the console, but it does so through a task.

The program first creates an instance of io_service (line 6) and posts a function object to it, using the post member function of io_service. The function object, in this case defined using a lambda expression, is referred to as a handler. The call to post adds the handler to a queue inside io_service; some thread (possibly the one that posted the handler) must then dispatch the queued handlers, that is, remove them from the queue and call them. The call to the run member function of io_service (line 14) does precisely this. It loops through the handlers in the queue inside io_service, removing and calling each handler. In fact, we can post more handlers to the io_service before calling run, and it would call all the posted handlers. If we did not call run, none of the handlers would be dispatched. The run function blocks until all the handlers in the queue have been dispatched and returns only when the queue is empty. By itself, a handler may be thought of as an independent, packaged task, and Boost Asio provides a great mechanism for dispatching arbitrary tasks as handlers. Note that handlers must be nullary function objects, that is, they should take no arguments.
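To make the queue mechanics concrete, here is a minimal sketch of a handler queue written with only the standard library. ToyTaskQueue and run_three_greetings are hypothetical names introduced for illustration; the class models just the post and run behavior described above and is not how Asio is implemented. Like io_service::run, the toy run reports how many handlers it called.

```cpp
#include <cassert>
#include <cstddef>
#include <deque>
#include <functional>
#include <string>
#include <utility>
#include <vector>

// Hypothetical stand-in for the handler queue inside io_service.
// It models only the behavior described in the text.
class ToyTaskQueue {
public:
  // Add a nullary handler to the back of the queue and return immediately.
  void post(std::function<void()> handler) {
    queue_.push_back(std::move(handler));
  }

  // Remove and call handlers until the queue is empty.
  // Returns the number of handlers that were called.
  std::size_t run() {
    std::size_t count = 0;
    while (!queue_.empty()) {
      auto handler = std::move(queue_.front());
      queue_.pop_front();
      handler();  // may post further handlers, which are run as well
      ++count;
    }
    return count;
  }

private:
  std::deque<std::function<void()>> queue_;
};

// Posts three handlers before calling run and records the order of the calls.
std::vector<std::string> run_three_greetings() {
  ToyTaskQueue queue;
  std::vector<std::string> log;
  queue.post([&log] { log.push_back("Hello"); });
  queue.post([&log] { log.push_back("Hola"); });
  queue.post([&log] { log.push_back("Hallo"); });
  assert(log.empty());  // nothing is called until run is invoked
  std::size_t dispatched = queue.run();
  assert(dispatched == 3);
  (void)dispatched;  // silence unused warnings when asserts are disabled
  return log;
}
```

Calling run_three_greetings() returns the three greetings in the order they were posted, which mirrors the first-in, first-out dispatch described above.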

Asio is a header-only library by default, but programs using Asio need to link at least with boost_system. On Linux, we can use the following command line to build this example:

$ g++ -g listing11_1.cpp -o listing11_1 -lboost_system -std=c++11

Running this program prints the following:

Greetings:
Hello, world!

Note that Greetings: is printed from the main function (line 13) before the call to run (line 14). The call to run ends up dispatching the sole handler in the queue, which prints Hello, world!. It is also possible for multiple threads to call run on the same io_service object and dispatch handlers concurrently. We will see how this can be useful in the next section.

Handler states – run_one, poll, and poll_one

While the run function blocks until there are no more handlers in the queue, there are other member functions of io_service that let you process handlers with greater flexibility. But before we look at these functions, we need to distinguish between pending and ready handlers.

The handlers we posted to the io_service were all ready to run immediately and were invoked as soon as their turn came on the queue. In general, handlers are associated with background tasks that run in the underlying OS, for example, network I/O tasks. Such handlers are meant to be invoked only once the associated task is completed, which is why in such contexts, they are called completion handlers. These handlers are said to be pending while the associated task is awaiting completion, and once the associated task completes, they are said to be ready.

The poll member function, unlike run, dispatches all the ready handlers but does not wait for any pending handler to become ready. Thus, it returns immediately if there are no ready handlers, even if there are pending handlers. The poll_one member function dispatches exactly one ready handler if there is one, but does not block waiting for pending handlers to get ready.

The run_one member function blocks on a nonempty queue waiting for a handler to become ready. It returns immediately when called on an empty queue; otherwise, it returns as soon as it has found and dispatched one ready handler.
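The difference between ready and pending handlers can be sketched with a small standard-library model. Everything here is hypothetical: ToyPoller, start_task, and complete_one_task are names made up for illustration, and completing a task is simulated by a function call rather than by the OS. The sketch covers only the non-blocking poll and poll_one behavior described above.

```cpp
#include <cassert>
#include <cstddef>
#include <deque>
#include <functional>
#include <utility>

// Hypothetical model of ready versus pending handlers; not Asio's implementation.
class ToyPoller {
public:
  using Handler = std::function<void()>;

  // A handler with no associated background task is ready at once.
  void post(Handler h) { ready_.push_back(std::move(h)); }

  // A completion handler stays pending until its task finishes.
  void start_task(Handler h) { pending_.push_back(std::move(h)); }

  // Simulate the oldest background task completing: its handler becomes ready.
  void complete_one_task() {
    if (pending_.empty()) return;
    ready_.push_back(std::move(pending_.front()));
    pending_.pop_front();
  }

  // Like poll_one: call at most one ready handler, never wait.
  std::size_t poll_one() {
    if (ready_.empty()) return 0;
    Handler h = std::move(ready_.front());
    ready_.pop_front();
    h();
    return 1;
  }

  // Like poll: call every ready handler, never wait for pending ones.
  std::size_t poll() {
    std::size_t count = 0;
    while (poll_one() == 1) ++count;
    return count;
  }

  std::size_t pending() const { return pending_.size(); }

private:
  std::deque<Handler> ready_;
  std::deque<Handler> pending_;
};

// Walks through the states described in the text; returns total handler calls.
int poll_walkthrough() {
  ToyPoller service;
  int calls = 0;
  service.post([&calls] { ++calls; });
  service.start_task([&calls] { ++calls; });

  std::size_t n = service.poll();  // only the ready handler runs
  assert(n == 1);
  n = service.poll();              // returns at once; one handler is still pending
  assert(n == 0 && service.pending() == 1);
  service.complete_one_task();     // the background task finishes
  n = service.poll_one();          // the completion handler is now ready
  assert(n == 1);
  (void)n;
  return calls;
}
```

A blocking run_one would differ from poll_one only in that it waits for a pending handler to become ready instead of returning zero.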

post versus dispatch

A call to the post member function adds a handler to the task queue and returns immediately. A later call to run is responsible for dispatching the handler. There is another member function called dispatch that can be used to request the io_service to invoke a handler immediately if possible. If dispatch is invoked in a thread that is currently executing one of run, poll, run_one, or poll_one on that io_service (for example, from within a handler), then the handler is invoked immediately. Otherwise, dispatch adds the handler to the queue and returns just like post would. In the following example, we invoke dispatch from the main function and from within another handler:

Listing 11.2: post versus dispatch
1 #include <boost/asio.hpp>
2 #include <iostream>
3 namespace asio = boost::asio;
4
5 int main() {
6   asio::io_service service;
7   // Hello Handler – dispatch behaves like post
8   service.dispatch([]() { std::cout << "Hello\n"; });
9
10   service.post(
11     [&service] { // English Handler
12       std::cout << "Hello, world!\n";
13       service.dispatch([] { // Spanish Handler, immediate
14                         std::cout << "Hola, mundo!\n";
15                       });
16     });
17   // German Handler
18   service.post([&service] {std::cout << "Hallo, Welt!\n"; });
19   service.run();
20 }

Running this code produces the following output:

Hello
Hello, world!
Hola, mundo!
Hallo, Welt!

The first call to dispatch (line 8) adds a handler to the queue without invoking it because run has not yet been called on the io_service. We call this the Hello Handler, as it prints Hello. This is followed by the two calls to post (lines 10, 18), which add two more handlers. The first of these two handlers prints Hello, world! (line 12), and in turn, calls dispatch (line 13) to add another handler that prints the Spanish greeting, Hola, mundo! (line 14). The second of these handlers prints the German greeting, Hallo, Welt! (line 18). For our convenience, let’s just call them the English, Spanish, and German handlers. This creates the following entries in the queue:

Hello Handler
English Handler
German Handler

Now, when we call run on the io_service (line 19), the Hello Handler is dispatched first and prints Hello. This is followed by the English Handler, which prints Hello, world! and calls dispatch on the io_service, passing the Spanish Handler. Since this executes in the context of a thread that is already inside run, the call to dispatch invokes the Spanish Handler at once, which prints Hola, mundo!. Following this, the German Handler is dispatched, printing Hallo, Welt! before run returns.

What if the English Handler called post instead of dispatch (line 13)? In that case, the Spanish Handler would not be invoked immediately but would queue up after the German Handler. The German greeting Hallo, Welt! would precede the Spanish greeting Hola, mundo!. The output would look like this:

Hello
Hello, world!
Hallo, Welt!
Hola, mundo!
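The two orderings can be reproduced with a small single-threaded model that uses only the standard library. This is a sketch under stated assumptions: ToyDispatcher and greet are hypothetical names, and a plain flag stands in for Asio's check of whether the calling thread is currently inside run.

```cpp
#include <cassert>
#include <deque>
#include <functional>
#include <string>
#include <utility>
#include <vector>

// Hypothetical single-threaded model of post versus dispatch.
class ToyDispatcher {
public:
  using Handler = std::function<void()>;

  // Always queue the handler.
  void post(Handler h) { queue_.push_back(std::move(h)); }

  // Invoke at once when called from inside run(); otherwise behave like post.
  void dispatch(Handler h) {
    if (running_) {
      h();
    } else {
      post(std::move(h));
    }
  }

  void run() {
    running_ = true;
    while (!queue_.empty()) {
      Handler h = std::move(queue_.front());
      queue_.pop_front();
      h();
    }
    running_ = false;
  }

private:
  std::deque<Handler> queue_;
  bool running_ = false;
};

// Replays the four handlers of listing 11.2. The flag use_dispatch selects
// how the English handler hands over the Spanish handler.
std::vector<std::string> greet(bool use_dispatch) {
  ToyDispatcher service;
  std::vector<std::string> out;
  service.dispatch([&out] { out.push_back("Hello"); });  // queued: run() not active
  service.post([&service, &out, use_dispatch] {
    out.push_back("Hello, world!");
    auto spanish = [&out] { out.push_back("Hola, mundo!"); };
    if (use_dispatch) {
      service.dispatch(spanish);  // runs before this handler returns
    } else {
      service.post(spanish);      // goes to the back of the queue
    }
  });
  service.post([&out] { out.push_back("Hallo, Welt!"); });
  service.run();
  assert(out.size() == 4);  // every handler ran exactly once
  return out;
}
```

Here greet(true) yields the Spanish greeting before the German one, while greet(false) yields the German greeting first, matching the two outputs shown above.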

Concurrent execution via thread pools

The io_service object is thread-safe and multiple threads can call run on it concurrently. If there are multiple handlers in the queue, they can be processed concurrently by such threads. In effect, the set of threads that call run on a given io_service form a thread pool. Successive handlers can be processed by different threads in the pool. Which thread dispatches a given handler is indeterminate, so the handler code should not make any such assumptions. In the following example, we post a bunch of handlers to the io_service and then start four threads, which all call run on it:

Listing 11.3: Simple thread pools
1 #include <boost/asio.hpp>
2 #include <boost/thread.hpp>
3 #include <boost/date_time.hpp>
4 #include <iostream>
5 namespace asio = boost::asio;
6
7 #define PRINT_ARGS(msg) do { \
8   boost::lock_guard<boost::mutex> lg(mtx); \
9   std::cout << '[' << boost::this_thread::get_id() \
10             << "] " << msg << std::endl; \
11 } while (0)
12
13 int main() {
14   asio::io_service service;
15   boost::mutex mtx;
16
17   for (int i = 0; i < 20; ++i) {
18     service.post([i, &mtx]() {
19                         PRINT_ARGS("Handler[" << i << "]");
20                         boost::this_thread::sleep(
21                               boost::posix_time::seconds(1));
22                       });
23   }
24
25   boost::thread_group pool;
26   for (int i = 0; i < 4; ++i) {
27     pool.create_thread([&service]() { service.run(); });
28   }
29
30   pool.join_all();
31 }

We post twenty handlers in a loop (line 18). Each handler prints its identifier (line 19), and then sleeps for a second (lines 20-21). To run the handlers, we create a group of four threads, each of which calls run on the io_service (line 27), and wait for all the threads to finish (line 30). We define the macro PRINT_ARGS, which writes output to the console in a thread-safe way, tagged with the current thread ID (lines 7-11). We will use this macro in other examples too.

To build this example, you must also link against libboost_thread, libboost_date_time, and in Posix environments, with libpthread too:

$ g++ -g listing11_3.cpp -o listing11_3 -lboost_system -lboost_thread -lboost_date_time -pthread -std=c++11

One particular run of this program on my laptop produced the following output (with some lines snipped):

[b5c15b40] Handler[0]
[b6416b40] Handler[1]
[b6c17b40] Handler[2]
[b7418b40] Handler[3]
[b5c15b40] Handler[4]
[b6416b40] Handler[5]
…
[b6c17b40] Handler[13]
[b7418b40] Handler[14]
[b6416b40] Handler[15]
[b5c15b40] Handler[16]
[b6c17b40] Handler[17]
[b7418b40] Handler[18]
[b6416b40] Handler[19]

You can see that different handlers are executed by different threads, each identified by the thread ID in brackets.

If any of the handlers threw an exception, it would be propagated across the call to the run function on the thread that was executing the handler.
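The idea that every thread calling run becomes a member of the pool can be sketched without Boost. The following is a simplified, hypothetical model (ToyPoolQueue and run_on_pool are illustrative names): its run returns as soon as it finds the queue empty, which is cruder than Asio's own bookkeeping, but it is enough to show several threads draining one queue.

```cpp
#include <cassert>
#include <cstddef>
#include <deque>
#include <functional>
#include <mutex>
#include <thread>
#include <utility>
#include <vector>

// Hypothetical thread-safe task queue. Each thread that calls run()
// joins the pool, much as threads calling io_service::run do.
class ToyPoolQueue {
public:
  void post(std::function<void()> h) {
    std::lock_guard<std::mutex> lock(mtx_);
    queue_.push_back(std::move(h));
  }

  // Take handlers one at a time until the queue is found empty, then return.
  void run() {
    for (;;) {
      std::function<void()> h;
      {
        std::lock_guard<std::mutex> lock(mtx_);
        if (queue_.empty()) return;
        h = std::move(queue_.front());
        queue_.pop_front();
      }
      h();  // called outside the lock so that handlers can run concurrently
    }
  }

private:
  std::mutex mtx_;
  std::deque<std::function<void()>> queue_;
};

// Runs `tasks` handlers on `threads` workers; returns how many handlers ran.
std::size_t run_on_pool(std::size_t tasks, std::size_t threads) {
  ToyPoolQueue queue;
  std::mutex mtx;
  std::size_t done = 0;
  for (std::size_t i = 0; i < tasks; ++i) {
    queue.post([&mtx, &done] {
      std::lock_guard<std::mutex> lock(mtx);  // shared state still needs a lock
      ++done;
    });
  }
  std::vector<std::thread> pool;
  for (std::size_t i = 0; i < threads; ++i) {
    pool.emplace_back([&queue] { queue.run(); });
  }
  for (auto& t : pool) t.join();
  assert(done == tasks);  // every handler was picked up by some thread
  return done;
}
```

A call such as run_on_pool(20, 4) mirrors the shape of listing 11.3: twenty handlers are queued first and four threads then drain the queue, with no guarantee about which thread runs which handler.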

io_service::work

Sometimes, it is useful to keep the thread pool running, even when there are no handlers to dispatch. Neither run nor run_one blocks on an empty queue. So in order for them to block waiting for a task, we have to indicate, in some way, that there is outstanding work to be performed. We do this by creating an instance of io_service::work, as shown in the following example:

Listing 11.4: Using io_service::work to keep threads engaged
1 #include <boost/asio.hpp>
2 #include <memory>
3 #include <boost/thread.hpp>
4 #include <iostream>
5 namespace asio = boost::asio;
6
7 typedef std::unique_ptr<asio::io_service::work> work_ptr;
8
9 #define PRINT_ARGS(msg) do { …
...
14
15 int main() {
16   asio::io_service service;
17   // keep the workers occupied
18   work_ptr work(new asio::io_service::work(service));
19   boost::mutex mtx;
20
21   // set up the worker threads in a thread group
22   boost::thread_group workers;
23   for (int i = 0; i < 3; ++i) {
24     workers.create_thread([&service, &mtx]() {
25                         PRINT_ARGS("Starting worker thread ");
26                         service.run();
27                         PRINT_ARGS("Worker thread done");
28                       });
29   }
30
31   // Post work
32   for (int i = 0; i < 20; ++i) {
33     service.post(
34       [&service, &mtx]() {
35         PRINT_ARGS("Hello, world!");
36         service.post([&mtx]() {
37                           PRINT_ARGS("Hola, mundo!");
38                         });
39       });
40   }
41
42   work.reset(); // destroy work object: signals end of work
43   workers.join_all(); // wait for all worker threads to finish
44 }

In this example, we create an object of io_service::work wrapped in a unique_ptr (line 18). We associate it with an io_service object by passing to the work constructor a reference to the io_service object. Note that, unlike listing 11.3, we create the worker threads first (lines 24-28) and then post the handlers (lines 33-39). However, the worker threads stay put waiting for handlers because the calls to run block (line 26). This happens because of the io_service::work object we created, which indicates that there is outstanding work for the io_service. As a result, even after all handlers are dispatched, the threads do not exit. Calling reset on the unique_ptr that wraps the work object invokes the work object's destructor, which notifies the io_service that all outstanding work is complete (line 42). The calls to run in the threads then return, and the program exits once all the threads are joined (line 43). We wrapped the work object in a unique_ptr to destroy it in an exception-safe way at a suitable point in the program.

We omitted the definition of PRINT_ARGS here; refer to listing 11.3.
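One way to picture what the work object does is as a counter of outstanding work that run consults before returning. The sketch below models this with a condition variable from the standard library. ToyService, its nested Work class, and run_with_work are hypothetical names, and the counter is an assumption made for illustration rather than a description of Asio's internals.

```cpp
#include <cassert>
#include <condition_variable>
#include <cstddef>
#include <deque>
#include <functional>
#include <memory>
#include <mutex>
#include <thread>
#include <utility>
#include <vector>

// Hypothetical model of a queue whose run() blocks while work is outstanding.
class ToyService {
public:
  // RAII token modeled on io_service::work: it counts as outstanding
  // work from construction to destruction.
  class Work {
  public:
    explicit Work(ToyService& s) : service_(s) {
      std::lock_guard<std::mutex> lock(service_.mtx_);
      ++service_.outstanding_;
    }
    ~Work() {
      {
        std::lock_guard<std::mutex> lock(service_.mtx_);
        --service_.outstanding_;
      }
      service_.cv_.notify_all();  // wake idle threads so that they can exit
    }
    Work(const Work&) = delete;
    Work& operator=(const Work&) = delete;

  private:
    ToyService& service_;
  };

  void post(std::function<void()> h) {
    {
      std::lock_guard<std::mutex> lock(mtx_);
      queue_.push_back(std::move(h));
    }
    cv_.notify_one();
  }

  // Returns only when the queue is empty and no Work token is alive.
  void run() {
    std::unique_lock<std::mutex> lock(mtx_);
    for (;;) {
      cv_.wait(lock, [this] { return !queue_.empty() || outstanding_ == 0; });
      if (queue_.empty()) return;  // woken because outstanding_ dropped to 0
      auto h = std::move(queue_.front());
      queue_.pop_front();
      lock.unlock();
      h();  // run the handler without holding the lock
      lock.lock();
    }
  }

private:
  std::mutex mtx_;
  std::condition_variable cv_;
  std::deque<std::function<void()>> queue_;
  std::size_t outstanding_ = 0;
};

// Starts the workers before any handler exists, then posts `tasks` handlers.
std::size_t run_with_work(std::size_t tasks) {
  ToyService service;
  std::unique_ptr<ToyService::Work> work(new ToyService::Work(service));
  std::mutex mtx;
  std::size_t done = 0;
  std::vector<std::thread> workers;
  for (int i = 0; i < 3; ++i) {
    workers.emplace_back([&service] { service.run(); });  // blocks on empty queue
  }
  for (std::size_t i = 0; i < tasks; ++i) {
    service.post([&mtx, &done] {
      std::lock_guard<std::mutex> lock(mtx);
      ++done;
    });
  }
  work.reset();  // no more outstanding work: run() may return once drained
  for (auto& t : workers) t.join();
  assert(done == tasks);
  return done;
}
```

As in listing 11.4, the workers are started before any handler is posted, and they exit only after the token is destroyed and the queue has been drained.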

Serialized and ordered execution via strands

Thread pools allow handlers to be run concurrently. This means that handlers that access shared resources need to synchronize access to these resources. We already saw examples of this in listings 11.3 and 11.4, when we synchronized access to std::cout, which is a global object. As an alternative to writing synchronization code in handlers, which can make the handler code more complex, we can use strands.

Think of a strand as a subsequence of the task queue with the constraint that no two handlers from the same strand ever run concurrently.

The scheduling of other handlers in the queue, which are not in the strand, is not affected by the strand in any way. Let us look at an example of using strands:

Listing 11.5: Using strands
1 #include <boost/asio.hpp>
2 #include <boost/thread.hpp>
3 #include <boost/date_time.hpp>
4 #include <cstdlib>
5 #include <iostream>
6 #include <ctime>
7 namespace asio = boost::asio;
8 #define PRINT_ARGS(msg) do {
...
13
14 int main() {
15   std::srand(std::time(0));
16   asio::io_service service;
17   asio::io_service::strand strand(service);
18   boost::mutex mtx;
19   size_t regular = 0, on_strand = 0;
20
21   auto workFuncStrand = [&mtx, &on_strand] {
22           ++on_strand;
23           PRINT_ARGS(on_strand << ". Hello, from strand!");
24           boost::this_thread::sleep(
25                       boost::posix_time::seconds(2));
26         };
27
28   auto workFunc = [&mtx, &regular] {
29                   PRINT_ARGS(++regular << ". Hello, world!");
30                  boost::this_thread::sleep(
31                         boost::posix_time::seconds(2));
32                 };
33   // Post work
34   for (int i = 0; i < 15; ++i) {
35     if (rand() % 2 == 0) {
36       service.post(strand.wrap(workFuncStrand));
37     } else {
38       service.post(workFunc);
39     }
40   }
41
42   // set up the worker threads in a thread group
43   boost::thread_group workers;
44   for (int i = 0; i < 3; ++i) {
45     workers.create_thread([&service, &mtx]() {
46                      PRINT_ARGS("Starting worker thread ");
47                       service.run();
48                       PRINT_ARGS("Worker thread done");
49                     });
50   }
51
52   workers.join_all(); // wait for all worker threads to finish
53 }

In this example, we create two handler functions: workFuncStrand (line 21) and workFunc (line 28). The lambda workFuncStrand captures a counter on_strand, increments it, and prints the message Hello, from strand!, prefixed with the value of the counter. The lambda workFunc captures another counter, regular, increments it, and prints Hello, world!, prefixed with that counter. Both pause for 2 seconds before returning.

To define and use a strand, we first create an object of io_service::strand associated with the io_service instance (line 17). Thereafter, we post all handlers that we want to be part of that strand by wrapping them using the wrap member function of the strand (line 36). Alternatively, we can post the handlers to the strand directly by using either the post or the dispatch member function of the strand, as shown in the following snippet:

34   for (int i = 0; i < 15; ++i) {
35     if (rand() % 2 == 0) {
36       strand.post(workFuncStrand);
37     } else {
...

The wrap member function of strand returns a function object, which in turn calls dispatch on the strand to invoke the original handler. Initially, it is this function object rather than our original handler that is added to the queue. When duly dispatched, this invokes the original handler. There are no constraints on the order in which these wrapper handlers are dispatched, and therefore, the actual order in which the original handlers are invoked can be different from the order in which they were wrapped and posted.

On the other hand, calling post or dispatch directly on the strand avoids an intermediate handler. Directly posting to a strand also guarantees that the handlers will be dispatched in the same order that they were posted, achieving a deterministic ordering of the handlers in the strand. The dispatch member of strand may invoke the handler before returning if the strand's guarantees can be met at that point (for example, when it is called from a handler already running in that strand); otherwise, it queues the handler as post does. The post member simply adds the handler to the strand and returns.

Note that workFuncStrand increments on_strand without synchronization (line 22), while workFunc increments the counter regular within the PRINT_ARGS macro (line 29), which ensures that the increment happens in a critical section. The workFuncStrand handlers are posted to a strand and therefore are guaranteed to be serialized; hence no need for explicit synchronization. On the flip side, entire functions are serialized via strands and synchronizing smaller blocks of code is not possible. There is no serialization between the handlers running on the strand and other handlers; therefore, the access to global objects, like std::cout, must still be synchronized.

The following is a sample output of running the preceding code:

[b73b6b40] Starting worker thread
[b73b6b40] 0. Hello, world from strand!
[b6bb5b40] Starting worker thread
[b6bb5b40] 1. Hello, world!
[b63b4b40] Starting worker thread
[b63b4b40] 2. Hello, world!
[b73b6b40] 3. Hello, world from strand!
[b6bb5b40] 5. Hello, world!
[b63b4b40] 6. Hello, world!
…
[b6bb5b40] 14. Hello, world!
[b63b4b40] 4. Hello, world from strand!
[b63b4b40] 8. Hello, world from strand!
[b63b4b40] 10. Hello, world from strand!
[b63b4b40] 13. Hello, world from strand!
[b6bb5b40] Worker thread done
[b73b6b40] Worker thread done
[b63b4b40] Worker thread done

There were three distinct threads in the pool and the handlers from the strand were picked up by two of these three threads: initially, by thread ID b73b6b40, and later on, by thread ID b63b4b40. This also dispels a frequent misunderstanding that all handlers in a strand are dispatched by the same thread, which is clearly not the case.

Different handlers in the same strand may be dispatched by different threads but will never run concurrently.
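To see why serialization does not require a dedicated thread, consider this standard-library sketch of a strand layered over a shared queue. All names (ToyQueue, ToyStrand, count_on_strand) are hypothetical, and the design, in which the strand keeps its own list and schedules at most one runner on the shared queue at a time, is an assumption chosen for illustration, not a description of Asio's implementation.

```cpp
#include <cassert>
#include <cstddef>
#include <deque>
#include <functional>
#include <mutex>
#include <thread>
#include <utility>
#include <vector>

using Handler = std::function<void()>;

// Hypothetical thread-safe queue standing in for io_service.
class ToyQueue {
public:
  void post(Handler h) {
    std::lock_guard<std::mutex> lock(mtx_);
    queue_.push_back(std::move(h));
  }
  void run() {  // returns once the queue is found empty
    for (;;) {
      Handler h;
      {
        std::lock_guard<std::mutex> lock(mtx_);
        if (queue_.empty()) return;
        h = std::move(queue_.front());
        queue_.pop_front();
      }
      h();
    }
  }

private:
  std::mutex mtx_;
  std::deque<Handler> queue_;
};

// Hypothetical strand: at most one of its handlers is in flight at a time.
class ToyStrand {
public:
  explicit ToyStrand(ToyQueue& q) : queue_(q) {}

  void post(Handler h) {
    bool schedule = false;
    {
      std::lock_guard<std::mutex> lock(mtx_);
      pending_.push_back(std::move(h));
      if (!scheduled_) { scheduled_ = true; schedule = true; }
    }
    if (schedule) queue_.post([this] { run_next(); });
  }

private:
  // Runs one strand handler, then reschedules itself if more are waiting.
  // Any pool thread may pick up the rescheduled runner.
  void run_next() {
    Handler h;
    {
      std::lock_guard<std::mutex> lock(mtx_);
      h = std::move(pending_.front());
      pending_.pop_front();
    }
    h();
    bool more = false;
    {
      std::lock_guard<std::mutex> lock(mtx_);
      more = !pending_.empty();
      if (!more) scheduled_ = false;
    }
    if (more) queue_.post([this] { run_next(); });
  }

  ToyQueue& queue_;
  std::mutex mtx_;
  std::deque<Handler> pending_;
  bool scheduled_ = false;
};

// Posts `n` handlers through a strand and returns the final counter value.
std::size_t count_on_strand(std::size_t n) {
  ToyQueue queue;
  ToyStrand strand(queue);
  std::size_t counter = 0;  // deliberately not protected by its own mutex
  for (std::size_t i = 0; i < n; ++i) {
    strand.post([&counter] { ++counter; });
  }
  std::vector<std::thread> pool;
  for (int i = 0; i < 3; ++i) pool.emplace_back([&queue] { queue.run(); });
  for (auto& t : pool) t.join();
  assert(counter == n);
  return counter;
}
```

Because only one runner for the strand exists at any moment, the handlers increment the counter one after another without a lock of their own, even though the runner is free to land on any thread in the pool.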

Summary

Asio is a well-designed library that can be used to write fast, nimble network servers that use the best mechanisms for asynchronous I/O available on a system. It is an evolving library and is the basis for a Technical Specification that proposes to add a networking library to a future revision of the C++ Standard.

In this article, we learned how to use the Boost Asio library as a task queue manager: posting and dispatching handlers, running them on a pool of threads, keeping the pool alive with io_service::work, and serializing handlers with strands.
