
Streaming and the Actor Model – Akka Streams!


In this article by Piyush Mishra, author of the Akka Cookbook, we will learn about streaming and the actor model with Akka Streams.


Akka is a popular toolkit designed to ease the pain of dealing with concurrency and distributed systems. It provides easy-to-use APIs to create reactive, fault-tolerant, scalable, and concurrent applications, thanks to the actor model. The actor model was introduced by Carl Hewitt in the 1970s, and it has been successfully implemented by different programming languages, frameworks, and toolkits, such as Erlang and Akka.

The concepts behind the actor model are simple. All actors are created inside an actor system. Every actor has a unique address within the actor system, a mailbox, a state (if it is a stateful actor), and a behavior. The only way to interact with an actor is by sending messages to it using its address. Messages are stored in the mailbox until the actor is ready to process them. Once it is ready, the actor picks one message at a time and executes its behavior against the message. At this point, the actor might update its state, create new actors, or send messages to other already-created actors.
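As a minimal sketch of these concepts in Akka (the Counter actor and its messages are hypothetical names for illustration, not part of the examples that follow):

```scala
import akka.actor.{Actor, ActorSystem, Props}

// A small stateful actor: its state is a counter, its behavior is
// defined in receive, and the only way to reach it is via its address.
class Counter extends Actor {
  var count = 0 // state, touched only while processing one message

  def receive = {
    case "increment" => count += 1
    case "report"    => println(s"count = $count")
  }
}

val system = ActorSystem("demo")
val counter = system.actorOf(Props[Counter], "counter")

// Messages are queued in the mailbox and processed one at a time,
// so no synchronization around count is needed.
counter ! "increment"
counter ! "report"
```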

Akka provides all this and many other features, thanks to the vast ecosystem around the core component, such as Akka Cluster, Akka Cluster Sharding, Akka Persistence, Akka HTTP, and Akka Streams. We will dig a bit more into the last one. Streaming frameworks and toolkits have been gaining momentum lately. This is motivated by the massive number of connected devices that are constantly generating new data that needs to be consumed, processed, analyzed, and stored. This is basically the idea behind the Internet of Things (IoT), or the newer term Internet of Everything. Some time ago, the Akka team decided that they could build a streaming library leveraging all the power of Akka and the actor model: Akka Streams.

Akka Streams uses Akka actors as its foundation to provide a set of easy APIs to create back-pressured streams. Each stream consists of one or more sources, zero or more flows, and one or more sinks. All these different modules are also known as stages in Akka Streams terminology. The best way to understand how a stream works is to think about it as a graph. Each stage (source, flow, or sink) has zero or more input ports and zero or more output ports. For instance, a source has zero input ports and one output port. A flow has one input port and one output port. And finally, a sink has one input port and zero output ports. To have a runnable stream, we need to ensure that all ports of all our stages are connected. Only then can we run our stream to process elements.

Akka Streams provides a rich set of predefined stages to cover the most common streaming functions. However, if a use case requires a new custom stage, it is also possible to create it from scratch or extend an existing one. The full list of predefined stages can be found at http://doc.akka.io/docs/akka/current/scala/stream/stages-overview.html.
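For instance, a custom stage can be written from scratch by extending GraphStage. The following sketch (a hypothetical DropEmpty flow, not taken from the article) filters out empty strings; it assumes the GraphStage API available since Akka Streams 2.4:

```scala
import akka.stream.{Attributes, FlowShape, Inlet, Outlet}
import akka.stream.stage.{GraphStage, GraphStageLogic, InHandler, OutHandler}

// A custom flow stage with one input port and one output port
// that drops empty strings instead of emitting them downstream.
class DropEmpty extends GraphStage[FlowShape[String, String]] {
  val in = Inlet[String]("DropEmpty.in")
  val out = Outlet[String]("DropEmpty.out")
  val shape = FlowShape(in, out)

  def createLogic(attrs: Attributes): GraphStageLogic =
    new GraphStageLogic(shape) {
      setHandler(in, new InHandler {
        def onPush(): Unit = {
          val elem = grab(in)
          if (elem.nonEmpty) push(out, elem) // forward non-empty strings
          else pull(in)                      // skip empties, ask for more
        }
      })
      setHandler(out, new OutHandler {
        def onPull(): Unit = pull(in) // downstream demand drives upstream pull
      })
    }
}
```

Such a stage can then be used like any predefined one, for example with source.via(new DropEmpty).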

Now that we know about the different components Akka Streams provides, it is a good moment to introduce the actor materializer. As we mentioned earlier, Akka is the foundation of Akka Streams. This means the code you define in the high-level API eventually runs inside an actor. The actor materializer is the entity responsible for creating these low-level actors. By default, all processing stages get created within the same actor, which means only one element at a time can be processed by your stream. It is also possible to indicate that you want a different actor per stage, making it possible to process multiple elements at the same time. You indicate this to the materializer by calling the async method on the appropriate stage. There are also asynchronous predefined stages. For performance reasons, Akka Streams batches messages when pushing them to the next stage to reduce overhead.
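As a rough sketch of how this looks with the high-level API (a made-up doubling example, assuming the usual scaladsl imports):

```scala
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Sink, Source}

implicit val actorSystem = ActorSystem()
implicit val actorMaterializer = ActorMaterializer()

// Without .async the source, map, and sink run within the same actor.
// The .async call marks an asynchronous boundary here, so the map
// stage runs in its own actor and can work ahead of the sink.
Source(1 to 5)
  .map(_ * 2).async
  .to(Sink.foreach[Int](n => actorSystem.log.info(n.toString)))
  .run()
```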

After this quick introduction, let's start putting together some code to create and run a stream. We will use the Scala build tool (commonly known as sbt) to retrieve the Akka dependencies and run our code. To begin with, we need a build.sbt file with the following content:

name := "akka-async-streams"
version := "1.0"
scalaVersion := "2.11.7"
libraryDependencies += "com.typesafe.akka" % "akka-actor_2.11" % "2.4.17"
libraryDependencies += "com.typesafe.akka" % "akka-stream_2.11" % "2.4.17"

Once we have the file ready, we need to run sbt update to let sbt fetch the required dependencies. Our first stream will push a list of words, capitalize each of them, and log the resulting values. This can easily be achieved by doing the following:

import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Sink, Source}

implicit val actorSystem = ActorSystem()
implicit val actorMaterializer = ActorMaterializer()

val stream = Source(List("hello","from","akka","streams!"))
  .map(_.capitalize)
  .to(Sink.foreach[String](actorSystem.log.info))

stream.run()

In this small code snippet, we can see how our stream has one source with a list of strings, one flow capitalizing each string, and finally one sink logging the result. If we run our code, we should see the following in the output:

[INFO] [default-akka.actor.default-dispatcher-3] [akka.actor.ActorSystemImpl(default)] Hello
[INFO] [default-akka.actor.default-dispatcher-3] [akka.actor.ActorSystemImpl(default)] From
[INFO] [default-akka.actor.default-dispatcher-3] [akka.actor.ActorSystemImpl(default)] Akka
[INFO] [default-akka.actor.default-dispatcher-3] [akka.actor.ActorSystemImpl(default)] Streams!

The execution of this stream happens synchronously and in order. In our next example, we will build the same stream, but this time showing how all stages are modular:

implicit val actorSystem = ActorSystem()
implicit val actorMaterializer = ActorMaterializer()

val source = Source(List("hello","from","akka","streams!"))
val sink = Sink.foreach[String](actorSystem.log.info)
val capitalizer = Flow[String].map(_.capitalize)
val stream = source.via(capitalizer).to(sink)
stream.run()

In this code snippet, we can see how stages can be treated as immutable modules. We see that we can use the via helper method to provide a flow stage in a stream. This stream is still running synchronously. To run it asynchronously, we can take advantage of the mapAsync flow. For this, let’s create a small actor that will do the capitalization for us:

import akka.actor.{Actor, ActorLogging}

class Capitalizer extends Actor with ActorLogging {
  def receive = {
    case str: String =>
      log.info(s"Capitalizing $str")
      sender() ! str.capitalize
  }
}

Once we have our actor defined, we can set up our asynchronous stream. For this, we will create a round-robin pool of capitalizer actors. Then, we will use the ask pattern to send a message to an actor and wait for a response. This is done using the ? operator. The stream definition will look something like this:

import akka.actor.Props
import akka.pattern.ask
import akka.routing.RoundRobinPool
import akka.util.Timeout
import scala.concurrent.duration._

implicit val actorSystem = ActorSystem()
implicit val actorMaterializer = ActorMaterializer()
implicit val askTimeout = Timeout(5 seconds)

val capitalizer = actorSystem.actorOf(Props[Capitalizer].withRouter(RoundRobinPool(10)))
val source = Source(List("hello","from","akka","streams!"))
val sink = Sink.foreach[String](actorSystem.log.info)
val flow = Flow[String].mapAsync(parallelism = 5)(elem => (capitalizer ? elem).mapTo[String])
val stream = source.via(flow).to(sink)
stream.run()

If we execute this small piece of code, we should see output similar to the following:

[INFO] [default-akka.actor.default-dispatcher-16] [akka://default/user/$a/$a] Capitalizing hello
[INFO] [default-akka.actor.default-dispatcher-15] [akka://default/user/$a/$b] Capitalizing from
[INFO] [default-akka.actor.default-dispatcher-6] [akka://default/user/$a/$c] Capitalizing akka
[INFO] [default-akka.actor.default-dispatcher-14] [akka://default/user/$a/$d] Capitalizing streams!
[INFO] [default-akka.actor.default-dispatcher-14] [akka.actor.ActorSystemImpl(default)] Hello
[INFO] [default-akka.actor.default-dispatcher-14] [akka.actor.ActorSystemImpl(default)] From
[INFO] [default-akka.actor.default-dispatcher-14] [akka.actor.ActorSystemImpl(default)] Akka
[INFO] [default-akka.actor.default-dispatcher-14] [akka.actor.ActorSystemImpl(default)] Streams!

We can see how each word is processed by a different capitalizer actor ($a/$b/$c/$d) and by different threads (default-dispatcher 16, 15, 6, and 14). Even though these executions happen asynchronously in the pool of actors, the stream still maintains the order of the elements. If we do not need to maintain order and are looking for a faster approach, where an element can be pushed to the next stage in the stream as soon as it is ready, we can use mapAsyncUnordered:

implicit val actorSystem = ActorSystem()
implicit val actorMaterializer = ActorMaterializer()
implicit val askTimeout = Timeout(5 seconds)

val capitalizer = actorSystem.actorOf(Props[Capitalizer].withRouter(RoundRobinPool(10)))
val source = Source(List("hello","from","akka","streams!"))
val sink = Sink.foreach[String](actorSystem.log.info)
val flow = Flow[String].mapAsyncUnordered(parallelism = 5)(elem => (capitalizer ? elem).mapTo[String])
val stream = source.via(flow).to(sink)
stream.run()

When running this code, we can see that the order is not preserved and the capitalized words arrive at the sink in a different order each time we execute our code. Consider the following example:

[INFO] [default-akka.actor.default-dispatcher-10] [akka://default/user/$a/$b] Capitalizing from
[INFO] [default-akka.actor.default-dispatcher-4] [akka://default/user/$a/$d] Capitalizing streams!
[INFO] [default-akka.actor.default-dispatcher-13] [akka://default/user/$a/$c] Capitalizing akka
[INFO] [default-akka.actor.default-dispatcher-14] [akka://default/user/$a/$a] Capitalizing hello
[INFO] [default-akka.actor.default-dispatcher-12] [akka.actor.ActorSystemImpl(default)] Akka
[INFO] [default-akka.actor.default-dispatcher-12] [akka.actor.ActorSystemImpl(default)] From
[INFO] [default-akka.actor.default-dispatcher-12] [akka.actor.ActorSystemImpl(default)] Hello
[INFO] [default-akka.actor.default-dispatcher-12] [akka.actor.ActorSystemImpl(default)] Streams!

Akka Streams also provides a graph DSL to define your stream. In this DSL, it is possible to connect stages just using the ~> operator:

import akka.stream.ClosedShape
import akka.stream.scaladsl.{GraphDSL, RunnableGraph}

implicit val actorSystem = ActorSystem()
implicit val actorMaterializer = ActorMaterializer()
implicit val askTimeout = Timeout(5 seconds)

val capitalizer = actorSystem.actorOf(Props[Capitalizer].withRouter(RoundRobinPool(10)))
val graph = RunnableGraph.fromGraph(GraphDSL.create() { implicit b =>
  import GraphDSL.Implicits._
  val source = Source(List("hello","from","akka","streams!"))
  val sink = Sink.foreach[String](actorSystem.log.info)
  val flow = Flow[String].mapAsyncUnordered(parallelism = 5)(elem => (capitalizer ? elem).mapTo[String])
  source ~> flow ~> sink

  ClosedShape
})
graph.run()

These code snippets show only a few of the many features available in the Akka Streams framework. Actors can be seamlessly integrated with streams, which opens up a whole new set of possibilities for processing data in a streaming fashion. We have seen how we can preserve or relax the order of elements, either synchronously or asynchronously. In addition, we saw how to use the graph DSL to define our stream.

Summary

In this article, we covered the concept of the actor model and the core components of Akka. We also described the stages in Akka Streams and walked through example code for creating and running streams.

If you want to learn more about Akka, Akka Streams, and all other modules around them, you can find useful and handy recipes like these ones in the Akka Cookbook at https://www.packtpub.com/application-development/akka-cookbook. 
