Archive for September 24th, 2011
Using Akka Dispatchers In Scala
Dispatchers are a central part of the Akka framework. They largely determine an application's performance, throughput and scalability.
Akka supports dispatchers for both event-driven lightweight threads and thread-based Actors. For thread-based Actors, each dispatcher is bound to a dedicated OS thread.
By default, a single event-based dispatcher is shared by all Actors created. The dispatcher used is this one:
Dispatchers.globalExecutorBasedEventDrivenDispatcher
In many cases it becomes necessary to group Actors together on a dedicated dispatcher. We can then override the default and define our own dispatcher.
Setting the Dispatcher
Normally we set the dispatcher in the Actor itself:
class EchoActor extends Actor {
  self.dispatcher = ..... // set the dispatcher
}
Or we can set it on the ActorRef:
actorRef.dispatcher = dispatcher
There are different kinds of dispatchers:
- Thread-based
- Event-based
- Priority event-based
- Work-stealing
Thread-based
It binds a dedicated OS thread to each Actor. Messages are posted to a LinkedBlockingQueue, which feeds them to the dispatcher one by one. Of the dispatcher types it has the worst performance and scalability, and it cannot be shared among Actors. On the other hand, an Actor never has to wait for a thread in this case, because it always has its own.
Code example
class EchoActor extends Actor {
self.dispatcher = Dispatchers.newThreadBasedDispatcher(self)
....
}
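To make the thread-per-Actor model concrete, here is a minimal sketch in plain Scala on top of the JDK. It does not use Akka and is not how Akka implements the dispatcher; the names DedicatedThreadMailbox, ThreadBasedSketch and the stop sentinel are hypothetical and exist only for this illustration. It shows one dedicated thread draining its own LinkedBlockingQueue, one message at a time.

```scala
import java.util.concurrent.LinkedBlockingQueue
import scala.collection.mutable.ListBuffer

// Hypothetical stand-in for a thread-based Actor (not Akka code):
// one dedicated thread drains its own mailbox, one message at a time.
final class DedicatedThreadMailbox(handler: String => Unit) {
  private val mailbox = new LinkedBlockingQueue[String]()
  private val Stop = "__stop__" // sentinel message, an assumption of this sketch

  private val worker = new Thread(new Runnable {
    def run(): Unit = {
      var msg = mailbox.take() // blocks only this Actor's own thread
      while (msg != Stop) {
        handler(msg)
        msg = mailbox.take()
      }
    }
  })
  worker.start()

  // Post a message to the mailbox; the sender does not wait for processing.
  def !(msg: String): Unit = mailbox.put(msg)

  // Ask the worker to finish and wait until it has drained the mailbox.
  def stop(): Unit = { mailbox.put(Stop); worker.join() }
}

object ThreadBasedSketch {
  // Sends two messages and returns them in the order they were handled.
  def run(): List[String] = {
    val seen = ListBuffer.empty[String]
    val echo = new DedicatedThreadMailbox(m => seen.synchronized { seen += m; () })
    echo ! "one"
    echo ! "two"
    echo.stop()
    seen.toList
  }
}
```

Because every such object owns a whole thread, the cost grows linearly with the number of Actors, which is why this model scales poorly.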
Event-based
The ExecutorBasedEventDrivenDispatcher binds a set of Actors to a thread pool backed by a BlockingQueue. The dispatcher must be shared among Actors. It is highly configurable: we can specify things like the type of queue, the maximum number of items, and the rejection policy.
Code example
class EchoActor extends Actor {
  self.dispatcher = Dispatchers.newExecutorBasedEventDrivenDispatcher(name)
    .withNewThreadPoolWithLinkedBlockingQueueWithCapacity(100)
    .setCorePoolSize(16)
    .setMaxPoolSize(128)
    .setKeepAliveTimeInMillis(60000)
    .setRejectionPolicy(new CallerRunsPolicy)
    .build
  ....
}
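The builder settings above read much like the parameters of a java.util.concurrent.ThreadPoolExecutor. As a rough sketch, and assuming the builder maps onto such a pool (this is an assumption, not something taken from the Akka source), the same values would look like this in plain JDK terms. PoolSettingsSketch and newPool are hypothetical names.

```scala
import java.util.concurrent.{LinkedBlockingQueue, ThreadPoolExecutor, TimeUnit}
import java.util.concurrent.ThreadPoolExecutor.CallerRunsPolicy

// Hypothetical helper: the same numbers as the builder example,
// expressed directly as a JDK thread pool.
object PoolSettingsSketch {
  def newPool(): ThreadPoolExecutor =
    new ThreadPoolExecutor(
      16,                                     // core pool size
      128,                                    // maximum pool size
      60000L, TimeUnit.MILLISECONDS,          // keep-alive for idle threads above the core size
      new LinkedBlockingQueue[Runnable](100), // bounded work queue with capacity 100
      new CallerRunsPolicy                    // when saturated, run the task on the submitting thread
    )
}
```

With a bounded queue, the pool only grows beyond the core size once the queue is full, and CallerRunsPolicy applies back-pressure to the sender instead of dropping work.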
Priority event-based
This dispatcher handles messages according to priorities assigned to them. It is provided by PriorityExecutorBasedEventDrivenDispatcher, which requires a PriorityGenerator as a constructor argument.
Let’s look at an example where a PriorityExecutorBasedEventDrivenDispatcher is used for a group of messages sent to an actor.
package com.meetu.akka.dispatcher

import akka.actor.Actor.actorOf
import akka.actor.Actor
import akka.dispatch.PriorityExecutorBasedEventDrivenDispatcher
import akka.dispatch.PriorityGenerator

object PriorityDispatcherExample extends App {
  // An actor that simply prints every message it processes
  val actor = Actor.actorOf(
    new Actor {
      def receive = {
        case x => println(x)
      }
    })

  // A lower number means a higher priority
  val priority = PriorityGenerator {
    case "high priority" => 0
    case "low priority" => 100
    case _ => 50
  }

  actor.dispatcher = new PriorityExecutorBasedEventDrivenDispatcher("foo", priority)
  actor.start

  // Suspend the actor so that all messages are enqueued before any is processed
  actor.dispatcher.suspend(actor)
  actor ! "low priority"
  actor ! "others"
  actor ! "low priority"
  actor ! "high priority"
  // Resume the actor so that it starts processing its mailbox
  actor.dispatcher.resume(actor)

  Actor.registry.shutdownAll
}
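To see what order the four messages above should come out in, the priority mapping can be tried on its own with a JDK PriorityBlockingQueue. This is only a sketch of the ordering rule, not Akka's mailbox implementation; PriorityOrderSketch, priorityOf and drainInPriorityOrder are hypothetical names.

```scala
import java.util.Comparator
import java.util.concurrent.PriorityBlockingQueue

object PriorityOrderSketch {
  // Same mapping as the PriorityGenerator in the example: lower number is handled sooner.
  def priorityOf(msg: Any): Int = msg match {
    case "high priority" => 0
    case "low priority"  => 100
    case _               => 50
  }

  // Enqueue all messages first, then take them out in priority order.
  def drainInPriorityOrder(msgs: Seq[String]): List[String] = {
    val mailbox = new PriorityBlockingQueue[String](11, new Comparator[String] {
      def compare(a: String, b: String): Int =
        Integer.compare(priorityOf(a), priorityOf(b))
    })
    msgs.foreach(m => mailbox.put(m))
    // poll() returns null once the queue is empty
    Iterator.continually(mailbox.poll()).takeWhile(_ != null).toList
  }
}
```

Calling PriorityOrderSketch.drainInPriorityOrder(Seq("low priority", "others", "low priority", "high priority")) returns the messages as "high priority", "others", "low priority", "low priority". Enqueuing everything before draining plays the same role as the suspend and resume calls in the Akka example.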
