Manage Akka Actors With Supervisors
We are building a concurrent and massively scalable system using the Akka framework. Akka uses the lightweight Actor paradigm to manage concurrency.
Most of the time, several Akka actors work together to provide a common piece of functionality, so we tend to manage them as a group.
This is where the Akka Supervisor comes into the picture. Akka Supervisors are responsible for starting, stopping and monitoring child Akka actors.
Let's now look at how to use a supervisor over Akka actors. To keep things simple, we have an Actor which simply echoes the message it receives. Here is the code listing for EchoActor:
/**
 * An Actor that echoes everything you send to it
 */
class EchoActor extends Actor {
  def receive = {
    case msg => {
      println(msg)
    }
  }
}
We will build supervision over a group of EchoActor instances; let's call our supervisor EchoActorSupervisor. Before we look at its code listing, let's understand its constituent elements.
The Supervisor first requires a SupervisorConfig. The SupervisorConfig in turn requires a fault handling strategy for the group of actors. The available strategies are:
- AllForOneStrategy
- OneForOneStrategy
AllForOneStrategy restarts all supervised actors when any one of them dies, whereas OneForOneStrategy restarts only the actor that died. Each fault handler takes three parameters: a List of the exception classes it handles, the maximum number of restart attempts, and the time window, in milliseconds, within which those attempts are counted.
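The difference between the two strategies can be sketched in plain Scala without any Akka dependency. The types and names below (Strategy, OneForOne, AllForOne, RestartModel) are illustrative assumptions only; they are not part of the Akka API and only model which children would be restarted.

```scala
// Illustrative model only: these types are NOT the Akka API.
sealed trait Strategy
case object OneForOne extends Strategy
case object AllForOne extends Strategy

object RestartModel {
  /** Returns the children a supervisor would restart when `failed` crashes. */
  def childrenToRestart(strategy: Strategy,
                        children: List[String],
                        failed: String): List[String] =
    strategy match {
      case OneForOne => children.filter(_ == failed) // only the crashed child
      case AllForOne => children                      // every supervised child
    }
}
```

For two children named "echo1" and "echo2" where "echo1" fails, the model selects only "echo1" under OneForOne and both children under AllForOne.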
Let's have a look at what our SupervisorConfig looks like:
...
val supervisorConfig = SupervisorConfig(OneForOneStrategy(List(classOf[Exception]), 3, 500), Nil)
...
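The arguments 3 and 500 above bound how often a failing actor may be restarted: at most 3 attempts within 500 milliseconds. As a rough illustration of such a restart budget, here is a minimal sketch in plain Scala. The class RestartBudget and its windowing rule (the window opens at the first failure and resets once it has elapsed) are assumptions made for this example; Akka's internal bookkeeping may differ.

```scala
// Hypothetical helper, not part of Akka: counts failures inside a time window.
final class RestartBudget(maxRetries: Int, withinMillis: Long) {
  private var windowStart: Long = 0L
  private var count: Int = 0

  /** Records a failure at `nowMillis`; returns true if a restart is still allowed. */
  def allowRestart(nowMillis: Long): Boolean = {
    // Open a new window on the first failure or once the old window has elapsed.
    if (count == 0 || nowMillis - windowStart > withinMillis) {
      windowStart = nowMillis
      count = 0
    }
    count += 1
    count <= maxRetries
  }
}
```

With new RestartBudget(3, 500), failures at 0, 100 and 200 ms are each allowed a restart, a fourth failure at 300 ms is refused, and a failure at 1000 ms is allowed again because the window has expired.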
The Supervisor can now be created by passing in the SupervisorConfig from the previous listing:
...
val supervisorConfig = SupervisorConfig(OneForOneStrategy(List(classOf[Exception]), 3, 500), Nil)
val supervisor = Supervisor(supervisorConfig)
...
Now we can link actors to the supervisor and unlink them from it. In our case we link two EchoActor references to the supervisor. The code below links and starts two EchoActor instances.
...
val echoActor1 = actorOf[EchoActor]
val echoActor2 = actorOf[EchoActor]
val supervisorConfig = SupervisorConfig(OneForOneStrategy(List(classOf[Exception]), 3, 500), Nil)
val supervisor = Supervisor(supervisorConfig)
supervisor.link(echoActor1)
supervisor.link(echoActor2)
echoActor1.start
echoActor2.start
...
If you prefer a shorthand that links and starts the actors in one go, declare them in the SupervisorConfig itself. Here is the code listing for that.
...
val echoActor1 = actorOf[EchoActor]
val echoActor2 = actorOf[EchoActor]
val supervisorConfig = SupervisorConfig(AllForOneStrategy(List(classOf[Exception]), 3, 1000),
  Supervise(echoActor1, Permanent) :: Supervise(echoActor2, Permanent) :: Nil)
val supervisor = Supervisor(supervisorConfig)
...
Since we have provided a fault tolerance strategy, we would also like to see it in action. For that, an EchoActor has to die by throwing an Exception.
Let's change EchoActor to throw an Exception when it receives an error-inducing message. To do this we introduce a simple message, ErrorInducingMessage, and change EchoActor to die when it gets this message. Here I also override the lifecycle method postRestart in EchoActor to show that the actor came back up after throwing the exception.
The code listing for the described changes:
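/**
 * An Actor that echoes everything you send to it
 */
class EchoActor extends Actor {
  def receive = {
    case ErrorInducingMessage => throw new Exception("Dying due to error inducing message")
    case msg => println(msg)
  }

  // Called on the new instance after the supervisor restarts the actor
  override def postRestart(err: Throwable) = {
    println("I am back again...")
  }
}

// A case object, since the message carries no data; a case class
// without a parameter list is rejected by newer Scala compilers
case object ErrorInducingMessage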
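Note that the order of the cases in receive matters: the catch-all case msg must come last, otherwise it would swallow the error-inducing message before the specific case is tried. The following Akka-free sketch mirrors the shape of receive with a plain PartialFunction. The names ReceiveOrder, Crash and handle are hypothetical stand-ins used only for this illustration.

```scala
object ReceiveOrder {
  // Hypothetical stand-in for ErrorInducingMessage
  case object Crash

  // Specific case first, catch-all last: same shape as EchoActor.receive,
  // but returning the echoed text instead of printing it.
  val handle: PartialFunction[Any, String] = {
    case Crash => throw new IllegalStateException("Dying due to error inducing message")
    case msg   => "echo: " + msg
  }
}
```

Calling ReceiveOrder.handle("hi") returns the echoed text, while ReceiveOrder.handle(ReceiveOrder.Crash) throws, which is the behaviour the supervisor reacts to in the real actor.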
To show the fault handling strategy in action, we send one message to each actor. Then we send an ErrorInducingMessage to one of the actors, and finally we send one message to each actor again. Let's combine all of this together.
package com.meetu.akka

import akka.actor.Actor.actorOf
import akka.actor.Actor
import akka.actor.Supervisor
import akka.config.Supervision.OneForOneStrategy
import akka.config.Supervision.SupervisorConfig

object Application extends App {
  val echoActor1 = actorOf[EchoActor]
  val echoActor2 = actorOf[EchoActor]
  val supervisorConfig = SupervisorConfig(OneForOneStrategy(List(classOf[Exception]), 3, 500), Nil)
  val supervisor = Supervisor(supervisorConfig)
  supervisor.link(echoActor1)
  supervisor.link(echoActor2)
  echoActor1.start
  echoActor2.start

  echoActor1 ! "Hello actor 1"
  echoActor2 ! "Hello actor 2"
  echoActor1 ! ErrorInducingMessage
  // Give the supervisor time to restart the failed actor
  Thread.sleep(1000)
  echoActor1 ! "Hello actor 1"
  echoActor2 ! "Hello actor 2"
}

/**
 * An Actor that echoes everything you send to it
 */
class EchoActor extends Actor {
  def receive = {
    case ErrorInducingMessage => throw new Exception("Dying due to error inducing message")
    case msg => println(msg)
  }

  override def postRestart(err: Throwable) = {
    println("I am back again...")
  }
}

// A case object, since the message carries no data
case object ErrorInducingMessage
For shorter code, in which we do not start the actors explicitly, here is the complete code listing.
package com.meetu.akka

import akka.actor.Actor.actorOf
import akka.actor.Actor
import akka.actor.Supervisor
import akka.config.Supervision.AllForOneStrategy
import akka.config.Supervision.Permanent
import akka.config.Supervision.Supervise
import akka.config.Supervision.SupervisorConfig

object Application extends App {
  val echoActor1 = actorOf[EchoActor]
  val echoActor2 = actorOf[EchoActor]
  val supervisorConfig = SupervisorConfig(AllForOneStrategy(List(classOf[Exception]), 3, 1000),
    Supervise(echoActor1, Permanent) :: Supervise(echoActor2, Permanent) :: Nil)
  val supervisor = Supervisor(supervisorConfig)

  echoActor1 ! "Hello actor 1"
  echoActor2 ! "Hello actor 2"
  echoActor1 ! ErrorInducingMessage
  // Give the supervisor time to restart the actors
  Thread.sleep(1000)
  echoActor1 ! "Hello actor 1"
  echoActor2 ! "Hello actor 2"
}

/**
 * An Actor that echoes everything you send to it
 */
class EchoActor extends Actor {
  def receive = {
    case ErrorInducingMessage => throw new Exception("Dying due to error inducing message")
    case msg => println(msg)
  }

  override def postRestart(err: Throwable) = {
    println("I am back again...")
  }
}

// A case object, since the message carries no data
case object ErrorInducingMessage
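When we run this, we first see two messages, one from each actor. An error trace follows, as expected. Then "I am back again..." shows that the failed actor has been restarted; with AllForOneStrategy, as in the last listing, both actors should print it, since all supervised actors are restarted. Finally, we again see one message printed by each actor.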
Fault tolerance matters because the supervisor leaves the group in a known state after a failure: depending on the strategy, either the failed actor alone or every actor it manages is restarted. Instead of trying to prevent things from going wrong, we think about how to recover from a crash. In this sense, supervision encourages non-defensive programming.
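The idea of recovering instead of defending can be illustrated without Akka. In the sketch below, the worker contains no error handling of its own; a small supervising loop replaces it with a fresh instance whenever it crashes. LetItCrash, Worker and supervise are hypothetical names chosen for this example, and the loop is only a loose analogy to what an Akka Supervisor does.

```scala
import scala.util.{Failure, Success, Try}

object LetItCrash {
  // Worker with no defensive checks: it simply fails on input it cannot handle.
  final class Worker {
    var handled: Int = 0 // state that is lost when the worker is replaced

    def process(msg: String): String = {
      if (msg == "boom") throw new RuntimeException("crashed on " + msg)
      handled += 1
      msg.toUpperCase
    }
  }

  /** Feeds messages to a worker and replaces it with a fresh instance after
    * each crash. Returns the successful outputs and the number of restarts. */
  def supervise(messages: List[String]): (List[String], Int) = {
    var worker = new Worker
    var restarts = 0
    val outputs = List.newBuilder[String]
    for (msg <- messages) {
      Try(worker.process(msg)) match {
        case Success(out) => outputs += out
        case Failure(_) =>
          worker = new Worker // "restart": discard the broken instance
          restarts += 1
      }
    }
    (outputs.result(), restarts)
  }
}
```

Calling LetItCrash.supervise(List("a", "boom", "b")) processes "a", restarts the worker once after "boom", and then processes "b" on the new instance.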
The Akka code for this can be downloaded from here. This is an sbt project, and the code can be run by executing the following at the command prompt:
sbt run

