Java Multithreaded Concurrent FutureTask

Java’s multithreading mechanism essentially accomplishes two things, asynchronous computation and concurrency. Concurrency is solved by a series of APIs that address thread safety; asynchronous computation, on the other hand, is commonly used with Runnable and Callable in conjunction with threads.

FutureTask is a cancelable asynchronous callable API based on the Runnable implementation.

Basic usage

Future represents the result of an asynchronous computation, and is executed via the ExecutorService’s Future<? > submit(Runnable task) method of the ExecutorService, which is used as the return value.

interface ArchiveSearcher { String search(String target); }
class App {
    ExecutorService executor = ... ;
    ArchiveSearcher searcher = ... ;
    void showSearch(String target) throws InterruptedException {
       Callable<String> task = () -> searcher.search(target);
       Future<String> future = executor.submit(task); // get the result of execution
       displayOtherThings(); // do other things while searching
       try {
         displayText(future.get()); // use future
       } catch (ExecutionException ex) { cleanup(); return; }
    }
}

The FutureTask class is an implementation of Future that implements Runnable, and therefore can be executed by an Executor. For example, the above construct with submit can be replaced with :

class App {
    ExecutorService executor = ... ;
    ArchiveSearcher searcher = ... ;
    void showSearch(String target) throws InterruptedException {
       Callable<String> task = () -> searcher.search(target);
       // key two-line replacement
       FutureTask<String> future = new FutureTask<>(task);
       executor.execute(future);
       displayOtherThings(); // do other things while searching
       try {
         displayText(future.get()); // use future
       } catch (ExecutionException ex) { cleanup(); return; }
    }
}

Code analysis

Inheritance relations

class.png

Future

Future represents the result of an asynchronous computation. Defines the ability to check if the computation is complete, wait for it to complete, and retrieve the result of the computation. The get method can only be used to retrieve the result once the computation is complete, and will block the thread if necessary until the Future computation is complete. Cancellation is performed by the cancel method. Additional methods are provided to determine whether a task is completed normally or is cancelled. Once the computation is complete, it cannot be canceled. If you are using Future for cancellability and do not want to provide a usable result, you can declare a task of the form Future<? > and return null as the result of the task.

Before describing the capabilities defined in Future, let’s look at its internal class for representing Future state, and the state retrieval method.

public interface Future<V> {
    enum State {
        // The task has not completed.
        RUNNING,
        // The task completed with a result. @see Future#resultNow()
        SUCCESS,
        // The task completed with an exception. @see Future#exceptionNow()
        FAILED,
        // The task was cancelled. @see #cancel(boolean)
        CANCELLED
    }

    default State state() {
        if (!isDone()) // Determine if it is running based on isDone()
            RUNNING;
        if (isCancelled()) // cancelled according to isCancelled()
            return State.CANCELLED;
        boolean interrupted = false;
        try {
            while (true) { // dead loop polling
                try {
                    get(); // may throw InterruptedException when done
                    return State.SUCCESS; 
                } catch (InterruptedException e) {
                    interrupted = true;
                } catch (ExecutionException e) {
                    return State.FAILED;
                }
            }
        } finally {
            if (interrupted) Thread.currentThread().interrupt();
        }
    }
}

The default implementation of Future’s state retrieval is based on isDone(), isCancelled() and constant polling of the get() method for the return value.

When get() returns a normal result, state() returns State.SUCCESS; when an InterruptedException is thrown, the thread will eventually be operated to execute the method that tried to interrupt; when other exceptions are thrown, State.FAILED is returned.

Other methods defined in Future include.

package java.util.concurrent;

public interface Future<V> {
		// Cancel operation
    boolean cancel(boolean mayInterruptIfRunning);
		// Check if the operation is canceled
    boolean isCancelled();
		// Check for completion
    boolean isDone();
		// method to get the result of the calculation
    V get() throws InterruptedException, ExecutionException;
		// method to get the result of a calculation with a timeout limit
    V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException;
  	// Return the result immediately
  	default V resultNow()
  	// Throw an exception right away
  	default Throwable exceptionNow()
}

where resultNow() and exceptionNow() come with the default implementation of.

		default V resultNow() {
        if (!isDone())
            throw new IllegalStateException("Task has not completed");
        boolean interrupted = false;
        try {
            while (true) {
                try {
                    return get();
                } catch (InterruptedException e) {
                    interrupted = true;
                } catch (ExecutionException e) {
                    throw new IllegalStateException("Task completed with exception");
                } catch (CancellationException e) {
                    throw new IllegalStateException("Task was cancelled");
                }
            }
        } finally {
            if (interrupted) Thread.currentThread().interrupt();
        }
    }
  • Future is still running and throws an IllegalStateException directly.
  • Execute a poll, call get() to try to return the result of the computation, and if get() throws an exception, throw an IllegalStateException with a different message or perform the interrupt thread action depending on the exception.
    default Throwable exceptionNow() {
        if (!isDone())
            throw new IllegalStateException("Task has not completed");
        if (isCancelled())
            throw new IllegalStateException("Task was cancelled");
        boolean interrupted = false;
        try {
            while (true) {
                try {
                    get();
                    throw new IllegalStateException("Task completed with a result");
                } catch (InterruptedException e) {
                    interrupted = true;
                } catch (ExecutionException e) {
                    return e.getCause();
                }
            }
        } finally {
            if (interrupted) Thread.currentThread().interrupt();
        }
    }
  • Future is still running and throws IllegalStateException directly.
  • Future checks to see if it has been cancelled, and throws IllegalStateException if it has.
  • Execute polling, call get() method, if it can finish executing normally, also throw IllegalStateException with message “Task completed with a result”; get() if it throws InterruptedException, then execute thread interrupt operation; other exceptions is thrown normally.

That’s the whole picture of Future.

RunnableFuture

The RunnableFuture interface implements both the Runnable and Future interfaces.

public interface RunnableFuture<V> extends Runnable, Future<V> {
    /* * Sets this Future to the result of the result of the Future.
     * Sets this Future to the result of its computation
     * unless it has been cancelled.
     * Sets this Future to the result of its computation * unless it has been cancelled.
     */
    void run();
}

The Runnable interface is commonly used to implement threaded operations and is very familiar and simple to use.

This interface represents a Future that can be Runnable. The successful execution of the run method means that the execution of the Future is complete and the result of its computation can be obtained.

This interface is only available after JDK 1.6.

FutureTask

FutureTask is a direct implementation of RunnableFuture, which represents a cancelable asynchronous computation task. Based on our analysis of Future and familiarity with Runnable, we can understand its role: a Runnable that can be canceled and can retrieve the running state, and can be used with threads to interrupt thread execution. When the task is not completed, it will cause blocking. And it can also be used with Executor.

Status

FutureTask also defines its own state internally.

public class FutureTask<V> implements RunnableFuture<V> {
	private volatile int state;
    private static final int NEW = 0; // New
    private static final int COMPLETING = 1; // finishing
    private static final int NORMAL = 2; // normal completion
    private static final int EXCEPTIONAL = 3; // Exceptional
    private static final int CANCELLED = 4; // cancelled
    private static final int INTERRUPTING = 5; // interrupted
    private static final int INTERRUPTED = 6; // interrupted
  
	@Override
    public State state() {
        int s = state;
        while (s == COMPLETING) {
            // wait for transition to NORMAL or EXCEPTIONAL
            Thread.yield();
            s = state;
        }
        switch (s) {
            case NORMAL:
                return State.SUCCESS;
            case EXCEPTIONAL:
                return State.FAILED;
            case CANCELLED:
            case INTERRUPTING:
            case INTERRUPTED:
                return State.CANCELLED;
            default:
                return State.RUNNING;
        }
    }
}

The FutureTask has 7 states, initially NEW, and only in the set, setException and cancel methods does the running state change to the final state. During completion, the state may be COMPLETING (when the result is being set) or INTERRUPTING (only if the runner is interrupted to satisfy the transient value of cancel(true)).

The possible state transitions are.

NEW -> COMPLETING -> NORMAL // normal completion
NEW -> COMPLETING -> EXCEPTIONAL // exception thrown
NEW -> CANCELLED // cancel
NEW -> INTERRUPTING -> INTERRUPTED // interrupt

property

The following analyzes its properties.

    /* The underlying call; runs as null */
    private Callable<V> callable;
    /* The result returned by get() or the exception thrown */
    private Object outcome; // non-volatile, protected by state reads/writes
    /* The thread running the callable; CASed during run() */
    private volatile Thread runner;
    /* The Treiber stack of waiting threads */
    private volatile WaitNode waiters;

Internal Classes

Let’s take a look at this WaitNode, which is an internal class of FutureTask.

    static final class WaitNode {
        volatile Thread thread;
        volatile WaitNode next;
        WaitNode() { thread = Thread.currentThread(); }
    }

A linked table structure to sort the waiting threads.

Constructed Methods

Finally, the methods are analyzed, starting with the constructor method.

    // Creates a {@code FutureTask} that will, upon running, execute the given {@code Callable}.
    public FutureTask(Callable<V> callable) {
        if (callable == null)
            throw new NullPointerException();
        this.callable = callable;
        this.state = NEW;       // ensure visibility of callable
    }

    /**
     * Creates a {@code FutureTask} that will, upon running, execute the
     * given {@code Runnable}, and arrange that {@code get} will return the
     * given result on successful completion. 
     * Runnable Success is the return of the given result result
     */
    public FutureTask(Runnable runnable, V result) {
        this.callable = Executors.callable(runnable, result);
        this.state = NEW;       // ensure visibility of callable
    }

Success is to return the given result resFutureTask receives a Callable or a Runnable as an argument, the Runnable will encapsulate it all and save it to the property callable, then update the state of the FutureTask to NEW.

Analyze the methods implemented in the Future interface one by one.

Retrieves the FutureTask status
    public boolean isCancelled() {
        return state >= CANCELLED; // greater than or equal to 4, cancelled, interrupted, interrupted
    }

    public boolean isDone() {
        return state ! = NEW; // not new means the execution is finished
    }
cancel operation
    // mayInterruptIfRunning indicates whether the final cancellation is by interrupt or by cancellation.
		public boolean cancel(boolean mayInterruptIfRunning) {
        if (! (state == NEW && STATE.compareAndSet(this, NEW, mayInterruptIfRunning ? INTERRUPTING : CANCELLED))) // try to set CANCELLED or INTERRUPTING 
            return false;
        try { // in case call to interrupt throws exception
            if (mayInterruptIfRunning) {
                try {
                    Thread t = runner;
                    if (t ! = null)
                        t.interrupt(); // cancel the task by interrupting
                } finally { // final state
                    STATE.setRelease(this, INTERRUPTED); // update the interrupt state
                }
            }
        } finally {
            finishCompletion();
        }
        return true;
    }

The purpose of finishCompletion() is to wake up all the waiting threads by LockSupport and remove them from the waiting list, then call done() and finally empty the callable. This is equivalent to releasing resources after a successful cancellation.

    private void finishCompletion() {
        // assert state > COMPLETING;
        for (WaitNode q; (q = waiters) != null;) {
            if (WAITERS.weakCompareAndSet(this, q, null)) {
                for (;;) {
                    Thread t = q.thread;
                    if (t != null) {
                        q.thread = null;
                        LockSupport.unpark(t);
                    }
                    WaitNode next = q.next;
                    if (next == null)
                        break;
                    q.next = null; // unlink to help gc
                    q = next;
                }
                break;
            }
        }
        done();
        callable = null;        // to reduce footprint
    }

done() is an empty implementation for subclasses to customize.

protected void done() { }
Calculation results
    public V get() throws InterruptedException, ExecutionException {
        int s = state;
        if (s <= COMPLETING)
            s = awaitDone(false, 0L);
        return report(s);
    }

    public V get(long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException {
        if (unit == null)
            throw new NullPointerException();
        int s = state;
        if (s <= COMPLETING &&
            (s = awaitDone(true, unit.toNanos(timeout))) <= COMPLETING)
            throw new TimeoutException();
        return report(s);
    }

There are two methods involved: the awaitDone method and the report method.

The awaitDone method.

    private int awaitDone(boolean timed, long nanos)
        throws InterruptedException {
        // The code below is very delicate, to achieve these goals:
        // - if nanos <= 0L, Return in time, no allocation or nanoTime required
        // - if nanos == Long.MIN_VALUE, don't underflow
        // - if nanos == Long.MAX_VALUE, and nanoTime is non-monotonic
        //   and we suffer a spurious wakeup, we will do no worse than
        //   to park-spin for a while
        long startTime = 0L;    // Special value 0L means not yet parked
        WaitNode q = null;
        boolean queued = false;
        for (;;) {
            int s = state;
            if (s > COMPLETING) {  // COMPLETING = 1
                if (q != null)
                    q.thread = null;
                return s;
            }
            else if (s == COMPLETING) // transient, in progress
                // We may have already promised (via isDone) that we are done
                // so never return empty-handed or throw InterruptedException
                Thread.yield();
            else if (Thread.interrupted()) {
                removeWaiter(q); // Thread interrupt, remove waiting threads
                throw new InterruptedException();
            }
            else if (q == null) {
                if (timed && nanos <= 0L)
                    return s;
                q = new WaitNode();
            }
            else if (!queued)
                queued = WAITERS.weakCompareAndSet(this, q.next = waiters, q);
            else if (timed) { // Setting the timeout period
                final long parkNanos;
                if (startTime == 0L) { // first time
                    startTime = System.nanoTime();
                    if (startTime == 0L)
                        startTime = 1L;
                    parkNanos = nanos;
                } else {
                    long elapsed = System.nanoTime() - startTime;
                    if (elapsed >= nanos) {
                        removeWaiter(q);
                        return state;
                    }
                    parkNanos = nanos - elapsed;
                }
                // nanoTime may be slow; recheck before parking
                if (state < COMPLETING)
                    LockSupport.parkNanos(this, parkNanos);
            }
            else
                LockSupport.park(this);
        }
    }

Block the current thread and wait for the result of the calculation asynchronously by using CAS and LockSupport’s suspend/wake operations.

There is a removeWaiter method, which internally iterates through waiters and removes timeouts and interrupted waiters.

When the asynchronous logic is finished, the report method is called.

    // return the result or throw an exception for the completed task
    private V report(int s) throws ExecutionException {
        Object x = outcome;
        if (s == NORMAL)
            return (V)x;
        if (s >= CANCELLED)
            throw new CancellationException();
        throw new ExecutionException((Throwable)x);
    }

An outcome is used here, which is an Object type, and as a result it can be set by the set method.

        // Set the outcome of the future to the given value, unless the future has been set or cancelled.
		// This method is called internally by the run method after successful completion of the calculation.
		protected void set(V v) {
        if (STATE.compareAndSet(this, NEW, COMPLETING)) {
            outcome = v;
            STATE.setRelease(this, NORMAL); // final state
            finishCompletion();
        }
    }
Get results or exceptions immediately

Both of these methods return the expected result or exception via the outcome pre-defined return value.

    public V resultNow() {
        switch (state()) {    // Future.State
            case SUCCESS:
                @SuppressWarnings("unchecked")
                V result = (V) outcome;
                return result;
            case FAILED:
                throw new IllegalStateException("Task completed with exception");
            case CANCELLED:
                throw new IllegalStateException("Task was cancelled");
            default:
                throw new IllegalStateException("Task has not completed");
        }
    }

    @Override
    public Throwable exceptionNow() {
        switch (state()) {    // Future.State
            case SUCCESS:
                throw new IllegalStateException("Task completed with a result");
            case FAILED:
                Object x = outcome;
                return (Throwable) x;
            case CANCELLED:
                throw new IllegalStateException("Task was cancelled");
            default:
                throw new IllegalStateException("Task has not completed");
        }
    }
run method group

Finally, the run method that implements Runnable.

    public void run() {
      	// Ensure that the NEW state and RUNNER successfully set the current thread
        if (state ! = NEW ||
            !RUNNER.compareAndSet(this, null, Thread.currentThread()))
            return;
        try {
            Callable<V> c = callable; // the Callable to be executed
            if (c ! = null && state == NEW) {
                V result;
                boolean ran;
                try {
                    result = c.call(); // execute Callable 
                    ran = true;
                } catch (Throwable ex) {
                    result = null;
                    ran = false;
                    setException(ex);
                }
                if (ran)
                    set(result);
            }
        } finally {
            // To prevent concurrent calls to run, the runner must be non-null until the state is determined
            runner = null;
            // The state must be re-read after the runner is empty to prevent a break in the leak
            int s = state;
            if (s >= INTERRUPTING)
                handlePossibleCancellationInterrupt(s);
        }
    }

There are two methods involved here, the first is setException(ex) :

    // causes this future to report a {@link ExecutionException} with the given throwable object as its cause, unless the future has been set or cancelled.
    protected void setException(Throwable t) {
        if (STATE.compareAndSet(this, NEW, COMPLETING)) {
            outcome = t;
            STATE.setRelease(this, EXCEPTIONAL); // final state
            finishCompletion();
        }
    }

The other is the handlePossibleCancellationInterrupt method.

    /**
     * Ensure that any interrupt from a possible cancel(true) is delivered to the task only on run or runAndReset.
     */
    private void handlePossibleCancellationInterrupt(int s) {
        // It is possible for our interrupter to stall before getting a
        // Let's spin-wait patiently.
        if (s == INTERRUPTING)
            while (state == INTERRUPTING)
                Thread.yield(); // wait out pending interrupt

        // assert state == INTERRUPTED;
        // We want to clear all interrupts that may have been received from cancel(true).
        // However, allowing the use of interrupts as a separate mechanism for communication between a task and its caller doesn't have a way to just clear cancel interrupts.
        // Thread.interrupted();
    }

Finally, the runAndReset method.

    protected boolean runAndReset() {
        if (state ! = NEW || !RUNNER.compareAndSet(this, null, Thread.currentThread()))
            return false;
        boolean ran = false; // flag indicates the end of normal execution
        int s = state;
        try {
            Callable<V> c = callable;
            if (c ! = null && s == NEW) {
                try {
                    c.call(); // don't set result
                    ran = true;
                } catch (Throwable ex) {
                    setException(ex);
                }
            }
        } finally {
            // runner must be non-null until state is settled to
            // prevent concurrent calls to run()
            runner = null;
            // state must be re-read after nulling runner to prevent
            // leaked interrupts
            s = state; // 
            if (s >= INTERRUPTING)
                handlePossibleCancellationInterrupt(s);
        }
        return ran && s == NEW; // When normal execution ends and state is NEW to begin with, it is ready to run and reset.
    }

Executes a computation without setting its result, then resets that future to its initial state, or not if the computation encounters an exception or is cancelled. This is designed for tasks that are essentially executed multiple times.

Both run and runAndReset use a RUNNER , which is finally revealed as follows.

    private static final VarHandle STATE;
    private static final VarHandle RUNNER;
    private static final VarHandle WAITERS;
    static {
        try {
            MethodHandles.Lookup l = MethodHandles.lookup();
            STATE = l.findVarHandle(FutureTask.class, "state", int.class);
            RUNNER = l.findVarHandle(FutureTask.class, "runner", Thread.class);
            WAITERS = l.findVarHandle(FutureTask.class, "waiters", WaitNode.class);
        } catch (ReflectiveOperationException e) {
            throw new ExceptionInInitializerError(e);
        }

        // Reduce the risk of rare disastrous classloading in first call to
        // LockSupport.park: https://bugs.openjdk.java.net/browse/JDK-8074773
        Class<? > ensureLoaded = LockSupport.class;
    }

MethodHandles.lookup() creates a MethodHandles.Lookup object that can create methods with all access rights, including public, protected, private, default.

VarHandle is mainly used to dynamically manipulate elements of arrays or member variables of objects. VarHandle gets its instances through MethodHandles, and then calls the methods of VarHandle to dynamically manipulate the elements of the specified array or the member variables of the specified object.