Thread pool rejection policy

Preface

When it comes to java thread pools nothing is more familiar than the ExecutorService interface. jdk1.5 adds java.util.concurrent package under this api, which greatly simplifies the development of multi-threaded code. Whether you use FixedThreadPool or CachedThreadPool the implementation behind it is ThreadPoolExecutor. threadPoolExecutor is a typical product of cache pooling design, because the pool has a size, when the pool volume is not enough to carry, it involves a rejection policy. jdk There are already four thread pool rejection policies pre-defined. The following is a detailed message about the use of these policies in conjunction with scenarios, and what other rejection policies we can extend.

Timing of thread pools to trigger rejection policies

Unlike data source connection pools, thread pools have an additional blocking queue to buffer in addition to the initial size and pool maximum. Data source connection pools generally trigger a rejection policy when the number of requested connections exceeds the connection pool maximum, and the policy is generally to block for the set amount of time or just throw an exception.

When the number of tasks submitted is greater than the corePoolSize, it will be put into the queue buffer first, and only after the buffer is filled will it determine whether the currently running task is greater than the maxPoolSize, and if it is less than that, a new thread will be created to handle it. If it is larger than that, the rejection policy is triggered. To summarize: the current number of tasks submitted is larger than (maxPoolSize + queueCapacity), the rejection policy of the thread pool will be triggered.

JDK built-in 4 kinds of thread pool rejection policy

Before analyzing the thread pool rejection policy that comes with the JDK, let’s take a look at the rejection policy interface defined by the JDK.

as follows:

public interface RejectedExecutionHandler {
    void rejectedExecution(Runnable r, ThreadPoolExecutor executor);
}

The interface definition is clear, when the rejection policy is triggered, the thread pool will call the specific policy you set, the current submitted task and the thread pool instance itself passed to you to deal with, specific processing, different scenarios will have different considerations, the following look at the JDK for our built-in implementation of which.

CallerRunsPolicy (caller run policy)

    public static class CallerRunsPolicy implements RejectedExecutionHandler {

        public CallerRunsPolicy() { }

        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            if (!e.isShutdown()) {
                r.run();
            }
        }
    }

Function: When the rejection policy is triggered, it is handled by the current thread that submitted the task as long as the thread pool is not closed.

Usage scenarios: Generally used in scenarios where failure is not allowed, performance requirements are not high, and concurrency is small, because the thread pool will not be closed in general, which means that the submitted task will definitely be run, but since it is executed by the caller thread itself, when the task is submitted multiple times, it will block the execution of subsequent tasks, and performance and efficiency will naturally be slow.

AbortPolicy (abort policy)

    public static class AbortPolicy implements RejectedExecutionHandler {

        public AbortPolicy() { }

        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            throw new RejectedExecutionException("Task " + r.toString() +
                                                 " rejected from " +
                                                 e.toString());
        }
    }

Function: When the rejection policy is triggered, it directly throws an exception that rejects the execution, which means that the abort policy interrupts the current execution process.

The default policy in ThreadPoolExecutor is AbortPolicy, and the series ThreadPoolExecutor of the ExecutorService interface does not show the rejection policy, so the default is This. However, please note that the thread pool instance queue in the ExecutorService is unbounded, which means that the rejection policy is not triggered even if the memory is burst. When using this policy for your own custom thread pool instances, be sure to handle the exceptions thrown when the policy is triggered, as it will interrupt the current execution process.

DiscardPolicy (discard policy)

    public static class DiscardPolicy implements RejectedExecutionHandler {

        public DiscardPolicy() { }

        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        }
    }

Function: Discard the task directly and silently, without triggering any action.

Usage Scenario: You can use it if the task you submit is not important. Because it is an empty implementation, it will silently swallow your task. So this policy is basically not used.

DiscardOldestPolicy (discard old task policy)

    public static class DiscardOldestPolicy implements RejectedExecutionHandler {

        public DiscardOldestPolicy() { }

        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            if (!e.isShutdown()) {
                e.getQueue().poll();
                e.execute(r);
            }
        }
    }

Function: If the thread pool is not closed, pop the element at the head of the queue and try to execute it

Usage Scenario: This policy still drops tasks, and drops them silently, but features old unexecuted tasks that are discarded and have a high priority to be executed. Based on this feature, I can think of a scenario where a message is published, and a modified message, and when the message is published and not yet executed, then the updated message comes again, and this time the version of the unexecuted message is lower than the version of the message submitted now can be discarded. Because there may be messages in the queue with lower message versions queued for execution, it is important to do a good job of comparing message versions when actually processing messages.

Third-party implementations of rejection policies

Thread rejection policy in dubbo

public class AbortPolicyWithReport extends ThreadPoolExecutor.AbortPolicy {

    protected static final Logger logger = LoggerFactory.getLogger(AbortPolicyWithReport.class);

    private final String threadName;

    private final URL url;

    private static volatile long lastPrintTime = 0;

    private static Semaphore guard = new Semaphore(1);

    public AbortPolicyWithReport(String threadName, URL url) {
        this.threadName = threadName;
        this.url = url;
    }

    @Override
    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        String msg = String.format("Thread pool is EXHAUSTED!" +
                        " Thread Name: %s, Pool Size: %d (active: %d, core: %d, max: %d, largest: %d), Task: %d (completed: %d)," +
                        " Executor status:(isShutdown:%s, isTerminated:%s, isTerminating:%s), in %s://%s:%d!",
                threadName, e.getPoolSize(), e.getActiveCount(), e.getCorePoolSize(), e.getMaximumPoolSize(), e.getLargestPoolSize(),
                e.getTaskCount(), e.getCompletedTaskCount(), e.isShutdown(), e.isTerminated(), e.isTerminating(),
                url.getProtocol(), url.getIp(), url.getPort());
        logger.warn(msg);
        dumpJStack();
        throw new RejectedExecutionException(msg);
    }

    private void dumpJStack() {
       //...
    }
}

As you can see, when a thread rejection is triggered by dubbo’s worker thread, three main things are done. The principle is to make it as clear as possible to the user the real reason for triggering the thread rejection policy.

  • A warning-level log is output, which contains details of the thread pool’s setup parameters, as well as the current state of the thread pool, and some details of the currently rejected task. This log is a good example of log printing, as well as other log printing examples such as spring, which can be easily located thanks to such detailed logs.

  • Output the current thread stack details, which is very important to help us locate the problem

  • Continue to throw a deny exception to make this task fail, this inherits the JDK default deny policy feature

Thread pool rejection policy in Netty

    private static final class NewThreadRunsPolicy implements RejectedExecutionHandler {
        NewThreadRunsPolicy() {
            super();
        }

        public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
            try {
                final Thread t = new Thread(r, "Temporary task executor");
                t.start();
            } catch (Throwable e) {
                throw new RejectedExecutionException(
                        "Failed to start a new thread", e);
            }
        }
    }

The implementation in Netty is very much like the CallerRunsPolicy in the JDK, shedding tasks. The difference is that CallerRunsPolicy is a task that is executed directly in the caller’s thread. Netty, on the other hand, creates a new thread to handle it. So, Netty’s implementation can be extended to support high efficiency and high performance scenarios compared to the caller execution policy’s usage surface. However, it should be noted that Netty’s implementation does not make any judgment constraint when creating a thread, which means that as long as the system has resources, it will create a new thread to handle it, and will not throw a thread creation failure exception until no new thread is available.

Thread pool rejection policy in activeMq

 new RejectedExecutionHandler() {
                @Override
                public void rejectedExecution(final Runnable r, final ThreadPoolExecutor executor) {
                    try {
                        executor.getQueue().offer(r, 60, TimeUnit.SECONDS);
                    } catch (InterruptedException e) {
                        throw new RejectedExecutionException("Interrupted waiting for BrokerService.worker");
                    }

                    throw new RejectedExecutionException("Timed Out while attempting to enqueue Task.");
                }
            });

The policy in activeMq is of the maximum effort task execution type. When the rejection policy is triggered, the task is re-stuffed into the task queue after trying for one minute, and an exception is thrown when the one-minute timeout is not successful.

The thread pool rejection policy in pinpoint

public class RejectedExecutionHandlerChain implements RejectedExecutionHandler {
    private final RejectedExecutionHandler[] handlerChain;

    public static RejectedExecutionHandler build(List<RejectedExecutionHandler> chain) {
        Objects.requireNonNull(chain, "handlerChain must not be null");
        RejectedExecutionHandler[] handlerChain = chain.toArray(new RejectedExecutionHandler[0]);
        return new RejectedExecutionHandlerChain(handlerChain);
    }

    private RejectedExecutionHandlerChain(RejectedExecutionHandler[] handlerChain) {
        this.handlerChain = Objects.requireNonNull(handlerChain, "handlerChain must not be null");
    }

    @Override
    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
        for (RejectedExecutionHandler rejectedExecutionHandler : handlerChain) {
            rejectedExecutionHandler.rejectedExecution(r, executor);
        }
    }
}

pinpoint’s rejection policy implementation is unique and different from all other implementations. He defines a rejection policy chain that wraps a list of rejection policies, and when a rejection policy is triggered, the rejectedExecution in the policy chain is executed in turn.

Conclusion

This article introduces the definition of the java thread pool reject policy interface from the thread pool design idea, and the timing of the thread pool triggering the reject policy. It also describes the thread pool rejection policy implementation ideas and usage scenarios based on the four built-in rejection policies of the JDK and four third-party open source software rejection policies. I hope that reading this article will give you a deeper understanding of the java thread pool rejection policy and enable you to apply it more flexibly according to different usage scenarios.