AbstractQueuedSynchronizer implementation principle - 2

Preface

In the previous article, AbstractQueuedSynchronizer implementation principle - 1, we explained how AbstractQueuedSynchronizer acquires and releases the exclusive synchronization state. This article covers the acquisition and release of the shared synchronization state.

Shared Synchronization State Acquisition and Release

As the name implies, a shared lock can be held by multiple threads at the same time. The synchronizer's acquireShared method acquires the shared lock (the synchronization state); its source code is as follows.

public final void acquireShared(int arg) {
    if (tryAcquireShared(arg) < 0)
        doAcquireShared(arg);
}

acquireShared first tries to get the shared lock through tryAcquireShared. This is a template method: the default implementation in the synchronizer simply throws UnsupportedOperationException, so developers must override it. Its return value falls into three ranges, each with a different meaning:

  1. Less than 0: the current thread failed to acquire the lock.
  2. Equal to 0: the current thread acquired the lock, but nothing is left, so subsequent threads will fail to acquire it until a holder releases it. In other words, the thread took the last available share.
  3. Greater than 0: the current thread acquired the lock and shares remain for other threads to acquire.
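
The three return ranges can be seen in a small sketch. TwoPermitSync, tryOnce and SharedReturnDemo are hypothetical names introduced only for this illustration, and the permit count of 2 is an arbitrary assumption; the counting logic is one possible way to implement tryAcquireShared, not the only one.

```java
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

// Hypothetical two-permit synchronizer, used only to show the return values.
class TwoPermitSync extends AbstractQueuedSynchronizer {
    TwoPermitSync() {
        setState(2); // two permits are available initially
    }

    @Override
    protected int tryAcquireShared(int arg) {
        for (;;) {
            int available = getState();
            int remaining = available - arg;
            // Negative: not enough permits, report failure without touching the state.
            // Otherwise: claim the permits with CAS and report how many are left.
            if (remaining < 0 || compareAndSetState(available, remaining)) {
                return remaining;
            }
        }
    }

    // Wrapper so the demo can call the protected template method directly.
    int tryOnce() {
        return tryAcquireShared(1);
    }
}

class SharedReturnDemo {
    public static void main(String[] args) {
        TwoPermitSync sync = new TwoPermitSync();
        System.out.println(sync.tryOnce()); // 1: acquired, one permit left
        System.out.println(sync.tryOnce()); // 0: acquired, nothing left
        System.out.println(sync.tryOnce()); // -1: failed
    }
}
```

Returning the remaining count directly means a single expression covers all three cases, which is why this shape is convenient for counting-style synchronizers.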

When tryAcquireShared returns a value less than 0, the acquisition has failed and doAcquireShared is executed. Its source code is shown below.

private void doAcquireShared(int arg) {
    final Node node = addWaiter(Node.SHARED);
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) {
            final Node p = node.predecessor();
            if (p == head) {
                int r = tryAcquireShared(arg);
                if (r >= 0) {
                    setHeadAndPropagate(node, r);
                    p.next = null; // help GC
                    if (interrupted)
                        selfInterrupt();
                    failed = false;
                    return;
                }
            }
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

The method first calls addWaiter to wrap the current thread in a node marked as shared mode and append it to the synchronization queue. Note that the nextWaiter field of a node in shared mode is the fixed value Node.SHARED.

The method then loops. If the node's predecessor is the head node, it calls tryAcquireShared again. If the return value is greater than or equal to 0, setHeadAndPropagate is called to update the head node and, if resources are still available, propagate the wake-up to the successor nodes. If the thread was interrupted while waiting, it re-asserts the interrupt through selfInterrupt, and the method returns.

If the return value is less than 0, or the predecessor is not the head, the acquisition failed; the current thread is then either parked (blocked) or goes through the loop again to retry. Here is the implementation of the setHeadAndPropagate method.

private void setHeadAndPropagate(Node node, int propagate) {
    Node h = head; // Record old head for check below
    setHead(node);
    /*
     * Try to signal next queued node if:
     *   Propagation was indicated by caller,
     *     or was recorded (as h.waitStatus either before
     *     or after setHead) by a previous operation
     *     (note: this uses sign-check of waitStatus because
     *      PROPAGATE status may transition to SIGNAL.)
     * and
     *   The next node is waiting in shared mode,
     *     or we don't know, because it appears null
     *
     * The conservatism in both of these checks may cause
     * unnecessary wake-ups, but only when there are multiple
     * racing acquires/releases, so most need signals now or soon
     * anyway.
     */
    if (propagate > 0 || h == null || h.waitStatus < 0 ||
        (h = head) == null || h.waitStatus < 0) {
        Node s = node.next;
        if (s == null || s.isShared())
            doReleaseShared();
    }
}

The method first makes the node that just acquired the lock the new head node. A propagate value greater than 0 means the preceding tryAcquireShared call returned a positive value, that is, shared locks are still available. In that case, or when the waitStatus of the old or new head is negative (a pending signal or a recorded propagation), the method looks at the successor of the current node. If the successor is a shared node, or is not yet visible (null), doReleaseShared is called to wake it so it can try to acquire the lock. The doReleaseShared method also holds the main logic for releasing a shared lock in the synchronizer.
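
To see the propagation at work, here is a minimal sketch of a one-shot gate built on the synchronizer. Gate, PropagationDemo and runOnce are hypothetical names, and the state encoding (0 = closed, 1 = open) is an assumption made for the example. A single releaseShared call should let every queued shared waiter through, because each woken node becomes the head and passes the wake-up on.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

// Hypothetical one-shot gate: state 0 = closed, state 1 = open.
class Gate extends AbstractQueuedSynchronizer {
    @Override
    protected int tryAcquireShared(int ignored) {
        // Positive while open, so setHeadAndPropagate keeps waking shared successors.
        return getState() == 1 ? 1 : -1;
    }

    @Override
    protected boolean tryReleaseShared(int ignored) {
        setState(1); // open the gate
        return true;
    }

    void await() { acquireShared(1); }
    void open() { releaseShared(1); }
}

class PropagationDemo {
    // Returns true if all waiters passed the gate after a single open() call.
    static boolean runOnce(int waiters) {
        Gate gate = new Gate();
        CountDownLatch passed = new CountDownLatch(waiters);
        for (int i = 0; i < waiters; i++) {
            Thread t = new Thread(() -> {
                gate.await();
                passed.countDown();
            });
            t.setDaemon(true);
            t.start();
        }
        try {
            // Wait until every waiter is in the synchronization queue.
            while (gate.getQueueLength() < waiters) {
                Thread.sleep(10);
            }
            gate.open(); // one release; the rest is propagation
            return passed.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println(runOnce(3)); // prints "true"
    }
}
```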

The synchronizer provides the releaseShared method for releasing a shared lock, the source code of which is shown below.

public final boolean releaseShared(int arg) {
    if (tryReleaseShared(arg)) {
        doReleaseShared();
        return true;
    }
    return false;
}

releaseShared first calls tryReleaseShared to try to release the shared lock. If that method returns false, the release failed and releaseShared returns false. Otherwise the lock was released successfully, and doReleaseShared is executed to wake the successor node and make sure the release propagates backward. The method is as follows.

private void doReleaseShared() {
    /*
     * Ensure that a release propagates, even if there are other
     * in-progress acquires/releases.  This proceeds in the usual
     * way of trying to unparkSuccessor of head if it needs
     * signal. But if it does not, status is set to PROPAGATE to
     * ensure that upon release, propagation continues.
     * Additionally, we must loop in case a new node is added
     * while we are doing this. Also, unlike other uses of
     * unparkSuccessor, we need to know if CAS to reset status
     * fails, if so rechecking.
     */
    for (;;) {
        Node h = head;
        if (h != null && h != tail) {
            int ws = h.waitStatus;
            if (ws == Node.SIGNAL) {
                if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                    continue;            // loop to recheck cases
                unparkSuccessor(h);
            }
            else if (ws == 0 &&
                     !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                continue;                // loop on failed CAS
        }
        if (h == head) // loop if head changed
            break;
    }
}

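As you can see, unlike the release of an exclusive lock, in shared mode acquisition and release of the synchronization state can run at the same time, because doReleaseShared is called both from releaseShared and from setHeadAndPropagate. Updates to waitStatus are therefore made atomic with CAS, and the loop continues if the head node has changed in the meantime. Each time a shared node wakes up and acquires the lock, it becomes the new head node, so the wake-up keeps moving down the queue and every subsequent node that can obtain the shared lock is guaranteed to be woken.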
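
The role of the tryReleaseShared return value in releaseShared can be illustrated with a count-down style synchronizer. This is a sketch loosely modeled on the way CountDownLatch uses the synchronizer; CountDownSync and ReleaseSharedDemo are hypothetical names. Only the decrement that reaches zero returns true, so only that call goes on to doReleaseShared.

```java
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

// Hypothetical count-down synchronizer: waiters pass once the count reaches zero.
class CountDownSync extends AbstractQueuedSynchronizer {
    CountDownSync(int count) {
        setState(count);
    }

    @Override
    protected int tryAcquireShared(int ignored) {
        return getState() == 0 ? 1 : -1;
    }

    @Override
    protected boolean tryReleaseShared(int ignored) {
        for (;;) {
            int current = getState();
            if (current == 0) {
                return false; // already at zero, nothing to release
            }
            int next = current - 1;
            if (compareAndSetState(current, next)) {
                return next == 0; // only the final decrement signals waiters
            }
        }
    }
}

class ReleaseSharedDemo {
    public static void main(String[] args) {
        CountDownSync sync = new CountDownSync(2);
        System.out.println(sync.releaseShared(1)); // false: count 2 -> 1
        System.out.println(sync.releaseShared(1)); // true: count 1 -> 0
        System.out.println(sync.releaseShared(1)); // false: already zero
    }
}
```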

How to customize synchronization components

Most of the synchronizer-based classes in the JDK aggregate one or more inner classes that extend the synchronizer. The inner class overrides the synchronizer's template methods to define how the synchronization state is managed, and the outer class delegates its synchronization to that inner class; this is an application of the template method pattern. Examples in the JDK include the reentrant lock ReentrantLock, the read/write lock ReentrantReadWriteLock, the semaphore Semaphore and the synchronization utility CountDownLatch, as the following source code screenshots show.

AQS_use_in_jdk_examples.png

As shown above, we can build both exclusive and shared synchronization components on top of the synchronizer. As an example, we implement a custom shared lock, TripletsLock, which allows at most three threads to hold the lock at the same time. It implements the Lock interface to provide the user-facing methods, such as lock to acquire the lock and unlock to release it.

Inside the TripletsLock class there is a custom synchronizer, Sync, which extends AQS and controls thread access and the synchronization state. When a thread calls the lock method, Sync first computes what the synchronization state would be after the acquisition and then applies the update with compareAndSetState (implemented with the Unsafe class in JDK 8) so that it is atomic. Each successful acquisition decreases the synchronization state by 1, and each successful release increases it by 1. The synchronization state takes the values 0, 1, 2 and 3; a value of 0 means that no synchronization resources are available, so a thread that tries to acquire the lock at that point is blocked. Here is the code that implements this custom synchronization component.

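import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.AbstractQueuedSynchronizer;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;

/**
 * @author javaisland
 * @date: 2022-04-19
 * @version: 1.0
 * @description:
 * @since JDK 1.8
 */
public class TripletsLock implements Lock {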

  private final Sync sync = new Sync(3);

  private static final class Sync extends AbstractQueuedSynchronizer {
    public Sync(int state) {
      setState(state);
    }

    Condition newCondition() {
      return new ConditionObject();
    }

    @Override
    protected int tryAcquireShared(int reduceCount) {
      for (;;) {
        int currentState = getState();
        int newState = currentState - reduceCount;
        if (newState < 0 || compareAndSetState(currentState, newState)) {
          return newState;
        }
      }
    }

    @Override
    protected boolean tryReleaseShared(int count) {
      for (; ;) {
        int currentState = getState();
        int newState = currentState + count;
        if (compareAndSetState(currentState, newState)) {
          return true;
        }
      }
    }
  }

  @Override
  public void lock() {
    sync.acquireShared(1);
  }

  @Override
  public void lockInterruptibly() throws InterruptedException {
    // Use the shared-mode variant; acquireInterruptibly would call the unimplemented tryAcquire
    sync.acquireSharedInterruptibly(1);
  }

  @Override
  public boolean tryLock() {
    // A return value of 0 also means the lock was acquired (it was the last one available)
    return sync.tryAcquireShared(1) >= 0;
  }

  @Override
  public boolean tryLock(long timeout, TimeUnit unit) throws InterruptedException {
    // Use the shared-mode variant; tryAcquireNanos would call the unimplemented tryAcquire
    return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
  }

  @Override
  public void unlock() {
    sync.releaseShared(1);
  }

  @Override
  public Condition newCondition() {
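    // ConditionObject only works with exclusive mode, so conditions are not supported here
    throw new UnsupportedOperationException();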
  }
}

Let’s start a test with 20 threads to see if the custom synchronization tool class TripletsLock meets our expectations. The test code is as follows.

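import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;

import org.junit.Test; // assumes JUnit 4

/**
 * @author javaisland
 * @date: 2022-04-19
 * @version: 1.0
 * @description:
 * @since JDK 1.8
 */
public class TripletsLockTest {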
  private final Lock lock = new TripletsLock();
  private final DateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");

  @Test
  public void testTripletsLock() {
    // start 20 threads
    for (int i = 0; i < 20; i++) {
      Thread worker = new Runner();
      worker.setDaemon(true);
      worker.start();
    }

    for (int i = 0; i < 20; i++) {
      second(2);
      System.out.println();
    }
  }

  private class Runner extends Thread {
    @Override
    public void run() {
      for (; ;) {
        lock.lock();
        try {
          second(1);
          String now;
          synchronized (dateFormat) { // SimpleDateFormat is not thread-safe
            now = dateFormat.format(new Date());
          }
          System.out.println(now + " ----> " + Thread.currentThread().getName());
          second(1);
        } finally {
          lock.unlock();
        }
      }
    }
  }

  private static void second(long seconds) {
    try {
      TimeUnit.SECONDS.sleep(seconds);
    } catch (InterruptedException e) {
      e.printStackTrace();
    }
  }
}

The test results are as follows.

AQS_TripletsLock_Test_Result.png

From the test results above, we can see that at most three threads hold the lock at any moment, as expected. Note that lock acquisition here is non-fair: a newly arriving thread may acquire the lock ahead of threads that are already waiting in the queue.
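
The same observation can be checked programmatically instead of by reading the output. The sketch below is self-contained, so it uses a compact stand-in for the synchronizer inside TripletsLock rather than the class itself; ThreePermitSync, PeakHoldersDemo and peakHolders are hypothetical names, and the 20 ms hold time is an arbitrary assumption. It records the highest number of threads that held the lock at once.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

// Compact stand-in for the three-permit synchronizer inside TripletsLock.
class ThreePermitSync extends AbstractQueuedSynchronizer {
    ThreePermitSync() { setState(3); }

    @Override
    protected int tryAcquireShared(int n) {
        for (;;) {
            int cur = getState();
            int next = cur - n;
            if (next < 0 || compareAndSetState(cur, next)) return next;
        }
    }

    @Override
    protected boolean tryReleaseShared(int n) {
        for (;;) {
            int cur = getState();
            if (compareAndSetState(cur, cur + n)) return true;
        }
    }
}

class PeakHoldersDemo {
    // Runs `threads` workers and returns the highest number of simultaneous holders seen.
    static int peakHolders(int threads) {
        ThreePermitSync sync = new ThreePermitSync();
        AtomicInteger holders = new AtomicInteger();
        AtomicInteger peak = new AtomicInteger();
        CountDownLatch done = new CountDownLatch(threads);
        for (int i = 0; i < threads; i++) {
            new Thread(() -> {
                sync.acquireShared(1);
                try {
                    int now = holders.incrementAndGet();
                    peak.accumulateAndGet(now, Math::max);
                    Thread.sleep(20); // hold the lock briefly so holders overlap
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } finally {
                    holders.decrementAndGet(); // leave before giving the permit back
                    sync.releaseShared(1);
                    done.countDown();
                }
            }).start();
        }
        try {
            done.await(30, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return peak.get();
    }

    public static void main(String[] args) {
        System.out.println(peakHolders(20) <= 3); // prints "true"
    }
}
```

The holder counter is incremented only after acquireShared returns and decremented before releaseShared, so it can never exceed the number of permits actually taken.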

Summary

This article analyzed the basic data structures in the synchronizer and the process of acquiring and releasing the exclusive and shared synchronization state. AbstractQueuedSynchronizer is the basic framework for implementing multi-threaded concurrency tools, and being familiar with it helps us make better use of its features and of the tools built on it.