Meetu Maltiar's Weblogs

Meetu's thoughts on technology and software development

Archive for September 24th, 2011

Using Akka Dispatchers In Scala

leave a comment »

Akka dispatchers are extremely important in Akka framework. They are directly responsible for optimal performance, throughput and scalability.

Akka supports dispatchers for both event-driven lightweight threads and thread-based Actors. For thread-based Actors each dispatcher is bound to a dedicated OS thread.

Default dispatcher is a single event-based dispatcher for all Actors created. The dispatcher used is this one:

Dispatchers.globalExecutorBasedEventDrivenDispatcher

For many cases it becomes mandatory to group Actors together for a dedicated dispatcher, then we can override the defaults and define our own dispatcher.

Setting the Dispatcher
Normally we set the dispatcher in Actor itself

class EchoActor extends Actor {
     self.dispatcher = ..... // set the dispatcher
}

Or we can set it in the ActorRef
actorRef.dispatcher = dispatcher

There are different kind of dispatchers

  • Thread-based
  • Event-based
  • Priority event-based
  • Work-stealing

Thread-based
It binds dedicated OS thread to each Actor. The messages are posted to LinkedBlockingQueue which feeds messages to dispatcher one by one. It has worst performance and scalability. We also cannot share it among actors. Although Actors do not block for threads in this case.
Code example

class EchoActor extends Actor {
     self.dispatcher = Dispatchers.newThreadBasedDispatcher(self)
     ....
}

Event-based
The ExecutorBasedEventDrivenDispatcher binds a set of Actors to a thread pool backed up by a BlockingQueue. The dispatcher must be shared among Actors. This dispatcher is highly configurable and here we can specify things like ‘type of queue’, ‘max items’ , ‘rejection-policy’.
Code example

class EchoActor extends Actor {
self.dispatcher = Dispatchers.newExecutorBasedEventDrivenDispatcher(name)
 .withNewThreadPoolWithLinkedBlockingQueueWithCapacity(100)
 .setCorePoolSize(16)
 .setMaxPoolSize(128)
 .setKeepAliveTimeInMillis(60000)
 .setRejectionPolicy(new CallerRunsPolicy)
 .build
 ....
}

Priority event-based
It is meant for handling messages when priorities are assigned to messages. It is done by using PriorityExecutorBasedEventDrivenDispatcher. It requires a PriorityGenerator as an attribute in its constructor.

Let’s look at an example where we have a PriorityExecutorBasedEventDrivenDispatcher used for a group of messages fired on an actor.

package com.meetu.akka.dispatcher

import akka.actor.Actor.actorOf
import akka.actor.Actor
import akka.dispatch.PriorityExecutorBasedEventDrivenDispatcher
import akka.dispatch.PriorityGenerator

object PriorityDispatcherExample extends App {
  val actor = Actor.actorOf(
    new Actor {
      def receive = {
        case x => println(x)
      }
    })

  val priority = PriorityGenerator {
    case "high priority" => 0
    case "low priority" => 100
    case _ => 50
  }

  actor.dispatcher = new PriorityExecutorBasedEventDrivenDispatcher("foo", priority)

  actor.start
  actor.dispatcher.suspend(actor)

  actor ! "low priority"
  actor ! "others"
  actor ! "low priority"
  actor ! "high priority"

  actor.dispatcher.resume(actor)
  Actor.registry.shutdownAll
}

Read the rest of this entry »

Written by Meetu Maltiar

September 24, 2011 at 19:59

Posted in Akka, Scala

Tagged with ,

Follow

Get every new post delivered to your Inbox.

Join 42 other followers