Meetu Maltiar's Blog

Meetu's thoughts on technology and software development

Working With ZeroMQ Module In AKKA 2.0


The Akka framework supports ZeroMQ through its ZeroMQ extension module. Akka is a great framework, and our product is built on version 1.3.x. While evaluating ZeroMQ support for our product, we stumbled on an issue: ZeroMQ applications built on the Akka extension ran slower than an equivalent application written in plain Scala.

Please read our discussion thread on the Akka user list. The problem has been accepted as issue #1963, which you can view on Assembla. The bug is now resolved, thanks to great work by the Typesafe team, especially Roland Kuhn. This post describes a workaround to use until the fix ships in a new release. The workaround has a caveat: the Subscriber holds on to the thread it runs on, much like an actor on a pinned dispatcher with a thread allocated to it. Please see the comment below.

I will create two simple PUB/SUB applications, one using the Akka extension and one without it, and then measure the throughput of each to compare their performance.

Let’s start by creating an sbt-based project. Here is my build.sbt. It contains the Akka 2.0 dependencies and the ZeroMQ Scala binding.

name := "Akka2Bench"

version := "1.0"

scalaVersion := "2.9.1"

resolvers += "Typesafe Repository" at "http://repo.typesafe.com/typesafe/releases/"

libraryDependencies ++= Seq(
	"com.typesafe.akka" % "akka-actor" % "2.0",
	"com.typesafe.akka" % "akka-remote" % "2.0",
	"com.typesafe.akka" % "akka-zeromq" % "2.0",
	"com.typesafe.akka" % "akka-testkit" % "2.0",
	"com.typesafe.akka" % "akka-kernel" % "2.0"
)

libraryDependencies += "org.zeromq" %% "zeromq-scala-binding" % "0.0.5"

I use the Eclipse IDE, so here is the entry for the sbteclipse plugin in my project/plugin.sbt.

addSbtPlugin("com.typesafe.sbteclipse" % "sbteclipse-plugin" % "2.0.0")

Let’s start with the publisher. It creates a publisher socket and binds it to an address. Since we want it wrapped in an Akka actor, I use the actor's preStart lifecycle method to create the publisher socket and bind it. Here is the code.

class Pub extends Actor {
  var contextZMQ: ZMQ.Context = null
  var publisherSocket: ZMQ.Socket = null

  override def preStart = {
    contextZMQ = ZMQ.context(1)
    publisherSocket = contextZMQ.socket(ZMQ.PUB)
    publisherSocket.bind("tcp://" + "127.0.0.1" + ":" + "1234")
    Thread.sleep(1000)
  }

  def receive = {
    case msg: String =>
      publisherSocket.send(msg.getBytes, 0)
  }
}

When we send a message to the publisher, its receive block runs and the message is sent on the publisher socket.

We also need a subscriber that reads the messages published by the publisher. The subscriber also needs to be an Akka actor. It creates a subscriber socket, connects it to the publisher's address and then subscribes; this code goes in the preStart lifecycle method. In the receive block it calls the recv method on the socket in a loop, which lets it receive messages from the publisher. Here is the code.

class Sub extends Actor {
  var contextZMQ: ZMQ.Context = null
  var subscriberSocket: ZMQ.Socket = null

  override def preStart = {
    contextZMQ = ZMQ.context(1)
    subscriberSocket = contextZMQ.socket(ZMQ.SUB)
    subscriberSocket.connect("tcp://" + "127.0.0.1" + ":" + "1234")
    subscriberSocket.subscribe("".getBytes())
    Thread.sleep(1000)
  }

  def receive = {
    case msg => handleMessages
  }

  private def handleMessages = {
    while (true) {
      val request = subscriberSocket.recv(0)
      val data = new String(request)
    }
  }
}
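
One thing to note about this Sub actor: handleMessages never returns, so whichever thread runs receive stays occupied for good. The following is only a minimal sketch of that effect using plain java.util.concurrent, not Akka or ZeroMQ. The one-thread pool stands in for a dispatcher thread, the LinkedBlockingQueue stands in for the SUB socket, and the BlockingLoopDemo object and its Stop sentinel are made-up names for illustration (the actor above has no such exit).

```scala
import java.util.concurrent.{ Executors, LinkedBlockingQueue, TimeUnit }

object BlockingLoopDemo {
  // Hypothetical sentinel that ends the loop; the Sub actor has no equivalent.
  val Stop = "stop"

  /** Runs a blocking receive loop on a one-thread pool and reports whether a
    * second task had completed (a) while the loop was active, (b) after it ended. */
  def demo(): (Boolean, Boolean) = {
    val pool = Executors.newFixedThreadPool(1)      // stands in for a dispatcher thread
    val inbox = new LinkedBlockingQueue[String]()   // stands in for the SUB socket

    // Like handleMessages: block on the next message until Stop arrives.
    pool.submit(new Runnable {
      def run(): Unit = {
        var msg = inbox.take()
        while (msg != Stop) msg = inbox.take()
      }
    })

    // A second unit of work queued behind the loop on the same thread.
    val other = pool.submit(new Runnable { def run(): Unit = () })

    inbox.put("hello")
    val ranWhileLooping = other.isDone              // the only thread is still busy

    inbox.put(Stop)                                 // let the loop finish
    pool.shutdown()
    pool.awaitTermination(5, TimeUnit.SECONDS)
    val ranAfterLoop = other.isDone                 // the thread was freed

    (ranWhileLooping, ranAfterLoop)
  }
}
```

Calling BlockingLoopDemo.demo() returns (false, true): the second task cannot run until the loop gives the thread back. This is why a subscriber written this way behaves as if it had a thread pinned to it.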

We want to measure throughput, so let’s add a DiagnosticsActor that simply reports the throughput and elapsed time once the Subscriber has processed all messages. The number of messages is defined in the ZMQWithoutAkkaExtensionApp object.

ZMQWithoutAkkaExtensionApp instantiates the Publisher, Subscriber and DiagnosticsActor. It then loops over the number of messages and sends each one to the publisher, which writes it to the socket in its receive block. Here is the code for DiagnosticsActor and ZMQWithoutAkkaExtensionApp.

object ZMQWithoutAkkaExtensionApp extends App {
  val numberOfMessages = 1000000
  val system = ActorSystem("zmq")
  val diagnostics = system.actorOf(Props[DiagnosticsActor])
  val publisher = system.actorOf(Props[Pub])
  val subscriber = system.actorOf(Props[Sub])
  subscriber ! "start"
  (1 to numberOfMessages).par.foreach { x: Int => publisher ! "hello"}
}

class DiagnosticsActor extends Actor {
  var startTime = 0.0
  var counter = 0
  def receive = {
    case msg =>
      counter = counter + 1
      if (counter == 1) startTime = System.currentTimeMillis
      if (counter == ZMQWithoutAkkaExtensionApp.numberOfMessages) {
        val elapsedTime = System.currentTimeMillis - startTime
        val throughput = (ZMQWithoutAkkaExtensionApp.numberOfMessages.toDouble * 1000.0) / elapsedTime
        println("Elapsed Time millis: " + elapsedTime)
        println("Throughput msgs/sec: " + throughput)
      }
  }
}
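
The throughput figure printed by DiagnosticsActor is just the message count scaled to one second. As a small sketch, here is the same arithmetic pulled out into a standalone function. The Throughput object and the sample numbers in the usage note are hypothetical and are not taken from my runs.

```scala
object Throughput {
  /** Messages per second for `messages` handled in `elapsedMillis` milliseconds. */
  def perSecond(messages: Long, elapsedMillis: Long): Double = {
    require(elapsedMillis > 0, "elapsed time must be positive")
    // Same formula as DiagnosticsActor: multiply by 1000 to convert ms to seconds.
    messages.toDouble * 1000.0 / elapsedMillis
  }
}
```

For example, Throughput.perSecond(1000000L, 50000L) gives 20000.0, that is, one million messages in 50 seconds works out to 20,000 msgs/sec.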

Here is the complete code listing of the Application:

import org.zeromq.ZMQ

import akka.actor.{ Actor, ActorSystem, Props }

object ZMQWithoutAkkaExtensionApp extends App {
  val numberOfMessages = 1000000
  val system = ActorSystem("zmq")
  val diagnostics = system.actorOf(Props[DiagnosticsActor])
  val publisher = system.actorOf(Props[Pub])
  val subscriber = system.actorOf(Props[Sub])
  subscriber ! "start"
  (1 to numberOfMessages).par.foreach { x: Int => publisher ! "hello"}
}

class Pub extends Actor {
  var contextZMQ: ZMQ.Context = null
  var publisherSocket: ZMQ.Socket = null

  override def preStart = {
    contextZMQ = ZMQ.context(1)
    publisherSocket = contextZMQ.socket(ZMQ.PUB)
    publisherSocket.bind("tcp://" + "127.0.0.1" + ":" + "1234")
    Thread.sleep(1000)
  }

  def receive = {
    case msg: String =>
      publisherSocket.send(msg.getBytes, 0)
  }
}

class Sub extends Actor {
  var contextZMQ: ZMQ.Context = null
  var subscriberSocket: ZMQ.Socket = null

  override def preStart = {
    contextZMQ = ZMQ.context(1)
    subscriberSocket = contextZMQ.socket(ZMQ.SUB)
    subscriberSocket.connect("tcp://" + "127.0.0.1" + ":" + "1234")
    subscriberSocket.subscribe("".getBytes())
    Thread.sleep(1000)
  }

  def receive = {
    case msg => handleMessages
  }

  private def handleMessages = {
    while (true) {
      val request = subscriberSocket.recv(0)
      val data = new String(request)
      ZMQWithoutAkkaExtensionApp.diagnostics ! data
    }
  }
}

class DiagnosticsActor extends Actor {
  var startTime = 0.0
  var counter = 0
  def receive = {
    case msg =>
      counter = counter + 1
      if (counter == 1) startTime = System.currentTimeMillis
      if (counter == ZMQWithoutAkkaExtensionApp.numberOfMessages) {
        val elapsedTime = System.currentTimeMillis - startTime
        val throughput = (ZMQWithoutAkkaExtensionApp.numberOfMessages.toDouble * 1000.0) / elapsedTime
        println("Elapsed Time millis: " + elapsedTime)
        println("Throughput msgs/sec: " + throughput)
      }
  }
}

Here is the Akka extension based version for ZeroMQ. Please read the documentation here if you find the code difficult to follow. Here we have ZMQApplication: it starts the publisher, subscriber and diagnostics as before and sends messages to the publisher socket. Here is the complete code listing.

import akka.zeromq._
import akka.actor.{ ActorSystem, Actor, Props }

object ZMQApplication extends App {
  val numberOfMessages = 100000
  val system = ActorSystem("zmq")
  val diagnostics = system.actorOf(Props[Diagnostics])
  val pubSocket = ZeroMQExtension(system).newSocket(SocketType.Pub, Bind("tcp://127.0.0.1:1234"))
  val subSocket = ZeroMQExtension(system).newSocket(SocketType.Sub, Listener(system.actorOf(Props[Receiver])), Connect("tcp://127.0.0.1:1234"), Subscribe("foo.bar"))
  Thread.sleep(1000)
  (1 to numberOfMessages) foreach { x: Int => pubSocket ! ZMQMessage(Seq(Frame("foo.bar"), Frame("hello"))) }
}

class Receiver extends Actor {
  def receive = {
    case Connecting => // socket lifecycle notification, nothing to do
    case m: ZMQMessage => ZMQApplication.diagnostics ! "done" // one tick per received message
    case _ => // ignore anything else
  }
}

class Diagnostics extends Actor {
  var startTime = 0.0
  var counter = 0
  def receive = {
    case msg =>
      counter = counter + 1
      if (counter == 1) startTime = System.currentTimeMillis
      if (counter == ZMQApplication.numberOfMessages) {
        val elapsedTime = System.currentTimeMillis - startTime
        val throughput = (ZMQApplication.numberOfMessages.toDouble * 1000.0) / elapsedTime
        println("Elapsed Time millis: " + elapsedTime)
        println("Throughput msgs/sec: " + throughput)
      }
  }
}
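
The listing above subscribes to "foo.bar" and publishes messages whose first frame is "foo.bar", whereas the earlier Sub actor subscribed with an empty byte array. As far as I understand ZeroMQ, a SUB socket filter is a byte-prefix match against the start of the message, and an empty subscription matches everything. The following is a rough sketch of that rule in plain Scala; TopicFilter and matches are hypothetical names for illustration and are not part of ZeroMQ or Akka.

```scala
object TopicFilter {
  /** True when `message` starts with the bytes of `subscription`.
    * An empty subscription is a prefix of every message, so it matches all. */
  def matches(subscription: Array[Byte], message: Array[Byte]): Boolean =
    subscription.length <= message.length &&
      subscription.indices.forall(i => subscription(i) == message(i))
}
```

With this rule, a subscription to "foo" would also receive "foo.bar" messages, while a subscription to "foo.bar" would not receive "foo.baz".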

The complete code is on our GitHub. It is an sbt-based project. Once you have the code on your machine, change into the project directory and execute the following commands.

sbt "run-main com.knoldus.akka.bench.zmq.ZMQWithoutAkkaExtensionApp"

and

sbt "run-main com.knoldus.akka.bench.zmq.ZMQApplication"

Here are the results of my runs for both versions.

There is a significant difference in throughput: the Akka extension based version gives me around 3,300 msgs/sec, while the one without it gives around 18,300 msgs/sec, roughly 5.5 times as much.

Written by Meetu Maltiar

April 4, 2012 at 10:20

Posted in Akka
