In this article by Piyush Mishra, author of the book Akka Cookbook, we will learn about supervision and monitoring of Akka actors.
(For more resources related to this topic, see here.)
Using supervision and monitoring, we can write fault-tolerant systems, which can run continuously for days, months, and years without stopping. Fault tolerance is a property of the systems which are intended to be always responsive rather than failing completely in case of a failure. Such systems are known as fault tolerance systems or resilient systems.
In simple words, a fault-tolerant system is one which is destined to continue as more or less fully operational, with perhaps a reduction in throughput or an increase in response time because of partial failure of its components.
Even if a components fails, the whole system never gets shut down, instead, it remains operational and responsive with just a decreased throughput.
Similarly, while designing a distributed system, we need to care about what would happen if one or more it’s components go down. So, the system design should itself be such that the system is able to take appropriate action to resolve the issue.
In this article, we will cover the following recipe:
- Creating child actors of a parent actor
- Overriding the life cycle hooks of an actor
- Sending messages to actors and collecting responses
Creating child actors of a parent actor
In this recipe, we will learn how to create child actors of an actor. Akka follows a tree-like structure to create actors, and it is also the recommended practice.
By following such practices, we can handle failures in actors as the parent can take care of it. Lets see how to do it.
Getting ready
We need to import the Hello-Akka project in the IDE of our choice. The Akka actor dependency that we added in build.sbt is sufficient for most of the recipes in this article, so we will skip the Getting ready section in our further recipes.
How to do it…
- Create a file named ParentChild.scala in package com.packt.chapter2.
- Add the following imports to the top of the file:
import akka.actor.{ActorSystem, Props, Actor}
- Create messages for sending to actors.
case object CreateChild case class Greet(msg: String)
- Define a child actor as follows:
class ChildActor extends Actor { def receive = { case Greet(msg) => println(s"My parent[${self.path.parent}] greeted to me [${self.path}] $msg") } }
- Define a parent actor as follows, and create a child actor in its context:
class ParentActor extends Actor { def receive = { case CreateChild => val child = context.actorOf(Props[ChildActor], "child") child ! Greet("Hello Child") } }
- Create an application object as shown next:
object ParentChild extends App { val actorSystem = ActorSystem("Supervision") val parent = actorSystem.actorOf(Props[ParentActor], "parent") parent ! CreateChild } Run the preceding application, and you will get the following output: My parent[akka://Supervision/user/parent] greeted to me [akka://Supervision/user/parent/child] Hello Child
How it works…
In this recipe, we created a child actor, which receives a message, Greet, from the parent actor. We see the parent actor create a child actor using context.actorOf. This method creates a child actor under the parent actor. We can see the path of the actor in the output clearly.
Overriding life cycle hooks of an actor
Since we are talking about supervision and monitoring of actors, you should understand the life cycle hooks of an actor. In this recipe, you will learn how to override the life cycle hooks of an actor when it starts, stops, prestarts, and postrestarts.
How to do it…
- Create a file called ActorLifeCycle.scala in package com.packt.chapter2.
- Add the following imports to the top of the file:
import akka.actor import akka.actor.SupervisorStrategy import akka.util.Timeout. import scala.concurrent.Await import scala.concurrent.duration import akka.pattern.ask
- Create the following messages to be sent to the actors:
case object Error case class StopActor(actorRef: ActorRef)
- Create an actor as follows, and override the life cycle methods:
class LifeCycleActor extends Actor { var sum = 1 override def preRestart(reason: Throwable, message: Option[Any]): Unit = { println(s"sum in preRestart is $sum") } override def preStart(): Unit = println(s"sum in preStart is $sum") def receive = { case Error => throw new ArithmeticException() case _ => println("default msg") } override def postStop(): Unit = { println(s"sum in postStop is ${sum * 3}") } override def postRestart(reason: Throwable): Unit = { sum = sum * 2 println(s"sum in postRestart is $sum") } }
- Create a supervisor actor as follows:
class Supervisor extends Actor { override val supervisorStrategy = OneForOneStrategy(maxNrOfRetries = 10, withinTimeRange = 1 minute) { case _: ArithmeticException => Restart case t => super.supervisorStrategy.decider.applyOrElse(t, (_: Any) => Escalate) } def receive = { case (props: Props, name: String) => sender ! context.actorOf(props, name) case StopActor(actorRef) => context.stop(actorRef) } }
- Create a test application as shown next, and run the application.
object ActorLifeCycle extends App { implicit val timeout = Timeout(2 seconds) val actorSystem = ActorSystem("Supervision") val supervisor = actorSystem.actorOf(Props[Supervisor], "supervisor") val childFuture = supervisor ? (Props(new LifeCycleActor), "LifeCycleActor") val child = Await.result(childFuture.mapTo[ActorRef], 2 seconds) child ! Error Thread.sleep(1000) supervisor ! StopActor(child) }
- Create another test application as follows, and run it.
object ActorLifeCycle extends App { implicit val timeout = Timeout(2 seconds) val actorSystem = ActorSystem("Supervision") val supervisor = actorSystem.actorOf(Props[Supervisor], "supervisor") val childFuture = supervisor ? (Props(new LifeCycleActor), "LifeCycleActor") val child = Await.result(childFuture.mapTo[ActorRef], 2 seconds) child ! Error Thread.sleep(1000) supervisor ! StopActor(child) } On running the preceding test application, you will get the following output: sum in preStart is 1 sum in preRestart is 1 sum in postRestart is 2 [ERROR] [07/01/2016 00:49:57.568] [Supervision-akka.actor.default-dispatcher-5] [akka://Supervision/user/supervisor/LifeCycleActor] null java.lang.ArithmeticException at com.packt.chapter2.LifeCycleActor$ $anonfun$receive$2.applyOrElse(ActorLifeCycle.scala:51) sum in postStop is 6
How it works…
In this preceding recipe, we create an actor, which maintains sum as a state, and we modify its life cycle hooks. We create this actor under the parent supervisor, which handles the ArthimaticException in the child actor. Let’s see what happens in life cycle hooks.
When an actor starts, it calls the preStart method, so we see the following output:
"sum in preStart is 1".
When an actor throws an exception, it sends a message to the supervisor, and the supervisor handles the failure by restarting that actor. It clears out the accumulated state of the actor, creates a fresh new actor means, and then restores the last value assigned to the state of old actor to the preRestart value.
After that postRestart method is called, and whenever the actor stops, the supervisor calls the postStop.
Sending messages to actors and collecting responses
In this recipe, you will learn how a parent sends messages to its child, and collects responses from them.
To step through this recipe, we need to import the Hello-Akka project in the IDE.
How to do it…
- Create a file, SendMesagesToChilds.scala, in package com.packt.chapter2.
- Add the following imports to the top of the file:
import akka.actor.{ Props, ActorSystem, Actor, ActorRef }
- Create messages to be sent to the actors as follows:
case class DoubleValue(x: Int) case object CreateChild case object Send case class Response(x: Int)
- Define a child actor. It doubles the value sent to it.
class DoubleActor extends Actor { def receive = { case DoubleValue(number) => println(s"${self.path.name} Got the number $number") sender ! Response(number * 2) } }
- Define a parent actor. It creates child actors in its context, sends messages to them, and collects responses from them.
class ParentActor extends Actor { val random = new scala.util.Random var childs = scala.collection.mutable.ListBuffer[ActorRef]() def receive = { case CreateChild => childs ++= List(context.actorOf(Props[DoubleActor])) case Send => println(s"Sending messages to child") childs.zipWithIndex map { case (child, value) => child ! DoubleValue(random.nextInt(10)) } case Response(x) => println(s"Parent: Response from child $ {sender.path.name} is $x") } }
- Create a test application as follows, and run it:
object SendMessagesToChild extends App { val actorSystem = ActorSystem("Hello-Akka") val parent = actorSystem.actorOf(Props[ParentActor], "parent") parent ! CreateChild parent ! CreateChild parent ! CreateChild parent ! Send } On running the preceding test application, you will get the following output: $b Got the number 6 $a Got the number 5 $c Got the number 8 Parent: Response from child $a is 10 Parent: Response from child $b is 12 Parent: Response from child $c is 16
How it works…
In this last recipe, we create a child actor called DoubleActor, which doubles the value it gets. We also create a parent actor, which creates a child actor when it receives a CreateChild message, and maintains it in the list.
When the parent actor receives the message Send, it sends a random number to the child, and the child, in turn, sends a response to the parent.
Summary
In this article, you learned how to supervise and monitor Akka actors as well as create child actors of an actor. We also discussed how to override the life cycle hooks of an actor. Lastly, you learned how a parent sends messages to its child and collects responses from them.
Resources for Article:
Further resources on this subject:
- Introduction to Akka [article]
- Creating First Akka Application [article]
- Making History with Event Sourcing [article]