Inspecting the maxNrOfRetries of the OneForOne supervision strategy in Akka
I’ve recently become very fascinated by the actor model, and since we’re using Scala more at work I’m learning to work with Akka. I was learning about supervision strategies and tried to figure out what would happen if a child actor had restarted too many times when the parent utilized a OneForOne supervision strategy. Let’s look at a simple program to demonstrate what happens.
The child actor
We’ll start by creating the child actor:
// Our actor will throw this exception to trigger the
// restart
case object RestartException extends Exception
class TestFailActor extends Actor {
// When the child actor receives a message, it will
// throw an exception
override def receive: Receive = {
case msg =>
println("I received a message. I don't like" +
"messages. I'm going to fail!")
throw RestartException
}
}
This actor will receive any message, and react to it by throwing the RestartException
. When an actor throws an exception, a Failure message is sent to its supervisor (i.e. its parent).
The parent actor
Let’s take a look at the parent actor:
class TestFailActorParent extends Actor {
// When the parent receives a message it will just
// print it
override def receive: Receive = {
case msg => println("I'm the parent! I like" +
"messages. Carry on.")
}
// Let's define our supervision strategy, using the
// OneForOneStrategy
override val supervisorStrategy = OneForOneStrategy(
// The max number of restarts a child actor can do.
// If this limit is _exceeded_, the child actor is
// stopped:
maxNrOfRetries = 1,
withinTimeRange = 1 second
) {
// If we receive the restart exception, we print a
// message and then restart the actor
case RestartException =>
println("Received the RestartException in the" +
"child")
Restart
// If we receive any other exception, we'll escalate
case _: Exception =>
println("Received unknown exception in the parent")
Escalate
}
// Create our child actor
val child = context.actorOf(
Props[TestFailActor],
"child"
)
// Send the first message to the child. This will cause
// the child to throw an exception. The parent actor
// will then receive a Failure message from the child,
// and tell the child to restart.
child ! "test1"
Thread.sleep(1000)
// At this point, the child actor has restarted once.
// Send another message to the child. It will again
// cause the child to throw an exception. The parent
// actor then sees that we have restarted the actor
// once already, so this time it will stop the child.
child ! "test2"
Thread.sleep(1000)
// Now the child actor has been stopped.
// Try sending another message to the child actor.
// When we do this, it should not be received by
// anyone.
child ! "test3"
Thread.sleep(500)
}
That was a few lines of code, so let’s break it down into smaller parts.
We start by saying that when the parent receives a message, it will just print a message:
override def receive: Receive = {
case msg => println("I'm the parent! I like" +
"messages. Carry on.")
}
We then define a supervision strategy for this actor. The supervision strategy decides what should happen if one of the child actors should send us a Failure
message, indicating that it has thrown an exception. Here, we say that when a child actor has failed with a RestartException
, the child should be restarted.
override val supervisorStrategy = OneForOneStrategy(
// The max number of restarts a child actor can do.
// If this limit is _exceeded_, the child actor is
// stopped:
maxNrOfRetries = 1,
withinTimeRange = 1 second
) {
// If we receive the restart exception, we print a
// message and then restart the actor
case RestartException =>
println("Received the RestartException in the" +
"child")
Restart
// If we receive any other exception, we'll escalate
case _: Exception =>
println("Received unknown exception in the parent")
Escalate
}
Note the value we gave the maxNrOfRetries
parameter of the OneForOneStrategy
. This will have the parent actor allow a maximum of 1 retry for each child actor. If a child actor then exceeds this number, instead of being restarted, it will be stopped.
We then create the child actor:
val child = context.actorOf(
Props[TestFailActor],
"child"
)
And proceed to send messages to the child actor:
// Send the first message to the child. This will cause
// the child to throw an exception. The parent actor
// will then receive a Failure message from the child,
// and tell the child to restart.
child ! "test1"
Thread.sleep(1000)
// At this point, the child actor has restarted once.
// Send another message to the child. It will again
// cause the child to throw an exception. The parent
// actor then sees that we have restarted the actor
// once already, so this time it will stop the child.
child ! "test2"
Thread.sleep(1000)
// Now the child actor has been stopped.
// Try sending another message to the child actor.
// When we do this, it should not be received by
// anyone.
child ! "test3"
Thread.sleep(500)
And that’s it for the child actor.
Our ActorSystem
Our ActorSystem
will be very simple:
// Create our actorsystem
val system = ActorSystem("test-retries")
// Create an actor of the parent. The parent will then
// spawn a child actor and start to send messages
val parent = system.actorOf(
Props[TestFailActorParent],
"test-fail-actor"
)
// Set a delay so that our program gets a chance to
// finish before we terminate
Thread.sleep(5000)
system.terminate()
Here we simply create an ActorSystem
, create a parent actor, and then make sure to give the parent actor time to finish before we terminate the ActorSystem
.
So what happens when executing this program?
I’m glad you asked. Here’s the output of the program:
1 [test-retries-akka.actor.default-dispatcher-3] INFO akka.event.slf4j.Slf4jLogger - Slf4jLogger started
I received a message. I don't like messages. I'm going to fail!
Received the RestartException in the child
I received a message. I don't like messages. I'm going to fail!
Received the RestartException in the child
2566 [test-retries-akka.actor.default-dispatcher-4] ERROR akka.actor.OneForOneStrategy -
Retries$RestartException$
at Retries$RestartException$.<clinit>(Retries.scala)
at Retries$TestFailActor$$anonfun$receive$1.applyOrElse(Retries.scala:16)
at akka.actor.Actor.aroundReceive(Actor.scala:517)
at akka.actor.Actor.aroundReceive$(Actor.scala:515)
at Retries$TestFailActor.aroundReceive(Retries.scala:11)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:588)
at akka.actor.ActorCell.invoke(ActorCell.scala:557)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
at akka.dispatch.Mailbox.run(Mailbox.scala:225)
at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
2569 [test-retries-akka.actor.default-dispatcher-4] ERROR akka.actor.OneForOneStrategy -
Retries$RestartException$
at Retries$RestartException$.<clinit>(Retries.scala)
at Retries$TestFailActor$$anonfun$receive$1.applyOrElse(Retries.scala:16)
at akka.actor.Actor.aroundReceive(Actor.scala:517)
at akka.actor.Actor.aroundReceive$(Actor.scala:515)
at Retries$TestFailActor.aroundReceive(Retries.scala:11)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:588)
at akka.actor.ActorCell.invoke(ActorCell.scala:557)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
at akka.dispatch.Mailbox.run(Mailbox.scala:225)
at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
2571 [test-retries-akka.actor.default-dispatcher-4] INFO akka.actor.LocalActorRef - Message [java language=".lang.String"][/java] from Actor[akka://test-retries/user/test-fail-actor#410410813] to Actor[akka://test-retries/user/test-fail-actor/child#169309014] was not delivered. [1] dead letters encountered. If this is not an expected behavior, then [Actor[akka://test-retries/user/test-fail-actor/child#169309014]] may have terminated unexpectedly, This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.
Process finished with exit code 0
That makes sense, doesn’t it? What happens is this:
- The child actor gets sent a message. It reacts by throwing an exception.
- The parent sees that a child has failed with an exception. It sees that it hasn’t restarted before, so it tells the child to restart.
- The child actor gets sent a message. It reacts by throwing an exception.
- The parent sees that this child has restarted once before, and so tells the child to stop.
- The child actor gets sent a message. Akka then tells us that the message never gets delivered. In fact, it ends up as a dead letter.
Hopefully that’s helpful. The full program can be seen below, along with the build.sbt
file used.
The full program
import akka.actor.SupervisorStrategy.{Escalate, Restart}
import akka.actor.{Actor, ActorSystem, OneForOneStrategy, Props}
import scala.concurrent.duration._ // To be able to say `1 second`
object Retries extends App {
// Our actor will throw this exception to trigger the
// restart
case object RestartException extends Exception
class TestFailActor extends Actor {
// When the child actor receives a message, it will
// throw an exception
override def receive: Receive = {
case msg =>
println("I received a message. I don't like" +
"messages. I'm going to fail!")
throw RestartException
}
}
class TestFailActorParent extends Actor {
// When the parent receives a message it will just
// print it
override def receive: Receive = {
case msg => println("I'm the parent! I like" +
"messages. Carry on.")
}
// Let's define our supervision strategy, using the
// OneForOneStrategy
override val supervisorStrategy = OneForOneStrategy(
// The max number of restarts a child actor can do.
// If this limit is _exceeded_, the child actor is
// stopped:
maxNrOfRetries = 1,
withinTimeRange = 1 second
) {
// If we receive the restart exception, we print a
// message and then restart the actor
case RestartException =>
println("Received the RestartException in the" +
"child")
Restart
// If we receive any other exception, we'll escalate
case _: Exception =>
println("Received unknown exception in the parent")
Escalate
}
// Create our child actor
val child = context.actorOf(
Props[TestFailActor],
"child"
)
// Send the first message to the child. This will cause
// the child to throw an exception. The parent actor
// will then receive a Failure message from the child,
// and tell the child to restart.
child ! "test1"
Thread.sleep(1000)
// At this point, the child actor has restarted once.
// Send another message to the child. It will again
// cause the child to throw an exception. The parent
// actor then sees that we have restarted the actor
// once already, so this time it will stop the child.
child ! "test2"
Thread.sleep(1000)
// Now the child actor has been stopped.
// Try sending another message to the child actor.
// When we do this, it should not be received by
// anyone.
child ! "test3"
Thread.sleep(500)
}
// Create our actorsystem
val system = ActorSystem("test-retries")
// Create an actor of the parent. The parent will then
// spawn a child actor and start to send messages
val parent = system.actorOf(
Props[TestFailActorParent],
"test-fail-actor"
)
// Set a delay so that our program gets a chance to
// finish before we terminate
Thread.sleep(5000)
system.terminate()
}
build.sbt
:
name := "test-retries"
version := "0.1"
scalaVersion := "2.12.6"
libraryDependencies ++= Seq(
"com.typesafe.akka" %% "akka-actor" % "2.5.16",
"com.typesafe.akka" %% "akka-slf4j" % "2.5.16",
"org.slf4j" % "slf4j-log4j12" % "1.7.25"
)