Archive for the ‘Scala’ Category
Bojug Meetup: Getting Started With Scala
I presented this session on Getting Started With Scala at Bangalore Java Open Users Group. There were three presentations that day:
- JDK 1.8 by Bhavana Hindupur from Goldman Sachs
- Java Concurrency package by Vaibhav Choudhary and Srinivasan Raghavan from Oracle
- Getting Started With Scala by me from Cisco
The sessions were well received and interactive. Thanks to the organisers for such a nice meet-up. Special thanks to Vineet Reynolds for helping me out that day.
My talk was meant for developers who wanted to get started with Scala. The sole objective was to get people started on creating a project in Scala, writing some code using collections and testing it using ScalaTest. Below are the slides and here are code samples on my GitHub.
FitNesse With Scala
This time I gave a knowledge session on FitNesse. We have been using it for BDD and acceptance tests on projects at Knoldus. FitNesse is excellent for increasing collaboration between developers, testers and customers.
Making FitNesse work with Scala does not require any special configuration. It works out of the box. In this presentation you will learn to use FitNesse in a Scala project with a DoFixture example. All code for this session is on Meetu’s GitHub.
Akka Futures In Scala With A Simple Example
We use Akka Futures extensively in a product built to handle a huge load of streaming data. We were early adopters of Akka and have been using it since the Akka 1.1.X days. We found Futures to be a simple yet powerful tool for adding concurrent behavior to our application.
This post emphasizes the importance of Akka Futures with a short and easy to understand example application. Please read the excellent documentation on Akka Futures for more details.
Ok, now let's look at a simple example. Let's say we have an identity function. We sleep in it for a while to simulate a long-running operation. Here is the Scala code for the method.
def timeTakingIdentityFunction(number: Int) = {
  // we sleep for 3 seconds and return number
  Thread.sleep(3000)
  number
}
Suppose we have an application in which we call the timeTakingIdentityFunction method three times, store the return values in three variables and sum them. Here is the code listing.
object SumApplication extends App {
  val startTime = System.currentTimeMillis
  val number1 = timeTakingIdentityFunction(1)
  val number2 = timeTakingIdentityFunction(2)
  val number3 = timeTakingIdentityFunction(3)
  val sum = number1 + number2 + number3
  val elapsedTime = ((System.currentTimeMillis - startTime) / 1000.0)
  println("Sum of 1, 2 and 3 is " + sum + " calculated in " + elapsedTime + " seconds")

  def timeTakingIdentityFunction(number: Int) = {
    // we sleep for 3 seconds and return number
    Thread.sleep(3000)
    number
  }
}
When we run this, it takes around nine seconds to execute, because the three calls run one after another. But what will happen when we do not block on timeTakingIdentityFunction?
Using Akka Futures we can do exactly that. When we do not block on timeTakingIdentityFunction and continue processing, we get a performance boost. Now let's look at the Akka Futures version.
We can use Future directly by wrapping the method call in a Future. In our example, we can do this in code.
val timeTakingIdentityFunctionFuture = Future(timeTakingIdentityFunction(1))
Since we are making three calls to timeTakingIdentityFunction, we can collect the resulting Future[Int] values in three variables, i.e. future1, future2 and future3.
val future1 = Future(timeTakingIdentityFunction(1))
val future2 = Future(timeTakingIdentityFunction(2))
val future3 = Future(timeTakingIdentityFunction(3))
Futures are monadic, so we can compose them using for expressions. The Future obtained by composing them can be used for calculating the sum.
We will use the onSuccess callback for this. Here is the code snippet.
val future = for {
  x <- future1
  y <- future2
  z <- future3
} yield (x + y + z)

future onSuccess {
  case sum =>
    val elapsedTime = ((System.currentTimeMillis - startTime) / 1000.0)
    println("Sum of 1, 2 and 3 is " + sum + " calculated in " + elapsedTime + " seconds")
}
We composed a new Future out of the three. The newly composed Future is of type Future[Int]. We registered a callback to receive the sum of the three numbers. Here is the complete example.
import akka.dispatch.Future
import akka.actor.ActorSystem

object SumApplicationWithFutures extends App {
  implicit val system = ActorSystem("future")
  val startTime = System.currentTimeMillis
  val future1 = Future(timeTakingIdentityFunction(1))
  val future2 = Future(timeTakingIdentityFunction(2))
  val future3 = Future(timeTakingIdentityFunction(3))

  val future = for {
    x <- future1
    y <- future2
    z <- future3
  } yield (x + y + z)

  future onSuccess {
    case sum =>
      val elapsedTime = ((System.currentTimeMillis - startTime) / 1000.0)
      println("Sum of 1, 2 and 3 is " + sum + " calculated in " + elapsedTime + " seconds")
  }

  def timeTakingIdentityFunction(number: Int) = {
    // we sleep for 3 seconds and return number
    Thread.sleep(3000)
    number
  }
}
The only thing we have added here is an implicit ActorSystem. Future requires an execution context. If an ActorSystem is in implicit scope, we are fine.
When we run the Future version of the application, it runs in about three seconds!! The three futures are started before we compose them, so the three sleeps overlap instead of running back to back.
Akka Futures can benefit our applications by adding concurrency. For example, we might need to make three blocking REST API calls to implement a piece of functionality. Depending on our use case, we can avoid issuing the blocking calls one after another and use Futures instead.
We used Futures directly in our example application. Since Futures are monadic, we composed them using for expressions. We also used a callback method to collect the result. Using them, we got a threefold improvement in running time. Most importantly, the code is simple, concise and expressive.
There is a more concise way to do this. We can construct a List of Future[Int] and call Future.sequence to compose a single Future of List[Int]. Code is left as an exercise 🙂
The latest version, Akka 2.1.0 for Scala 2.10, uses the unified Future from the Scala standard library. This post pertains to Akka 2.0.5, but the concepts are still the same.
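For readers who want to check their answer to the exercise, here is one possible sketch of the Future.sequence approach. It is written against scala.concurrent.Future from the Scala standard library (Scala 2.10 and later), not against akka.dispatch.Future from Akka 2.0.5. The object name SumApplicationWithSequence, the helper sumConcurrently, the use of the global execution context and the use of Await are all assumptions made for this illustration.

```scala
import scala.concurrent.{Await, Future, blocking}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object SumApplicationWithSequence {

  // Same slow identity function as in the post. `blocking` tells the global
  // execution context that this thread will sleep, so it may add threads.
  def timeTakingIdentityFunction(number: Int): Int = blocking {
    Thread.sleep(3000)
    number
  }

  // Hypothetical helper: start one Future per number, turn the
  // List[Future[Int]] into a Future[List[Int]] and sum the results.
  def sumConcurrently(numbers: List[Int]): Future[Int] = {
    val futures: List[Future[Int]] =
      numbers.map(n => Future(timeTakingIdentityFunction(n)))
    Future.sequence(futures).map(_.sum)
  }

  def main(args: Array[String]): Unit = {
    val startTime = System.currentTimeMillis
    // Await is used only so that the program prints before it exits.
    val sum = Await.result(sumConcurrently(List(1, 2, 3)), 10.seconds)
    val elapsedTime = (System.currentTimeMillis - startTime) / 1000.0
    println("Sum of 1, 2 and 3 is " + sum + " calculated in " + elapsedTime + " seconds")
  }
}
```

Because every Future is created before Future.sequence is called, the sleeps can overlap in the same way as in the for expression version.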
Simple Data Structures In Scala
This session was presented at a Knoldus Knolx session, where we discussed data structures. I talked about simple data structures like Queue and Binary Search Tree and their possible implementations in Scala. The idea was to learn a bit about functional data structures and how they can be implemented in Scala. I then discussed Binary Search Trees and their traversals.
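To give a flavour of what a functional Binary Search Tree can look like, here is a minimal sketch of an immutable tree of Int values with insertion and in-order traversal. It is not the code from the session; the names BstSketch, Tree, Leaf, Node, insert, fromList and inOrder are assumptions chosen for this illustration.

```scala
object BstSketch {
  // An immutable binary search tree: either an empty Leaf or a Node.
  sealed trait Tree
  case object Leaf extends Tree
  final case class Node(left: Tree, value: Int, right: Tree) extends Tree

  // Returns a new tree containing x; the original tree is left unchanged.
  // Duplicates are ignored.
  def insert(tree: Tree, x: Int): Tree = tree match {
    case Leaf => Node(Leaf, x, Leaf)
    case Node(l, v, r) =>
      if (x < v) Node(insert(l, x), v, r)
      else if (x > v) Node(l, v, insert(r, x))
      else tree
  }

  // Builds a tree by inserting the values from left to right.
  def fromList(values: List[Int]): Tree =
    values.foldLeft(Leaf: Tree)((tree, x) => insert(tree, x))

  // In-order traversal: left subtree, then the value, then right subtree.
  def inOrder(tree: Tree): List[Int] = tree match {
    case Leaf => Nil
    case Node(l, v, r) => inOrder(l) ::: (v :: inOrder(r))
  }

  def main(args: Array[String]): Unit = {
    println(inOrder(fromList(List(5, 3, 8, 1, 4)))) // prints List(1, 3, 4, 5, 8)
  }
}
```

An in-order traversal of a binary search tree always yields the values in sorted order, which makes it a handy check on the insert logic.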
Here is the presentation.
Working With Elasticsearch In Scala
Elasticsearch is an open-source, RESTful, distributed search engine built on top of Apache Lucene. You can read more about it on their website.
We have an application built in Scala with sbt as our build tool. We needed search capability over the output created by the application, and we chose Elasticsearch for it.
In this post, we will learn to use the Elasticsearch Java API in Scala. The scenario is that we will index a JSON document into Elasticsearch, then search for that document in order to validate that it got indexed.
Let’s start by installing it. Installing Elasticsearch is simple: download it and unzip it. To run Elasticsearch, change to the unzipped directory and execute:
elasticsearch -f
“-f” tells Elasticsearch to run in the foreground, so we will be able to see the logs in the console.
Knolx Session: Category Theory In Scala
This session was presented at a Knoldus knowledge session. There is a wonderful post on Category Theory by Debasish Ghosh. This presentation is inspired by his post here.
Here is the presentation.
Knolx Session: Akka 2.0 Reloaded
Akka allows us to write concurrent, fault-tolerant and scalable applications. We recently migrated our product from Akka 1.3x to Akka 2.x. The new version is quite different from the 1.3x versions. It is not merely an API change but an overall change, and we also have to think differently when developing applications. This session was presented at a Knolx session at Knoldus. The talk gently introduces Akka 2.0 with simple and easy to run examples.
The presentation is inspired by Viktor Klang’s talk at the NE Scala symposium.
My Knolx Session: Introducing Scala
I gave this talk at Knoldus. At Knoldus we organize KnolX sessions so that our learning spreads across the organisation. This KnolX session had Scala flavor written all over it. We believe in experiential learning, so the session has code examples which Knolders tried out while I was explaining them.
I combined sbt, Scala, Scala collections and ScalaTest in this presentation. The objective was that by the end of the KnolX session we could create a project using sbt, write Scala code in it and finally test it. I made simple, easy to understand examples which attendees could play with.
Here is the presentation I gave. It has an assignment at the end which you can try.
Using Akka Dispatchers In Scala
Akka dispatchers are extremely important in the Akka framework. They are directly responsible for performance, throughput and scalability.
Akka supports dispatchers for both event-driven lightweight threads and thread-based Actors. For thread-based Actors each dispatcher is bound to a dedicated OS thread.
The default dispatcher is a single event-based dispatcher shared by all Actors created. The dispatcher used is this one:
Dispatchers.globalExecutorBasedEventDrivenDispatcher
In many cases it becomes necessary to group Actors together on a dedicated dispatcher. We can then override the default and define our own dispatcher.
Setting the Dispatcher
Normally we set the dispatcher in the Actor itself:
class EchoActor extends Actor {
  self.dispatcher = ..... // set the dispatcher
}
Or we can set it on the ActorRef:
actorRef.dispatcher = dispatcher
There are different kinds of dispatchers:
- Thread-based
- Event-based
- Priority event-based
- Work-stealing
Thread-based
It binds a dedicated OS thread to each Actor. Messages are posted to a LinkedBlockingQueue, which feeds them to the dispatcher one by one. It has the worst performance and scalability of the dispatcher types, and it cannot be shared among Actors. On the other hand, Actors do not have to wait for a thread in this case.
Code example
class EchoActor extends Actor {
  self.dispatcher = Dispatchers.newThreadBasedDispatcher(self)
  ....
}
Event-based
The ExecutorBasedEventDrivenDispatcher binds a set of Actors to a thread pool backed by a BlockingQueue. The dispatcher must be shared among Actors. This dispatcher is highly configurable; we can specify things like the type of queue, the maximum number of items and the rejection policy.
Code example
class EchoActor extends Actor {
  self.dispatcher = Dispatchers.newExecutorBasedEventDrivenDispatcher(name)
    .withNewThreadPoolWithLinkedBlockingQueueWithCapacity(100)
    .setCorePoolSize(16)
    .setMaxPoolSize(128)
    .setKeepAliveTimeInMillis(60000)
    .setRejectionPolicy(new CallerRunsPolicy)
    .build
  ....
}
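The builder settings above have the same names as the parameters of the JDK's java.util.concurrent.ThreadPoolExecutor. As a rough illustration of what those numbers mean, here is a sketch that builds a plain JDK thread pool with the same values, with no Akka involved. That the dispatcher is backed by such a pool is my understanding rather than something shown in this post, and the names ThreadPoolSketch and newPool are made up for the example.

```scala
import java.util.concurrent.{Callable, LinkedBlockingQueue, ThreadPoolExecutor, TimeUnit}

object ThreadPoolSketch {

  // A plain JDK pool configured with the values used in the dispatcher example.
  def newPool(): ThreadPoolExecutor = new ThreadPoolExecutor(
    16,                                       // core pool size
    128,                                      // maximum pool size
    60000L, TimeUnit.MILLISECONDS,            // keep-alive time for idle extra threads
    new LinkedBlockingQueue[Runnable](100),   // bounded queue holding at most 100 tasks
    new ThreadPoolExecutor.CallerRunsPolicy   // when saturated, the submitting thread runs the task
  )

  def main(args: Array[String]): Unit = {
    val pool = newPool()
    val result = pool.submit(new Callable[Int] { def call(): Int = 21 * 2 })
    println(result.get()) // prints 42
    pool.shutdown()
  }
}
```

With a bounded queue, extra threads beyond the core size are only created once the queue is full, and the rejection policy applies only when both the queue and the maximum pool size are exhausted.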
Priority event-based
It is meant for handling messages that have priorities assigned to them. This is done using PriorityExecutorBasedEventDrivenDispatcher, which requires a PriorityGenerator as a constructor argument.
Let’s look at an example where we have a PriorityExecutorBasedEventDrivenDispatcher used for a group of messages fired on an actor.
package com.meetu.akka.dispatcher

import akka.actor.Actor.actorOf
import akka.actor.Actor
import akka.dispatch.PriorityExecutorBasedEventDrivenDispatcher
import akka.dispatch.PriorityGenerator

object PriorityDispatcherExample extends App {
  val actor = Actor.actorOf(
    new Actor {
      def receive = {
        case x => println(x)
      }
    })

  val priority = PriorityGenerator {
    case "high priority" => 0
    case "low priority" => 100
    case _ => 50
  }

  actor.dispatcher = new PriorityExecutorBasedEventDrivenDispatcher("foo", priority)
  actor.start
  actor.dispatcher.suspend(actor)

  actor ! "low priority"
  actor ! "others"
  actor ! "low priority"
  actor ! "high priority"

  actor.dispatcher.resume(actor)
  Actor.registry.shutdownAll
}
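To see what ordering to expect from the example above, here is a small sketch that models the priority function with plain Scala collections. It does not use Akka at all and is not how the dispatcher is implemented; it only assumes, as in the example, that a lower number means a more important message. The names PriorityOrderingSketch, priority and deliveryOrder are hypothetical.

```scala
object PriorityOrderingSketch {

  // Same mapping as the PriorityGenerator in the example: lower is more important.
  val priority: Any => Int = {
    case "high priority" => 0
    case "low priority"  => 100
    case _               => 50
  }

  // Model of the order in which queued messages would be handed to the actor.
  // sortBy is stable, so messages with equal priority keep their relative order.
  def deliveryOrder(messages: List[Any]): List[Any] = messages.sortBy(priority)

  def main(args: Array[String]): Unit = {
    val queued = List("low priority", "others", "low priority", "high priority")
    deliveryOrder(queued).foreach(println)
    // prints:
    // high priority
    // others
    // low priority
    // low priority
  }
}
```

The dispatcher is suspended while the four messages are sent, so they all sit in the mailbox and can be reordered by priority before the actor sees any of them.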
Manage Akka Actors With Supervisors
We are building a concurrent and massively scalable system using the Akka framework. Akka uses the lightweight Actor paradigm for managing concurrency.
Most of the time a group of Akka actors is required for a common piece of functionality, so we tend to manage them as a group.
This is where the Akka Supervisor comes into the picture. Akka Supervisors are responsible for starting, stopping and monitoring child Akka actors.
Let's now look at how to use a supervisor over Akka Actors. To keep things simple, we have an Actor which simply echoes the message it receives. Here is the code listing for EchoActor:
/**
 * An Actor that echoes everything you send to it
 */
class EchoActor extends Actor {
  def receive = {
    case msg => {
      println(msg)
    }
  }
}
We will build supervision over a group of EchoActors; let's call our supervisor EchoActorSupervisor. But before we look at its code listing, let's first understand its constituent elements.
The Supervisor first requires a SupervisorConfig. This SupervisorConfig in turn requires a fault handling strategy for the group of actors. The strategies are:
- AllForOneStrategy
- OneForOneStrategy
The AllForOneStrategy fault handler will restart all the components, and the OneForOneStrategy fault handler will restart just the component which died. A fault handler takes as parameters a List of the Exceptions which will be handled, the maximum number of restart tries and the time window in milliseconds within which those tries are counted.
Let's have a look at what our SupervisorConfig looks like:
...
val supervisorConfig = SupervisorConfig(OneForOneStrategy(List(classOf[Exception]), 3, 500), Nil)
...
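The two numbers in the strategy above (3 and 500) can be read as "at most 3 restarts within 500 milliseconds". The sketch below is a deliberately simplified model of that idea in plain Scala, using a sliding time window. It is not Akka's implementation and Akka's exact bookkeeping may differ; the names RestartBudget, onFailure and decisions are hypothetical and exist only for this illustration.

```scala
// Simplified model: remembers the times of recent restarts and allows a new
// restart only if fewer than maxRetries happened within the last withinMillis.
final case class RestartBudget(maxRetries: Int, withinMillis: Long, restarts: List[Long] = Nil) {

  // Called when the supervised component fails at time `now` (in milliseconds).
  // Returns the updated budget and whether a restart is allowed.
  def onFailure(now: Long): (RestartBudget, Boolean) = {
    val recent = restarts.filter(t => now - t < withinMillis)
    if (recent.size < maxRetries) (copy(restarts = now :: recent), true)
    else (copy(restarts = recent), false)
  }
}

object RestartBudget {
  // Feeds a list of failure times through a budget and collects the decisions.
  def decisions(budget: RestartBudget, failureTimes: List[Long]): List[Boolean] =
    failureTimes match {
      case Nil => Nil
      case now :: rest =>
        val (next, allowed) = budget.onFailure(now)
        allowed :: decisions(next, rest)
    }
}

object RestartBudgetDemo {
  def main(args: Array[String]): Unit = {
    val budget = RestartBudget(maxRetries = 3, withinMillis = 500)
    // Three quick failures are restarted, the fourth is refused,
    // and a failure after the window has passed is restarted again.
    println(RestartBudget.decisions(budget, List(0L, 100L, 200L, 300L, 700L)))
    // prints List(true, true, true, false, true)
  }
}
```

In this model a refused restart simply returns false; what the real supervisor does with a component that exhausts its restarts is outside the scope of the sketch.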
The Supervisor can now be created by passing in the SupervisorConfig from the previous listing:
...
val supervisorConfig = SupervisorConfig(OneForOneStrategy(List(classOf[Exception]), 3, 500), Nil)
val supervisor = Supervisor(supervisorConfig)
...
Now we can link actors to and unlink them from the supervisor. Since we are using EchoActor, we can link two actor references to the supervisor. In the code below we link and start two EchoActors.
...
val echoActor1 = actorOf[EchoActor]
val echoActor2 = actorOf[EchoActor]
val supervisorConfig = SupervisorConfig(OneForOneStrategy(List(classOf[Exception]), 3, 500), Nil)
val supervisor = Supervisor(supervisorConfig)
supervisor.link(echoActor1)
supervisor.link(echoActor2)
echoActor1.start
echoActor2.start
...