上节介绍的 CountDownLatch 在解决多个线程同步方面相对于调用线程的 join 方法已经有了不少优化,但是 CountDownLatch 的计数器是一次性的,也就是等到计数器值变为 0 后,再调用 CountDownLatch 的 await 和 countdown 方法都会立刻返回,这就起不到线程同步的效果了。所以为了满足计数器可以重置的需要,JDK 开发组提供了 CyclicBarrier 类,并且 CyclicBarrier 类的功能并不限于 CountDownLatch 的功能。
从字面意思理解,CyclicBarrier 是回环屏障的意思,它可以让一组线程全部达到一个状态后再全部同时执行。这里之所以叫作回环是因为当所有等待线程执行完毕,并重置 CyclicBarrier 的状态后它可以被重用。之所以叫作屏障是因为线程调用 await 方法后就会被阻塞,这个阻塞点就称为屏障点,等所有线程都调用了 await 方法后,线程们就会冲破屏障,继续向下运行。
案例介绍
在介绍原理前先介绍几个实例以便加深理解。在下面的例子中,我们要实现的是,使用两个线程去执行一个被分解的任务 A,当两个线程把自己的任务都执行完毕后再对它们的结果进行汇总处理。
public class CycleBarrierTest1 {
// 创建一个 CyclicBarrier 实例,添加一个所有子线程全部到达屏障后执行的任务
private static CyclicBarrier cyclicBarrier = new CyclicBarrier(2, new Runnable()
{
public void run() {
System.out.println(Thread.currentThread() + 「 task1 merge result」);
}
});
public static void main(String[] args) throws InterruptedException {
//创建一个线程个数固定为 2 的线程池
ExecutorService executorService = Executors.newFixedThreadPool(2);
// 将线程 A 添加到线程池
executorService.submit(new Runnable() {
public void run() {
try {
System.out.println(Thread.currentThread() + 「 task1-1」);
System.out.println(Thread.currentThread() + 「 enter in
barrier」);
cyclicBarrier.await();
System.out.println(Thread.currentThread() + 「 enter out
barrier」);
} catch (Exception e) {
e.printStackTrace();
}
}
});
// 将线程 B 添加到线程池
executorService.submit(new Runnable() {
public void run() {
try {
System.out.println(Thread.currentThread() + 「 task1-2」);
System.out.println(Thread.currentThread() + 「 enter in
barrier」);
cyclicBarrier.await();
System.out.println(Thread.currentThread() + 「 enter out
barrier」);
} catch (Exception e) {
e.printStackTrace();
}
}
});
// 关闭线程池
executorService.shutdown();
}
}
输出结果如下。
如上代码创建了一个 CyclicBarrier 对象,其第一个参数为计数器初始值,第二个参数 Runable 是当计数器值为 0 时需要执行的任务。在 main 函数里面首先创建了一个大小为 2 的线程池,然后添加两个子任务到线程池,每个子任务在执行完自己的逻辑后会调用 await 方法。一开始计数器值为 2,当第一个线程调用 await 方法时,计数器值会递减为 1。由于此时计数器值不为 0,所以当前线程就到了屏障点而被阻塞。然后第二个线程调用 await 时,会进入屏障,计数器值也会递减,现在计数器值为 0,这时就会去执行 CyclicBarrier 构造函数中的任务,执行完毕后退出屏障点,并且唤醒被阻塞的第二个线程,这时候第一个线程也会退出屏障点继续向下运行。
上面的例子说明了多个线程之间是相互等待的,假如计数器值为N,那么随后调用 await 方法的N-1 个线程都会因为到达屏障点而被阻塞,当第N个线程调用 await 后,计数器值为 0 了,这时候第N个线程才会发出通知唤醒前面的N-1 个线程。也就是当全部线程都到达屏障点时才能一块继续向下执行。对于这个例子来说,使用 CountDownLatch 也可以得到类似的输出结果。下面再举个例子来说明 CyclicBarrier 的可复用性。
假设一个任务由阶段 1、阶段 2 和阶段 3 组成,每个线程要串行地执行阶段 1、阶段 2 和阶段 3,当多个线程执行该任务时,必须要保证所有线程的阶段 1 全部完成后才能进入阶段 2 执行,当所有线程的阶段 2 全部完成后才能进入阶段 3 执行。下面使用 CyclicBarrier 来完成这个需求。
public class CycleBarrierTest2 {
// 创建一个 CyclicBarrier 实例
private static CyclicBarrier cyclicBarrier = new CyclicBarrier(2);
public static void main(String[] args) throws InterruptedException {
ExecutorService executorService = Executors.newFixedThreadPool(2);
// 将线程 A 添加到线程池
executorService.submit(new Runnable() {
public void run() {
try {
System.out.println(Thread.currentThread() + 「 step1」);
cyclicBarrier.await();
System.out.println(Thread.currentThread() + 「 step2」);
cyclicBarrier.await();
System.out.println(Thread.currentThread() + 「 step3」);
} catch (Exception e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
});
// 将线程 B 添加到线程池
executorService.submit(new Runnable() {
public void run() {
try {
System.out.println(Thread.currentThread() + 「 step1」);
cyclicBarrier.await();
System.out.println(Thread.currentThread() + 「 step2」);
cyclicBarrier.await();
System.out.println(Thread.currentThread() + 「 step3」);
} catch (Exception e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
});
//关闭线程池
executorService.shutdown();
}
}
输出结果如下。
在如上代码中,每个子线程在执行完阶段 1 后都调用了 await 方法,等到所有线程都到达屏障点后才会一块往下执行,这就保证了所有线程都完成了阶段 1 后才会开始执行阶段 2。然后在阶段 2 后面调用了 await 方法,这保证了所有线程都完成了阶段 2 后,才能开始阶段 3 的执行。这个功能使用单个 CountDownLatch 是无法完成的。
实现原理探究
为了能够一览 CyclicBarrier 的架构设计,下面先看下 CyclicBarrier 的类图结构,如图 10-2 所示。
图 10-2
由以上类图可知,CyclicBarrier 基于独占锁实现,本质底层还是基于 AQS 的。parties 用来记录线程个数,这里表示多少线程调用 await 后,所有线程才会冲破屏障继续往下运行。而 count 一开始等于 parties,每当有线程调用 await 方法就递减 1,当 count 为 0 时就表示所有线程都到了屏障点。你可能会疑惑,为何维护 parties 和 count 两个变量,只使用 count 不就可以了?别忘了 CycleBarier 是可以被复用的,使用两个变量的原因是,parties 始终用来记录总的线程个数,当 count 计数器值变为 0 后,会将 parties 的值赋给 count,从而进行复用。这两个变量是在构造 CyclicBarrier 对象时传递的,如下所示。
public CyclicBarrier(int parties, Runnable barrierAction) {
if (parties <= 0) throw new IllegalArgumentException();
this.parties = parties;
this.count = parties;
this.barrierCommand = barrierAction;
}
还有一个变量 barrierCommand 也通过构造函数传递,这是一个任务,这个任务的执行时机是当所有线程都到达屏障点后。使用 lock 首先保证了更新计数器 count 的原子性。另外使用 lock 的条件变量 trip 支持线程间使用 await 和 signal 操作进行同步。
最后,在变量 generation 内部有一个变量 broken,其用来记录当前屏障是否被打破。注意,这里的 broken 并没有被声明为 volatile 的,因为是在锁内使用变量,所以不需要声明。
private static class Generation {
boolean broken = false;
}
下面来看 CyclicBarrier 中的几个重要的方法。
1.int await() 方法
当前线程调用 CyclicBarrier 的该方法时会被阻塞,直到满足下面条件之一才会返回:parties 个线程都调用了 await()方法,也就是线程都到了屏障点;其他线程调用了当前线程的 interrupt()方法中断了当前线程,则当前线程会抛出 InterruptedException 异常而返回;与当前屏障点关联的 Generation 对象的 broken 标志被设置为 true 时,会抛出 BrokenBarrierException 异常,然后返回。
由如下代码可知,在内部调用了 dowait 方法。第一个参数为 false 则说明不设置超时时间,这时候第二个参数没有意义。
public int await() throws InterruptedException, BrokenBarrierException {
try {
return dowait(false, 0L);
} catch (TimeoutException toe) {
throw new Error(toe); // cannot happen
}
}
2.boolean await(long timeout, TimeUnit unit)方法
当前线程调用 CyclicBarrier 的该方法时会被阻塞,直到满足下面条件之一才会返回:parties 个线程都调用了 await()方法,也就是线程都到了屏障点,这时候返回 true;设置的超时时间到了后返回 false;其他线程调用当前线程的 interrupt()方法中断了当前线程,则当前线程会抛出 InterruptedException 异常然后返回;与当前屏障点关联的 Generation 对象的 broken 标志被设置为 true 时,会抛出 BrokenBarrierException 异常,然后返回。
由如下代码可知,在内部调用了 dowait 方法。第一个参数为 true 则说明设置了超时时间,这时候第二个参数是超时时间。
public int await(long timeout, TimeUnit unit)
throws InterruptedException,
BrokenBarrierException,
TimeoutException {
return dowait(true, unit.toNanos(timeout));
}
3.int dowait(boolean timed, long nanos)方法
该方法实现了 CyclicBarrier 的核心功能,其代码如下。
private int dowait(boolean timed, long nanos)
throws InterruptedException, BrokenBarrierException,
TimeoutException {
final ReentrantLock lock = this.lock;
lock.lock();
try {
...
//(1)如果 index==0 则说明所有线程都到了屏障点,此时执行初始化时传递的任务
int index = --count;
if (index == 0) { // tripped
boolean ranAction = false;
try {
//(2)执行任务
if (command ! = null)
command.run();
ranAction = true;
//(3)激活其他因调用 await 方法而被阻塞的线程,并重置 CyclicBarrier
nextGeneration();
//返回
return 0;
} finally {
if (! ranAction)
breakBarrier();
}
}
// (4)如果 index! =0
for (; ; ) {
try {
//(5)没有设置超时时间,
if (! timed)
trip.await();
//(6)设置了超时时间
else if (nanos > 0L)
nanos = trip.awaitNanos(nanos);
} catch (InterruptedException ie) {
...
}
...
}
} finally {
lock.unlock();
}
}
private void nextGeneration() {
//(7)唤醒条件队列里面阻塞线程
trip.signalAll();
//(8)重置 CyclicBarrier
count = parties;
generation = new Generation();
}
以上是 dowait 方法的主干代码。当一个线程调用了 dowait 方法后,首先会获取独占锁 lock,如果创建 CycleBarrier 时传递的参数为 10,那么后面 9 个调用线程会被阻塞。然后当前获取到锁的线程会对计数器 count 进行递减操作,递减后 count=index=9,因为 index!=0 所以当前线程会执行代码(4)。如果当前线程调用的是无参数的 await()方法,则这里 timed=false,所以当前线程会被放入条件变量 trip 的条件阻塞队列,当前线程会被挂起并释放获取的 lock 锁。如果调用的是有参数的 await 方法则 timed=true,然后当前线程也会被放入条件变量的条件队列并释放锁资源,不同的是当前线程会在指定时间超时后自动被激活。
当第一个获取锁的线程由于被阻塞释放锁后,被阻塞的 9 个线程中有一个会竞争到 lock 锁,然后执行与第一个线程同样的操作,直到最后一个线程获取到 lock 锁,此时已经有 9 个线程被放入了条件变量 trip 的条件队列里面。最后 count=index 等于 0,所以执行代码(2),如果创建 CyclicBarrier 时传递了任务,则在其他线程被唤醒前先执行任务,任务执行完毕后再执行代码(3),唤醒其他 9 个线程,并重置 CyclicBarrier,然后这 10 个线程就可以继续向下运行了。
小结
本节首先通过案例说明了 CycleBarrier 与 CountDownLatch 的不同在于,前者是可以复用的,并且前者特别适合分段任务有序执行的场景。然后分析了 CycleBarrier,其通过独占锁 ReentrantLock 实现计数器原子性更新,并使用条件变量队列来实现线程同步。