AbstractQueuedSynchronizer implementation principle - 1

Preface

AbstractQueuedSynchronizer (AQS) is the basic framework for implementing concurrency tools in the JDK. Understanding it in depth helps us make better use of its features and of the tools built on it. I hope you read this article carefully and get something out of it.

In Java, locks control how multiple threads access shared resources. The synchronized keyword implements locking implicitly: we do not need to care about acquiring and releasing the lock ourselves. That convenience comes at the cost of flexibility. Consider a scenario in which lock A is acquired first, then lock B; once B is held, A is released and lock C is acquired; once C is held, B is released and lock D is acquired, and so on. Because synchronized ties each lock to a nested block, it cannot express this overlapping acquire-and-release order. Java SE 5 added the Lock interface and a series of implementation classes that provide the same functionality as synchronized but require the lock to be acquired and released explicitly. They also offer features synchronized lacks, such as interruptible lock acquisition and lock acquisition with a timeout. Most Lock implementations in the JDK aggregate a subclass of the synchronizer AQS to control multi-threaded access, so let's take a look at the basic framework for building locks and other synchronization components: AQS (AbstractQueuedSynchronizer).
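As a rough illustration of the scenario above, the following sketch uses ReentrantLock to walk a list of locks hand over hand: each step acquires the next lock before releasing the previous one, an overlap that nested synchronized blocks cannot express. The class name HandOverHand and the method visitInOrder are hypothetical, chosen only for this example.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.locks.ReentrantLock;

class HandOverHand {
    /**
     * Visits the locks in order, always acquiring the next lock before
     * releasing the previous one (A, then B, release A, then C, release B, ...).
     * Returns the indices in the order they were visited.
     */
    static List<Integer> visitInOrder(List<ReentrantLock> locks) {
        List<Integer> visited = new ArrayList<>();
        if (locks.isEmpty()) return visited;
        ReentrantLock previous = locks.get(0);
        previous.lock();
        try {
            visited.add(0);
            for (int i = 1; i < locks.size(); i++) {
                ReentrantLock next = locks.get(i);
                next.lock();        // acquire the next lock first ...
                previous.unlock();  // ... then release the previous one
                previous = next;
                visited.add(i);
            }
        } finally {
            previous.unlock();      // release whichever lock is still held
        }
        return visited;
    }
}
```

The same pattern appears in linked structures where each node carries its own lock, so a traversal never leaves the current position unprotected.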

AQS basic data structure

Synchronization Queue

When a thread fails to acquire the synchronization state, the synchronizer wraps the current thread and its wait status in a node of the internally defined class Node and appends it to the queue. When the synchronization state is released, the thread of the first waiting node in the queue is woken up and allowed to try to acquire the synchronization state again. The basic structure of the synchronization queue is as follows.

[Figure: basic structure of the synchronization queue (AQS_QUEUE.png)]

Queue Node

The synchronization queue uses the static inner class Node of the synchronizer to hold a reference to the thread waiting for the synchronization state, the thread's wait status, and the predecessor and successor nodes.

[Figure: the inner class Node (AQS_inner_class_node.png)]

The names and meanings of the attributes of Node are shown in the following table.

| Attribute (type and name) | Description |
|---|---|
| `volatile int waitStatus` | Wait status of the node in the queue |
| `volatile Node prev` | Predecessor node, assigned when the node is appended to the tail of the synchronization queue |
| `volatile Node next` | Successor node |
| `volatile Thread thread` | The thread waiting to acquire the synchronization state |
| `Node nextWaiter` | The next node waiting in a condition queue, or the `SHARED` constant if the node waits in shared mode |

Each node waits in one of two modes: SHARED means the thread waits for the lock in shared mode, and EXCLUSIVE means it waits for the lock in exclusive mode. The wait status waitStatus of each node can only take one of the values in the following table.

| Value | Description |
|---|---|
| `SIGNAL` (-1) | The node's successor is (or will soon be) blocked, so this node must wake it up when it releases the lock or is cancelled |
| `CANCELLED` (1) | The node's thread has cancelled its lock request (because of a timeout or an interrupt) |
| `CONDITION` (-2) | The node's thread is waiting on a `Condition` to be woken up by another thread |
| `PROPAGATE` (-3) | The next shared acquisition should propagate unconditionally; used only in SHARED mode |
| 0 | The initial state, the default value when a node is created |
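For reference, the fields and constants from the two tables can be collected into one small class. This is a simplified, hypothetical model (SketchNode) that follows the JDK 8 layout; it is not the JDK's own Node class. As in JDK 8, the mode is stored in nextWaiter, with EXCLUSIVE represented by null and SHARED by a marker node.

```java
/** Hypothetical, simplified model of the fields of AQS's inner Node class (JDK 8 layout). */
class SketchNode {
    static final SketchNode SHARED = new SketchNode(); // marker: waits in shared mode
    static final SketchNode EXCLUSIVE = null;          // marker: waits in exclusive mode

    static final int CANCELLED = 1;   // the thread gave up its lock request
    static final int SIGNAL = -1;     // the successor must be woken on release
    static final int CONDITION = -2;  // waiting on a Condition
    static final int PROPAGATE = -3;  // a shared release should propagate

    volatile int waitStatus;          // 0 when the node is created
    volatile SketchNode prev;         // set when the node is appended at the tail
    volatile SketchNode next;
    volatile Thread thread;           // the waiting thread
    SketchNode nextWaiter;            // condition-queue link, or SHARED

    SketchNode() { }                  // used for the dummy head and the SHARED marker

    SketchNode(Thread thread, SketchNode mode) {
        this.thread = thread;
        this.nextWaiter = mode;       // the mode is encoded in nextWaiter
    }

    boolean isShared() { return nextWaiter == SHARED; }
}
```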

Sync state

The synchronizer uses an int variable named state to represent the synchronization state. The synchronizer is mainly used through inheritance: subclasses manage the synchronization state by overriding its protected methods, such as tryAcquire and tryRelease. The synchronizer provides the following three methods for reading and modifying the synchronization state.

| Method signature | Description |
|---|---|
| `protected final int getState()` | Returns the current synchronization state |
| `protected final void setState(int newState)` | Sets the current synchronization state |
| `protected final boolean compareAndSetState(int expect, int update)` | Sets the state to `update` with CAS if it currently equals `expect`, which makes the update atomic |

In exclusive locks the value of state is usually 0 or 1 (for reentrant locks it is the reentrancy count). In shared locks, state typically records how many shares are held or, as in Semaphore, how many permits are still available.
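These three methods are enough to build a simple lock. The sketch below is modelled on the Mutex usage example in the AbstractQueuedSynchronizer JavaDoc: a non-reentrant exclusive lock in which state 0 means unlocked and 1 means locked. tryAcquire wins the lock with compareAndSetState(0, 1), and all queueing is left to AQS's acquire and release. The class name Mutex and the helper state() are illustrative.

```java
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

/** Minimal non-reentrant mutex: state 0 = unlocked, 1 = locked. */
class Mutex {
    private static final class Sync extends AbstractQueuedSynchronizer {
        @Override
        protected boolean tryAcquire(int acquires) {
            // Atomically move state 0 -> 1; only one thread can win this CAS.
            if (compareAndSetState(0, 1)) {
                setExclusiveOwnerThread(Thread.currentThread());
                return true;
            }
            return false;
        }

        @Override
        protected boolean tryRelease(int releases) {
            if (getState() == 0 || getExclusiveOwnerThread() != Thread.currentThread())
                throw new IllegalMonitorStateException();
            setExclusiveOwnerThread(null);
            setState(0); // a plain volatile write suffices: only the owner gets here
            return true; // fully released, so AQS may wake a successor
        }

        @Override
        protected boolean isHeldExclusively() {
            return getState() == 1 && getExclusiveOwnerThread() == Thread.currentThread();
        }

        int currentState() { return getState(); }
    }

    private final Sync sync = new Sync();

    void lock()       { sync.acquire(1); }          // enqueues the caller if tryAcquire fails
    boolean tryLock() { return sync.tryAcquire(1); } // single attempt, never queues
    void unlock()     { sync.release(1); }          // tryRelease, then wake a successor
    int state()       { return sync.currentState(); }
}
```

Because the lock is non-reentrant, a second tryLock() from the owning thread fails; a reentrant version would instead increment state when the current owner acquires again.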

Exclusive Synchronization State Acquisition and Release

The synchronizer provides the acquire(int arg) method to acquire the synchronization state in exclusive mode, which amounts to acquiring the lock.

```java
public final void acquire(int arg) {
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}
```

The `acquire` method first calls `tryAcquire` to try to acquire the lock. If you look at the source code of `tryAcquire`, you can see that the synchronizer does not implement it (it just throws an `UnsupportedOperationException`); the developers of each synchronization component have to implement it themselves. If `tryAcquire` returns `true`, the current thread has acquired the lock and, because `&&` short-circuits, `acquire` returns right away. If it returns `false`, the current thread failed to get the lock, that is to say, some other thread already holds it.

In that case the second operand of the condition, `acquireQueued(addWaiter(Node.EXCLUSIVE), arg)`, is evaluated: `addWaiter` adds the current thread to the synchronization queue with the lock mode `Node.EXCLUSIVE`, and `acquireQueued` then waits in the queue until the lock is acquired, returning whether the thread was interrupted while waiting. Only when it returns `true` is `selfInterrupt()` called to interrupt the current thread (PS: this leaves you with a question: why interrupt the thread after it has acquired the lock?). The source code of `addWaiter` is as follows.

```java
private Node addWaiter(Node mode) {
    Node node = new Node(Thread.currentThread(), mode);
    // Try the fast path of enq; backup to full enq on failure
    Node pred = tail;
    if (pred != null) {
        node.prev = pred;
        if (compareAndSetTail(pred, node)) {
            pred.next = node;
            return node;
        }
    }
    enq(node);
    return node;
}
```

addWaiter constructs a Node from the current thread and the lock mode (shared or exclusive) given by the method parameter. If the synchronization queue has already been initialized, it first tries to append the node at the tail, using compareAndSetTail to guarantee atomicity; that method is built on the Unsafe class from the sun.misc package. If this first attempt fails, or the queue has not been initialized yet, enq is called to perform the enqueue operation. The source code of enq is as follows.

```java
private Node enq(final Node node) {
    for (;;) {
        Node t = tail;
        if (t == null) { // Must initialize
            if (compareAndSetHead(new Node()))
                tail = head;
        } else {
            node.prev = t;
            if (compareAndSetTail(t, node)) {
                t.next = node;
                return t;
            }
        }
    }
}
```
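To experiment with this enqueue protocol outside the JDK, here is a simplified, hypothetical model (TailQueue, QNode) that uses java.util.concurrent.atomic.AtomicReference in place of Unsafe. It folds the addWaiter fast path into an enq-style loop: it lazily installs a dummy head, links node.prev before the tail CAS, and sets the predecessor's next only afterwards. It is a sketch for illustration, not the JDK code.

```java
import java.util.concurrent.atomic.AtomicReference;

/** Hypothetical model of addWaiter/enq: dummy head plus CAS on the tail. */
class TailQueue {
    static final class QNode {
        volatile QNode prev;
        volatile QNode next;
        final Thread thread;
        QNode(Thread thread) { this.thread = thread; }
    }

    private final AtomicReference<QNode> head = new AtomicReference<>();
    private final AtomicReference<QNode> tail = new AtomicReference<>();

    QNode enqueue(Thread t) {
        QNode node = new QNode(t);
        for (;;) {
            QNode last = tail.get();
            if (last == null) {
                // First use: install a dummy head, then point the tail at it.
                if (head.compareAndSet(null, new QNode(null)))
                    tail.set(head.get());
            } else {
                node.prev = last;                 // backward link before the CAS
                if (tail.compareAndSet(last, node)) {
                    last.next = node;             // forward link only afterwards
                    return node;
                }
            }
        }
    }

    /** Counts real nodes by following prev links from the tail. */
    int sizeFromTail() {
        int n = 0;
        for (QNode p = tail.get(); p != null && p != head.get(); p = p.prev) n++;
        return n;
    }

    /** Counts real nodes by following next links from the dummy head. */
    int sizeFromHead() {
        int n = 0;
        QNode h = head.get();
        for (QNode p = (h == null) ? null : h.next; p != null; p = p.next) n++;
        return n;
    }
}
```

Once all enqueuing threads have finished, both counts agree; while an enqueue is still in flight, only the prev chain is guaranteed to be complete.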

The enq method is similar to the fast path in addWaiter, except that it also initializes the synchronization queue when needed, using compareAndSetHead (also based on Unsafe) to set the head node atomically, and wraps everything in an outer for (;;) loop. The only way out of that loop is a successful enqueue at the tail, so once enq returns, the node is in the queue (enq itself returns the node's predecessor, which addWaiter ignores). addWaiter then returns the current Node, which is passed to acquireQueued to carry out the remaining steps, as shown below.

```java
final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) {
            final Node p = node.predecessor();
            if (p == head && tryAcquire(arg)) {
                setHead(node);
                p.next = null; // help GC
                failed = false;
                return interrupted;
            }
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
    int ws = pred.waitStatus;
    if (ws == Node.SIGNAL)
        /*
         * This node has already set status asking a release
         * to signal it, so it can safely park.
         */
        return true;
    if (ws > 0) {
        /*
         * Predecessor was cancelled. Skip over predecessors and
         * indicate retry.
         */
        do {
            node.prev = pred = pred.prev;
        } while (pred.waitStatus > 0);
        pred.next = node;
    } else {
        /*
         * waitStatus must be 0 or PROPAGATE.  Indicate that we
         * need a signal, but don't park yet.  Caller will need to
         * retry to make sure it cannot acquire before parking.
         */
        compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
    }
    return false;
}
```
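The predecessor-status rules of shouldParkAfterFailedAcquire can be traced without any threads. The sketch below is a hypothetical, single-threaded model (ParkDecision, with a stripped-down node class N) that reproduces the JDK 8 decision logic with plain field writes in place of compareAndSetWaitStatus; it is meant for following the three branches, not for concurrent use.

```java
/** Hypothetical single-threaded model of shouldParkAfterFailedAcquire (JDK 8 logic). */
class ParkDecision {
    static final int CANCELLED = 1, SIGNAL = -1;

    static final class N {
        int waitStatus;
        N prev, next;
        N(int waitStatus) { this.waitStatus = waitStatus; }
    }

    /** Returns true if the thread owning node may block now. */
    static boolean shouldPark(N pred, N node) {
        int ws = pred.waitStatus;
        if (ws == SIGNAL)
            return true;                 // predecessor will wake us on release
        if (ws > 0) {                    // CANCELLED: skip all cancelled predecessors
            do {
                node.prev = pred = pred.prev;
            } while (pred.waitStatus > 0);
            pred.next = node;            // unlink the cancelled nodes
        } else {
            pred.waitStatus = SIGNAL;    // ask for a signal, but retry before parking
        }
        return false;
    }
}
```

For a queue head ← cancelled ← current, the first call unlinks the cancelled node and returns false, the second sets the head's status to SIGNAL and returns false, and only the third returns true. This is why acquireQueued gets at least one more tryAcquire attempt before the thread actually parks.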

The parameters of shouldParkAfterFailedAcquire are the current node's predecessor and the current node; whether to block is decided from the predecessor. The method first reads the predecessor's wait status ws. If ws is SIGNAL, the predecessor has already promised to wake its successor when it releases the lock, so the method returns true and the current thread can safely block. If ws > 0, the only status that satisfies this condition is CANCELLED (value 1), meaning the predecessor's thread has cancelled its lock request; a do-while loop then walks backwards past every cancelled predecessor until it finds one that is not cancelled and links the current node directly after it, which removes the cancelled nodes from the synchronization queue. Otherwise (ws is 0 or PROPAGATE), the else branch uses the atomic compareAndSetWaitStatus operation to change the predecessor's wait status to SIGNAL. In both of these last two cases the method returns false, and acquireQueued loops and retries tryAcquire before deciding again. When blocking is judged to be necessary, that is, when shouldParkAfterFailedAcquire returns true, the current thread is blocked by parkAndCheckInterrupt, which returns the current thread's interrupt flag. The method is as follows.

```java
private final boolean parkAndCheckInterrupt() {
    LockSupport.park(this);
    return Thread.interrupted();
}
```
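Two properties of LockSupport matter here, and both can be checked on a single thread. The sketch below copies the two-line shape of parkAndCheckInterrupt into a hypothetical helper (ParkSemanticsDemo is an illustrative name, not JDK code). An unpark that happens before park leaves a permit, so park returns at once; an interrupted thread is not blocked by park either, but the Thread.interrupted() call that reports the interrupt also clears the flag.

```java
import java.util.concurrent.locks.LockSupport;

class ParkSemanticsDemo {
    /** Same shape as AQS's parkAndCheckInterrupt (a copy for illustration). */
    static boolean parkAndCheckInterrupt(Object blocker) {
        LockSupport.park(blocker);
        return Thread.interrupted(); // reads AND clears the interrupt flag
    }

    /** unpark() before park(): the stored permit lets park() return immediately. */
    static boolean permitIsConsumed() {
        LockSupport.unpark(Thread.currentThread());
        return !parkAndCheckInterrupt(new Object()); // returns at once, not interrupted
    }

    /** An interrupted thread is not blocked; the check reports and clears the flag. */
    static boolean[] interruptIsReportedThenCleared() {
        Thread.currentThread().interrupt();
        boolean reported = parkAndCheckInterrupt(new Object());
        boolean stillSet = Thread.currentThread().isInterrupted();
        return new boolean[] {reported, stillSet};
    }
}
```

Because the flag is cleared inside the wait loop, a caller of the uninterruptible acquire would lose the interrupt unless something sets it again after the lock is acquired.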

Thread blocking is implemented by the LockSupport utility class, and a deeper look at its source code reveals that it is also based on the Unsafe class. If both shouldParkAfterFailedAcquire and parkAndCheckInterrupt return true, the interrupted flag in acquireQueued is set to true. Another question here is: when is a node's waitStatus changed to CANCELLED, cancelling the node thread's request for the lock? Careful readers may have noticed that the finally block of the acquireQueued method above decides, based on the failed flag, whether to call cancelAcquire, which sets the node's status to CANCELLED. In acquireQueued, failed remains true only if the loop exits with an exception (for example, one thrown by tryAcquire). The specific implementation of cancelAcquire is left for you to explore. This completes the process of exclusive synchronization state acquisition in AQS.
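The effect of recording an interrupt during the wait can be observed from outside with ReentrantLock, whose lock() method goes through AQS's uninterruptible acquire path when the lock is contended. The sketch below (class and method names are hypothetical) interrupts a thread while it is queued in lock(): lock() does not abort, but when it returns, the thread's interrupt flag is still set.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReentrantLock;

class InterruptDuringLockDemo {
    /**
     * Interrupts a thread while it is queued inside ReentrantLock.lock() and
     * returns the waiter's interrupt flag as seen right after lock() returned.
     */
    static boolean waiterSeesInterruptAfterLock() throws InterruptedException {
        ReentrantLock lock = new ReentrantLock();
        AtomicBoolean flagAfterLock = new AtomicBoolean();
        lock.lock(); // hold the lock so that the waiter has to enqueue
        Thread waiter = new Thread(() -> {
            lock.lock(); // an interrupt here is recorded, not thrown
            try {
                flagAfterLock.set(Thread.currentThread().isInterrupted());
            } finally {
                lock.unlock();
            }
        });
        waiter.start();
        try {
            while (!lock.hasQueuedThread(waiter)) {
                Thread.sleep(1); // wait until the waiter sits in the sync queue
            }
            waiter.interrupt();
        } finally {
            lock.unlock(); // now the waiter can acquire the lock
        }
        waiter.join();
        return flagAfterLock.get();
    }
}
```

Since lock() cannot throw InterruptedException, the restored flag is the only way for code running after lock() to learn that an interrupt arrived while it was waiting.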

Next, let's look at the exclusive lock release process. The synchronizer provides the release method for releasing an exclusive lock; its source code is as follows.

```java
public final boolean release(int arg) {
    if (tryRelease(arg)) {
        Node h = head;
        if (h != null && h.waitStatus != 0)
            unparkSuccessor(h);
        return true;
    }
    return false;
}
```

release first calls tryRelease to try to release the lock. Following that method shows that the synchronizer just throws an UnsupportedOperationException, the same as tryAcquire in exclusive acquisition above, so the developer has to define the release operation.

[Figure: JavaDoc of tryRelease (AQS_tryrelease.png)]

As its JavaDoc shows, a return value of false means the synchronization state has not been fully released (for example, a reentrant lock that the current thread still holds), so release returns false without waking anyone. If tryRelease returns true, the current thread has fully released the lock, and the threads waiting in the queue need to be notified. If the current head node is not null and its wait status is not the initial state (0), unparkSuccessor is called to wake the successor from its blocked state. Its source code is as follows.

```java
private void unparkSuccessor(Node node) {
    /*
     * If status is negative (i.e., possibly needing signal) try
     * to clear in anticipation of signalling.  It is OK if this
     * fails or if status is changed by waiting thread.
     */
    int ws = node.waitStatus;
    if (ws < 0)
        compareAndSetWaitStatus(node, ws, 0);

    /*
     * Thread to unpark is held in successor, which is normally
     * just the next node.  But if cancelled or apparently null,
     * traverse backwards from tail to find the actual
     * non-cancelled successor.
     */
    Node s = node.next;
    if (s == null || s.waitStatus > 0) {
        s = null;
        for (Node t = tail; t != null && t != node; t = t.prev)
            if (t.waitStatus <= 0)
                s = t;
    }
    if (s != null)
        LockSupport.unpark(s.thread);
}
```

unparkSuccessor first reads the wait status ws of the head node; if it is negative (Node.SIGNAL or Node.PROPAGATE), it is reset to the initial state (0) with a CAS operation. It then takes the head's successor. If the successor is null or its status is CANCELLED (its lock request was cancelled), the method searches backwards from the tail of the queue for the front-most node whose status is not CANCELLED. If such a node is found, it is woken up with LockSupport's unpark method, which is implemented on top of the Unsafe class's unpark. The reason for searching from the tail is the way addWaiter, shown earlier, enqueues a node when exclusive lock acquisition fails:

[Figure: addWaiter source with steps ① and ② marked (AQS_unparkSuccessor.png)]

Suppose a thread has reached ① in the figure above but has not yet executed ② (the assignment pred.next = node), and another thread happens to run unparkSuccessor at that moment. A front-to-back search through next pointers cannot find the new node, because its predecessor's next pointer has not been assigned yet. The node's prev pointer, however, is set before compareAndSetTail makes it the tail, so a back-to-front search from the tail always finds it. At this point, the exclusive lock release operation is complete.
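The backward search can be checked in isolation. In the hypothetical model below (SuccessorSearch, node class N), findSuccessor mirrors only the selection part of unparkSuccessor: it prefers head.next, and otherwise scans from the tail along prev links, keeping the front-most node that is not cancelled. It does not reset the head's status or unpark anything.

```java
/** Hypothetical model of how unparkSuccessor chooses the node to wake. */
class SuccessorSearch {
    static final int CANCELLED = 1;

    static final class N {
        volatile int waitStatus;
        volatile N prev, next;
    }

    static N findSuccessor(N head, N tail) {
        N s = head.next;
        if (s == null || s.waitStatus > 0) {
            s = null;
            // Walk backwards; each hit overwrites s, so the front-most one wins.
            for (N t = tail; t != null && t != head; t = t.prev)
                if (t.waitStatus <= 0)
                    s = t;
        }
        return s;
    }
}
```

The first case in the test below reproduces the race described above: the new node is already the tail and its prev is set, but head.next is still null, and the backward scan still finds it.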

Exclusive Interruptible Synchronization State Acquisition

The synchronizer provides the acquireInterruptibly method to acquire the lock in a way that responds to interrupts. Its source code is as follows.

```java
public final void acquireInterruptibly(int arg)
        throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    if (!tryAcquire(arg))
        doAcquireInterruptibly(arg);
}
```

The method first checks the interrupt status of the current thread; if the thread has already been interrupted, it throws InterruptedException directly. Otherwise it calls tryAcquire to try to acquire the lock: if that succeeds, the method returns, and if it fails, doAcquireInterruptibly is called. That method is as follows.

```java
private void doAcquireInterruptibly(int arg)
    throws InterruptedException {
    final Node node = addWaiter(Node.EXCLUSIVE);
    boolean failed = true;
    try {
        for (;;) {
            final Node p = node.predecessor();
            if (p == head && tryAcquire(arg)) {
                setHead(node);
                p.next = null; // help GC
                failed = false;
                return;
            }
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                throw new InterruptedException();
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}
```

A closer look shows that doAcquireInterruptibly is essentially the same as acquireQueued above, with two differences: the enqueue operation addWaiter is performed inside the method, and when parkAndCheckInterrupt reports an interrupt, the method throws InterruptedException immediately instead of only recording it. Because failed is still true at that point, the finally block calls cancelAcquire to cancel the node.

[Figure: acquireQueued compared with doAcquireInterruptibly (AQS_acquirequeued_interruptibly_compare.png)]

The other steps are the same as in exclusive lock acquisition, so the flowchart is roughly the same as for acquisition without interrupt response, except that there is an extra step at the beginning that checks the thread's interrupt status, and the loop throws InterruptedException when an interrupt is detected.
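The difference is easy to observe with ReentrantLock, whose lockInterruptibly() delegates to AQS's interruptible acquisition (acquireInterruptibly(1) in JDK 8). In the sketch below (class and method names are hypothetical), a thread queued in lockInterruptibly() is interrupted while the lock is still held by another thread, so the only way it can finish is by leaving the queue with an InterruptedException.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReentrantLock;

class InterruptibleLockDemo {
    /** Returns true if a queued lockInterruptibly() call was aborted by interrupt(). */
    static boolean waiterAbortedByInterrupt() throws InterruptedException {
        ReentrantLock lock = new ReentrantLock();
        AtomicBoolean aborted = new AtomicBoolean();
        lock.lock(); // keep the lock so that the waiter must queue
        try {
            Thread waiter = new Thread(() -> {
                try {
                    lock.lockInterruptibly(); // interruptible AQS acquisition
                    lock.unlock();            // not expected: the lock is never free
                } catch (InterruptedException e) {
                    aborted.set(true);        // left the queue without the lock
                }
            });
            waiter.start();
            while (!lock.hasQueuedThread(waiter))
                Thread.sleep(1);              // wait until the waiter is queued
            waiter.interrupt();
            waiter.join(); // finishes while this thread still holds the lock
        } finally {
            lock.unlock();
        }
        return aborted.get();
    }
}
```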

Exclusive Timed Synchronization State Acquisition

The synchronizer provides the tryAcquireNanos method to acquire the synchronization state (that is, the lock) with a timeout, a feature the synchronized keyword does not support. If the synchronization state is acquired within the specified time, the method returns true; otherwise, it returns false. The source code of the method is as follows.

```java
public final boolean tryAcquireNanos(int arg, long nanosTimeout)
        throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    return tryAcquire(arg) ||
        doAcquireNanos(arg, nanosTimeout);
}
```

After checking the interrupt status, the method calls tryAcquire to try to acquire the lock and returns immediately if that succeeds; otherwise it calls doAcquireNanos to wait for the lock with a timeout. As shown above, the synchronizer's acquireInterruptibly method throws InterruptedException and returns immediately if the current thread is interrupted while waiting for the synchronization state; timed acquisition adds a timeout on top of this interrupt response. The source code of doAcquireNanos is as follows.

```java
private boolean doAcquireNanos(int arg, long nanosTimeout)
        throws InterruptedException {
    if (nanosTimeout <= 0L)
        return false;
    final long deadline = System.nanoTime() + nanosTimeout;
    final Node node = addWaiter(Node.EXCLUSIVE);
    boolean failed = true;
    try {
        for (;;) {
            final Node p = node.predecessor();
            if (p == head && tryAcquire(arg)) {
                setHead(node);
                p.next = null; // help GC
                failed = false;
                return true;
            }
            nanosTimeout = deadline - System.nanoTime();
            if (nanosTimeout <= 0L)
                return false;
            if (shouldParkAfterFailedAcquire(p, node) &&
                nanosTimeout > spinForTimeoutThreshold)
                LockSupport.parkNanos(this, nanosTimeout);
            if (Thread.interrupted())
                throw new InterruptedException();
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}
```

From the source code above, the main idea of timed acquisition is: first compute the deadline by adding the timeout nanosTimeout passed in as a parameter to the current time; then, on every iteration of the loop, subtract the current time from deadline to get the remaining time nanosTimeout. If the remaining time is less than or equal to 0, the method ends and returns false. The spin itself follows the same pattern as the exclusive acquireQueued method above: when the current node's predecessor is the head node, tryAcquire is called to try to acquire the lock, and the method returns true if that succeeds.

[Figure: acquireQueued compared with doAcquireNanos (AQS_acquireQueued_doAcquireNanos_compare.png)]

If the current thread fails to acquire the lock, it checks whether the remaining timeout nanosTimeout is less than or equal to 0; if so, the method returns false immediately. Otherwise, once shouldParkAfterFailedAcquire returns true, it further compares nanosTimeout with spinForTimeoutThreshold (1000 nanoseconds). If the remaining time is greater than the threshold, the thread parks with LockSupport.parkNanos for the remaining time; if it is less than or equal to the threshold, the thread does not park but goes through the spin loop again. The reason for this extra check is that very short waits (under 1000 nanoseconds) cannot be timed precisely, and parking for such a short time would make the timeout specified by nanosTimeout less accurate overall. After each pass, the thread checks its interrupt status and throws InterruptedException if it has been interrupted. This completes the process of acquiring the lock with a timeout.
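The timing skeleton of doAcquireNanos can be reproduced without the queue. In the hypothetical sketch below (DeadlineWait and awaitCondition are illustrative names), a BooleanSupplier stands in for "the predecessor is head and tryAcquire succeeds". The deadline is computed once, the remaining time is recomputed on every pass, and the thread parks only when more than the 1000 ns threshold remains. One assumption differs from AQS: nothing here unparks the waiter, so a condition that becomes true while parked is only noticed when parkNanos returns.

```java
import java.util.concurrent.locks.LockSupport;
import java.util.function.BooleanSupplier;

class DeadlineWait {
    /** Same value as spinForTimeoutThreshold in JDK 8's AQS. */
    static final long SPIN_FOR_TIMEOUT_THRESHOLD = 1000L;

    static boolean awaitCondition(BooleanSupplier condition, long nanosTimeout)
            throws InterruptedException {
        if (nanosTimeout <= 0L)
            return false;
        final long deadline = System.nanoTime() + nanosTimeout; // fixed once
        for (;;) {
            if (condition.getAsBoolean())
                return true;
            nanosTimeout = deadline - System.nanoTime();        // remaining time
            if (nanosTimeout <= 0L)
                return false;                                   // timed out
            if (nanosTimeout > SPIN_FOR_TIMEOUT_THRESHOLD)
                LockSupport.parkNanos(nanosTimeout);            // long wait: park
            // short wait: fall through and spin again
            if (Thread.interrupted())
                throw new InterruptedException();
        }
    }
}
```

Recomputing the remaining time from a fixed deadline, rather than subtracting each park's nominal duration, keeps early or spurious wakeups from stretching the total wait.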

Summary

This article explained the acquisition and release of exclusive synchronization state in AbstractQueuedSynchronizer. The acquisition and release of shared synchronization state will be covered in the next article, AbstractQueuedSynchronizer implementation principle - 2.