λ Tony's blog λ
The weblog of Tony Morris

Actor concurrency for Java

Posted on July 25, 2008, in Programming

Functional Java 2.8 contains a concurrency API that implements Actors as seen in Erlang and Scala. This allows a user to take advantage of multiple core machines in their Java code.

Runar has written articles explain how to use the API – it’s pretty darn easy for a client – just don’t look under the hood ;)

Here is an example of a parallel fibonacci that uses a few hundred virtual threads (unlike the ping/pong example that uses… wait for it… millions of virtual threads!). On my quad-core machine, the fibonacci computation speeds up by about 6 times (45 seconds serially to about 7 seconds when using actors).

The example uses Java 7 BGGA syntax (imports omitted) and after compilation, runs fine on any 1.5 JVM. This example is also available with Java 1.5 source code in the Functional Java release.

/**
 * Parallel Fibonacci numbers.
 * Based on a Haskell example by Don Stewart.
 * Author: Runar
 */
public class Fibs {

  private static final int CUTOFF = 35;

  public static void main(final String[] args) throws Exception {
    if (args.length < 1)
      throw error("This program takes an argument: number_of_threads");

    final int threads = Integer.parseInt(args[0]);
    final ExecutorService pool = Executors.newFixedThreadPool(threads);
    final Strategy<unit> su = Strategy.executorStrategy(pool);
    final Strategy<Promise<integer>> spi = Strategy.executorStrategy(pool);

    final Actor<List<integer>> out = actor(su, { List<integer> fs => {
      int i = 0;
      for (List<integer> ns = fs; ns.isNotEmpty(); ns = ns.tail()) {
        System.out.println(MessageFormat.format("n={0}=>{1}", i, ns.head()));
        i++;
      }
      pool.shutdown();
    }});

    System.out.println("Calculating Fibonacci sequence in parallel...");

    final F<Integer, Promise<integer>> fib = { Integer n => (n < CUTOFF) ?
        promise(su, P.p(serialFib(n))) :
        fib.f(n - 1).bind(join(su, P1.curry(fib).f(n - 2)), { int a => { int b => a + b }} ) };

    join(su, fmap(Promise.<integer>sequence(su)).f(spi.parMap(fib).f(range(0, 46)))).to(out);
  }

  public static int serialFib(final int n) {
    if (n < 2)
      return n;
    else return serialFib(n - 1) + serialFib(n - 2);
  }
}