This article, written by Aleksandar Prokopec, the author of Learning Concurrent Programming in Scala, helps you develop the skills necessary to write correct and efficient concurrent programs. It teaches you about concurrency in Scala through a sequence of programs.

“The best theory is inspired by practice.”

                                         -Donald Knuth

We have studied a plethora of different concurrency facilities in this article. By now, you will have learned about dozens of different ways of starting concurrent computations and accessing shared data. Knowing how to use different styles of concurrency is useful, but it might not yet be obvious when to use which.

The goal of this article is to introduce the big picture of concurrent programming. We will study the use cases for various concurrency abstractions, see how to debug concurrent programs, and how to integrate different concurrency libraries in larger applications. In this article, we perform the following tasks:

  • Investigate how to deal with various kinds of bugs appearing in concurrent applications
  • Learn how to identify and resolve performance bottlenecks
  • Apply the previous knowledge about concurrency to implement a larger concurrent application, namely, a remote file browser

We start with an overview of the important concurrency frameworks that we learned about in this article, and a summary of when to use each of them.

Choosing the right tools for the job

In this section, we present an overview of the different concurrency libraries that we learned about. We take a step back and look at the differences between these libraries, and what they have in common. This summary will give us an insight into what different concurrency abstractions are useful for.

A concurrency framework usually needs to address several concerns:

  • It must provide a way to declare data that is shared between concurrent executions
  • It must provide constructs for reading and modifying program data
  • It must be able to express conditional execution, triggered when a certain set of conditions is fulfilled
  • It must define a way to start concurrent executions

Some of the frameworks from this article address all of these concerns; others address only a subset, and transfer part of the responsibility to another framework.

Typically, in a concurrent programming model, we express concurrently shared data differently from data intended to be accessed only from a single thread. This allows the JVM runtime to optimize sequential parts of the program more effectively. So far, we’ve seen a lot of different ways to express concurrently shared data, ranging from the low-level facilities to advanced high-level abstractions. We summarize different data abstractions in the following table:

| Data abstraction | Datatype or annotation | Description |
|---|---|---|
| Volatile variables (JDK) | @volatile | Ensure visibility and the happens-before relationship on class fields and local variables that are captured in closures. |
| Atomic variables (JDK) | AtomicReference[T], AtomicInteger, AtomicLong | Provide basic composite atomic operations, such as compareAndSet and incrementAndGet. |
| Futures and promises (scala.concurrent) | Future[T], Promise[T] | Sometimes called single-assignment variables, these express values that might not be computed yet, but will eventually become available. |
| Observables and subjects (Rx) | Observable[T], Subject[T] | Also known as first-class event streams, these describe many different values that arrive one after another in time. |
| Transactional references (Scala Software Transactional Memory (STM)) | Ref[T] | These describe memory locations that can only be accessed from within memory transactions. Their modifications only become visible after the transaction successfully commits. |
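As a rough illustration of the first three rows of the table, the following sketch uses only the JDK and the Scala standard library. The names `SharedDataSketch`, `stopped`, `hits`, and `answer` are invented for this example and do not come from the book.

```scala
import java.util.concurrent.atomic.AtomicInteger
import scala.concurrent.{Await, Future, Promise}
import scala.concurrent.duration._

object SharedDataSketch {
  // Volatile field: a write by one thread is visible to later reads by others.
  @volatile var stopped: Boolean = false

  // Atomic variable: supports composite operations such as incrementAndGet.
  val hits = new AtomicInteger(0)

  // Promise and future: a single-assignment variable and its read-only view.
  val answer: Promise[Int] = Promise[Int]()
  val answerFuture: Future[Int] = answer.future

  // Call this once: a promise can only be completed a single time.
  def demo(): Int = {
    val worker = new Thread(new Runnable {
      def run(): Unit = {
        while (!stopped) hits.incrementAndGet()
        answer.success(hits.get) // assign the single value
      }
    })
    worker.start()
    Thread.sleep(10)
    stopped = true // the worker observes this write and leaves its loop
    Await.result(answerFuture, 5.seconds)
  }
}
```

Calling `SharedDataSketch.demo()` returns however many increments the worker managed before it noticed the flag, so the exact number varies between runs.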

The next important concern is providing access to shared data, which includes reading and modifying shared memory locations. Usually, a concurrent program uses special constructs to express such accesses. We summarize the different data access constructs in the following table:

| Data abstraction | Data access constructs | Description |
|---|---|---|
| Arbitrary data (JDK) | synchronized | Uses intrinsic object locks to exclude access to arbitrary shared data. |
| Atomic variables and classes (JDK) | compareAndSet | Atomically exchanges the value of a single memory location. It allows implementing lock-free programs. |
| Futures and promises (scala.concurrent) | value, tryComplete | Used to assign a value to a promise, or to check the value of the corresponding future. The value method is not a preferred way to interact with a future. |
| Transactional references (ScalaSTM) | atomic, orAtomic, single | Atomically modify the values of a set of memory locations. Reduce the risk of deadlocks, but disallow side effects inside the transactional block. |
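The JDK and scala.concurrent rows of this table can be sketched with the standard library alone. The object `DataAccessSketch` and its methods are hypothetical names chosen for this illustration; the point is only to contrast a lock, a compareAndSet retry loop, and a single-assignment promise.

```scala
import java.util.concurrent.atomic.AtomicLong
import scala.annotation.tailrec
import scala.concurrent.Promise
import scala.util.Success

object DataAccessSketch {
  // synchronized: the intrinsic lock of `lock` guards the plain variable `total`.
  private val lock = new AnyRef
  private var total = 0L
  def addLocked(n: Long): Unit = lock.synchronized { total += n }
  def lockedTotal: Long = lock.synchronized { total }

  // compareAndSet: a lock-free update that retries when another thread wins.
  private val atomicTotal = new AtomicLong(0L)
  @tailrec def addLockFree(n: Long): Unit = {
    val old = atomicTotal.get
    if (!atomicTotal.compareAndSet(old, old + n)) addLockFree(n)
  }
  def lockFreeTotal: Long = atomicTotal.get

  // tryComplete: only the first assignment to a promise takes effect.
  def firstWins(): (Boolean, Boolean, Int) = {
    val p = Promise[Int]()
    val first = p.tryComplete(Success(1))
    val second = p.tryComplete(Success(2))
    // Reading `value` is safe here only because p is known to be completed.
    (first, second, p.future.value.get.get)
  }
}
```

In ordinary code, a callback or a combinator is usually a better way to read a future than `value`, which is why the table marks it as not preferred.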

Concurrent data access is not the only concern of a concurrency framework. Concurrent computations sometimes need to proceed only after a certain condition is met. In the following table, we summarize different constructs that enable this:

| Concurrency framework | Conditional execution constructs | Description |
|---|---|---|
| JVM concurrency | wait, notify, notifyAll | Used to suspend the execution of a thread until some other thread notifies that the conditions are met. |
| Futures and promises | onComplete, Await.ready | Conditionally schedules an asynchronous computation. The Await.ready method suspends the thread until the future completes. |
| Reactive extensions | subscribe | Asynchronously or synchronously executes a computation when an event arrives. |
| Software transactional memory | retry, retryFor, withRetryTimeout | Retries the current memory transaction when some of the relevant memory locations change. |
| Actors | receive | Executes the actor’s receive block when a message arrives. |
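The first two rows can be illustrated with a small standard-library sketch. The `Slot` class and the `demo` method are hypothetical; they show a guarded block built from wait and notifyAll, followed by onComplete and Await.ready on a future.

```scala
import scala.concurrent.{Await, Future, Promise}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object ConditionSketch {
  // A one-slot mailbox guarded by its own monitor.
  final class Slot[T] {
    private var item: Option[T] = None
    def put(x: T): Unit = this.synchronized {
      item = Some(x)
      this.notifyAll() // wake up threads blocked in take()
    }
    def take(): T = this.synchronized {
      // Re-check the condition in a loop to guard against spurious wakeups.
      while (item.isEmpty) this.wait()
      val x = item.get
      item = None
      x
    }
  }

  def demo(): (String, Int) = {
    val slot = new Slot[String]
    val producer = new Thread(new Runnable {
      def run(): Unit = slot.put("ready")
    })
    producer.start()
    val fromSlot = slot.take() // blocks until put() has been called

    val seen = Promise[Int]()
    val f = Future { 21 * 2 }
    f.onComplete(t => seen.complete(t)) // runs only after f is completed
    Await.ready(f, 5.seconds)           // suspends the caller until f completes
    (fromSlot, Await.result(seen.future, 5.seconds))
  }
}
```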

Finally, a concurrency model must define a way to start a concurrent execution. We summarize different concurrency constructs in the following table:

| Concurrency framework | Concurrency constructs | Description |
|---|---|---|
| JVM concurrency | Thread.start | Starts a new thread of execution. |
| Execution contexts | execute | Schedules a block of code for execution on a thread pool. |
| Futures and promises | Future.apply | Schedules a block of code for execution, and returns the future value with the result of the execution. |
| Parallel collections | par | Allows invoking data-parallel versions of collection methods. |
| Reactive extensions | Observable.create, observeOn | The create method defines an event source. The observeOn method schedules the handling of events on different threads. |
| Actors | actorOf | Schedules a new actor object for execution. |
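The first three rows of the table can be compared side by side using only the standard library. This is a minimal sketch; `StartSketch` and the task labels are invented for the example.

```scala
import java.util.concurrent.{CountDownLatch, TimeUnit}
import scala.collection.concurrent.TrieMap
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

object StartSketch {
  def demo(): Set[String] = {
    implicit val ec: ExecutionContext = ExecutionContext.global
    val done = TrieMap[String, Unit]() // thread-safe record of finished tasks

    // Thread.start: runs the task on a new, dedicated thread.
    val t = new Thread(new Runnable {
      def run(): Unit = done.put("thread", ())
    })
    t.start()
    t.join()

    // ExecutionContext.execute: schedules the task on a thread pool.
    val latch = new CountDownLatch(1)
    ec.execute(new Runnable {
      def run(): Unit = { done.put("execute", ()); latch.countDown() }
    })
    latch.await(5, TimeUnit.SECONDS)

    // Future.apply: schedules the task and returns a handle to its result.
    val f = Future { done.put("future", ()); "finished" }
    Await.result(f, 5.seconds)

    done.keySet.toSet
  }
}
```

Note that only the future gives the caller a value to wait on; the other two constructs need a separate mechanism (join or a latch here) to signal completion.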

This breakdown shows us that different concurrency libraries focus on different tasks. For example, parallel collections do not have conditional waiting constructs, because a data-parallel operation proceeds on separate elements independently. Similarly, software transactional memory does not come with a construct to express concurrent computations, and focuses only on protecting access to shared data. Actors do not have special constructs for modeling shared data and protecting access to it, because data is encapsulated within separate actors and accessed serially only by the actor that owns it.

Having classified concurrency libraries according to how they model shared data and express concurrency, we present a summary of what different concurrency libraries are good for:

  • The classical JVM concurrency model uses threads, the synchronized statement, volatile variables, and atomic primitives for low-level tasks. Uses include implementing a custom concurrency utility, a concurrent data structure, or a concurrency framework optimized for specific tasks.
  • Futures and promises are best suited for referring to concurrent computations that produce a single result value. Futures model latency in the program, and allow composing values that become available later during the execution of the program. Uses include performing remote network requests and waiting for replies, referring to the result of an asynchronous long-running computation, or reacting to the completion of an I/O operation. Futures are usually the glue of a concurrent application, binding the different parts of a concurrent program together. We often use futures to convert single-event callback APIs into a standardized representation based on the Future type.
  • Parallel collections are best suited for efficiently executing data-parallel operations on large datasets. Uses include file searching, text processing, linear algebra applications, numerical computations, and simulations. Long-running Scala collection operations are usually good candidates for parallelization.
  • Reactive extensions are used to express asynchronous event-based programs. Unlike parallel collections, in reactive extensions, data elements are not available when the operation starts, but arrive while the application is running. Uses include converting callback-based APIs, modeling events in user interfaces, modeling events external to the application, manipulating program events with collection-style combinators, streaming data from input devices or remote locations, or incrementally propagating changes in the data model throughout the program.
  • Use STM to protect program data from getting corrupted by concurrent accesses. An STM allows building complex data models and accessing them with a reduced risk of deadlocks and race conditions. A typical use is to protect concurrently accessible data, while retaining good scalability between threads whose accesses to data do not overlap.
  • Actors are suitable for encapsulating concurrently accessible data, and seamlessly building distributed systems. Actor frameworks provide a natural way to express concurrent tasks that communicate by explicitly sending messages. Uses include serializing concurrent access to data to prevent corruption, expressing stateful concurrency units in the system, and building distributed applications like trading systems, P2P networks, communication hubs, or data mining frameworks.
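The futures entry in the list above mentions converting single-event callback APIs into the Future type. A minimal sketch of that conversion follows; `fetchLength` is a hypothetical callback-style API invented for the example, and the adapter simply completes a promise from inside the callback.

```scala
import scala.concurrent.{Future, Promise}
import scala.util.Try

object CallbackSketch {
  // Hypothetical callback-style API: calls `done` exactly once, from another thread.
  def fetchLength(text: String)(done: Try[Int] => Unit): Unit = {
    val t = new Thread(new Runnable {
      def run(): Unit = done(Try {
        require(text.nonEmpty, "empty input")
        text.length
      })
    })
    t.start()
  }

  // Adapter: the callback completes the promise; callers only see the future.
  def fetchLengthFuture(text: String): Future[Int] = {
    val p = Promise[Int]()
    fetchLength(text)(result => p.complete(result))
    p.future
  }
}
```

Once the API returns a Future, its result can be combined with other futures using map, flatMap, and similar combinators, which is what makes futures a convenient glue between components.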

Advocates of specific programming languages, libraries, or frameworks might try to convince you that their technology is the best for any task and any situation, often with the intent of selling it. Richard Stallman once said that computer science is the only industry more fashion-driven than women’s fashion. As engineers, we need to know better than to succumb to programming fashion and marketing propaganda. Different frameworks are tailored towards specific use cases, and the correct way to choose a technology is to carefully weigh its advantages and disadvantages when applied to a specific situation.

There is no one-size-fits-all technology. Use your own best judgment when deciding which concurrency framework to use for a specific programming task.

Sometimes, choosing the best-suited concurrency utility is easier said than done. It takes a great deal of experience to choose the correct technology. In many cases, we do not even know enough about the requirements of the system to make an informed decision. Regardless, a good rule of thumb is to apply several concurrency frameworks to different parts of the same application, each best suited for a specific task. Often, the real power of different concurrency frameworks becomes apparent when they are used together. This is the topic of the next section.

Putting it all together – a remote file browser

In this section, we use our knowledge about different concurrency frameworks to build a remote file browser. This larger application example illustrates how different concurrency libraries work together, and how to apply them to different situations. We will name our remote file browser ScalaFTP.

The ScalaFTP browser is divided into two main components: the server and the client process. The server process will run on the machine whose filesystem we want to manipulate. The client will run on our own computer, and consist of a graphical user interface used to navigate the remote filesystem. To keep things simple, the protocol that the client and the server will use to communicate will not really be FTP, but a custom communication protocol. By choosing the correct concurrency libraries to implement different parts of ScalaFTP, we will ensure that the complete ScalaFTP implementation fits inside just 500 lines of code.

Specifically, the ScalaFTP browser will implement the following features:

  • Displaying the names of the files and the directories in a remote filesystem, and allowing navigation through the directory structure
  • Copying files between directories in a remote filesystem
  • Deleting files in a remote filesystem

To implement separate pieces of this functionality, we will divide the ScalaFTP server and client programs into layers. The task of the server program is to respond to incoming copy and delete requests, and to answer queries about the contents of specific directories. To make sure that its view of the filesystem is consistent, the server will cache the directory structure of the filesystem. We divide the server program into two layers: the filesystem API and the server interface. The filesystem API will expose the data model of the server program, and define useful utility methods to manipulate the filesystem. The server interface will receive requests and send responses back to the client.

Since the server interface will require communicating with the remote client, we decide to use the Akka actor framework. Akka comes with remote communication facilities. The contents of the filesystem, that is, its state, will change over time. We are therefore interested in choosing proper constructs for data access.

In the filesystem API, we can use object monitors and locking to synchronize access to shared state, but we will avoid these due to the risk of deadlocks. We similarly avoid using atomic variables, because they are prone to race conditions. We could encapsulate the filesystem state within an actor, but note that this can lead to a scalability bottleneck: an actor would serialize all accesses to the filesystem state. Therefore, we decide to use the ScalaSTM framework to model the filesystem contents. An STM avoids the risk of deadlocks and race conditions, and ensures good horizontal scalability.

The task of the client program will be to graphically present the contents of the remote filesystem, and communicate with the server. We divide the client program into three layers of functionality. The GUI layer will render the contents of the remote filesystem and register user requests such as button clicks. The client API will replicate the server interface on the client side and communicate with the server. We will use Akka to communicate with the server, but expose the results of remote operations as futures. Finally, the client logic will be a gluing layer, which binds the GUI and the client API together.

The architecture of the ScalaFTP browser is illustrated in the following diagram, in which we indicate which concurrency libraries will be used by separate layers. The dashed line represents the communication path between the client and the server:

[Figure: architecture of the ScalaFTP browser, showing the layers of the client and the server and the concurrency library used by each]

We now start by implementing the ScalaFTP server, relying on the bottom-up design approach. In the next section, we will describe the internals of the filesystem API.

Modeling the filesystem

We used atomic variables and concurrent collections to implement a non-blocking, thread-safe filesystem API, which allowed copying files and retrieving snapshots of the filesystem. In this section, we repeat this task using STM. We will see that it is much more intuitive and less error-prone to use an STM.

We start by defining the different states that a file can be in. A file can be in the process of being created, in the idle state, being copied, or being deleted. We model this with a sealed State trait and its four cases:

sealed trait State
case object Created extends State
case object Idle extends State
case class Copying(n: Int) extends State
case object Deleted extends State

A file can only be deleted if it is in the idle state, and it can only be copied if it is in the idle state or already in the Copying state. Since a file can be copied to multiple destinations at a time, the Copying state encodes how many copies are currently under way. We add the methods inc and dec to the State trait, which return a new state with one more or one fewer copy, respectively. For example, the implementation of inc and dec for the Copying state is as follows:

def inc: State = Copying(n + 1)
def dec: State = if (n > 1) Copying(n - 1) else Idle
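One plausible way to complete the trait is sketched below. Only the Copying case is taken from the text; starting the count at Copying(1) from Idle and raising an error for the remaining combinations are assumptions made for this illustration, and the wrapper object `StateSketch` is a hypothetical name.

```scala
object StateSketch {
  sealed trait State {
    def inc: State // one more copy under way
    def dec: State // one fewer copy under way
  }
  case object Created extends State {
    // Assumption: a file that is still being created cannot be copied.
    def inc: State = sys.error("File is being created.")
    def dec: State = sys.error("File is being created.")
  }
  case object Idle extends State {
    def inc: State = Copying(1)
    def dec: State = sys.error("Idle file is not being copied.")
  }
  case class Copying(n: Int) extends State {
    def inc: State = Copying(n + 1)
    def dec: State = if (n > 1) Copying(n - 1) else Idle
  }
  case object Deleted extends State {
    // Assumption: a deleted file cannot be copied.
    def inc: State = sys.error("File is being deleted.")
    def dec: State = sys.error("File is being deleted.")
  }
}
```

With these definitions, two overlapping copies take a file from Idle to Copying(1) to Copying(2), and back to Idle once both copies finish.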

Similar to the File class in the java.io package, we represent both files and directories with the same entity, and refer to them more generally as files. Each file is represented by the FileInfo class, which encodes the file's path, its name, its parent directory, the date of its last modification, a Boolean value denoting whether it is a directory, its size, and its State object. The FileInfo class is immutable, so updating the state of a file requires creating a fresh FileInfo object:

case class FileInfo(path: String, name: String,
  parent: String, modified: String, isDir: Boolean,
  size: Long, state: State)

We separately define the factory methods apply and creating that take a File object and return a FileInfo object in the Idle or Created state, respectively.
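The text does not show these factory methods, so the following is only a sketch of what they might look like. The date format, the size reported for directories, and the private helper `describe` are all assumptions; the State trait is reduced to the two cases the factories need.

```scala
import java.io.File
import java.text.SimpleDateFormat
import java.util.Date

object FileInfoSketch {
  sealed trait State
  case object Created extends State
  case object Idle extends State

  case class FileInfo(path: String, name: String,
    parent: String, modified: String, isDir: Boolean,
    size: Long, state: State)

  object FileInfo {
    // Hypothetical helper shared by both factories.
    private def describe(file: File, size: Long, state: State): FileInfo = {
      // SimpleDateFormat is not thread-safe, so create one per call.
      val modified = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
        .format(new Date(file.lastModified))
      // getParent returns null when the path has no parent component.
      FileInfo(file.getPath, file.getName, file.getParent, modified,
        file.isDirectory, size, state)
    }

    // An existing file on disk starts out in the Idle state.
    def apply(file: File): FileInfo =
      describe(file, if (file.isDirectory) 0L else file.length, Idle)

    // A copy destination is registered as Created before it exists on disk,
    // so its size is supplied by the caller.
    def creating(file: File, size: Long): FileInfo =
      describe(file, size, Created)
  }
}
```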

Depending on where the server is started, the root of the ScalaFTP directory structure is a different subdirectory in the actual filesystem. A FileSystem object tracks the files in the given rootpath directory, using a transactional map called files:

class FileSystem(val rootpath: String) {
  val files = TMap[String, FileInfo]()
}

We introduce a separate init method to initialize the FileSystem object. The init method starts a transaction, clears the contents of the files map, and traverses the files and directories under rootpath using the Apache Commons IO library. For each file and directory, the init method creates a FileInfo object and adds it to the files map, using its path as the key:

def init() = atomic { implicit txn =>
  files.clear()
  val rootDir = new File(rootpath)
  val all = TrueFileFilter.INSTANCE
  // asScala requires the Java collection converters to be imported,
  // for example scala.collection.JavaConverters._
  val fileIterator =
    FileUtils.iterateFilesAndDirs(rootDir, all, all).asScala
  for (file <- fileIterator) {
    val info = FileInfo(file)
    files(info.path) = info
  }
}

Recall that the ScalaFTP browser must display the contents of the remote filesystem. To enable directory queries, we first add the getFileList method to the FileSystem class, which retrieves the files in the specified dir directory. The getFileList method starts a transaction and filters the files whose direct parent is equal to dir:

def getFileList(dir: String): Map[String, FileInfo] =
  atomic { implicit txn =>
    files.filter(_._2.parent == dir)
  }

We implement the copying logic in the filesystem API with the copyFile method. This method takes a path to the src source file and the dest destination file, and starts a transaction. After checking whether the dest destination file exists or not, the copyFile method inspects the state of the source file entry, and fails unless the state is Idle or Copying. It then calls inc to create a new state with the increased copy count, and updates the source file entry in the files map with the new state. Similarly, the copyFile method creates a new entry for the destination file in the files map. Finally, the copyFile method calls the afterCommit handler to physically copy the file to disk after the transaction completes. Recall that it is not legal to execute side-effecting operations from within the transaction body, so the private copyOnDisk method is called only after the transaction commits:

def copyFile(src: String, dest: String) = atomic { implicit txn =>
  val srcfile = new File(src)
  val destfile = new File(dest)
  val info = files(src)
  if (files.contains(dest)) sys.error("Destination exists.")
  info.state match {
    // Any other state fails the match, which aborts the transaction.
    case Idle | Copying(_) =>
      files(src) = info.copy(state = info.state.inc)
      files(dest) = FileInfo.creating(destfile, info.size)
      // The side effect is deferred until the transaction commits.
      Txn.afterCommit { _ => copyOnDisk(srcfile, destfile) }
      src
  }
}
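The role of afterCommit can be seen in isolation in the following sketch, which assumes the ScalaSTM library is on the classpath. The `balances` map, the `log` queue, and the `transfer` method are hypothetical and unrelated to ScalaFTP; they only show that a side effect registered with Txn.afterCommit runs after a successful commit and is skipped when the transaction fails.

```scala
import java.util.concurrent.ConcurrentLinkedQueue
import scala.concurrent.stm._

object AfterCommitSketch {
  val balances = TMap("a" -> 10, "b" -> 0)
  val log = new ConcurrentLinkedQueue[String]()

  def transfer(from: String, to: String, n: Int): Unit =
    atomic { implicit txn =>
      // An exception rolls the transaction back and propagates to the caller.
      if (balances(from) < n) sys.error("Insufficient funds.")
      balances(from) = balances(from) - n
      balances(to) = balances(to) + n
      // The transaction body may be re-executed on conflict, but this handler
      // runs only once the transaction has actually committed.
      Txn.afterCommit { _ => log.add(s"moved $n from $from to $to") }
    }
}
```

Logging directly inside the transaction body instead could record a transfer several times, or record one that was later rolled back.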

The copyOnDisk method calls the copyFile method on the FileUtils class from the Apache Commons IO library. After the file transfer completes, the copyOnDisk method starts another transaction, in which it decreases the copy count of the source file and sets the state of the destination file to Idle:

private def copyOnDisk(srcfile: File, destfile: File) = {
  FileUtils.copyFile(srcfile, destfile)
  atomic { implicit txn =>
    val ninfo = files(srcfile.getPath)
    files(srcfile.getPath) = ninfo.copy(state = ninfo.state.dec)
    files(destfile.getPath) = FileInfo(destfile)
  }
}

The deleteFile method deletes a file in a similar way. It changes the file state to Deleted, deletes the file, and starts another transaction to remove the file entry:

def deleteFile(srcpath: String): String = atomic { implicit txn =>
  val info = files(srcpath)
  info.state match {
    case Idle =>
      files(srcpath) = info.copy(state = Deleted)
      Txn.afterCommit { _ =>
        FileUtils.forceDelete(info.toFile)
        files.single.remove(srcpath)
      }
      srcpath
  }
}

Modeling the server data model with the STM allows seamlessly adding different concurrent computations to the server program. In the next section, we will implement a server actor that uses the server API to execute filesystem operations.

Use STM to model concurrently accessible data, as an STM works transparently with most concurrency frameworks.

Having completed the filesystem API, we now proceed to the server interface layer of the ScalaFTP browser.

The server interface

The server interface consists of a single actor called FTPServerActor. This actor will receive client requests and respond to them serially. If it turns out that the server actor is the sequential bottleneck of the system, we can simply add additional server interface actors to improve horizontal scalability.

We start by defining the different types of messages that the server actor can receive. We follow the convention of defining them inside the companion object of the FTPServerActor class:

object FTPServerActor {
  sealed trait Command
  case class GetFileList(dir: String) extends Command
  case class CopyFile(src: String, dest: String) extends Command
  case class DeleteFile(path: String) extends Command
  def apply(fs: FileSystem) = Props(classOf[FTPServerActor], fs)
}

The actor template of the server actor takes a FileSystem object as a parameter. It reacts to the GetFileList, CopyFile, and DeleteFile messages by calling the appropriate methods from the filesystem API:

class FTPServerActor(fileSystem: FileSystem) extends Actor {
  val log = Logging(context.system, this)
  def receive = {
    case GetFileList(dir) =>
      // Directory queries are cheap, so they are answered synchronously.
      val filesMap = fileSystem.getFileList(dir)
      val files = filesMap.map(_._2).to[Seq]
      sender ! files
    case CopyFile(srcpath, destpath) =>
      // File operations run asynchronously; the Try carries any failure.
      Future {
        Try(fileSystem.copyFile(srcpath, destpath))
      } pipeTo sender
    case DeleteFile(path) =>
      Future {
        Try(fileSystem.deleteFile(path))
      } pipeTo sender
  }
}

When the server receives a GetFileList message, it calls the getFileList method with the specified dir directory, and sends a sequence collection with the FileInfo objects back to the client. Since FileInfo is a case class, it extends the Serializable interface, and its instances can be sent over the network.

When the server receives a CopyFile or DeleteFile message, it calls the appropriate filesystem method asynchronously. The methods in the filesystem API throw exceptions when something goes wrong, so we need to wrap calls to them in Try objects. After the asynchronous file operations complete, the resulting Try objects are piped back as messages to the sender actor, using the Akka pipeTo method.
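The wrapping step can be sketched without Akka, using only the standard library. The `delete` method below is a hypothetical stand-in for a filesystem call that throws; the point is that the future itself always succeeds, and the outcome travels inside the Try as an ordinary value.

```scala
import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global
import scala.util.Try

object TryReplySketch {
  // Hypothetical stand-in for a filesystem call that throws on bad input.
  def delete(path: String): String =
    if (path.startsWith("/")) path
    else sys.error(s"Not an absolute path: $path")

  // The future completes successfully in both cases; a thrown exception
  // is captured as a Failure value instead of failing the future.
  def deleteAsync(path: String): Future[Try[String]] =
    Future { Try(delete(path)) }
}
```

Because the reply is a plain Try value, the receiving side can treat success and failure uniformly and decide later how to surface the error.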

To start the ScalaFTP server, we need to instantiate and initialize a FileSystem object, and start the server actor. We parse the network port command-line argument, and use it to create an actor system that is capable of remote communication. For this, we use the remotingSystem factory method that we introduced. The remoting actor system then creates an instance of the FTPServerActor. This is shown in the following program:

object FTPServer extends App {
  val fileSystem = new FileSystem(".")
  fileSystem.init()
  val port = args(0).toInt
  val actorSystem = ch8.remotingSystem("FTPServerSystem", port)
  actorSystem.actorOf(FTPServerActor(fileSystem), "server")
}

The ScalaFTP server actor can run inside the same process as the client application, in another process in the same machine, or on a different machine connected with a network. The advantage of the actor model is that we usually need not worry about where the actor runs until we integrate it into the entire application.

When you need to implement a distributed application that runs on different machines, use an actor framework.

Our server program is now complete, and we can run it with the run command from SBT. We set the actor system to use the port 12345:

run 12345

In the next section, we will implement the file navigation API for the ScalaFTP client, which will communicate with the server interface over the network.

Client navigation API

The client API exposes the server interfaces to the client program through asynchronous methods that return future objects. Unlike the server’s filesystem API, which runs locally, the client API methods execute remote network requests. Futures are a natural way to model latency in the client API methods, and to avoid blocking during the network requests.

Internally, the client API maintains an actor instance that communicates with the server actor. The client actor does not know the actor reference of the server actor when it is created. For this reason, the client actor starts in an unconnected state. When it receives the Start message with the URL of the server actor system, the client constructs an actor path to the server actor, sends out an Identify message, and switches to the connecting state. If the actor system is able to find the server actor, the client actor eventually receives the ActorIdentity message with the server actor reference. In this case, the client actor switches to the connected state, and is able to forward commands to the server. Otherwise, the connection fails and the client actor reverts to the unconnected state. The state diagram of the client actor is shown in the following figure:

[Figure: state diagram of the client actor, with the unconnected, connecting, and connected states]

We define the Start message in the client actor’s companion object:

object FTPClientActor {
  case class Start(host: String)
}

We then define the FTPClientActor class and give it an implicit Timeout parameter. The Timeout parameter will be used later in the Akka ask pattern, when forwarding client requests to the server actor. The stub of the FTPClientActor class is as follows:

class FTPClientActor(implicit val timeout: Timeout)
  extends Actor

Before defining the receive method, we define behaviors corresponding to different actor states. Once the client actor in the unconnected state receives the Start message with the host string, it constructs an actor path to the server, and creates an actor selection object. The client actor then sends the Identify message to the actor selection, and switches its behavior to connecting. This is shown in the following behavior method, named unconnected:

def unconnected: Actor.Receive = {
  case Start(host) =>
    val serverActorPath =
      s"akka.tcp://FTPServerSystem@$host/user/server"
    val serverActorSel = context.actorSelection(serverActorPath)
    serverActorSel ! Identify(())
    context.become(connecting(sender))
}

The connecting method creates a behavior given an actor reference to the sender of the Start message. We call this actor reference clientApp, because the ScalaFTP client application will send the Start message to the client actor. Once the client actor receives an ActorIdentity message with the ref reference to the server actor, it can send true back to the clientApp reference, indicating that the connection was successful. In this case, the client actor switches to the connected behavior. Otherwise, if the client actor receives an ActorIdentity message without the server reference, the client actor sends false back to the application, and reverts to the unconnected state:

def connecting(clientApp: ActorRef): Actor.Receive = {
  case ActorIdentity(_, Some(ref)) =>
    clientApp ! true
    context.become(connected(ref))
  case ActorIdentity(_, None) =>
    clientApp ! false
    context.become(unconnected)
}

The connected state uses the serverActor server actor reference to forward the Command messages. To do so, the client actor uses the Akka ask pattern, which returns a future object with the server’s response. The contents of the future are piped back to the original sender of the Command message. In this way, the client actor serves as an intermediary between the application, which is the sender, and the server actor. The connected method is shown in the following code snippet:

def connected(serverActor: ActorRef): Actor.Receive = {
  case command: Command =>
    (serverActor ? command).pipeTo(sender)
}

Finally, the receive method returns the unconnected behavior, which is the state in which the client actor is created:

def receive = unconnected
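The three behaviors form a small state machine. The following Akka-free model is only a sketch of the transitions described above: the state and event names are invented for the illustration, and messages that a state does not handle are modeled as leaving the state unchanged.

```scala
object ClientStateSketch {
  sealed trait ClientState
  case object Unconnected extends ClientState
  case object Connecting extends ClientState
  case object Connected extends ClientState

  sealed trait Event
  case object StartReceived extends Event  // Start(host)
  case object ServerFound extends Event    // ActorIdentity(_, Some(ref))
  case object ServerNotFound extends Event // ActorIdentity(_, None)

  // Transition function: each case mirrors one context.become call.
  def next(state: ClientState, event: Event): ClientState =
    (state, event) match {
      case (Unconnected, StartReceived) => Connecting
      case (Connecting, ServerFound) => Connected
      case (Connecting, ServerNotFound) => Unconnected
      case (other, _) => other // unhandled event: stay in the same state
    }
}
```

Folding a list of events over `next`, starting from `Unconnected`, traces the path the client actor takes through its behaviors.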

Having implemented the client actor, we can proceed to the client API layer. We model it as a trait with a connected value, the concrete methods getFileList, copyFile, and deleteFile, and an abstract host method. The client API creates a private remoting actor system and a client actor. It then instantiates the connected future that computes the connection status by sending a Start message to the client actor. The methods getFileList, copyFile, and deleteFile are similar. They use the ask pattern on the client actor to obtain a future with the response. Recall that the actor messages are not typed, and the ask pattern returns a Future[Any] object. For this reason, each method in the client API uses the mapTo future combinator to restore the type of the message:

trait FTPClientApi {
  implicit val timeout: Timeout = Timeout(4 seconds)
  private val props = Props(classOf[FTPClientActor], timeout)
  private val system = ch8.remotingSystem("FTPClientSystem", 0)
  private val clientActor = system.actorOf(props)
  // Implement host as a def (or lazy val) so that it is already
  // available when the connected value below is initialized.
  def host: String
  val connected: Future[Boolean] = {
    // Send a Start message carrying the host, not the Start companion.
    val f = clientActor ? FTPClientActor.Start(host)
    f.mapTo[Boolean]
  }
  def getFileList(d: String): Future[(String, Seq[FileInfo])] = {
    val f = clientActor ? FTPServerActor.GetFileList(d)
    f.mapTo[Seq[FileInfo]].map(fs => (d, fs))
  }
  def copyFile(src: String, dest: String): Future[String] = {
    val f = clientActor ? FTPServerActor.CopyFile(src, dest)
    // Unwrap the Try: a Failure becomes a failed future.
    f.mapTo[Try[String]].map(_.get)
  }
  def deleteFile(srcpath: String): Future[String] = {
    val f = clientActor ? FTPServerActor.DeleteFile(srcpath)
    f.mapTo[Try[String]].map(_.get)
  }
}
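The effect of mapTo followed by map(_.get) can be tried out without Akka. In this sketch, the `ask` method is a hypothetical stand-in for the ask pattern that simply returns the given reply as a Future[Any]; everything else uses the standard library.

```scala
import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global
import scala.util.Try

object MapToSketch {
  // Hypothetical stand-in for the ask pattern: the reply is statically untyped.
  def ask(reply: Any): Future[Any] = Future.successful(reply)

  // mapTo restores the expected type (failing the future on a mismatch), and
  // map(_.get) turns a Failure inside the Try into a failed future.
  def copyFile(reply: Any): Future[String] =
    ask(reply).mapTo[Try[String]].map(_.get)
}
```

A reply of `Success("a.txt")` yields a future holding "a.txt", while a `Failure` reply or a reply of an unexpected type both yield a failed future, so callers see a single error channel.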

Note that the client API does not expose the fact that it uses actors for remote communication. Moreover, the client API is similar to the server API, but the return types of the methods are futures instead of normal values. Futures encode the latency of a method without exposing the cause for the latency, so we often find them at the boundaries between different APIs. We can internally replace the actor communication between the client and the server with the remote Observable objects, but that would not change the client API.

In a concurrent application, use futures at the boundaries of the layers to express latency.

Now that we can programmatically communicate with the remote ScalaFTP server, we turn our attention to the user interface of the client program.

Summary

This article summarized the different concurrency libraries introduced so far. You learned how to choose the correct concurrency abstraction to solve a given problem, and how to combine different concurrency abstractions when designing larger concurrent applications.
