【详解】JUC之Phaser(移相器)

简介

java7中引入了一种新的可重复使用的同步屏障,称为移相器Phaser。Phaser拥有与CyclicBarrierCountDownLatch类似的功劳.

但是这个类提供了更加灵活的应用。CountDownLatch和CyclicBarrier都是只适用于固定数量的参与者。移相器适用于可变数目的屏障,在这个意义上,可以在任何时间注册新的参与者。并且在抵达屏障是可以注销已经注册的参与者。因此,注册到同步移相器的参与者的数目可能会随着时间的推移而变化。

如CyclicBarrier一样,移相器可以重复使用,这意味着当前参与者到达移相器后,可以再一次注册自己并等待另一次到达.

移相器的另一个重要特征是:移相器可能是分层的,这允许你以树形结构来安排移相器以减少竞争

简单的使用

public class PhaserTest {

    private final static Random RANDOM = new Random();

    public static void main(String[] args) {
        final Phaser phaser = new Phaser();
        IntStream.rangeClosed(1,5).boxed().map(i->phaser).forEach(Task::new);

        phaser.register();

        phaser.arriveAndAwaitAdvance();//相当于CountDown
        System.out.println("All of work are finished.");
    }

    static class Task extends Thread{
        private final Phaser phaser;

        Task(Phaser phaser) {
            this.phaser = phaser;
            phaser.register();//把自己加入计数器中
            start();
        }

        @Override
        public void run() {
            System.out.println("The worker[ "+getName()+ " ]" +" is working.");
            try {
                TimeUnit.SECONDS.sleep(RANDOM.nextInt(5));
            } catch (InterruptedException e) {
                e.printStackTrace();
            }

            phaser.arriveAndAwaitAdvance();//自己完成,等待其他线程完成,相当于CyclicBarrier
        }
    }
}

结果:

The worker[ Thread-1 ] is working.
The worker[ Thread-2 ] is working.
The worker[ Thread-0 ] is working.
The worker[ Thread-4 ] is working.
The worker[ Thread-3 ] is working.
All of work are finished.

重复使用的例子

public class PhaserTest {

    private final static Random RANDOM = new Random();


    public static void main(String[] args) {
        final Phaser phaser = new Phaser(5);

        for (int i = 0; i < 6; i++) {
            new Athletes(i,phaser).start();
        }
    }

    static class Athletes extends Thread {
        private final int no;
        private final Phaser phaser;


        Athletes(int no, Phaser phaser) {
            this.no = no;
            this.phaser = phaser;

        }

        @Override
        public void run() {
            try {
                System.out.println(no + " start running.");
                TimeUnit.SECONDS.sleep(RANDOM.nextInt(5));
                System.out.println(no + " end running.");
                phaser.arriveAndAwaitAdvance();

                System.out.println(no + " start bicycle.");
                TimeUnit.SECONDS.sleep(RANDOM.nextInt(5));
                System.out.println(no + " end bicycle.");
                phaser.arriveAndAwaitAdvance();


                System.out.println(no + " start long jump.");
                TimeUnit.SECONDS.sleep(RANDOM.nextInt(5));
                System.out.println(no + " end long jump.");
                phaser.arriveAndAwaitAdvance();

            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}

结果

0 start running.
1 start running.
4 start running.
2 start running.
3 start running.
5 start running.
5 end running.
4 end running.
2 end running.
1 end running.
0 end running.
4 start bicycle.
......

动态减少

static class InjuredAthletes extends Thread {
    private final int no;
    private final Phaser phaser;


    InjuredAthletes(int no, Phaser phaser) {
        this.no = no;
        this.phaser = phaser;

    }

    @Override
    public void run() {
        try {
            sport(no, phaser, " start running.", " end running.");
            sport(no, phaser, " start bicycle.", " end bicycle.");
            System.out.println("I am injured.");
            phaser.arriveAndDeregister();//动态减少
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

static class Athletes extends Thread {
    private final int no;
    private final Phaser phaser;


    Athletes(int no, Phaser phaser) {
        this.no = no;
        this.phaser = phaser;

    }

    @Override
    public void run() {
        try {
            sport(no, phaser, " start running.", " end running.");
            sport(no, phaser, " start bicycle.", " end bicycle.");
            sport(no, phaser, " start long jump.", " end long jump.");

        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }


}

private static void sport(int no, Phaser phaser, String s, String s2) throws InterruptedException {
    System.out.println(no + s);
    TimeUnit.SECONDS.sleep(RANDOM.nextInt(5));
    System.out.println(no + s2);
    phaser.arriveAndAwaitAdvance();
}

API

重要API

注册

public int register()
public int bulkRegister(int parties)

register

  • 是注册一个线程数,比较常用

bulkRegister

  • 可以批量注册

到达

public int arrive()
public int arriveAndDeregister()
public int arriveAndAwaitAdvance()

arrive

  • 这个到达后,不会阻塞,相当于countdown机制

arriveAndAwaitAdvance

  • 到达后会阻塞,相当于CyclicBarrier机制

arriveAndDeregister

  • 当线程出现异常,不能正常到达时,可以调用该方法,动态减少注册数

举例

public class PhaserTest {

    private static final Random RANDOM = new Random();

    public static void main(String[] args) throws InterruptedException {
        final Phaser phaser = new Phaser(5);

        for (int i = 0; i < 4; i++) {
            new ArriveTask(i,phaser).start();
        }
        //等待全部任务进行完成
        phaser.arriveAndAwaitAdvance();
        System.out.println("The phase 1 work finish done.");

    }

    private static class ArriveTask extends Thread{
        private final Phaser phaser;

        private ArriveTask(int no,Phaser phaser) {
            super(String.valueOf(no));

            this.phaser = phaser;
        }

        @Override
        public void run() {
            System.out.println(getName() +  " start working. ");
            threadSleep();
            System.out.println(getName() + " The phase one is running.");
            phaser.arrive();

            threadSleep();
            System.out.println(getName() +  " keep to other thing. ");

        }
    }

    private static void threadSleep()  {
        try {
            TimeUnit.SECONDS.sleep(RANDOM.nextInt(5));
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

}

不再等待机制

protected boolean onAdvance(int phase, int registeredParties)

举例

public class PhaserTest {
    public static void main(String[] args) throws InterruptedException {
        final Phaser phaser = new Phaser(2){
            @Override
            protected boolean onAdvance(int phase, int registeredParties) {

                return true;
            }
        };

        new OnAdvanceTask("Alex",phaser).start();
        new OnAdvanceTask("Jack",phaser).start();

        TimeUnit.SECONDS.sleep(3);
        System.out.println(phaser.getArrivedParties());
        System.out.println(phaser.getUnarrivedParties());
    }

    static class OnAdvanceTask extends Thread{
        private final Phaser phaser;
        OnAdvanceTask(String name, Phaser phaser) {
            super(name);
            this.phaser = phaser;
        }

        @Override
        public void run() {
            try {

                sout();
                TimeUnit.SECONDS.sleep(1);
                if (getName().equals("Alex")){
                    System.out.println(phaser.isTerminated());
                    sout();
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }

        private void sout() {
            System.out.println(getName() + " I am start and the phase " + phaser.getPhase());
            phaser.arriveAndAwaitAdvance();
            System.out.println(getName() + " I am end !");
        }
    }
}

结果

Jack I am start and the phase 0
Alex I am start and the phase 0
Alex I am end !
Jack I am end !
true
Alex I am start and the phase -2147483647
Alex I am end !
2
0
  • 默认情况,当别人调用arriveAndDeregister时,使注册的数量减到0时,直接不会陷入阻塞,返回true,相当于销毁掉

监控子线程任务

public int awaitAdvance(int phase)
public int awaitAdvanceInterruptibly(int phase) throws InterruptedException
  • 相当于起到监控的作用
  • 如果子线程还没有执行完成,主线程就会阻塞
  • 相较而言,可以不用增加注册量

举例

public static void main(String[] args) throws InterruptedException {
    final Phaser phaser = new Phaser(4);

    for (int i = 0; i < 4; i++) {
        new AwaitAdvance(i,phaser).start();
    }
    //等待全部任务进行完成
    phaser.awaitAdvance(phaser.getPhase());
    System.out.println("The phase 1 work finish done.");
}

强制关闭

public void forceTermination()
public boolean isTerminated()
  • 强制关闭phaser,但是如果线程陷入阻塞,不会唤醒

调试API

获取阶段数

public final int getPhase()
  • 返回当前相位数。 最大相位数为Integer.MAX_VALUE
  • 每增加一轮就会加一

举例

public class PhaserTest {
    public static void main(String[] args) {
        final Phaser phaser = new Phaser(1);
        System.out.println(phaser.getPhase());

        phaser.arriveAndAwaitAdvance();
        System.out.println(phaser.getPhase());

        phaser.arriveAndAwaitAdvance();
        System.out.println(phaser.getPhase());

        phaser.arriveAndAwaitAdvance();
        System.out.println(phaser.getPhase());

    }
}

结果

0
1
2
3

获取注册的数

public int getRegisteredParties()
  • 获得注册的线程数,相当于Countdown初始的的计数器
  • 可以动态更改

获得到达和未到达的数目

public int getArrivedParties()
public int getUnarrivedParties()

getArrivedParties

  • 获得已经到达的线程数,和没有到达的线程数

getUnarrivedParties

  • 获得没有到达的线程数,和没有到达的线程数

总结

  • Phaser 可以通过register() 方法和arriveAndDeregister() 方法,动态的增加或者减少注册量
  • 使用arriveAndAwaitAdvance,相当于CyclicBarrier机制
  • 使用arrive,相当于countdown机制
  • 可以利用awaitAdvance,让主线程等待子线程全部完成任务
全部评论

相关推荐

点赞 评论 收藏
分享
最近又搬回宿舍了,在工位坐不住,写一写秋招起伏不断的心态变化,也算对自己心态的一些思考表演式学习从开始为实习准备的时候就特别焦虑,楼主一开始选择的是cpp后端,但是24届这个方向已经炸了,同时自己又因为本科非92且非科班,所以感到机会更加迷茫。在某天晚上用java写出hello&nbsp;world并失眠一整晚后选择老本行干嵌入式。理想是美好的,现实情况是每天忙但又没有实质性进展,总是在配环境,调工具,顺带还要推科研。而这时候才发现自己一直在表演式学习,徘徊在设想如何展开工作的循环里,导致没有实质性进展。现在看来当时如果把精力专注在动手写而不是两只手端着看教程,基本功或许不会那么差。实习的焦虑5月,楼主...
耶比:哲学上有一个问题,玛丽的房间:玛丽知道眼睛识别色彩的原理知道各种颜色,但是她生活在黑白的房间里,直到有一天玛丽的房门打开了她亲眼看到了颜色,才知道什么是色彩。我现在最大可能的减少对非工作事情的思考,如果有一件事困扰了我, 能解决的我就直接做(去哪里或者和谁吵架等等……),解决不了的我就不想了,每一天都是最年轻的一天,珍惜今天吧
投递比亚迪等公司10个岗位 > 秋招被确诊为…… 牛客创作赏金赛
点赞 评论 收藏
分享
评论
点赞
收藏
分享
牛客网
牛客企业服务