【详解】JUC之Phaser(移相器)
简介
java7中引入了一种新的可重复使用的同步屏障,称为移相器Phaser。Phaser拥有与CyclicBarrier
和CountDownLatch
类似的功劳.
但是这个类提供了更加灵活的应用。CountDownLatch和CyclicBarrier都是只适用于固定数量的参与者。移相器适用于可变数目的屏障,在这个意义上,可以在任何时间注册新的参与者。并且在抵达屏障是可以注销已经注册的参与者。因此,注册到同步移相器的参与者的数目可能会随着时间的推移而变化。
如CyclicBarrier一样,移相器可以重复使用,这意味着当前参与者到达移相器后,可以再一次注册自己并等待另一次到达.
移相器的另一个重要特征是:移相器可能是分层的,这允许你以树形结构来安排移相器以减少竞争
简单的使用
public class PhaserTest {
private final static Random RANDOM = new Random();
public static void main(String[] args) {
final Phaser phaser = new Phaser();
IntStream.rangeClosed(1,5).boxed().map(i->phaser).forEach(Task::new);
phaser.register();
phaser.arriveAndAwaitAdvance();//相当于CountDown
System.out.println("All of work are finished.");
}
static class Task extends Thread{
private final Phaser phaser;
Task(Phaser phaser) {
this.phaser = phaser;
phaser.register();//把自己加入计数器中
start();
}
@Override
public void run() {
System.out.println("The worker[ "+getName()+ " ]" +" is working.");
try {
TimeUnit.SECONDS.sleep(RANDOM.nextInt(5));
} catch (InterruptedException e) {
e.printStackTrace();
}
phaser.arriveAndAwaitAdvance();//自己完成,等待其他线程完成,相当于CyclicBarrier
}
}
}
结果:
The worker[ Thread-1 ] is working.
The worker[ Thread-2 ] is working.
The worker[ Thread-0 ] is working.
The worker[ Thread-4 ] is working.
The worker[ Thread-3 ] is working.
All of work are finished.
重复使用的例子
public class PhaserTest {
private final static Random RANDOM = new Random();
public static void main(String[] args) {
final Phaser phaser = new Phaser(5);
for (int i = 0; i < 6; i++) {
new Athletes(i,phaser).start();
}
}
static class Athletes extends Thread {
private final int no;
private final Phaser phaser;
Athletes(int no, Phaser phaser) {
this.no = no;
this.phaser = phaser;
}
@Override
public void run() {
try {
System.out.println(no + " start running.");
TimeUnit.SECONDS.sleep(RANDOM.nextInt(5));
System.out.println(no + " end running.");
phaser.arriveAndAwaitAdvance();
System.out.println(no + " start bicycle.");
TimeUnit.SECONDS.sleep(RANDOM.nextInt(5));
System.out.println(no + " end bicycle.");
phaser.arriveAndAwaitAdvance();
System.out.println(no + " start long jump.");
TimeUnit.SECONDS.sleep(RANDOM.nextInt(5));
System.out.println(no + " end long jump.");
phaser.arriveAndAwaitAdvance();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
结果:
0 start running.
1 start running.
4 start running.
2 start running.
3 start running.
5 start running.
5 end running.
4 end running.
2 end running.
1 end running.
0 end running.
4 start bicycle.
......
动态减少
static class InjuredAthletes extends Thread {
private final int no;
private final Phaser phaser;
InjuredAthletes(int no, Phaser phaser) {
this.no = no;
this.phaser = phaser;
}
@Override
public void run() {
try {
sport(no, phaser, " start running.", " end running.");
sport(no, phaser, " start bicycle.", " end bicycle.");
System.out.println("I am injured.");
phaser.arriveAndDeregister();//动态减少
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
static class Athletes extends Thread {
private final int no;
private final Phaser phaser;
Athletes(int no, Phaser phaser) {
this.no = no;
this.phaser = phaser;
}
@Override
public void run() {
try {
sport(no, phaser, " start running.", " end running.");
sport(no, phaser, " start bicycle.", " end bicycle.");
sport(no, phaser, " start long jump.", " end long jump.");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
private static void sport(int no, Phaser phaser, String s, String s2) throws InterruptedException {
System.out.println(no + s);
TimeUnit.SECONDS.sleep(RANDOM.nextInt(5));
System.out.println(no + s2);
phaser.arriveAndAwaitAdvance();
}
API
重要API
注册
public int register()
public int bulkRegister(int parties)
register
- 是注册一个线程数,比较常用
bulkRegister
- 可以批量注册
到达
public int arrive()
public int arriveAndDeregister()
public int arriveAndAwaitAdvance()
arrive
- 这个到达后,不会阻塞,相当于
countdown
机制
arriveAndAwaitAdvance
- 到达后会阻塞,相当于
CyclicBarrier
机制
arriveAndDeregister
- 当线程出现异常,不能正常到达时,可以调用该方法,
动态减少注册数
举例
public class PhaserTest {
private static final Random RANDOM = new Random();
public static void main(String[] args) throws InterruptedException {
final Phaser phaser = new Phaser(5);
for (int i = 0; i < 4; i++) {
new ArriveTask(i,phaser).start();
}
//等待全部任务进行完成
phaser.arriveAndAwaitAdvance();
System.out.println("The phase 1 work finish done.");
}
private static class ArriveTask extends Thread{
private final Phaser phaser;
private ArriveTask(int no,Phaser phaser) {
super(String.valueOf(no));
this.phaser = phaser;
}
@Override
public void run() {
System.out.println(getName() + " start working. ");
threadSleep();
System.out.println(getName() + " The phase one is running.");
phaser.arrive();
threadSleep();
System.out.println(getName() + " keep to other thing. ");
}
}
private static void threadSleep() {
try {
TimeUnit.SECONDS.sleep(RANDOM.nextInt(5));
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
不再等待机制
protected boolean onAdvance(int phase, int registeredParties)
举例
public class PhaserTest {
public static void main(String[] args) throws InterruptedException {
final Phaser phaser = new Phaser(2){
@Override
protected boolean onAdvance(int phase, int registeredParties) {
return true;
}
};
new OnAdvanceTask("Alex",phaser).start();
new OnAdvanceTask("Jack",phaser).start();
TimeUnit.SECONDS.sleep(3);
System.out.println(phaser.getArrivedParties());
System.out.println(phaser.getUnarrivedParties());
}
static class OnAdvanceTask extends Thread{
private final Phaser phaser;
OnAdvanceTask(String name, Phaser phaser) {
super(name);
this.phaser = phaser;
}
@Override
public void run() {
try {
sout();
TimeUnit.SECONDS.sleep(1);
if (getName().equals("Alex")){
System.out.println(phaser.isTerminated());
sout();
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
private void sout() {
System.out.println(getName() + " I am start and the phase " + phaser.getPhase());
phaser.arriveAndAwaitAdvance();
System.out.println(getName() + " I am end !");
}
}
}
结果:
Jack I am start and the phase 0
Alex I am start and the phase 0
Alex I am end !
Jack I am end !
true
Alex I am start and the phase -2147483647
Alex I am end !
2
0
- 默认情况,当别人调用arriveAndDeregister时,使注册的数量减到0时,直接不会陷入阻塞,返回true,相当于销毁掉
监控子线程任务
public int awaitAdvance(int phase)
public int awaitAdvanceInterruptibly(int phase) throws InterruptedException
- 相当于起到监控的作用
- 如果子线程还没有执行完成,主线程就会阻塞
- 相较而言,可以不用增加注册量
举例
public static void main(String[] args) throws InterruptedException {
final Phaser phaser = new Phaser(4);
for (int i = 0; i < 4; i++) {
new AwaitAdvance(i,phaser).start();
}
//等待全部任务进行完成
phaser.awaitAdvance(phaser.getPhase());
System.out.println("The phase 1 work finish done.");
}
强制关闭
public void forceTermination()
public boolean isTerminated()
- 强制关闭phaser,但是
如果线程陷入阻塞,不会唤醒
调试API
获取阶段数
public final int getPhase()
- 返回当前相位数。 最大相位数为Integer.MAX_VALUE
- 每增加一轮就会加一
举例
public class PhaserTest {
public static void main(String[] args) {
final Phaser phaser = new Phaser(1);
System.out.println(phaser.getPhase());
phaser.arriveAndAwaitAdvance();
System.out.println(phaser.getPhase());
phaser.arriveAndAwaitAdvance();
System.out.println(phaser.getPhase());
phaser.arriveAndAwaitAdvance();
System.out.println(phaser.getPhase());
}
}
结果:
0
1
2
3
获取注册的数
public int getRegisteredParties()
- 获得注册的线程数,相当于Countdown初始的的计数器
- 可以动态更改
获得到达和未到达的数目
public int getArrivedParties()
public int getUnarrivedParties()
getArrivedParties
- 获得已经到达的线程数,和没有到达的线程数
getUnarrivedParties
- 获得没有到达的线程数,和没有到达的线程数
总结
- Phaser 可以通过
register
() 方法和arriveAndDeregister
() 方法,动态的增加或者减少注册量 - 使用
arriveAndAwaitAdvance
,相当于CyclicBarrier
机制 - 使用
arrive
,相当于countdown
机制 - 可以利用
awaitAdvance
,让主线程等待子线程全部完成任务