JUC并发—6.AQS源码分析二

大纲

1.ReentractReadWriteLock的基本原理

2.基于AQS实现的ReentractReadWriteLock

3.ReentractReadWriteLock如何竞争写锁

4.ReentractReadWriteLock如何竞争读锁

5.ReentractReadWriteLock的公平锁和非公平锁

6.ReentrantReadWriteLock中的锁降级

7.Condition的说明介绍

8.Condition的源码实现

1.ReentractReadWriteLock的基本原理

(1)读锁和写锁关系

表面上读锁和写锁是两把锁,但实际上只是同一把锁的两个视图。读锁和写锁在初始化的时候会共用一个Sync,也就是同一把锁、两类线程。其中读线程和读线程不互斥,读线程和写线程互斥,写线程和写线程互斥。

(2)锁状态设计

和独占锁一样,读写锁也是使用state变量来表示锁的状态。只是将state变量拆成两半:高16位表示读锁状态,低16位表示写锁状态。

读写锁是通过位运算来快速确定读和写的状态的。假设当前state = s,则写状态等于s & ((1 << 16) - 1),读状态等于s >>> 16。当写状态增加1时,state = s + 1。当读状态加1时,state = s + (1 << 16)。

将一个int型的state变量拆成两半,而不是用两个int型变量分别表示读锁和写锁的状态,是因为无法用一次CAS同时操作两个int型变量。

1 << 0 等于 1,(1 << 0) - 1 = 0
1 << 1 等于 10,(1 << 1) - 1 = 01
1 << 2 等于 100,(1 << 2) - 1 = 011
1 << 4 等于 1000,(1 << 4) - 1 = 0111
1 << 8 等于 100000000,(1 << 8) - 1 = 011111111
1 << 16 等于 10000000000000000,(1 << 16) - 1 = 01111111111111111
//所以s & ((1 << 16) - 1)相当于将s的高16位全部抹去,只剩下低16位

//若s = 11111,则s >>> 2 = 00111
//所以s >>> 16,就是无符号补0右移16位

(3)写锁的获取与释放

写锁是一个可重入的排他锁,它只能被一个线程同时获取。如果当前线程已获取写锁,则增加写状态:s + 1。如果当前线程在获取写锁时,读锁已被获取或者自己不是已获写锁的线程,则进入等待状态。

(4)读锁的获取与释放

读锁是一个可重入的共享锁,它能被多个线程同时获取。在写状态为0时,读锁总会被成功获取,而所做的也只是增加读状态。如果当前线程已获取读锁,则增加读状态:state = s + (1 << 16)。如果当前线程在获取读锁时,写锁已被其他其他线程获取,则该线程进入等待状态。

2.基于AQS实现的ReentractReadWriteLock

(1)ReentractReadWriteLock的构造函数

(2)ReentractReadWriteLock的读锁和写锁

(3)ReentractReadWriteLock基于AQS的两对模版方法

(1)ReentractReadWriteLock的构造函数

readerLock变量表示读锁,writerLock变量表示写锁,sync变量表示公平锁还是非公平锁。

public class ReentrantReadWriteLock implements ReadWriteLock, java.io.Serializable {
    //Inner class providing readlock
    private final ReentrantReadWriteLock.ReadLock readerLock;
    //Inner class providing writelock
    private final ReentrantReadWriteLock.WriteLock writerLock;
    //Performs all synchronization mechanics
    final Sync sync;

    //Creates a new ReentrantReadWriteLock with default (nonfair) ordering properties.
    public ReentrantReadWriteLock() {
        this(false);
    }

    //Creates a new ReentrantReadWriteLock with the given fairness policy.
    //@param fair {@code true} if this lock should use a fair ordering policy
    public ReentrantReadWriteLock(boolean fair) {
        sync = fair ? new FairSync() : new NonfairSync();
        readerLock = new ReadLock(this);
        writerLock = new WriteLock(this);
    }
    
    //获取写锁
    public ReentrantReadWriteLock.WriteLock writeLock() {
        return writerLock;
    }
    
    //获取读锁
    public ReentrantReadWriteLock.ReadLock  readLock()  {
        return readerLock;
    }
    ...
}

(2)ReentractReadWriteLock的读锁和写锁

读锁和写锁都会通过内部类Sync来实现获取锁和释放锁的功能。加写锁会调用AQS的acquire()方法,然后调用tryAcquire()方法。加读锁会调用AQS的acquireShared()方法,然后调用acquireShared()方法。

public class ReentrantReadWriteLock implements ReadWriteLock, java.io.Serializable {
    ...
    //Performs all synchronization mechanics
    final Sync sync;

    //The lock returned by method {@link ReentrantReadWriteLock#readLock}.
    public static class ReadLock implements Lock, java.io.Serializable {
        private final Sync sync;
      
        protected ReadLock(ReentrantReadWriteLock lock) {
            sync = lock.sync;
        }
      
        //Acquires the read lock.
        //Acquires the read lock if the write lock is not held by another thread and returns immediately.
        //If the write lock is held by another thread then the current thread becomes disabled 
        //for thread scheduling purposes and lies dormant until the read lock has been acquired.
        public void lock() {
            sync.acquireShared(1);
        }

        //Acquires the read lock unless the current thread is Thread#interrupt interrupted.
        public void lockInterruptibly() throws InterruptedException {
            sync.acquireSharedInterruptibly(1);
        }

        //Acquires the read lock only if the write lock is not held by another thread at the time of invocation.
        public boolean tryLock() {
            return sync.tryReadLock();
        }

        //Acquires the read lock if the write lock is not held by another thread 
        //within the given waiting time and the current thread has not been Thread#interrupt interrupted.
        public boolean tryLock(long timeout, TimeUnit unit) throws InterruptedException {
            return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
        }

        //Attempts to release this lock.
        //If the number of readers is now zero then the lock is made available for write lock attempts.
        public void unlock() {
            sync.releaseShared(1);
        }
        ...
    }

    //The lock returned by method {@link ReentrantReadWriteLock#writeLock}.
    public static class WriteLock implements Lock, java.io.Serializable {
        private final Sync sync;
      
        protected WriteLock(ReentrantReadWriteLock lock) {
            sync = lock.sync;
        }

        //Acquires the write lock.
        //Acquires the write lock if neither the read nor write lock are held by another thread and returns immediately, 
        //setting the write lock hold count to one.
        //If the current thread already holds the write lock then the hold count is incremented by one and the method return immediately.
        //If the lock is held by another thread then the current thread becomes disabled 
        //for thread scheduling purposes and lies dormant until the write lock has been acquired, 
        //at which time the write lock hold count is set to one.
        public void lock() {
            sync.acquire(1);
        }

        //Acquires the write lock unless the current thread is Thread#interrupt interrupted.
        public void lockInterruptibly() throws InterruptedException {
            sync.acquireInterruptibly(1);
        }

        //Acquires the write lock only if it is not held by another thread at the time of invocation.
        public boolean tryLock( ) {
            return sync.tryWriteLock();
        }

        //Acquires the write lock if it is not held by another thread 
        //within the given waiting time and the current thread has not been Thread#interrupt interrupted.
        public boolean tryLock(long timeout, TimeUnit unit) throws InterruptedException {
            return sync.tryAcquireNanos(1, unit.toNanos(timeout));
        }

        //Attempts to release this lock.
        //If the current thread is the holder of this lock then the hold count is decremented. 
        //If the hold count is now zero then the lock is released. 
        //If the current thread is not the holder of this lock then IllegalMonitorStateException is thrown.
        public void unlock() {
            sync.release(1);
        }
        ...
    }
    ...
}

(3)ReentractReadWriteLock基于AQS的两对模版方法

独占锁和读写锁的写锁都是基于AQS的acquire/release模版方法实现的,读写锁的读锁是基于AQS的acquireShared/releaseShared模版方法实现的。

public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable { 
    ...
    //Acquires in exclusive mode, ignoring interrupts.
    //Implemented by invoking at least once #tryAcquire, returning on success.
    //Otherwise the thread is queued, possibly repeatedly blocking and unblocking, invoking #tryAcquire until success.
    //This method can be used to implement method Lock#lock.
    public final void acquire(int arg) {
        //tryAcquire()需要子类重写
        if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) {
            selfInterrupt();
        }
    }
    
    protected boolean tryAcquire(int arg) {
        throw new UnsupportedOperationException();
    }
    
    //Releases in exclusive mode.
    //Implemented by unblocking one or more threads if #tryRelease returns true.
    //This method can be used to implement method Lock#unlock.
    public final boolean release(int arg) {
        //tryRelease()需要子类重写
        if (tryRelease(arg)) {
            Node h = head;
            if (h != null && h.waitStatus != 0) {
                unparkSuccessor(h);
            }
            return true;
        }
        return false;
    }
    
    protected boolean tryRelease(int arg) {
        throw new UnsupportedOperationException();
    }
    
    //Acquires in shared mode, ignoring interrupts.
    //Implemented by first invoking at least once #tryAcquireShared, returning on success.
    //Otherwise the thread is queued, possibly repeatedly blocking and unblocking, invoking #tryAcquireShared until success.
    public final void acquireShared(int arg) {
        //tryAcquireShared()需要子类重写
        if (tryAcquireShared(arg) < 0) {
            doAcquireShared(arg);
        }
    }
    
    protected int tryAcquireShared(int arg) {
        throw new UnsupportedOperationException();
    }
    
    //Releases in shared mode.
    //Implemented by unblocking one or more threads if #tryReleaseShared returns true.
    public final boolean releaseShared(int arg) {
        //tryReleaseShared()需要子类重写
        if (tryReleaseShared(arg)) {
            doReleaseShared();
            return true;
        }
        return false;
    }
    
    protected boolean tryReleaseShared(int arg) {
        throw new UnsupportedOperationException();
    }
    ...
}

3.ReentractReadWriteLock如何竞争写锁

(1)WriteLock的获取

(2)WriteLock的释放

(1)WriteLock的获取

WriteLock的lock()方法会调用AQS的acquire()模版方法来获取锁,而AQS的acquire()方法又会调用继承自AQS的Sync类的tryAcquire()方法。

在Sync类的tryAcquire()方法中,getState()方法会返回当前state变量的值。exclusiveCount()方法会从state变量中查找当前获得写锁的线程数量,writerShouldBlock()方法会判断当前写线程在抢占锁时是否应该阻塞。

情况一:c != 0 && w == 0

说明此时有线程持有读锁,所以当前线程获取不到写锁,返回false。由此可见,一个线程获取读锁后不能再继续重入获取写锁(不能锁升级)。但从后续可知,一个线程获取写锁后可以再继续重入获取读锁(能锁降级)。

情况二:c != 0 && w != 0

说明此时有线程持有写锁且不可能有线程持有读锁,所以需要判断持有写锁的线程是否是当前线程自己,如果不是则返回false。

情况三:c != 0 && w != 0 && current持有锁

说明此时当前线程正在持有写锁,属于重入写锁的情况,需要判断重入次数,锁重入的次数不能大于65535。

情况四:c == 0

说明此时没有线程持有锁,所以当前线程可以通过CAS操作抢占锁。

public class ReentrantReadWriteLock implements ReadWriteLock, java.io.Serializable {
    ...
    //Performs all synchronization mechanics
    final Sync sync;

    //The lock returned by method {@link ReentrantReadWriteLock#writeLock}.
    public static class WriteLock implements Lock, java.io.Serializable {
        private final Sync sync;
      
        protected WriteLock(ReentrantReadWriteLock lock) {
            sync = lock.sync;
        }

        //Acquires the write lock.
        //Acquires the write lock if neither the read nor write lock are held by another thread and returns immediately, 
        //setting the write lock hold count to one.
        //If the current thread already holds the write lock then the hold count is incremented by one and the method return immediately.
        //If the lock is held by another thread then the current thread becomes disabled 
        //for thread scheduling purposes and lies dormant until the write lock has been acquired, 
        //at which time the write lock hold count is set to one.
        public void lock() {
            //执行AQS的acquire()模版方法,获取写锁(独占锁)
            sync.acquire(1);
        }
        ...
    }
    
    //Synchronization implementation for ReentrantReadWriteLock.
    //Subclassed into fair and nonfair versions.
    abstract static class Sync extends AbstractQueuedSynchronizer {
        static final int MAX_COUNT      = (1 << SHARED_SHIFT) - 1;
        static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1;

        //Returns the number of exclusive holds represented in count
        static int exclusiveCount(int c) {
            //获取写锁状态:c & ((1 << 16) - 1)
            //也就是从state变量中查找当前获得写锁的线程数量
            return c & EXCLUSIVE_MASK;
        }

        //获取写锁(独占锁)
        protected final boolean tryAcquire(int acquires) {
            //Walkthrough:
            //1.If read count nonzero or write count nonzero and owner is a different thread, fail.
            //2.If count would saturate, fail. (This can only happen if count is already nonzero.)
            //3. Otherwise, this thread is eligible for lock if it is either a reentrant acquire or queue policy allows it.
            //If so, update state and set owner.
            Thread current = Thread.currentThread();
            int c = getState();//获取锁的状态
            int w = exclusiveCount(c);//获取写锁的状态
            if (c != 0) {
                //如果c != 0 && w == 0时,说明有线程持有读锁,所以当前获取写锁的线程会被阻塞,会返回false
                //如果c != 0 && w != 0 && current没获取锁,说明有其他线程持有写锁(不可能有线程持有读锁),所以当前获取写锁的线程会被阻塞,会返回false
                if (w == 0 || current != getExclusiveOwnerThread()) {
                    return false;
                }
                //判断重入次数不能大于65535
                if (w + exclusiveCount(acquires) > MAX_COUNT) {
                    throw new Error("Maximum lock count exceeded");
                }
                //Reentrant acquire
                setState(c + acquires);
                return true;
            }
            //此时c == 0,说明没有线程持有锁,可以通过CAS操作抢占锁
            if (writerShouldBlock() || !compareAndSetState(c, c + acquires)) {
                return false;
            }
            setExclusiveOwnerThread(current);
            return true;
        }
        ...
    }
}


public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable { 
    ...
    //获取写锁(独占锁)
    //Acquires in exclusive mode, ignoring interrupts.
    //Implemented by invoking at least once #tryAcquire, returning on success.
    //Otherwise the thread is queued, possibly repeatedly blocking and unblocking, invoking #tryAcquire until success.
    //This method can be used to implement method Lock#lock.
    public final void acquire(int arg) {
        //tryAcquire()需要子类重写,此时执行ReentrantReadWriteLock的内部类Sync的tryAcquire()方法
        if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) {
            selfInterrupt();
        }
    }
    ...
}

(2)WriteLock的释放

WriteLock的unlock()方法会调用AQS的release()模版方法来释放锁,而AQS的release()方法又会调用继承自AQS的Sync类的tryRelease()方法。

在Sync类的tryRelease()方法中,首先通过getState() - releases来递减写锁的次数。由于写锁的重入次数保存在低位,所以直接十进制相减即可。然后通过exclusiveCount()获取写锁的重入次数,如果为0说明锁释放成功。最后通过setState()方法修改state变量的值。由于写锁是独占锁,所以设置state变量的值不需要CAS操作。

public class ReentrantReadWriteLock implements ReadWriteLock, java.io.Serializable {
    ...
    //Performs all synchronization mechanics
    final Sync sync;

    //The lock returned by method {@link ReentrantReadWriteLock#writeLock}.
    public static class WriteLock implements Lock, java.io.Serializable {
        private final Sync sync;
      
        protected WriteLock(ReentrantReadWriteLock lock) {
            sync = lock.sync;
        }

        //Attempts to release this lock.
        //If the current thread is the holder of this lock then the hold count is decremented. 
        //If the hold count is now zero then the lock is released. 
        //If the current thread is not the holder of this lock then IllegalMonitorStateException is thrown.
        public void unlock() {
            //执行AQS的release()方法,释放写锁(独占锁)
            sync.release(1);
        }
        ...
    }
    
    //Synchronization implementation for ReentrantReadWriteLock.
    //Subclassed into fair and nonfair versions.
    abstract static class Sync extends AbstractQueuedSynchronizer {
        static final int MAX_COUNT      = (1 << SHARED_SHIFT) - 1;
        static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1;
      
        //Returns the number of exclusive holds represented in count
        static int exclusiveCount(int c) {
            //获取写锁状态:c & ((1 << 16) - 1)
            //也就是从state变量中查找当前获得写锁的线程数量
            return c & EXCLUSIVE_MASK;
        }  
     
        //Note that tryRelease and tryAcquire can be called by Conditions. 
        //So it is possible that their arguments contain both read and write holds 
        //that are all released during a condition wait and re-established in tryAcquire.
        protected final boolean tryRelease(int releases) {
            if (!isHeldExclusively()) {
                throw new IllegalMonitorStateException();
            }
            int nextc = getState() - releases;//递减写锁的次数
            boolean free = exclusiveCount(nextc) == 0;//计算写锁的重入次数
            if (free) {
                setExclusiveOwnerThread(null);
            }
            setState(nextc);
            return free;
        }
        ...
    }
    ...
}

public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable { 
    ...
    //释放写锁(独占锁)
    //Releases in exclusive mode.
    //Implemented by unblocking one or more threads if #tryRelease returns true.
    //This method can be used to implement method Lock#unlock.
    public final boolean release(int arg) {
        //tryRelease()需要子类重写,此时执行ReentrantReadWriteLock的内部类Sync的tryRelease()方法
        if (tryRelease(arg)) {
            Node h = head;
            if (h != null && h.waitStatus != 0) {
                unparkSuccessor(h);
            }
            return true;
        }
        return false;
    }
    ...
}

4.ReentractReadWriteLock如何竞争读锁

(1)ReadLock的获取

(2)ReadLock的释放

(3)fullTryAcquireShared()方法

(1)ReadLock的获取

ReadLock的lock()方法会调用AQS的acquireShared()模版方法来获取读锁,而AQS的acquireShared()方法又会调用Sync类的tryAcquireShared()方法。

在继承自AQS的Sync类的tryAcquireShared()方法中:首先会判断是否有线程持有写锁 + 持有写锁的线程是否是当前线程。如果有线程持有写锁,但不是当前线程持有写锁,那么会阻塞当前线程。然后判断当前线程获取读锁是否应该阻塞,读锁重入次数是否小于65535,以及通过CAS修改state值来抢占读锁是否成功。

如果当前线程获取读锁不应该被阻塞,读锁重入次数也小于65535,且CAS抢占读锁也成功,那么就使用ThreadLocal记录线程重入读锁的次数。否则,就继续调用fullTryAcquireShared()方法通过自旋尝试获取锁。

如果调用Sync的tryAcquireShared()方法返回-1,则调用AQS的doAcquireShared()方法入队等待队列和阻塞当前线程。

在等待队列中,如果等待获取读锁的线程被唤醒,那么会继续循环把其后连续的所有等待获取读锁的线程都唤醒,直到遇到一个等待获取写锁的线程为止。

public class ReentrantReadWriteLock implements ReadWriteLock, java.io.Serializable {
    ...
    //Performs all synchronization mechanics
    final Sync sync;

    //The lock returned by method {@link ReentrantReadWriteLock#readLock}.
    public static class ReadLock implements Lock, java.io.Serializable {
        private final Sync sync;
      
        protected ReadLock(ReentrantReadWriteLock lock) {
            sync = lock.sync;
        }
      
        //Acquires the read lock.
        //Acquires the read lock if the write lock is not held by another thread and returns immediately.
        //If the write lock is held by another thread then the current thread becomes disabled 
        //for thread scheduling purposes and lies dormant until the read lock has been acquired.
        public void lock() {
            //执行AQS的acquireShared()方法,获取读锁(共享锁)
            sync.acquireShared(1);
        }
        ...
    }
    
    //Synchronization implementation for ReentrantReadWriteLock.
    //Subclassed into fair and nonfair versions.
    abstract static class Sync extends AbstractQueuedSynchronizer {        
        static final int SHARED_SHIFT   = 16;
        static final int SHARED_UNIT    = (1 << SHARED_SHIFT);
        static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1;
        
        //Returns the number of exclusive holds represented in count
        static int exclusiveCount(int c) {
            //获取写锁状态:c & ((1 << 16) - 1)
            //也就是从state变量中查找当前获得写锁的线程数量
            return c & EXCLUSIVE_MASK;
        }
      
        //Returns the number of shared holds represented in count
        static int sharedCount(int c) {
            //获取读锁状态:c >>> 16
            //也就是从state变量中查找当前获得读锁的线程数量
            return c >>> SHARED_SHIFT;
        }
   
        //A counter for per-thread read hold counts.
        //Maintained as a ThreadLocal; cached in cachedHoldCounter
        static final class HoldCounter {
            int count = 0;
            //Use id, not reference, to avoid garbage retention
            final long tid = getThreadId(Thread.currentThread());
        }
       
        //ThreadLocal subclass. Easiest to explicitly define for sake of deserialization mechanics.
        static final class ThreadLocalHoldCounter extends ThreadLocal<HoldCounter> {
            public HoldCounter initialValue() {
                return new HoldCounter();
            }
        }

        //The number of reentrant read locks held by current thread.
        //Initialized only in constructor and readObject.
        //Removed whenever a thread's read hold count drops to 0.
        private transient ThreadLocalHoldCounter readHolds;
       
        //The hold count of the last thread to successfully acquire readLock. 
        //This saves ThreadLocal lookup in the common case where the next thread to release is the last one to acquire. 
        //This is non-volatile since it is just used as a heuristic, and would be great for threads to cache.
        private transient HoldCounter cachedHoldCounter;

        //firstReader is the first thread to have acquired the read lock.
        //firstReaderHoldCount is firstReader's hold count.
        private transient Thread firstReader = null;
        private transient int firstReaderHoldCount;

        Sync() {
            readHolds = new ThreadLocalHoldCounter();
            setState(getState());// ensures visibility of readHolds
        }
      
        //获取读锁(共享锁)
        protected final int tryAcquireShared(int unused) {
            //Walkthrough:
            //1.If write lock held by another thread, fail.
            //2.Otherwise, this thread is eligible for lock wrt state, 
            //so ask if it should block because of queue policy. 
            //If not, try to grant by CASing state and updating count.
            //Note that step does not check for reentrant acquires, 
            //which is postponed to full version to avoid having to check hold count in the more typical non-reentrant case.
            //3.If step 2 fails either because thread apparently not eligible or CAS fails or count saturated, 
            //chain to version with full retry loop.
            Thread current = Thread.currentThread();
            int c = getState();
            //如果写锁的持有线程不是当前线程,则直接阻塞
            //从而说明,如果一个线程先获取了写锁,那么是可以重入获取读锁的
            if (exclusiveCount(c) != 0 && getExclusiveOwnerThread() != current) {
                return -1;
            }
            int r = sharedCount(c);//获取读锁的状态
            //首先判断当前线程获取读锁是否应该阻塞,然后判断读锁重入次数是否小于65535,最后通过CAS修改state值抢占读锁
            if (!readerShouldBlock() && r < MAX_COUNT && compareAndSetState(c, c + SHARED_UNIT)) {
                //使用ThreadLocal记录每个线程重入读锁的次数
                if (r == 0) {
                    firstReader = current;
                    firstReaderHoldCount = 1;
                } else if (firstReader == current) {
                    firstReaderHoldCount++;
                } else {
                    HoldCounter rh = cachedHoldCounter;
                    if (rh == null || rh.tid != getThreadId(current)) {
                        cachedHoldCounter = rh = readHolds.get();
                    } else if (rh.count == 0) {
                        readHolds.set(rh);
                    }
                    rh.count++;
                }
                return 1;
            }
            //如果当前线程获取读锁失败,则调用fullTryAcquireShared()方法通过自旋尝试获取锁
            return fullTryAcquireShared(current);
        }
        ...
    }    
    ...
}

public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable { 
    ...
    //获取读锁(共享锁)
    //Acquires in shared mode, ignoring interrupts.
    //Implemented by first invoking at least once #tryAcquireShared, returning on success.
    //Otherwise the thread is queued, possibly repeatedly blocking and unblocking, invoking #tryAcquireShared until success.
    public final void acquireShared(int arg) {
        //tryAcquireShared()需要子类重写,此时执行ReentrantReadWriteLock的内部类Sync的tryAcquireShared()方法
        if (tryAcquireShared(arg) < 0) {
            //调用AQS的doAcquireShared()方法入队等待队列和阻塞当前线程
            doAcquireShared(arg);
        }
    }
    
    //Acquires in shared uninterruptible mode.
    private void doAcquireShared(int arg) {
        final Node node = addWaiter(Node.SHARED);
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                final Node p = node.predecessor();
                if (p == head) {
                    //注意:如果等待获取读锁的线程被唤醒,那么会继续循环把其后连续的所有等待获取读锁的线程都唤醒
                    int r = tryAcquireShared(arg);
                    if (r >= 0) {
                        setHeadAndPropagate(node, r);
                        p.next = null; // help GC
                        if (interrupted) {
                            selfInterrupt();
                        }
                        failed = false;
                        return;
                    }
                }
                //执行shouldParkAfterFailedAcquire()方法设置node结点的前驱结点的状态为SIGNAL
                //执行parkAndCheckInterrupt()方法挂起当前线程
                if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) {
                    interrupted = true;
                }
            }
        } finally {
            if (failed) {
                cancelAcquire(node);
            }
        }
    }
    ...
}

(2)ReadLock的释放

ReadLock的unlock()方法会调用AQS的releaseShared()模版方法来释放锁,而AQS的releaseShared()方法又会调用Sync类的tryReleaseShared()方法。

在Sync类的tryReleaseShared()方法中:首先会结合ThreadLocal处理当前线程重入读锁的次数,然后再通过自旋 + CAS设置state值来实现释放读锁,最后执行AQS的doReleaseShared()方法唤醒阻塞的线程。

tryRelease()和tryReleaseShared()的区别:读锁是共享锁,由多个线程持有,所以释放读锁需要通过自旋 + CAS完成。写锁是独占锁,由单个线程持有,所以释放写锁时不需要CAS操作。

public class ReentrantReadWriteLock implements ReadWriteLock, java.io.Serializable {
    ...
    //Performs all synchronization mechanics
    final Sync sync;

    //The lock returned by method {@link ReentrantReadWriteLock#readLock}.
    public static class ReadLock implements Lock, java.io.Serializable {
        private final Sync sync;
      
        protected ReadLock(ReentrantReadWriteLock lock) {
            sync = lock.sync;
        }
      
        //Attempts to release this lock.
        //If the number of readers is now zero then the lock is made available for write lock attempts.
        public void unlock() {
            //执行AQS的releaseShared()方法,释放读锁(共享锁)
            sync.releaseShared(1);
        }
        ...
    }
    
    //Synchronization implementation for ReentrantReadWriteLock.
    //Subclassed into fair and nonfair versions.
    abstract static class Sync extends AbstractQueuedSynchronizer {
        ...
        //释放读锁(共享锁)
        protected final boolean tryReleaseShared(int unused) {
            Thread current = Thread.currentThread();
            //首先结合ThreadLocal处理当前线程重入读锁的次数
            if (firstReader == current) {
                if (firstReaderHoldCount == 1) {
                    firstReader = null;
                } else {
                    firstReaderHoldCount--;
                }
            } else {
                HoldCounter rh = cachedHoldCounter;
                if (rh == null || rh.tid != getThreadId(current)) {
                    rh = readHolds.get();
                }
                int count = rh.count;
                if (count <= 1) {
                    readHolds.remove();
                    if (count <= 0) {
                        throw unmatchedUnlockException();
                    }
                }
                --rh.count;
            }
            //然后通过自旋 + CAS设置state值来实现释放读锁
            for (;;) {
                int c = getState();
                int nextc = c - SHARED_UNIT;
                if (compareAndSetState(c, nextc)) {
                    //Releasing the read lock has no effect on readers,
                    //but it may allow waiting writers to proceed if both read and write locks are now free.
                    return nextc == 0;
                }
            }
        }
        ...
    }
    ...
}

public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable { 
    ...
    //释放读锁(共享锁)
    //Releases in shared mode.
    //Implemented by unblocking one or more threads if #tryReleaseShared returns true.
    public final boolean releaseShared(int arg) {
        //tryReleaseShared()需要子类重写,此时执行ReentrantReadWriteLock的内部类Sync的tryReleaseShared()方法
        if (tryReleaseShared(arg)) {
            //执行AQS的doReleaseShared()方法唤醒阻塞的线程
            doReleaseShared();
            return true;
        }
        return false;
    }
    
    //Release action for shared mode -- signals successor and ensures propagation. 
    //Note: For exclusive mode, release just amounts to calling unparkSuccessor of head if it needs signal.
    private void doReleaseShared() {
        //Ensure that a release propagates, even if there are other in-progress acquires/releases.  
        //This proceeds in the usual way of trying to unparkSuccessor of head if it needs signal. 
        //But if it does not, status is set to PROPAGATE to ensure that upon release, propagation continues.
        //Additionally, we must loop in case a new node is added while we are doing this. 
        //Also, unlike other uses of unparkSuccessor, we need to know if CAS to reset status fails, if so rechecking.
        for (;;) {
            Node h = head;
            if (h != null && h != tail) {
                int ws = h.waitStatus;
                if (ws == Node.SIGNAL) {
                    if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0)) {
                        //loop to recheck cases
                        continue;
                    }
                    //执行AQS的unparkSuccessor()方法
                    unparkSuccessor(h);
                } else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) {
                    //loop on failed CAS
                    continue;
                }
            }
            //loop if head changed
            if (h == head) {
                break;
            }
        }
    }
    
    //Wakes up node's successor, if one exists.
    private void unparkSuccessor(Node node) {
        //If status is negative (i.e., possibly needing signal) try to clear in anticipation of signalling.  
        //It is OK if this fails or if status is changed by waiting thread.
        //Node结点的状态watiStatus可以分为如下几种:
        //默认(0)、CANCELED(1)、SIGNAL(-1)、CONDITION(-2)、PROPAGATE(-3)
        //默认情况下,watiStatus应该是0,或者是空
        //获得头结点的状态
        int ws = node.waitStatus;
        //需要设置头结点的状态为0
        if (ws < 0) {
            compareAndSetWaitStatus(node, ws, 0);
        }

        //Thread to unpark is held in successor, which is normally just the next node.  
        //But if cancelled or apparently null, traverse backwards from tail to find the actual non-cancelled successor.
        //获取头结点的后继结点
        Node s = node.next;
        //如果头结点的后继结点为null或其状态为CANCELED
        if (s == null || s.waitStatus > 0) {
            s = null;
            //那么就从尾结点开始扫描,找到距离头结点最近的 + 状态不是取消的结点
            for (Node t = tail; t != null && t != node; t = t.prev) {
                if (t.waitStatus <= 0) {
                    s = t;
                }
            }
        }
        if (s != null) {
            //唤醒传入的头结点的后继结点对应的线程
            LockSupport.unpark(s.thread);
        }
    }
    ...
}

(3)fullTryAcquireShared()方法

在如下两种情况下,该方法会返回-1让当前线程加入等待队列进行等待。

情况一:此时有线程获得了写锁但不是当前线程获得写锁后重入读锁。

情况二:readerShouldBlock()方法返回true且不是重入读锁。

public class ReentrantReadWriteLock implements ReadWriteLock, java.io.Serializable {
    ...        
    //Synchronization implementation for ReentrantReadWriteLock.
    //Subclassed into fair and nonfair versions.
    abstract static class Sync extends AbstractQueuedSynchronizer {        
        static final int SHARED_SHIFT   = 16;
        static final int SHARED_UNIT    = (1 << SHARED_SHIFT);
        static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1;
        
        //Returns the number of exclusive holds represented in count
        static int exclusiveCount(int c) {
            //获取写锁状态:c & ((1 << 16) - 1)
            //也就是从state变量中查找当前获得写锁的线程数量
            return c & EXCLUSIVE_MASK;
        }
      
        //Returns the number of shared holds represented in count
        static int sharedCount(int c) {
            //获取读锁状态:c >>> 16
            //也就是从state变量中查找当前获得读锁的线程数量
            return c >>> SHARED_SHIFT;
        }
   
        //A counter for per-thread read hold counts.
        //Maintained as a ThreadLocal; cached in cachedHoldCounter
        static final class HoldCounter {
            int count = 0;
            //Use id, not reference, to avoid garbage retention
            final long tid = getThreadId(Thread.currentThread());
        }
       
        //ThreadLocal subclass. Easiest to explicitly define for sake of deserialization mechanics.
        static final class ThreadLocalHoldCounter extends ThreadLocal<HoldCounter> {
            public HoldCounter initialValue() {
                return new HoldCounter();
            }
        }

        //The number of reentrant read locks held by current thread.
        //Initialized only in constructor and readObject.
        //Removed whenever a thread's read hold count drops to 0.
        private transient ThreadLocalHoldCounter readHolds;
       
        //The hold count of the last thread to successfully acquire readLock. 
        //This saves ThreadLocal lookup in the common case where the next thread to release is the last one to acquire. 
        //This is non-volatile since it is just used as a heuristic, and would be great for threads to cache.
        private transient HoldCounter cachedHoldCounter;

        //firstReader is the first thread to have acquired the read lock.
        //firstReaderHoldCount is firstReader's hold count.
        private transient Thread firstReader = null;
        private transient int firstReaderHoldCount;

        Sync() {
            readHolds = new ThreadLocalHoldCounter();
            setState(getState());// ensures visibility of readHolds
        }
        ...
        
        //Full version of acquire for reads, 
        //that handles CAS misses and reentrant reads not dealt with in tryAcquireShared.
        final int fullTryAcquireShared(Thread current) {
            //This code is in part redundant with that in tryAcquireShared 
            //but is simpler overall by not complicating tryAcquireShared with interactions 
            //between retries and lazily reading hold counts.
            HoldCounter rh = null;
            for (;;) {
                int c = getState();
                if (exclusiveCount(c) != 0) {
                    //如果当前有其他线程获取到写锁,那么返回-1,当前线程会挂起阻塞并进入等待队列
                    if (getExclusiveOwnerThread() != current) {
                        return -1;
                    }
                    //else we hold the exclusive lock; blocking here would cause deadlock.
                } else if (readerShouldBlock()) {
                    //如果readerShouldBlock()返回true,表示当前线程获取读锁需要被阻塞
                
                    //结合ThreadLocal处理当前线程重入读锁的次数
                    //Make sure we're not acquiring read lock reentrantly
                    if (firstReader == current) {
                        //assert firstReaderHoldCount > 0;
                    } else {
                        if (rh == null) {
                            rh = cachedHoldCounter;
                            if (rh == null || rh.tid != getThreadId(current)) {
                                rh = readHolds.get();
                                if (rh.count == 0) {
                                    readHolds.remove();
                                }
                            }
                        }
                        //如果不是重入
                        if (rh.count == 0) {
                            return -1;
                        }
                    }
                }
                if (sharedCount(c) == MAX_COUNT) {
                    throw new Error("Maximum lock count exceeded");
                }
                if (compareAndSetState(c, c + SHARED_UNIT)) {
                    //结合ThreadLocal处理当前线程重入读锁的次数
                    if (sharedCount(c) == 0) {
                        firstReader = current;
                        firstReaderHoldCount = 1;
                    } else if (firstReader == current) {
                        firstReaderHoldCount++;
                    } else {
                        if (rh == null) {
                            rh = cachedHoldCounter;
                        }
                        if (rh == null || rh.tid != getThreadId(current)) {
                            rh = readHolds.get();
                        } else if (rh.count == 0) {
                            readHolds.set(rh);
                        }
                        rh.count++;
                        cachedHoldCounter = rh; // cache for release
                    }
                    return 1;
                }
            }
        }
        ...
    }
    ...
}

5.ReentractReadWriteLock的公平锁和非公平锁

(1)公平锁的实现代码

(2)非公平锁的实现代码

(1)公平锁的实现代码

对公平锁来说,用于判断读线程在抢占锁时是否应该阻塞的readerShouldBlock()方法,以及用于判断写线程在抢占锁时是否应该阻塞的writerShouldBlock()方法,都会通过hasQueuedPredecessors()方法判断当前队列中是否有线程排队。只要队列中有其他线程在排队,写线程和读线程都要排队尾,不能抢占锁。

public class ReentrantReadWriteLock implements ReadWriteLock, java.io.Serializable {
    ...
    //Fair version of Sync
    static final class FairSync extends Sync {
        final boolean writerShouldBlock() {
            return hasQueuedPredecessors();
        }
        final boolean readerShouldBlock() {
            return hasQueuedPredecessors();
        }
    }
    
    //Synchronization implementation for ReentrantReadWriteLock.
    //Subclassed into fair and nonfair versions.
    abstract static class Sync extends AbstractQueuedSynchronizer {
        ...
        //Returns true if the current thread, when trying to acquire the read lock, 
        //and otherwise eligible to do so, should block because of policy for overtaking other waiting threads.
        abstract boolean readerShouldBlock();

        //Returns true if the current thread, when trying to acquire the write lock, 
        //and otherwise eligible to do so, should block because of policy for overtaking other waiting threads.
        abstract boolean writerShouldBlock();
        ...
    }
    ...
}

public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable {
    ...
    //Queries whether any threads have been waiting to acquire longer than the current thread.
    //判断当前队列中是否有线程排队
    public final boolean hasQueuedPredecessors() {
        Node t = tail; // Read fields in reverse initialization order
        Node h = head;
        Node s;
        //所以!hasQueuedPredecessors() 等价于:
        //h == t || (h.next != null && h.next.thread == Thread.currentThread())
        return h != t && ((s = h.next) == null || s.thread != Thread.currentThread());
    }
    ...
}

(2)非公平锁的实现代码

对非公平锁来说,writerShouldBlock()方法会直接返回false,因为写线程可以去抢非公平锁的情况一定是:没有其他线程持有锁或者是线程自己重入写锁,所以不需要阻塞。而readerShouldBlock()方法会调用一个方法来决定是否阻塞读线程,在一定程度上避免发生写锁无限等待的问题(死锁饥饿问题)。

readerShouldBlock()方法调用的这个方法就是AQS的apparentlyFirstQueuedIsExclusive()方法。如果当前等待队列头结点的后继结点是写锁结点,那么该方法就返回true,表示当前来获取读锁的读线程需要排队。如果当前等待队列头结点的后继结点是读锁结点,那么该方法就返回false,表示当前来获取读锁的读线程可以抢占锁。

由于读线程和读线程是不互斥的,如果当前正在有线程持有读锁,而新来的读线程还非公平地抢读锁,可能导致写线程永远拿不到写锁。

public class ReentrantReadWriteLock implements ReadWriteLock, java.io.Serializable {
    ...
    static final class NonfairSync extends Sync {
        //写线程调用
        final boolean writerShouldBlock() {
            return false; // writers can always barge
        }
      
        //读线程调用
        final boolean readerShouldBlock() {
            //As a heuristic to avoid indefinite writer starvation,
            //block if the thread that momentarily appears to be head of queue, if one exists, is a waiting writer.  
            //This is only a probabilistic effect since a new reader will not block 
            //if there is a waiting writer behind other enabled readers that have not yet drained from the queue.
            return apparentlyFirstQueuedIsExclusive();
        }
    }
    ...
}

public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable {
    ...
    //如果当前等待队列头结点的后继结点是写锁结点,那么该方法就返回true,表示当前来获取读锁的读线程需要排队;
    //如果当前等待队列头结点的后继结点是读锁结点,那么该方法就返回false,表示当前来获取读锁的读线程可以抢占锁;
    final boolean apparentlyFirstQueuedIsExclusive() {
        Node h, s;
        return (h = head) != null &&
            (s = h.next)  != null &&
            !s.isShared()         &&
            s.thread != null;
    }
    ...
}

6.ReentrantReadWriteLock中的锁降级

(1)什么是ReentrantReadWriteLock中的锁降级

(2)ReentrantReadWriteLock中的锁降级例子

(1)什么是ReentrantReadWriteLock中的锁降级

这里的锁降级,指的是从写锁降级到读锁。也就是如果线程A获取了写锁,在写锁没有释放的情况下允许再去获取读锁。如果线程A获取了写锁,然后释放写锁,之后再获取读锁,这就不是锁降级。当然如果线程A获取了读锁,在读锁没有释放的情况下是不允许再获取写锁。

(2)ReentrantReadWriteLock中的锁降级例子

锁降级是为了提升性能。如果只使用写锁,那么长时间执行use(data)读数据时会阻塞其他读线程。所以通过将写锁降级为读锁,那么执行use(data)时也不会阻塞读线程。

Object data;
public void processData() {
    readLock.lock();//要先获取读锁,因为后面要读数据,比如update为true时,需要
    if (!update) {//发现要进行修改
        readLock.unlock();//必须先释放读锁,接下来才能获取写锁
        writeLock.lock();//锁降级从这里获取到写锁开始
        try {
            if (!update) {
                //准备修改数据,写数据data
                data = ...;
                update = true;        
            }  
            readLock.lock();//在获得写锁的基础上,继续获取读锁
        } finally {
            writeLock.unlock();//释放写锁,写锁降级为读锁,完成锁降级
        }
    }
    try {
        //使用数据,读数据data使用时间长
        use(data);    
    } finally {
        readLock.unlock();//释放读锁
    }
}

7.Condition的说明介绍

(1)Condition接口

(2)Condition说明

(1)Condition接口

public interface Condition {
    //Causes the current thread to wait until it is signalled or Thread#interrupt interrupted.
    void await() throws InterruptedException;
    //Causes the current thread to wait until it is signalled.
    void awaitUninterruptibly();
    //Causes the current thread to wait until it is signalled or interrupted, or the specified waiting time elapses.
    long awaitNanos(long nanosTimeout) throws InterruptedException;
    //Causes the current thread to wait until it is signalled or interrupted, or the specified waiting time elapses.
    boolean await(long time, TimeUnit unit) throws InterruptedException;
    //Causes the current thread to wait until it is signalled or interrupted, or the specified deadline elapses.
    boolean awaitUntil(Date deadline) throws InterruptedException;
    //Wakes up one waiting thread.
    void signal();
    //Wakes up all waiting threads.
    void signalAll();
}

(2)Condition说明

一.Condition与wait/notify的对比

Condition的功能和wait/notify类似,可以实现等待/通知模式。wait/notify必须要和synchronized一起使用,Condition也必须要和Lock一起使用。Condition避免了wait/notify的生产者通知生产者,消费者通知消费者的问题。

二.Condition的使用

一般都会将Condition对象作为成员变量。当调用Condition的await()方法后,当前线程会释放锁并挂起等待。当其他线程线程调用Condition的signal()方法通知当前线程(释放锁)后,当前线程才会从Condition的await()方法处返回,并且返回的时候已经获得了锁。

三.两个线程在使用Condition的交互流程

线程1 -> 获取锁 -> 释放锁 + await()阻塞等待 ->

线程2 -> 获取锁 -> signal()唤醒线程1 + 释放锁 ->

线程1 -> 被唤醒 + 尝试获取锁 -> 释放锁

四.读写锁和独占锁是否支持Condition

独占锁和读写锁的写锁都支持Condition,但是读写锁的读锁是不支持Condition的。

ReentrantLock支持Condition。

ReentrantReadWriteLock的WriteLock支持Condition。

五.由AQS的内部类ConditionObject实现

每个Condition对象都有一个Condition队列,这个队列是Condition对象实现等待/通知功能的关键。

六.Condition的应用场景

LinkedBlockingQueue、ArrayBlockQueue、CyclicBarrier都用到了Condition来实现线程等待。

public class ConditionDemo() {
    static ReentrantLock lock = new ReentrantLock();
    static Condition condition = lock.newCondition();

    public static void main(String[] args) throws Exception {
        new Thread() {
            public void run() {
                lock.lock();
                System.out.println("第一个线程加锁");
                try {
                    System.out.println("第一个线程释放锁以及阻塞等待");
                    condition.await();
                    System.out.println("第一个线程重新获取锁");
                } catch (Exception e) {
                    e.printStackTrace();
                }
                System.out.println("第一个线程释放锁");
                lock.unlock();
            };
        }.start();
        Thread.sleep(3000);
        new Thread() {
            public void run() {
                lock.lock();
                System.out.println("第二个线程加锁");
                System.out.println("第二个线程唤醒第一个线程");
                condition.signal();
                lock.unlock();
                System.out.println("第二个线程释放锁");
            };
        }.start();
    }
}

8.Condition的源码实现

(1)创建ConditionObject对象

(2)ConditionObject的Condition队列

(3)ConditionObject的等待方法await()

(4)ConditionObject的通知方法signal()

(1)创建ConditionObject对象

调用ReentrantLock的newCondition()方法可以创建一个Condition对象,ReentrantLock的newCondition()方法会调用Sync的newCondition()方法,而Sync的newCondition()方法会创建ConditionObject对象。

public class ReentrantLock implements Lock, java.io.Serializable {
    ...
    //Synchronizer providing all implementation mechanics
    private final Sync sync;

    //Returns a {@link Condition} instance for use with this Lock instance.
    public Condition newCondition() {
        //执行ReentrantLock内部类Sync的newCondition()方法
        return sync.newCondition();
    }
    
    //Base of synchronization control for this lock.
    //Subclassed into fair and nonfair versions below. 
    //Uses AQS state to represent the number of holds on the lock.
    abstract static class Sync extends AbstractQueuedSynchronizer {
        ...
        final ConditionObject newCondition() {
            return new ConditionObject();
        }
        ...
    }
    ...
}
    
public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable { 
    ...
    public class ConditionObject implements Condition, java.io.Serializable {
        //First node of condition queue.
        private transient Node firstWaiter;
      
        //Last node of condition queue.
        private transient Node lastWaiter;

        //Creates a new ConditionObject instance.
        public ConditionObject() {
              
        }
        ...
    }
    ...
}

(2)ConditionObject的Condition队列

在ConditionObject对象中,会有一个Condition队列,ConditionObject对象维护的Condition队列是一个单向链表。

ConditionObject对象的属性firstWaiter和lastWaiter代表队列的头尾结点。当线程调用await()方法时,该线程就会被封装成Node结点。然后该Node结点就会被作为新增结点加入到Condition队列的尾部。

由于Condition对象拥有Condition队列的头尾结点的引用,所以只需将原来尾结点的nextWaiter指向新增结点,并更新尾结点即可。这个更新过程无须使用CAS保证,因为调用await()方法的线程已经获取锁了。

(3)ConditionObject的等待方法await()

ConditionObject的await()方法的主要逻辑:

一.通过addConditionWaiter()方法将当前线程添加到Condition队列

二.通过fullyRelease()方法释放锁

三.通过LockSupport.park()方法挂起当前线程

四.被signal()方法唤醒后,通过acquireQueued()方法尝试获取锁

其实相当于将等待队列的头结点(获取了锁的结点)移动到Condition队列中。但等待队列的头结点不会直接加入Condition队列,而是会把当前线程封装成一个新的Node结点加入到Condition队列尾部。

注意:调用await()方法的线程其实已成功获取锁,该线程对应等待队列的头结点。await()方法会将当前线程封装成结点并加入到Condition队列,然后释放锁,并唤醒等待队列头结点的后继结点对应线程,再挂起当前线程进入等待状态。当该线程被signal()方法唤醒后,便会通过acquireQueued()方法尝试获取锁。所以调用await()方法的线程在阻塞后被唤醒,也有可能获取锁失败继续阻塞。

Condition.await()原理总结:将自己加入Condition队列、释放锁、挂起自己。

public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable { 
    ...        
    public class ConditionObject implements Condition, java.io.Serializable {
        //First node of condition queue.
        private transient Node firstWaiter;
      
        //Last node of condition queue.
        private transient Node lastWaiter;
        ...
        
        //Implements interruptible condition wait.
        //If current thread is interrupted, throw InterruptedException.
        public final void await() throws InterruptedException {
            if (Thread.interrupted()) {
                throw new InterruptedException();
            }
            //1.执行ConditionObject的addConditionWaiter()方法,将当前线程封装成Node结点并加入Condition队列
            Node node = addConditionWaiter();
            //2.调用AQS的fullyRelease()方法释放锁
            int savedState = fullyRelease(node);
            int interruptMode = 0;
            while (!isOnSyncQueue(node)) {
                //3.阻塞当前线程
                LockSupport.park(this);
                if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) {
                    break;
                }
            }
            //4.当前线程被signal()方法唤醒后,执行AQS的acquireQueued()方法尝试获取锁
            if (acquireQueued(node, savedState) && interruptMode != THROW_IE) {
                interruptMode = REINTERRUPT;
            }
            if (node.nextWaiter != null) {// clean up if cancelled
                unlinkCancelledWaiters();
            }
            if (interruptMode != 0) {
                reportInterruptAfterWait(interruptMode);
            }
        }

        //将当前线程封装成Node结点并加入Condition队列
        private Node addConditionWaiter() {
            Node t = lastWaiter;
            //If lastWaiter is cancelled, clean out.
            if (t != null && t.waitStatus != Node.CONDITION) {
                unlinkCancelledWaiters();
                t = lastWaiter;
            }
            Node node = new Node(Thread.currentThread(), Node.CONDITION);
            if (t == null) {
                firstWaiter = node;
            } else {
                t.nextWaiter = node;
            }
            lastWaiter = node;
            return node;
        }
        ...
    }
    
    //释放锁
    //Invokes release with current state value; returns saved state.
    //Cancels node and throws exception on failure.
    final int fullyRelease(Node node) {
        boolean failed = true;
        try {
            int savedState = getState();
            if (release(savedState)) {
                failed = false;
                return savedState;
            } else {
                throw new IllegalMonitorStateException();
            }
        } finally {
            if (failed) {
                node.waitStatus = Node.CANCELLED;
            }
        }
    }
    
    //Releases in exclusive mode.
    //Implemented by unblocking one or more threads if #tryRelease returns true.
    //This method can be used to implement method Lock#unlock.
    public final boolean release(int arg) {
        if (tryRelease(arg)) {
            Node h = head;
            if (h != null && h.waitStatus != 0) {
                //唤醒等待队列头结点的后继结点
                unparkSuccessor(h);
            }
            return true;
        }
        return false;
    }
    
    //Acquires in exclusive uninterruptible mode for thread already in queue. 
    //Used by condition wait methods as well as acquire.
    final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                final Node p = node.predecessor();
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return interrupted;
                }
                //执行shouldParkAfterFailedAcquire()方法设置node结点的前驱结点的状态为SIGNAL
                //执行parkAndCheckInterrupt()方法挂起当前线程
                if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) {
                    interrupted = true;
                }
            }
        } finally {
            if (failed) {
                cancelAcquire(node);
            }
        }
    }
    ...
}

(4)ConditionObject的通知方法signal()

ConditionObject的signal()方法的主要逻辑:

一.首先从Condition队列中取出等待时间最长的结点,也就是first结点

二.然后将等待时间最长的结点(first结点)转移到AQS的等待队列(CLH队列)中

三.最后唤醒该first结点对应的线程

由于该first结点对应的线程在await()方法中加入Condition队列后被阻塞,所以该first结点对应的线程在被唤醒后,会回到await()方法中继续执行,也就是会执行AQS的acquireQueued()方法去尝试获取锁。

调用signal()方法的前提条件是当前线程必须获得了锁,所以signal()方法首先会检查当前线程是否获取了锁,接着才去获取Condition队列的first结点,然后才将first结点移动到等待队列,并唤醒该first结点对应的线程。

通过调用AQS的enq()方法,Condition队列的first结点会添加到等待队列。当first结点被移动到等待队列后,再唤醒first结点对应的线程尝试获取锁。

被唤醒的first结点对应的线程,将从await()方法中的while循环退出。因为已经在等待队列中,所以isOnSyncQueue()方法会返回true,从而会调用AQS的acquireQueued()方法来竞争获取锁。

Condition.signal()原理总结:把Condition队列中的头结点,转化为等待队列中的尾结点。

public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable { 
    ...        
    public class ConditionObject implements Condition, java.io.Serializable {
        //First node of condition queue.
        private transient Node firstWaiter;
      
        //Last node of condition queue.
        private transient Node lastWaiter;
        ...
        
        //Moves the longest-waiting thread, if one exists, 
        //from the wait queue for this condition to the wait queue for the owning lock.
        public final void signal() {
            //通过isHeldExclusively()方法检查当前线程是否获取了锁
            if (!isHeldExclusively()) {
                throw new IllegalMonitorStateException();
            }
            Node first = firstWaiter;
            if (first != null) {
                doSignal(first);
            }
        }
       
        //Removes and transfers nodes until hit non-cancelled one or null. 
        //Split out from signal in part to encourage compilers to inline the case of no waiters.
        private void doSignal(Node first) {
            do {
                //firstWaiter后移
                if ((firstWaiter = first.nextWaiter) == null) {
                    lastWaiter = null;
                }
                //firstWaiter出队
                first.nextWaiter = null;
            } while (!transferForSignal(first) && (first = firstWaiter) != null);
        }
       
        //Transfers a node from a condition queue onto sync queue.
        //Returns true if successful.
        final boolean transferForSignal(Node node) {
            //在addConditionWaiter()方法时,node就被封装为CONDITION类型
            //如果CAS失败,说明当前结点已被修改为CANCELED,此时需要继续查找Condition队列的下一个结点
            if (!compareAndSetWaitStatus(node, Node.CONDITION, 0)) {
                return false;
            }
            //将node结点转移到等待队列中,返回等待队列的尾结点
            Node p = enq(node);
            int ws = p.waitStatus;
            if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL)) {
                //唤醒node结点
                LockSupport.unpark(node.thread);
            }
            return true;
        }
     
        //Implements interruptible condition wait.
        //If current thread is interrupted, throw InterruptedException.
        public final void await() throws InterruptedException {
            if (Thread.interrupted()) {
                throw new InterruptedException();
            }
            //1.执行ConditionObject的addConditionWaiter()方法,将当前线程封装成Node结点并加入Condition队列
            Node node = addConditionWaiter();
            //2.调用AQS的fullyRelease()方法释放锁
            int savedState = fullyRelease(node);
            int interruptMode = 0;
            //一开始node结点并不在等待队列中,所以isOnSyncQueue()方法返回false,进行阻塞
            //后来其他线程调用signal()方法,该node结点就会被唤醒,然后发现该node结点已经在等待队列中,于是isOnSyncQueue()方法返回true 
            while (!isOnSyncQueue(node)) {
                //3.阻塞当前线程
                LockSupport.park(this);
                if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) {
                    break;
                }
            }
            //4.当前线程被signal()方法唤醒后,执行AQS的acquireQueued()方法尝试获取锁
            if (acquireQueued(node, savedState) && interruptMode != THROW_IE) {
                interruptMode = REINTERRUPT;
            }
            if (node.nextWaiter != null) {// clean up if cancelled
                unlinkCancelledWaiters();
            }
            if (interruptMode != 0) {
                reportInterruptAfterWait(interruptMode);
            }
        }
        ...
    }
    
    //Inserts node into queue, initializing if necessary. See picture above.
    private Node enq(final Node node) {
        for (;;) {
            Node t = tail;
            if (t == null) { // Must initialize
                if (compareAndSetHead(new Node())) {
                    tail = head;
                }
            } else {
                node.prev = t;
                if (compareAndSetTail(t, node)) {
                    t.next = node;
                    return t;
                }
            }
        }
    }
    ...
}

后端技术栈的基础修养 文章被收录于专栏

详细介绍后端技术栈的基础内容,包括但不限于:MySQL原理和优化、Redis原理和应用、JVM和G1原理和优化、RocketMQ原理应用及源码、Kafka原理应用及源码、ElasticSearch原理应用及源码、JUC源码、Netty源码、zk源码、Dubbo源码、Spring源码、Spring Boot源码、SCA源码、分布式锁源码、分布式事务、分库分表和TiDB、大型商品系统、大型订单系统等

全部评论

相关推荐

点赞 评论 收藏
分享
点赞 评论 收藏
分享
评论
点赞
收藏
分享

创作者周榜

更多
牛客网
牛客企业服务