Archive for September 2011
Using Akka Dispatchers In Scala
Akka dispatchers are extremely important in Akka framework. They are directly responsible for optimal performance, throughput and scalability.
Akka supports dispatchers for both event-driven lightweight threads and thread-based Actors. For thread-based Actors each dispatcher is bound to a dedicated OS thread.
Default dispatcher is a single event-based dispatcher for all Actors created. The dispatcher used is this one:
Dispatchers.globalExecutorBasedEventDrivenDispatcher
For many cases it becomes mandatory to group Actors together for a dedicated dispatcher, then we can override the defaults and define our own dispatcher.
Setting the Dispatcher
Normally we set the dispatcher in Actor itself
class EchoActor extends Actor {
self.dispatcher = ..... // set the dispatcher
}
Or we can set it in the ActorRef
actorRef.dispatcher = dispatcher
There are different kind of dispatchers
- Thread-based
- Event-based
- Priority event-based
- Work-stealing
Thread-based
It binds dedicated OS thread to each Actor. The messages are posted to LinkedBlockingQueue which feeds messages to dispatcher one by one. It has worst performance and scalability. We also cannot share it among actors. Although Actors do not block for threads in this case.
Code example
class EchoActor extends Actor {
self.dispatcher = Dispatchers.newThreadBasedDispatcher(self)
....
}
Event-based
The ExecutorBasedEventDrivenDispatcher binds a set of Actors to a thread pool backed up by a BlockingQueue. The dispatcher must be shared among Actors. This dispatcher is highly configurable and here we can specify things like ‘type of queue’, ‘max items’ , ‘rejection-policy’.
Code example
class EchoActor extends Actor {
self.dispatcher = Dispatchers.newExecutorBasedEventDrivenDispatcher(name)
.withNewThreadPoolWithLinkedBlockingQueueWithCapacity(100)
.setCorePoolSize(16)
.setMaxPoolSize(128)
.setKeepAliveTimeInMillis(60000)
.setRejectionPolicy(new CallerRunsPolicy)
.build
....
}
Priority event-based
It is meant for handling messages when priorities are assigned to messages. It is done by using PriorityExecutorBasedEventDrivenDispatcher. It requires a PriorityGenerator as an attribute in its constructor.
Let’s look at an example where we have a PriorityExecutorBasedEventDrivenDispatcher used for a group of messages fired on an actor.
package com.meetu.akka.dispatcher
import akka.actor.Actor.actorOf
import akka.actor.Actor
import akka.dispatch.PriorityExecutorBasedEventDrivenDispatcher
import akka.dispatch.PriorityGenerator
object PriorityDispatcherExample extends App {
val actor = Actor.actorOf(
new Actor {
def receive = {
case x => println(x)
}
})
val priority = PriorityGenerator {
case "high priority" => 0
case "low priority" => 100
case _ => 50
}
actor.dispatcher = new PriorityExecutorBasedEventDrivenDispatcher("foo", priority)
actor.start
actor.dispatcher.suspend(actor)
actor ! "low priority"
actor ! "others"
actor ! "low priority"
actor ! "high priority"
actor.dispatcher.resume(actor)
Actor.registry.shutdownAll
}
Manage Akka Actors With Supervisors
We are building a concurrent and massively scalable system using Akka framework. Akka uses light-weight Actor paradigm of managing concurrency.
Most of the times a bunch of Akka actors are required for a common functionality; therefore we do tend to manage them as a group.
This is where Akka Supervisor comes into the picture. Akka Supervisors are responsible for starting, stopping and monitoring child Akka actors.
Lets look now how to use supervisor over Akka Actors. Just to keep things simple we have an Actor which simply echoes the message it receives. Here is the code listing for EchoActor:
/**
* An Actor that echoes everything you send to it
*/
class EchoActor extends Actor {
def receive = {
case msg => {
println(msg)
}
}
}
We will build supervision on a bunch of EchoActor, lets say our supervisor is EchoActorSupervisor. But before we actually look at its code listing we understand its constituent elements first.
The Supervisor first require SupervisorConfig. This SupervisorConfig in turn requires fault handling strategy for the group of actors. They are:
- AllForOneStrategy
- OneForOneStrategy
The AllForOne fault handler will restart all the components and OneForOneStrategy fault handler will restart just the component which died. Fault Handler takes as parameter a List of Exceptions which will be handled, maximum number of restart tries and within-time in millis
Lets have a look how does our SupervisorConfig looks like:
... val supervisorConfig = SupervisorConfig(OneForOneStrategy(List(classOf[Exception]), 3, 500), Nil) ...
Supervisor can now be created by passing in SupervisorConfig created in previous listing:
... val supervisorConfig = SupervisorConfig(OneForOneStrategy(List(classOf[Exception]), 3, 500), Nil) val supervisor = Supervisor(supervisorConfig) ...
now we can link and unlink actors to the supervisor. In our case since we are using EchoActor we can link two Actor Reference to the supervisor. In the code below we are linking and starting two EchoActor.
... val echoActor1 = actorOf[EchoActor] val echoActor2 = actorOf[EchoActor] val supervisorConfig = SupervisorConfig(OneForOneStrategy(List(classOf[Exception]), 3, 500), Nil) val supervisor = Supervisor(supervisorConfig) supervisor.link(echoActor1) supervisor.link(echoActor2) echoActor1.start echoActor2.start ...
