Sunday, June 07, 2009

Scala Concurrency

I got to see Scala creator Martin Odersky speak twice last week. The first time was at the June BASE meeting, and the second was at the Scala LiftOff. At both meetings, he said that he wants Scala to be the best language for concurrent programming. He also distinguished two kinds of concurrency. The first is the kind you hear about most often, where processes run in parallel and need to modify some shared data or global state at the same time. This kind of problem becomes hard to manage with the locking primitives common in Java and C++, but can be a little easier with Actors in Scala. The second kind is where a big task needs to be split into smaller pieces that can run in parallel. It wasn't clear to me whether Actors are a good fit for this, or whether some improvement is needed. So I decided to experiment.

I took one of the algorithms I wrote for my JavaOne talk on language performance and used Actors to do some of the processing in parallel. In my original "driver" code, I took a list of files and passed each file, one after another, to the sort class. In the new code, I created a "master" Actor that fans the files out to an internal list of sorter Actors, each of which sorts the words in a file. When a sorter Actor finishes, it sends its results back to the master Actor. Here is the code:

import java.io.File
import java.util.concurrent.CountDownLatch
import scala.actors.Actor
import scala.actors.Actor._
import scala.io.Source

// messages
case class FileToSort(fileName:String)
case class SortResults(fileName:String, words:List[String])

// Sorts the words of each file it receives and reports back to the master.
class WordSortActor(master:Actor) extends Actor {
  start

  def sortedWords(fileName:String) = {
    val dataFile = new File(fileName)
    var words:List[String] = Nil
    val source = Source.fromFile(dataFile)
    try {
      // collect every whitespace-separated word, skipping empty tokens
      source.getLines.foreach(
        _.split("\\s+").filter(_.nonEmpty).foreach(words ::= _))
    } finally {
      source.close()
    }
    // case-insensitive sort
    words.sortWith(_.toLowerCase < _.toLowerCase)
  }

  def act {
    loop {
      react {
        case FileToSort(fileName) => {
          println("Sorting file = " + fileName +
            " on thread " + Thread.currentThread.getName)
          val words = sortedWords(fileName)
          master ! SortResults(fileName, words)
          // why doesn't sender work instead?
          //sender ! SortResults(fileName, words)
        }
      }
    }
  }
}

// Fans the files in docRoot out to a pool of sorters and collects the results.
class SortMaster(docRoot:String, numActors:Int) extends Actor {
  private var sorted : List[List[String]] = Nil
  private var workers : List[WordSortActor] = Nil
  private var latch : CountDownLatch = null
  start

  def beginSorting {
    workers = (for (i <- 0 until numActors)
      yield new WordSortActor(this)).toList
    val files = new File(docRoot).list
    // create the latch before sending any work, so an early reply
    // can never call countDown on a null latch
    latch = new CountDownLatch(files.length)
    var cnt = 0
    files.foreach(file => {
      // round-robin the files across the sorters
      workers(cnt % numActors) ! FileToSort(new File(docRoot, file).getPath)
      cnt += 1
    })
  }

  def waitUntilDone = latch.await

  def act {
    loop {
      react {
        case SortResults(fileName, words) => {
          sorted ::= words
          latch.countDown
          println("Received results for file=" + fileName)
        }
      }
    }
  }
}
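The scala.actors library used above was deprecated in Scala 2.10 in favor of Akka, so for readers on a newer Scala, here is a minimal, self-contained sketch of the same fan-out/fan-in shape using only plain Java threads, with a LinkedBlockingQueue standing in for each Actor's mailbox. It is not the original program: the names FanOutSketch, TextToSort and SortedText, the in-memory inputs in place of files, and the "poison pill" shutdown are all assumptions made for illustration.

```scala
import java.util.concurrent.LinkedBlockingQueue

object FanOutSketch {
  // Hypothetical messages mirroring FileToSort / SortResults, but carrying
  // in-memory text instead of a file name so the sketch is self-contained.
  final case class TextToSort(name: String, text: String)
  final case class SortedText(name: String, words: List[String])

  // The per-item work: split on whitespace, drop empties, sort case-insensitively.
  def sortedWords(text: String): List[String] =
    text.split("\\s+").filter(_.nonEmpty).toList.sortWith(_.toLowerCase < _.toLowerCase)

  // Fans inputs out round-robin to numWorkers threads (numWorkers must be > 0),
  // each with its own mailbox, and fans results back in through one master mailbox.
  def run(inputs: Seq[(String, String)], numWorkers: Int): Map[String, List[String]] = {
    val masterBox = new LinkedBlockingQueue[SortedText]()
    // None is a "poison pill" telling a worker to stop.
    val workerBoxes = Vector.fill(numWorkers)(new LinkedBlockingQueue[Option[TextToSort]]())

    val workers = workerBoxes.map { box =>
      val t = new Thread(() => {
        var running = true
        while (running) {
          box.take() match {
            case Some(TextToSort(name, text)) =>
              masterBox.put(SortedText(name, sortedWords(text)))
            case None =>
              running = false
          }
        }
      })
      t.start()
      t
    }

    // Round-robin dispatch, like workers(cnt % numActors) in the post.
    inputs.zipWithIndex.foreach { case ((name, text), i) =>
      workerBoxes(i % numWorkers).put(Some(TextToSort(name, text)))
    }
    workerBoxes.foreach(_.put(None))

    // Only this thread reads masterBox and builds the result, so no lock is needed.
    // Taking exactly inputs.size replies plays the role of the CountDownLatch.
    val results = Vector.fill(inputs.size)(masterBox.take()).map(r => r.name -> r.words).toMap
    workers.foreach(_.join())
    results
  }
}
```

For example, `FanOutSketch.run(Seq("a" -> "pear Apple banana"), 2)` returns `Map("a" -> List("Apple", "banana", "pear"))`.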


So I don't know whether this is a good use of Actors. It seems like you might need to tune the number of threads in the thread pool that sits behind the Actors. Still, the pattern seems useful: it uses Actors to combine the results of work done in parallel. The result of each sort is sent back to the master, which could then do something useful with it. In this example, the master only adds the results to a list, but you could imagine some kind of coordination, like adding to a map, that would require locking in a typical Java implementation.
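To make the map idea concrete, here is a hedged sketch using plain threads and a queue rather than scala.actors: several producer threads report hypothetical per-file word counts through one mailbox, and only the calling thread, acting as the master, updates a mutable map. The names OwnedMapSketch, WordCount and tally are invented for illustration. Because the map is confined to a single thread it needs no lock, whereas a version where every producer wrote to the map directly would need synchronized blocks or a ConcurrentHashMap.

```scala
import java.util.concurrent.LinkedBlockingQueue
import scala.collection.mutable

object OwnedMapSketch {
  // Hypothetical message: a file name and a word count found in it.
  final case class WordCount(file: String, count: Int)

  // Each inner Seq is reported by its own producer thread. The calling thread
  // acts as the master: it is the only code that touches `totals`, so the map
  // needs no lock, unlike a shared map updated directly by every producer.
  def tally(reports: Seq[Seq[WordCount]]): Map[String, Int] = {
    val mailbox = new LinkedBlockingQueue[WordCount]()
    val producers = reports.map { batch =>
      val t = new Thread(() => batch.foreach(wc => mailbox.put(wc)))
      t.start()
      t
    }

    val totals = mutable.HashMap.empty[String, Int]
    val expected = reports.map(_.size).sum
    for (_ <- 1 to expected) {
      val wc = mailbox.take()
      totals(wc.file) = totals.getOrElse(wc.file, 0) + wc.count
    }
    producers.foreach(_.join())
    totals.toMap
  }
}
```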

One other strange thing: in the WordSortActor, I tried to send the results back to the sender, but this did not seem to work. I had to keep an explicit reference to the master Actor, and then it worked fine. One other annoying thing came out of this little exercise: I discovered how hard it is to debug Actors. I was using IntelliJ, and setting a breakpoint in an Actor was completely useless. This is not the case with Java threads. I did not try Eclipse or NetBeans.
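Keeping an explicit reference to the master works because the reply address no longer depends on which thread happened to send the request. A more general form of the same idea is to carry the reply address inside the message itself. The sketch below does this with plain threads and queues rather than scala.actors; SortRequest, replyTo and startWorker are hypothetical names chosen for illustration.

```scala
import java.util.concurrent.{LinkedBlockingQueue, TimeUnit}

object ExplicitReplySketch {
  // Hypothetical request that names its own reply mailbox, so the worker
  // never has to infer who the "sender" was.
  final case class SortRequest(words: List[String], replyTo: LinkedBlockingQueue[List[String]])

  // A one-shot worker: handles a single request, replies, then exits.
  def startWorker(inbox: LinkedBlockingQueue[SortRequest]): Thread = {
    val t = new Thread(() => {
      val req = inbox.take()
      req.replyTo.put(req.words.sortWith(_.toLowerCase < _.toLowerCase))
    })
    t.start()
    t
  }

  def demo(): List[String] = {
    val inbox = new LinkedBlockingQueue[SortRequest]()
    val replies = new LinkedBlockingQueue[List[String]]()
    val worker = startWorker(inbox)
    // Whoever builds the request decides where the reply goes,
    // regardless of which thread performs the send.
    inbox.put(SortRequest(List("b", "C", "a"), replies))
    val result = replies.poll(5, TimeUnit.SECONDS) // null if no reply within 5 s
    worker.join()
    result
  }
}
```

Calling `ExplicitReplySketch.demo()` returns `List("a", "b", "C")`.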

Comments:

Václav Pech said...

This is a very interesting example of actor use, in my opinion.

Regarding the issue with sending replies: although I'm not familiar with the implementation details of Scala actors, I'd guess it is because you don't send the FileToSort messages from within the SortMaster's act method (that is, on the SortMaster's thread), but from an external thread, which happens to invoke the beginSorting() method.

Václav Pech said...

For those interested in Groovy, I've created an identical program using GParallelizer. Check it out at http://code.google.com/p/gparallelizer/wiki/ActorsExamples?ts=1244481738&updated=ActorsExamples
Thank you, Michael, for the inspiration.

Ilya said...

Michael,

What version of the Scala plugin for IntelliJ did you use? The breakpoint bug you mentioned seems to be related to incorrect handling of compiled partial functions, and it was fixed a couple of versions ago.

Ilya

Michael Galpin said...

You are right, Ilya. My IntelliJ Scala plugin was old. I updated it, and now I can debug actors in various threads! I'm still not sure why sending the response to sender() is not working. With the debugger I can tell it is an ActorProxy. I can send it a message, and the message goes into its mailbox, but there it stays. Anyway, sorry for any confusion, and thanks for the help.

jkwatson said...

I've always thought that "actors as threads" can't really scale, since Java threads are so heavyweight. Do Scala actors support an ultra-lightweight, Erlang-style process model?

Václav Pech said...

Of course. Both Scala and GParallelizer actors are detached from threads, so you can have many more actors than actual threads.
In fact, the example given in the original post could be served well by a single thread, even though it allows a virtually unlimited number of actors to participate in the algorithm.
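To illustrate what "detached from threads" means, here is a hypothetical minimal event-based actor; it is not how scala.actors or GParallelizer are implemented. Each actor owns a mailbox but no thread, and borrows a thread from a shared fixed-size pool only while it has messages to process. The names MiniActor, PooledActorsSketch and run are invented for this sketch.

```scala
import java.util.concurrent.{ConcurrentLinkedQueue, CountDownLatch, ExecutorService, Executors, TimeUnit}
import java.util.concurrent.atomic.AtomicBoolean
import scala.collection.concurrent.TrieMap

object PooledActorsSketch {
  // A hypothetical minimal event-based actor: it owns a mailbox but no thread,
  // and borrows a pool thread only while it has messages to process.
  // The `scheduled` flag ensures at most one drain runs per actor at a time.
  final class MiniActor[A](pool: ExecutorService)(handler: A => Unit) {
    private val mailbox = new ConcurrentLinkedQueue[A]()
    private val scheduled = new AtomicBoolean(false)

    def !(msg: A): Unit = {
      mailbox.offer(msg)
      trySchedule()
    }

    private def trySchedule(): Unit =
      if (scheduled.compareAndSet(false, true)) pool.execute(() => drain())

    private def drain(): Unit = {
      var next = Option(mailbox.poll())
      while (next.isDefined) {
        handler(next.get)
        next = Option(mailbox.poll())
      }
      scheduled.set(false)
      // A message may have arrived between the last poll and clearing the
      // flag; re-check so it is not stranded in the mailbox.
      if (!mailbox.isEmpty) trySchedule()
    }
  }

  // Sends one message to each of numActors actors sharing a pool of poolSize
  // threads. Returns whether every message was handled within 10 seconds and
  // how many distinct pool threads did the work.
  def run(numActors: Int, poolSize: Int): (Boolean, Int) = {
    val pool = Executors.newFixedThreadPool(poolSize)
    val done = new CountDownLatch(numActors)
    val threadsUsed = TrieMap.empty[String, Unit]
    val actors = Vector.fill(numActors) {
      new MiniActor[String](pool)(_ => {
        threadsUsed.put(Thread.currentThread().getName(), ())
        done.countDown()
      })
    }
    actors.foreach(_ ! "ping")
    val completed = done.await(10, TimeUnit.SECONDS)
    pool.shutdown()
    (completed, threadsUsed.size)
  }
}
```

Running `PooledActorsSketch.run(10000, 2)` handles all 10,000 actors' messages while using no more than the two pool threads.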

Václav Pech said...

Michael,

I have your example working just fine with direct replies. The important thing is to call beginSorting() from SortMaster's act() method, not from the main thread; otherwise your replies go to the main thread and not to the SortMaster.
