Inspecting the maxNrOfRetries of the OneForOne supervision strategy in Akka

I’ve recently become fascinated by the actor model, and since we’re using Scala more at work, I’m learning to work with Akka. While learning about supervision strategies, I tried to figure out what happens when a child actor is restarted too many times under a parent that uses a OneForOne supervision strategy. Let’s look at a simple program that demonstrates what happens.

The child actor

We’ll start by creating the child actor:

  // Our actor will throw this exception to trigger the
  // restart
  case object RestartException extends Exception

  class TestFailActor extends Actor {
    // When the child actor receives a message, it will
    // throw an exception
    override def receive: Receive = {
      case msg =>
        println("I received a message. I don't like " +
          "messages. I'm going to fail!")
        throw RestartException
    }
  }

This actor accepts any message and reacts to it by throwing the RestartException. When an actor throws an exception, it is suspended and its supervisor (i.e. its parent) is notified of the failure. The supervisor’s supervision strategy then decides what happens to the failed actor.

The parent actor

Let’s take a look at the parent actor:

  class TestFailActorParent extends Actor {

    // When the parent receives a message it will just
    // print it
    override def receive: Receive = {
      case msg => println("I'm the parent! I like " +
        "messages. Carry on.")
    }

    // Let's define our supervision strategy, using the
    // OneForOneStrategy
    override val supervisorStrategy = OneForOneStrategy(
      // The max number of restarts a child actor can do.
      // If this limit is _exceeded_, the child actor is
      // stopped:
      maxNrOfRetries = 1,
      withinTimeRange = 1 second
    ) {
      // If we receive the restart exception, we print a
      // message and then restart the actor
      case RestartException =>
        println("Received the RestartException in the " +
          "child")
        Restart
      // If we receive any other exception, we'll escalate
      case _: Exception =>
        println("Received unknown exception in the parent")
        Escalate
    }

    // Create our child actor
    val child = context.actorOf(
      Props[TestFailActor],
      "child"
    )

    // Send the first message to the child. This will cause
    // the child to throw an exception and the parent to be
    // notified. Note that we're still inside the parent's
    // constructor here, so the parent can't actually handle
    // the failure (and tell the child to restart) until the
    // constructor has finished.
    child ! "test1"
    Thread.sleep(1000)

    // Send another message to the child. It waits in the
    // suspended child's mailbox until the child has been
    // restarted, and then causes the child to throw again.
    // The parent sees that it has already restarted the
    // child once within the time window, so this time it
    // stops the child.
    child ! "test2"
    Thread.sleep(1000)

    // Try sending a third message to the child. The child
    // is stopped before it gets to this message, so it is
    // not received by anyone and ends up as a dead letter.
    child ! "test3"
    Thread.sleep(500)
  }

That was a few lines of code, so let’s break it down into smaller parts.

We start by saying that when the parent receives a message, it will just print a message:

    override def receive: Receive = {
      case msg => println("I'm the parent! I like " +
        "messages. Carry on.")
    }

We then define a supervision strategy for this actor. The supervision strategy decides what should happen when one of our child actors fails, i.e. throws an exception. Here, we say that a child that fails with a RestartException should be restarted, while any other exception is escalated to the parent’s own supervisor.

    override val supervisorStrategy = OneForOneStrategy(
      // The max number of restarts a child actor can do.
      // If this limit is _exceeded_, the child actor is
      // stopped:
      maxNrOfRetries = 1,
      withinTimeRange = 1 second
    ) {
      // If we receive the restart exception, we print a
      // message and then restart the actor
      case RestartException =>
        println("Received the RestartException in the " +
          "child")
        Restart
      // If we receive any other exception, we'll escalate
      case _: Exception =>
        println("Received unknown exception in the parent")
        Escalate
    }
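The block passed to OneForOneStrategy is an ordinary Scala PartialFunction from Throwable to a directive, so its matching behaviour can be tried out without Akka at all. The sketch below uses hypothetical stand-ins (DeciderSketch and its own Directive, Restart and Escalate) instead of Akka’s SupervisorStrategy types. It shows two things: the order of the cases matters, since RestartException is also an Exception and has to be matched first, and a Throwable that isn’t an Exception, such as an Error, isn’t covered by this decider.

```scala
object DeciderSketch {
  // Hypothetical stand-ins for Akka's SupervisorStrategy directives,
  // so this sketch compiles without an Akka dependency.
  sealed trait Directive
  case object Restart extends Directive
  case object Escalate extends Directive

  case object RestartException extends Exception

  // Same shape as the block passed to OneForOneStrategy. The more
  // specific case must come first, because RestartException would
  // also be matched by `case _: Exception`.
  val decider: PartialFunction[Throwable, Directive] = {
    case RestartException => Restart
    case _: Exception     => Escalate
  }
}

object DeciderSketchDemo extends App {
  import DeciderSketch._

  println(decider(RestartException))                     // Restart
  println(decider(new IllegalStateException("boom")))    // Escalate
  println(decider.isDefinedAt(new Error("not handled"))) // false
}
```

What Akka itself does with a Throwable that the decider doesn’t match is not modelled here; the sketch only covers the matching.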

Note the values we gave the maxNrOfRetries and withinTimeRange parameters of the OneForOneStrategy. Together they allow each child actor at most 1 restart within a one-second window. If a child fails again and would exceed this limit, it is stopped instead of restarted.
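To make the interaction between maxNrOfRetries and withinTimeRange more concrete, here is a simplified plain-Scala model of the restart budget. It is my own sketch of the behaviour, not Akka’s implementation: RestartBudget and requestRestart are hypothetical names, times are passed in as plain milliseconds so no clock is needed, and the special value -1 (no limit) is not modelled. The idea is that the first failure opens a time window, every further failure inside that window counts against maxNrOfRetries, and a failure after the window has expired starts a fresh window.

```scala
// Hypothetical, simplified model of a OneForOneStrategy's restart budget
// for a single child. Not Akka code.
final class RestartBudget(maxNrOfRetries: Int, withinTimeRangeMillis: Long) {
  require(maxNrOfRetries >= 0, "-1 (no limit) is not modelled in this sketch")

  private var windowStart: Option[Long] = None
  private var restartsInWindow = 0

  /** true = restart the child, false = stop it. */
  def requestRestart(nowMillis: Long): Boolean =
    if (maxNrOfRetries == 0) false
    else windowStart match {
      case Some(start) if nowMillis - start <= withinTimeRangeMillis =>
        // Still inside the current window: count this restart.
        restartsInWindow += 1
        restartsInWindow <= maxNrOfRetries
      case _ =>
        // First failure, or the previous window has expired: open a new one.
        windowStart = Some(nowMillis)
        restartsInWindow = 1
        true
    }
}

object RestartBudgetDemo extends App {
  val inWindow = new RestartBudget(maxNrOfRetries = 1, withinTimeRangeMillis = 1000)
  println(inWindow.requestRestart(0)) // true: first failure, restart
  println(inWindow.requestRestart(3)) // false: second failure in the same window, stop

  val spreadOut = new RestartBudget(maxNrOfRetries = 1, withinTimeRangeMillis = 1000)
  println(spreadOut.requestRestart(0))    // true
  println(spreadOut.requestRestart(1500)) // true: the window expired, a new one starts
}
```

In other words, with maxNrOfRetries = 1 and a one-second window, a second failure shortly after the first one leads to the child being stopped, while a failure that arrives after the window has passed is treated as a fresh start.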

We then create the child actor:

    val child = context.actorOf(
      Props[TestFailActor],
      "child"
    )

And proceed to send messages to the child actor:

    // Send the first message to the child. This will cause
    // the child to throw an exception and the parent to be
    // notified. Note that we're still inside the parent's
    // constructor here, so the parent can't actually handle
    // the failure (and tell the child to restart) until the
    // constructor has finished.
    child ! "test1"
    Thread.sleep(1000)

    // Send another message to the child. It waits in the
    // suspended child's mailbox until the child has been
    // restarted, and then causes the child to throw again.
    // The parent sees that it has already restarted the
    // child once within the time window, so this time it
    // stops the child.
    child ! "test2"
    Thread.sleep(1000)

    // Try sending a third message to the child. The child
    // is stopped before it gets to this message, so it is
    // not received by anyone and ends up as a dead letter.
    child ! "test3"
    Thread.sleep(500)

And that’s it for the parent actor.

Our ActorSystem

Our ActorSystem will be very simple:

  // Create our actorsystem
  val system = ActorSystem("test-retries")

  // Create the parent actor. The parent will then spawn
  // a child actor and start sending it messages
  val parent = system.actorOf(
    Props[TestFailActorParent],
    "test-fail-actor"
  )

  // Set a delay so that our program gets a chance to
  // finish before we terminate
  Thread.sleep(5000)

  system.terminate()

Here we simply create an ActorSystem, create a parent actor, and then make sure to give the parent actor time to finish before we terminate the ActorSystem.

So what happens when executing this program?

I’m glad you asked. Here’s the output of the program:

1    [test-retries-akka.actor.default-dispatcher-3] INFO  akka.event.slf4j.Slf4jLogger  - Slf4jLogger started
I received a message. I don't like messages. I'm going to fail!
Received the RestartException in the child
I received a message. I don't like messages. I'm going to fail!
Received the RestartException in the child
2566 [test-retries-akka.actor.default-dispatcher-4] ERROR akka.actor.OneForOneStrategy  - 
Retries$RestartException$
	at Retries$RestartException$.<clinit>(Retries.scala)
	at Retries$TestFailActor$$anonfun$receive$1.applyOrElse(Retries.scala:16)
	at akka.actor.Actor.aroundReceive(Actor.scala:517)
	at akka.actor.Actor.aroundReceive$(Actor.scala:515)
	at Retries$TestFailActor.aroundReceive(Retries.scala:11)
	at akka.actor.ActorCell.receiveMessage(ActorCell.scala:588)
	at akka.actor.ActorCell.invoke(ActorCell.scala:557)
	at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
	at akka.dispatch.Mailbox.run(Mailbox.scala:225)
	at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
	at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
	at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
	at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
	at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
2569 [test-retries-akka.actor.default-dispatcher-4] ERROR akka.actor.OneForOneStrategy  - 
Retries$RestartException$
	at Retries$RestartException$.<clinit>(Retries.scala)
	at Retries$TestFailActor$$anonfun$receive$1.applyOrElse(Retries.scala:16)
	at akka.actor.Actor.aroundReceive(Actor.scala:517)
	at akka.actor.Actor.aroundReceive$(Actor.scala:515)
	at Retries$TestFailActor.aroundReceive(Retries.scala:11)
	at akka.actor.ActorCell.receiveMessage(ActorCell.scala:588)
	at akka.actor.ActorCell.invoke(ActorCell.scala:557)
	at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
	at akka.dispatch.Mailbox.run(Mailbox.scala:225)
	at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
	at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
	at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
	at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
	at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
2571 [test-retries-akka.actor.default-dispatcher-4] INFO  akka.actor.LocalActorRef  - Message  from Actor[akka://test-retries/user/test-fail-actor#410410813] to Actor[akka://test-retries/user/test-fail-actor/child#169309014] was not delivered. [1] dead letters encountered. If this is not an expected behavior, then [Actor[akka://test-retries/user/test-fail-actor/child#169309014]] may have terminated unexpectedly, This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.

Process finished with exit code 0

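That makes sense, doesn’t it? What happens is this:

  1. The child actor gets sent a message. It reacts by throwing an exception.
  2. The parent sees that the child has failed. The child hasn’t been restarted before, so the parent tells it to restart.
  3. The child actor gets sent another message. It reacts by throwing an exception again.
  4. The parent sees that this child has already been restarted once within the one-second window, so it tells the child to stop.
  5. The child actor gets sent a third message. Since the child has been stopped, Akka tells us that the message was never delivered. Instead, it ends up as a dead letter.

Note that the timing is a little different from what the Thread.sleep calls might suggest. The messages are sent from the parent’s constructor, and an actor can’t process anything, including notifications about failing children, until its constructor has finished. The parent therefore only handles the first failure after about 2.5 seconds, which is why both ERROR lines are logged at roughly 2566 ms and 2569 ms. The restarted child then processes "test2", which was waiting in its mailbox, and fails again only a few milliseconds later, well within the one-second window.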
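The five steps can be replayed deterministically with a small plain-Scala simulation. This is a hypothetical model, not Akka code: SupervisionReplay and its event types are illustrative names, the child is assumed to fail on every message, every failure is assumed to land inside the same withinTimeRange window (as the two ERROR timestamps in the log suggest), the message that caused a failure is dropped, and once the child is stopped, anything still waiting in its mailbox becomes a dead letter.

```scala
object SupervisionReplay {
  sealed trait Event
  final case class Received(msg: String) extends Event
  final case class Restarted(afterMsg: String) extends Event
  final case class Stopped(afterMsg: String) extends Event
  final case class DeadLetter(msg: String) extends Event

  /** Replays the mailbox of a child that throws on every message. */
  def replay(mailbox: List[String], maxNrOfRetries: Int): List[Event] = {
    @annotation.tailrec
    def loop(queue: List[String], restarts: Int, events: Vector[Event]): Vector[Event] =
      queue match {
        case Nil => events
        case msg :: rest =>
          // The child receives the message and fails; the message is dropped.
          val afterFailure = events :+ Received(msg)
          if (restarts < maxNrOfRetries)
            loop(rest, restarts + 1, afterFailure :+ Restarted(msg))
          else
            // Budget exhausted: stop the child; leftover messages become dead letters.
            (afterFailure :+ Stopped(msg)) ++ rest.map(m => DeadLetter(m))
      }
    loop(mailbox, 0, Vector.empty).toList
  }
}

object SupervisionReplayDemo extends App {
  println(SupervisionReplay.replay(List("test1", "test2", "test3"), maxNrOfRetries = 1))
  // List(Received(test1), Restarted(test1), Received(test2), Stopped(test2), DeadLetter(test3))
}
```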

Hopefully that’s helpful. The full program can be seen below, along with the build.sbt file used.

The full program

import akka.actor.SupervisorStrategy.{Escalate, Restart}
import akka.actor.{Actor, ActorSystem, OneForOneStrategy, Props}
import scala.concurrent.duration._ // To be able to say `1 second`
import scala.language.postfixOps   // Allows `1 second` without a feature warning

object Retries extends App {

  // Our actor will throw this exception to trigger the
  // restart
  case object RestartException extends Exception

  class TestFailActor extends Actor {
    // When the child actor receives a message, it will
    // throw an exception
    override def receive: Receive = {
      case msg =>
        println("I received a message. I don't like " +
          "messages. I'm going to fail!")
        throw RestartException
    }
  }

  class TestFailActorParent extends Actor {

    // When the parent receives a message it will just
    // print it
    override def receive: Receive = {
      case msg => println("I'm the parent! I like " +
        "messages. Carry on.")
    }

    // Let's define our supervision strategy, using the
    // OneForOneStrategy
    override val supervisorStrategy = OneForOneStrategy(
      // The max number of restarts a child actor can do.
      // If this limit is _exceeded_, the child actor is
      // stopped:
      maxNrOfRetries = 1,
      withinTimeRange = 1 second
    ) {
      // If we receive the restart exception, we print a
      // message and then restart the actor
      case RestartException =>
        println("Received the RestartException in the " +
          "child")
        Restart
      // If we receive any other exception, we'll escalate
      case _: Exception =>
        println("Received unknown exception in the parent")
        Escalate
    }

    // Create our child actor
    val child = context.actorOf(
      Props[TestFailActor],
      "child"
    )

    // Send the first message to the child. This will cause
    // the child to throw an exception and the parent to be
    // notified. Note that we're still inside the parent's
    // constructor here, so the parent can't actually handle
    // the failure (and tell the child to restart) until the
    // constructor has finished.
    child ! "test1"
    Thread.sleep(1000)

    // Send another message to the child. It waits in the
    // suspended child's mailbox until the child has been
    // restarted, and then causes the child to throw again.
    // The parent sees that it has already restarted the
    // child once within the time window, so this time it
    // stops the child.
    child ! "test2"
    Thread.sleep(1000)

    // Try sending a third message to the child. The child
    // is stopped before it gets to this message, so it is
    // not received by anyone and ends up as a dead letter.
    child ! "test3"
    Thread.sleep(500)
  }

  // Create our actorsystem
  val system = ActorSystem("test-retries")

  // Create the parent actor. The parent will then spawn
  // a child actor and start sending it messages
  val parent = system.actorOf(
    Props[TestFailActorParent], 
    "test-fail-actor"
  )

  // Set a delay so that our program gets a chance to
  // finish before we terminate
  Thread.sleep(5000)

  system.terminate()
}

build.sbt:

name := "test-retries"

version := "0.1"

scalaVersion := "2.12.6"

libraryDependencies ++= Seq(
  "com.typesafe.akka" %% "akka-actor" % "2.5.16",
  "com.typesafe.akka" %% "akka-slf4j" % "2.5.16",
  "org.slf4j" % "slf4j-log4j12" % "1.7.25"
)

Developing a Bioconductor package with RStudio and Docker

If you’re going to develop a Bioconductor package, you’ll soon discover that your package has to work on both the development version and the release version of Bioconductor. The two versions often depend on different versions of R, which means you may need two different R installations on your own machine to develop your package. This can be done, but not nicely. So what can we do?

Docker solves this problem for us: instead of having to install R on your own system, you can use an existing Docker image with the appropriate R version already installed. Bioconductor makes our job even easier, as they have ready-made Docker images for both the development and the release version of Bioconductor. And to top that off, they have Docker images based on the RStudio Docker images, which gives us our development environment of choice right in our browser. Let’s take a look at how these images work.

  1. Run docker run -it -p 8787:8787 bioconductor/devel_core2. This will start a container from the Docker image bioconductor/devel_core2 and map port 8787 on your machine to port 8787 inside the Docker container.
  2. Open http://localhost:8787/ in your browser. This will open the web frontend for RStudio, which is running in the Docker container. You might have to log in: the default username/password should be rstudio/rstudio.

You should now see RStudio running the development version of R. A collection of Bioconductor packages has already been installed in this Docker image for our convenience. That was easy, wasn’t it? Let’s try the release version next:

  1. This time, run docker run -it -p 8787:8787 bioconductor/release_core2. This will start a container from the Docker image bioconductor/release_core2 and again map port 8787 on your machine to port 8787 inside the Docker container.
  2. Open http://localhost:8787/ in your browser.

Look at that! Only a few keystrokes and we have the release version of R with Bioconductor packages ready to go right there in our browser.

This is nice as it is, but what if you want to work on your existing scripts or existing package? Let me show you how I usually work with these Docker images when developing chimeraviz:

  1. (This first step is optional. Just use your own R code if you have something already.) Clone the chimeraviz repository to somewhere on your computer: git clone https://github.com/stianlagstad/chimeraviz
  2. Go to the folder where you have your R code. The location of chimeraviz on my system is /home/stian/dev/chimeraviz, so I’ll go there.
  3. Run docker run -it -p 8787:8787 -v /home/stian/dev/chimeraviz/:/chimeraviz bioconductor/release_core2. This will start a Docker container from the bioconductor/release_core2 image as before, but it will also mount the folder /home/stian/dev/chimeraviz on my system as the folder /chimeraviz inside the Docker container. The result? My code is available inside the container.
  4. Open http://localhost:8787 in your browser.
  5. Execute setwd("/chimeraviz") in RStudio.
  6. In the Files pane in the lower right of RStudio, click “More” and then “Go To Working Directory”.

With these steps done, you’re ready to start fixing bugs in chimeraviz (of which there are none, surely!). When you’re done, you can execute devtools::check() to make sure everything is still working nicely, and then submit a pull request. 🙂

This is how I’m currently working when making changes to chimeraviz. I make changes on the master branch, which holds the development version, inside the bioconductor/devel_core2 Docker image. For the current release version, I check out the RELEASE_3_6 branch and make changes inside the bioconductor/release_core2 Docker image. I don’t need R installed on my own system at all. If I’m lazy, I might even skip the devtools::check() step and let Travis check the build for me.

Hope this helps someone! Please do leave a comment if you have any suggestions that can improve the way I work with R.

Setting up a continuous integration pipeline for an R Bioconductor package with Travis

I’ve been thinking about setting up a continuous integration pipeline for the chimeraviz package for a while. Two things held me back:

  1. I thought it would be a hassle to make work.
  2. I was the only contributor, so I could just build it on my own machine.

Since I’ve recently received several pull requests to chimeraviz, the second point is no longer true, so I thought I’d finally challenge the first one.

What is Travis?

Travis CI is a continuous integration service used to build and test open source projects hosted on GitHub. Setting up R projects with Travis used to be a hassle, but now Travis has native support for the R language: https://docs.travis-ci.com/user/languages/r/. The best thing about it? It’s free when your repository is public. Since I have a GitHub repository for chimeraviz in addition to the Bioconductor git repository, Travis is a great fit.

Using Travis to build chimeraviz

To enable Travis builds for chimeraviz, I first had to sign in on Travis using my GitHub account. Once signed in, I simply had to flick a switch to enable Travis builds for my chimeraviz repository.

For Travis to know how to build chimeraviz, I had to create a .travis.yml configuration file. The getting started site for R had a few pointers, and after a few tries I got it working with this:

# R for travis: see documentation at https://docs.travis-ci.com/user/languages/r
language: r
sudo: false
cache: packages

r:
  - bioc-devel

warnings_are_errors: true

script:
  - Rscript -e "library(devtools); devtools::check()"

notifications:
  email:
    on_success: always
    on_failure: always

Note this part:

r:
  - bioc-devel

Here I specify which version of R should be used for my build. Luckily, Travis has R versions including Bioconductor for both its release (bioc-release) and development (bioc-devel) versions so I don’t have to do anything special to get Bioconductor packages installed.

With this in place, contributors submitting pull requests to chimeraviz will get feedback on whether their changes introduce any problems with the build.

The next step is to introduce the use of a code coverage tool. I’ll update this post with the additional information when I get that working as well.

UPDATE: Adding code coverage reports with codecov.io was very easy. I just had to add this to the .travis.yml file:

r_github_packages:
  - r-lib/covr

after_success:
  - Rscript -e 'covr::codecov()'

The complete .travis.yml then looks like this:

# R for travis: see documentation at https://docs.travis-ci.com/user/languages/r
language: r
sudo: false
cache: packages

r:
  - bioc-devel

warnings_are_errors: true

script:
  - Rscript -e "library(devtools); devtools::check()"

r_github_packages:
  - r-lib/covr

after_success:
  - Rscript -e 'covr::codecov()'

notifications:
  email:
    on_success: always
    on_failure: always

I’m now able to show the code coverage on the README of the package, and to have these checks run automagically whenever someone submits a pull request.