JUC工具之CountDownLatch源码解读

CountdownLatch 倒计时计时器

      犹如倒计时的计时器,调用CountdownLatch对象的countDown()方法,就将计时器减一当计时器到达0时,则所有线程或单个线程开始执行。

     可以实现一个人(多人)等待其他所有人都来通知他,可以实现一个通知多个人的效果。类似裁判一声令下运动员同时开始奔跑。

import java.util.concurrent.CountDownLatch;

public class test2 {
	static CountDownLatch c = new CountDownLatch(10);
	public static void main(String[] args) {
		for(int i=0;i<10;i++){
			int n = i;
			Thread t = new Thread(()->{
				System.out.println("第"+n+"个到达");
				c.countDown();
			});
			t.start();
		}
		
		try {
			c.await();
		} catch (InterruptedException e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		}
		System.out.println("搞事情");
	}
}


        这个并发工具算是最简单的一个了,就几个方法,下面分析下常用的方法

await源码解读

public void await() throws InterruptedException {
        sync.acquireSharedInterruptibly(1);
    }

        里面调用了AQS的acquireSharedInterruptibly方法 

public final void acquireSharedInterruptibly(int arg)
            throws InterruptedException {
        if (Thread.interrupted())
            throw new InterruptedException();
        if (tryAcquireShared(arg) < 0)
            doAcquireSharedInterruptibly(arg);
    }

        因为这个是带中断的获取,首先判断一下中断,很简单然后执行tryAcquireShared方法,由工具自己实现

protected int tryAcquireShared(int acquires) {
            return (getState() == 0) ? 1 : -1;
        }

        刚刚开始看很奇怪,state==0才返回1,没有资源才返回1?这是什么道理?因为tryAcquireShared方法虽然由AQS定义好模板,具体实现交给各个子类,但是定义好了返回值,正数表示还剩余多少资源,0表示成功获取资源但是没有剩余了,负数代表失败,我刚刚开始就让我失败?因为我刚刚开始一定是有资源的,这是什么鬼?继续往下看

        失败就失败吧,继续doAcquireSharedInterruptibly

private void doAcquireSharedInterruptibly(int arg)
        throws InterruptedException {
        final Node node = addWaiter(Node.SHARED);
        boolean failed = true;
        try {
            for (;;) {
                final Node p = node.predecessor();
                if (p == head) {
                    int r = tryAcquireShared(arg);
                    if (r >= 0) {
                        setHeadAndPropagate(node, r);
                        p.next = null; // help GC
                        failed = false;
                        return;
                    }
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    throw new InterruptedException();
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

        这里讲解一下这些代码的含义

  1. 首先判断下中断
  2. 将节点加入sync队列,节点类型为共享
  3. 获取node节点的前驱,如果是head节点就尝试获取下资源,但是这里肯定获取不到,返回肯定是-1,因为state还不是0
  4. 然后找到安全的休息点,休息(park)

     这些代码在AQS里面,不重点讲解,如果想了解、可以看看关于AQS的讲解


CountDown方法源码分析

public void countDown() {
        sync.releaseShared(1);
    }

        调用一下releaseShared方法,看看这个方法有什么

public final boolean releaseShared(int arg) {
        if (tryReleaseShared(arg)) {
            doReleaseShared();
            return true;
        }
        return false;
    }    

protected boolean tryReleaseShared(int releases) {
            // Decrement count; signal when transition to zero
            for (;;) {
                int c = getState();
                if (c == 0)
                    return false;
                int nextc = c-1;
                if (compareAndSetState(c, nextc))
                    return nextc == 0;
            }
        }

        首先尝试释放一下,将state数量减少,state不等于0肯定是返回false,直到返回true执行doReleaseShared方法

private void doReleaseShared() {
    for (;;) {
        Node h = head;
        if (h != null && h != tail) {
            int ws = h.waitStatus;
            if (ws == Node.SIGNAL) {
                if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                    continue;
                unparkSuccessor(h);//唤醒后继
            }
            else if (ws == 0 &&
                     !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                continue;
        }
        if (h == head)// head发生变化
            break;
    }
}

        唤醒后继节点执行unparkSuccessor方法,但是唤醒了h出不去啊,这是个死循环。这里先记住一下,h与head相同则break,后面讲解

private void unparkSuccessor(Node node) {
        
        int ws = node.waitStatus;
        if (ws < 0)
            compareAndSetWaitStatus(node, ws, 0);
       
        Node s = node.next;
        if (s == null || s.waitStatus > 0) {
            s = null;
            for (Node t = tail; t != null && t != node; t = t.prev)
                if (t.waitStatus <= 0)
                    s = t;
        }
        if (s != null)
            LockSupport.unpark(s.thread);
    }

         唤醒节点后,节点又回到执行park了的地方,也就是doAcquireSharedInterruptibly中的parkAndCheckInterrupt方法里跳出来,回到doAcquireSharedInterruptibly方法,然后再一次的遍历,尝试去获取资源,这回因为state是0,tryAcquireShared方法肯定得到1,大于0则进入setHeadAndPropagate方法

private void setHeadAndPropagate(Node node, int propagate) {
        Node h = head; // Record old head for check below
        setHead(node);
      
        if (propagate > 0 || h == null || h.waitStatus < 0 ||
            (h = head) == null || h.waitStatus < 0) {
            Node s = node.next;
            if (s == null || s.isShared())
                doReleaseShared();
        }
    }

        先把head设置为自己,也就是刚刚被唤醒的节点h,这样那个doReleaseShared方法死循环就能跳出,现在我们根据资源的剩余量再一次调用doReleaseShared方法,唤醒后面的Node节点。重复上面的过程,直到全部节点被唤醒



总结

  1. 首先将调用了await的线程做成node节点放入sync队列
  2. 调用countdown方法直到把state减少到0开始唤醒线程
  3. 调用doReleaseShared方法唤醒head后的一个Node节点、这里有个死循环
  4. 被唤醒的节点再一次的尝试获取资源,tryAcquireShared方法当state为0返回1表示成功获取到了资源
  5. 资源大于等于0,开始把自己设置为head,这个时候退出第3步方法中的死循环,当资源还有剩余继续调用doReleaseShared方法,重复之前步骤3-5


全部评论

相关推荐

点赞 收藏 评论
分享
牛客网
牛客企业服务