Preface

The fork/join framework was introduced in Java 7, mainly to improve support for parallel computing.

Fork/join has two main steps. The first is fork, which splits a large task into many small tasks. The second is join, which combines the results of those small tasks to produce the final result. If the tasks do not return a value, join simply waits until all the small tasks have finished.
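To make the two steps concrete, here is a minimal sketch (not from the original article) of a RecursiveTask that finds the maximum of an array. The class name MaxTask and the threshold of 4 are illustrative assumptions. fork() schedules the left half asynchronously, the current thread computes the right half itself, and join() waits for the left half's result.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

// Hypothetical example: find the maximum of a non-empty array range with fork/join.
public class MaxTask extends RecursiveTask<Integer> {
    private static final int THRESHOLD = 4; // assumed cut-off for direct computation
    private final int[] data;
    private final int from; // inclusive
    private final int to;   // exclusive

    public MaxTask(int[] data, int from, int to) {
        this.data = data;
        this.from = from;
        this.to = to;
    }

    @Override
    protected Integer compute() {
        if (to - from <= THRESHOLD) {
            // Small enough: scan the range directly.
            int max = data[from];
            for (int i = from + 1; i < to; i++) {
                max = Math.max(max, data[i]);
            }
            return max;
        }
        int mid = (from + to) >>> 1;
        MaxTask left = new MaxTask(data, from, mid);
        MaxTask right = new MaxTask(data, mid, to);
        left.fork();                    // fork: schedule the left half asynchronously
        int rightMax = right.compute(); // compute the right half in the current thread
        int leftMax = left.join();      // join: wait for the left half's result
        return Math.max(leftMax, rightMax);
    }

    public static void main(String[] args) {
        int[] data = {7, 3, 42, 9, 15, 1, 28, 4, 11};
        int max = ForkJoinPool.commonPool().invoke(new MaxTask(data, 0, data.length));
        System.out.println(max); // prints 42
    }
}
```

Computing one half in the current thread, rather than forking both halves, keeps the current worker busy instead of idle while it waits.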

Below, we examine the fork/join framework in detail from three aspects: ForkJoinPool, ForkJoinWorkerThread, and ForkJoinTask.

ForkJoinPool

ForkJoinPool is an implementation of ExecutorService that manages its worker threads and provides convenient methods for monitoring and managing the pool.

public class ForkJoinPool extends AbstractExecutorService 

A worker thread can execute only one task at a time, but instead of creating a separate thread for each task, ForkJoinPool stores tasks in a special data structure: a double-ended queue (deque), one per worker thread.

By default, a worker thread takes tasks from the head of its own deque. If its deque is empty, it takes a task from the tail of another worker's deque (this is called work stealing) or from the global queue of submitted tasks. This design keeps idle worker threads busy and improves concurrency.
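The stealing rule can be illustrated with a toy, single-threaded model. This is a simplified sketch, not ForkJoinPool's real implementation: the class WorkStealingModel and its nextTask method are hypothetical, the global submission queue is left out, and "head" and "tail" follow the description above.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.List;

// Toy model of work stealing; NOT how ForkJoinPool is actually implemented.
public class WorkStealingModel {
    // Each worker owns one deque. A worker takes from the head of its own deque;
    // when that is empty, it steals from the tail of another worker's deque.
    public static String nextTask(List<Deque<String>> deques, int self) {
        String task = deques.get(self).pollFirst();
        if (task != null) {
            return task;
        }
        for (int i = 0; i < deques.size(); i++) {
            if (i != self) {
                String stolen = deques.get(i).pollLast();
                if (stolen != null) {
                    return stolen;
                }
            }
        }
        return null; // no work left anywhere
    }

    public static void main(String[] args) {
        List<Deque<String>> deques = List.of(
                new ArrayDeque<String>(List.of("A", "B", "C")), // worker 0 is busy
                new ArrayDeque<String>());                      // worker 1 is idle
        System.out.println(nextTask(deques, 0)); // worker 0 takes A from its own head
        System.out.println(nextTask(deques, 1)); // worker 1 steals C from worker 0's tail
        System.out.println(nextTask(deques, 0)); // worker 0 takes B
        System.out.println(nextTask(deques, 1)); // null: every deque is empty
    }
}
```

Because the owner and the thief work at opposite ends of the deque, they rarely compete for the same task.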

Let's look at how to create a ForkJoinPool.

The most common way is to call ForkJoinPool.commonPool(). It does not create a new pool; it returns a shared, statically constructed pool that serves as the default for any ForkJoinTask not explicitly submitted to a specific pool.

ForkJoinPool forkJoinPool = ForkJoinPool.commonPool();
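A small sketch of using the shared pool follows. The class CommonPoolDemo and the trivial 6 * 7 computation are hypothetical; the calls themselves (commonPool(), getParallelism(), submit(), join()) are standard ForkJoinPool and ForkJoinTask methods.

```java
import java.util.concurrent.ForkJoinPool;

public class CommonPoolDemo {
    // Submits a trivial Callable to the shared common pool and waits for the result.
    public static int answerFromCommonPool() {
        return ForkJoinPool.commonPool().submit(() -> 6 * 7).join();
    }

    public static void main(String[] args) {
        ForkJoinPool pool = ForkJoinPool.commonPool();
        // commonPool() hands back the same shared instance on every call.
        System.out.println(pool == ForkJoinPool.commonPool()); // prints true
        // The runtime picks the parallelism; it can be overridden with the
        // java.util.concurrent.ForkJoinPool.common.parallelism system property.
        System.out.println("parallelism = " + pool.getParallelism());
        System.out.println(answerFromCommonPool()); // prints 42
        // No need to shut the common pool down: shutdown() has no effect on it.
    }
}
```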

Another way is to use the constructor.

ForkJoinPool forkJoinPool = new ForkJoinPool(2);

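The argument is the parallelism level, i.e. the target number of actively running worker threads. Passing 2 means the pool will try to keep two worker threads active, so it can keep up to two processor cores busy at once; it does not pin threads to specific cores.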
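A short sketch of the constructor in use: the helper parallelismOf is a hypothetical name, while getParallelism() is the standard accessor that reports the configured level. A pool you create yourself, unlike the common pool, should be shut down when you are done with it, and the constructor rejects a non-positive level with IllegalArgumentException.

```java
import java.util.concurrent.ForkJoinPool;

public class CustomPoolDemo {
    // Creates a pool with the given parallelism level and reports what the pool recorded.
    public static int parallelismOf(int level) {
        ForkJoinPool pool = new ForkJoinPool(level); // throws IllegalArgumentException if level <= 0
        try {
            return pool.getParallelism();
        } finally {
            pool.shutdown(); // release the worker threads of a pool we created ourselves
        }
    }

    public static void main(String[] args) {
        System.out.println(parallelismOf(2)); // prints 2
        try {
            parallelismOf(0);
        } catch (IllegalArgumentException e) {
            System.out.println("parallelism must be positive");
        }
    }
}
```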

ForkJoinWorkerThread

ForkJoinWorkerThread is the worker thread used in ForkJoinPool.

public class ForkJoinWorkerThread extends Thread {
    // ...
}

Unlike an ordinary Thread, it defines two extra fields:

    final ForkJoinPool pool; // the pool this thread works in
    final ForkJoinPool.WorkQueue workQueue; // work-stealing mechanics

The first is the ForkJoinPool this worker thread belongs to; the second is the worker's own work queue, which supports the work-stealing mechanism.
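The pool field is private to the JDK, but ForkJoinWorkerThread exposes it through the public getPool() method. The hypothetical sketch below submits a task to a pool and checks, from inside the task, that it runs on a ForkJoinWorkerThread whose pool is the one we created.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinWorkerThread;

public class WorkerPoolDemo {
    // Returns true if the submitted task ran on a worker thread belonging to `pool`.
    public static boolean runsInOwnPool() {
        ForkJoinPool pool = new ForkJoinPool(2);
        try {
            return pool.submit(() -> {
                Thread t = Thread.currentThread();
                return t instanceof ForkJoinWorkerThread
                        && ((ForkJoinWorkerThread) t).getPool() == pool;
            }).join();
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(runsInOwnPool());
    }
}
```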

Now let's look at its run() method.

public void run() {
    if (workQueue.array == null) { // only run once
        Throwable exception = null;
        try {
            onStart();
            pool.runWorker(workQueue);
        } catch (Throwable ex) {
            exception = ex;
        } finally {
            try {
                onTermination(exception);
            } catch (Throwable ex) {
                if (exception == null)
                    exception = ex;
            } finally {
                pool.deregisterWorker(this, exception);
            }
        }
    }
}

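Put simply, run() calls the onStart() hook and then hands its work queue to pool.runWorker(), which keeps taking tasks from that queue (or stealing them from other workers' queues) and executing them. When the worker finishes, whether normally or because of an exception, onTermination() is called and the thread deregisters itself from the pool.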
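Because onStart() and onTermination() are protected hooks, you can observe them by subclassing ForkJoinWorkerThread and passing a thread factory to the four-argument ForkJoinPool constructor. In this sketch, CountingWorkerThread, the counters, and runDemo are hypothetical names; the constructor signature, the ForkJoinWorkerThreadFactory interface, and the hooks are standard.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinWorkerThread;
import java.util.concurrent.atomic.AtomicInteger;

public class LoggingWorkerDemo {
    static final AtomicInteger STARTED = new AtomicInteger();
    static final AtomicInteger TERMINATED = new AtomicInteger();

    // Hypothetical worker that counts how often run() invokes each hook.
    static class CountingWorkerThread extends ForkJoinWorkerThread {
        CountingWorkerThread(ForkJoinPool pool) {
            super(pool);
        }

        @Override
        protected void onStart() {
            super.onStart();
            STARTED.incrementAndGet(); // runs before pool.runWorker(workQueue)
        }

        @Override
        protected void onTermination(Throwable exception) {
            TERMINATED.incrementAndGet(); // runs in run()'s finally block
            super.onTermination(exception);
        }
    }

    public static int runDemo() {
        // CountingWorkerThread::new matches ForkJoinWorkerThreadFactory.newThread(ForkJoinPool);
        // null selects the default uncaught-exception handler, false keeps LIFO (non-async) mode.
        ForkJoinPool pool = new ForkJoinPool(2, CountingWorkerThread::new, null, false);
        try {
            return pool.submit(() -> 21 * 2).join();
        } finally {
            pool.shutdown(); // workers exit later, which triggers onTermination()
        }
    }

    public static void main(String[] args) {
        System.out.println(runDemo()); // prints 42
        System.out.println("workers started: " + STARTED.get());
    }
}
```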

ForkJoinTask

ForkJoinTask is the base type for tasks that run in a ForkJoinPool. In practice we usually extend one of its two subclasses: RecursiveAction or RecursiveTask.

Both declare an abstract compute() method, which you implement with the task's actual logic. The difference is that RecursiveAction.compute() returns nothing (void), while RecursiveTask<V>.compute() returns a result of type V.

As the names suggest, both are usually implemented recursively: compute() splits the work into smaller subtasks until each piece is small enough to handle directly. Let's start with a RecursiveAction that upper-cases a string and logs it piece by piece.

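import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.RecursiveAction;
import java.util.logging.Logger;

public class CustomRecursiveAction extends RecursiveAction {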

    private String workload = "";
    private static final int THRESHOLD = 4;

    private static Logger logger =
            Logger.getAnonymousLogger();

    public CustomRecursiveAction(String workload) {
        this.workload = workload;
    }

    @Override
    protected void compute() {
        if (workload.length() > THRESHOLD) {
            ForkJoinTask.invokeAll(createSubtasks());
        } else {
            processing(workload);
        }
    }

    private List<CustomRecursiveAction> createSubtasks() {
        List<CustomRecursiveAction> subtasks = new ArrayList<>();

        String partOne = workload.substring(0, workload.length() / 2);
        String partTwo = workload.substring(workload.length() / 2, workload.length());

        subtasks.add(new CustomRecursiveAction(partOne));
        subtasks.add(new CustomRecursiveAction(partTwo));

        return subtasks;
    }

    private void processing(String work) {
        String result = work.toUpperCase();
        logger.info("This result - (" + result + ") - was processed by "
                + Thread.currentThread().getName());
    }
}

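The example above splits the string in half (binary splitting) until each piece is at most THRESHOLD (4) characters long, then logs each piece in upper case along with the name of the thread that processed it.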
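Since a RecursiveAction returns nothing, its results usually show up as side effects. The following hypothetical variant (UpperCaseAction and toUpperParallel are illustrative names, not from the article) upper-cases a char[] in place instead of logging. Each subtask writes to a disjoint range, so no locking is needed, and once invoke() returns, all writes made by the subtasks are visible to the caller.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveAction;

// Hypothetical RecursiveAction: upper-cases a char[] in place, range by range.
public class UpperCaseAction extends RecursiveAction {
    private static final int THRESHOLD = 4; // same cut-off as CustomRecursiveAction
    private final char[] chars;
    private final int from; // inclusive
    private final int to;   // exclusive

    public UpperCaseAction(char[] chars, int from, int to) {
        this.chars = chars;
        this.from = from;
        this.to = to;
    }

    @Override
    protected void compute() {
        if (to - from <= THRESHOLD) {
            for (int i = from; i < to; i++) {
                chars[i] = Character.toUpperCase(chars[i]);
            }
        } else {
            int mid = (from + to) >>> 1;
            // Run both halves and wait for both; they touch disjoint index ranges.
            invokeAll(new UpperCaseAction(chars, from, mid),
                      new UpperCaseAction(chars, mid, to));
        }
    }

    public static String toUpperParallel(String s) {
        char[] chars = s.toCharArray();
        ForkJoinPool.commonPool().invoke(new UpperCaseAction(chars, 0, chars.length));
        return new String(chars);
    }

    public static void main(String[] args) {
        System.out.println(toUpperParallel("hello fork join")); // prints HELLO FORK JOIN
    }
}
```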

Next, let's look at an example of a RecursiveTask.

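import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.RecursiveTask;

public class CustomRecursiveTask extends RecursiveTask<Integer> {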
    private int[] arr;

    private static final int THRESHOLD = 20;

    public CustomRecursiveTask(int[] arr) {
        this.arr = arr;
    }

    @Override
    protected Integer compute() {
        if (arr.length > THRESHOLD) {
            return ForkJoinTask.invokeAll(createSubtasks())
                    .stream()
                    .mapToInt(ForkJoinTask::join)
                    .sum();
        } else {
            return processing(arr);
        }
    }

    private Collection<CustomRecursiveTask> createSubtasks() {
        List<CustomRecursiveTask> dividedTasks = new ArrayList<>();
        dividedTasks.add(new CustomRecursiveTask(
                Arrays.copyOfRange(arr, 0, arr.length / 2)));
        dividedTasks.add(new CustomRecursiveTask(
                Arrays.copyOfRange(arr, arr.length / 2, arr.length)));
        return dividedTasks;
    }

    private Integer processing(int[] arr) {
        return Arrays.stream(arr)
                .filter(a -> a > 10 && a < 27)
                .map(a -> a * 10)
                .sum();
    }
}

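This is much like the previous example, except that compute() now returns a value. Arrays longer than THRESHOLD (20) are split in half and the subtask results are summed; shorter arrays are processed directly by keeping the elements greater than 10 and less than 27, multiplying each by 10, and summing them.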
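One possible refinement, sketched here as a hypothetical RangeFilterSumTask (not the article's code), is to pass index ranges instead of copying the array with Arrays.copyOfRange at every split. The filter and the threshold are the same as in CustomRecursiveTask, so both should produce the same sums.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

// Hypothetical variant of CustomRecursiveTask that splits by index range, avoiding array copies.
public class RangeFilterSumTask extends RecursiveTask<Integer> {
    private static final int THRESHOLD = 20; // same cut-off as CustomRecursiveTask
    private final int[] arr;
    private final int from; // inclusive
    private final int to;   // exclusive

    public RangeFilterSumTask(int[] arr, int from, int to) {
        this.arr = arr;
        this.from = from;
        this.to = to;
    }

    @Override
    protected Integer compute() {
        if (to - from <= THRESHOLD) {
            int sum = 0;
            for (int i = from; i < to; i++) {
                if (arr[i] > 10 && arr[i] < 27) { // same filter as processing()
                    sum += arr[i] * 10;
                }
            }
            return sum;
        }
        int mid = (from + to) >>> 1;
        RangeFilterSumTask left = new RangeFilterSumTask(arr, from, mid);
        RangeFilterSumTask right = new RangeFilterSumTask(arr, mid, to);
        invokeAll(left, right);            // run both halves and wait for both
        return left.join() + right.join(); // both are done, so join() returns at once
    }

    public static void main(String[] args) {
        int[] intArray = {12, 12, 13, 14, 15};
        int result = ForkJoinPool.commonPool()
                .invoke(new RangeFilterSumTask(intArray, 0, intArray.length));
        System.out.println(result); // prints 660
    }
}
```

The subtasks share one read-only array, which trades the extra copying for slightly more bookkeeping (the from/to fields).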

Submitting Tasks to a ForkJoinPool

With the two task classes defined above, we can now submit tasks to a ForkJoinPool.

int[] intArray = {12, 12, 13, 14, 15};
CustomRecursiveTask customRecursiveTask = new CustomRecursiveTask(intArray);

// All five values lie between 10 and 27, so this prints (12 + 12 + 13 + 14 + 15) * 10 = 660
int result = forkJoinPool.invoke(customRecursiveTask);
System.out.println(result);

Here we submit the task with invoke(), which blocks until the task finishes and then returns its result.

Instead of invoke(), we can also use fork() and join().

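// Use a fresh task: forking a task that has already completed (without reinitialize()) is a usage error.
CustomRecursiveTask anotherTask = new CustomRecursiveTask(intArray);
anotherTask.fork();
int result2 = anotherTask.join();
System.out.println(result2);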

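fork() arranges for the task to run asynchronously and returns immediately. When called from inside a task already running in a ForkJoinPool, it pushes the task onto the current worker's queue; otherwise (as here) it submits the task to ForkJoinPool.commonPool(), not to the forkJoinPool variable used above. join() then waits for the task to complete and returns its result; if the task has not started yet, the joining thread may run it itself.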
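The submission styles can be compared side by side in a small sketch. The Square task and the runAll helper are hypothetical; invoke(), submit(), fork(), and join() are the standard ForkJoinPool and ForkJoinTask methods discussed above.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.RecursiveTask;

public class SubmitStylesDemo {
    // Hypothetical task that squares a number, kept trivial so the submission styles stand out.
    static class Square extends RecursiveTask<Integer> {
        private final int n;

        Square(int n) {
            this.n = n;
        }

        @Override
        protected Integer compute() {
            return n * n;
        }
    }

    public static int[] runAll() {
        ForkJoinPool pool = new ForkJoinPool(2);
        try {
            // 1. invoke(): submit and block until the result is ready.
            int a = pool.invoke(new Square(3));

            // 2. submit(): returns the task (a Future) immediately; join() waits later.
            ForkJoinTask<Integer> pending = pool.submit(new Square(4));
            int b = pending.join();

            // 3. fork()/join() from outside any pool: the task runs in the common pool.
            Square task = new Square(5);
            task.fork();
            int c = task.join();

            return new int[] {a, b, c};
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        int[] r = runAll();
        System.out.println(r[0] + " " + r[1] + " " + r[2]); // prints 9 16 25
    }
}
```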