17 min read

This article by Aleksandar Prokopec, author of the book Learning Concurrent Programming in Scala – Second Edition, explains the concepts of asynchronous programming in Scala. Asynchronous programming helps you eliminate blocking; instead of suspending the thread whenever a resource is not available, a separate computation is scheduled to proceed once the resource becomes available.

In a way, many of the concurrency patterns seen so far support asynchronous programming; thread creation and scheduling execution context tasks can be used to start executing a computation concurrent to the main program flow. Still, it is not straightforward to use these facilities directly when avoiding blocking or composing asynchronous computations. In this article, we will focus on two abstractions in Scala that are specifically tailored for this task—futures and promises. More specifically, we will study the following topics:

  • Starting asynchronous computations and using Future objects
  • Using Promise objects to interface
  • Blocking threads inside asynchronous computations
  • Alternative future frameworks

(For more resources related to this topic, see here.)

Futures

The parallel executions in a concurrent program proceed on entities called threads. At any point, the execution of a thread can be temporarily suspended until a specific condition is fulfilled. When this happens, we say that the thread is blocked. Why do we block threads in the first place in concurrent programming? One of the reasons is that we have a finite amount of resources; so, multiple computations that share these resources sometimes need to wait. In other situations, a computation needs specific data to proceed, and if that data is not yet available, threads responsible for producing the data could be slow or the source of the data could be external to the program. A classic example is waiting for the data to arrive over the network. Let’s assume that we have a getWebpage method that returns that webpage’s contents when given a url string with the location of the webpage:

def getWebpage(url: String): String

The return type of the getWebpage method is String; the method must return a string with the webpage’s contents. Upon sending an HTTP request, though, the webpage’s contents are not available immediately. It takes some time for the request to travel over the network to the server and back before the program can access the document. The only way for the method to return the contents of the webpage as a string value is to wait for the HTTP response to arrive. However, this can take a relatively long period of time from the program’s point of view even with a high-speed Internet connection, the getWebpage method needs to wait. Since the thread that called the getWebpage method cannot proceed without the contents of the webpage, it needs to pause its execution; therefore, the only way to correctly implement the getWebpage method is for blocking.

We already know that blocking can have negative side effects, so can we change the return value of the getWebpage method to some special value that can be returned immediately? The answer is yes. In Scala, this special value is called a future. The future is a placeholder, that is, a memory location for the value. This placeholder does not need to contain a value when the future is created; the value can be placed into the future eventually by getWebpage. We can change the signature of the getWebpage method to return a future as follows:

def getWebpage(url: String): Future[String]

Here, the Future[String] type means that the future object can eventually contain a String value. We can now implement getWebpage without blocking—we can start the HTTP request asynchronously and place the webpage’s contents into the future when they become available. When this happens, we can say that the getWebpage method completes the future. Importantly, after the future is completed with some value, that value can no longer change.

The Future[T] type encodes latency in the program—use it to encode values that will become available later during execution.

This removes blocking from the getWebpage method, but it is not clear how the calling thread can extract the content of the future. Polling is one non-blocking way of extracting the content. In the polling approach, the calling thread calls a special method for blocking until the value becomes available. While this approach does not eliminate blocking, it transfers the responsibility of blocking from the getWebpage method to the caller thread. Java defines its own Future type to encode values that will become available later. However, as a Scala developer, you should use Scala’s futures instead; they allow additional ways of handling future values and avoid blocking, as we will soon see.

When programming with futures in Scala, we need to distinguish between future values and future computations. A future value of the Future[T] type denotes some value of the T type in the program that might not be currently available, but could become available later. Usually, when we say a future, we really mean a future value. In the scala.concurrent package, futures are represented with the Future[T] trait:

trait Future[T]

By contrast, a future computation is an asynchronous computation that produces a future value. A future computation can be started by calling the apply method on the Future companion object. This method has the following signature in the scala.concurrent package:

def apply[T](b: =>T)(implicit e: ExecutionContext): Future[T]

This method takes a by-name parameter of the T type. This is the body of the asynchronous computation that results in some value of type T. It also takes an implicit ExecutionContext parameter, which abstracts over where and when the thread gets executed. Recall that Scala’s implicit parameters can either be specified when calling a method, in the same way as normal parameters, or they can be left out; in this case, the Scala compiler searches for a value of the ExecutionContext type in the surrounding scope. Most Future methods take an implicit execution context. Finally, the Future.apply method returns a future of the T type. This future is completed with the value resulting from the asynchronous computation, b.

Starting future computations

Let’s see how to start a future computation in an example. We first import the contents of the scala.concurrent package. We then import the global execution context from the Implicits object. This makes sure that the future computations execute on the global context—the default execution context you can use in most cases:

import scala.concurrent._
import ExecutionContext.Implicits.global
object FuturesCreate extends App {
  Future { log("the future is here") }
  log("the future is coming")
  Thread.sleep(1000)
}

The order in which the log method calls (in the future computation and the main thread) execute is nondeterministic. The Future singleton object followed by a block is syntactic sugar for calling the Future.apply method. In the following example, we can use the scala.io.Source object to read the contents of our build.sbt file in a future computation. The main thread calls the isCompleted method on the future value, buildFile, returned from the future computation. Chances are that the build file was not read so fast, so isCompleted returns false. After 250 milliseconds, the main thread calls isCompleted again, and this time, isCompleted returns true. Finally, the main thread calls the value method, which returns the contents of the build file:

import scala.io.Source
object FuturesDataType extends App {
  val buildFile: Future[String] = Future {
    val f = Source.fromFile("build.sbt")
    try f.getLines.mkString("n") finally f.close()
  }
  log(s"started reading the build file asynchronously")
  log(s"status: ${buildFile.isCompleted}")
  Thread.sleep(250)
  log(s"status: ${buildFile.isCompleted}")
  log(s"value: ${buildFile.value}")
}

In this example, we used polling to obtain the value of the future. The Future singleton object’s polling methods are non-blocking, but they are also nondeterministic; the isCompleted method will repeatedly return false until the future is completed. Importantly, completion of the future is in a happens-before relationship with the polling calls. If the future completes before the invocation of the polling method, then its effects are visible to the thread after the polling completes.

Shown graphically, polling looks as shown in the following figure:

Learning Concurrent Programming in Scala - Second Edition

Polling diagram

Polling is like calling your potential employer every five minutes to ask if you’re hired. What you really want to do is hand in a job application and then apply for other jobs instead of busy-waiting for the employer’s response. Once your employer decides to hire you, he will give you a call on the phone number you left him. We want futures to do the same; when they are completed, they should call a specific function that we left for them.

Promises

Promises are objects that can be assigned a value or an exception only once. This is why promises are sometimes also called single-assignment variables. A promise is represented with the Promise[T] type in Scala. To create a promise instance, we use the Promise.apply method on the Promise companion object:

def apply[T](): Promise[T]

This method returns a new promise instance. Like the Future.apply method, the Promise.apply method returns immediately; it is non-blocking. However, the Promise.apply method does not start an asynchronous computation, it just creates a fresh Promise object. When the Promise object is created, it does not contain a value or an exception. To assign a value or an exception to a promise, we use the success or failure method, respectively.

Perhaps you have noticed that promises are very similar to futures. Both futures and promises are initially empty and can be completed with either a value or an exception. This is intentional—every promise object corresponds to exactly one future object. To obtain the future associated with a promise, we can call the future method on the promise. Calling this method multiple times always returns the same future object.

A promise and a future represent two aspects of a single-assignment variable. The promise allows you to assign a value to the future object, whereas the future allows you to read that value.

In the following code snippet, we create two promises, p and q, that can hold string values. We then install a foreach callback on the future associated with the p promise and wait for 1 second. The callback is not invoked until the p promise is completed by calling the success method. We then fail the q promise in the same way and install a failed.foreach callback:

object PromisesCreate extends App {
  val p = Promise[String]
  val q = Promise[String]
  p.future foreach { case x => log(s"p succeeded with '$x'") }
  Thread.sleep(1000)
  p success "assigned"
  q failure new Exception("not kept")
  q.future.failed foreach { case t => log(s"q failed with $t") }
  Thread.sleep(1000)
}

Alternatively, we can use the complete method and specify a Try[T] object to complete the promise. Depending on whether the Try[T] object is a success or a failure, the promise is successfully completed or failed. Importantly, after a promise is either successfully completed or failed, it cannot be assigned an exception or a value again in any way. Trying to do so results in an exception. Note that this is true even when there are multiple threads simultaneously calling success or complete. Only one thread completes the promise, and the rest throw an exception.

Assigning a value or an exception to an already completed promise is not allowed and throws an exception.

We can also use the trySuccess, tryFailure, and tryComplete methods that correspond to success, failure, and complete states, respectively, but return a Boolean value to indicate whether the assignment was successful. Recall that using the Future.apply method and callback methods with referentially transparent functions results in deterministic concurrent programs. As long as we do not use the trySuccess, tryFailure, and tryComplete methods, and none of the success, failure, and complete methods ever throws an exception, we can use promises and retain determinism in our programs.

We now have everything we need to implement our custom Future.apply method. We call it the myFuture method in the following example. The myFuture method takes a b by-name parameter that is the asynchronous computation. First, it creates a p promise. Then, it starts an asynchronous computation on the global execution context. This computation tries to evaluate b and complete the promise. However, if the b body throws a nonfatal exception, the asynchronous computation fails the promise with that exception. In the meanwhile, the myFuture method returns the future immediately after starting the asynchronous computation:

import scala.util.control.NonFatal
object PromisesCustomAsync extends App {
  def myFuture[T](b: =>T): Future[T] = {
    val p = Promise[T]
    global.execute(new Runnable {
      def run() = try {
        p.success(b)
      } catch {
        case NonFatal(e) => p.failure(e)
      }
    })
    p.future
  }
  val f = myFuture { "naa" + "na" * 8 + " Katamari Damacy!" }
  f foreach { case text => log(text) }
}

This is a common pattern when producing futures. We create a promise, let some other computation complete that promise, and return the corresponding future. However, promises were not invented just for our custom future’s myFuture computation method. In the following sections, we will study use cases in which promises are useful.

Futures and blocking

Futures and asynchronous computations mainly exist to avoid blocking, but in some cases, we cannot live without it. It is, therefore, valid to ask how blocking interacts with futures.

There are two ways to block with futures. The first way is to wait until a future is completed. The second way is by blocking from within an asynchronous computation. We will study both the topics in this section.

Awaiting futures

In rare situations, we cannot use callbacks or future combinators to avoid blocking. For example, the main thread that starts multiple asynchronous computations has to wait for these computations to finish. If an execution context uses daemon threads, as is the case with the global execution context, the main thread needs to block to prevent the JVM process from terminating.

In these exceptional circumstances, we use the ready and result methods on the Await object from the scala.concurrent package. The ready method blocks the caller thread until the specified future is completed. The result method also blocks the caller thread, but returns the value of the future if it was completed successfully or throws the exception in the future if the future is failed.

Both the methods require specifying a timeout parameter; the longest duration that the caller should wait for the completion of the future before a TimeoutException method is thrown. To specify a timeout, we import the scala.concurrent.duration package. This allows us to write expressions such as 10.seconds:

import scala.concurrent.duration._
object BlockingAwait extends App {
  val urlSpecSizeFuture = Future {
    val specUrl = "http://www.w3.org/Addressing/URL/url-spec.txt"
    Source.fromURL(specUrl).size
  }
  val urlSpecSize = Await.result(urlSpecSizeFuture, 10.seconds)
  log(s"url spec contains $urlSpecSize characters")
}

In this example, the main thread starts a computation that retrieves the URL specification and then awaits. By this time, the World Wide Web Consortium (W3C) is worried that a DOS attack is under way, so this is the last time we download the URL specification.

Blocking in asynchronous computations

Waiting for the completion of a future is not the only way to block. Some legacy APIs do not use callbacks to asynchronously return results. Instead, such APIs expose the blocking methods. After we call a blocking method, we lose control over the thread; it is up to the blocking method to unblock the thread and return the control back.

Execution contexts are often implemented using thread pools. By starting future computations that block, it is possible to reduce parallelism and even cause deadlocks. This is illustrated in the following example, in which 16 separate future computations call the sleep method, and the main thread waits until they complete for an unbounded amount of time:

val startTime = System.nanoTime
val futures = for (_ <- 0 until 16) yield Future {
  Thread.sleep(1000)
}
for (f <- futures) Await.ready(f, Duration.Inf)
val endTime = System.nanoTime
log(s"Total time = ${(endTime - startTime) / 1000000} ms")
log(s"Total CPUs = ${Runtime.getRuntime.availableProcessors}")

Assume that you have eight cores in your processor. This program does not end in one second. Instead, a first batch of eight futures started by Future.apply will block all the worker threads for one second, and then another batch of eight futures will block for another second. As a result, none of our eight processor cores can do any useful work for one second.

Avoid blocking in asynchronous computations, as it can cause thread starvation.

If you absolutely must block, then the part of the code that blocks should be enclosed within the blocking call. This signals to the execution context that the worker thread is blocked and allows it to temporarily spawn additional worker threads if necessary:  

val futures = for (_ <- 0 until 16) yield Future {
    blocking {
      Thread.sleep(1000)
    }
  }

With the blocking call around the sleep call, the global execution context spawns additional threads when it detects that there is more work than the worker threads. All 16 future computations can execute concurrently and the program terminates after one second.

The Await.ready and Await.result statements block the caller thread until the future is completed and are in most cases used outside asynchronous computations. They are blocking operations. The blocking statement is used inside asynchronous code to designate that the enclosed block of code contains a blocking call. It is not a blocking operation by itself.

Alternative future frameworks

Scala futures and promises API resulted from an attempt to consolidate several different APIs for asynchronous programming, among them, legacy Scala futures, Akka futures, Scalaz futures, and Twitter’s Finagle futures. Legacy Scala futures and Akka futures have already converged to the futures and promises API that you’ve learned about so far in this article. Finagle’s com.twitter.util.Future type is planned to eventually implement the same interface as scala.concurrent.Future package, while the Scalaz scalaz.concurrent.Future type implements a slightly different interface. In this section, we give a brief of Scalaz futures.

To use Scalaz, we add the following dependency to the build.sbt file:

libraryDependencies +=
  "org.scalaz" %% "scalaz-concurrent" % "7.0.6"

We now encode an asynchronous tombola program using Scalaz. The Future type in Scalaz does not have the foreach method. Instead, we use its runAsync method, which asynchronously runs the future computation to obtain its value and then calls the specified callback:

import scalaz.concurrent._
object Scalaz extends App {
  val tombola = Future {
    scala.util.Random.shuffle((0 until 10000).toVector)
  }
  tombola.runAsync { numbers =>
    log(s"And the winner is: ${numbers.head}")
  }
  tombola.runAsync { numbers =>
    log(s"... ahem, winner is: ${numbers.head}")
  }
}

Unless you are terribly lucky and draw the same permutation twice, running this program reveals that the two runAsync calls print different numbers. Each runAsync call separately computes the permutation of the random numbers. This is not surprising, as Scalaz futures have the pull semantics, in which the value is computed each time some callback requests it, in contrast to Finagle and Scala futures’ push semantics, in which the callback is stored, and applied if and when the asynchronously computed value becomes available.

To achieve the same semantics, as we would have with Scala futures, we need to use the start combinator that runs the asynchronous computation once, and caches its result:

val tombola = Future {
  scala.util.Random.shuffle((0 until 10000).toVector)
} start

With this change, the two runAsync calls use the same permutation of random numbers in the tombola variable and print the same values.

We will not dive further into the internals of alternate frameworks. The fundamentals about futures and promises that you learned about in this article should be sufficient to easily familiarize yourself with other asynchronous programming libraries, should the need arise.

Summary

This article presented some powerful abstractions for asynchronous programming. We saw how to encode latency with the Future type. You learned that futures and promises are closely tied together and that promises allow interfacing with legacy callback-based systems. In cases, where blocking was unavoidable, you learned how to use the Await object and the blocking statement. Finally, you learned that the Scala Async library is a powerful alternative for expressing future computations more concisely.

Resources for Article:


Further resources on this subject:


LEAVE A REPLY

Please enter your comment!
Please enter your name here