概述

一个可重用的同步屏蔽,功能类似 CycliBarrierCountDownLatch,相较于它们,基于 Phaser 的实现可以更加灵活。它是 JDK 7 增加的同步辅助类,支持对任务进行动态调整,运行分层结构来达到更高的吞吐量。

设计思想

Phaser 设计思想.png

注册:Registration

Phaser 上注册的 parties 会随着时间的变化而变化。任务可以随时注册(register、bulkRegister),并且在任何抵达点可以随意撤销注册(arriveAndDeregister)。也就是说,我们可以在任意时间点添加任务,在任意抵达点移除任务。

同步机制:Synchronization

CylcicBarrier 一样,Phaser 内部也是可以重复使用。arriveAndAwaitAdvance 的效果等同于 CyclicBarrier.awaitPhaser 的每一代都关联一个 phaser number,初始值为 0。当所有任务者到达后,phaser + 1,到达 Integer.MAX_VALUE 重置为 0。使用 phaser number 可以独立控制到达 Phaser 和等待其它线程的动作。通过下面两种类型的方法:

  1. Arrival:到达机制。arrive 和 arriveAndDeregister 方法记录到达状态。这些方法不会阻塞。但是会返回一个相关的 arrival phaser number,可理解为阶段号。当所有任务都到达后,可以执行一个函数(可选),这个函数通过重写 onAdvance() 方法实现。通常可以用来控制终止状态。重写此方法类似为 CyclicBarrier 提供一个 barrierAction,但它更加灵活。
  2. Waiting:等待机制。awaitAdvance 方法需要一个表示 arrival phase number 的参数,并且在 phaser 前进到与给定 phase 不同的 phase 时返回。和 CyclicBarrier 不同,即使等待线程已经被中断,awaitAdvance 方法也会一直等待。中断状态和超时时间同样可用,但是当任务等待中断或超时后未改变 phaser 的状态时会遭遇异常。如果有必要,在方法 forceTermination 之后可以执行这些异常的相关的 handler 进行恢复操作,Phaser 也可能被 ForkJoinPool 中的任务使用,这样在其他任务阻塞等待一个 phase 时可以保证足够的并行度来执行任务。

终止机制:Termination

可以用 isTerminated 方法检查 phaser 的终止状态。在终止时,所有同步方法立刻返回一个负值。在终止时尝试注册也没有效果。当调用 onAdvance 返回 true 时 Termination 被触发。当 deregistration 操作使已注册的 parties 变为 0 时,onAdvance 的默认实现就会返回 true。也可以重写 onAdvance 方法来定义终止动作。forceTermination 方法也可以释放等待线程并且允许它们终止。

分层结构:Tiering

Phaser 支持分层结构 (树状构造) 来减少竞争。注册了大量 parties 的 Phaser 可能会因为同步竞争消耗很高的成本, 因此可以设置一些子 Phaser 来共享一个通用的 parent。这样的话即使每个操作消耗了更多的开销,但是会提高整体吞吐量。在一个分层结构的 phaser 里,子节点 phaser 的注册和取消注册都通过父节点管理。子节点 phaser 通过构造或方法 register、bulkRegister 进行首次注册时,在其父节点上注册。子节点 phaser 通过调用 arriveAndDeregister 进行最后一次取消注册时,也在其父节点上取消注册。

状态监控:Monitoring

由于同步方法可能只被已注册的 parties 调用,所以 phaser 的当前状态也可能被任何调用者监控。在任何时候,可以通过 getRegisteredParties 获取 parties 数,其中 getArrivedParties 方法返回已经到达当前 phase 的 parties 数。当剩余的 parties(通过方法 getUnarrivedParties 获取) 到达时,phase 进入下一代。这些方法返回的值可能只表示短暂的状态,所以一般来说在同步结构里并没有啥卵用。

数据结构

  1. private final AtomicReference<QNode> evenQ;
  2. private final AtomicReference<QNode> oddQ;

Phaser 内部使用一个 Treiber stacks 用于存放需要等待的线程。这里为了避免写操作和读操作出现竞争,这里使用两个 QNode 并发队列,当我们选择使用时,交替使用 evenQoddQ

private AtomicReference<QNode> queueFor(int phase) {
    return ((phase & 1) == 0) ? evenQ : oddQ;
}

选择是根据 phase 是根据奇偶选择对应的队列。

源码解析

核心参数

// long 型,将 64 位分成四部分
private volatile long state;

// 父节点
private final Phaser parent;

// 树的根节点
private final Phaser root;

Phaser state 变量.png
为了节省内存空间,Doug Lea 将 state 分成四部分,这些都是 Phaser 重要的数据。但是,在极端高并发场景下,多个线程同时操作 state 变量肯定会出现资源竞争热点。因此,通过分而治之的思想,将 Phaser 组织成树状结构,这样,每个子节点都可以承担部分竞争压力,然后由 root 节点统一调度执行。
Phaser 树状结构.png

构造函数

public Phaser() {
    this(null, 0);
}

public Phaser(int parties) {
    this(null, parties);
}

public Phaser(Phaser parent) {
    this(parent, 0);
}

public Phaser(Phaser parent, int parties)

核心方法

//注册一个新的party
public int register()

//批量注册
public int bulkRegister(int parties)

//使当前线程到达phaser,不等待其他任务到达。返回arrival phase number
public int arrive() 

//使当前线程到达phaser并撤销注册,返回arrival phase number
public int arriveAndDeregister()

/*
 * 使当前线程到达phaser并等待其他任务到达,等价于awaitAdvance(arrive())。
 * 如果需要等待中断或超时,可以使用awaitAdvance方法完成一个类似的构造。
 * 如果需要在到达后取消注册,可以使用awaitAdvance(arriveAndDeregister())。
 */
public int arriveAndAwaitAdvance()

//等待给定phase数,返回下一个 arrival phase number
public int awaitAdvance(int phase)

//阻塞等待,直到phase前进到下一代,返回下一代的phase number
public int awaitAdvance(int phase) 

//响应中断版awaitAdvance
public int awaitAdvanceInterruptibly(int phase) throws InterruptedException

public int awaitAdvanceInterruptibly(int phase, long timeout, TimeUnit unit)
    throws InterruptedException, TimeoutException

//使当前phaser进入终止状态,已注册的parties不受影响,如果是分层结构,则终止所有phaser
public void forceTermination()

注册:register

注册就是新增一个 party

public int register() {
    return doRegister(1);
}

// java.util.concurrent.Phaser#doRegister
private int doRegister(int registrations) {
    // #1 将入参「registrations」格式化,因为内部state是一个 long 型变量,可以看成 4 段
    long adjust = ((long)registrations << PARTIES_SHIFT) | registrations;

    // #2 如果当前 Phaser 对象有父类,则获取,否则为 null
    final Phaser parent = this.parent;
    int phase;
    for (;;) {
        // #3 如果父类为 null,则返回当前的 state,否则需要和父类协调 state 值。
        // 协调操作是为了解决子节点滞后问题。通常要根节点已经向前推进(advance),但是
        // 子 Phaser 还没有向前推进的情况。在这种情况下,子类必须通过设置 unarrived to parties
        // 来完成推进。
        long s = (parent == null) ? state : reconcileState();
        int counts = (int)s;

        // #4 获取已注册的 parties 数量
        int parties = counts >>> PARTIES_SHIFT;

        // #5 获取还未到达的 parties 数量
        int unarrived = counts & UNARRIVED_MASK;
        if (registrations > MAX_PARTIES - parties)
            throw new IllegalStateException(badRegister(s));

        // #6 获取当前阶段(版本)
        phase = (int)(s >>> PHASE_SHIFT);

        if (phase < 0)
            break;

        // #7 非首次注册
        if (counts != EMPTY) {                  // not 1st registration
            // #
            if (parent == null || reconcileState() == s) {
                if (unarrived == 0)             // wait out advance
                    root.internalAwaitAdvance(phase, null);
                // #8 直接更新当前 phaser 对象的 parties
                else if (UNSAFE.compareAndSwapLong(this, stateOffset, s, s + adjust))
                    break;
            }
        }

        // #8 首次 root 节点注册
        else if (parent == null) {              // 1st root registration
            // #9 更新更新 phaser
            long next = ((long)phase << PHASE_SHIFT) | adjust;
            if (UNSAFE.compareAndSwapLong(this, stateOffset, s, next))
                break;
        }
        else {
            // #9 首次子类phaser注册
            synchronized (this) {               // 1st sub registration
                if (state == s) {               // recheck under lock
                    // #10 交给父类完成注册。
                    phase = parent.doRegister(1);
                    if (phase < 0)
                        break;
                    // finish registration whenever parent registration
                    // succeeded, even when racing with termination,
                    // since these are part of the same "transaction".

                    // #11 更新当前的 phase
                    while (!UNSAFE.compareAndSwapLong
                           (this, stateOffset, s,
                            ((long)phase << PHASE_SHIFT) | adjust)) {
                        s = state;
                        phase = (int)(root.state >>> PHASE_SHIFT);
                        // assert (int)s == EMPTY;
                    }
                    break;
                }
            }
        }
    }
    return phase;
}

大致流程如下:

  1. 如果当前操作不是首次注册,那么直接在当前 phaser 上更新注册 parties 数量。
  2. 如果是首次注册,并且当前 phaser 没有父节点,说明是 root 节点注册,直接更新 phaser。
  3. 如果当前操作是首次注册,并且当前 phaser 有父节点,则注册操作交由父节点完成,并且更新当前 phaser 的 phaser。
  4. 子 phaser 的 phase 在没有被真正使用之前,允许滞后它的父节点。非首次注册时,如果 phaser 有父节点,则需要调用 reconclieState() 方法解决父节点的 phase 延迟传递问题。

当父节点的 phase 已经推进到下一阶段(phase),但是子节点还没有跟上,此时,子节点必须通过更新未到达的 parties,完成它们的 advance 操作。

到达栅栏,成员完成阶段性操作:arrive

arrive 方法标记当前线程已经到达栅栏,并且这个方法不会被阻塞。
arriveAwaitAdvance() 方法标记当前线程已经到达栅栏,并且这个方法不会被阻塞。但它执行 deregister 操作,将 parties - 1。
arriveAndAwaitAdvance() 方法等待其它线程都到了栅栏再一起进入下一阶段。
awaitAdvance(int phase) 当前线程会进行阻塞,直到 Phaser 开启指定的阶段才会被唤醒。
onAdvance(int, resiteredPareies):它会在一个 phase 结束的时候被调用。