【Java多线程】并发工具之CyclicBarrier

转载自 https://www.cnblogs.com/jinggod/p/8494193.html


前言

JDK中为了处理线程之间的同步问题,除了提供锁机制之外,还提供了几个非常有用的并发工具类:CountDownLatch、CyclicBarrier、Semphore、Exchanger、Phaser;
CountDownLatch、CyclicBarrier、Semphore、Phaser 这四个工具类提供一种并发流程的控制手段;而Exchanger工具类则提供了在线程之间交换数据的一种手段。

简介

CyclicBarrier 的字面意思是可循环使用(Cyclic)的屏障(Barrier)。它要做的事情是,让一组线程到达一个屏障(也可以叫同步点)时被阻塞,直到最后一个线程到达屏障时,屏障才会开门,所有被屏障拦截的线程才会继续干活。CyclicBarrier默认的构造方法是CyclicBarrier(int parties),其参数表示屏障拦截的线程数量,每个线程调用await方法告诉CyclicBarrier我已经到达了屏障,然后当前线程被阻塞。

构造方法摘要

方法名称 说明
CyclicBarrier(int parties) 创建一个新的 CyclicBarrier,它将在给定数量的参与者(线程)处于等待状态时启动,但它不会在启动 barrier 时执行预定义的操作。
CyclicBarrier(int parties, Runnable barrierAction) 创建一个新的 CyclicBarrier,它将在给定数量的参与者(线程)处于等待状态时启动,并在启动 barrier 时执行给定的屏障操作,该操作由最后一个进入 barrier 的线程执行。

方法摘要

方法名称 说明
public int await() throws
InterruptedException, BrokenBarrierException
在所有参与者都已经在此 barrier 上调用 await 方法之前,将一直等待。
返回:到达的当前线程的索引,其中,索引 getParties() - 1 指示将到达的第一个线程,零指示最后一个到达的线程.
public int await(long timeout,TimeUnit unit) throws
InterruptedException,BrokenBarrierException,
ITimeoutException
在所有参与者都已经在此屏障上调用 await 方法之前将一直等待,或者超出了指定的等待时间。
public void reset() 将屏障重置为其初始状态。如果所有参与者目前都在屏障处等待,则它们将返回,同时抛出一个 BrokenBarrierException。注意,在由于其他原因造成损坏之后,实行重置可能会变得很复杂;
public boolean isBroken() 查询此屏障是否处于损坏状态。
public int getNumberWaiting() 返回当前在屏障处等待的参与者数目。此方法主要用于调试和断言。
public int getParties() 返回要求启动此 barrier 的参与者数目。

注意:

  • 对于失败的同步尝试,CyclicBarrier 使用了一种要么全部要么全不 (all-or-none) 的破坏模式:如果因为中断、失败或者超时等原因,导致线程过早地离开了屏障点,那么在该屏障点等待的其他所有线程也将通过 BrokenBarrierException(如果它们几乎同时被中断,则用 InterruptedException)以反常的方式离开。
  • 内存一致性效果:线程中调用 await() 之前的操作 happen-before 那些是屏障操作的一部份的操作,后者依次 happen-before 紧跟在从另一个线程中对应 await() 成功返回的操作。

@ Example1 屏障操作的例子
public static void main(String[] args) {
    //设置5个屏障,并且有屏障操作
    CyclicBarrier barrier = new CyclicBarrier(5,new Runnable() {
        @Override
        public void run() {
                System.out.println("线程"+Thread.currentThread().getName()+"执行了屏障操作");        
        }
    });
    
    for(int i=0;i<5;i++){
       //创建5个线程
        Thread thread = new Thread(new MyRunable(barrier),"thread_"+i);
        thread.start();
    }
}
class MyRunable implements Runnable{

    CyclicBarrier barrier;
    public MyRunable(CyclicBarrier barrier ){
         this.barrier = barrier; 
    }
    
    @Override
    public void run() {
        //一系列操作...
         System.out.println("线程 "+Thread.currentThread().getName()+" 到达了屏障点!");
         try {
             int index = barrier.await();
            if(index== (barrier.getParties()-1)){
                //第一个到达屏障点的线程,执行特殊操作....
                System.out.println("所有线程到达屏障点,线程 "+Thread.currentThread().getName()+" 被唤醒!!此线程是第一个到达屏障点");
            }else if(index == 0){//最后一个到达屏障点的线程
                System.out.println("所有线程到达屏障点,线程 "+Thread.currentThread().getName()+" 被唤醒!!此线程是最后一个到达屏障点");
            }else{
                System.out.println("所有线程到达屏障点,线程 "+Thread.currentThread().getName()+" 被唤醒!!");
            }
        } catch (InterruptedException | BrokenBarrierException e) {
            e.printStackTrace();
        }
    }
}

运行结果:

线程 thread_1 到达了屏障点!
线程 thread_4 到达了屏障点!
线程 thread_3 到达了屏障点!
线程 thread_0 到达了屏障点!
线程 thread_2 到达了屏障点!
线程thread_3执行了屏障操作
所有线程到达屏障点,线程 thread_3 被唤醒!!此线程是最后一个到达屏障点
所有线程到达屏障点,线程 thread_0 被唤醒!!
所有线程到达屏障点,线程 thread_4 被唤醒!!
所有线程到达屏障点,线程 thread_1 被唤醒!!此线程是第一个到达屏障点
所有线程到达屏障点,线程 thread_2 被唤醒!!

上面的例子,使用了传入屏障操作的Runable参数的构造方法,屏障操作是由最后一个到达屏障点的线程执行的,这是不可以改变的。然而,在实际使用中,可能会出现由第n个到达屏障点的线程执行特殊的操作(或者说 屏障操作),那么就可以使用 CyclicBarrier.await()进行判断,如上面的例子,第一个和最后一个到达屏障点的线程都执行特殊的操作。

顺便说一下,可能会对本例子中前5个输出的顺序 有所疑惑:thread_3 通过awiat()方法返回的索引值,可知 thread_3 是最后一个到达屏障点的,但为什么输出的顺序却是第三个,而不是最后一个;在这就要真正理解CyclicBarrier,CyclicBarrier 本质上是一把锁,多个线程在使用CyclicBarrier 对象时,是需要先获取锁,即需要互斥访问,所以调用await( )方法不一定能够马上获取锁。上面的例子,是先打印输出,再去获取锁,所以输出顺序不是到达屏障点的顺序。


@ Example2 应用场景

下面的例子是:CyclicBarrier用于多线程计算数据,最后合并计算结果的场景。比如我们用一个Excel保存了用户所有银行流水,每个Sheet保存一个帐户近一年的每笔银行流水,现在需要统计用户的日均银行流水,先用多线程处理每个sheet里的银行流水,都执行完之后,得到每个sheet的日均银行流水,最后,再用barrierAction用这些线程的计算结果,计算出整个Excel的日均银行流水。

public class BankWaterService implements Runnable {
    
    //创建4个屏障,处理完后执行当前类的run方法
    private CyclicBarrier barrier = new CyclicBarrier(4,this);
    
    //假设只有4个sheet,所以只启动4个线程
    private Executor excutor = Executors.newFixedThreadPool(4);
    
    //保存每个sheet计算出的结果
    private ConcurrentHashMap< String, Integer> sheetBankWaterCount = new ConcurrentHashMap<>();
    
    private void count(){
        for(int i=0;i<4;i++){
            excutor.execute(new Runnable() {
                
                @Override
                public void run() {
                    //计算过程.....
                    //存储计算结果
                    sheetBankWaterCount.put(Thread.currentThread().getName(), 1);
                    try {
                        //计算完成,插入屏障
                        barrier.await();
                        //后续操作,将会使用到四个线程的运行结果....
                        System.out.println("线程"+Thread.currentThread().getName()+"运行结束,最终的计算结果:"+sheetBankWaterCount.get("result"));
                    } catch (InterruptedException | BrokenBarrierException e) {
                        e.printStackTrace();
                    }
                }
            });
        }
    }
    

    @Override
    public void run() {
        int result = 0;
        for(Entry<String, Integer> item : sheetBankWaterCount.entrySet()){
            result += item.getValue();
        }
        sheetBankWaterCount.put("result", result);
    }
    
    public static void main(String[] args) {
        BankWaterService bankWaterService = new BankWaterService();
        bankWaterService.count();
    }
}

运行结果:

线程pool-1-thread-4运行结束,最终的计算结果:4
线程pool-1-thread-2运行结束,最终的计算结果:4
线程pool-1-thread-1运行结束,最终的计算结果:4
线程pool-1-thread-3运行结束,最终的计算结果:4


CyclicBarrier和CountDownLatch的区别

  • CountDownLatch: 一个线程(或者多个线程), 等待另外N个线程完成某个事情之后才能执行。而这N个线程通过调用CountDownLatch.countDown()方法 来告知“某件事件”完成,即计数减一。而一个线程(或者多个线程)则通过CountDownLatch.awiat( ) 进入等待状态,直到 CountDownLatch的计数为0时,才会全部被唤醒
  • CyclicBarrier : N个线程相互等待,任何一个线程完成某个事情之前,所有的线程都必须等待。
    CountDownLatch 是计数器, 线程完成一个就记一个, 就像 报数一样, 只不过是递减的.
    而CyclicBarrier更像一个水闸, 线程执行就想水流, 在水闸处都会堵住, 等到水满(线程到齐)了, 才开始泄流.
  • CountDownLatch只能使用一次,CyclicBarrier则可以通过reset( )方法重置后,重新使用。所以CyclicBarrier可以用于更复杂的业务场景。例如:计算错误,可以重置计数器,并让线程重新执行一次。


实现

虽然没有像其他工具类使用继承AQS的Sync内部类,但是同步屏障使用了ReentrantLock和Condition作为实现。
    /** The lock for guarding barrier entry */
    private final ReentrantLock lock = new ReentrantLock();
    /** Condition to wait on until tripped */
    private final Condition trip = lock.newCondition();
来看看比较关心的await方法:
public int await() throws InterruptedException, BrokenBarrierException {
        try {
            return dowait(false, 0L);
        } catch (TimeoutException toe) {
            throw new Error(toe); // cannot happen
        }
}

public int await(long timeout, TimeUnit unit)
    throws InterruptedException,
           BrokenBarrierException,
           TimeoutException {
    return dowait(true, unit.toNanos(timeout));
}
await方法有两个重载,你可以定义等待时间,但是两个重载都是要调用dowait方法的:
    /**
     * Main barrier code, covering the various policies.
     */
    private int dowait(boolean timed, long nanos)
        throws InterruptedException, BrokenBarrierException,
               TimeoutException {
        final ReentrantLock lock = this.lock;//获得成员变量lock         
                lock.lock();
        try {
            final Generation g = generation;//每个屏障都代表一个Generation实例,他只有一个成员变量broken。

            if (g.broken)
                throw new BrokenBarrierException();

            if (Thread.interrupted()) {//线程中断,需要释放掉阻塞的线程,并抛出异常
                breakBarrier();
                throw new InterruptedException();
            }

            int index = --count;//一个线程进入await,计数值-1
            if (index == 0) {  // tripped
                boolean ranAction = false;
                try {
                    final Runnable command = barrierCommand;
                    if (command != null)
                        command.run();
                    ranAction = true;
                    nextGeneration();
                    return 0;
                } finally {
                    if (!ranAction)
                        breakBarrier();
                }
            }

            // loop until tripped, broken, interrupted, or timed out
                        //自旋,直到跳出屏障、broken、中断或者超时
            for (;;) {
                try {
                    if (!timed)
                        trip.await();
                    else if (nanos > 0L)
                        nanos = trip.awaitNanos(nanos);
                } catch (InterruptedException ie) {
                    if (g == generation && ! g.broken) {
                        breakBarrier();
                        throw ie;
                    } else {
                        // We're about to finish waiting even if we had not
                        // been interrupted, so this interrupt is deemed to
                        // "belong" to subsequent execution.
                        Thread.currentThread().interrupt();
                    }
                }

                if (g.broken)
                    throw new BrokenBarrierException();

                if (g != generation)
                    return index;

                if (timed && nanos <= 0L) {
                    breakBarrier();
                    throw new TimeoutException();
                }
            }
        } finally {
            lock.unlock();//最后要释放重入锁
        }
    }



文献:





全部评论

相关推荐

object3:开始给部分🌸孝子上人生第一课了
点赞 评论 收藏
分享
11-28 17:48
中山大学 C++
点赞 评论 收藏
分享
评论
点赞
收藏
分享
牛客网
牛客企业服务