Use of CountDownLatch in Java concurrency

Introduction

In Java concurrency it is important to control access to shared variables. Sometimes we also want to control the order in which concurrent threads execute, for example to wait for all worker threads to finish before another thread continues, or to wait until all threads are ready before letting them start together. For these cases we can use CountDownLatch.

CountDownLatch contains a counter, which is stored in the state of an AbstractQueuedSynchronizer.

For an explanation of AbstractQueuedSynchronizer, see the previous article, AbstractQueuedSynchronizer implementation principle - 1.

Calling countDown() decrements the counter by one. Calling await() blocks the calling thread until the counter reaches zero.

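// Excerpt from java.util.concurrent.CountDownLatch
private static final class Sync extends AbstractQueuedSynchronizer {
    ...
}

private final Sync sync;

// Decrements the count; waiting threads are released when it reaches zero.
public void countDown() {
    sync.releaseShared(1);
}

// Blocks until the count reaches zero or the thread is interrupted.
public void await() throws InterruptedException {
    sync.acquireSharedInterruptibly(1);
}

// Same as await(), but returns false if the timeout elapses first.
public boolean await(long timeout, TimeUnit unit)
        throws InterruptedException {
    return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
}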
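To make the counter behaviour concrete, here is a minimal single-threaded sketch. The class name LatchCounterDemo and the method observeCounts are made up for illustration; only CountDownLatch and its getCount(), countDown() and await() methods come from the JDK.

```java
import java.util.Arrays;
import java.util.concurrent.CountDownLatch;

// Hypothetical demo class, not part of the original article.
public class LatchCounterDemo {

    // Counts a latch down from 3 and records the counter value after each step.
    public static long[] observeCounts() throws InterruptedException {
        CountDownLatch latch = new CountDownLatch(3);
        long[] seen = new long[4];
        seen[0] = latch.getCount();      // initial count
        for (int i = 1; i <= 3; i++) {
            latch.countDown();           // decrement by one
            seen[i] = latch.getCount();
        }
        latch.countDown();               // extra calls at zero have no effect
        latch.await();                   // count is already zero, so this returns immediately
        return seen;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(Arrays.toString(observeCounts())); // prints [3, 2, 1, 0]
    }
}
```

Note that the counter cannot be reset. Once it reaches zero, every later await() returns at once.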

Here we give two examples of usage:

The main thread waits for all the child threads to finish before it starts running

Here we define a worker class that receives a CountDownLatch and calls its countDown() method just before the worker finishes. The main thread then calls await() to wait until every worker has finished.

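import java.util.List;
import java.util.concurrent.CountDownLatch;
import lombok.extern.slf4j.Slf4j;

@Slf4j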
public class MainThreadWaitUsage implements Runnable {

    private List<String> outputScraper;
    private CountDownLatch countDownLatch;

    public MainThreadWaitUsage(List<String> outputScraper, CountDownLatch countDownLatch) {
        this.outputScraper = outputScraper;
        this.countDownLatch = countDownLatch;
    }

    @Override
    public void run() {
        outputScraper.add("Counted down");
        countDownLatch.countDown();
    }
}

The test that drives it looks like this.

    @Test
    public void testCountDownLatch()
            throws InterruptedException {

        List<String> outputScraper = Collections.synchronizedList(new ArrayList<>());
        CountDownLatch countDownLatch = new CountDownLatch(5);
        List<Thread> workers = Stream
                .generate(() -> new Thread(new MainThreadWaitUsage(outputScraper, countDownLatch)))
                .limit(5)
                .collect(toList());

        workers.forEach(Thread::start);
        countDownLatch.await();
        outputScraper.add("Latch released");
        log.info(outputScraper.toString());

    }

The result of the execution is as follows.

07:37:27.388 [main] INFO MainThreadWaitUsageTest - [Counted down, Counted down, Counted down, Counted down, Counted down, Latch released]

Wait for all threads to be ready before executing together

In the example above the main thread waits for the child threads. In this example we will see how the child threads wait until all of them are ready and then execute together.

The idea is simple: each child thread counts down a "ready" latch as soon as it starts and then blocks on a second latch. The main thread waits for the ready latch to reach zero and then counts down the second latch, which releases all child threads at once. A third latch lets the main thread wait for the child threads to complete.

public class ThreadWaitThreadUsage implements Runnable {

    private List<String> outputScraper;
    private CountDownLatch readyThreadCounter;
    private CountDownLatch callingThreadBlocker;
    private CountDownLatch completedThreadCounter;

    public ThreadWaitThreadUsage(
            List<String> outputScraper,
            CountDownLatch readyThreadCounter,
            CountDownLatch callingThreadBlocker,
            CountDownLatch completedThreadCounter) {

        this.outputScraper = outputScraper;
        this.readyThreadCounter = readyThreadCounter;
        this.callingThreadBlocker = callingThreadBlocker;
        this.completedThreadCounter = completedThreadCounter;
    }

    @Override
    public void run() {
        readyThreadCounter.countDown();
        try {
            callingThreadBlocker.await();
            outputScraper.add("Counted down");
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // restore the interrupt flag
        } finally {
            completedThreadCounter.countDown();
        }
    }
}

The test that drives it looks like this.

    @Test
    public void testCountDownLatch()
            throws InterruptedException {

        List<String> outputScraper = Collections.synchronizedList(new ArrayList<>());
        CountDownLatch readyThreadCounter = new CountDownLatch(5);
        CountDownLatch callingThreadBlocker = new CountDownLatch(1);
        CountDownLatch completedThreadCounter = new CountDownLatch(5);
        List<Thread> workers = Stream
                .generate(() -> new Thread(new ThreadWaitThreadUsage(
                        outputScraper, readyThreadCounter, callingThreadBlocker, completedThreadCounter)))
                .limit(5)
                .collect(toList());

        workers.forEach(Thread::start);
        readyThreadCounter.await();
        outputScraper.add("Workers ready");
        callingThreadBlocker.countDown();
        completedThreadCounter.await();
        outputScraper.add("Workers complete");

        log.info(outputScraper.toString());

    }

The output is as follows.

07:41:47.861 [main] INFO ThreadWaitThreadUsageTest - [Workers ready, Counted down, Counted down, Counted down, Counted down, Counted down, Workers complete]

Stop CountDownLatch's await

A call to await() returns only when the count reaches zero. However, if an exception occurs while a thread is running, its countDown() call may never execute, and await() may then wait indefinitely.
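One way to reduce this risk, already used in ThreadWaitThreadUsage above, is to call countDown() in a finally block so that it runs even when the task fails. The following is a minimal sketch of that pattern. The class FinallyCountDownDemo and the simulated failure are assumptions made for illustration.

```java
import java.util.concurrent.CountDownLatch;

// Hypothetical demo class, not part of the original article.
public class FinallyCountDownDemo {

    // Starts `workers` threads whose task always fails, then returns the latch count.
    public static long runFailingWorkers(int workers) throws InterruptedException {
        CountDownLatch latch = new CountDownLatch(workers);
        for (int i = 0; i < workers; i++) {
            Thread t = new Thread(() -> {
                try {
                    throw new IllegalStateException("simulated failure");
                } catch (IllegalStateException e) {
                    // swallowed here only to keep the demo output quiet
                } finally {
                    latch.countDown(); // runs whether or not the task failed
                }
            });
            t.start();
        }
        latch.await(); // returns because every worker counted down
        return latch.getCount();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(runFailingWorkers(5)); // prints 0
    }
}
```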

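To avoid waiting forever, we can use the timed version of await(), which returns false if the timeout elapses before the count reaches zero: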

    public boolean await(long timeout, TimeUnit unit)
        throws InterruptedException {
        return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
    }
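As a rough illustration of the timed version, the sketch below starts two workers, one of which may exit without counting down, and checks the boolean returned by await(). The class TimedAwaitDemo, the failingWorker flag and the 500 ms timeout are assumptions chosen for the example.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class, not part of the original article.
public class TimedAwaitDemo {

    // Returns true if the latch reached zero within the timeout, false otherwise.
    public static boolean awaitWorkers(boolean failingWorker) throws InterruptedException {
        CountDownLatch latch = new CountDownLatch(2);

        Thread healthy = new Thread(latch::countDown);
        Thread other = new Thread(() -> {
            if (failingWorker) {
                return; // simulates a worker that exits without reaching countDown()
            }
            latch.countDown();
        });
        healthy.start();
        other.start();

        // Wait at most 500 ms instead of blocking indefinitely.
        return latch.await(500, TimeUnit.MILLISECONDS);
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("failing worker:  " + awaitWorkers(true));
        System.out.println("healthy workers: " + awaitWorkers(false));
    }
}
```

With the failing worker the count never drops below one, so await() returns false after the timeout. The caller can then decide whether to retry, log the problem, or abort.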