
In this article by Julien Richard-Foy, author of Play Framework Essentials, we will dive into the framework’s internals and explain how to leverage its reactive programming model to manipulate data streams.


Firstly, I would like to mention that the code called by controllers must be thread-safe. We have also noticed that the result of calling an action has the type Future[Result] rather than just Result. This article explains these subtleties and answers questions such as “How are concurrent requests processed by Play applications?”

More precisely, this article presents the challenges of stream processing and the way the Play framework solves them. You will learn how to consume, produce, and transform data streams in a non-blocking way using the Iteratee library. Then, you will leverage these skills to stream results and push real-time notifications to your clients. By the end of the article, you will be able to do the following:

  • Produce, consume, and transform streams of data
  • Process a large request body chunk by chunk
  • Serve HTTP chunked responses
  • Push real-time notifications using WebSockets or server-sent events
  • Manage the execution context of your code

Play application’s execution model

The streaming programming model provided by Play has been influenced by the execution model of Play applications, which itself has been influenced by the nature of the work a web application performs. So, let’s start from the beginning: what does a web application do?

For now, our example application does the following: the HTTP layer invokes some business logic via the service layer, and the service layer does some computations by itself and also calls the database layer. It is worth noting that, in our configuration, the database system runs on the same machine as the web application, but this is not a requirement. In fact, chances are that in real-world projects your database system is decoupled from your HTTP layer and that the two run on different machines. This means that while a query is executed on the database, the web layer does nothing but wait for the response. Actually, the HTTP layer is often waiting for some response coming from another system; it could, for example, retrieve some data from an external web service, or the business layer itself could be located on a remote machine. Decoupling the HTTP layer from the business layer or the persistence layer gives you finer control over how to scale the system (more details about that are given further in this article). Anyway, the point is that the HTTP layer may essentially spend its time waiting.

With that in mind, consider the following diagram showing how concurrent requests could be executed by a web application using a threaded execution model. That is, a model where each request is processed in its own thread.

Threaded execution model

Several clients (shown on the left-hand side in the preceding diagram) perform queries that are processed by the application’s controller. On the right-hand side of the controller, the figure shows an execution thread corresponding to each action’s execution. The filled rectangles represent the time spent performing computations within a thread (for example, processing data or computing a result), and the lines represent the time spent waiting for some remote data. Each action’s execution is distinguished by a particular color. In this fictitious example, the action handling the first request may execute a query to a remote database, hence the line (illustrating that the thread waits for the database result) between the two pink rectangles (illustrating that the action performs some computation before querying the database and after getting the database result). The action handling the third request may perform a call to a remote web service and then a second one after the response to the first one has been received; hence, the two lines between the green rectangles. And the action handling the last request may perform a call to a remote web service that streams a response of infinite size; hence, the multiple lines between the purple rectangles.

The problem with this execution model is that each request requires the creation of a new thread. Threads have an overhead at creation, because they consume memory (essentially because each thread has its own stack), and during execution, when the scheduler switches contexts.

However, we can see that these threads spend a lot of time just waiting. If we could use the same thread to process another request while the current action is waiting for something, we could avoid the creation of threads, and thus save resources. This is exactly what the execution model used by Play—the evented execution model—does, as depicted in the following diagram:

Evented execution model

Here, the computation fragments are executed on only two threads. Note that the same action can have its computation fragments run by different threads (for example, the pink action). Also note that several threads are still in use; that’s why the code must be thread-safe. The time spent waiting between computing things is the same as before, and you can see that the time required to completely process a request is about the same as with the threaded model (for instance, the second pink rectangle ends at the same position as in the earlier figure; the same goes for the third green rectangle, and so on).

A comparison between the threaded and evented models can be found in the master’s thesis of Benjamin Erb, Concurrent Programming for Scalable Web Architectures, 2012. An online version is available at http://berb.github.io/diploma-thesis/.

An attentive reader may think that I have cheated; the rectangles in the second figure are often thinner than their equivalents in the first figure. That’s because, in the first model, there is an overhead for scheduling threads and, above all, even if you have a lot of threads, your machine still has a limited number of cores effectively executing the code of your threads. More precisely, if you have more threads than cores, you necessarily have threads in an idle state (that is, waiting). This means that, if we suppose the machine executing the application has only two cores, in the first figure there is even time spent waiting within the rectangles!

Scaling up your server

The previous section raises the question of how to handle a higher number of concurrent requests, as depicted in the following diagram:

A server under an increasing load

The previous section explained how to avoid wasting resources and leverage the computing power of your server. But actually, there is no magic; if you want to compute even more things per unit of time, you need more computing power, as depicted in the following diagram:

Scaling using more powerful hardware

One solution could be to use a more powerful server. But you could be smarter than that and avoid buying expensive hardware by studying the shape of the workload and making appropriate decisions at the software level.

Indeed, chances are that your workload varies a lot over time, with peaks and troughs of activity. This suggests that if you bought more powerful hardware, its performance characteristics would be dictated by your highest activity peak, even if that peak occurs only very occasionally. Obviously, this solution is not optimal because you would be paying for expensive hardware that you actually need only one percent of the time (and more powerful hardware often also means more power-consuming hardware).

A better way to handle the workload elasticity consists of adding or removing server instances according to the activity level, as depicted in the following diagram:

Scaling using several server instances

This architecture design allows you to finely (and dynamically) tune your server capacity according to your workload. That’s actually the cloud computing model. Nevertheless, this architecture has a major implication for your code; you cannot assume that subsequent requests issued by the same client will be handled by the same server instance. In practice, it means that you must treat each request independently of the others; you cannot, for instance, store a counter on a server instance to count the number of requests issued by a client (your server would miss some requests if they were routed to another server instance). In a nutshell, your server has to be stateless. Fortunately, Play is stateless, so as long as you don’t explicitly have a mutable state in your code, your application is stateless. Note that the first implementation I gave of the shop was not stateless; indeed, the state of the application was stored in the server’s memory.
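To illustrate the pitfall, here is a hypothetical sketch (not code from the shop application) of a controller that keeps a hit counter in the server’s memory; behind a load balancer, each instance would count only the requests routed to it:

import play.api.mvc._

object StatefulExample extends Controller {
  // Anti-pattern: mutable server-side state. This counter is not
  // thread-safe, and each server instance keeps its own value, so the
  // count is wrong as soon as requests are balanced across instances.
  var hits = 0

  val countHits = Action {
    hits += 1
    Ok(s"This instance has served $hits requests")
  }
}

A stateless version would keep such a value outside the server process, for example, in a database shared by all the instances.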

Embracing non-blocking APIs

In the first section of this article, I claimed the superiority of the evented execution model over the threaded execution model in the context of web servers. That being said, to be fair, the threaded model has an advantage over the evented model: it is simpler to program with. Indeed, in that case, the framework is responsible for creating the threads and the JVM is responsible for scheduling them, so that you don’t even have to think about threads at all, yet your code is executed concurrently.

On the other hand, with the evented model, concurrency control is explicit and you have to take care of it. Indeed, the fact that the same execution thread is used to run several concurrent actions has an important implication for your code: it should not block the thread. While the code of an action is being executed, no other action code can run on the same thread.

What does blocking mean? It means holding a thread for too long a duration, which typically happens when you perform a heavy computation or wait for a remote response. However, we saw that these cases, especially waiting for remote responses, are very common in web servers, so how should you handle them? You have to wait in a non-blocking way, or implement your heavy computations as incremental computations. In all cases, you have to break down your code into computation fragments whose execution is managed by an execution context. In the diagram illustrating the evented execution model, computation fragments are materialized by the rectangles. You can see that rectangles of different colors are interleaved; you can find rectangles of another color between two rectangles of the same color.

However, by default, the code you write forms a single block of execution instead of several computation fragments. It means that, by default, your code is executed sequentially; the rectangles are not interleaved! This is depicted in the following diagram:

Evented execution model running blocking code

The previous figure still shows both execution threads. The second one handles the blue action and then the purple infinite action, so that all the other actions can only be handled by the first thread. This figure illustrates the fact that while the evented model can potentially be more efficient than the threaded model, it can also have negative consequences on the performance of your application: infinite actions block an execution thread forever, and the sequential execution of actions can lead to much longer response times.

So, how can you break down your code into blocks that can be managed by an execution context? In Scala, you can do so by wrapping your code in a Future block:

Future {
  // This is a computation fragment
}

The Future API comes from the standard Scala library. For Java users, Play provides a convenient wrapper named play.libs.F.Promise:

Promise.promise(() -> {
  // This is a computation fragment
});

Such a block is a value of type Future[A] or, in Java, Promise<A> (where A is the type of the value computed by the block). We say that these blocks are asynchronous because they break the execution flow; you have no guarantee that the block will be executed before the statement that follows it. When the block is effectively evaluated depends on the execution context implementation that manages it. The role of an execution context is to schedule the execution of computation fragments. In the figure showing the evented model, the execution context consists of a thread pool containing two threads (represented by the two lines under the rectangles).
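To make this concrete, here is a minimal sketch (the default execution context imported here is introduced just below); the second println may well run before the Future block has been evaluated:

import scala.concurrent.Future
import play.api.libs.concurrent.Execution.Implicits.defaultContext

Future { println("inside the block") } // scheduled on the execution context
println("after the block")             // may be printed first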

Actually, each time you create an asynchronous value, you have to supply the execution context that will manage its evaluation. In Scala, this is usually achieved using an implicit parameter of type ExecutionContext. You can, for instance, use an execution context provided by Play that consists, by default, of a thread pool with one thread per processor:

import play.api.libs.concurrent.Execution.Implicits.defaultContext

In Java, this execution context is automatically used by default, but you can explicitly supply another one:

Promise.promise(() -> { ... }, myExecutionContext);

Now that you know how to create asynchronous values, you need to know how to manipulate them. For instance, several consecutive Future blocks are executed concurrently; how do we define an asynchronous computation that depends on another one?

You can schedule a computation to run after an asynchronous value has eventually been resolved by using the foreach method:

val futureX = Future { 42 }
futureX.foreach(x => println(x))

In Java, you can perform the same operation using the onRedeem method:

Promise<Integer> futureX = Promise.promise(() -> 42);
futureX.onRedeem((x) -> System.out.println(x));

More interestingly, you can eventually transform an asynchronous value using the map method:

val futureIsEven = futureX.map(x => x % 2 == 0)

The map method exists in Java too:

Promise<Boolean> futureIsEven = futureX.map((x) -> x % 2 == 0);

If the function you use to transform an asynchronous value returned an asynchronous value too, you would end up with an inconvenient Future[Future[A]] value (or a Promise<Promise<A>> value, in Java). So, use the flatMap method in that case:

val futureIsEven = futureX.flatMap(x => Future { x % 2 == 0 })

The flatMap method is also available in Java:

Promise<Boolean> futureIsEven = futureX.flatMap((x) -> {
  return Promise.promise(() -> x % 2 == 0);
});

The foreach, map, and flatMap functions (or their Java equivalents) all set a dependency between two asynchronous values; the computation they take as a parameter is always evaluated after the asynchronous computation they are applied to.
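In Scala, chains of flatMap and map calls are commonly written as a for comprehension, which desugars to the same combinators; here is a minimal sketch, assuming that futureY is another Future[Int] value:

val futureSum: Future[Int] = for {
  x <- futureX // the next step is scheduled once futureX is resolved
  y <- futureY
} yield x + y  // same as futureX.flatMap(x => futureY.map(y => x + y))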

Another method that is worth mentioning is zip:

val futureXY: Future[(Int, Int)] = futureX.zip(futureY)

The zip method is also available in Java:

Promise<Tuple<Integer, Integer>> futureXY = futureX.zip(futureY);

The zip method returns an asynchronous value eventually resolved to a tuple containing the two resolved asynchronous values. It can be thought of as a way to join two asynchronous values without specifying any execution order between them.

If you want to join more than two asynchronous values, you can use the zip method several times (for example, futureX.zip(futureY).zip(futureZ).zip(…)), but an alternative is to use the Future.sequence function:

val futureXs: Future[Seq[Int]] =
Future.sequence(Seq(futureX, futureY, futureZ, …))

This function transforms a sequence of future values into a future sequence value. In Java, this function is named Promise.sequence.
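The resulting future sequence can then be transformed like any other asynchronous value; for instance, here is a small sketch that sums the eventually resolved numbers:

val futureTotal: Future[Int] = futureXs.map(_.sum)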

In the preceding descriptions, I always used the word eventually, and it has a reason. Indeed, if we use an asynchronous value to manipulate a result sent by a remote machine (such as a database system or a web service), the communication may eventually fail due to some technical issue (for example, if the network is down). For this reason, asynchronous values have error recovery methods; for example, the recover method:

futureX.recover { case NonFatal(e) => y }

The recover method is also available in Java:

futureX.recover((throwable) -> y);

The previous code resolves futureX to the value of y in the case of an error.

Libraries performing remote calls (such as an HTTP client or a database client) return such asynchronous values when they are implemented in a non-blocking way. You should always check whether the libraries you use are blocking or not, and keep in mind that, by default, Play is tuned to be efficient with non-blocking APIs.

It is worth noting that JDBC is blocking. It means that the majority of Java-based libraries for database communication are blocking.
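A common workaround, shown here as a sketch, is to wrap the blocking call in a Future evaluated on a dedicated execution context; the jdbcExecutionContext value used here is hypothetical, and the last section of this article shows how to define such a context:

import java.sql.Connection
import scala.concurrent.Future

// Sketch: run a blocking JDBC query off the default execution context
def countProducts(connection: Connection): Future[Int] =
  Future {
    val statement = connection.createStatement()
    try {
      val resultSet = statement.executeQuery("SELECT COUNT(*) FROM products")
      resultSet.next()
      resultSet.getInt(1)
    } finally statement.close()
  }(jdbcExecutionContext) // dedicated thread pool for blocking calls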

Obviously, once you get a value of type Future[A] (or Promise<A>, in Java), there is no way to get the A value unless you wait (and block) for the value to be resolved. We saw that the map and flatMap methods make it possible to manipulate the future A value, but you still end up with a Future[SomethingElse] value (or a Promise<SomethingElse>, in Java). It means that if your action’s code calls an asynchronous API, it will end up with a Future[Result] value rather than a Result value. In that case, you have to use Action.async instead of Action, as illustrated in this typical code example:

val asynchronousAction = Action.async { implicit request =>
  service.asynchronousComputation().map(result => Ok(result))
}

In Java, there is nothing special to do; simply make your method return a Promise<Result> object:

public static Promise<Result> asynchronousAction() {
  return service.asynchronousComputation().map((result) -> ok(result));
}

Managing execution contexts

Because Play uses explicit concurrency control, controllers are also responsible for using the right execution context to run their action’s code. Generally, as long as your actions do not invoke heavy computations or blocking APIs, the default execution context should work fine. However, if your code is blocking, it is recommended to use a distinct execution context to run it.

An application with two execution contexts (represented by the black and grey arrows). You can specify in which execution context each action should be executed, as explained in this section.

Unfortunately, there is no non-blocking standard API for relational database communication (JDBC is blocking). It means that all our actions that invoke code executing database queries should be run in a distinct execution context so that the default execution context is not blocked. This distinct execution context has to be configured according to your needs. In the case of JDBC communication, your execution context should be a thread pool with as many threads as your maximum number of connections.

The following diagram illustrates such a configuration:

Two execution contexts, each with two threads

The preceding diagram shows two execution contexts, each with two threads. The execution context at the top of the figure runs database code, while the default execution context (at the bottom) handles the remaining (non-blocking) actions.

In practice, it is convenient to use Akka to define your execution contexts as they are easily configurable. Akka is a library used for building concurrent, distributed, and resilient event-driven applications. This article assumes that you have some knowledge of Akka; if that is not the case, do some research on it. Play integrates Akka and manages an actor system that follows your application’s life cycle (that is, it is started and shut down with the application). For more information on Akka, visit http://akka.io.

Here is how you can create an execution context with a thread pool of 10 threads, in your application.conf file:

jdbc-execution-context {
  thread-pool-executor {
    core-pool-size-factor = 10.0
    core-pool-size-max = 10
  }
}

You can use it as follows in your code:

import play.api.libs.concurrent.Akka
import play.api.Play.current
implicit val jdbc =
  Akka.system.dispatchers.lookup("jdbc-execution-context")

The Akka.system expression retrieves the actor system managed by Play. Then, the execution context is retrieved using Akka’s API.

The equivalent Java code is the following:

import play.libs.Akka;
import akka.dispatch.MessageDispatcher;
import play.core.j.HttpExecutionContext;
MessageDispatcher jdbc =
   Akka.system().dispatchers().lookup("jdbc-execution-context");

Note that controllers retrieve the current request’s information from a thread-local static variable, so you have to attach it to the execution context’s thread before using it from a controller’s action:

play.core.j.HttpExecutionContext.fromThread(jdbc)

Finally, forcing the use of a specific execution context for a given action can be achieved as follows (provided that my.execution.context is an implicit execution context):

import my.execution.context

val myAction = Action.async {
  Future { … }
}

The Java equivalent code is as follows:

public static Promise<Result> myAction() {
  return Promise.promise(
    () -> { … },
    HttpExecutionContext.fromThread(myExecutionContext)
  );
}

Does this feel like clumsy code? Buy the book to learn how to reduce the boilerplate!

Summary

This article detailed a lot of things about the internals of the framework. You now know that Play uses an evented execution model to process requests and serve responses, and that this implies that your code should not block the execution thread. You know how to use future blocks and promises to define computation fragments that can be concurrently managed by Play’s execution context, and how to define your own execution context with a different threading policy, for example, if you are constrained to use a blocking API.
