JUC并发—7.AQS源码分析三
大纲
1.等待多线程完成的CountDownLatch介绍
2.CountDownLatch.await()方法源码
3.CountDownLatch.coutDown()方法源码
4.CountDownLatch总结
5.控制并发线程数的Semaphore介绍
6.Semaphore的令牌获取过程
7.Semaphore的令牌释放过程
8.同步屏障CyclicBarrier介绍
9.CyclicBarrier的await()方法源码
10.使用CountDownLatch等待注册的完成
11.使用CyclicBarrier将工作任务多线程分而治之
12.使用CyclicBarrier聚合服务接口的返回结果
13.使用Semaphore等待指定数量线程完成任务
volatile、synchronized、CAS、AQS、读写锁、锁优化和锁故障、并发集合、线程池、同步组件
1.等待多线程完成的CountDownLatch
(1)CountDownLatch的简介
(2)CountDownLatch的应用
(3)CountDownLatch的例子
(1)CountDownLatch的简介
CountDownLatch允许一个或多个线程等待其他线程完成操作。CountDownLatch提供了两个核心方法,分别是await()方法和countDown()方法。CountDownLatch.await()方法让调用线程进行阻塞进入等待状态,CountDownLatch.countDown()方法用于对计数器进行递减。
CountDownLatch在构造时需要传入一个正整数作为计数器初始值。线程每调用一次countDown()方法,都会对该计数器减一。当计数器为0时,会唤醒所有执行await()方法时被阻塞的线程。
(2)CountDownLatch的应用
应用一:
使用多线程去解析一个Excel里多个sheet的数据,每个线程解析一个sheet里的数据,等所有sheet解析完再提示处理完成。此时便可以使用CountDownLatch来实现,当然可以使用Thread.join()方法。
注意:Thread.join()方法是基于wait()和notify()来实现的。在main线程里开启一个线程A,main线程如果执行了线程A的join()方法,那么就会导致main线程被阻塞,main线程会等待线程A执行完毕才会继续往下执行。
应用二:
微服务注册中心的register-client,为了在注册线程执行成功后,才发送心跳。可以使用CountDownLatch,当然也可以使用Thread.join()方法。
应用三:
可以通过CountDownLatch实现类似并发的效果。把CountDownLatch的计数器设置为1,然后让1000个线程调用await()方法。当1000个线程初始化完成后,在main线程调用countDown()让计数器归零。这样这1000个线程就会在一个for()循环中,依次被唤醒。
(3)CountDownLatch的例子
public class CountDownLatchDemo { public static void main(String[] args) throws Exception { final CountDownLatch latch = new CountDownLatch(2); new Thread() { public void run() { try { Thread.sleep(1000); System.out.println("线程1开始执行,休眠2秒..."); Thread.sleep(1000); System.out.println("线程1准备执行countDown操作..."); latch.countDown(); System.out.println("线程1完成执行countDown操作..."); } catch (Exception e) { e.printStackTrace(); } } }.start(); new Thread() { public void run() { try { Thread.sleep(1000); System.out.println("线程2开始执行,休眠2秒..."); Thread.sleep(1000); System.out.println("线程2准备执行countDown操作..."); latch.countDown(); System.out.println("线程2完成执行countDown操作..."); } catch (Exception e) { e.printStackTrace(); } } }.start(); System.out.println("main线程准备执行countDownLatch的await操作,将会同步阻塞等待..."); latch.await(); System.out.println("所有线程都完成countDown操作,结束同步阻塞等待..."); } }
2.CountDownLatch.await()方法源码
(1)CountDownLatch.await()方法的阻塞流程
(2)CountDownLatch.await()方法的唤醒流程
(3)CountDownLatch.await()方法的阻塞总结
(1)CountDownLatch.await()方法的阻塞流程
CountDownLatch是基于AQS中的共享锁来实现的。从CountDownLatch的构造方法可知,CountDownLatch的count就是AQS的state。
调用CountDownLatch的await()方法时,会先调用AQS的acquireSharedInterruptibly()模版方法,然后会调用CountDownLatch的内部类Sync实现的tryAcquireShared()方法。tryAcquireShared()方法会判断state的值是否为0,如果为0,才返回1,否则返回-1。
当调用CountDownLatch内部类Sync的tryAcquireShared()方法获得的返回值是-1时,才会调用AQS的doAcquireSharedInterruptibly()方法,将当前线程封装成Node结点加入等待队列,然后挂起当前线程进行阻塞。
//A synchronization aid that allows one or more threads to wait until a set of operations being performed in other threads completes. public class CountDownLatch { private final Sync sync; public CountDownLatch(int count) { if (count < 0) { throw new IllegalArgumentException("count < 0"); } this.sync = new Sync(count); } //Synchronization control For CountDownLatch. //Uses AQS state to represent count. private static final class Sync extends AbstractQueuedSynchronizer { Sync(int count) { setState(count); } int getCount() { return getState(); } protected int tryAcquireShared(int acquires) { return (getState() == 0) ? 1 : -1; } protected boolean tryReleaseShared(int releases) { //Decrement count; signal when transition to zero for (;;) { int c = getState(); if (c == 0) { return false; } int nextc = c-1; if (compareAndSetState(c, nextc)) { return nextc == 0; } } } } //Causes the current thread to wait until the latch has counted down to zero, //unless the thread is Thread#interrupt interrupted. public void await() throws InterruptedException { //执行AQS的acquireSharedInterruptibly()方法 sync.acquireSharedInterruptibly(1); } ... } public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable { ... //Acquires in shared mode, aborting if interrupted. //Implemented by first checking interrupt status, then invoking at least once #tryAcquireShared, returning on success. //Otherwise the thread is queued, possibly repeatedly blocking and unblocking, //invoking #tryAcquireShared until success or the thread is interrupted. public final void acquireSharedInterruptibly(int arg) throws InterruptedException { if (Thread.interrupted()) { throw new InterruptedException(); } //执行CountDownLatch的内部类Sync实现的tryAcquireShared()方法,抢占共享锁 if (tryAcquireShared(arg) < 0) { //执行AQS的doAcquireSharedInterruptibly()方法 doAcquireSharedInterruptibly(arg); } } //Acquires in shared interruptible mode. private void doAcquireSharedInterruptibly(int arg) throws InterruptedException { final Node node = addWaiter(Node.SHARED);//封装当前线程为Shared类型的Node结点 boolean failed = true; try { //第一次循环r = -1,所以会执行AQS的shouldParkAfterFailedAcquire()方法 //将node结点的有效前驱结点的状态设置为SIGNAL for (;;) { final Node p = node.predecessor();//node结点的前驱结点 if (p == head) { int r = tryAcquireShared(arg); if (r >= 0) { setHeadAndPropagate(node, r); p.next = null; // help GC failed = false; return; } } //执行shouldParkAfterFailedAcquire()方法设置node结点的前驱结点的状态为SIGNAL //执行parkAndCheckInterrupt()方法挂起当前线程 if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) { throw new InterruptedException(); } } } finally { if (failed) { cancelAcquire(node); } } } //Checks and updates status for a node that failed to acquire. //Returns true if thread should block. This is the main signal control in all acquire loops. private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { int ws = pred.waitStatus; if (ws == Node.SIGNAL) { //This node has already set status asking a release to signal it, so it can safely park. return true; } if (ws > 0) { //Predecessor was cancelled. Skip over predecessors and indicate retry. do { node.prev = pred = pred.prev; } while (pred.waitStatus > 0); pred.next = node; } else { //waitStatus must be 0 or PROPAGATE. //Indicate that we need a signal, but don't park yet. //Caller will need to retry to make sure it cannot acquire before parking. compareAndSetWaitStatus(pred, ws, Node.SIGNAL); } return false; } //设置头结点和唤醒后续线程 //Sets head of queue, and checks if successor may be waiting in shared mode, //if so propagating if either propagate > 0 or PROPAGATE status was set. private void setHeadAndPropagate(Node node, int propagate) { Node h = head; setHead(node);//将node结点设置为头结点 if (propagate > 0 || h == null || h.waitStatus < 0 || (h = head) == null || h.waitStatus < 0) { Node s = node.next; if (s == null || s.isShared()) { doReleaseShared(); } } } private void setHead(Node node) { head = node; node.thread = null; node.prev = null; } ... }
(2)CountDownLatch.await()方法的唤醒流程
调用await()方法时,首先会将当前线程封装成Node结点并添加到等待队列中,然后在执行第一次for循环时会设置该Node结点的前驱结点状态为SIGNAL,接着在执行第二次for循环时才会将当前线程进行挂起阻塞。
当该线程后续被唤醒时,该线程又会进入下一次for循环。如果该线程对应的node结点的前驱结点是等待队列的头结点且state值已为0,那么就执行AQS的setHeadAndPropagate()方法设置头结点 + 唤醒后续线程。
其中setHeadAndPropagate()方法有两个工作(设置头结点 + 唤醒传递):
工作一:设置当前被唤醒线程对应的结点为头结点
工作二:当满足如下这两个条件的时候需要调用doReleaseShared()方法唤醒后续的线程
条件一:propagate > 0,表示当前是共享锁,需要进行唤醒传递
条件二:s.isShared()判断当前结点为共享模式
CountDownLatch的实现中会在以下两个场景调用doReleaseShared()方法:
场景一:state为1时调用的countDown()方法会调用doReleaseShared()方法
场景二:当阻塞的线程被唤醒时,会调用setHeadAndPropagate()方法,进而调用doReleaseShared()方法,这样可以提升唤醒共享结点的速度
(3)CountDownLatch.await()方法的阻塞总结
只要state != 0,就会进行如下处理:
一.将当前线程封装成一个Node结点,然后添加到AQS的等待队列中
二.调用LockSupport.park()方法,挂起当前线程
3.CountDownLatch.coutDown()方法源码
(1)CountDownLatch.coutDown()的唤醒流程
(2)CountDownLatch.tryReleaseShared()
(3)AQS的doReleaseShared()方法
(1)CountDownLatch.coutDown()的唤醒流程
调用CountDownLatch的countDown()方法时,会先调用AQS的releaseShared()模版方法,然后会执行CountDownLatch的内部类Sync实现的tryReleaseShared()方法。
如果tryReleaseShared()方法返回true,则执行AQS的doReleaseShared()方法,通过AQS的doReleaseShared()方法唤醒共享锁模式下的等待队列中的线程。
//A synchronization aid that allows one or more threads to wait until a set of operations being performed in other threads completes. public class CountDownLatch { private final Sync sync; public CountDownLatch(int count) { if (count < 0) { throw new IllegalArgumentException("count < 0"); } this.sync = new Sync(count); } //Synchronization control For CountDownLatch. //Uses AQS state to represent count. private static final class Sync extends AbstractQueuedSynchronizer { Sync(int count) { setState(count); } int getCount() { return getState(); } protected int tryAcquireShared(int acquires) { return (getState() == 0) ? 1 : -1; } protected boolean tryReleaseShared(int releases) { //Decrement count; signal when transition to zero for (;;) { int c = getState(); if (c == 0) { return false; } int nextc = c-1; if (compareAndSetState(c, nextc)) { return nextc == 0; } } } } //Decrements the count of the latch, releasing all waiting threads if the count reaches zero. public void countDown() { //执行AQS的releaseShared()方法 sync.releaseShared(1); } ... } public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable { ... //Releases in shared mode. //Implemented by unblocking one or more threads if #tryReleaseShared returns true. public final boolean releaseShared(int arg) { //执行CountDownLatch的内部类Sync实现的tryReleaseShared()方法,释放共享锁 if (tryReleaseShared(arg)) { //执行AQS的doReleaseShared()方法 doReleaseShared(); return true; } return false; } //Release action for shared mode -- signals successor and ensures propagation. //Note: For exclusive mode, release just amounts to calling unparkSuccessor of head if it needs signal. private void doReleaseShared() { for (;;) { //每次循环时头结点都会发生变化 //因为调用unparkSuccessor()方法会唤醒doAcquireSharedInterruptibly()方法中阻塞的线程 //然后阻塞的线程会在执行setHeadAndPropagate()方法时通过setHead()修改头结点 Node h = head;//获取最新的头结点 if (h != null && h != tail) {//等待队列中存在挂起线程的结点 int ws = h.waitStatus; if (ws == Node.SIGNAL) {//头结点的状态正常,表示对应的线程可以被唤醒 if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0)) { continue;//loop to recheck cases } //唤醒头结点的后继结点 //唤醒的线程会在doAcquireSharedInterruptibly()方法中执行setHeadAndPropagate()方法修改头结点 unparkSuccessor(h); } else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) { //如果ws = 0表示初始状态,则修改结点为PROPAGATE状态 continue;//loop on failed CAS } } if (h == head) {//判断头结点是否有变化 break;//loop if head changed } } } //Wakes up node's successor, if one exists. private void unparkSuccessor(Node node) { int ws = node.waitStatus; if (ws < 0) { compareAndSetWaitStatus(node, ws, 0); } Node s = node.next; if (s == null || s.waitStatus > 0) { s = null; for (Node t = tail; t != null && t != node; t = t.prev) { if (t.waitStatus <= 0) { s = t; } } } if (s != null) { LockSupport.unpark(s.thread); } } ... }
(2)CountDownLatch.tryReleaseShared()
从tryReleaseShared()方法可知:每次countDown()其实就是把AQS的state值减1,然后通过CAS更新state值。如果CAS设置成功,那么就判断当前state值是否为0。如果是0那么就返回true,如果不是0那么就返回false。返回true的时候会调用AQS的doReleaseShared()方法,唤醒等待队列中的线程。
(3)AQS的doReleaseShared()方法
该方法要从AQS的等待队列中唤醒头结点的后继结点,需要满足:
条件一:等待队列中要存在挂起线程的结点(h != null && h != tail)
条件二:等待队列的头结点的状态正常(h.waitStatus = Node.SIGNAL)
在共享锁模式下,state为0时需要通过唤醒传递把所有挂起的线程都唤醒。首先doReleaseShared()方法会通过for(;;)进行自旋操作,每次循环都会通过Node h = head来获取等待队列中最新的头结点,然后通过if (h == head)来判断等待队列中的头结点是否发生变化。如果没有变化,则退出自旋。
注意:在共享锁模式下,被unparkSuccessor()唤醒的等待队列中的线程,会继续在在doAcquireSharedInterruptibly()方法中,执行setHeadAndPropagate()方法修改头结点,从而实现唤醒传递。
4.CountDownLatch总结
假设有两个线程A和B,分别调用了CountDownLatch的await()方法,此时state所表示的计数器不为0。所以线程A和B会被封装成SHARED类型的结点,并添加到AQS的等待队列中。
当线程C调用CountDownLatch的coutDown()方法后,如果state被递减到0,那么就会调用doReleaseShared()方法唤醒等待队列中的线程。然后被唤醒的线程会继续调用setHeadAndPropagate()方法实现唤醒传递,从而继续在doReleaseShared()方法中唤醒所有在等待队列中的被阻塞的线程。
5.控制并发线程数的Semaphore介绍
(1)Semaphore的作用
(2)Semaphore的方法
(3)Semaphore原理分析
(1)Semaphore的作用
Semaphore信号量用来控制同时访问特定资源的线程数量,有两核心方法。
方法一:acquire()方法,获取一个令牌
方法二:release()方法,释放一个令牌
多个线程访问某限制访问流量的资源时,可先调用acquire()获取访问令牌。如果能够正常获得,则表示允许访问。如果令牌不够,则会阻塞当前线程。当某个获得令牌的线程通过release()方法释放一个令牌后,被阻塞在acquire()方法的线程就有机会获得这个释放的令牌。
public class SemaphoreDemo { public static void main(String[] args) throws InterruptedException { Semaphore semaphore = new Semaphore(10, true);//初始化10个资源,使用公平锁 semaphore.acquire();//每次获取一个资源,如果获取不到,线程就会阻塞 semaphore.release();//释放一个资源 } }
(2)Semaphore的方法
Semaphore实际上并没有一个真实的令牌发给线程,Semaphore只是对一个可分配数量进行计数维护,或者说进行许可证管理。Semaphore可以在公共资源有限的场景下实现流量控制,如数据库连接。
一.Semaphore(permits, fair):permits表示令牌数,fair表示公平性 二.acquire(permits):获取指定数量的令牌,如果数量不足则阻塞当前线程 三.tryAcquire(permits):尝试获取指定数量的令牌,此过程是非阻塞的,成功返回true,失败返回false 四.release(permits):释放指定数量的令牌 五.drainPermits():当前线程获得剩下的所有令牌 六.hasQueuedThread():判断当前Semaphore实例上是否存在等待令牌的线程
(3)Semaphore原理分析
Semaphore也是基于AQS中的共享锁来实现的。在创建Semaphore实例时传递的参数permits,其实就是AQS中的state属性。每次调用Semaphore的acquire()方法,都会对state值进行递减。
所以从根本上说,Semaphore是通过重写AQS的两个方法来实现的:
方法一:tryAcquireShared(),抢占共享锁
方法二:tryReleaseShared(),释放共享锁
public class Semaphore implements java.io.Serializable { private final Sync sync; //Creates a Semaphore with the given number of permits and nonfair fairness setting. public Semaphore(int permits) { sync = new NonfairSync(permits); } static final class NonfairSync extends Sync { NonfairSync(int permits) { super(permits); } protected int tryAcquireShared(int acquires) { return nonfairTryAcquireShared(acquires); } } //Acquires a permit from this semaphore, blocking until one is available, //or the thread is Thread#interrupt interrupted. public void acquire() throws InterruptedException { //执行AQS的模版方法acquireSharedInterruptibly() sync.acquireSharedInterruptibly(1); } //Releases a permit, returning it to the semaphore. public void release() { //执行AQS的模版方法releaseShared() sync.releaseShared(1); } //Synchronization implementation for semaphore. //Uses AQS state to represent permits. Subclassed into fair and nonfair versions. abstract static class Sync extends AbstractQueuedSynchronizer { Sync(int permits) { //设置state的值为传入的令牌数 setState(permits); } final int getPermits() { return getState(); } final int nonfairTryAcquireShared(int acquires) { for (;;) { int available = getState(); int remaining = available - acquires; if (remaining < 0 || compareAndSetState(available, remaining)) { return remaining; } } } protected final boolean tryReleaseShared(int releases) { for (;;) { int current = getState(); int next = current + releases; if (next < current) { throw new Error("Maximum permit count exceeded"); } if (compareAndSetState(current, next)) { return true; } } } ... } ... } public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable { ... //Acquires in shared mode, aborting if interrupted. //Implemented by first checking interrupt status, then invoking at least once #tryAcquireShared, returning on success. //Otherwise the thread is queued, possibly repeatedly blocking and unblocking, //invoking #tryAcquireShared until success or the thread is interrupted. public final void acquireSharedInterruptibly(int arg) throws InterruptedException { if (Thread.interrupted()) { throw new InterruptedException(); } //执行Semaphore的内部类Sync的子类实现的tryAcquireShared()方法,抢占共享锁 if (tryAcquireShared(arg) < 0) { //执行AQS的doAcquireSharedInterruptibly()方法 doAcquireSharedInterruptibly(arg); } } //Releases in shared mode. //Implemented by unblocking one or more threads if #tryReleaseShared returns true. public final boolean releaseShared(int arg) { //执行Semaphore的内部类Sync实现的tryReleaseShared()方法,释放共享锁 if (tryReleaseShared(arg)) { //执行AQS的doReleaseShared()方法 doReleaseShared(); return true; } return false; } ... }
6.Semaphore的令牌获取过程
(1)Semaphore的令牌获取过程
(2)Semaphore的公平策略
(3)Semaphore的非公平策略
(4)tryAcquireShared()后的处理
(1)Semaphore的令牌获取过程
在调用Semaphore的acquire()方法获取令牌时:首先会执行AQS的模版方法acquireSharedInterruptibly(),然后执行Sync子类实现的tryAcquireShared()方法来抢占锁。如果抢占锁失败,则执行AQS的doAcquireSharedInterruptibly()方法。该方法会将当前线程封装成Node结点并加入等待队列,然后挂起线程。
(2)Semaphore的公平策略
在执行Sync子类FairSync的tryAcquireShared()方法尝试获取令牌时,先通过AQS的hasQueuedPredecessors()判断是否已有线程在等待队列中。如果已经有线程在等待队列中,那么当前线程获取令牌就必然失败。否则,就递减state的值 + 判断state是否小于0 + CAS设置state的值。
(3)Semaphore的非公平策略
在执行Sync子类NonfairSync的tryAcquireShared()方法尝试获取令牌时,则会直接执行Sync的nonfairTryAcquireShared()方法来获取令牌,也就是递减state的值 + 判断state是否小于0 + CAS设置state的值。
(4)tryAcquireShared()后的处理
不管公平策略还是非公平策略,对应的tryAcquireShared()方法都是通过自旋来抢占令牌(CAS设置state),直到令牌数不够时才会让tryAcquireShared()方法返回小于0的数值。然后触发执行AQS的doAcquireSharedInterruptibly()方法,该方法会将当前线程封装成Node结点并加入等待队列,然后挂起线程。
public class Semaphore implements java.io.Serializable { private final Sync sync; //Creates a Semaphore with the given number of permits and nonfair fairness setting. public Semaphore(int permits) { sync = new NonfairSync(permits); } static final class NonfairSync extends Sync { NonfairSync(int permits) { super(permits); } //以非公平锁的方式获取令牌 protected int tryAcquireShared(int acquires) { //执行Sync的nonfairTryAcquireShared()方法 return nonfairTryAcquireShared(acquires); } } static final class FairSync extends Sync { FairSync(int permits) { super(permits); } //以公平锁的方式获取令牌 protected int tryAcquireShared(int acquires) { for (;;) { //如果已经有线程在等待队列中,那么就说明获取令牌必然失败 if (hasQueuedPredecessors()) { return -1; } int available = getState(); int remaining = available - acquires; if (remaining < 0 || compareAndSetState(available, remaining)) { return remaining; } } } } //Acquires a permit from this semaphore, blocking until one is available, //or the thread is Thread#interrupt interrupted. public void acquire() throws InterruptedException { //执行AQS的模版方法acquireSharedInterruptibly() sync.acquireSharedInterruptibly(1); } //Synchronization implementation for semaphore. //Uses AQS state to represent permits. Subclassed into fair and nonfair versions. abstract static class Sync extends AbstractQueuedSynchronizer { Sync(int permits) { //设置state的值为传入的令牌数 setState(permits); } final int getPermits() { return getState(); } final int nonfairTryAcquireShared(int acquires) { for (;;) { int available = getState(); int remaining = available - acquires; if (remaining < 0 || compareAndSetState(available, remaining)) { return remaining; } } } ... } ... } public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable { ... //Acquires in shared mode, aborting if interrupted. //Implemented by first checking interrupt status, then invoking at least once #tryAcquireShared, returning on success. //Otherwise the thread is queued, possibly repeatedly blocking and unblocking, //invoking #tryAcquireShared until success or the thread is interrupted. public final void acquireSharedInterruptibly(int arg) throws InterruptedException { if (Thread.interrupted()) { throw new InterruptedException(); } //执行Semaphore的内部类Sync的子类实现的tryAcquireShared()方法,抢占共享锁 if (tryAcquireShared(arg) < 0) { //执行AQS的doAcquireSharedInterruptibly()方法 doAcquireSharedInterruptibly(arg); } } //Queries whether any threads have been waiting to acquire longer than the current thread. public final boolean hasQueuedPredecessors() { Node t = tail; // Read fields in reverse initialization order Node h = head; Node s; return h != t && ((s = h.next) == null || s.thread != Thread.currentThread()); } //Acquires in shared interruptible mode. private void doAcquireSharedInterruptibly(int arg) throws InterruptedException { final Node node = addWaiter(Node.SHARED);//封装当前线程为Shared类型的Node结点 boolean failed = true; try { //第一次循环r = -1,所以会执行AQS的shouldParkAfterFailedAcquire()方法 //将node结点的有效前驱结点的状态设置为SIGNAL for (;;) { final Node p = node.predecessor();//node结点的前驱结点 if (p == head) { int r = tryAcquireShared(arg); if (r >= 0) { setHeadAndPropagate(node, r); p.next = null; // help GC failed = false; return; } } //执行shouldParkAfterFailedAcquire()方法设置node结点的前驱结点的状态为SIGNAL //执行parkAndCheckInterrupt()方法挂起当前线程 if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) { throw new InterruptedException(); } } } finally { if (failed) { cancelAcquire(node); } } } ... }
7.Semaphore的令牌释放过程
(1)Semaphore的令牌释放过程
(2)Semaphore的令牌释放本质
(1)Semaphore的令牌释放过程
在调用Semaphore的release()方法去释放令牌时:首先会执行AQS的模版方法releaseShared(),然后执行Sync实现的tryReleaseShared()方法来释放锁(累加state值)。如果释放锁成功,则执行AQS的doReleaseShared()方法去唤醒线程。
(2)Semaphore的令牌释放本质
Semaphore的release()方法释放令牌的本质就是对state字段进行累加,然后唤醒等待队列头结点的后继结点 + 唤醒传递来唤醒等待的线程。
注意:并非一定要执行acquire()方法的线程才能调用release()方法,任意一个线程都可以调用release()方法,也可以通过reducePermits()方法来减少令牌数。
public class Semaphore implements java.io.Serializable { private final Sync sync; //Creates a Semaphore with the given number of permits and nonfair fairness setting. public Semaphore(int permits) { sync = new NonfairSync(permits); } //Releases a permit, returning it to the semaphore. public void release() { //执行AQS的模版方法releaseShared() sync.releaseShared(1); } //Synchronization implementation for semaphore. //Uses AQS state to represent permits. Subclassed into fair and nonfair versions. abstract static class Sync extends AbstractQueuedSynchronizer { Sync(int permits) { //设置state的值为传入的令牌数 setState(permits); } //尝试释放锁,也就是对state值进行累加 protected final boolean tryReleaseShared(int releases) { for (;;) { int current = getState(); int next = current + releases; if (next < current) { throw new Error("Maximum permit count exceeded"); } if (compareAndSetState(current, next)) { return true; } } } ... } ... } public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable { ... //Releases in shared mode. //Implemented by unblocking one or more threads if #tryReleaseShared returns true. public final boolean releaseShared(int arg) { //执行Semaphore的内部类Sync实现的tryReleaseShared()方法,释放共享锁 if (tryReleaseShared(arg)) { //执行AQS的doReleaseShared()方法,唤醒等待队列中的线程 doReleaseShared(); return true; } return false; } //Release action for shared mode -- signals successor and ensures propagation. //Note: For exclusive mode, release just amounts to calling unparkSuccessor of head if it needs signal. private void doReleaseShared() { for (;;) { //每次循环时头结点都会发生变化 //因为调用unparkSuccessor()方法会唤醒doAcquireSharedInterruptibly()方法中阻塞的线程 //然后阻塞的线程会在执行setHeadAndPropagate()方法时通过setHead()修改头结点 Node h = head;//获取最新的头结点 if (h != null && h != tail) {//等待队列中存在挂起线程的结点 int ws = h.waitStatus; if (ws == Node.SIGNAL) {//头结点的状态正常,表示对应的线程可以被唤醒 if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0)) { continue;//loop to recheck cases } //唤醒头结点的后继结点 //唤醒的线程会在doAcquireSharedInterruptibly()方法中执行setHeadAndPropagate()方法修改头结点 unparkSuccessor(h); } else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) { //如果ws = 0表示初始状态,则修改结点为PROPAGATE状态 continue;//loop on failed CAS } } if (h == head) {//判断头结点是否有变化 break;//loop if head changed } } } //Wakes up node's successor, if one exists. private void unparkSuccessor(Node node) { int ws = node.waitStatus; if (ws < 0) { compareAndSetWaitStatus(node, ws, 0); } Node s = node.next; if (s == null || s.waitStatus > 0) { s = null; for (Node t = tail; t != null && t != node; t = t.prev) { if (t.waitStatus <= 0) { s = t; } } } if (s != null) { LockSupport.unpark(s.thread); } } ... }
8.同步屏障CyclicBarrier介绍
(1)CyclicBarrier的作用
(2)CyclicBarrier的基本原理
(1)CyclicBarrier的作用
CyclicBarrier的字面意思就是可循环使用的屏障。CyclicBarrier的主要作用就是让一组线程到达一个屏障时被阻塞,直到最后一个线程到达屏障时屏障才会打开,接着才让所有被屏障拦截的线程一起继续往下执行。线程进入屏障是通过CyclicBarrier的await()方法来实现的。
(2)CyclicBarrier的基本原理
假设有3个线程在运行中都会调用CyclicBarrier的await()方法,而每个线程从开始执行到执行await()方法所用时间可能不一样,最终当执行时间最长的线程到达屏障时,会唤醒其他较早到达屏障的线程继续往下执行。
CyclicBarrier包含两个层面的意思:
一是Barrier屏障点,线程调用await()方法都会阻塞在屏障点,直到所有线程都到达屏障点后再放行。
二是Cyclic循环,当所有线程通过当前屏障点后,又可以进入下一轮的屏障点进行等待,可以不断循环。
9.CyclicBarrier的await()方法源码
(1)CyclicBarrier的成员变量
(2)CyclicBarrier的await()方法源码
(3)CountDownLatch和CyclicBarrier对比
(1)CyclicBarrier的成员变量
//A synchronization aid that allows a set of threads to all wait for each other to reach a common barrier point. //CyclicBarriers are useful in programs involving a fixed sized party of threads that must occasionally wait for each other. //The barrier is called cyclic because it can be re-used after the waiting threads are released. public class CyclicBarrier { ... private static class Generation { boolean broken = false; } private final ReentrantLock lock = new ReentrantLock(); private final Condition trip = lock.newCondition();//用于线程之间相互唤醒 private final int parties;//参与的线程数量 private int count;//初始值是parties,每调用一次await()就减1 private final Runnable barrierCommand;//回调任务 private Generation generation = new Generation(); ... }
CyclicBarrier是基于ReentrantLock + Condition来实现的。
parties表示每次要求到达屏障点的线程数,只有到达屏障点的线程数满足指定的parties数量,所有线程才会被唤醒。
count是一个初始值为parties的计数器,每个线程调用await()方法会对count减1,当count为0时会唤醒所有线程,并且结束当前的屏障周期generation,然后所有线程进入下一个屏障周期,而且count会恢复成parties。
(2)CyclicBarrier的await()方法源码
线程调用CyclicBarrier的await()方法时,会触发调用CyclicBarrier的dowait()方法。
CyclicBarrier的dowait()方法会对count计数器进行递减。如果count递减到0,则会调用CyclicBarrier的nextGeneration()唤醒所有线程,同时如果异步回调任务barrierCommand不为空,则会执行该任务。如果count还没递减到0,则调用Condition的await()方法阻塞当前线程。
被阻塞的线程,除了会被CyclicBarrier的nextGeneration()方法唤醒外,还会被Thread的interrupt()方法唤醒、被中断异常唤醒,而这些唤醒会调用CyclicBarrier的breakBarrier()方法。
在CyclicBarrier的nextGeneration()方法和CyclicBarrier的breakBarrier()方法中,都会通过Condition的signalAll()方法唤醒所有被阻塞等待的线程。
//A synchronization aid that allows a set of threads to all wait for each other to reach a common barrier point. //CyclicBarriers are useful in programs involving a fixed sized party of threads that must occasionally wait for each other. //The barrier is called cyclic because it can be re-used after the waiting threads are released. public class CyclicBarrier { ... private static class Generation { boolean broken = false;//用来标记屏障是否被中断 } private final ReentrantLock lock = new ReentrantLock(); private final Condition trip = lock.newCondition();//用于线程之间相互唤醒 private final int parties;//参与的线程数量 private int count;//初始值是parties,每调用一次await()就减1 private final Runnable barrierCommand;//回调任务 private Generation generation = new Generation(); public CyclicBarrier(int parties, Runnable barrierAction) { if (parties <= 0) throw new IllegalArgumentException(); this.parties = parties; this.count = parties; this.barrierCommand = barrierAction; } public CyclicBarrier(int parties) { this(parties, null); } //Waits until all #getParties have invoked await on this barrier. public int await() throws InterruptedException, BrokenBarrierException { try { //执行CyclicBarrier的dowait()方法 return dowait(false, 0L); } catch (TimeoutException toe) { throw new Error(toe); } } //Main barrier code, covering the various policies. private int dowait(boolean timed, long nanos) throws InterruptedException, BrokenBarrierException, TimeoutException { final ReentrantLock lock = this.lock; lock.lock();//使用Condition需要先获取锁 try { //获取当前的generation final Generation g = generation; //确认当前generation的barrier是否有效,如果generation的broken为true,则抛出屏障中断异常 if (g.broken) { throw new BrokenBarrierException(); } if (Thread.interrupted()) { breakBarrier(); throw new InterruptedException(); } //统计已经到达当前generation的线程数量 int index = --count; //如果index为0,则表示所有线程都到达了屏障点 if (index == 0) { boolean ranAction = false; try { final Runnable command = barrierCommand; if (command != null) { //触发回调 command.run(); } ranAction = true; //执行nextGeneration()方法唤醒所有线程,同时进入下一个屏障周期 nextGeneration(); return 0; } finally { if (!ranAction) { breakBarrier(); } } } //loop until tripped, broken, interrupted, or timed out //如果index > 0,则阻塞当前线程 for (;;) { try { if (!timed) { //通过Condition的await()方法,在阻塞当前线程的同时释放锁 //这样其他线程就能获取到锁执行上面的index = --count trip.await(); } else if (nanos > 0L) { nanos = trip.awaitNanos(nanos); } } catch (InterruptedException ie) { if (g == generation && ! g.broken) { breakBarrier(); throw ie; } else { Thread.currentThread().interrupt(); } } if (g.broken) { throw new BrokenBarrierException(); } if (g != generation) { return index; } if (timed && nanos <= 0L) { //中断屏障,设置generation.broken为true breakBarrier(); throw new TimeoutException(); } } } finally { lock.unlock(); } } //Updates state on barrier trip and wakes up everyone. //Called only while holding lock. private void nextGeneration() { //通过Condition的signalAll()唤醒所有等待的线程 trip.signalAll(); //还原count count = parties; //进入新的generation generation = new Generation(); } //Sets current barrier generation as broken and wakes up everyone. //Called only while holding lock. private void breakBarrier() { generation.broken = true; count = parties; //通过Condition的signalAll()唤醒所有等待的线程 trip.signalAll(); } ... }
(3)CountDownLatch和CyclicBarrier对比
一.CyclicBarrier可以被重用、可以响应中断
二.CountDownLatch的计数器只能使用一次,但可以通过reset()方法重置
10.使用CountDownLatch等待注册的完成
Hadoop HDFS(分布式存储系统)的NameNode分为主备两个节点,各个DataNode在启动时都会向两个NameNode进行注册,此时就可以使用CountDownLatch等待向主备节点注册的完成。
//DataNode启动类 public class DataNode { //是否还在运行 private volatile Boolean shouldRun; //负责和一组NameNode(主NameNode + 备NameNode)通信的组件 private NameNodeGroupOfferService offerService; //初始化DataNode private void initialize() { this.shouldRun = true; this.offerService = new NameNodeGroupOfferService(); this.offerService.start(); } //运行DataNode private void run() { try { while(shouldRun) { Thread.sleep(10000); } } catch (Exception e) { e.printStackTrace(); } } public static void main(String[] args) { DataNode datanode = new DataNode(); datanode.initialize(); datanode.run(); } } //负责某个NameNode进行通信的线程组件 public class NameNodeServiceActor { //向某个NameNode进行注册 public void register(CountDownLatch latch) { Thread registerThread = new RegisterThread(latch); registerThread.start(); } //负责注册的线程,传入一个CountDownLatch class RegisterThread extends Thread { CountDownLatch latch; public RegisterThread(CountDownLatch latch) { this.latch = latch; } @Override public void run() { try { //发送rpc接口调用请求到NameNode去进行注册 System.out.println("发送请求到NameNode进行注册..."); Thread.sleep(1000); latch.countDown(); } catch (Exception e) { e.printStackTrace(); } } } } //负责跟一组NameNode(主NameNode + 备NameNode)进行通信的线程组件 public class NameNodeGroupOfferService { //负责跟NameNode主节点通信的ServiceActor组件 private NameNodeServiceActor activeServiceActor; //负责跟NameNode备节点通信的ServiceActor组件 private NameNodeServiceActor standbyServiceActor; //构造函数 public NameNodeGroupOfferService() { this.activeServiceActor = new NameNodeServiceActor(); this.standbyServiceActor = new NameNodeServiceActor(); } //启动OfferService组件 public void start() { //直接使用两个ServiceActor组件分别向主备两个NameNode节点进行注册 register(); } //向主备两个NameNode节点进行注册 private void register() { try { CountDownLatch latch = new CountDownLatch(2); this.activeServiceActor.register(latch); this.standbyServiceActor.register(latch); latch.await();//阻塞等待主备都完成注册 System.out.println("主备NameNode全部注册完毕..."); } catch (Exception e) { e.printStackTrace(); } } }
11.使用CyclicBarrier将工作任务多线程分而治之
//输出结果: //线程1执行自己的一部分工作... //线程2执行自己的一部分工作... //线程3执行自己的一部分工作... //所有线程都完成自己的任务,可以合并结果了... //最终结果合并完成,线程3可以退出... //最终结果合并完成,线程1可以退出... //最终结果合并完成,线程2可以退出... public class CyclicBarrierDemo { public static void main(String[] args) { final CyclicBarrier barrier = new CyclicBarrier(3, new Runnable() { public void run() { System.out.println("所有线程都完成自己的任务,可以合并结果了..."); } }); new Thread() { public void run() { try { System.out.println("线程1执行自己的一部分工作..."); barrier.await(); System.out.println("最终结果合并完成,线程1可以退出..."); } catch (Exception e) { e.printStackTrace(); } } }.start(); new Thread() { public void run() { try { System.out.println("线程2执行自己的一部分工作..."); barrier.await(); System.out.println("最终结果合并完成,线程2可以退出..."); } catch (Exception e) { e.printStackTrace(); } } }.start(); new Thread() { public void run() { try { System.out.println("线程3执行自己的一部分工作..."); barrier.await(); System.out.println("最终结果合并完成,线程3可以退出..."); } catch (Exception e) { e.printStackTrace(); } } }.start(); } }
12.使用CyclicBarrier聚合服务接口的返回结果
当然也可以使用CountDownLatch来实现聚合服务接口的返回结果;
public class ApiServiceDemo { public Map<String, Object> queryOrders() throws Exception { final List<Object> results = new ArrayList<Object>(); final Map<String, Object> map = new ConcurrentHashMap<String, Object>(); CyclicBarrier barrier = new CyclicBarrier(3, new Runnable() { @Override public void run() { map.put("price", results.get(0)); map.put("order", results.get(1)); map.put("stats", results.get(2)); } }); //请求价格接口 new Thread() { public void run() { try { System.out.println("请求价格服务..."); Thread.sleep(1000); results.add(new Object()); barrier.await(); } catch (Exception e) { e.printStackTrace(); } }; }.start(); //请求订单接口 new Thread() { public void run() { try { System.out.println("请求订单服务..."); Thread.sleep(1000); results.add(new Object()); barrier.await(); } catch (Exception e) { e.printStackTrace(); } }; }.start(); //请求统计接口 new Thread() { public void run() { try { System.out.println("请求订单统计服务..."); Thread.sleep(1000); results.add(new Object()); barrier.await(); } catch (Exception e) { e.printStackTrace(); } }; }.start(); while(map.size() < 3) { Thread.sleep(100); } return map; } }
13.使用Semaphore等待指定数量线程完成任务
可以通过Semaphore实现等待指定数量的线程完成任务才往下执行。
//输出结果如下: //线程2执行一个计算任务 //等待1个线程完成任务即可... //线程1执行一个计算任务 public class SemaphoreDemo { public static void main(String[] args) throws Exception { final Semaphore semaphore = new Semaphore(0); new Thread() { public void run() { try { Thread.sleep(2000); System.out.println("线程1执行一个计算任务"); semaphore.release(); } catch (Exception e) { e.printStackTrace(); } } }.start(); new Thread() { public void run() { try { Thread.sleep(1000); System.out.println("线程2执行一个计算任务"); semaphore.release(); } catch (Exception e) { e.printStackTrace(); } } }.start(); semaphore.acquire(1); System.out.println("等待1个线程完成任务即可..."); } }
详细介绍后端技术栈的基础内容,包括但不限于:MySQL原理和优化、Redis原理和应用、JVM和G1原理和优化、RocketMQ原理应用及源码、Kafka原理应用及源码、ElasticSearch原理应用及源码、JUC源码、Netty源码、zk源码、Dubbo源码、Spring源码、Spring Boot源码、SCA源码、分布式锁源码、分布式事务、分库分表和TiDB、大型商品系统、大型订单系统等