Java Phaser 源码分析与应用
Phaser
Phaser
是 JDK1.7 推出的,一个可重用的同步障碍,与CyclicBarrier
,CountDownLatch
功能类似,但是它支持更灵活的用法。
先简单说明这个类的作用。假设有一个大工程,可以分为多个阶段,每一个阶段有多个人参与,并且每一个阶段需要参与的所有人都完成这个阶段的事情,才可以进入下一个阶段,然后所有人又继续做下一个阶段的事,直到所有阶段都完成,当然这途中每个人都可以随时退出,整个工程也可以中途终止。
例如某天陈皮约小美,小雪去他家吃饭。这个事情可以分为三个阶段,第一阶段去超市买食材,第二阶段炒菜,第三阶段吃饭,假设每一个阶段完成后才能继续下一个阶段。
首先定义阶段器类,继承 Phaser 重写 onAdvance 方法来对每一个阶段进行不同的操作。
package com.chenpi;
import java.util.concurrent.Phaser;
/**
* @Description 吃饭阶段器
* @Author 陈皮
* @Date 2021/7/4
* @Version 1.0
*/
public class DiningPhaser extends Phaser {
/**
* 每一阶段到达时,都会执行这个方法,执行完后 phase 自动加1,代表进入下一阶段
*
* @param phase 代表哪个阶段,从0开始
* @param registeredParties 注册的任务
* @return 是否终止
*/
@Override
protected boolean onAdvance(int phase, int registeredParties) {
switch (phase) {
case 0:
System.out.println("第一阶段,买食材完成啦!总共参与人数:" + registeredParties);
return false;
case 1:
System.out.println("第二阶段,炒菜完成啦!总共参与人数:" + registeredParties);
return false;
case 2:
System.out.println("第三阶段,吃完饭啦!总共参与人数:" + registeredParties);
return false;
default:
return true;
}
}
}
接下来定义参与的任务,编写每一个任务在每一个阶段需要干的事情。
package com.chenpi;
import java.util.concurrent.Phaser;
/**
* @Description 任务,代表陈皮
* @Author 陈皮
* @Date 2021/7/4
* @Version 1.0
*/
public class ChenPiTask implements Runnable {
private Phaser phaser;
public ChenPiTask(Phaser phaser) {
this.phaser = phaser;
}
@Override
public void run() {
System.out.println(Thread.currentThread().getName() + " 买好猪肉了...");
// 第一阶段的事干完了,等待其他人完成才能进入下一阶段
phaser.arriveAndAwaitAdvance();
System.out.println(Thread.currentThread().getName() + " 炒好猪肉了...");
// 第二阶段的事干完了,等待其他人完成才能进入下一阶段
phaser.arriveAndAwaitAdvance();
System.out.println(Thread.currentThread().getName() + " 吃饱了...");
// 第三阶段的事干完了,等待其他人完成才能进入下一阶段
phaser.arriveAndAwaitAdvance();
}
}
package com.chenpi;
import java.util.concurrent.Phaser;
/**
* @Description 任务,代表小美
* @Author 陈皮
* @Date 2021/7/4
* @Version 1.0
*/
public class XiaoMeiTask implements Runnable {
private Phaser phaser;
public XiaoMeiTask(Phaser phaser) {
this.phaser = phaser;
}
@Override
public void run() {
System.out.println(Thread.currentThread().getName() + " 买好白菜了...");
// 第一阶段的事干完了,等待其他人完成才能进入下一阶段
phaser.arriveAndAwaitAdvance();
System.out.println(Thread.currentThread().getName() + " 炒好白菜了...");
// 第二阶段的事干完了,等待其他人完成才能进入下一阶段
phaser.arriveAndAwaitAdvance();
System.out.println(Thread.currentThread().getName() + " 吃饱了...");
// 第三阶段的事干完了,等待其他人完成才能进入下一阶段
phaser.arriveAndAwaitAdvance();
}
}
package com.chenpi;
import java.util.concurrent.Phaser;
/**
* @Description 任务,代表小雪
* @Author 陈皮
* @Date 2021/7/4
* @Version 1.0
*/
public class XiaoXueTask implements Runnable {
private Phaser phaser;
public XiaoXueTask(Phaser phaser) {
this.phaser = phaser;
}
@Override
public void run() {
System.out.println(Thread.currentThread().getName() + " 买好鲍鱼了...");
// 第一阶段的事干完了,等待其他人完成才能进入下一阶段
phaser.arriveAndAwaitAdvance();
System.out.println(Thread.currentThread().getName() + " 炒好鲍鱼了...");
// 第二阶段的事干完了,等待其他人完成才能进入下一阶段
phaser.arriveAndAwaitAdvance();
System.out.println(Thread.currentThread().getName() + " 吃饱了...");
// 第三阶段的事干完了,等待其他人完成才能进入下一阶段
phaser.arriveAndAwaitAdvance();
}
}
最后,编写测试类,进行测试验证。
package com.chenpi;
/**
* @Description
* @Author 陈皮
* @Date 2021/7/4
* @Version 1.0
*/
public class ChenPiMain {
public static void main(String[] args) {
// 创建吃饭阶段器,注册3个任务(人)
DiningPhaser diningPhaser = new DiningPhaser();
diningPhaser.bulkRegister(3);
// 三个人同时开始干活
Thread thread1 = new Thread(new ChenPiTask(diningPhaser));
thread1.setName("陈皮");
thread1.start();
Thread thread2 = new Thread(new XiaoMeiTask(diningPhaser));
thread2.setName("小美");
thread2.start();
Thread thread3 = new Thread(new XiaoXueTask(diningPhaser));
thread3.setName("小雪");
thread3.start();
}
}
最后,启动服务,显示结果如下。
陈皮 买好猪肉了...
小美 买好白菜了...
小雪 买好鲍鱼了...
第一阶段,买食材完成啦...
小雪 炒好鲍鱼了...
陈皮 炒好猪肉了...
小美 炒好白菜了...
第二阶段,炒菜完成啦...
小美 吃饱了...
陈皮 吃饱了...
小雪 吃饱了...
第三阶段,吃完饭啦...
Phaser 详解
通过以上简单例子已知道 Phaser 的作用了。其实它的作用不止这些。
可以动态调整注册任务的数量(最大注册的任务数量为65535)。任务可以在任何时间注册(register
和 bulkRegister
方法,或者以构造方法形式注册初始任务数量),也可以在任何到达时注销注册的任务(arriveAndDeregister
方法)。
package com.chenpi;
import java.util.concurrent.Phaser;
/**
* @Description 任务,代表小雪
* @Author 陈皮
* @Date 2021/7/4
* @Version 1.0
*/
public class XiaoXueTask implements Runnable {
private Phaser phaser;
public XiaoXueTask(Phaser phaser) {
this.phaser = phaser;
}
@Override
public void run() {
System.out.println(Thread.currentThread().getName() + " 买好鲍鱼了...");
// 第一阶段的事干完了,等待其他人完成才能进入下一阶段
phaser.arriveAndAwaitAdvance();
System.out.println(Thread.currentThread().getName() + " 炒好鲍鱼了...");
// 第二阶段的事干完了,小雪有事先走了
phaser.arriveAndDeregister();
System.out.println(Thread.currentThread().getName() + "有事先走了");
}
}
我们修改小雪这个任务,她干完第二阶段的事情就有事先走了,即注销任务。结果如下:
陈皮 买好猪肉了...
小美 买好白菜了...
小雪 买好鲍鱼了...
第一阶段,买食材完成啦!总共参与人数:3
小雪 炒好鲍鱼了...
小雪有事先走了
陈皮 炒好猪肉了...
小美 炒好白菜了...
第二阶段,炒菜完成啦!总共参与人数:2
小美 吃饱了...
陈皮 吃饱了...
第三阶段,吃完饭啦!总共参与人数:2
注册和注销只影响内部计数,内部没有记录具体的注册任务,所以不能查询哪个任务是否已注册。但是我们可以编写 Phaser 的子类来实现记录注册的具体任务。
package com.chenpi;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Phaser;
/**
* @Description 吃饭阶段器
* @Author 陈皮
* @Date 2021/7/4
* @Version 1.0
*/
public class DiningPhaser extends Phaser {
// 记录注册的任务
private List<Runnable> registeredTask = new ArrayList<>();
public int register(Runnable task) {
registeredTask.add(task);
return super.register();
}
public List<Runnable> getRegisteredTask() {
return registeredTask;
}
/**
* 每一阶段到达时,都会执行这个方法,执行完后 phase 自动加1,代表进入下一阶段
*
* @param phase 代表哪个阶段,从0开始
* @param registeredParties 注册的任务
* @return 是否终止
*/
@Override
protected boolean onAdvance(int phase, int registeredParties) {
switch (phase) {
case 0:
System.out.println("第一阶段,买食材完成啦!总共参与人数:" + registeredParties);
return false;
case 1:
System.out.println("第二阶段,炒菜完成啦!总共参与人数:" + registeredParties);
return false;
case 2:
System.out.println("第三阶段,吃完饭啦!总共参与人数:" + registeredParties);
return false;
default:
return true;
}
}
}
测试类中,每一个任务单独注册并记录。
package com.chenpi;
/**
* @Description
* @Author 陈皮
* @Date 2021/7/4
* @Version 1.0
*/
public class ChenPiMain {
public static void main(String[] args) {
// 创建吃饭阶段器,注册3个任务(人)
DiningPhaser diningPhaser = new DiningPhaser();
// 三个人同时开始干活
Thread thread1 = new Thread(new ChenPiTask(diningPhaser));
thread1.setName("陈皮");
diningPhaser.register(thread1);
thread1.start();
Thread thread2 = new Thread(new XiaoMeiTask(diningPhaser));
thread2.setName("小美");
diningPhaser.register(thread2);
thread2.start();
Thread thread3 = new Thread(new XiaoXueTask(diningPhaser));
thread3.setName("小雪");
diningPhaser.register(thread3);
thread3.start();
System.out.println("注册的任务:" + diningPhaser.getRegisteredTask());
}
}
启动服务,打印了在阶段器中注册的任务。
陈皮 买好猪肉了...
注册的任务:[Thread[陈皮,5,main], Thread[小美,5,main], Thread[小雪,5,main]]
小美 买好白菜了...
小雪 买好鲍鱼了...
第一阶段,买食材完成啦!总共参与人数:3
小雪 炒好鲍鱼了...
陈皮 炒好猪肉了...
小美 炒好白菜了...
第二阶段,炒菜完成啦!总共参与人数:3
陈皮 吃饱了...
小雪 吃饱了...
小美 吃饱了...
第三阶段,吃完饭啦!总共参与人数:3
对于同步性质,像 CyclicBarrier 一样,Phaser 可以重复等待。Phaser 的 arriveAndAwaitAdvance
方法的作用类似于 CyclicBarrier 的 await 方法。
每一个 Phaser 对象都会关联一个阶段数。这个数从0开始,当所有注册的任务都到达每一个阶段的时候,这个数就递增一次。特别地,如果这个数到达 Integer.MAX_VALUE 后就会重新变回到0。
arrive
和 arriveAndDeregister
方法记录到达,这2个方法不会阻塞,它们会返回已到达的阶段数。
arrive 方法表示当前任务已到达某个阶段,但是不会等待其他任务到达此阶段。arriveAndDeregister 方法表示当前任务已到达某个阶段,并且注销任务。
在每一个阶段中,当所有任务都到达的时候,onAdvance
方法会被最后一个触发阶段到达的任务执行,然后进入下一个阶段。onAdvance 方法可以控制一个 Phaser 的终止,如果我们的 Phaser 对象是继承 Phaser 的子类,可以重写 onAdvance 方法,在每一个阶段到达时这个方法就会被调用从而在每一个阶段做我们想做的事情。
package com.chenpi;
import java.util.concurrent.Phaser;
/**
* @Description 吃饭阶段器
* @Author 陈皮
* @Date 2021/7/4
* @Version 1.0
*/
public class DiningPhaser extends Phaser {
/**
* 每一阶段到达时,都会执行这个方法,执行完后 phase 自动加1,代表进入下一阶段
*
* @param phase 代表哪个阶段,从0开始
* @param registeredParties 注册的任务
* @return 是否终止
*/
@Override
protected boolean onAdvance(int phase, int registeredParties) {
System.out.println(Thread.currentThread().getName() + " 调用了onAdvance方法");
switch (phase) {
case 0:
System.out.println("第一阶段,买食材完成啦!总共参与人数:" + registeredParties);
return false;
case 1:
System.out.println("第二阶段,炒菜完成啦!总共参与人数:" + registeredParties);
return false;
case 2:
System.out.println("第三阶段,吃完饭啦!总共参与人数:" + registeredParties);
return false;
default:
return true;
}
}
}
我们在 onAdvance 方法中打印当前线程,结果表明确实是最后一个触发阶段到达的任务执行 onAdvance 方法,如下:
陈皮 买好猪肉了...
小美 买好白菜了...
小雪 买好鲍鱼了...
小雪 调用了onAdvance方法
第一阶段,买食材完成啦!总共参与人数:3
小雪 炒好鲍鱼了...
陈皮 炒好猪肉了...
小美 炒好白菜了...
小美 调用了onAdvance方法
第二阶段,炒菜完成啦!总共参与人数:3
小美 吃饱了...
陈皮 吃饱了...
小雪 吃饱了...
小雪 调用了onAdvance方法
第三阶段,吃完饭啦!总共参与人数:3
phaser 可以随时被终止。当终止时,所有同步方法(例如 arriveAndAwaitAdvance )会立即返回而不用阻塞等待,并且返回一个负数。同样地,被终止后无法再注册任务。isTerminated
方法可以判断是否已经终止。
phaser 可以在 onAdvance 方法中返回 true 来达到终止的效果。例如我们继承 Phaser 编写的子类可以重写此方法,当到达某个阶段后返回 true 来终止阶段器。
package com.chenpi;
import java.util.concurrent.Phaser;
/**
* @Description 吃饭阶段器
* @Author 陈皮
* @Date 2021/7/4
* @Version 1.0
*/
public class DiningPhaser extends Phaser {
/**
* 每一阶段到达时,都会执行这个方法,执行完后 phase 自动加1,代表进入下一阶段
*
* @param phase 代表哪个阶段,从0开始
* @param registeredParties 注册的任务
* @return 是否终止
*/
@Override
protected boolean onAdvance(int phase, int registeredParties) {
System.out.println(Thread.currentThread().getName() + " 调用了onAdvance方法");
switch (phase) {
case 0:
System.out.println("第一阶段,买食材完成啦!总共参与人数:" + registeredParties);
return false;
case 1:
System.out.println("第二阶段,炒菜完成啦!总共参与人数:" + registeredParties);
return false;
case 2:
System.out.println("第三阶段,吃完饭啦!总共参与人数:" + registeredParties);
return false;
default:
return true;
}
}
}
默认的 onAdvance 方法实现是当注册的任务注销到0的时候返回 true,源码如下:
protected boolean onAdvance(int phase, int registeredParties) {
return registeredParties == 0;
}
phaser 的 forceTermination
方法其实也可以强制终止阶段器。还是以上述例子,小美干完第一阶段的事情后,觉得这样等来等去太费时间了,所以终止这个阶段器。
package com.chenpi;
import java.util.concurrent.Phaser;
/**
* @Description 任务,代表小美
* @Author 陈皮
* @Date 2021/7/4
* @Version 1.0
*/
public class XiaoMeiTask implements Runnable {
private Phaser phaser;
public XiaoMeiTask(Phaser phaser) {
this.phaser = phaser;
}
@Override
public void run() {
System.out.println(Thread.currentThread().getName() + " 买好白菜了...");
// 第一阶段的事干完了,等待其他人完成才能进入下一阶段
phaser.arriveAndAwaitAdvance();
System.out.println(Thread.currentThread().getName() + " 炒好白菜了...");
// 小美觉得这样等来等去太费时间了,所以终止这个阶段器
phaser.forceTermination();
System.out.println(Thread.currentThread().getName() + " 吃饱了...");
// 第三阶段的事干完了,等待其他人完成才能进入下一阶段
phaser.arriveAndAwaitAdvance();
}
}
再执行程序,结果第一阶段后,后续的任务执行就不再按阶段来了,结果如下:
陈皮 买好猪肉了...
小美 买好白菜了...
小雪 买好鲍鱼了...
小雪 调用了onAdvance方法
第一阶段,买食材完成啦!总共参与人数:3
小雪 炒好鲍鱼了...
小美 炒好白菜了...
小美 吃饱了...
小雪 吃饱了...
陈皮 炒好猪肉了...
陈皮 吃饱了...
Phaser 支持层次结构,可以通过构造函数 Phaser(Phaser parent) 和 Phaser(Phaser parent, int parties) 创建一个树形结构的阶段器。这样可以减轻在一个 Phaser 上注册过多的任务而导致的竞争,从而提升吞吐量,缺点是会增加单个操作的开销。
你会发现 arriveAndAwaitAdvance 方法没有抛出 InterruptedException,即使当前线程被中断,这个方法也不会返回,而是继续等待。所以如果希望在等待时可中断,或者可超时,则可以选择使用以下方法:
// 当当前阶段值等于这个方法的参数phase时,等待;不相等时候立即返回。
awaitAdvanceInterruptibly(int phase)
// 当当前阶段值等于这个方法的参数phase时,等待;不相等时候立即返回。超时会抛出超时异常
awaitAdvanceInterruptibly(int phase, long timeout, TimeUnit unit)
下面罗列一些其他常用方法:
// 获取当前到达的任务数
int getArrivedParties()
// 获取上级Phaser
Phaser getParent()
// 获取当前属于第几阶段
final int getPhase()
// 获取当前注册的任务数
int getRegisteredParties()
// 获取当前未到达的任务数
int getUnarrivedParties()
// 判断当前阶段器是否被终止
boolean isTerminated()
Doug Lea 大神
Doug Lea 大神真的很喜欢用一个整数的不同位来表示各种状态,事物数量等。然后各种位运算骚操作,不得不佩服。例如 Phaser 类中只使用一个 long 类型的变量就表示了未到达屏障的任务数,注册的任务数,阶段数,以及阶段器的终止状态等。如果我们来表示这些信息,可能就是定义4个变量了吧!
/**
*
* unarrived -- the number of parties yet to hit barrier (bits 0-15)
* parties -- the number of parties to wait (bits 16-31)
* phase -- the generation of the barrier (bits 32-62)
* terminated -- set if barrier is terminated (bit 63 / sign)
*
*/
private volatile long state;
而且你会发现大神在 JUC
并发包下的许多类,定义的变量赋值一般会使用十六进制,或者位移赋值,位运算。无可厚非,位运算速度是极快的,这可能也是他为什么钟爱于使用位操作。
private static final int MAX_PARTIES = 0xffff;
private static final int MAX_PHASE = Integer.MAX_VALUE;
private static final int PARTIES_SHIFT = 16;
private static final int PHASE_SHIFT = 32;
private static final int UNARRIVED_MASK = 0xffff; // to mask ints
private static final long PARTIES_MASK = 0xffff0000L; // to mask longs
private static final long COUNTS_MASK = 0xffffffffL;
private static final long TERMINATION_BIT = 1L << 63;
// some special values
private static final int ONE_ARRIVAL = 1;
private static final int ONE_PARTY = 1 << PARTIES_SHIFT;
private static final int ONE_DEREGISTER = ONE_ARRIVAL|ONE_PARTY;
private static final int EMPTY = 1;
// The following unpacking methods are usually manually inlined
private static int unarrivedOf(long s) {
int counts = (int)s;
return (counts == EMPTY) ? 0 : (counts & UNARRIVED_MASK);
}
private static int partiesOf(long s) {
return (int)s >>> PARTIES_SHIFT;
}
private static int phaseOf(long s) {
return (int)(s >>> PHASE_SHIFT);
}
private static int arrivedOf(long s) {
int counts = (int)s;
return (counts == EMPTY) ? 0 :
(counts >>> PARTIES_SHIFT) - (counts & UNARRIVED_MASK);
}
下面这位便是 Doug Lea 大神!!