Preface

In Java, threads correspond to operating system threads and consume system resources. Both Windows and Linux limit the number of threads that can be opened, so a Java program that keeps creating threads without bound will eventually reach a point where no more threads can be created.

CPU cores are limited as well. When more threads are runnable than there are cores, the CPU rotates among them according to their priority and gives each thread a slice of CPU time. So more threads are not necessarily better.
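To make the link between thread count and core count concrete, here is a minimal sketch that sizes a fixed pool from the number of available cores. The class name PoolSizing and the one-thread-per-core rule are assumptions for illustration only; workloads that spend most of their time waiting on I/O may want a different size.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical helper: the name and the sizing rule are illustrative assumptions.
final class PoolSizing {

    // One thread per available core is a common starting point for CPU-bound work.
    static int cpuBoundPoolSize() {
        return Math.max(1, Runtime.getRuntime().availableProcessors());
    }

    // Builds a pool whose thread count never exceeds the size computed above.
    static ExecutorService newCpuBoundPool() {
        return Executors.newFixedThreadPool(cpuBoundPoolSize());
    }

    public static void main(String[] args) {
        ExecutorService pool = newCpuBoundPool();
        System.out.println("pool threads: " + cpuBoundPoolSize());
        pool.shutdown();
    }
}
```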

Two interfaces represent thread pool management in Java: Executor and ExecutorService.

The steps we take to run a task on a thread pool are generally like this:

  • Create an ExecutorService.
  • Submit the task to the ExecutorService.
  • The ExecutorService schedules the thread to run the task.

The diagram below represents this flow.

[Figure: threadPool.png]
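The three steps listed above can be sketched in a few lines. This is a minimal illustration, not the only way to do it; the class name SubmitSteps, the pool size of 2, and the trivial task are assumptions chosen for the example.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical demo class walking through the three steps.
final class SubmitSteps {

    static int runOnPool() throws Exception {
        // Step 1: create an ExecutorService.
        ExecutorService pool = Executors.newFixedThreadPool(2);
        try {
            // Step 2: submit a task; the Future is a handle to its result.
            Future<Integer> result = pool.submit(() -> 6 * 7);
            // Step 3: a pool thread runs the task; get() blocks until it is done.
            return result.get();
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(runOnPool()); // prints 42
    }
}
```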

This article shows how to use thread pools in Java.

Executors, Executor and ExecutorService

Executors is a utility class that provides a set of convenient factory methods for creating thread pools.

The Executor interface defines a single method.

public interface Executor {

    /**
     * Executes the given command at some time in the future. The command
     * may execute in a new thread, in a pooled thread, or in the calling
     * thread, at the discretion of the {@code Executor} implementation.
     *
     * @param command the runnable task
     * @throws RejectedExecutionException if this task cannot be
     * accepted for execution
     * @throws NullPointerException if command is null
     */
    void execute(Runnable command);
}
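Because the interface has only one method, an Executor is easy to implement by hand. The sketch below shows two small implementations in the spirit of the examples in the Executor Javadoc: one runs the task in the calling thread, the other starts a new thread per task. The enclosing class name SimpleExecutors is an assumption for this example.

```java
import java.util.Objects;
import java.util.concurrent.Executor;

// Hypothetical holder class for two hand-written Executor implementations.
final class SimpleExecutors {

    // Runs each task synchronously in the caller's thread.
    static final class DirectExecutor implements Executor {
        @Override
        public void execute(Runnable command) {
            command.run(); // throws NullPointerException if command is null
        }
    }

    // Starts a brand-new thread for every task.
    static final class ThreadPerTaskExecutor implements Executor {
        @Override
        public void execute(Runnable command) {
            Objects.requireNonNull(command);
            new Thread(command).start();
        }
    }

    public static void main(String[] args) {
        Executor direct = new DirectExecutor();
        direct.execute(() -> System.out.println("direct: " + Thread.currentThread().getName()));

        Executor perTask = new ThreadPerTaskExecutor();
        perTask.execute(() -> System.out.println("per task: " + Thread.currentThread().getName()));
    }
}
```

Neither of these reuses threads, which is exactly the gap that a thread pool fills.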

ExecutorService extends Executor and adds more thread pool operations, such as submitting tasks that return a Future and shutting the pool down. It is a complement to Executor.
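As a rough illustration of the extra operations, the sketch below uses invokeAll, shutdown and awaitTermination, none of which exist on plain Executor. The class name ExecutorServiceDemo, the pool size, and the sum-of-squares task are assumptions made up for this example.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

// Hypothetical demo of operations that ExecutorService adds on top of Executor.
final class ExecutorServiceDemo {

    static int sumOfSquares(List<Integer> numbers) throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(3);
        try {
            List<Callable<Integer>> tasks = new ArrayList<>();
            for (int n : numbers) {
                tasks.add(() -> n * n); // each task returns a value
            }
            int sum = 0;
            // invokeAll blocks until every task has completed.
            for (Future<Integer> future : pool.invokeAll(tasks)) {
                sum += future.get();
            }
            return sum;
        } finally {
            pool.shutdown();                            // stop accepting new tasks
            pool.awaitTermination(5, TimeUnit.SECONDS); // wait for running tasks to finish
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(sumOfSquares(Arrays.asList(1, 2, 3))); // prints 14
    }
}
```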

Following the principle of separating interface from implementation, we usually declare variables as ExecutorService or Executor in Java code instead of a specific implementation class.

Let’s look at how to create an Executor and ExecutorService through Executors.

        Executor executor = Executors.newSingleThreadExecutor();
        executor.execute(() -> log.info("in Executor"));

        ExecutorService executorService = Executors.newCachedThreadPool();
        executorService.submit(() -> log.info("in ExecutorService"));
        executorService.shutdown();

ThreadPoolExecutor

ThreadPoolExecutor is an implementation of the ExecutorService interface that allows more fine-grained configuration of the thread pool. In particular, it lets you control three parameters: corePoolSize, maximumPoolSize, and keepAliveTime.

poolSize is the current number of threads in the pool, and corePoolSize is the number of threads that the pool keeps alive even when they are idle. By default, core threads are created on demand as tasks arrive.

maximumPoolSize is the upper limit on the number of threads in the pool. When all core threads are busy, new tasks wait in the work queue; only when the queue is full does the pool create more threads for task execution, up to maximumPoolSize.

keepAliveTime is how long a thread beyond corePoolSize may stay idle waiting for a new task. After that time, the thread is reclaimed by the thread pool.
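The interplay of these parameters is easier to see with a bounded queue. In the sketch below, which assumes a core size of 1, a maximum size of 2 and a queue capacity of 1 (values picked only for illustration, as is the class name PoolGrowthDemo), every task blocks on a latch so the pool state can be observed.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical demo: shows when the pool queues, grows, and rejects.
final class PoolGrowthDemo {

    // Returns {poolSize, queueSize, rejected ? 1 : 0} observed while all tasks are blocked.
    static int[] observe() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 2, 30L, TimeUnit.SECONDS, new ArrayBlockingQueue<>(1));
        CountDownLatch release = new CountDownLatch(1);
        Runnable blocker = () -> {
            try {
                release.await(); // keep the worker busy until released
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        try {
            pool.execute(blocker); // starts the single core thread
            pool.execute(blocker); // core thread busy: the task waits in the queue
            pool.execute(blocker); // queue full: a second, non-core thread is started
            int rejected = 0;
            try {
                pool.execute(blocker); // pool at maximum and queue full
            } catch (RejectedExecutionException e) {
                rejected = 1; // the default AbortPolicy rejects the task
            }
            return new int[] {pool.getPoolSize(), pool.getQueue().size(), rejected};
        } finally {
            release.countDown();
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        int[] state = observe();
        System.out.println("pool size " + state[0] + ", queue size " + state[1]
                + ", rejected " + (state[2] == 1)); // pool size 2, queue size 1, rejected true
    }
}
```

Note that the second thread appears only after the queue is full, not as soon as the core thread is busy.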

Let’s look at how to create a ThreadPoolExecutor.

        ThreadPoolExecutor threadPoolExecutor =
                new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS,
                        new LinkedBlockingQueue<Runnable>());
        threadPoolExecutor.submit(()->log.info("submitted through threadPoolExecutor"));
        threadPoolExecutor.shutdown();

In the example above, we create the ThreadPoolExecutor directly through its constructor.

In most cases this is not needed, because several factory methods in Executors already return preconfigured ThreadPoolExecutor instances. Let’s look at the following example.

        ThreadPoolExecutor executor1 =
                (ThreadPoolExecutor) Executors.newFixedThreadPool(2);
        executor1.submit(() -> {
            Thread.sleep(1000);
            return null;
        });
        executor1.submit(() -> {
            Thread.sleep(1000);
            return null;
        });
        executor1.submit(() -> {
            Thread.sleep(1000);
            return null;
        });
        log.info("executor1 poolsize {}",executor1.getPoolSize());
        log.info("executor1 queuesize {}", executor1.getQueue().size());
        executor1.shutdown();

In the example above, we use Executors.newFixedThreadPool(2) to create a ThreadPoolExecutor.

We submitted 3 tasks, but the pool size is only 2, so one task cannot be executed immediately and has to wait in the queue.

Let’s look at another example.

        ThreadPoolExecutor executor2 =
                (ThreadPoolExecutor) Executors.newCachedThreadPool();
        executor2.submit(() -> {
            Thread.sleep(1000);
            return null;
        });
        executor2.submit(() -> {
            Thread.sleep(1000);
            return null;
        });
        executor2.submit(() -> {
            Thread.sleep(1000);
            return null;
        });

        log.info("executor2 poolsize {}", executor2.getPoolSize());
        log.info("executor2 queue size {}", executor2.getQueue().size());
        executor2.shutdown();

In the example above, we use Executors.newCachedThreadPool() to create a ThreadPoolExecutor. After running it, we can see that the pool size is 3 and the queue size is 0. This shows that newCachedThreadPool grows the pool automatically as tasks arrive.

A thread that stays idle for 60 seconds is terminated and removed from the pool.

The queue here is a SynchronousQueue. It has no internal capacity, because every insert must wait for a matching remove, so the queue size is always 0.
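The hand-off behaviour of SynchronousQueue can be checked in isolation. The following sketch (the class name SynchronousQueueDemo and its two helper methods are assumptions for this example) shows that an offer fails when no consumer is waiting, and that put succeeds once a consumer is ready to take.

```java
import java.util.concurrent.SynchronousQueue;

// Hypothetical demo of the zero-capacity hand-off queue.
final class SynchronousQueueDemo {

    // With no consumer waiting, a non-blocking offer cannot hand the element over.
    static boolean offerWithoutConsumer() {
        return new SynchronousQueue<String>().offer("task");
    }

    // Hands one item to a consumer thread and returns what the consumer received.
    static String handOff(String item) throws InterruptedException {
        SynchronousQueue<String> queue = new SynchronousQueue<>();
        String[] received = new String[1];
        Thread consumer = new Thread(() -> {
            try {
                received[0] = queue.take();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        consumer.start();
        queue.put(item); // blocks until the consumer takes the item
        consumer.join(); // after join, the consumer's write to received[0] is visible
        return received[0];
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(offerWithoutConsumer());                // false
        System.out.println(new SynchronousQueue<String>().size()); // 0
        System.out.println(handOff("task"));                       // task
    }
}
```

This is why a cached pool starts a new thread whenever no idle worker is waiting: the task cannot be parked in the queue.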

ScheduledThreadPoolExecutor

Another very commonly used pool is ScheduledThreadPoolExecutor, which extends ThreadPoolExecutor and implements the ScheduledExecutorService interface.

public class ScheduledThreadPoolExecutor
        extends ThreadPoolExecutor
        implements ScheduledExecutorService

Let’s see how to use it.

        ScheduledExecutorService executor = Executors.newScheduledThreadPool(5);
        executor.schedule(() -> {
            log.info("Hello JavaIsland");
        }, 500, TimeUnit.MILLISECONDS);

In the example above, we schedule a one-shot task that will be executed after a delay of 500 milliseconds.

Earlier we also talked about two other very common methods of the ScheduledExecutorService.

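  • scheduleAtFixedRate - the period is measured from the start of one execution to the start of the next.
  • scheduleWithFixedDelay - the delay is measured from the end of one execution to the start of the next.

The following example uses scheduleAtFixedRate.

        CountDownLatch lock = new CountDownLatch(3);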

        ScheduledExecutorService executor2 = Executors.newScheduledThreadPool(5);
        ScheduledFuture<?> future = executor2.scheduleAtFixedRate(() -> {
            log.info("in ScheduledFuture");
            lock.countDown();
        }, 500, 100, TimeUnit.MILLISECONDS);

        lock.await(1000, TimeUnit.MILLISECONDS);
        future.cancel(true);
        executor2.shutdown();
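To compare the two scheduling modes side by side, the sketch below records when each run starts. It is only an illustration under assumed values: the class name RateVsDelay, the helper startTimes, the 100 ms interval and the 50 ms task duration are all made up for this example, and the exact timestamps will vary from run to run.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Hypothetical demo comparing scheduleAtFixedRate with scheduleWithFixedDelay.
final class RateVsDelay {

    // Runs a task lasting taskMillis the given number of times and returns
    // the start time of each run, in milliseconds since scheduling began.
    static List<Long> startTimes(boolean fixedRate, long intervalMillis,
                                 long taskMillis, int runs) throws InterruptedException {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        List<Long> starts = new CopyOnWriteArrayList<>();
        CountDownLatch done = new CountDownLatch(runs);
        long origin = System.nanoTime();
        Runnable task = () -> {
            if (done.getCount() == 0) {
                return; // ignore runs that fire after we have enough samples
            }
            starts.add(TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - origin));
            try {
                Thread.sleep(taskMillis); // simulate work
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            done.countDown();
        };
        if (fixedRate) {
            scheduler.scheduleAtFixedRate(task, 0, intervalMillis, TimeUnit.MILLISECONDS);
        } else {
            scheduler.scheduleWithFixedDelay(task, 0, intervalMillis, TimeUnit.MILLISECONDS);
        }
        done.await();
        scheduler.shutdownNow();
        return starts;
    }

    public static void main(String[] args) throws InterruptedException {
        // Fixed rate: starts are spaced roughly one interval apart.
        System.out.println("fixed rate:  " + startTimes(true, 100, 50, 3));
        // Fixed delay: starts are spaced roughly task duration plus interval apart.
        System.out.println("fixed delay: " + startTimes(false, 100, 50, 3));
    }
}
```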

ForkJoinPool

ForkJoinPool is a framework introduced in Java 7, which we will explain in detail in a later article, The fork join framework in java. Here is a brief introduction.

ForkJoinPool is mainly used for algorithms that generate a large number of tasks. Running each of those tasks on its own thread would consume a lot of threads, but this problem does not arise in the fork/join framework.

In fork/join, any task can spawn a large number of sub-tasks with fork() and then wait for them to finish with join().

Here we give an example.

static class TreeNode {

    int value;

    Set<TreeNode> children;

    TreeNode(int value, TreeNode... children) {
        this.value = value;
        this.children = Sets.newHashSet(children);
    }
}

With TreeNode defined, the following task traverses the tree and adds up all the values.

public class CountingTask extends RecursiveTask<Integer> {

    private final TreeNode node;

    public CountingTask(TreeNode node) {
        this.node = node;
    }

    @Override
    protected Integer compute() {
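        // Fork every child first, then join, so that sibling subtasks can run in parallel.
        // A single lazy stream would fork and join one child at a time.
        // (Needs java.util.List and java.util.stream.Collectors.)
        List<ForkJoinTask<Integer>> forked = node.children.stream()
                .map(childNode -> new CountingTask(childNode).fork()).collect(Collectors.toList());
        return node.value + forked.stream().mapToInt(ForkJoinTask::join).sum();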
    }
}

Here is the code that invokes the task.

    public static void main(String[] args) {
        TreeNode tree = new TreeNode(5,
                new TreeNode(3), new TreeNode(2,
                new TreeNode(2), new TreeNode(8)));

        ForkJoinPool forkJoinPool = ForkJoinPool.commonPool();
        int sum = forkJoinPool.invoke(new CountingTask(tree));
    }
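
The same fork-then-join pattern also works for divide-and-conquer over an array. Below is a minimal, self-contained sketch that sums an int array; the class name ArraySumTask and the threshold of 1,000 elements are assumptions chosen for illustration, not tuned values.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

// Hypothetical task: sums data[from, to) by splitting the range in half.
final class ArraySumTask extends RecursiveTask<Long> {

    private static final int THRESHOLD = 1_000; // illustrative cut-off, not tuned

    private final int[] data;
    private final int from;
    private final int to;

    ArraySumTask(int[] data, int from, int to) {
        this.data = data;
        this.from = from;
        this.to = to;
    }

    @Override
    protected Long compute() {
        if (to - from <= THRESHOLD) {
            // Small enough: sum directly in this thread.
            long sum = 0;
            for (int i = from; i < to; i++) {
                sum += data[i];
            }
            return sum;
        }
        int mid = (from + to) >>> 1;
        ArraySumTask left = new ArraySumTask(data, from, mid);
        ArraySumTask right = new ArraySumTask(data, mid, to);
        left.fork();                     // schedule the left half asynchronously
        long rightSum = right.compute(); // work on the right half in this thread
        return left.join() + rightSum;   // wait for the left half and combine
    }

    static long sum(int[] data) {
        return ForkJoinPool.commonPool().invoke(new ArraySumTask(data, 0, data.length));
    }

    public static void main(String[] args) {
        int[] data = new int[10_000];
        for (int i = 0; i < data.length; i++) {
            data[i] = i + 1;
        }
        System.out.println(sum(data)); // prints 50005000
    }
}
```

Forking one half and computing the other in the current thread keeps the calling worker busy instead of leaving it idle while it waits.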