JSR-166: The Java fork/join Framework

JSR-166 is the set of concurrency utilities that was included in Java 5.  The fork/join framework was a piece of it that didn't make it into Java 5.  After all this time, the fork/join framework is finally making it into JDK 7.  What surprised me about the framework is how easy it is to use.

The fork/join framework is designed to make divide-and-conquer algorithms easy to parallelize.  More specifically, it targets recursive algorithms where the control path branches into a few calls that each process a roughly equal part of the data set.  The typical setup is to create a new class that extends either RecursiveAction or RecursiveTask.  The parameters that were passed into the recursive function become member variables of the newly defined class, and the recursive calls are replaced by a call to invokeAll(…) rather than calls to the function itself.
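
To make that pattern concrete before getting to the merge sort, here is a minimal sketch of a RecursiveTask that sums an array.  This example is mine, not something from the fork/join documentation: the class name, threshold, and data are placeholders, and the imports assume the JDK 7 package names (on Java 6 the same classes come from the jsr166y package mentioned later in this post).

import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

public class SumTask extends RecursiveTask<Long> {

    private static final int SIZE_THRESHOLD = 1000;

    // the parameters of the old recursive function become member variables
    private final long[] data;
    private final int lo, hi;

    public SumTask(long[] data, int lo, int hi) {
        this.data = data;
        this.lo = lo;
        this.hi = hi;
    }

    @Override
    protected Long compute() {
        if (hi - lo < SIZE_THRESHOLD) {
            long sum = 0;
            for (int i = lo; i < hi; i++) {
                sum += data[i];
            }
            return sum;
        }
        int mid = (lo + hi) / 2;
        SumTask left = new SumTask(data, lo, mid);
        SumTask right = new SumTask(data, mid, hi);
        // invokeAll replaces the two direct recursive calls
        invokeAll(left, right);
        return left.join() + right.join();
    }

    public static void main(String[] args) {
        long[] data = new long[10000000];
        java.util.Arrays.fill(data, 1L);
        // the pool runs the root task and prints 10000000
        System.out.println(new ForkJoinPool().invoke(new SumTask(data, 0, data.length)));
    }
}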

In writing this post, I kept going back and forth on whether I should use Fibonacci numbers as an example or something with more meat to it.  The computation done by each recursive call of a Fibonacci algorithm is too small to matter; not only that, but there are much better non-parallel algorithms for computing Fibonacci numbers.  In the end, I decided on showing a merge sort.  It is used as the example in the fork/join documentation, but this will be a more complete example showing both the sequential algorithm and the changes made for the parallel version of the algorithm.  You'll see that it's not that hard.

First let me start by showing the source code for a typical MergeSort:

public class MergeSort {
 
    private static final int SIZE_THRESHOLD = 16;
 
    public static void sort(Comparable[] a) {
        sort(a, 0, a.length-1);
    }
 
    public static void sort(Comparable[] a, int lo, int hi) {
        if (hi - lo < SIZE_THRESHOLD) {
            insertionsort(a, lo, hi);
            return;
        }
 
        Comparable[] tmp = new Comparable[((hi - lo) / 2) + 1];
        mergeSort(a, tmp, lo, hi);
    }
 
    private static void mergeSort(Comparable[] a, Comparable[] tmp, int lo, int hi) {
        if (hi - lo < SIZE_THRESHOLD) {
            insertionsort(a, lo, hi);
            return;
        }
 
        int m = (lo + hi) / 2;
        mergeSort(a, tmp, lo, m);
        mergeSort(a, tmp, m + 1, hi);
        merge(a, tmp, lo, m, hi);
    }
 
    private static void merge(Comparable[] a, Comparable[] b, int lo, int m, int hi) {
        if (a[m].compareTo(a[m+1]) <= 0)
            return;
 
        System.arraycopy(a, lo, b, 0, m-lo+1);
 
        int i = 0;
        int j = m+1;
        int k = lo;
 
        // merge: copy back the next-smallest element each time
        while (k < j && j <= hi) {
            if (b[i].compareTo(a[j]) <= 0) {
                a[k++] = b[i++];
            } else {
                a[k++] = a[j++];
            }
        }
 
        // copy back remaining elements of first half (if any)
        System.arraycopy(b, i, a, k, j-k);
 
    }
 
    private static void insertionsort(Comparable[] a, int lo, int hi) {
        for (int i = lo+1; i <= hi; i++) {
            int j = i;
            Comparable t = a[j];
            while (j > lo && t.compareTo(a[j - 1]) < 0) {
                a[j] = a[j - 1];
                --j;
            }
            a[j] = t;
        }
    }
}

Now here is the code for the parallel version of MergeSort:

public class ParallelMergeSort {
 
    private static final ForkJoinPool threadPool = new ForkJoinPool();
    private static final int SIZE_THRESHOLD = 16;
 
    public static void sort(Comparable[] a) {
        sort(a, 0, a.length-1);
    }
 
    public static void sort(Comparable[] a, int lo, int hi) {
        if (hi - lo < SIZE_THRESHOLD) {
            insertionsort(a, lo, hi);
            return;
        }
 
        Comparable[] tmp = new Comparable[a.length];
        threadPool.invoke(new SortTask(a, tmp, lo, hi));
    }
 
    /**
     * This class replaces the recursive function that was
     * previously here.
     */
    static class SortTask extends RecursiveAction {
        Comparable[] a;
        Comparable[] tmp;
        int lo, hi;
        public SortTask(Comparable[] a, Comparable[] tmp, int lo, int hi) {
            this.a = a;
            this.lo = lo;
            this.hi = hi;
            this.tmp = tmp;
        }
 
        @Override
        protected void compute() {
            if (hi - lo < SIZE_THRESHOLD) {
                insertionsort(a, lo, hi);
                return;
            }
 
            int m = (lo + hi) / 2;
            // the two recursive calls are replaced by a call to invokeAll
            invokeAll(new SortTask(a, tmp, lo, m), new SortTask(a, tmp, m+1, hi));
            merge(a, tmp, lo, m, hi);
        }
    }
 
    private static void merge(Comparable[] a, Comparable[] b, int lo, int m, int hi) {
        if (a[m].compareTo(a[m+1]) <= 0)
            return;
 
        System.arraycopy(a, lo, b, lo, m-lo+1);
 
        int i = lo;
        int j = m+1;
        int k = lo;
 
        // merge: copy back the next-smallest element each time
        while (k < j && j <= hi) {
            if (b[i].compareTo(a[j]) <= 0) {
                a[k++] = b[i++];
            } else {
                a[k++] = a[j++];
            }
        }
 
        // copy back remaining elements of first half (if any)
        System.arraycopy(b, i, a, k, j-k);
 
    }
 
    private static void insertionsort(Comparable[] a, int lo, int hi) {
        for (int i = lo+1; i <= hi; i++) {
            int j = i;
            Comparable t = a[j];
            while (j > lo && t.compareTo(a[j - 1]) < 0) {
                a[j] = a[j - 1];
                --j;
            }
            a[j] = t;
        }
    }
}

As you can see, the majority of the algorithm has remained intact.  As stated above, a new class is created that extends RecursiveAction, and the parameters of the function are passed into that class when it is constructed.  One thing to take note of is that previously only half the size of the original array was allocated as secondary storage; now a temporary array the full length of the original is allocated.  This keeps different threads from needing the same area of the temporary array at the same time, since each task merges into its own region of it.

Changes to the algorithm may be needed, but the framework definitely makes it easier to move to parallel processing.  One other thing to note is the presence of the ForkJoinPool.  The default constructor looks at the number of available processors and determines the appropriate level of parallelism for the task.

I have a quad-core CPU, so the ForkJoinPool will spawn as many as four threads if necessary.  That said, I've seen cases where only two threads were spawned because more than that was not necessary for the given task.  The ForkJoinPool spawns more threads as it deems necessary rather than starting right at the maximum.
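
If you want to see the parallelism level and compare the two sorts yourself, a rough timing harness like the one below will do.  This is my own throwaway sketch, not part of the original code: it assumes MergeSort and ParallelMergeSort are in the same package, the array size is arbitrary, the import assumes JDK 7, and a proper benchmark would warm up the JVM and average several runs.

import java.util.concurrent.ForkJoinPool;

public class MergeSortTiming {
    public static void main(String[] args) {
        // the default pool sizes itself from the number of available processors
        System.out.println("Available processors: " + Runtime.getRuntime().availableProcessors());
        System.out.println("Pool parallelism:     " + new ForkJoinPool().getParallelism());

        int size = 5000000;
        Integer[] a = new Integer[size];
        Integer[] b = new Integer[size];
        for (int i = 0; i < size; i++) {
            a[i] = (int) (Math.random() * size);
            b[i] = a[i];
        }

        long start = System.currentTimeMillis();
        MergeSort.sort(a);
        System.out.println("Sequential: " + (System.currentTimeMillis() - start) + " ms");

        start = System.currentTimeMillis();
        ParallelMergeSort.sort(b);
        System.out.println("Parallel:   " + (System.currentTimeMillis() - start) + " ms");
    }
}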

A complete API for the fork/join framework can be found at the Concurrency JSR-166 Interest Site.  All that is needed to use it on Java 6 is the jsr166y package.
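
In other words, moving between Java 6 and JDK 7 should mostly be a matter of where the classes are imported from.  A quick sketch, assuming the jsr166y jar is on the classpath for the Java 6 case:

// On Java 6, with the jsr166y jar on the classpath:
import jsr166y.ForkJoinPool;
import jsr166y.RecursiveAction;

// On JDK 7, the same classes ship with the JDK:
// import java.util.concurrent.ForkJoinPool;
// import java.util.concurrent.RecursiveAction;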

Some other algorithms that are suited to parallelism that I've been thinking about are graph searching algorithms such as depth-first and breadth-first search.  Whether they are done on a tree or a graph determines how much the underlying data structure will need to be changed to support the parallelism.  I also plan to look at making a parallel version of the quicksort algorithm using this framework; a rough sketch of what that might look like is below.  Most divide-and-conquer algorithms can be adapted fairly easily to be multi-threaded using this method, but remember that for a performance benefit to be seen, the task must be sufficiently large.
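
As a rough idea of what that parallel quicksort could look like, here is an untested sketch that follows the same pattern as the SortTask above.  The class names, the Lomuto partition, and the threshold are my own choices rather than anything from the fork/join documentation, the example sorts an int[] for brevity, and the imports again assume JDK 7.

import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveAction;

public class ParallelQuickSort {

    private static final ForkJoinPool threadPool = new ForkJoinPool();
    private static final int SIZE_THRESHOLD = 16;

    public static void sort(int[] a) {
        threadPool.invoke(new QuickSortTask(a, 0, a.length - 1));
    }

    static class QuickSortTask extends RecursiveAction {
        private final int[] a;
        private final int lo, hi;

        QuickSortTask(int[] a, int lo, int hi) {
            this.a = a;
            this.lo = lo;
            this.hi = hi;
        }

        @Override
        protected void compute() {
            if (hi - lo < SIZE_THRESHOLD) {
                insertionsort(a, lo, hi);
                return;
            }
            int p = partition(a, lo, hi);
            // the two recursive calls become two sub-tasks, just like the merge sort
            invokeAll(new QuickSortTask(a, lo, p - 1), new QuickSortTask(a, p + 1, hi));
        }
    }

    // simple Lomuto partition around the last element of the range
    private static int partition(int[] a, int lo, int hi) {
        int pivot = a[hi];
        int i = lo - 1;
        for (int j = lo; j < hi; j++) {
            if (a[j] <= pivot) {
                swap(a, ++i, j);
            }
        }
        swap(a, i + 1, hi);
        return i + 1;
    }

    private static void swap(int[] a, int i, int j) {
        int t = a[i];
        a[i] = a[j];
        a[j] = t;
    }

    private static void insertionsort(int[] a, int lo, int hi) {
        for (int i = lo + 1; i <= hi; i++) {
            int t = a[i];
            int j = i;
            while (j > lo && t < a[j - 1]) {
                a[j] = a[j - 1];
                --j;
            }
            a[j] = t;
        }
    }
}

Note that, unlike the merge sort, this version needs no temporary array at all, because the two sub-tasks work on disjoint ranges of the original array.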

Posted on March 9, 2010 at 10:53 pm by Joe
In: Java

14 Responses


  1. Written by Riccardo
    on April 27, 2010 at 12:12 pm
    Permalink

    Good performance! I compared your parallel merge sort to the standard Java Arrays.sort() and the time needed is about half with two processors.

    Optimal!

  2. Written by Prabh
    on July 15, 2010 at 5:47 pm
    Permalink

    I wrote one program to check the speed-up that happens due to the ParallelArray predefined sort() method. However, it is running even slower than the sequential algorithm. Am I missing something in my code? If not, then what's the point of having a sort function in the ParallelArray class as opposed to Arrays.sort()? Actually, Arrays.sort() is even faster than the parallel code written using the Java 7 invoke() method. Here's the code:

    import java.util.Arrays;

    import jsr166y.*;
    import extra166y.*;

    public class TestParallelArray {
        public static void main(String[] args) {

            int size = 15000000;
            Integer[] data = new Integer[size];
            int[] temp = new int[size];
            int[] temp2 = new int[size];

            for (int i = 0; i < size; i++) {
                data[i] = new Integer((int) (Math.random() * 10000000));
                temp[i] = data[i];
                temp2[i] = data[i];
            }

            ForkJoinPool forkJoinPool = new ForkJoinPool();
            ParallelArray parallelArray = ParallelArray.createFromCopy(data, forkJoinPool);

            TestParallelArray testParallelArray = new TestParallelArray();

            System.out.println("Sorting now");
            long start = System.currentTimeMillis();
            testParallelArray.shuttleSort(temp2, temp, 0, size);
            long finish = System.currentTimeMillis();
            System.out.println("Time taken = " + (finish - start) + " milliseconds (Using sequential)");

            start = System.currentTimeMillis();
            forkJoinPool.invoke(testParallelArray.new SortTask(temp2, temp, 0, size));
            finish = System.currentTimeMillis();
            System.out.println("Time taken = " + (finish - start) + " milliseconds (Parallel using invoke command)");

            start = System.currentTimeMillis();
            parallelArray = parallelArray.sort(); // even if I convert this line to plain "parallelArray.sort()" the clock time remains almost the same
            finish = System.currentTimeMillis();
            System.out.println("Time taken = " + (finish - start) + " milliseconds (Using Parallel Sort)");

            start = System.currentTimeMillis();
            Arrays.sort(temp);
            finish = System.currentTimeMillis();
            System.out.println("Time taken = " + (finish - start) + " milliseconds (Using Arrays.sort())");
        }

        private void shuttleSort(int[] from, int[] to, int low, int high) {
            if (high - low == 1) {
                return;
            }
            int middle = (low + high) / 2;
            shuttleSort(to, from, low, middle);
            shuttleSort(to, from, middle, high);

            int p = low;
            int q = middle;

            if (high - low >= 4 && from[middle - 1] <= from[middle]) {
                for (int i = low; i < high; i++) {
                    to[i] = from[i];
                }
                return;
            }

            for (int i = low; i < high; i++) {
                if (q >= high || (p < middle && from[p] <= from[q])) {
                    to[i] = from[p++];
                } else {
                    to[i] = from[q++];
                }
            }
        }
    }

    • Written by Joe
      on July 15, 2010 at 6:35 pm
      Permalink

      You’re not doing an apples to apples comparison.

      The problem lies here:
      Integer[] data = new Integer[size];
      int[] temp = new int[size];
      int[] temp2 = new int[size];

      It should be:
      Integer[] data = new Integer[size];
      Integer[] temp = new Integer[size];
      Integer[] temp2 = new Integer[size];

      You’re comparing an array of objects to an array of ints. Java operates faster on primitive types.

      Also, when benchmarking it’s a good idea to take the average of several runs.

  3. Written by Prabh
    on July 15, 2010 at 10:42 pm
    Permalink

    Thanks. It solved the problem

  4. Written by Merijn Vogel
    on August 8, 2011 at 10:19 am
    Permalink

    And sorting may not be the best test.

    When performing CPU-intensive processing on files, this pattern is very useful for making use of multiple processors:

    if (files.size() < 500) {
        processFiles(files);
    } else {
        List left = files.subList(0, split);
        List right = files.subList(split, files.size());
        invokeAll(new MyRecursiveAction(left), new MyRecursiveAction(right));
    }

    • Written by Joe
      on August 8, 2011 at 9:32 pm
      Permalink

      That’s a good point. When looking into these types of articles, I generally look for parallel algorithms. Processing files in parallel seems like something that might be more useful to a wider audience.

  5. Written by Graham Seed
    on August 17, 2011 at 4:43 am
    Permalink

    Hi

    I implemented your sequential merge sort and setup a test that fills an array with random numbers:

    int size = (int) 1e8;
    Integer[] iarray = new Integer[size];
    RandomNumber rn = new RandomNumber();
    for (int i = 0; i < size; i++)
    {
        int number = (int) rn.numberInRange(0, size);
        iarray[i] = new Integer(number);
    }

    The random number generator is my own and simply returns a random number within a range.

    With the shown size of 1e08 for the array length I get the following exception:

    Exception in thread "main" java.lang.OutOfMemoryError: Java heap space
    at hedgehog_samples.utility.MergeSortSample.main(MergeSortSample.java:44)

    I was wondering how you would perform such a test. Do I need to pass a runtime parameter to allocate a larger block of memory?

    If I set the array size to 1e06 it performs the sort almost immediately, so to properly test this sort I need much larger array lengths.

    Thanks.

    • Written by Joe
      on August 17, 2011 at 10:32 pm
      Permalink

      Graham,

      I did some experimentation, and it looks like you’ll need to let the Java heap grow to about 3GB. This can be done with the command line option: -Xmx3072M.

      After doing that, it was able to finish allocating all of the integers.

  6. Written by Graham Seed
    on August 18, 2011 at 6:19 am
    Permalink

    Hi Joe

    Thanks for your reply. I added the -Xmx3g command to the vm command-line options in NetBeans and it ran successfully.

    What I don't get is that the default runtime heap is 64m. It requires 3,000m, so the 100e6 array of Integers requires 2,936m, which is equivalent to ~29 bytes per element. With each int requiring 4 bytes, do you know why it requires ~29 bytes per element?

    I noticed when compiling your sequential MergeSort that I get unchecked call warnings on all of the calls to compareTo(). I also noticed that you use Comparable without using generics although Comparable is a generic type. Thus, I rewrote the MergeSort class to have declaration:

    public class MergeSort<T extends Comparable>
    {
    //…

    The static method syntax does support generics but the syntax gets very messy so I decided to add type T to the class.

    I replaced every instance of “Comparable” with “T”.

    The only difficulty arose in method sort() with:

    Comparable[] tmp = new Comparable[((hi - lo) / 2) + 1];

    having to be replaced by:

    T[] tmp = (T[]) Array.newInstance(a.getClass().getComponentType(), ((hi - lo) / 2) + 1);

    which is still an unchecked cast! And required:

    @SuppressWarnings("unchecked")

    before the method declaration.

    Thus, it would appear that there is no escape from the unchecked call/cast!!

    Graham

    • Written by Joe
      on August 19, 2011 at 1:06 am
      Permalink

      It’s probably around 29 bytes per element because of Object overhead. Each Integer is an object and not stored directly in the array. If it is an array of primitive ints, it would take less space and probably be faster.

  7. Written by java blog
    on September 26, 2012 at 5:44 am
    Permalink

    The Fork/Join framework is the best way to get the most out of the multiple processors available on today's modern and expensive servers. It can also be used to implement the MapReduce pattern. Here is another way of using the fork/join framework in Java.

