【详解】JUC之CountDownLatch

一、引入

  • countDownLatch这个类使一个线程等待其他线程各自执行完毕后再执行。
  • 是通过一个计数器来实现的,计数器的初始值是线程的数量或者任务的数量
  • 每当一个线程执行完毕后,计数器的值就-1,当计数器的值为0时,表示所有线程都执行完毕,然后在闭锁上等待的线程就可以恢复工作了。
  • CountDownLatch的方便之处在于,你可以在一个线程中使用,也可以在多个线程上使用,一切只依据状态值,这样便不会受限于任何的场景。

二、分析

在java5提供的并发包下,有一个AbstractQueuedSynchronizer抽象类,也叫AQS,此类根据大部分并发共性作了一些抽象,便于开发者实现如排他锁,共享锁,条件等待等更高级的业务功能。

它通过使用CAS和队列模型,出色的完成了抽象任务

  • 一开始,我们创建了一个CountDownLatch实例

  • 此时,AQS中,状态值state=2,对于 CountDownLatch 来说,state=2表示所有调用await方法的线程都应该阻塞,等到同一个latch被调用两次countDown后才能唤醒沉睡的线程。接着线程3和线程4执行了 await方法,这会的状态图如下:

  • 上面的通知状态是节点的属性,表示该节点出队后,必须唤醒其后续的节点线程。
  • 当线程1和线程2分别执行完latch.countDown方法后,会把state值置为0,
  • 此时,通过CAS成功置为0的那个线程将会同时承担起唤醒队列中第一个节点线程的任务,从上图可以看出,第一个节点即为线程3,当线程3恢复执行之后,其发现状态值为通知状态,所以会唤醒后续节点,即线程4节点,然后线程3继续做自己的事情,到这里,线程3和线程4都已经被唤醒,CountDownLatch功成身退。

三、使用场景一

需求

  • 可能刚从数据库读取了一批数据
  • 利用并发处理这批数据
  • 当所有的数据处理完成后,再去执行后面的操作

解决方案

  • 第一种:可以利用 join 的方法,但是在线程池中,比较麻烦
  • 第二种:利用线程池的awaitTermination,阻塞一段时间
  • 第三种利用CountDownLatch,每当任务完成一个,就计数器减一
public class CountDownLatchExample {

    private static final Random RANDOM = new Random(System.currentTimeMillis());


    private static ExecutorService executor = Executors.newFixedThreadPool(2);//线程池

    private static CountDownLatch latch ;
    public static void main(String[] args) throws InterruptedException {

        int[] data = query();//模拟从数据库查询的一批数据

        latch = new CountDownLatch(data.length);

        //让线程并发的处理数据
        for (int i = 0; i < data.length; i++) {
            executor.execute(new SimpleRunnable(data,i, latch));
        }
        latch.await();
        executor.shutdown();
// executor.awaitTermination(1, TimeUnit.HOURS);//利用线程池的等待机制,会阻塞住

        System.out.println("all of finish done!!");

        //等待全部线程处理完

    }

    static class SimpleRunnable implements Runnable{
        private final int [] data;

        private final int index;

        private final CountDownLatch latch;

        SimpleRunnable(int[] data, int index, CountDownLatch latch) {
            this.data = data;
            this.index = index;
            this.latch = latch;
        }

        @Override
        public void run() {
            try {
                Thread.sleep(RANDOM.nextInt(2000));
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            int value = data[index];
            //数据处理逻辑
            if (value%2==0){
                data[index] = value*2;
            }else {
                data[index] = value*10;
            }
            latch.countDown();
            System.out.println(Thread.currentThread().getName() + " is finished.");
        }
    }


    private static int[] query(){
        return new int[]{1,2,3,4,5};
    }
}

结果

pool-1-thread-2 is finished.
pool-1-thread-1 is finished.
pool-1-thread-2 is finished.
pool-1-thread-1 is finished.
pool-1-thread-2 is finished.
all of finish done!!

四、使用场景二

需求

  • 多个线程协同工作
  • 尝试多个线程需要等待其他线程的工作
  • 被唤醒后继续执行其他操作
public class CountDownLatchExample {

    public static void main(String[] args) throws InterruptedException {
        final CountDownLatch latch = new CountDownLatch(1);
        new Thread(() -> {
            System.out.println(Thread.currentThread().getName() + " Do some initial working.");
            try {
                Thread.sleep(1000);
                latch.await();
                System.out.println(Thread.currentThread().getName() + " Do other working.");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }).start();

        new Thread(() -> {
            System.out.println(Thread.currentThread().getName() + " Do some initial working.");
            try {
                Thread.sleep(1000);
                latch.await();
                System.out.println(Thread.currentThread().getName() + " Do other working.");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }).start();


        new Thread(() -> {
            System.out.println("asyn prepare for some data.");
            try {
                Thread.sleep(2000);
                System.out.println("Data prepare for done.");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }finally {
                latch.countDown();
            }
        }).start();


    }

}

五、API使用

构造方法只有一个

  • CountDownLatch(int count) :构造一个以给定计数

实例方法

  • public void await()

    • 当前线程等到锁存器计数到零
    • 可以被打断
  • public boolean await(long timeout,TimeUnit unit)

    • 等待一段时间
    • timeout - 等待的最长时间 ,unit - timeout参数的时间单位
    • 如果指定的等待时间过去,则返回值false
    • 如果计数达到零,则方法返回值为true
  • public void countDown()

    • 减少锁存器的计数,如果计数达到零,释放所有等待的线程
  • public long getCount()

    • 返回当前计数

六、给离散的平行任务增加逻辑层次关系

需求

  • 并发的从很多的数据库读取大量数据
  • 在读取数据的过程中,某个表可能会出现:数据丢失、数据精度丢失、数据大小不匹配
  • 需要进行对数据的各个情况进行检测,这个检测是并发的完成的
  • 所以需要控制如果一个表所有的情况检测完成,再进行后续的操作

解决

  • 利用CountDownLatch的计数器
  • 每当一个检测完成,计数器减一
  • 如果计数为0,执行后面操作
public class CountDownLatchExample {

    private static final Random RANDOM= new Random();

    public static void main(String[] args) throws Exception {
        Event [] events = {new Event(1),new Event(2)};

        ExecutorService service = Executors.newFixedThreadPool(5);

        for (Event event : events) {
            List<Table> tables = capture(event);
            for (Table table : tables) {
                TaskBatch taskBatch = new TaskBatch(2);
                TrustSourceColumns sourceColumns = new TrustSourceColumns(table, taskBatch);
                TrustSourceRecordCount recordCount = new TrustSourceRecordCount(table, taskBatch);
                service.submit(sourceColumns);
                service.submit(recordCount);
            }
        }
    }


    static class Event{
        private int id;

        Event(int id) {
            this.id = id;
        }
    }

    interface Watcher{
        void done(Table table);
    }

    static class TaskBatch implements Watcher{

        private final CountDownLatch latch;

        TaskBatch(int size) {
            this.latch = new CountDownLatch(size);
        }



        @Override
        public void done(Table table) {
            latch.countDown();
            if (latch.getCount() == 0){
                System.out.println("The table " + table.tableName + " finished work , " + table.toString());
            }
        }
    }

    static class Table{
        String tableName;
        long sourceRecordCount;
        long targetCount;
        String columnSchema = " tableName = a | column1Type = varchar";

        String targetColumnSchema  = "";

        public Table(String tableName,long sourceRecordCount) {
            this.tableName = tableName;
            this.sourceRecordCount = sourceRecordCount;

        }

        @Override
        public String toString() {
            return "Table{" +
                    "tableName='" + tableName + '\'' +
                    ", sourceRecordCount=" + sourceRecordCount +
                    ", targetCount=" + targetCount +
                    ", columnSchema='" + columnSchema + '\'' +
                    ", targetColumnSchema='" + targetColumnSchema + '\'' +
                    '}';
        }
    }

    private static List<Table> capture(Event event){
        List<Table> list = new ArrayList<>();
        for (int i = 0; i < 10; i++) {
            list.add(new Table("table-"+event.id + "-" +i,i*1000));
        }
        return list;
    }

    static class TrustSourceRecordCount implements Runnable{

        private final Table table;

        private final TaskBatch taskBatch;

        TrustSourceRecordCount(Table table, TaskBatch taskBatch) {
            this.table = table;
            this.taskBatch = taskBatch;
        }

        @Override
        public void run() {
            try {
                Thread.sleep(RANDOM.nextInt(10000));
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            table.targetCount = table.sourceRecordCount;

// System.out.println("The table : " + table.tableName + " record count capture done and update.");
            taskBatch.done(table);

        }

    }


    static class TrustSourceColumns implements Runnable{

        private final Table table;

        private final TaskBatch taskBatch;

        TrustSourceColumns(Table table, TaskBatch taskBatch) {
            this.table = table;
            this.taskBatch = taskBatch;
        }

        @Override
        public void run() {
            try {
                Thread.sleep(RANDOM.nextInt(10000));
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            table.targetColumnSchema = table.columnSchema;
// System.out.println("The table : " + table.tableName + " target columns capture done and update.");
            taskBatch.done(table);
        }

    }
}
全部评论

相关推荐

不愿透露姓名的神秘牛友
07-02 15:39
希望奇迹发生的布莱克...:真的是 现在卷实习就是没苦硬吃
点赞 评论 收藏
分享
06-27 12:54
已编辑
门头沟学院 Java
累了,讲讲我的大学经历吧,目前在家待业。我是一个二本院校软件工程专业。最开始选专业是觉得计算机感兴趣,所以选择了他。本人学习计算机是从大二暑假结束开始的,也就是大三开始。当时每天学习,我个人认为Java以及是我生活的一部分了,就这样持续学习了一年半,来到了大四上学期末,大概是在12月中旬,我终于找的到了一家上海中厂的实习,但我发现实习生的工作很枯燥,公司分配的活也不多,大多时间也是自己在自学。就这样我秋招末才找到实习。时间来到了3月中旬,公司说我可以转正,但是转正工资只有7000,不过很稳定,不加班,双休,因为要回学校参加答辩了,同时当时也是心高气傲,认为可以找到更好的,所以放弃了转正机会,回学校准备论文。准备论文期间就也没有投递简历。然后时间来到了5月中旬,这时春招基本也结束了,然后我开始投递简历,期间只是约到了几家下场面试。工资也只有6-7k,到现在我不知道该怎么办了。已经没有当初学习的心劲了,好累呀,但是又不知道该干什么去。在家就是打游戏,boss简历投一投。每天日重一次。26秋招都说是针对26届的人,25怎么办。我好绝望。要不要参加考公、考研、央国企这些的。有没有大佬可以帮帮我。为什么感觉别人找工作都是顺其自然的事情,我感觉自己每一步都在艰难追赶。八股文背了又忘背了又忘,我每次都花很长时间去理解他,可是现在感觉八股、项目都忘完了。真的已经没有力气再去学习了。图片是我的简历,有没有大哥可以指正一下,或者说我应该走哪条路,有点不想在找工作了。
码客明:太累了就休息一下兄弟,人生不会完蛋的
如果实习可以转正,你会不...
点赞 评论 收藏
分享
05-26 10:24
门头沟学院 Java
qq乃乃好喝到咩噗茶:其实是对的,线上面试容易被人当野怪刷了
找工作时遇到的神仙HR
点赞 评论 收藏
分享
评论
点赞
收藏
分享

创作者周榜

更多
牛客网
牛客网在线编程
牛客网题解
牛客企业服务