概述
一个可重用的同步屏蔽,功能类似 CycliBarrier
和 CountDownLatch
,相较于它们,基于 Phaser 的实现可以更加灵活。它是 JDK 7 增加的同步辅助类,支持对任务进行动态调整,运行分层结构来达到更高的吞吐量。
设计思想
注册:Registration
在 Phaser
上注册的 parties
会随着时间的变化而变化。任务可以随时注册(register、bulkRegister),并且在任何抵达点可以随意撤销注册(arriveAndDeregister)。也就是说,我们可以在任意时间点添加任务,在任意抵达点移除任务。
同步机制:Synchronization
和 CylcicBarrier
一样,Phaser 内部也是可以重复使用。arriveAndAwaitAdvance
的效果等同于 CyclicBarrier.await
。Phaser
的每一代都关联一个 phaser number
,初始值为 0。当所有任务者到达后,phaser + 1
,到达 Integer.MAX_VALUE
重置为 0。使用 phaser number 可以独立控制到达 Phaser
和等待其它线程的动作。通过下面两种类型的方法:
- Arrival:到达机制。arrive 和 arriveAndDeregister 方法记录到达状态。这些方法不会阻塞。但是会返回一个相关的 arrival phaser number,可理解为阶段号。当所有任务都到达后,可以执行一个函数(可选),这个函数通过重写
onAdvance()
方法实现。通常可以用来控制终止状态。重写此方法类似为CyclicBarrier
提供一个 barrierAction,但它更加灵活。 - Waiting:等待机制。awaitAdvance 方法需要一个表示 arrival phase number 的参数,并且在 phaser 前进到与给定 phase 不同的 phase 时返回。和 CyclicBarrier 不同,即使等待线程已经被中断,awaitAdvance 方法也会一直等待。中断状态和超时时间同样可用,但是当任务等待中断或超时后未改变 phaser 的状态时会遭遇异常。如果有必要,在方法 forceTermination 之后可以执行这些异常的相关的 handler 进行恢复操作,Phaser 也可能被 ForkJoinPool 中的任务使用,这样在其他任务阻塞等待一个 phase 时可以保证足够的并行度来执行任务。
终止机制:Termination
可以用 isTerminated 方法检查 phaser 的终止状态。在终止时,所有同步方法立刻返回一个负值。在终止时尝试注册也没有效果。当调用 onAdvance 返回 true 时 Termination 被触发。当 deregistration 操作使已注册的 parties 变为 0 时,onAdvance 的默认实现就会返回 true。也可以重写 onAdvance 方法来定义终止动作。forceTermination 方法也可以释放等待线程并且允许它们终止。
分层结构:Tiering
Phaser 支持分层结构 (树状构造) 来减少竞争。注册了大量 parties 的 Phaser 可能会因为同步竞争消耗很高的成本, 因此可以设置一些子 Phaser 来共享一个通用的 parent。这样的话即使每个操作消耗了更多的开销,但是会提高整体吞吐量。在一个分层结构的 phaser 里,子节点 phaser 的注册和取消注册都通过父节点管理。子节点 phaser 通过构造或方法 register、bulkRegister 进行首次注册时,在其父节点上注册。子节点 phaser 通过调用 arriveAndDeregister 进行最后一次取消注册时,也在其父节点上取消注册。
状态监控:Monitoring
由于同步方法可能只被已注册的 parties 调用,所以 phaser 的当前状态也可能被任何调用者监控。在任何时候,可以通过 getRegisteredParties 获取 parties 数,其中 getArrivedParties 方法返回已经到达当前 phase 的 parties 数。当剩余的 parties(通过方法 getUnarrivedParties 获取) 到达时,phase 进入下一代。这些方法返回的值可能只表示短暂的状态,所以一般来说在同步结构里并没有啥卵用。
数据结构
private final AtomicReference<QNode> evenQ;
private final AtomicReference<QNode> oddQ;
Phaser
内部使用一个 Treiber stacks 用于存放需要等待的线程。这里为了避免写操作和读操作出现竞争,这里使用两个 QNode
并发队列,当我们选择使用时,交替使用 evenQ
和 oddQ
。
private AtomicReference<QNode> queueFor(int phase) {
return ((phase & 1) == 0) ? evenQ : oddQ;
}
源码解析
核心参数
// long 型,将 64 位分成四部分
private volatile long state;
// 父节点
private final Phaser parent;
// 树的根节点
private final Phaser root;
为了节省内存空间,Doug Lea 将 state 分成四部分,这些都是 Phaser
重要的数据。但是,在极端高并发场景下,多个线程同时操作 state
变量肯定会出现资源竞争热点。因此,通过分而治之的思想,将 Phaser
组织成树状结构,这样,每个子节点都可以承担部分竞争压力,然后由 root 节点统一调度执行。
构造函数
public Phaser() {
this(null, 0);
}
public Phaser(int parties) {
this(null, parties);
}
public Phaser(Phaser parent) {
this(parent, 0);
}
public Phaser(Phaser parent, int parties)
核心方法
//注册一个新的party
public int register()
//批量注册
public int bulkRegister(int parties)
//使当前线程到达phaser,不等待其他任务到达。返回arrival phase number
public int arrive()
//使当前线程到达phaser并撤销注册,返回arrival phase number
public int arriveAndDeregister()
/*
* 使当前线程到达phaser并等待其他任务到达,等价于awaitAdvance(arrive())。
* 如果需要等待中断或超时,可以使用awaitAdvance方法完成一个类似的构造。
* 如果需要在到达后取消注册,可以使用awaitAdvance(arriveAndDeregister())。
*/
public int arriveAndAwaitAdvance()
//等待给定phase数,返回下一个 arrival phase number
public int awaitAdvance(int phase)
//阻塞等待,直到phase前进到下一代,返回下一代的phase number
public int awaitAdvance(int phase)
//响应中断版awaitAdvance
public int awaitAdvanceInterruptibly(int phase) throws InterruptedException
public int awaitAdvanceInterruptibly(int phase, long timeout, TimeUnit unit)
throws InterruptedException, TimeoutException
//使当前phaser进入终止状态,已注册的parties不受影响,如果是分层结构,则终止所有phaser
public void forceTermination()
注册:register
注册就是新增一个 party
。
public int register() {
return doRegister(1);
}
// java.util.concurrent.Phaser#doRegister
private int doRegister(int registrations) {
// #1 将入参「registrations」格式化,因为内部state是一个 long 型变量,可以看成 4 段
long adjust = ((long)registrations << PARTIES_SHIFT) | registrations;
// #2 如果当前 Phaser 对象有父类,则获取,否则为 null
final Phaser parent = this.parent;
int phase;
for (;;) {
// #3 如果父类为 null,则返回当前的 state,否则需要和父类协调 state 值。
// 协调操作是为了解决子节点滞后问题。通常要根节点已经向前推进(advance),但是
// 子 Phaser 还没有向前推进的情况。在这种情况下,子类必须通过设置 unarrived to parties
// 来完成推进。
long s = (parent == null) ? state : reconcileState();
int counts = (int)s;
// #4 获取已注册的 parties 数量
int parties = counts >>> PARTIES_SHIFT;
// #5 获取还未到达的 parties 数量
int unarrived = counts & UNARRIVED_MASK;
if (registrations > MAX_PARTIES - parties)
throw new IllegalStateException(badRegister(s));
// #6 获取当前阶段(版本)
phase = (int)(s >>> PHASE_SHIFT);
if (phase < 0)
break;
// #7 非首次注册
if (counts != EMPTY) { // not 1st registration
// #
if (parent == null || reconcileState() == s) {
if (unarrived == 0) // wait out advance
root.internalAwaitAdvance(phase, null);
// #8 直接更新当前 phaser 对象的 parties
else if (UNSAFE.compareAndSwapLong(this, stateOffset, s, s + adjust))
break;
}
}
// #8 首次 root 节点注册
else if (parent == null) { // 1st root registration
// #9 更新更新 phaser
long next = ((long)phase << PHASE_SHIFT) | adjust;
if (UNSAFE.compareAndSwapLong(this, stateOffset, s, next))
break;
}
else {
// #9 首次子类phaser注册
synchronized (this) { // 1st sub registration
if (state == s) { // recheck under lock
// #10 交给父类完成注册。
phase = parent.doRegister(1);
if (phase < 0)
break;
// finish registration whenever parent registration
// succeeded, even when racing with termination,
// since these are part of the same "transaction".
// #11 更新当前的 phase
while (!UNSAFE.compareAndSwapLong
(this, stateOffset, s,
((long)phase << PHASE_SHIFT) | adjust)) {
s = state;
phase = (int)(root.state >>> PHASE_SHIFT);
// assert (int)s == EMPTY;
}
break;
}
}
}
}
return phase;
}
大致流程如下:
- 如果当前操作不是首次注册,那么直接在当前 phaser 上更新注册 parties 数量。
- 如果是首次注册,并且当前 phaser 没有父节点,说明是 root 节点注册,直接更新 phaser。
- 如果当前操作是首次注册,并且当前 phaser 有父节点,则注册操作交由父节点完成,并且更新当前 phaser 的 phaser。
- 子 phaser 的 phase 在没有被真正使用之前,允许滞后它的父节点。非首次注册时,如果 phaser 有父节点,则需要调用
reconclieState()
方法解决父节点的 phase 延迟传递问题。
当父节点的 phase 已经推进到下一阶段(phase),但是子节点还没有跟上,此时,子节点必须通过更新未到达的 parties,完成它们的 advance 操作。
到达栅栏,成员完成阶段性操作:arrive
arrive
方法标记当前线程已经到达栅栏,并且这个方法不会被阻塞。arriveAwaitAdvance()
方法标记当前线程已经到达栅栏,并且这个方法不会被阻塞。但它执行 deregister
操作,将 parties - 1。arriveAndAwaitAdvance()
方法等待其它线程都到了栅栏再一起进入下一阶段。awaitAdvance(int phase)
当前线程会进行阻塞,直到 Phaser 开启指定的阶段才会被唤醒。onAdvance(int, resiteredPareies)
:它会在一个 phase 结束的时候被调用。