Meetu Maltiar's Blog

Meetu's thoughts on technology and software development

Archive for the ‘Akka’ Category

Bojug Meetup: Introducing Akka



I presented this session, Introducing Akka, at the Bangalore Java Users Group. There was only one presentation today. The best part was that it was a highly interactive session. We enjoyed it thoroughly. It lasted for more than two hours!!

This talk is inspired by Jonas Bonér's and Viktor Klang's awesome presentations!!

My talk covered the basics of Scala, Actors and Supervision. The code examples are on my GitHub. Here are the slides.

Written by Meetu Maltiar

June 28, 2014 at 20:15

Posted in Akka


Akka Futures In Scala With A Simple Example



We use Akka Futures extensively in a product built to handle a huge load of streaming data. We were early adopters of Akka and have been using it since its 1.1.x days. We found Futures to be a simple yet powerful tool for adding concurrent behavior to our application.

This post shows the importance of Akka Futures with a short, easy-to-understand example application. Please read the excellent documentation on Akka Futures for more details.

Ok, now let's look at a simple example. Let's say we have an identity function. We sleep in it for a while to simulate a long-running operation. Here is the Scala code for the method.

def timeTakingIdentityFunction(number: Int) = {
    // we sleep for 3 seconds and return number
    Thread.sleep(3000)
    number
}

Suppose we have an application that calls the timeTakingIdentityFunction method three times, stores the return values in three variables and sums them. Here is the code listing.

object SumApplication extends App {
  val startTime = System.currentTimeMillis
  val number1 = timeTakingIdentityFunction(1)
  val number2 = timeTakingIdentityFunction(2)
  val number3 = timeTakingIdentityFunction(3)
  val sum = number1 + number2 + number3
  val elapsedTime = ((System.currentTimeMillis - startTime) / 1000.0)
  println("Sum of 1, 2 and 3 is " + sum + " calculated in " + elapsedTime + " seconds")
 
  def timeTakingIdentityFunction(number: Int) = {
    // we sleep for 3 seconds and return number
    Thread.sleep(3000)
    number
  }
}

When we run this, it takes about nine seconds to execute, because the three calls run one after another. But what happens when we do not block on timeTakingIdentityFunction?

Using Akka Futures we can do exactly that. When we do not block on timeTakingIdentityFunction and continue processing, we get a performance boost. Now let's look at the Akka Futures version.

We can use Future directly by wrapping the method call in a Future. In our example, it looks like this.

val timeTakingIdentityFunctionFuture = Future(timeTakingIdentityFunction(1))

Since we make three calls to timeTakingIdentityFunction, we can collect the resulting Future[Int] values in three variables: future1, future2 and future3.

val future1 = Future(timeTakingIdentityFunction(1))
val future2 = Future(timeTakingIdentityFunction(2))
val future3 = Future(timeTakingIdentityFunction(3))

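Futures are monadic, so we can compose them using for expressions. Because the three futures above are created before the for expression, they are already running concurrently; the for expression only combines their results. The composed future can then be used to calculate the sum.

We will use the onSuccess callback for this. Here is the code snippet.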

val future = for {
  x <- future1
  y <- future2
  z <- future3
} yield (x + y + z)

future onSuccess {
  case sum =>
    val elapsedTime = ((System.currentTimeMillis - startTime) / 1000.0)
    println("Sum of 1, 2 and 3 is " + sum + " calculated in " + elapsedTime + " seconds")
}

We composed a new Future out of the three. The composed Future is of type Future[Int]. We registered a callback to receive the sum of the three numbers. Here is the complete example.

import akka.dispatch.Future
import akka.actor.ActorSystem
 
object SumApplicationWithFutures extends App {
  implicit val system = ActorSystem("future")
  val startTime = System.currentTimeMillis
  val future1 = Future(timeTakingIdentityFunction(1))
  val future2 = Future(timeTakingIdentityFunction(2))
  val future3 = Future(timeTakingIdentityFunction(3))
 
  val future = for {
    x <- future1
    y <- future2
    z <- future3
  } yield (x + y + z)
 
  future onSuccess {
    case sum =>
      val elapsedTime = ((System.currentTimeMillis - startTime) / 1000.0)
      println("Sum of 1, 2 and 3 is " + sum + " calculated in " + elapsedTime + " seconds")
  }
 
  def timeTakingIdentityFunction(number: Int) = {
    // we sleep for 3 seconds and return number
    Thread.sleep(3000)
    number
  }
}

The only thing we have added here is an implicit ActorSystem. Future requires an execution context; if an ActorSystem is in implicit scope, we are fine.

When we run the Future version of the application, it runs in about three seconds!!

Akka Futures can benefit our applications by adding concurrency. For example, we might need to make three blocking REST API calls to implement a feature. Depending on our use case, we can use Futures instead of issuing the blocking calls one after another.

We used Futures directly in our example application. Since Futures are monadic, we composed them using for expressions. We also used a callback to collect the result. With these changes the code ran about three times faster. Most importantly, the code is simple, concise and expressive.

There is a more concise way to do this. We can construct a List of Future[Int] and call Future.sequence to compose a single Future of List[Int]. Code is left as an exercise 🙂

Akka 2.1.0 for Scala 2.10 uses the unified Future from the Scala standard library. This post pertains to Akka 2.0.5, but the concepts are still the same.
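
One possible take on the Future.sequence exercise is sketched below. It is written against scala.concurrent.Future from the Scala standard library rather than akka.dispatch.Future from Akka 2.0.5, so it runs without Akka on the classpath. The object name SumWithSequence, the helper sumConcurrently and the delayMillis parameter are assumptions added for illustration; they are not part of the original example.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object SumWithSequence {
  // Same slow identity function as in the post, with the delay as a parameter (assumption)
  def timeTakingIdentityFunction(number: Int, delayMillis: Long = 3000): Int = {
    Thread.sleep(delayMillis)
    number
  }

  // Start every call first, then turn List[Future[Int]] into Future[List[Int]] and sum it
  def sumConcurrently(numbers: List[Int], delayMillis: Long = 3000): Future[Int] = {
    val futures = numbers.map(n => Future(timeTakingIdentityFunction(n, delayMillis)))
    Future.sequence(futures).map(_.sum)
  }

  def main(args: Array[String]): Unit = {
    val startTime = System.currentTimeMillis
    // Await is used only so this small program prints before the JVM exits
    val sum = Await.result(sumConcurrently(List(1, 2, 3)), 30.seconds)
    val elapsedTime = (System.currentTimeMillis - startTime) / 1000.0
    println("Sum of 1, 2 and 3 is " + sum + " calculated in " + elapsedTime + " seconds")
  }
}
```

How close the elapsed time gets to three seconds depends on how many threads the global execution context has available, since each call blocks a thread in Thread.sleep.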

Written by Meetu Maltiar

August 2, 2013 at 08:09

Posted in Akka, Scala


Working With Akka FSM



We can think of an Akka FSM as a set of relations of the following form. This is from the Akka FSM documentation.
State(S) x Event(E) -> Actions(A), State(S’)
We can read it as: when an Actor is in state S and event E occurs, the Actor performs actions A and transitions to state S’.

Let’s have a look at a simple example to understand how it works.

An FSM actor is a combination of its state and its data. The state is the one of the possible states that the actor is in at a given point in time. The data, on the other hand, represents the state machine’s internal state. These two end up being the type parameters we use when we code the FSM actor.

We begin with an example of State which can either be RED or GREEN. Below is the code defining them.

sealed trait State
case object RED extends State
case object GREEN extends State

For this simple example we do not really need any Data. We could have used Int, but we can use our own definition as well. Let’s use the Data class defined below.

case class Data()

For an FSM actor that alternates between RED and GREEN we need to send events to it. For our scenario one event is enough to trigger the transition. Below is its message definition.

case object AlternateColor

So, we have built the definitions for our Akka FSM actor: the states the actor can be in, the event it needs to handle, and the data. Now we can turn our attention to the FSM actor itself.

To create an FSM actor we mix the FSM trait into an actor. It takes type parameters for State and Data. Below is pseudocode showing how we can use it.

class ColorFSMActor extends Actor with FSM[State, Data] {
  startWith(<"one of possible states">, <"some initial data like empty List">)

  when(<"one of possible states">) {
    // Some Code
  }

  // Transition elided
  when(<"one of possible states">) {
    // Some Code
  }

  ....

  onTransition {
    case <"one of possible states"> -> <"one of other possible states"> =>
      // Some Code
    .....
  }
  
  initialize
}

In our case we start in the RED state. We have two states, so we have two when blocks, one for RED and one for GREEN.

Whenever the FSM actor receives an AlternateColor event it changes its state to the other one. There is also an onTransition block that we can use, in which an arrow operator is placed between a pair of states. In our case we print a message whenever the state transitions from one to the other.

Below is the complete implementation of the desired behavior.

class ColorFSMActor extends Actor with FSM[State, Data] {
  startWith(RED, Data())

  when(RED) {
    case Event(AlternateColor, _) => goto(GREEN)
  }

  when(GREEN) {
    case Event(AlternateColor, _) => goto(RED)
  }

  onTransition {
    case RED -> GREEN => println("I am in RED and going to GREEN")
    case GREEN -> RED => println("I am in GREEN and going to RED")
  }

  initialize
}

To see the FSM actor in action we can create the actor and send messages to it. We will see the printed output of each transition from RED to GREEN and from GREEN to RED. Below is the code snippet.

  val system = ActorSystem("fsm")
  val colorFSMActor = system.actorOf(Props[ColorFSMActor])
  (1 to 10) foreach { i => colorFSMActor ! AlternateColor }

You should see the following output:

I am in RED and going to GREEN
I am in GREEN and going to RED
I am in RED and going to GREEN
I am in GREEN and going to RED
I am in RED and going to GREEN
I am in GREEN and going to RED
I am in RED and going to GREEN
I am in GREEN and going to RED
I am in RED and going to GREEN
I am in GREEN and going to RED
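
The relation State(S) x Event(E) -> Actions(A), State(S’) can also be written as a plain function, which makes the alternation easy to check without an actor system. This is only a sketch of the idea, not how Akka FSM is implemented; ColorTransitions, transition and run are hypothetical names, and the "action" is reduced to the log line.

```scala
object ColorTransitions {
  sealed trait State
  case object RED extends State
  case object GREEN extends State

  // The single event of the example, modelled as a case object
  case object AlternateColor

  // State x Event -> (Action, next State); the action here is just the message to print
  def transition(state: State, event: AlternateColor.type): (String, State) = state match {
    case RED   => ("I am in RED and going to GREEN", GREEN)
    case GREEN => ("I am in GREEN and going to RED", RED)
  }

  // Feed n AlternateColor events, starting from RED, and collect the messages in order
  def run(n: Int): (State, List[String]) =
    (1 to n).foldLeft((RED: State, List.empty[String])) {
      case ((state, log), _) =>
        val (line, next) = transition(state, AlternateColor)
        (next, log :+ line)
    }

  def main(args: Array[String]): Unit = run(10)._2.foreach(println)
}
```

After an even number of events the machine is back in RED, which matches the ten lines of output above.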

This is a simple example, but it is useful for a general understanding of Akka FSM actors. There are excellent use cases for FSMs, for example throttling messages. For a more detailed introduction, please also visit the awesome Akka FSM documentation.

Written by Meetu Maltiar

August 1, 2013 at 08:50

Posted in Akka


Knolx Session: Akka 2.0 Reloaded



Akka allows us to write concurrent, fault-tolerant and scalable applications. We recently migrated our product from Akka 1.3.x to Akka 2.x. The new version is quite different from the 1.3.x versions. It is not merely an API change but an overall change, and we also have to think differently to develop applications. This session was presented at a Knolx Session at Knoldus. The talk gently introduces Akka 2.0 with simple and easy-to-run examples.

The presentation is inspired by Viktor Klang’s talk at the NE Scala symposium.

Written by Meetu Maltiar

November 1, 2012 at 10:48

Posted in Akka, Scala


Working With ZeroMQ Module In AKKA 2.0



The Akka framework supports ZeroMQ through its ZeroMQ extension module. Akka is a great framework, and our product is built on version 1.3.x. While evaluating ZeroMQ support for our product we stumbled on an issue: ZeroMQ applications built on the Akka extension run slower than an equivalent application built in plain Scala.

Please read our discussion thread on the Akka user list. The issue has been accepted as issue #1963; you can view it here in Assembla. The bug is now resolved, great work by the Typesafe guys, especially Roland Kuhn. This blog deals with a workaround until the fix is pushed in a new release. The workaround has a caveat: the Subscriber hangs on to the thread that created it. It is like a pinned dispatcher with a thread allocated to the subscriber. Please look at the comment below.

I will create two simple PUB/SUB applications, one using the Akka-based extension and one without it, and then measure their throughput to compare their performance.

Let’s start by creating an sbt-based project. Here is my build.sbt. It contains the Akka 2.0 dependencies and the ZeroMQ Scala binding.

name := "Akka2Bench"

version := "1.0"

scalaVersion := "2.9.1"

resolvers += "Typesafe Repository" at "http://repo.typesafe.com/typesafe/releases/"

libraryDependencies ++= Seq(
	"com.typesafe.akka" % "akka-actor" % "2.0",
	"com.typesafe.akka" % "akka-remote" % "2.0",
	"com.typesafe.akka" % "akka-zeromq" % "2.0",
	"com.typesafe.akka" % "akka-testkit" % "2.0",
	"com.typesafe.akka" % "akka-kernel" % "2.0"
)

libraryDependencies += "org.zeromq" %% "zeromq-scala-binding" % "0.0.5"

I use the Eclipse IDE, so here is the entry for the sbteclipse plugin in my project/plugin.sbt

addSbtPlugin("com.typesafe.sbteclipse" % "sbteclipse-plugin" % "2.0.0")

Let’s start with the Publisher. It will create a publisher socket and then bind to it. But we want it to be wrapped in an Akka Actor. I will use the lifecycle method preStart to create the publisher socket and bind to it. Here is the code.

Read the rest of this entry »

Written by Meetu Maltiar

April 4, 2012 at 10:20

Posted in Akka


Using Akka Dispatchers In Scala



Dispatchers are extremely important in the Akka framework. They are directly responsible for optimal performance, throughput and scalability.

Akka supports dispatchers for both event-driven lightweight threads and thread-based Actors. For thread-based Actors each dispatcher is bound to a dedicated OS thread.

By default, a single event-based dispatcher is used for all Actors created. The dispatcher used is this one:

Dispatchers.globalExecutorBasedEventDrivenDispatcher

In many cases we need to group Actors together on a dedicated dispatcher. We can then override the default and define our own dispatcher.

Setting the Dispatcher
Normally we set the dispatcher in the Actor itself:

class EchoActor extends Actor {
     self.dispatcher = ..... // set the dispatcher
}

Or we can set it in the ActorRef

actorRef.dispatcher = dispatcher

There are different kinds of dispatchers:

  • Thread-based
  • Event-based
  • Priority event-based
  • Work-stealing

Thread-based
It binds a dedicated OS thread to each Actor. Messages are posted to a LinkedBlockingQueue, which feeds them to the dispatcher one by one. It has the worst performance and scalability, and it cannot be shared among Actors. On the other hand, Actors do not have to wait for a thread in this case.
Code example

class EchoActor extends Actor {
     self.dispatcher = Dispatchers.newThreadBasedDispatcher(self)
     ....
}

Event-based
The ExecutorBasedEventDrivenDispatcher binds a set of Actors to a thread pool backed by a BlockingQueue. The dispatcher must be shared among Actors. This dispatcher is highly configurable: we can specify things like the type of queue, the maximum number of items and the rejection policy.
Code example

class EchoActor extends Actor {
  self.dispatcher = Dispatchers.newExecutorBasedEventDrivenDispatcher(name)
    .withNewThreadPoolWithLinkedBlockingQueueWithCapacity(100)
    .setCorePoolSize(16)
    .setMaxPoolSize(128)
    .setKeepAliveTimeInMillis(60000)
    .setRejectionPolicy(new CallerRunsPolicy)
    .build
  ....
}
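
For readers more familiar with java.util.concurrent, the settings above read much like the arguments of a plain ThreadPoolExecutor. The sketch below builds such a pool directly. It assumes the builder methods correspond one-to-one to these constructor arguments, which is meant as an illustration rather than a description of Akka's internals; EventBasedPoolSketch and newPool are hypothetical names.

```scala
import java.util.concurrent.{LinkedBlockingQueue, ThreadPoolExecutor, TimeUnit}

object EventBasedPoolSketch {
  // Bounded queue of 100 tasks, 16 core threads, at most 128 threads,
  // idle threads kept for 60000 ms, and the caller runs the task when the pool is saturated
  def newPool(): ThreadPoolExecutor =
    new ThreadPoolExecutor(
      16,
      128,
      60000L,
      TimeUnit.MILLISECONDS,
      new LinkedBlockingQueue[Runnable](100),
      new ThreadPoolExecutor.CallerRunsPolicy)

  def main(args: Array[String]): Unit = {
    val pool = newPool()
    pool.execute(new Runnable {
      def run(): Unit = println("handled on " + Thread.currentThread.getName)
    })
    // Let queued work finish, then release the threads
    pool.shutdown()
  }
}
```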

Priority event-based
It handles messages according to the priorities assigned to them. It is implemented by PriorityExecutorBasedEventDrivenDispatcher, which takes a PriorityGenerator as a constructor argument.

Let’s look at an example where a PriorityExecutorBasedEventDrivenDispatcher is used for a group of messages sent to an actor.

package com.meetu.akka.dispatcher

import akka.actor.Actor.actorOf
import akka.actor.Actor
import akka.dispatch.PriorityExecutorBasedEventDrivenDispatcher
import akka.dispatch.PriorityGenerator

object PriorityDispatcherExample extends App {
  val actor = Actor.actorOf(
    new Actor {
      def receive = {
        case x => println(x)
      }
    })

  val priority = PriorityGenerator {
    case "high priority" => 0
    case "low priority" => 100
    case _ => 50
  }

  actor.dispatcher = new PriorityExecutorBasedEventDrivenDispatcher("foo", priority)

  actor.start
  actor.dispatcher.suspend(actor)

  actor ! "low priority"
  actor ! "others"
  actor ! "low priority"
  actor ! "high priority"

  actor.dispatcher.resume(actor)
  Actor.registry.shutdownAll
}
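
The effect of the PriorityGenerator can be imitated with an ordinary priority queue. The sketch below is a toy model, not Akka's mailbox: it reuses the same priority mapping, assumes that a lower number is served first, enqueues all four messages before draining (which is what suspending the actor achieves in the example above), and prints them in the order a priority mailbox would be expected to deliver them. PriorityOrderSketch, priority and drain are hypothetical names.

```scala
import scala.collection.mutable

object PriorityOrderSketch {
  // Same mapping as the PriorityGenerator in the post; lower number is served first (assumption)
  def priority(message: Any): Int = message match {
    case "high priority" => 0
    case "low priority"  => 100
    case _               => 50
  }

  // Enqueue everything first, then drain in priority order
  def drain(messages: Seq[Any]): List[Any] = {
    // PriorityQueue dequeues the largest element, so order by the negated priority
    val ordering: Ordering[Any] = Ordering.by((m: Any) => -priority(m))
    val mailbox = new mutable.PriorityQueue[Any]()(ordering)
    messages.foreach(m => mailbox += m)
    val drained = mutable.ListBuffer.empty[Any]
    while (mailbox.nonEmpty) drained += mailbox.dequeue()
    drained.toList
  }

  def main(args: Array[String]): Unit =
    drain(Seq("low priority", "others", "low priority", "high priority")).foreach(println)
}
```

Run on the four messages from the example, the sketch prints "high priority", then "others", then "low priority" twice.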

Read the rest of this entry »

Written by Meetu Maltiar

September 24, 2011 at 19:59

Posted in Akka, Scala


Manage Akka Actors With Supervisors



We are building a concurrent and massively scalable system using the Akka framework. Akka uses the lightweight Actor paradigm for managing concurrency.

Most of the time a bunch of Akka actors is required for a common functionality, so we tend to manage them as a group.

This is where the Akka Supervisor comes into the picture. Akka Supervisors are responsible for starting, stopping and monitoring child Akka actors.

Let's now look at how to use a supervisor over Akka Actors. To keep things simple, we have an Actor which simply echoes the message it receives. Here is the code listing for EchoActor:

/**
 * An Actor that echoes everything you send to it
 */
class EchoActor extends Actor {
  def receive = {
    case msg => {
      println(msg)
    }
  }
}

We will build supervision over a bunch of EchoActors; let's say our supervisor is EchoActorSupervisor. But before we look at its code listing, let's understand its constituent elements.

The Supervisor first requires a SupervisorConfig. The SupervisorConfig in turn requires a fault handling strategy for the group of actors. The strategies are:

  • AllForOneStrategy
  • OneForOneStrategy

The AllForOneStrategy fault handler restarts all the components, while the OneForOneStrategy fault handler restarts just the component that died. A fault handler takes as parameters a List of Exceptions to handle, the maximum number of restart tries and the time window in milliseconds.
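
The difference between the two strategies can be pictured with a small toy model. It is not Akka code: OneForOne and AllForOne here are plain case objects standing in for the real strategies, the children are just names, and the retry count and time window are left out.

```scala
object RestartStrategySketch {
  sealed trait Strategy
  case object OneForOne extends Strategy
  case object AllForOne extends Strategy

  // Which supervised children get restarted when the child named `failed` dies
  def toRestart(strategy: Strategy, children: List[String], failed: String): List[String] =
    strategy match {
      case OneForOne => children.filter(_ == failed) // only the child that died
      case AllForOne => children                     // every supervised child
    }

  def main(args: Array[String]): Unit = {
    val children = List("echoActor1", "echoActor2")
    println(toRestart(OneForOne, children, "echoActor1"))
    println(toRestart(AllForOne, children, "echoActor1"))
  }
}
```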

Let's have a look at our SupervisorConfig:

...
val supervisorConfig = SupervisorConfig(OneForOneStrategy(List(classOf[Exception]), 3, 500), Nil)
...

The Supervisor can now be created by passing in the SupervisorConfig from the previous listing:

...
val supervisorConfig = SupervisorConfig(OneForOneStrategy(List(classOf[Exception]), 3, 500), Nil)
val supervisor = Supervisor(supervisorConfig)
...

Now we can link and unlink actors to the supervisor. In our case, since we are using EchoActor, we can link two actor references to the supervisor. In the code below we link and start two EchoActors.

...
val echoActor1 = actorOf[EchoActor]
val echoActor2 = actorOf[EchoActor]

val supervisorConfig = SupervisorConfig(OneForOneStrategy(List(classOf[Exception]), 3, 500), Nil)
val supervisor = Supervisor(supervisorConfig)

supervisor.link(echoActor1)
supervisor.link(echoActor2)

echoActor1.start
echoActor2.start
...

Read the rest of this entry »

Written by Meetu Maltiar

September 21, 2011 at 11:45

Posted in Akka, Scala


Starting Akka Project With SBT 0.10



I was starting an Akka project with SBT but found that the latest SBT is quite different from before.

I tried to create an Akka project with the latest SBT but got stuck. The old SBT used to offer to create a new project if it did not find one in the directory. With the new SBT this is not the case. If you want to know how to go about creating a new Akka project with SBT, read on.

After installing SBT, if we type sbt at the command prompt in an empty directory, this is what we are likely to see.

To create the project, execute the following commands in the sbt session.

> set name := "AkkaQuickStart"
> set version := "1.0"
> set scalaVersion := "2.9.0-1"
> session save
> exit

We should get the following output if we type the above mentioned commands in sbt session.

SBT creates files and directories when we execute the commands. It creates build.sbt, which contains the same values we typed in the sbt session. Other directories like target and project are of little consequence to us.
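
Assuming sbt writes the three settings just as they were typed, the saved build.sbt should look roughly like the sketch below. The blank lines between settings follow the layout sbt expected in .sbt files at the time; the actual file may differ slightly.

```scala
name := "AkkaQuickStart"

version := "1.0"

scalaVersion := "2.9.0-1"
```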

The project directory will become important later, when we add the sbteclipse plugin. My project directory contains the following subdirectories and files.

Read the rest of this entry »

Written by Meetu Maltiar

August 29, 2011 at 10:53

Posted in Akka, Scala
